1mod transaction_in_memory;
22mod transaction_redis;
23
24pub use transaction_in_memory::*;
25pub use transaction_redis::*;
26
27use crate::{
28 models::{
29 NetworkTransactionData, TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
30 },
31 repositories::{BatchDeleteResult, TransactionDeleteRequest, *},
32 utils::RedisConnections,
33};
34use async_trait::async_trait;
35use eyre::Result;
36use std::sync::Arc;
37
38#[async_trait]
40pub trait TransactionRepository: Repository<TransactionRepoModel, String> {
41 fn connection_info(&self) -> Option<(Arc<RedisConnections>, String)> {
45 None
46 }
47
48 async fn find_by_relayer_id(
50 &self,
51 relayer_id: &str,
52 query: PaginationQuery,
53 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
54
55 async fn find_by_status(
59 &self,
60 relayer_id: &str,
61 statuses: &[TransactionStatus],
62 ) -> Result<Vec<TransactionRepoModel>, RepositoryError>;
63
64 async fn find_by_status_paginated(
77 &self,
78 relayer_id: &str,
79 statuses: &[TransactionStatus],
80 query: PaginationQuery,
81 oldest_first: bool,
82 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
83
84 async fn find_by_nonce(
86 &self,
87 relayer_id: &str,
88 nonce: u64,
89 ) -> Result<Option<TransactionRepoModel>, RepositoryError>;
90
91 async fn update_status(
93 &self,
94 tx_id: String,
95 status: TransactionStatus,
96 ) -> Result<TransactionRepoModel, RepositoryError>;
97
98 async fn partial_update(
100 &self,
101 tx_id: String,
102 update: TransactionUpdateRequest,
103 ) -> Result<TransactionRepoModel, RepositoryError>;
104
105 async fn update_network_data(
107 &self,
108 tx_id: String,
109 network_data: NetworkTransactionData,
110 ) -> Result<TransactionRepoModel, RepositoryError>;
111
112 async fn set_sent_at(
114 &self,
115 tx_id: String,
116 sent_at: String,
117 ) -> Result<TransactionRepoModel, RepositoryError>;
118
119 async fn increment_status_check_failures(
121 &self,
122 tx_id: String,
123 ) -> Result<TransactionRepoModel, RepositoryError>;
124
125 async fn reset_status_check_consecutive_failures(
127 &self,
128 tx_id: String,
129 ) -> Result<TransactionRepoModel, RepositoryError>;
130
131 async fn record_stellar_insufficient_fee_retry(
133 &self,
134 tx_id: String,
135 sent_at: String,
136 ) -> Result<TransactionRepoModel, RepositoryError>;
137
138 async fn record_stellar_try_again_later_retry(
140 &self,
141 tx_id: String,
142 sent_at: String,
143 ) -> Result<TransactionRepoModel, RepositoryError>;
144
145 async fn set_confirmed_at(
147 &self,
148 tx_id: String,
149 confirmed_at: String,
150 ) -> Result<TransactionRepoModel, RepositoryError>;
151
152 async fn count_by_status(
155 &self,
156 relayer_id: &str,
157 statuses: &[TransactionStatus],
158 ) -> Result<u64, RepositoryError>;
159
160 async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError>;
175
176 async fn delete_by_requests(
188 &self,
189 requests: Vec<TransactionDeleteRequest>,
190 ) -> Result<BatchDeleteResult, RepositoryError>;
191}
192
193#[cfg(test)]
194mockall::mock! {
195 pub TransactionRepository {}
196
197 #[async_trait]
198 impl Repository<TransactionRepoModel, String> for TransactionRepository {
199 async fn create(&self, entity: TransactionRepoModel) -> Result<TransactionRepoModel, RepositoryError>;
200 async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError>;
201 async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError>;
202 async fn list_paginated(&self, query: PaginationQuery) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
203 async fn update(&self, id: String, entity: TransactionRepoModel) -> Result<TransactionRepoModel, RepositoryError>;
204 async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError>;
205 async fn count(&self) -> Result<usize, RepositoryError>;
206 async fn has_entries(&self) -> Result<bool, RepositoryError>;
207 async fn drop_all_entries(&self) -> Result<(), RepositoryError>;
208 }
209
210 #[async_trait]
211 impl TransactionRepository for TransactionRepository {
212 fn connection_info(&self) -> Option<(Arc<RedisConnections>, String)>;
213 async fn find_by_relayer_id(&self, relayer_id: &str, query: PaginationQuery) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
214 async fn find_by_status(&self, relayer_id: &str, statuses: &[TransactionStatus]) -> Result<Vec<TransactionRepoModel>, RepositoryError>;
215 async fn find_by_status_paginated(&self, relayer_id: &str, statuses: &[TransactionStatus], query: PaginationQuery, oldest_first: bool) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
216 async fn find_by_nonce(&self, relayer_id: &str, nonce: u64) -> Result<Option<TransactionRepoModel>, RepositoryError>;
217 async fn update_status(&self, tx_id: String, status: TransactionStatus) -> Result<TransactionRepoModel, RepositoryError>;
218 async fn partial_update(&self, tx_id: String, update: TransactionUpdateRequest) -> Result<TransactionRepoModel, RepositoryError>;
219 async fn update_network_data(&self, tx_id: String, network_data: NetworkTransactionData) -> Result<TransactionRepoModel, RepositoryError>;
220 async fn set_sent_at(&self, tx_id: String, sent_at: String) -> Result<TransactionRepoModel, RepositoryError>;
221 async fn increment_status_check_failures(&self, tx_id: String) -> Result<TransactionRepoModel, RepositoryError>;
222 async fn reset_status_check_consecutive_failures(&self, tx_id: String) -> Result<TransactionRepoModel, RepositoryError>;
223 async fn record_stellar_insufficient_fee_retry(&self, tx_id: String, sent_at: String) -> Result<TransactionRepoModel, RepositoryError>;
224 async fn record_stellar_try_again_later_retry(&self, tx_id: String, sent_at: String) -> Result<TransactionRepoModel, RepositoryError>;
225 async fn set_confirmed_at(&self, tx_id: String, confirmed_at: String) -> Result<TransactionRepoModel, RepositoryError>;
226 async fn count_by_status(&self, relayer_id: &str, statuses: &[TransactionStatus]) -> Result<u64, RepositoryError>;
227 async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError>;
228 async fn delete_by_requests(&self, requests: Vec<TransactionDeleteRequest>) -> Result<BatchDeleteResult, RepositoryError>;
229 }
230}
231
232#[derive(Debug, Clone)]
234pub enum TransactionRepositoryStorage {
235 InMemory(InMemoryTransactionRepository),
236 Redis(RedisTransactionRepository),
237}
238
239impl TransactionRepositoryStorage {
240 pub fn new_in_memory() -> Self {
241 Self::InMemory(InMemoryTransactionRepository::new())
242 }
243 pub fn new_redis(
244 connections: Arc<RedisConnections>,
245 key_prefix: String,
246 ) -> Result<Self, RepositoryError> {
247 Ok(Self::Redis(RedisTransactionRepository::new(
248 connections,
249 key_prefix,
250 )?))
251 }
252
253 pub fn connection_info(&self) -> Option<(Arc<RedisConnections>, &str)> {
262 match self {
263 TransactionRepositoryStorage::InMemory(_) => None,
264 TransactionRepositoryStorage::Redis(repo) => {
265 Some((repo.connections.clone(), &repo.key_prefix))
266 }
267 }
268 }
269
270 pub fn key_prefix(&self) -> Option<&str> {
272 match self {
273 TransactionRepositoryStorage::InMemory(_) => None,
274 TransactionRepositoryStorage::Redis(repo) => Some(&repo.key_prefix),
275 }
276 }
277}
278
279#[async_trait]
280impl TransactionRepository for TransactionRepositoryStorage {
281 fn connection_info(&self) -> Option<(Arc<RedisConnections>, String)> {
282 TransactionRepositoryStorage::connection_info(self)
283 .map(|(connections, key_prefix)| (connections, key_prefix.to_string()))
284 }
285
286 async fn find_by_relayer_id(
287 &self,
288 relayer_id: &str,
289 query: PaginationQuery,
290 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
291 match self {
292 TransactionRepositoryStorage::InMemory(repo) => {
293 repo.find_by_relayer_id(relayer_id, query).await
294 }
295 TransactionRepositoryStorage::Redis(repo) => {
296 repo.find_by_relayer_id(relayer_id, query).await
297 }
298 }
299 }
300
301 async fn find_by_status(
302 &self,
303 relayer_id: &str,
304 statuses: &[TransactionStatus],
305 ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
306 match self {
307 TransactionRepositoryStorage::InMemory(repo) => {
308 repo.find_by_status(relayer_id, statuses).await
309 }
310 TransactionRepositoryStorage::Redis(repo) => {
311 repo.find_by_status(relayer_id, statuses).await
312 }
313 }
314 }
315
316 async fn find_by_status_paginated(
317 &self,
318 relayer_id: &str,
319 statuses: &[TransactionStatus],
320 query: PaginationQuery,
321 oldest_first: bool,
322 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
323 match self {
324 TransactionRepositoryStorage::InMemory(repo) => {
325 repo.find_by_status_paginated(relayer_id, statuses, query, oldest_first)
326 .await
327 }
328 TransactionRepositoryStorage::Redis(repo) => {
329 repo.find_by_status_paginated(relayer_id, statuses, query, oldest_first)
330 .await
331 }
332 }
333 }
334
335 async fn find_by_nonce(
336 &self,
337 relayer_id: &str,
338 nonce: u64,
339 ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
340 match self {
341 TransactionRepositoryStorage::InMemory(repo) => {
342 repo.find_by_nonce(relayer_id, nonce).await
343 }
344 TransactionRepositoryStorage::Redis(repo) => {
345 repo.find_by_nonce(relayer_id, nonce).await
346 }
347 }
348 }
349
350 async fn update_status(
351 &self,
352 tx_id: String,
353 status: TransactionStatus,
354 ) -> Result<TransactionRepoModel, RepositoryError> {
355 match self {
356 TransactionRepositoryStorage::InMemory(repo) => repo.update_status(tx_id, status).await,
357 TransactionRepositoryStorage::Redis(repo) => repo.update_status(tx_id, status).await,
358 }
359 }
360
361 async fn partial_update(
362 &self,
363 tx_id: String,
364 update: TransactionUpdateRequest,
365 ) -> Result<TransactionRepoModel, RepositoryError> {
366 match self {
367 TransactionRepositoryStorage::InMemory(repo) => {
368 repo.partial_update(tx_id, update).await
369 }
370 TransactionRepositoryStorage::Redis(repo) => repo.partial_update(tx_id, update).await,
371 }
372 }
373
374 async fn update_network_data(
375 &self,
376 tx_id: String,
377 network_data: NetworkTransactionData,
378 ) -> Result<TransactionRepoModel, RepositoryError> {
379 match self {
380 TransactionRepositoryStorage::InMemory(repo) => {
381 repo.update_network_data(tx_id, network_data).await
382 }
383 TransactionRepositoryStorage::Redis(repo) => {
384 repo.update_network_data(tx_id, network_data).await
385 }
386 }
387 }
388
389 async fn set_sent_at(
390 &self,
391 tx_id: String,
392 sent_at: String,
393 ) -> Result<TransactionRepoModel, RepositoryError> {
394 match self {
395 TransactionRepositoryStorage::InMemory(repo) => repo.set_sent_at(tx_id, sent_at).await,
396 TransactionRepositoryStorage::Redis(repo) => repo.set_sent_at(tx_id, sent_at).await,
397 }
398 }
399
400 async fn increment_status_check_failures(
401 &self,
402 tx_id: String,
403 ) -> Result<TransactionRepoModel, RepositoryError> {
404 match self {
405 TransactionRepositoryStorage::InMemory(repo) => {
406 repo.increment_status_check_failures(tx_id).await
407 }
408 TransactionRepositoryStorage::Redis(repo) => {
409 repo.increment_status_check_failures(tx_id).await
410 }
411 }
412 }
413
414 async fn reset_status_check_consecutive_failures(
415 &self,
416 tx_id: String,
417 ) -> Result<TransactionRepoModel, RepositoryError> {
418 match self {
419 TransactionRepositoryStorage::InMemory(repo) => {
420 repo.reset_status_check_consecutive_failures(tx_id).await
421 }
422 TransactionRepositoryStorage::Redis(repo) => {
423 repo.reset_status_check_consecutive_failures(tx_id).await
424 }
425 }
426 }
427
428 async fn record_stellar_insufficient_fee_retry(
429 &self,
430 tx_id: String,
431 sent_at: String,
432 ) -> Result<TransactionRepoModel, RepositoryError> {
433 match self {
434 TransactionRepositoryStorage::InMemory(repo) => {
435 repo.record_stellar_insufficient_fee_retry(tx_id, sent_at)
436 .await
437 }
438 TransactionRepositoryStorage::Redis(repo) => {
439 repo.record_stellar_insufficient_fee_retry(tx_id, sent_at)
440 .await
441 }
442 }
443 }
444
445 async fn record_stellar_try_again_later_retry(
446 &self,
447 tx_id: String,
448 sent_at: String,
449 ) -> Result<TransactionRepoModel, RepositoryError> {
450 match self {
451 TransactionRepositoryStorage::InMemory(repo) => {
452 repo.record_stellar_try_again_later_retry(tx_id, sent_at)
453 .await
454 }
455 TransactionRepositoryStorage::Redis(repo) => {
456 repo.record_stellar_try_again_later_retry(tx_id, sent_at)
457 .await
458 }
459 }
460 }
461
462 async fn set_confirmed_at(
463 &self,
464 tx_id: String,
465 confirmed_at: String,
466 ) -> Result<TransactionRepoModel, RepositoryError> {
467 match self {
468 TransactionRepositoryStorage::InMemory(repo) => {
469 repo.set_confirmed_at(tx_id, confirmed_at).await
470 }
471 TransactionRepositoryStorage::Redis(repo) => {
472 repo.set_confirmed_at(tx_id, confirmed_at).await
473 }
474 }
475 }
476
477 async fn count_by_status(
478 &self,
479 relayer_id: &str,
480 statuses: &[TransactionStatus],
481 ) -> Result<u64, RepositoryError> {
482 match self {
483 TransactionRepositoryStorage::InMemory(repo) => {
484 repo.count_by_status(relayer_id, statuses).await
485 }
486 TransactionRepositoryStorage::Redis(repo) => {
487 repo.count_by_status(relayer_id, statuses).await
488 }
489 }
490 }
491
492 async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError> {
493 match self {
494 TransactionRepositoryStorage::InMemory(repo) => repo.delete_by_ids(ids).await,
495 TransactionRepositoryStorage::Redis(repo) => repo.delete_by_ids(ids).await,
496 }
497 }
498
499 async fn delete_by_requests(
500 &self,
501 requests: Vec<TransactionDeleteRequest>,
502 ) -> Result<BatchDeleteResult, RepositoryError> {
503 match self {
504 TransactionRepositoryStorage::InMemory(repo) => repo.delete_by_requests(requests).await,
505 TransactionRepositoryStorage::Redis(repo) => repo.delete_by_requests(requests).await,
506 }
507 }
508}
509
510#[async_trait]
511impl Repository<TransactionRepoModel, String> for TransactionRepositoryStorage {
512 async fn create(
513 &self,
514 entity: TransactionRepoModel,
515 ) -> Result<TransactionRepoModel, RepositoryError> {
516 match self {
517 TransactionRepositoryStorage::InMemory(repo) => repo.create(entity).await,
518 TransactionRepositoryStorage::Redis(repo) => repo.create(entity).await,
519 }
520 }
521
522 async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
523 match self {
524 TransactionRepositoryStorage::InMemory(repo) => repo.get_by_id(id).await,
525 TransactionRepositoryStorage::Redis(repo) => repo.get_by_id(id).await,
526 }
527 }
528
529 async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
530 match self {
531 TransactionRepositoryStorage::InMemory(repo) => repo.list_all().await,
532 TransactionRepositoryStorage::Redis(repo) => repo.list_all().await,
533 }
534 }
535
536 async fn list_paginated(
537 &self,
538 query: PaginationQuery,
539 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
540 match self {
541 TransactionRepositoryStorage::InMemory(repo) => repo.list_paginated(query).await,
542 TransactionRepositoryStorage::Redis(repo) => repo.list_paginated(query).await,
543 }
544 }
545
546 async fn update(
547 &self,
548 id: String,
549 entity: TransactionRepoModel,
550 ) -> Result<TransactionRepoModel, RepositoryError> {
551 match self {
552 TransactionRepositoryStorage::InMemory(repo) => repo.update(id, entity).await,
553 TransactionRepositoryStorage::Redis(repo) => repo.update(id, entity).await,
554 }
555 }
556
557 async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
558 match self {
559 TransactionRepositoryStorage::InMemory(repo) => repo.delete_by_id(id).await,
560 TransactionRepositoryStorage::Redis(repo) => repo.delete_by_id(id).await,
561 }
562 }
563
564 async fn count(&self) -> Result<usize, RepositoryError> {
565 match self {
566 TransactionRepositoryStorage::InMemory(repo) => repo.count().await,
567 TransactionRepositoryStorage::Redis(repo) => repo.count().await,
568 }
569 }
570
571 async fn has_entries(&self) -> Result<bool, RepositoryError> {
572 match self {
573 TransactionRepositoryStorage::InMemory(repo) => repo.has_entries().await,
574 TransactionRepositoryStorage::Redis(repo) => repo.has_entries().await,
575 }
576 }
577
578 async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
579 match self {
580 TransactionRepositoryStorage::InMemory(repo) => repo.drop_all_entries().await,
581 TransactionRepositoryStorage::Redis(repo) => repo.drop_all_entries().await,
582 }
583 }
584}
585
586#[cfg(test)]
587mod tests {
588 use chrono::Utc;
589 use color_eyre::Result;
590 use deadpool_redis::{Config, Runtime};
591
592 use super::*;
593 use crate::models::{
594 EvmTransactionData, NetworkTransactionData, TransactionStatus, TransactionUpdateRequest,
595 };
596 use crate::repositories::PaginationQuery;
597 use crate::utils::mocks::mockutils::create_mock_transaction;
598
599 fn create_test_transaction(id: &str, relayer_id: &str) -> TransactionRepoModel {
600 let mut transaction = create_mock_transaction();
601 transaction.id = id.to_string();
602 transaction.relayer_id = relayer_id.to_string();
603 transaction
604 }
605
606 fn create_test_transaction_with_status(
607 id: &str,
608 relayer_id: &str,
609 status: TransactionStatus,
610 ) -> TransactionRepoModel {
611 let mut transaction = create_test_transaction(id, relayer_id);
612 transaction.status = status;
613 transaction
614 }
615
616 fn create_test_transaction_with_nonce(
617 id: &str,
618 relayer_id: &str,
619 nonce: u64,
620 ) -> TransactionRepoModel {
621 let mut transaction = create_test_transaction(id, relayer_id);
622 if let NetworkTransactionData::Evm(ref mut evm_data) = transaction.network_data {
623 evm_data.nonce = Some(nonce);
624 }
625 transaction
626 }
627
628 fn create_test_update_request() -> TransactionUpdateRequest {
629 TransactionUpdateRequest {
630 status: Some(TransactionStatus::Sent),
631 status_reason: Some("Test reason".to_string()),
632 sent_at: Some(Utc::now().to_string()),
633 confirmed_at: None,
634 network_data: None,
635 priced_at: None,
636 hashes: Some(vec!["test_hash".to_string()]),
637 noop_count: None,
638 is_canceled: None,
639 delete_at: None,
640 metadata: None,
641 }
642 }
643
644 #[tokio::test]
645 async fn test_new_in_memory() {
646 let storage = TransactionRepositoryStorage::new_in_memory();
647
648 match storage {
649 TransactionRepositoryStorage::InMemory(_) => {
650 }
652 TransactionRepositoryStorage::Redis(_) => {
653 panic!("Expected InMemory variant, got Redis");
654 }
655 }
656 }
657
658 #[tokio::test]
659 async fn test_connection_info_returns_none_for_in_memory() {
660 let storage = TransactionRepositoryStorage::new_in_memory();
661
662 assert!(storage.connection_info().is_none());
664 }
665
666 #[tokio::test]
667 #[ignore = "Requires active Redis instance"]
668 async fn test_connection_info_returns_some_for_redis() -> Result<()> {
669 let redis_url = std::env::var("REDIS_TEST_URL")
670 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
671 let cfg = Config::from_url(&redis_url);
672 let pool = Arc::new(
673 cfg.builder()
674 .map_err(|e| eyre::eyre!("Failed to create Redis pool builder: {}", e))?
675 .max_size(16)
676 .runtime(Runtime::Tokio1)
677 .build()
678 .map_err(|e| eyre::eyre!("Failed to build Redis pool: {}", e))?,
679 );
680 let connections = Arc::new(RedisConnections::new_single_pool(pool.clone()));
681 let key_prefix = "test_prefix".to_string();
682
683 let storage = TransactionRepositoryStorage::new_redis(connections, key_prefix.clone())?;
684
685 let (returned_connection, returned_prefix) = storage
686 .connection_info()
687 .expect("Expected Redis connection info");
688
689 assert!(Arc::ptr_eq(&pool, returned_connection.primary()));
690 assert_eq!(returned_prefix, key_prefix);
691
692 Ok(())
693 }
694
695 #[tokio::test]
696 async fn test_create_in_memory() -> Result<()> {
697 let storage = TransactionRepositoryStorage::new_in_memory();
698 let transaction = create_test_transaction("test-tx", "test-relayer");
699
700 let created = storage.create(transaction.clone()).await?;
701 assert_eq!(created.id, transaction.id);
702 assert_eq!(created.relayer_id, transaction.relayer_id);
703 assert_eq!(created.status, transaction.status);
704
705 Ok(())
706 }
707
708 #[tokio::test]
709 async fn test_get_by_id_in_memory() -> Result<()> {
710 let storage = TransactionRepositoryStorage::new_in_memory();
711 let transaction = create_test_transaction("test-tx", "test-relayer");
712
713 storage.create(transaction.clone()).await?;
715
716 let retrieved = storage.get_by_id("test-tx".to_string()).await?;
718 assert_eq!(retrieved.id, transaction.id);
719 assert_eq!(retrieved.relayer_id, transaction.relayer_id);
720 assert_eq!(retrieved.status, transaction.status);
721
722 Ok(())
723 }
724
725 #[tokio::test]
726 async fn test_get_by_id_not_found_in_memory() -> Result<()> {
727 let storage = TransactionRepositoryStorage::new_in_memory();
728
729 let result = storage.get_by_id("non-existent".to_string()).await;
730 assert!(result.is_err());
731
732 Ok(())
733 }
734
735 #[tokio::test]
736 async fn test_list_all_in_memory() -> Result<()> {
737 let storage = TransactionRepositoryStorage::new_in_memory();
738
739 let transactions = storage.list_all().await?;
741 assert!(transactions.is_empty());
742
743 let tx1 = create_test_transaction("tx-1", "relayer-1");
745 let tx2 = create_test_transaction("tx-2", "relayer-2");
746
747 storage.create(tx1.clone()).await?;
748 storage.create(tx2.clone()).await?;
749
750 let all_transactions = storage.list_all().await?;
751 assert_eq!(all_transactions.len(), 2);
752
753 let ids: Vec<&str> = all_transactions.iter().map(|t| t.id.as_str()).collect();
754 assert!(ids.contains(&"tx-1"));
755 assert!(ids.contains(&"tx-2"));
756
757 Ok(())
758 }
759
760 #[tokio::test]
761 async fn test_list_paginated_in_memory() -> Result<()> {
762 let storage = TransactionRepositoryStorage::new_in_memory();
763
764 for i in 1..=5 {
766 let tx = create_test_transaction(&format!("tx-{i}"), "test-relayer");
767 storage.create(tx).await?;
768 }
769
770 let query = PaginationQuery {
772 page: 1,
773 per_page: 2,
774 };
775 let page = storage.list_paginated(query).await?;
776
777 assert_eq!(page.items.len(), 2);
778 assert_eq!(page.total, 5);
779 assert_eq!(page.page, 1);
780 assert_eq!(page.per_page, 2);
781
782 let query2 = PaginationQuery {
784 page: 2,
785 per_page: 2,
786 };
787 let page2 = storage.list_paginated(query2).await?;
788
789 assert_eq!(page2.items.len(), 2);
790 assert_eq!(page2.total, 5);
791 assert_eq!(page2.page, 2);
792 assert_eq!(page2.per_page, 2);
793
794 Ok(())
795 }
796
797 #[tokio::test]
798 async fn test_update_in_memory() -> Result<()> {
799 let storage = TransactionRepositoryStorage::new_in_memory();
800 let transaction = create_test_transaction("test-tx", "test-relayer");
801
802 storage.create(transaction.clone()).await?;
804
805 let mut updated_transaction = transaction.clone();
807 updated_transaction.status = TransactionStatus::Sent;
808 updated_transaction.status_reason = Some("Updated reason".to_string());
809
810 let result = storage
811 .update("test-tx".to_string(), updated_transaction.clone())
812 .await?;
813 assert_eq!(result.id, "test-tx");
814 assert_eq!(result.status, TransactionStatus::Sent);
815 assert_eq!(result.status_reason, Some("Updated reason".to_string()));
816
817 let retrieved = storage.get_by_id("test-tx".to_string()).await?;
819 assert_eq!(retrieved.status, TransactionStatus::Sent);
820 assert_eq!(retrieved.status_reason, Some("Updated reason".to_string()));
821
822 Ok(())
823 }
824
825 #[tokio::test]
826 async fn test_update_not_found_in_memory() -> Result<()> {
827 let storage = TransactionRepositoryStorage::new_in_memory();
828 let transaction = create_test_transaction("non-existent", "test-relayer");
829
830 let result = storage
831 .update("non-existent".to_string(), transaction)
832 .await;
833 assert!(result.is_err());
834
835 Ok(())
836 }
837
838 #[tokio::test]
839 async fn test_delete_by_id_in_memory() -> Result<()> {
840 let storage = TransactionRepositoryStorage::new_in_memory();
841 let transaction = create_test_transaction("test-tx", "test-relayer");
842
843 storage.create(transaction.clone()).await?;
845
846 let retrieved = storage.get_by_id("test-tx".to_string()).await?;
848 assert_eq!(retrieved.id, "test-tx");
849
850 storage.delete_by_id("test-tx".to_string()).await?;
852
853 let result = storage.get_by_id("test-tx".to_string()).await;
855 assert!(result.is_err());
856
857 Ok(())
858 }
859
860 #[tokio::test]
861 async fn test_delete_by_id_not_found_in_memory() -> Result<()> {
862 let storage = TransactionRepositoryStorage::new_in_memory();
863
864 let result = storage.delete_by_id("non-existent".to_string()).await;
865 assert!(result.is_err());
866
867 Ok(())
868 }
869
870 #[tokio::test]
871 async fn test_count_in_memory() -> Result<()> {
872 let storage = TransactionRepositoryStorage::new_in_memory();
873
874 let count = storage.count().await?;
876 assert_eq!(count, 0);
877
878 let tx1 = create_test_transaction("tx-1", "relayer-1");
880 let tx2 = create_test_transaction("tx-2", "relayer-2");
881
882 storage.create(tx1).await?;
883 let count_after_one = storage.count().await?;
884 assert_eq!(count_after_one, 1);
885
886 storage.create(tx2).await?;
887 let count_after_two = storage.count().await?;
888 assert_eq!(count_after_two, 2);
889
890 storage.delete_by_id("tx-1".to_string()).await?;
892 let count_after_delete = storage.count().await?;
893 assert_eq!(count_after_delete, 1);
894
895 Ok(())
896 }
897
898 #[tokio::test]
899 async fn test_has_entries_in_memory() -> Result<()> {
900 let storage = TransactionRepositoryStorage::new_in_memory();
901
902 let has_entries = storage.has_entries().await?;
904 assert!(!has_entries);
905
906 let transaction = create_test_transaction("test-tx", "test-relayer");
908 storage.create(transaction).await?;
909
910 let has_entries_after_create = storage.has_entries().await?;
911 assert!(has_entries_after_create);
912
913 storage.delete_by_id("test-tx".to_string()).await?;
915
916 let has_entries_after_delete = storage.has_entries().await?;
917 assert!(!has_entries_after_delete);
918
919 Ok(())
920 }
921
922 #[tokio::test]
923 async fn test_drop_all_entries_in_memory() -> Result<()> {
924 let storage = TransactionRepositoryStorage::new_in_memory();
925
926 for i in 1..=5 {
928 let tx = create_test_transaction(&format!("tx-{i}"), "test-relayer");
929 storage.create(tx).await?;
930 }
931
932 let count_before = storage.count().await?;
934 assert_eq!(count_before, 5);
935
936 let has_entries_before = storage.has_entries().await?;
937 assert!(has_entries_before);
938
939 storage.drop_all_entries().await?;
941
942 let count_after = storage.count().await?;
944 assert_eq!(count_after, 0);
945
946 let has_entries_after = storage.has_entries().await?;
947 assert!(!has_entries_after);
948
949 let all_transactions = storage.list_all().await?;
950 assert!(all_transactions.is_empty());
951
952 Ok(())
953 }
954
955 #[tokio::test]
956 async fn test_find_by_relayer_id_in_memory() -> Result<()> {
957 let storage = TransactionRepositoryStorage::new_in_memory();
958
959 let tx1 = create_test_transaction("tx-1", "relayer-1");
961 let tx2 = create_test_transaction("tx-2", "relayer-1");
962 let tx3 = create_test_transaction("tx-3", "relayer-2");
963
964 storage.create(tx1).await?;
965 storage.create(tx2).await?;
966 storage.create(tx3).await?;
967
968 let query = PaginationQuery {
970 page: 1,
971 per_page: 10,
972 };
973 let result = storage.find_by_relayer_id("relayer-1", query).await?;
974
975 assert_eq!(result.items.len(), 2);
976 assert_eq!(result.total, 2);
977
978 for tx in result.items {
980 assert_eq!(tx.relayer_id, "relayer-1");
981 }
982
983 Ok(())
984 }
985
986 #[tokio::test]
987 async fn test_find_by_status_in_memory() -> Result<()> {
988 let storage = TransactionRepositoryStorage::new_in_memory();
989
990 let tx1 =
992 create_test_transaction_with_status("tx-1", "relayer-1", TransactionStatus::Pending);
993 let tx2 = create_test_transaction_with_status("tx-2", "relayer-1", TransactionStatus::Sent);
994 let tx3 =
995 create_test_transaction_with_status("tx-3", "relayer-1", TransactionStatus::Pending);
996 let tx4 =
997 create_test_transaction_with_status("tx-4", "relayer-2", TransactionStatus::Pending);
998
999 storage.create(tx1).await?;
1000 storage.create(tx2).await?;
1001 storage.create(tx3).await?;
1002 storage.create(tx4).await?;
1003
1004 let statuses = vec![TransactionStatus::Pending];
1006 let result = storage.find_by_status("relayer-1", &statuses).await?;
1007
1008 assert_eq!(result.len(), 2);
1009
1010 for tx in result {
1012 assert_eq!(tx.status, TransactionStatus::Pending);
1013 assert_eq!(tx.relayer_id, "relayer-1");
1014 }
1015
1016 Ok(())
1017 }
1018
1019 #[tokio::test]
1020 async fn test_find_by_nonce_in_memory() -> Result<()> {
1021 let storage = TransactionRepositoryStorage::new_in_memory();
1022
1023 let tx1 = create_test_transaction_with_nonce("tx-1", "relayer-1", 10);
1025 let tx2 = create_test_transaction_with_nonce("tx-2", "relayer-1", 20);
1026 let tx3 = create_test_transaction_with_nonce("tx-3", "relayer-2", 10);
1027
1028 storage.create(tx1).await?;
1029 storage.create(tx2).await?;
1030 storage.create(tx3).await?;
1031
1032 let result = storage.find_by_nonce("relayer-1", 10).await?;
1034
1035 assert!(result.is_some());
1036 let found_tx = result.unwrap();
1037 assert_eq!(found_tx.id, "tx-1");
1038 assert_eq!(found_tx.relayer_id, "relayer-1");
1039
1040 if let NetworkTransactionData::Evm(evm_data) = found_tx.network_data {
1042 assert_eq!(evm_data.nonce, Some(10));
1043 }
1044
1045 let not_found = storage.find_by_nonce("relayer-1", 99).await?;
1047 assert!(not_found.is_none());
1048
1049 Ok(())
1050 }
1051
1052 #[tokio::test]
1053 async fn test_update_status_in_memory() -> Result<()> {
1054 let storage = TransactionRepositoryStorage::new_in_memory();
1055 let transaction = create_test_transaction("test-tx", "test-relayer");
1056
1057 storage.create(transaction).await?;
1059
1060 let updated = storage
1062 .update_status("test-tx".to_string(), TransactionStatus::Sent)
1063 .await?;
1064
1065 assert_eq!(updated.id, "test-tx");
1066 assert_eq!(updated.status, TransactionStatus::Sent);
1067
1068 let retrieved = storage.get_by_id("test-tx".to_string()).await?;
1070 assert_eq!(retrieved.status, TransactionStatus::Sent);
1071
1072 Ok(())
1073 }
1074
1075 #[tokio::test]
1076 async fn test_partial_update_in_memory() -> Result<()> {
1077 let storage = TransactionRepositoryStorage::new_in_memory();
1078 let transaction = create_test_transaction("test-tx", "test-relayer");
1079
1080 storage.create(transaction).await?;
1082
1083 let update_request = create_test_update_request();
1085 let updated = storage
1086 .partial_update("test-tx".to_string(), update_request)
1087 .await?;
1088
1089 assert_eq!(updated.id, "test-tx");
1090 assert_eq!(updated.status, TransactionStatus::Sent);
1091 assert_eq!(updated.status_reason, Some("Test reason".to_string()));
1092 assert!(updated.sent_at.is_some());
1093 assert_eq!(updated.hashes, vec!["test_hash".to_string()]);
1094
1095 Ok(())
1096 }
1097
1098 #[tokio::test]
1099 async fn test_update_network_data_in_memory() -> Result<()> {
1100 let storage = TransactionRepositoryStorage::new_in_memory();
1101 let transaction = create_test_transaction("test-tx", "test-relayer");
1102
1103 storage.create(transaction).await?;
1105
1106 let new_evm_data = EvmTransactionData {
1108 nonce: Some(42),
1109 gas_limit: Some(21000),
1110 ..Default::default()
1111 };
1112 let new_network_data = NetworkTransactionData::Evm(new_evm_data);
1113
1114 let updated = storage
1115 .update_network_data("test-tx".to_string(), new_network_data)
1116 .await?;
1117
1118 assert_eq!(updated.id, "test-tx");
1119 if let NetworkTransactionData::Evm(evm_data) = updated.network_data {
1120 assert_eq!(evm_data.nonce, Some(42));
1121 assert_eq!(evm_data.gas_limit, Some(21000));
1122 } else {
1123 panic!("Expected EVM network data");
1124 }
1125
1126 Ok(())
1127 }
1128
1129 #[tokio::test]
1130 async fn test_set_sent_at_in_memory() -> Result<()> {
1131 let storage = TransactionRepositoryStorage::new_in_memory();
1132 let transaction = create_test_transaction("test-tx", "test-relayer");
1133
1134 storage.create(transaction).await?;
1136
1137 let sent_at = Utc::now().to_string();
1139 let updated = storage
1140 .set_sent_at("test-tx".to_string(), sent_at.clone())
1141 .await?;
1142
1143 assert_eq!(updated.id, "test-tx");
1144 assert_eq!(updated.sent_at, Some(sent_at));
1145
1146 Ok(())
1147 }
1148
1149 #[tokio::test]
1150 async fn test_set_confirmed_at_in_memory() -> Result<()> {
1151 let storage = TransactionRepositoryStorage::new_in_memory();
1152 let transaction = create_test_transaction("test-tx", "test-relayer");
1153
1154 storage.create(transaction).await?;
1156
1157 let confirmed_at = Utc::now().to_string();
1159 let updated = storage
1160 .set_confirmed_at("test-tx".to_string(), confirmed_at.clone())
1161 .await?;
1162
1163 assert_eq!(updated.id, "test-tx");
1164 assert_eq!(updated.confirmed_at, Some(confirmed_at));
1165
1166 Ok(())
1167 }
1168
1169 #[tokio::test]
1170 async fn test_create_duplicate_id_in_memory() -> Result<()> {
1171 let storage = TransactionRepositoryStorage::new_in_memory();
1172 let transaction = create_test_transaction("duplicate-id", "test-relayer");
1173
1174 storage.create(transaction.clone()).await?;
1176
1177 let result = storage.create(transaction.clone()).await;
1179 assert!(result.is_err());
1180
1181 Ok(())
1182 }
1183
1184 #[tokio::test]
1185 async fn test_workflow_in_memory() -> Result<()> {
1186 let storage = TransactionRepositoryStorage::new_in_memory();
1187
1188 assert!(!storage.has_entries().await?);
1190 assert_eq!(storage.count().await?, 0);
1191
1192 let transaction = create_test_transaction("workflow-test", "test-relayer");
1194 let created = storage.create(transaction.clone()).await?;
1195 assert_eq!(created.id, "workflow-test");
1196
1197 assert!(storage.has_entries().await?);
1199 assert_eq!(storage.count().await?, 1);
1200
1201 let retrieved = storage.get_by_id("workflow-test".to_string()).await?;
1203 assert_eq!(retrieved.id, "workflow-test");
1204
1205 let updated = storage
1207 .update_status("workflow-test".to_string(), TransactionStatus::Sent)
1208 .await?;
1209 assert_eq!(updated.status, TransactionStatus::Sent);
1210
1211 let retrieved_updated = storage.get_by_id("workflow-test".to_string()).await?;
1213 assert_eq!(retrieved_updated.status, TransactionStatus::Sent);
1214
1215 storage.delete_by_id("workflow-test".to_string()).await?;
1217
1218 assert!(!storage.has_entries().await?);
1220 assert_eq!(storage.count().await?, 0);
1221
1222 let result = storage.get_by_id("workflow-test".to_string()).await;
1223 assert!(result.is_err());
1224
1225 Ok(())
1226 }
1227
1228 #[tokio::test]
1229 async fn test_multiple_relayers_workflow() -> Result<()> {
1230 let storage = TransactionRepositoryStorage::new_in_memory();
1231
1232 let tx1 =
1234 create_test_transaction_with_status("tx-1", "relayer-1", TransactionStatus::Pending);
1235 let tx2 = create_test_transaction_with_status("tx-2", "relayer-1", TransactionStatus::Sent);
1236 let tx3 =
1237 create_test_transaction_with_status("tx-3", "relayer-2", TransactionStatus::Pending);
1238
1239 storage.create(tx1).await?;
1240 storage.create(tx2).await?;
1241 storage.create(tx3).await?;
1242
1243 let query = PaginationQuery {
1245 page: 1,
1246 per_page: 10,
1247 };
1248 let relayer1_txs = storage.find_by_relayer_id("relayer-1", query).await?;
1249 assert_eq!(relayer1_txs.items.len(), 2);
1250
1251 let pending_txs = storage
1253 .find_by_status("relayer-1", &[TransactionStatus::Pending])
1254 .await?;
1255 assert_eq!(pending_txs.len(), 1);
1256 assert_eq!(pending_txs[0].id, "tx-1");
1257
1258 assert_eq!(storage.count().await?, 3);
1260
1261 Ok(())
1262 }
1263
1264 #[tokio::test]
1265 async fn test_pagination_edge_cases_in_memory() -> Result<()> {
1266 let storage = TransactionRepositoryStorage::new_in_memory();
1267
1268 let query = PaginationQuery {
1270 page: 1,
1271 per_page: 10,
1272 };
1273 let page = storage.list_paginated(query).await?;
1274 assert_eq!(page.items.len(), 0);
1275 assert_eq!(page.total, 0);
1276 assert_eq!(page.page, 1);
1277 assert_eq!(page.per_page, 10);
1278
1279 let transaction = create_test_transaction("single-tx", "test-relayer");
1281 storage.create(transaction).await?;
1282
1283 let query = PaginationQuery {
1285 page: 1,
1286 per_page: 10,
1287 };
1288 let page = storage.list_paginated(query).await?;
1289 assert_eq!(page.items.len(), 1);
1290 assert_eq!(page.total, 1);
1291 assert_eq!(page.page, 1);
1292 assert_eq!(page.per_page, 10);
1293
1294 let query = PaginationQuery {
1296 page: 3,
1297 per_page: 10,
1298 };
1299 let page = storage.list_paginated(query).await?;
1300 assert_eq!(page.items.len(), 0);
1301 assert_eq!(page.total, 1);
1302 assert_eq!(page.page, 3);
1303 assert_eq!(page.per_page, 10);
1304
1305 Ok(())
1306 }
1307
1308 #[tokio::test]
1309 async fn test_find_by_relayer_id_pagination() -> Result<()> {
1310 let storage = TransactionRepositoryStorage::new_in_memory();
1311
1312 for i in 1..=10 {
1314 let tx = create_test_transaction(&format!("tx-{i}"), "test-relayer");
1315 storage.create(tx).await?;
1316 }
1317
1318 let query = PaginationQuery {
1320 page: 1,
1321 per_page: 3,
1322 };
1323 let page1 = storage.find_by_relayer_id("test-relayer", query).await?;
1324 assert_eq!(page1.items.len(), 3);
1325 assert_eq!(page1.total, 10);
1326 assert_eq!(page1.page, 1);
1327 assert_eq!(page1.per_page, 3);
1328
1329 let query = PaginationQuery {
1331 page: 2,
1332 per_page: 3,
1333 };
1334 let page2 = storage.find_by_relayer_id("test-relayer", query).await?;
1335 assert_eq!(page2.items.len(), 3);
1336 assert_eq!(page2.total, 10);
1337 assert_eq!(page2.page, 2);
1338 assert_eq!(page2.per_page, 3);
1339
1340 Ok(())
1341 }
1342
1343 #[tokio::test]
1344 async fn test_find_by_multiple_statuses() -> Result<()> {
1345 let storage = TransactionRepositoryStorage::new_in_memory();
1346
1347 let tx1 =
1349 create_test_transaction_with_status("tx-1", "test-relayer", TransactionStatus::Pending);
1350 let tx2 =
1351 create_test_transaction_with_status("tx-2", "test-relayer", TransactionStatus::Sent);
1352 let tx3 = create_test_transaction_with_status(
1353 "tx-3",
1354 "test-relayer",
1355 TransactionStatus::Confirmed,
1356 );
1357 let tx4 =
1358 create_test_transaction_with_status("tx-4", "test-relayer", TransactionStatus::Failed);
1359
1360 storage.create(tx1).await?;
1361 storage.create(tx2).await?;
1362 storage.create(tx3).await?;
1363 storage.create(tx4).await?;
1364
1365 let statuses = vec![TransactionStatus::Pending, TransactionStatus::Sent];
1367 let result = storage.find_by_status("test-relayer", &statuses).await?;
1368
1369 assert_eq!(result.len(), 2);
1370
1371 let found_statuses: Vec<TransactionStatus> =
1373 result.iter().map(|tx| tx.status.clone()).collect();
1374 assert!(found_statuses.contains(&TransactionStatus::Pending));
1375 assert!(found_statuses.contains(&TransactionStatus::Sent));
1376
1377 Ok(())
1378 }
1379
1380 #[tokio::test]
1381 async fn test_record_stellar_try_again_later_retry_in_memory() -> Result<()> {
1382 let storage = TransactionRepositoryStorage::new_in_memory();
1383 let mut transaction = create_test_transaction("test-tx", "test-relayer");
1384 transaction.status = TransactionStatus::Sent;
1385 storage.create(transaction).await?;
1386
1387 let sent_at = "2025-03-18T10:00:00Z".to_string();
1388 let updated = storage
1389 .record_stellar_try_again_later_retry("test-tx".to_string(), sent_at.clone())
1390 .await?;
1391
1392 assert_eq!(updated.id, "test-tx");
1393 assert_eq!(updated.sent_at, Some(sent_at));
1394 let meta = updated.metadata.expect("metadata should be set");
1395 assert_eq!(meta.try_again_later_retries, 1);
1396 assert_eq!(meta.consecutive_failures, 0);
1397 assert_eq!(meta.total_failures, 0);
1398 assert_eq!(meta.insufficient_fee_retries, 0);
1399
1400 Ok(())
1401 }
1402
1403 #[tokio::test]
1404 async fn test_record_stellar_try_again_later_retry_accumulates_in_memory() -> Result<()> {
1405 let storage = TransactionRepositoryStorage::new_in_memory();
1406 let mut transaction = create_test_transaction("test-tx", "test-relayer");
1407 transaction.status = TransactionStatus::Sent;
1408 storage.create(transaction).await?;
1409
1410 storage
1411 .record_stellar_try_again_later_retry(
1412 "test-tx".to_string(),
1413 "2025-03-18T10:00:00Z".to_string(),
1414 )
1415 .await?;
1416
1417 let updated = storage
1418 .record_stellar_try_again_later_retry(
1419 "test-tx".to_string(),
1420 "2025-03-18T10:01:00Z".to_string(),
1421 )
1422 .await?;
1423
1424 assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z"));
1425 let meta = updated.metadata.unwrap();
1426 assert_eq!(meta.try_again_later_retries, 2);
1427
1428 Ok(())
1429 }
1430
1431 #[tokio::test]
1432 async fn test_record_stellar_try_again_later_retry_noop_on_final_state_in_memory() -> Result<()>
1433 {
1434 let storage = TransactionRepositoryStorage::new_in_memory();
1435 let mut transaction = create_test_transaction("test-tx", "test-relayer");
1436 transaction.status = TransactionStatus::Confirmed;
1437 transaction.sent_at = Some("old-time".to_string());
1438 storage.create(transaction).await?;
1439
1440 let result = storage
1441 .record_stellar_try_again_later_retry("test-tx".to_string(), "new-time".to_string())
1442 .await?;
1443
1444 assert_eq!(result.sent_at.as_deref(), Some("old-time"));
1445 assert!(result.metadata.is_none());
1446
1447 Ok(())
1448 }
1449
1450 #[tokio::test]
1451 async fn test_record_stellar_try_again_later_retry_not_found_in_memory() -> Result<()> {
1452 let storage = TransactionRepositoryStorage::new_in_memory();
1453
1454 let result = storage
1455 .record_stellar_try_again_later_retry(
1456 "nonexistent".to_string(),
1457 "2025-03-18T10:00:00Z".to_string(),
1458 )
1459 .await;
1460
1461 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1462
1463 Ok(())
1464 }
1465}