1use crate::config::ServerConfig;
4use crate::constants::FINAL_TRANSACTION_STATUSES;
5use crate::domain::transaction::common::is_final_state;
6use crate::metrics::{
7 TRANSACTIONS_BY_STATUS, TRANSACTIONS_CREATED, TRANSACTIONS_FAILED,
8 TRANSACTIONS_INSUFFICIENT_FEE_FAILED, TRANSACTIONS_INSUFFICIENT_FEE_SUCCESS,
9 TRANSACTIONS_SUBMITTED, TRANSACTIONS_SUCCESS, TRANSACTIONS_TRY_AGAIN_LATER_FAILED,
10 TRANSACTIONS_TRY_AGAIN_LATER_SUCCESS, TRANSACTION_PROCESSING_TIME,
11};
12use crate::models::{
13 NetworkTransactionData, PaginationQuery, RepositoryError, TransactionRepoModel,
14 TransactionStatus, TransactionUpdateRequest,
15};
16use crate::repositories::redis_base::RedisRepository;
17use crate::repositories::{
18 BatchDeleteResult, BatchRetrievalResult, PaginatedResult, Repository, TransactionDeleteRequest,
19 TransactionRepository,
20};
21use crate::utils::RedisConnections;
22use async_trait::async_trait;
23use chrono::Utc;
24use redis::{AsyncCommands, Script};
25use std::fmt;
26use std::sync::Arc;
27use tracing::{debug, error, warn};
28
// Key-namespace segments: every key this repository writes is composed from
// `key_prefix` plus a subset of these literals (see the `*_key` helpers below).
const RELAYER_PREFIX: &str = "relayer";
// Primary transaction record: "<prefix>:relayer:<relayer_id>:tx:<tx_id>".
const TX_PREFIX: &str = "tx";
// Legacy per-status SET of transaction ids (migrated lazily to the sorted form).
const STATUS_PREFIX: &str = "status";
// Newer per-status sorted set, scored by timestamp (see `status_sorted_score`).
const STATUS_SORTED_PREFIX: &str = "status_sorted";
// Nonce -> transaction id lookup for EVM transactions.
const NONCE_PREFIX: &str = "nonce";
// Reverse mapping: transaction id -> owning relayer id.
const TX_TO_RELAYER_PREFIX: &str = "tx_to_relayer";
// SET of all relayer ids known to this repository.
const RELAYER_LIST_KEY: &str = "relayer_list";
// Per-relayer sorted set of tx ids scored by created_at (epoch millis).
const TX_BY_CREATED_AT_PREFIX: &str = "tx_by_created_at";
37
/// Redis-backed transaction repository.
///
/// All keys are namespaced under `key_prefix`; reads go through the reader
/// connection and writes through the primary (see the individual methods).
#[derive(Clone)]
pub struct RedisTransactionRepository {
    /// Pooled Redis connections (primary for writes, reader for reads).
    pub connections: Arc<RedisConnections>,
    /// Namespace prefix prepended to every key this repository creates.
    pub key_prefix: String,
}
43
// Pulls in the shared Redis helpers (`get_connection`, `serialize_entity`,
// `deserialize_entity`, `map_redis_error`) via the trait's default methods.
impl RedisRepository for RedisTransactionRepository {}
45
46impl RedisTransactionRepository {
47 pub fn new(
48 connections: Arc<RedisConnections>,
49 key_prefix: String,
50 ) -> Result<Self, RepositoryError> {
51 if key_prefix.is_empty() {
52 return Err(RepositoryError::InvalidData(
53 "Redis key prefix cannot be empty".to_string(),
54 ));
55 }
56
57 Ok(Self {
58 connections,
59 key_prefix,
60 })
61 }
62
63 fn tx_key(&self, relayer_id: &str, tx_id: &str) -> String {
65 format!(
66 "{}:{}:{}:{}:{}",
67 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX, tx_id
68 )
69 }
70
71 fn tx_to_relayer_key(&self, tx_id: &str) -> String {
73 format!(
74 "{}:{}:{}:{}",
75 self.key_prefix, RELAYER_PREFIX, TX_TO_RELAYER_PREFIX, tx_id
76 )
77 }
78
79 fn relayer_status_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
81 format!(
82 "{}:{}:{}:{}:{}",
83 self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_PREFIX, status
84 )
85 }
86
87 fn relayer_status_sorted_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
90 format!(
91 "{}:{}:{}:{}:{}",
92 self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_SORTED_PREFIX, status
93 )
94 }
95
96 fn relayer_nonce_key(&self, relayer_id: &str, nonce: u64) -> String {
98 format!(
99 "{}:{}:{}:{}:{}",
100 self.key_prefix, RELAYER_PREFIX, relayer_id, NONCE_PREFIX, nonce
101 )
102 }
103
104 fn relayer_list_key(&self) -> String {
106 format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
107 }
108
109 fn relayer_tx_by_created_at_key(&self, relayer_id: &str) -> String {
111 format!(
112 "{}:{}:{}:{}",
113 self.key_prefix, RELAYER_PREFIX, relayer_id, TX_BY_CREATED_AT_PREFIX
114 )
115 }
116
117 fn tx_key_parts(&self, tx_id: &str) -> (String, String, String) {
122 let lookup_key = self.tx_to_relayer_key(tx_id);
123 let key_prefix = format!("{}:{}:", self.key_prefix, RELAYER_PREFIX);
124 let key_suffix = format!(":{TX_PREFIX}:{tx_id}");
125 (lookup_key, key_prefix, key_suffix)
126 }
127
    /// Runs a Lua script that operates on a single transaction and returns the
    /// resulting transaction JSON, deserialized.
    ///
    /// KEYS[1] is the tx->relayer reverse-lookup key; ARGV begins with the
    /// primary-key prefix and suffix (so the script can rebuild the full tx
    /// key from the relayer id it reads), followed by `extra_args`.
    ///
    /// Retries both connection acquisition and script execution up to
    /// `MAX_RETRIES` times with exponential backoff. A `nil` script result is
    /// mapped to `RepositoryError::NotFound`.
    async fn run_atomic_script(
        &self,
        lua: &str,
        tx_id: &str,
        extra_args: &[&str],
        op_name: &str,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        const MAX_RETRIES: u32 = 3;
        const BASE_BACKOFF_MS: u64 = 100;

        let (lookup_key, key_prefix, key_suffix) = self.tx_key_parts(tx_id);
        let script = Script::new(lua);
        let mut last_error = None;

        for attempt in 0..MAX_RETRIES {
            // Exponential backoff: 100ms, 200ms, 400ms, ...
            let backoff = BASE_BACKOFF_MS * 2u64.pow(attempt);

            let mut conn = match self
                .get_connection(self.connections.primary(), op_name)
                .await
            {
                Ok(conn) => conn,
                Err(e) => {
                    last_error = Some(e);
                    if attempt < MAX_RETRIES - 1 {
                        warn!(tx_id = %tx_id, attempt, op = %op_name, "connection failed, retrying");
                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
                        continue;
                    }
                    // Final attempt: surface the last connection error.
                    return Err(last_error.unwrap());
                }
            };

            // Fresh invocation per attempt (keys/args must be re-attached).
            let mut invocation = script.prepare_invoke();
            invocation
                .key(&lookup_key)
                .arg(&key_prefix)
                .arg(&key_suffix);
            for arg in extra_args {
                invocation.arg(*arg);
            }

            match invocation.invoke_async::<Option<String>>(&mut conn).await {
                Ok(result) => {
                    // `nil` from the script means the reverse lookup missed.
                    let json = result.ok_or_else(|| {
                        RepositoryError::NotFound(format!("Transaction with ID {tx_id} not found"))
                    })?;
                    return self.deserialize_entity::<TransactionRepoModel>(
                        &json,
                        tx_id,
                        "transaction",
                    );
                }
                Err(e) => {
                    last_error = Some(self.map_redis_error(e, op_name));
                    if attempt < MAX_RETRIES - 1 {
                        warn!(
                            tx_id = %tx_id, attempt, op = %op_name,
                            "atomic script failed, retrying"
                        );
                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
                        continue;
                    }
                    return Err(last_error.unwrap());
                }
            }
        }
        // Unreachable in practice (every loop exit returns), kept defensive.
        Err(last_error.unwrap_or_else(|| {
            RepositoryError::UnexpectedError(format!("retry loop exhausted for {op_name}"))
        }))
    }
205
    /// Like [`Self::run_atomic_script`] but for scripts that return an
    /// optional list of strings; key parts are supplied by the caller and the
    /// raw result is returned without deserialization.
    ///
    /// Same retry policy: up to `MAX_RETRIES` attempts with exponential
    /// backoff, covering both connection and script failures.
    async fn run_script_with_retry_vec(
        &self,
        script: &Script,
        lookup_key: &str,
        key_prefix: &str,
        key_suffix: &str,
        extra_args: &[&str],
        op_name: &str,
    ) -> Result<Option<Vec<String>>, RepositoryError> {
        const MAX_RETRIES: u32 = 3;
        const BASE_BACKOFF_MS: u64 = 100;

        let mut last_error = None;

        for attempt in 0..MAX_RETRIES {
            // Exponential backoff: 100ms, 200ms, 400ms, ...
            let backoff = BASE_BACKOFF_MS * 2u64.pow(attempt);

            let mut conn = match self
                .get_connection(self.connections.primary(), op_name)
                .await
            {
                Ok(conn) => conn,
                Err(e) => {
                    last_error = Some(e);
                    if attempt < MAX_RETRIES - 1 {
                        warn!(op = %op_name, attempt, "connection failed, retrying");
                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
                        continue;
                    }
                    return Err(last_error.unwrap());
                }
            };

            // Fresh invocation per attempt (keys/args must be re-attached).
            let mut invocation = script.prepare_invoke();
            invocation.key(lookup_key).arg(key_prefix).arg(key_suffix);
            for arg in extra_args {
                invocation.arg(*arg);
            }

            match invocation
                .invoke_async::<Option<Vec<String>>>(&mut conn)
                .await
            {
                Ok(result) => return Ok(result),
                Err(e) => {
                    last_error = Some(self.map_redis_error(e, op_name));
                    if attempt < MAX_RETRIES - 1 {
                        warn!(op = %op_name, attempt, "script failed, retrying");
                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
                        continue;
                    }
                    return Err(last_error.unwrap());
                }
            }
        }
        // Unreachable in practice (every loop exit returns), kept defensive.
        Err(last_error.unwrap_or_else(|| {
            RepositoryError::UnexpectedError(format!("retry loop exhausted for {op_name}"))
        }))
    }
270
271 fn timestamp_to_score(&self, timestamp: &str) -> f64 {
273 chrono::DateTime::parse_from_rfc3339(timestamp)
274 .map(|dt| dt.timestamp_millis() as f64)
275 .unwrap_or_else(|_| {
276 warn!(timestamp = %timestamp, "failed to parse timestamp, using 0");
277 0.0
278 })
279 }
280
281 fn status_sorted_score(&self, tx: &TransactionRepoModel) -> f64 {
285 if tx.status == TransactionStatus::Confirmed {
286 if let Some(ref confirmed_at) = tx.confirmed_at {
288 return self.timestamp_to_score(confirmed_at);
289 }
290 warn!(tx_id = %tx.id, "Confirmed transaction missing confirmed_at, using created_at");
292 }
293 self.timestamp_to_score(&tx.created_at)
294 }
295
    /// Batch-fetches transactions by id using two MGETs over the reader
    /// connection: first the reverse lookups (id -> relayer), then the
    /// records themselves.
    ///
    /// Ids with no reverse mapping or no stored record end up in
    /// `failed_ids`. NOTE(review): records that exist but fail to
    /// deserialize are only logged and counted — they are *not* added to
    /// `failed_ids`; confirm that callers expect this distinction.
    async fn get_transactions_by_ids(
        &self,
        ids: &[String],
    ) -> Result<BatchRetrievalResult<TransactionRepoModel>, RepositoryError> {
        if ids.is_empty() {
            debug!("no transaction IDs provided for batch fetch");
            return Ok(BatchRetrievalResult {
                results: vec![],
                failed_ids: vec![],
            });
        }

        let mut conn = self
            .get_connection(self.connections.reader(), "batch_fetch_transactions")
            .await?;

        // Pass 1: resolve each tx id to its owning relayer id.
        let reverse_keys: Vec<String> = ids.iter().map(|id| self.tx_to_relayer_key(id)).collect();

        debug!(count = %ids.len(), "fetching relayer IDs for transactions");

        let relayer_ids: Vec<Option<String>> = conn
            .mget(&reverse_keys)
            .await
            .map_err(|e| self.map_redis_error(e, "batch_fetch_relayer_ids"))?;

        // Build the primary keys for the ids that resolved; the rest fail.
        let mut tx_keys = Vec::new();
        let mut valid_ids = Vec::new();
        let mut failed_ids = Vec::new();
        for (i, relayer_id) in relayer_ids.into_iter().enumerate() {
            match relayer_id {
                Some(relayer_id) => {
                    tx_keys.push(self.tx_key(&relayer_id, &ids[i]));
                    valid_ids.push(ids[i].clone());
                }
                None => {
                    warn!(tx_id = %ids[i], "no relayer found for transaction");
                    failed_ids.push(ids[i].clone());
                }
            }
        }

        if tx_keys.is_empty() {
            debug!("no valid transactions found for batch fetch");
            return Ok(BatchRetrievalResult {
                results: vec![],
                failed_ids,
            });
        }

        debug!(count = %tx_keys.len(), "batch fetching transaction data");

        // Pass 2: fetch the actual records.
        let values: Vec<Option<String>> = conn
            .mget(&tx_keys)
            .await
            .map_err(|e| self.map_redis_error(e, "batch_fetch_transactions"))?;

        let mut transactions = Vec::new();
        let mut failed_count = 0;
        for (i, value) in values.into_iter().enumerate() {
            match value {
                Some(json) => {
                    match self.deserialize_entity::<TransactionRepoModel>(
                        &json,
                        &valid_ids[i],
                        "transaction",
                    ) {
                        Ok(tx) => transactions.push(tx),
                        Err(e) => {
                            // Corrupt record: logged only, not in failed_ids.
                            failed_count += 1;
                            error!(tx_id = %valid_ids[i], error = %e, "failed to deserialize transaction");
                        }
                    }
                }
                None => {
                    warn!(tx_id = %valid_ids[i], "transaction not found in batch fetch");
                    failed_ids.push(valid_ids[i].clone());
                }
            }
        }

        if failed_count > 0 {
            warn!(failed_count = %failed_count, total_count = %valid_ids.len(), "failed to deserialize transactions in batch");
        }

        debug!(count = %transactions.len(), "successfully fetched transactions");
        Ok(BatchRetrievalResult {
            results: transactions,
            failed_ids,
        })
    }
388
389 fn extract_nonce(&self, network_data: &NetworkTransactionData) -> Option<u64> {
391 match network_data.get_evm_transaction_data() {
392 Ok(tx_data) => tx_data.nonce,
393 Err(_) => {
394 debug!("no EVM transaction data available for nonce extraction");
395 None
396 }
397 }
398 }
399
    /// Lazily migrates a legacy per-status SET into the newer sorted-set
    /// index, returning the number of members available in the sorted set.
    ///
    /// Fast path: when the legacy set is empty, just return the sorted set's
    /// cardinality. Otherwise read the legacy members, fetch their records to
    /// compute scores, then ZADD + DEL the legacy key in a single atomic
    /// pipeline.
    ///
    /// NOTE(review): the SMEMBERS read and the migration pipeline are
    /// separate round-trips, so ids added to the legacy set in between would
    /// be deleted without migrating. Likewise, legacy ids whose records
    /// cannot be fetched are dropped from the index (the legacy key is
    /// deleted regardless). Confirm both are acceptable trade-offs.
    async fn ensure_status_sorted_set(
        &self,
        relayer_id: &str,
        status: &TransactionStatus,
    ) -> Result<u64, RepositoryError> {
        let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
        let legacy_key = self.relayer_status_key(relayer_id, status);

        // Scope the first connection so it is released before the batch fetch.
        let legacy_ids = {
            let mut conn = self
                .get_connection(self.connections.primary(), "ensure_status_sorted_set_check")
                .await?;

            let legacy_count: u64 = conn
                .scard(&legacy_key)
                .await
                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_scard"))?;

            if legacy_count == 0 {
                // Nothing to migrate — report the sorted set's size.
                let sorted_count: u64 = conn
                    .zcard(&sorted_key)
                    .await
                    .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_zcard"))?;
                return Ok(sorted_count);
            }

            debug!(
                relayer_id = %relayer_id,
                status = %status,
                legacy_count = %legacy_count,
                "migrating status set to sorted set"
            );

            let ids: Vec<String> = conn
                .smembers(&legacy_key)
                .await
                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_smembers"))?;

            ids
        };

        if legacy_ids.is_empty() {
            return Ok(0);
        }

        // Records are needed to compute each member's timestamp score.
        let transactions = self.get_transactions_by_ids(&legacy_ids).await?;

        let mut conn = self
            .get_connection(
                self.connections.primary(),
                "ensure_status_sorted_set_migrate",
            )
            .await?;

        if transactions.results.is_empty() {
            // Every legacy id was stale — drop the legacy key and report 0.
            let _: () = conn
                .del(&legacy_key)
                .await
                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_del_stale"))?;
            return Ok(0);
        }

        // ZADD all members and delete the legacy key atomically (MULTI/EXEC).
        let mut pipe = redis::pipe();
        pipe.atomic();

        for tx in &transactions.results {
            let score = self.status_sorted_score(tx);
            pipe.zadd(&sorted_key, &tx.id, score);
        }

        pipe.del(&legacy_key);

        pipe.query_async::<()>(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_migrate"))?;

        let migrated_count = transactions.results.len() as u64;
        debug!(
            relayer_id = %relayer_id,
            status = %status,
            migrated_count = %migrated_count,
            "completed migration of status set to sorted set"
        );

        Ok(migrated_count)
    }
513
    /// Maintains all secondary indexes for `tx` in one atomic pipeline:
    /// relayer-list membership, the per-status sorted set, the nonce lookup,
    /// and the per-relayer created_at sorted set.
    ///
    /// When `old_tx` is provided and the status changed, the previous status
    /// entries (sorted-set and legacy-set forms) are removed; a changed nonce
    /// drops the old nonce key.
    ///
    /// NOTE(review): old-entry removal uses `old.relayer_id` while additions
    /// use `tx.relayer_id` — correct only if a transaction's relayer_id never
    /// changes between updates. TODO confirm.
    async fn update_indexes(
        &self,
        tx: &TransactionRepoModel,
        old_tx: Option<&TransactionRepoModel>,
    ) -> Result<(), RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.primary(), "update_indexes")
            .await?;
        let mut pipe = redis::pipe();
        pipe.atomic();

        debug!(tx_id = %tx.id, "updating indexes for transaction");

        // SADD is idempotent — safe to re-add the relayer on every update.
        let relayer_list_key = self.relayer_list_key();
        pipe.sadd(&relayer_list_key, &tx.relayer_id);

        let status_score = self.status_sorted_score(tx);
        let created_at_score = self.timestamp_to_score(&tx.created_at);

        let new_status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, &tx.status);
        pipe.zadd(&new_status_sorted_key, &tx.id, status_score);
        debug!(tx_id = %tx.id, status = %tx.status, score = %status_score, "added transaction to status sorted set");

        // Nonce index only exists for EVM transactions with a nonce set.
        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
            pipe.set(&nonce_key, &tx.id);
            debug!(tx_id = %tx.id, nonce = %nonce, "added nonce index for transaction");
        }

        let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
        pipe.zadd(&relayer_sorted_key, &tx.id, created_at_score);
        debug!(tx_id = %tx.id, score = %created_at_score, "added transaction to sorted set by created_at");

        // Clean up stale entries left behind by the previous version.
        if let Some(old) = old_tx {
            if old.status != tx.status {
                let old_status_sorted_key =
                    self.relayer_status_sorted_key(&old.relayer_id, &old.status);
                pipe.zrem(&old_status_sorted_key, &tx.id);

                // Also clear the legacy SET form in case it still exists.
                let old_status_legacy_key = self.relayer_status_key(&old.relayer_id, &old.status);
                pipe.srem(&old_status_legacy_key, &tx.id);

                debug!(tx_id = %tx.id, old_status = %old.status, new_status = %tx.status, "removing old status indexes for transaction");
            }

            if let Some(old_nonce) = self.extract_nonce(&old.network_data) {
                let new_nonce = self.extract_nonce(&tx.network_data);
                if Some(old_nonce) != new_nonce {
                    let old_nonce_key = self.relayer_nonce_key(&old.relayer_id, old_nonce);
                    pipe.del(&old_nonce_key);
                    debug!(tx_id = %tx.id, old_nonce = %old_nonce, new_nonce = ?new_nonce, "removing old nonce index for transaction");
                }
            }
        }

        pipe.exec_async(&mut conn).await.map_err(|e| {
            error!(tx_id = %tx.id, error = %e, "index update pipeline failed for transaction");
            self.map_redis_error(e, &format!("update_indexes_for_tx_{}", tx.id))
        })?;

        debug!(tx_id = %tx.id, "successfully updated indexes for transaction");
        Ok(())
    }
589
    /// Removes every index entry for `tx` in one atomic pipeline: all
    /// per-status entries (both sorted-set and legacy-set forms), the nonce
    /// key, the created_at sorted-set member, and the tx->relayer reverse
    /// mapping. ZREM/SREM/DEL on absent members are no-ops, so it is safe to
    /// sweep every status unconditionally.
    ///
    /// NOTE(review): the status list below is hard-coded; any newly added
    /// `TransactionStatus` variant must also be added here or its index
    /// entries will leak on delete.
    async fn remove_all_indexes(&self, tx: &TransactionRepoModel) -> Result<(), RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.primary(), "remove_all_indexes")
            .await?;
        let mut pipe = redis::pipe();
        pipe.atomic();

        debug!(tx_id = %tx.id, "removing all indexes for transaction");

        for status in &[
            TransactionStatus::Canceled,
            TransactionStatus::Pending,
            TransactionStatus::Sent,
            TransactionStatus::Submitted,
            TransactionStatus::Mined,
            TransactionStatus::Confirmed,
            TransactionStatus::Failed,
            TransactionStatus::Expired,
        ] {
            let status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, status);
            pipe.zrem(&status_sorted_key, &tx.id);

            let status_legacy_key = self.relayer_status_key(&tx.relayer_id, status);
            pipe.srem(&status_legacy_key, &tx.id);
        }

        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
            pipe.del(&nonce_key);
            debug!(tx_id = %tx.id, nonce = %nonce, "removing nonce index for transaction");
        }

        let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
        pipe.zrem(&relayer_sorted_key, &tx.id);
        debug!(tx_id = %tx.id, "removing transaction from sorted set by created_at");

        let reverse_key = self.tx_to_relayer_key(&tx.id);
        pipe.del(&reverse_key);

        pipe.exec_async(&mut conn).await.map_err(|e| {
            error!(tx_id = %tx.id, error = %e, "index removal failed for transaction");
            self.map_redis_error(e, &format!("remove_indexes_for_tx_{}", tx.id))
        })?;

        debug!(tx_id = %tx.id, "successfully removed all indexes for transaction");
        Ok(())
    }
646
    /// Updates Prometheus-style metrics after a status transition.
    ///
    /// Covers: submission counter + creation-to-submission latency, the
    /// per-status gauge (decrement old, increment new), and — when the
    /// transaction first reaches a final state — success/failure counters,
    /// fee-retry outcome counters, and confirmation latency histograms.
    fn track_status_change_metrics(
        &self,
        _original_tx: &TransactionRepoModel,
        updated_tx: &TransactionRepoModel,
        old_status: &TransactionStatus,
        new_status: &TransactionStatus,
    ) {
        let network_type = format!("{:?}", updated_tx.network_type).to_lowercase();
        let relayer_id = updated_tx.relayer_id.as_str();

        // Transition *into* Submitted: count it and record how long the
        // transaction took from creation to submission.
        if *old_status != TransactionStatus::Submitted
            && *new_status == TransactionStatus::Submitted
        {
            TRANSACTIONS_SUBMITTED
                .with_label_values(&[relayer_id, &network_type])
                .inc();

            if let Ok(created_time) = chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at) {
                let processing_seconds =
                    (Utc::now() - created_time.with_timezone(&Utc)).num_seconds() as f64;
                TRANSACTION_PROCESSING_TIME
                    .with_label_values(&[relayer_id, &network_type, "creation_to_submission"])
                    .observe(processing_seconds);
            }
        }

        // Move the per-status gauge: old status down (clamped at 0 so a
        // missed increment can never drive it negative), new status up.
        if old_status != new_status {
            let old_status_str = format!("{old_status:?}").to_lowercase();
            let old_status_gauge = TRANSACTIONS_BY_STATUS.with_label_values(&[
                relayer_id,
                &network_type,
                &old_status_str,
            ]);
            let clamped_value = (old_status_gauge.get() - 1.0).max(0.0);
            old_status_gauge.set(clamped_value);

            let new_status_str = format!("{new_status:?}").to_lowercase();
            TRANSACTIONS_BY_STATUS
                .with_label_values(&[relayer_id, &network_type, &new_status_str])
                .inc();
        }

        let was_final = is_final_state(old_status);
        let is_final = is_final_state(new_status);

        // Terminal-state metrics fire only on the *first* entry into a final
        // state, so re-saves of an already-final transaction don't recount.
        if !was_final && is_final {
            let previous_status = format!("{old_status:?}").to_lowercase();
            let meta = updated_tx.metadata.as_ref();
            // Whether the transaction ever hit fee/resource retry paths.
            let had_insufficient_fee = meta.is_some_and(|m| m.insufficient_fee_retries > 0);
            let had_try_again_later = meta.is_some_and(|m| m.try_again_later_retries > 0);

            match new_status {
                TransactionStatus::Confirmed => {
                    TRANSACTIONS_SUCCESS
                        .with_label_values(&[relayer_id, &network_type])
                        .inc();
                    if had_insufficient_fee {
                        TRANSACTIONS_INSUFFICIENT_FEE_SUCCESS
                            .with_label_values(&[relayer_id, &network_type])
                            .inc();
                    }
                    if had_try_again_later {
                        TRANSACTIONS_TRY_AGAIN_LATER_SUCCESS
                            .with_label_values(&[relayer_id, &network_type])
                            .inc();
                    }

                    // Submission-to-confirmation latency (needs both stamps).
                    if let (Some(sent_at_str), Some(confirmed_at_str)) =
                        (&updated_tx.sent_at, &updated_tx.confirmed_at)
                    {
                        if let (Ok(sent_time), Ok(confirmed_time)) = (
                            chrono::DateTime::parse_from_rfc3339(sent_at_str),
                            chrono::DateTime::parse_from_rfc3339(confirmed_at_str),
                        ) {
                            let processing_seconds = (confirmed_time.with_timezone(&Utc)
                                - sent_time.with_timezone(&Utc))
                            .num_seconds()
                                as f64;
                            TRANSACTION_PROCESSING_TIME
                                .with_label_values(&[
                                    relayer_id,
                                    &network_type,
                                    "submission_to_confirmation",
                                ])
                                .observe(processing_seconds);
                        }
                    }

                    // End-to-end creation-to-confirmation latency.
                    if let Ok(created_time) =
                        chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at)
                    {
                        if let Some(confirmed_at_str) = &updated_tx.confirmed_at {
                            if let Ok(confirmed_time) =
                                chrono::DateTime::parse_from_rfc3339(confirmed_at_str)
                            {
                                let processing_seconds = (confirmed_time.with_timezone(&Utc)
                                    - created_time.with_timezone(&Utc))
                                .num_seconds()
                                    as f64;
                                TRANSACTION_PROCESSING_TIME
                                    .with_label_values(&[
                                        relayer_id,
                                        &network_type,
                                        "creation_to_confirmation",
                                    ])
                                    .observe(processing_seconds);
                            }
                        }
                    }
                }
                TransactionStatus::Failed => {
                    // Classify the failure from the status_reason prefix.
                    let failure_reason = updated_tx
                        .status_reason
                        .as_deref()
                        .map(|reason| {
                            if reason.starts_with("Submission failed:") {
                                "submission_failed"
                            } else if reason.starts_with("Preparation failed:") {
                                "preparation_failed"
                            } else {
                                "failed"
                            }
                        })
                        .unwrap_or("failed");
                    TRANSACTIONS_FAILED
                        .with_label_values(&[
                            relayer_id,
                            &network_type,
                            failure_reason,
                            &previous_status,
                        ])
                        .inc();
                }
                TransactionStatus::Expired => {
                    TRANSACTIONS_FAILED
                        .with_label_values(&[
                            relayer_id,
                            &network_type,
                            "expired",
                            &previous_status,
                        ])
                        .inc();
                }
                TransactionStatus::Canceled => {
                    TRANSACTIONS_FAILED
                        .with_label_values(&[
                            relayer_id,
                            &network_type,
                            "canceled",
                            &previous_status,
                        ])
                        .inc();
                }
                _ => {}
            }

            // Retry paths that ended in any non-confirmed final state count
            // as retry failures.
            if *new_status != TransactionStatus::Confirmed {
                if had_insufficient_fee {
                    TRANSACTIONS_INSUFFICIENT_FEE_FAILED
                        .with_label_values(&[relayer_id, &network_type])
                        .inc();
                }
                if had_try_again_later {
                    TRANSACTIONS_TRY_AGAIN_LATER_FAILED
                        .with_label_values(&[relayer_id, &network_type])
                        .inc();
                }
            }
        }
    }
822}
823
824impl fmt::Debug for RedisTransactionRepository {
825 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
826 f.debug_struct("RedisTransactionRepository")
827 .field("connections", &"<RedisConnections>")
828 .field("key_prefix", &self.key_prefix)
829 .finish()
830 }
831}
832
833#[async_trait]
834impl Repository<TransactionRepoModel, String> for RedisTransactionRepository {
    /// Persists a new transaction: primary record plus the tx->relayer
    /// reverse mapping in one atomic pipeline, followed by secondary-index
    /// updates and creation metrics.
    ///
    /// # Errors
    ///
    /// - `InvalidData` when the transaction ID is empty.
    /// - `ConstraintViolation` when a transaction with this ID already exists.
    ///
    /// NOTE(review): the existence check and the write pipeline are separate
    /// round-trips (check-then-act), so two concurrent creates with the same
    /// ID could both pass the check. Consider SET NX on the reverse key if
    /// that matters in practice.
    async fn create(
        &self,
        entity: TransactionRepoModel,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        if entity.id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        let key = self.tx_key(&entity.relayer_id, &entity.id);
        let reverse_key = self.tx_to_relayer_key(&entity.id);
        let mut conn = self
            .get_connection(self.connections.primary(), "create")
            .await?;

        debug!(tx_id = %entity.id, "creating transaction");

        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;

        // Duplicate-ID guard via the reverse mapping (see race note above).
        let existing: Option<String> = conn
            .get(&reverse_key)
            .await
            .map_err(|e| self.map_redis_error(e, "create_transaction_check"))?;

        if existing.is_some() {
            return Err(RepositoryError::ConstraintViolation(format!(
                "Transaction with ID {} already exists",
                entity.id
            )));
        }

        // Record and reverse mapping written together (MULTI/EXEC).
        let mut pipe = redis::pipe();
        pipe.atomic();
        pipe.set(&key, &value);
        pipe.set(&reverse_key, &entity.relayer_id);

        pipe.exec_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "create_transaction"))?;

        // Index updates are mandatory for a create: propagate the error.
        if let Err(e) = self.update_indexes(&entity, None).await {
            error!(tx_id = %entity.id, error = %e, "failed to update indexes for new transaction");
            return Err(e);
        }

        // Creation metrics: total created plus the per-status gauge.
        let network_type = format!("{:?}", entity.network_type).to_lowercase();
        let relayer_id = entity.relayer_id.as_str();
        TRANSACTIONS_CREATED
            .with_label_values(&[relayer_id, &network_type])
            .inc();

        let status = &entity.status;
        let status_str = format!("{status:?}").to_lowercase();
        TRANSACTIONS_BY_STATUS
            .with_label_values(&[relayer_id, &network_type, &status_str])
            .inc();

        debug!(tx_id = %entity.id, "successfully created transaction");
        Ok(entity)
    }
901
902 async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
903 if id.is_empty() {
904 return Err(RepositoryError::InvalidData(
905 "Transaction ID cannot be empty".to_string(),
906 ));
907 }
908
909 let mut conn = self
910 .get_connection(self.connections.reader(), "get_by_id")
911 .await?;
912
913 debug!(tx_id = %id, "fetching transaction");
914
915 let reverse_key = self.tx_to_relayer_key(&id);
916 let relayer_id: Option<String> = conn
917 .get(&reverse_key)
918 .await
919 .map_err(|e| self.map_redis_error(e, "get_transaction_reverse_lookup"))?;
920
921 let relayer_id = match relayer_id {
922 Some(relayer_id) => relayer_id,
923 None => {
924 debug!(tx_id = %id, "transaction not found (no reverse lookup)");
925 return Err(RepositoryError::NotFound(format!(
926 "Transaction with ID {id} not found"
927 )));
928 }
929 };
930
931 let key = self.tx_key(&relayer_id, &id);
932 let value: Option<String> = conn
933 .get(&key)
934 .await
935 .map_err(|e| self.map_redis_error(e, "get_transaction_by_id"))?;
936
937 match value {
938 Some(json) => {
939 let tx =
940 self.deserialize_entity::<TransactionRepoModel>(&json, &id, "transaction")?;
941 debug!(tx_id = %id, "successfully fetched transaction");
942 Ok(tx)
943 }
944 None => {
945 debug!(tx_id = %id, "transaction not found");
946 Err(RepositoryError::NotFound(format!(
947 "Transaction with ID {id} not found"
948 )))
949 }
950 }
951 }
952
    /// Loads every transaction across all relayers, newest first.
    ///
    /// Walks the relayer list, collects each relayer's created_at-sorted ids
    /// (ZRANGE … REV), batch-fetches the records, then re-sorts in memory to
    /// produce a single global ordering. Cost grows with the total
    /// transaction count — intended for admin/maintenance paths, not hot
    /// request handling.
    async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.reader(), "list_all")
            .await?;

        debug!("fetching all transactions sorted by created_at (newest first)");

        let relayer_list_key = self.relayer_list_key();
        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "list_all_relayer_ids"))?;

        debug!(count = %relayer_ids.len(), "found relayers");

        // One ZRANGE per relayer; ids come back newest-first per relayer.
        let mut all_tx_ids = Vec::new();
        for relayer_id in relayer_ids {
            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
            let tx_ids: Vec<String> = redis::cmd("ZRANGE")
                .arg(&relayer_sorted_key)
                .arg(0)
                .arg(-1)
                .arg("REV")
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "list_all_relayer_sorted"))?;

            all_tx_ids.extend(tx_ids);
        }

        // Release this connection before the batch fetch grabs its own.
        drop(conn);

        let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
        let mut all_transactions = batch_result.results;

        // Merge the per-relayer orderings into one global newest-first order.
        all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));

        debug!(count = %all_transactions.len(), "found transactions");
        Ok(all_transactions)
    }
999
1000 async fn list_paginated(
1002 &self,
1003 query: PaginationQuery,
1004 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
1005 if query.per_page == 0 {
1006 return Err(RepositoryError::InvalidData(
1007 "per_page must be greater than 0".to_string(),
1008 ));
1009 }
1010
1011 let mut conn = self
1012 .get_connection(self.connections.reader(), "list_paginated")
1013 .await?;
1014
1015 debug!(page = %query.page, per_page = %query.per_page, "fetching paginated transactions sorted by created_at (newest first)");
1016
1017 let relayer_list_key = self.relayer_list_key();
1019 let relayer_ids: Vec<String> = conn
1020 .smembers(&relayer_list_key)
1021 .await
1022 .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_ids"))?;
1023
1024 let mut all_tx_ids = Vec::new();
1026 for relayer_id in relayer_ids {
1027 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
1028 let tx_ids: Vec<String> = redis::cmd("ZRANGE")
1029 .arg(&relayer_sorted_key)
1030 .arg(0)
1031 .arg(-1)
1032 .arg("REV")
1033 .query_async(&mut conn)
1034 .await
1035 .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_sorted"))?;
1036
1037 all_tx_ids.extend(tx_ids);
1038 }
1039
1040 drop(conn);
1042
1043 let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
1045 let mut all_transactions = batch_result.results;
1046
1047 all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
1049
1050 let total = all_transactions.len() as u64;
1051 let start = ((query.page - 1) * query.per_page) as usize;
1052 let end = (start + query.per_page as usize).min(all_transactions.len());
1053
1054 if start >= all_transactions.len() {
1055 debug!(page = %query.page, total = %total, "page is beyond available data");
1056 return Ok(PaginatedResult {
1057 items: vec![],
1058 total,
1059 page: query.page,
1060 per_page: query.per_page,
1061 });
1062 }
1063
1064 let items = all_transactions[start..end].to_vec();
1065
1066 debug!(count = %items.len(), page = %query.page, "successfully fetched transactions for page");
1067
1068 Ok(PaginatedResult {
1069 items,
1070 total,
1071 page: query.page,
1072 per_page: query.per_page,
1073 })
1074 }
1075
    /// Replaces a stored transaction and refreshes its secondary indexes.
    ///
    /// Reads the current record first so index maintenance can diff old vs
    /// new status/nonce; errors with `NotFound` if the transaction does not
    /// exist.
    ///
    /// NOTE(review): the write key is built from `entity.relayer_id`; if it
    /// ever differed from `old_tx.relayer_id`, the old record and the reverse
    /// mapping would not be updated here — this assumes a transaction's
    /// relayer_id is immutable. TODO confirm.
    async fn update(
        &self,
        id: String,
        entity: TransactionRepoModel,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        if id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        debug!(tx_id = %id, "updating transaction");

        // Existence check + snapshot for index diffing.
        let old_tx = self.get_by_id(id.clone()).await?;

        let key = self.tx_key(&entity.relayer_id, &id);
        let mut conn = self
            .get_connection(self.connections.primary(), "update")
            .await?;

        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;

        let _: () = conn
            .set(&key, value)
            .await
            .map_err(|e| self.map_redis_error(e, "update_transaction"))?;

        self.update_indexes(&entity, Some(&old_tx)).await?;

        debug!(tx_id = %id, "successfully updated transaction");
        Ok(entity)
    }
1111
    /// Deletes a transaction's record and reverse mapping (atomic pipeline),
    /// then best-effort removes its secondary index entries.
    ///
    /// Errors with `NotFound` if the transaction does not exist. Index
    /// removal failures are deliberately logged rather than propagated: the
    /// record itself is already gone, and stale index members are tolerated
    /// by the read paths.
    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
        if id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        debug!(tx_id = %id, "deleting transaction");

        // Fetch first: needed for the key (relayer id) and index cleanup.
        let tx = self.get_by_id(id.clone()).await?;

        let key = self.tx_key(&tx.relayer_id, &id);
        let reverse_key = self.tx_to_relayer_key(&id);
        let mut conn = self
            .get_connection(self.connections.primary(), "delete_by_id")
            .await?;

        let mut pipe = redis::pipe();
        pipe.atomic();
        pipe.del(&key);
        pipe.del(&reverse_key);

        pipe.exec_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "delete_transaction"))?;

        // Best-effort: log but do not fail the delete on index cleanup error.
        if let Err(e) = self.remove_all_indexes(&tx).await {
            error!(tx_id = %id, error = %e, "failed to remove indexes for deleted transaction");
        }

        debug!(tx_id = %id, "successfully deleted transaction");
        Ok(())
    }
1147
1148 async fn count(&self) -> Result<usize, RepositoryError> {
1150 let mut conn = self
1151 .get_connection(self.connections.reader(), "count")
1152 .await?;
1153
1154 debug!("counting transactions");
1155
1156 let relayer_list_key = self.relayer_list_key();
1158 let relayer_ids: Vec<String> = conn
1159 .smembers(&relayer_list_key)
1160 .await
1161 .map_err(|e| self.map_redis_error(e, "count_relayer_ids"))?;
1162
1163 let mut total_count = 0usize;
1164 for relayer_id in relayer_ids {
1165 let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
1166 let count: usize = conn
1167 .zcard(&relayer_sorted_key)
1168 .await
1169 .map_err(|e| self.map_redis_error(e, "count_relayer_transactions"))?;
1170 total_count += count;
1171 }
1172
1173 debug!(count = %total_count, "transaction count");
1174 Ok(total_count)
1175 }
1176
1177 async fn has_entries(&self) -> Result<bool, RepositoryError> {
1178 let mut conn = self
1179 .get_connection(self.connections.reader(), "has_entries")
1180 .await?;
1181 let relayer_list_key = self.relayer_list_key();
1182
1183 debug!("checking if transaction entries exist");
1184
1185 let exists: bool = conn
1186 .exists(&relayer_list_key)
1187 .await
1188 .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
1189
1190 debug!(exists = %exists, "transaction entries exist");
1191 Ok(exists)
1192 }
1193
    /// Removes every transaction-related key for all known relayers.
    ///
    /// Walks the relayer list, SCANs each relayer's transaction keyspace to
    /// collect keys and IDs, and queues every deletion (records, reverse
    /// mappings, status indexes, created_at sorted sets, and finally the
    /// relayer list itself) into one MULTI/EXEC pipeline executed at the end.
    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.primary(), "drop_all_entries")
            .await?;
        let relayer_list_key = self.relayer_list_key();

        debug!("dropping all transaction entries");

        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_relayer_ids"))?;

        if relayer_ids.is_empty() {
            debug!("no transaction entries to drop");
            return Ok(());
        }

        // All deletions accumulate here and are executed atomically below.
        let mut pipe = redis::pipe();
        pipe.atomic();

        for relayer_id in &relayer_ids {
            // Matches "<prefix>:relayer:<relayer_id>:tx:*" — every stored
            // transaction record for this relayer.
            let pattern = format!(
                "{}:{}:{}:{}:*",
                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
            );
            let mut cursor = 0;
            let mut tx_ids = Vec::new();

            // Non-blocking SCAN loop; a returned cursor of 0 means done.
            loop {
                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                    .cursor_arg(cursor)
                    .arg("MATCH")
                    .arg(&pattern)
                    .query_async(&mut conn)
                    .await
                    .map_err(|e| self.map_redis_error(e, "drop_all_entries_scan"))?;

                for key in keys {
                    pipe.del(&key);
                    // The last ':'-separated segment of the key is the tx ID.
                    if let Some(tx_id) = key.split(':').next_back() {
                        tx_ids.push(tx_id.to_string());
                    }
                }

                cursor = next_cursor;
                if cursor == 0 {
                    break;
                }
            }

            for tx_id in tx_ids {
                let reverse_key = self.tx_to_relayer_key(&tx_id);
                pipe.del(&reverse_key);

                // The transaction's actual status is unknown here, so remove
                // the ID from every per-status index.
                for status in &[
                    TransactionStatus::Canceled,
                    TransactionStatus::Pending,
                    TransactionStatus::Sent,
                    TransactionStatus::Submitted,
                    TransactionStatus::Mined,
                    TransactionStatus::Confirmed,
                    TransactionStatus::Failed,
                    TransactionStatus::Expired,
                ] {
                    let status_sorted_key = self.relayer_status_sorted_key(relayer_id, status);
                    pipe.zrem(&status_sorted_key, &tx_id);

                    // Legacy set-based status index (see delete_by_requests).
                    let status_key = self.relayer_status_key(relayer_id, status);
                    pipe.srem(&status_key, &tx_id);
                }
            }

            let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
            pipe.del(&relayer_sorted_key);
        }

        pipe.del(&relayer_list_key);

        pipe.exec_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;

        debug!(count = %relayer_ids.len(), "dropped all transaction entries for relayers");
        Ok(())
    }
1292}
1293
1294#[async_trait]
1295impl TransactionRepository for RedisTransactionRepository {
1296 async fn find_by_relayer_id(
1297 &self,
1298 relayer_id: &str,
1299 query: PaginationQuery,
1300 ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
1301 let mut conn = self
1302 .get_connection(self.connections.reader(), "find_by_relayer_id")
1303 .await?;
1304
1305 debug!(relayer_id = %relayer_id, page = %query.page, per_page = %query.per_page, "fetching transactions for relayer sorted by created_at (newest first)");
1306
1307 let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
1308
1309 let sorted_set_count: u64 = conn
1311 .zcard(&relayer_sorted_key)
1312 .await
1313 .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_count"))?;
1314
1315 if sorted_set_count == 0 {
1318 debug!(relayer_id = %relayer_id, "no transactions found for relayer (sorted set is empty)");
1319 return Ok(PaginatedResult {
1320 items: vec![],
1321 total: 0,
1322 page: query.page,
1323 per_page: query.per_page,
1324 });
1325 }
1326
1327 let total = sorted_set_count;
1328
1329 let start = ((query.page - 1) * query.per_page) as isize;
1331 let end = start + query.per_page as isize - 1;
1332
1333 if start as u64 >= total {
1334 debug!(relayer_id = %relayer_id, page = %query.page, total = %total, "page is beyond available data");
1335 return Ok(PaginatedResult {
1336 items: vec![],
1337 total,
1338 page: query.page,
1339 per_page: query.per_page,
1340 });
1341 }
1342
1343 let page_ids: Vec<String> = redis::cmd("ZRANGE")
1345 .arg(&relayer_sorted_key)
1346 .arg(start)
1347 .arg(end)
1348 .arg("REV")
1349 .query_async(&mut conn)
1350 .await
1351 .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_sorted"))?;
1352
1353 drop(conn);
1355
1356 let items = self.get_transactions_by_ids(&page_ids).await?;
1357
1358 debug!(relayer_id = %relayer_id, count = %items.results.len(), page = %query.page, "successfully fetched transactions for relayer");
1359
1360 Ok(PaginatedResult {
1361 items: items.results,
1362 total,
1363 page: query.page,
1364 per_page: query.per_page,
1365 })
1366 }
1367
1368 async fn find_by_status(
1370 &self,
1371 relayer_id: &str,
1372 statuses: &[TransactionStatus],
1373 ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
1374 for status in statuses {
1376 self.ensure_status_sorted_set(relayer_id, status).await?;
1377 }
1378
1379 let mut conn = self
1381 .get_connection(self.connections.reader(), "find_by_status")
1382 .await?;
1383
1384 let mut all_ids: Vec<String> = Vec::new();
1385 for status in statuses {
1386 let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1388 let ids: Vec<String> = redis::cmd("ZRANGE")
1389 .arg(&sorted_key)
1390 .arg(0)
1391 .arg(-1)
1392 .arg("REV") .query_async(&mut conn)
1394 .await
1395 .map_err(|e| self.map_redis_error(e, "find_by_status"))?;
1396
1397 all_ids.extend(ids);
1398 }
1399
1400 drop(conn);
1402
1403 if all_ids.is_empty() {
1404 return Ok(vec![]);
1405 }
1406
1407 all_ids.sort();
1409 all_ids.dedup();
1410
1411 let mut transactions = self.get_transactions_by_ids(&all_ids).await?;
1413
1414 transactions
1416 .results
1417 .sort_by(|a, b| b.created_at.cmp(&a.created_at));
1418
1419 Ok(transactions.results)
1420 }
1421
    /// Paginated status query with two strategies:
    /// - single status: Redis paginates directly via ZRANGE (cheap);
    /// - multiple statuses: fetch every (id, score) pair, merge and dedup in
    ///   memory, sort by score, then slice out the requested page.
    ///
    /// `oldest_first` selects ascending score order; otherwise newest first.
    async fn find_by_status_paginated(
        &self,
        relayer_id: &str,
        statuses: &[TransactionStatus],
        query: PaginationQuery,
        oldest_first: bool,
    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
        // Make sure each per-status sorted set is ready before reading.
        for status in statuses {
            self.ensure_status_sorted_set(relayer_id, status).await?;
        }

        let mut conn = self
            .get_connection(self.connections.reader(), "find_by_status_paginated")
            .await?;

        // Fast path: a single status lets Redis do the pagination for us.
        if statuses.len() == 1 {
            let sorted_key = self.relayer_status_sorted_key(relayer_id, &statuses[0]);

            let total: u64 = conn
                .zcard(&sorted_key)
                .await
                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_count"))?;

            if total == 0 {
                return Ok(PaginatedResult {
                    items: vec![],
                    total: 0,
                    page: query.page,
                    per_page: query.per_page,
                });
            }

            // saturating_sub: page 0 is treated like page 1.
            let start = ((query.page.saturating_sub(1)) * query.per_page) as isize;
            let end = start + query.per_page as isize - 1;

            let mut cmd = redis::cmd("ZRANGE");
            cmd.arg(&sorted_key).arg(start).arg(end);
            if !oldest_first {
                // REV walks from highest score (newest) down.
                cmd.arg("REV");
            }
            let page_ids: Vec<String> = cmd
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated"))?;

            drop(conn);

            let transactions = self.get_transactions_by_ids(&page_ids).await?;

            debug!(
                relayer_id = %relayer_id,
                status = %statuses[0],
                total = %total,
                page = %query.page,
                page_size = %transactions.results.len(),
                "fetched paginated transactions by single status"
            );

            return Ok(PaginatedResult {
                items: transactions.results,
                total,
                page: query.page,
                per_page: query.per_page,
            });
        }

        // Slow path: gather every (id, score) pair from each status set.
        let mut all_ids: Vec<(String, f64)> = Vec::new();
        for status in statuses {
            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);

            let ids_with_scores: Vec<(String, f64)> = redis::cmd("ZRANGE")
                .arg(&sorted_key)
                .arg(0)
                .arg(-1)
                .arg("WITHSCORES")
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_multi"))?;

            all_ids.extend(ids_with_scores);
        }

        drop(conn);

        // Dedup IDs that appear under several statuses, keeping whichever
        // score sorts first for the requested direction.
        let mut id_map: std::collections::HashMap<String, f64> = std::collections::HashMap::new();
        for (id, score) in all_ids {
            id_map
                .entry(id)
                .and_modify(|s| {
                    if oldest_first {
                        if score < *s {
                            *s = score
                        }
                    } else if score > *s {
                        *s = score
                    }
                })
                .or_insert(score);
        }

        let mut sorted_ids: Vec<(String, f64)> = id_map.into_iter().collect();
        // Scores are f64, so partial_cmp; a NaN (not expected) sorts as Equal.
        if oldest_first {
            sorted_ids.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
        } else {
            sorted_ids.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        }

        let total = sorted_ids.len() as u64;

        if total == 0 {
            return Ok(PaginatedResult {
                items: vec![],
                total: 0,
                page: query.page,
                per_page: query.per_page,
            });
        }

        // In-memory pagination over the merged, sorted ID list.
        let start = ((query.page.saturating_sub(1)) * query.per_page) as usize;
        let page_ids: Vec<String> = sorted_ids
            .into_iter()
            .skip(start)
            .take(query.per_page as usize)
            .map(|(id, _)| id)
            .collect();

        let transactions = self.get_transactions_by_ids(&page_ids).await?;

        debug!(
            relayer_id = %relayer_id,
            total = %total,
            page = %query.page,
            page_size = %transactions.results.len(),
            "fetched paginated transactions by status"
        );

        Ok(PaginatedResult {
            items: transactions.results,
            total,
            page: query.page,
            per_page: query.per_page,
        })
    }
1580
1581 async fn find_by_nonce(
1582 &self,
1583 relayer_id: &str,
1584 nonce: u64,
1585 ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
1586 let mut conn = self
1587 .get_connection(self.connections.reader(), "find_by_nonce")
1588 .await?;
1589 let nonce_key = self.relayer_nonce_key(relayer_id, nonce);
1590
1591 let tx_id: Option<String> = conn
1593 .get(nonce_key)
1594 .await
1595 .map_err(|e| self.map_redis_error(e, "find_by_nonce"))?;
1596
1597 match tx_id {
1598 Some(tx_id) => {
1599 match self.get_by_id(tx_id.clone()).await {
1600 Ok(tx) => Ok(Some(tx)),
1601 Err(RepositoryError::NotFound(_)) => {
1602 warn!(relayer_id = %relayer_id, nonce = %nonce, "stale nonce index found for relayer");
1604 Ok(None)
1605 }
1606 Err(e) => Err(e),
1607 }
1608 }
1609 None => Ok(None),
1610 }
1611 }
1612
1613 async fn update_status(
1614 &self,
1615 tx_id: String,
1616 status: TransactionStatus,
1617 ) -> Result<TransactionRepoModel, RepositoryError> {
1618 let update = TransactionUpdateRequest {
1619 status: Some(status),
1620 ..Default::default()
1621 };
1622 self.partial_update(tx_id, update).await
1623 }
1624
    /// Atomically applies a partial JSON patch to a stored transaction via a
    /// Lua script, then refreshes secondary indexes and emits status-change
    /// metrics.
    ///
    /// The script resolves the owning relayer through the tx->relayer
    /// mapping (KEYS[1]), merges the patch into the stored JSON, and returns
    /// both the pre- and post-patch snapshots so this method can diff them.
    /// Status changes on already-finalized transactions are rejected inside
    /// the script (the unchanged document is returned as both snapshots).
    async fn partial_update(
        &self,
        tx_id: String,
        update: TransactionUpdateRequest,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        let patch_json = serde_json::to_string(&update).map_err(|e| {
            RepositoryError::InvalidData(format!("Failed to serialize update patch: {e}"))
        })?;

        // When moving to a final status, pre-compute the delete_at timestamp
        // (now + configured expiration window) for the script to apply.
        let delete_at_value = if let Some(ref status) = update.status {
            if FINAL_TRANSACTION_STATUSES.contains(status) {
                let expiration_hours = ServerConfig::get_transaction_expiration_hours();
                let seconds = (expiration_hours * 3600.0) as i64;
                let delete_time = Utc::now() + chrono::Duration::seconds(seconds);
                Some(delete_time.to_rfc3339())
            } else {
                None
            }
        } else {
            None
        };
        // Empty string signals "no delete_at" to the Lua script (ARGV[4]).
        let delete_at_arg = delete_at_value.as_deref().unwrap_or("");

        let (lookup_key, key_prefix, key_suffix) = self.tx_key_parts(&tx_id);

        let patch_script = Script::new(
            r#"
            local relayer_id = redis.call('GET', KEYS[1])
            if not relayer_id then return false end

            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
            local current = redis.call('GET', tx_key)
            if not current then return false end

            local tx = cjson.decode(current)
            local patch = cjson.decode(ARGV[3])

            -- Guard: reject status changes on finalized transactions.
            -- A stale worker must not resurrect a tx that another worker
            -- already moved to a terminal state.
            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
            if final_states[tx["status"]] and patch["status"] then
                return {current, current}
            end

            local old_snapshot = current

            -- lua-cjson cannot distinguish empty Lua tables from empty
            -- arrays, so a decode/encode round-trip turns [] into {}.
            -- Record which keys held [] in the stored doc and the patch
            -- so we can restore them after cjson.encode.
            -- NOTE: this relies on each array-typed field having a unique key
            -- name across the entire JSON document (including nested objects).
            -- If the model ever introduces duplicate key names at different
            -- nesting levels (e.g. metadata.hashes), the gsub below could
            -- restore the wrong occurrence.
            local empty_arrs = {}
            for k in string.gmatch(current, '"([^"]+)"%s*:%s*%[%s*%]') do
                empty_arrs[k] = true
            end
            for k in string.gmatch(ARGV[3], '"([^"]+)"%s*:%s*%[%s*%]') do
                empty_arrs[k] = true
            end

            for k, v in pairs(patch) do
                tx[k] = v
            end

            -- Apply delete_at if transitioning to a final state and not already set
            if ARGV[4] ~= '' and (not tx["delete_at"] or tx["delete_at"] == cjson.null) then
                tx["delete_at"] = ARGV[4]
            end

            local updated = cjson.encode(tx)

            -- Restore empty arrays that cjson.encode converted to {}
            for k, _ in pairs(empty_arrs) do
                updated = string.gsub(
                    updated, '"'..k..'"%s*:%s*{}', '"'..k..'":[]', 1
                )
            end

            redis.call('SET', tx_key, updated)
            return {old_snapshot, updated}
            "#,
        );

        let result: Option<Vec<String>> = self
            .run_script_with_retry_vec(
                &patch_script,
                &lookup_key,
                &key_prefix,
                &key_suffix,
                &[&patch_json, delete_at_arg],
                "partial_update",
            )
            .await?;

        // `None` means the script returned false: the tx->relayer mapping or
        // the transaction record itself is missing.
        let parts = result.ok_or_else(|| {
            RepositoryError::NotFound(format!("Transaction with ID {tx_id} not found"))
        })?;

        if parts.len() != 2 {
            return Err(RepositoryError::UnexpectedError(format!(
                "partial_update script returned {} elements, expected 2",
                parts.len()
            )));
        }

        let old_json = &parts[0];
        let new_json = &parts[1];

        let original_tx =
            self.deserialize_entity::<TransactionRepoModel>(old_json, &tx_id, "transaction")?;
        let updated_tx =
            self.deserialize_entity::<TransactionRepoModel>(new_json, &tx_id, "transaction")?;

        // Refresh secondary indexes to reflect the updated document.
        self.update_indexes(&updated_tx, Some(&original_tx)).await?;

        debug!(tx_id = %tx_id, "successfully updated transaction via patch");

        // Only emit metrics when the status actually changed.
        if original_tx.status != updated_tx.status {
            self.track_status_change_metrics(
                &original_tx,
                &updated_tx,
                &original_tx.status,
                &updated_tx.status,
            );
        }

        Ok(updated_tx)
    }
1769
1770 async fn update_network_data(
1771 &self,
1772 tx_id: String,
1773 network_data: NetworkTransactionData,
1774 ) -> Result<TransactionRepoModel, RepositoryError> {
1775 let update = TransactionUpdateRequest {
1776 network_data: Some(network_data),
1777 ..Default::default()
1778 };
1779 self.partial_update(tx_id, update).await
1780 }
1781
1782 async fn set_sent_at(
1783 &self,
1784 tx_id: String,
1785 sent_at: String,
1786 ) -> Result<TransactionRepoModel, RepositoryError> {
1787 let update = TransactionUpdateRequest {
1788 sent_at: Some(sent_at),
1789 ..Default::default()
1790 };
1791 self.partial_update(tx_id, update).await
1792 }
1793
    /// Atomically increments `metadata.consecutive_failures` and
    /// `metadata.total_failures` on the stored transaction JSON via a Lua
    /// script. Transactions already in a final state are returned unchanged.
    ///
    /// The script splices the metadata object back in with string surgery
    /// (set_obj) rather than re-encoding the whole document, so unrelated
    /// fields keep their exact original JSON representation.
    async fn increment_status_check_failures(
        &self,
        tx_id: String,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        self.run_atomic_script(
            r#"
            local function set_obj(json, key, tbl)
                local enc = cjson.encode(tbl)
                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
            end

            local relayer_id = redis.call('GET', KEYS[1])
            if not relayer_id then return false end

            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
            local current = redis.call('GET', tx_key)
            if not current then return false end

            local tx = cjson.decode(current)
            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
            if final_states[tx["status"]] then return current end

            local metadata = tx["metadata"]
            if type(metadata) ~= 'table' then metadata = {} end
            metadata["consecutive_failures"] = (metadata["consecutive_failures"] or 0) + 1
            metadata["total_failures"] = (metadata["total_failures"] or 0) + 1

            local updated = set_obj(current, "metadata", metadata)
            redis.call('SET', tx_key, updated)
            return updated
            "#,
            &tx_id,
            &[],
            "increment_status_check_failures",
        )
        .await
    }
1835
    /// Atomically resets `metadata.consecutive_failures` to zero via a Lua
    /// script; transactions already in a final state are returned unchanged.
    ///
    /// Uses the same set_obj string-splice helper as
    /// increment_status_check_failures to avoid re-encoding the whole JSON
    /// document.
    async fn reset_status_check_consecutive_failures(
        &self,
        tx_id: String,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        self.run_atomic_script(
            r#"
            local function set_obj(json, key, tbl)
                local enc = cjson.encode(tbl)
                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
            end

            local relayer_id = redis.call('GET', KEYS[1])
            if not relayer_id then return false end

            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
            local current = redis.call('GET', tx_key)
            if not current then return false end

            local tx = cjson.decode(current)
            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
            if final_states[tx["status"]] then return current end

            local metadata = tx["metadata"]
            if type(metadata) ~= 'table' then metadata = {} end
            metadata["consecutive_failures"] = 0

            local updated = set_obj(current, "metadata", metadata)
            redis.call('SET', tx_key, updated)
            return updated
            "#,
            &tx_id,
            &[],
            "reset_status_check_consecutive_failures",
        )
        .await
    }
1876
    /// Atomically records a Stellar insufficient-fee retry: sets `sent_at`
    /// to the supplied timestamp (ARGV[3]) and increments
    /// `metadata.insufficient_fee_retries` in one Lua script. Transactions
    /// already in a final state are returned unchanged.
    async fn record_stellar_insufficient_fee_retry(
        &self,
        tx_id: String,
        sent_at: String,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        self.run_atomic_script(
            r#"
            local function set_str(json, key, val)
                local enc = cjson.encode(val)
                local r, n = string.gsub(json, '"'..key..'"%s*:%s*"[^"]*"', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
            end
            local function set_obj(json, key, tbl)
                local enc = cjson.encode(tbl)
                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
            end

            local relayer_id = redis.call('GET', KEYS[1])
            if not relayer_id then return false end

            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
            local current = redis.call('GET', tx_key)
            if not current then return false end

            local tx = cjson.decode(current)
            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
            if final_states[tx["status"]] then return current end

            local metadata = tx["metadata"]
            if type(metadata) ~= 'table' then metadata = {} end
            metadata["insufficient_fee_retries"] = (metadata["insufficient_fee_retries"] or 0) + 1

            local updated = set_str(current, "sent_at", ARGV[3])
            updated = set_obj(updated, "metadata", metadata)
            redis.call('SET', tx_key, updated)
            return updated
            "#,
            &tx_id,
            &[&sent_at],
            "record_stellar_insufficient_fee_retry",
        )
        .await
    }
1927
    /// Atomically records a Stellar "try again later" retry: sets `sent_at`
    /// to the supplied timestamp (ARGV[3]) and increments
    /// `metadata.try_again_later_retries` in one Lua script. Transactions
    /// already in a final state are returned unchanged.
    async fn record_stellar_try_again_later_retry(
        &self,
        tx_id: String,
        sent_at: String,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        self.run_atomic_script(
            r#"
            local function set_str(json, key, val)
                local enc = cjson.encode(val)
                local r, n = string.gsub(json, '"'..key..'"%s*:%s*"[^"]*"', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
            end
            local function set_obj(json, key, tbl)
                local enc = cjson.encode(tbl)
                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
                if n > 0 then return r end
                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
            end

            local relayer_id = redis.call('GET', KEYS[1])
            if not relayer_id then return false end

            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
            local current = redis.call('GET', tx_key)
            if not current then return false end

            local tx = cjson.decode(current)
            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
            if final_states[tx["status"]] then return current end

            local metadata = tx["metadata"]
            if type(metadata) ~= 'table' then metadata = {} end
            metadata["try_again_later_retries"] = (metadata["try_again_later_retries"] or 0) + 1

            local updated = set_str(current, "sent_at", ARGV[3])
            updated = set_obj(updated, "metadata", metadata)
            redis.call('SET', tx_key, updated)
            return updated
            "#,
            &tx_id,
            &[&sent_at],
            "record_stellar_try_again_later_retry",
        )
        .await
    }
1978
1979 async fn set_confirmed_at(
1980 &self,
1981 tx_id: String,
1982 confirmed_at: String,
1983 ) -> Result<TransactionRepoModel, RepositoryError> {
1984 let update = TransactionUpdateRequest {
1985 confirmed_at: Some(confirmed_at),
1986 ..Default::default()
1987 };
1988 self.partial_update(tx_id, update).await
1989 }
1990
1991 async fn count_by_status(
1995 &self,
1996 relayer_id: &str,
1997 statuses: &[TransactionStatus],
1998 ) -> Result<u64, RepositoryError> {
1999 let mut conn = self
2000 .get_connection(self.connections.reader(), "count_by_status")
2001 .await?;
2002 let mut total_count: u64 = 0;
2003
2004 for status in statuses {
2005 self.ensure_status_sorted_set(relayer_id, status).await?;
2007
2008 let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
2009 let count: u64 = conn
2010 .zcard(&sorted_key)
2011 .await
2012 .map_err(|e| self.map_redis_error(e, "count_by_status"))?;
2013 total_count += count;
2014 }
2015
2016 debug!(relayer_id = %relayer_id, count = %total_count, "counted transactions by status");
2017 Ok(total_count)
2018 }
2019
2020 async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError> {
2021 if ids.is_empty() {
2022 debug!("no transaction IDs provided for batch delete");
2023 return Ok(BatchDeleteResult::default());
2024 }
2025
2026 debug!(count = %ids.len(), "batch deleting transactions by IDs (with fetch)");
2027
2028 let batch_result = self.get_transactions_by_ids(&ids).await?;
2030
2031 let requests: Vec<TransactionDeleteRequest> = batch_result
2033 .results
2034 .iter()
2035 .map(|tx| TransactionDeleteRequest {
2036 id: tx.id.clone(),
2037 relayer_id: tx.relayer_id.clone(),
2038 nonce: self.extract_nonce(&tx.network_data),
2039 })
2040 .collect();
2041
2042 let mut result = self.delete_by_requests(requests).await?;
2044
2045 for id in batch_result.failed_ids {
2047 result
2048 .failed
2049 .push((id.clone(), format!("Transaction with ID {id} not found")));
2050 }
2051
2052 Ok(result)
2053 }
2054
    /// Deletes a batch of transactions without fetching them first: each
    /// request already carries the relayer ID (and optional nonce) needed to
    /// derive every key. All deletions run in a single MULTI/EXEC pipeline;
    /// if the pipeline fails, every request is reported as failed.
    async fn delete_by_requests(
        &self,
        requests: Vec<TransactionDeleteRequest>,
    ) -> Result<BatchDeleteResult, RepositoryError> {
        if requests.is_empty() {
            debug!("no delete requests provided for batch delete");
            return Ok(BatchDeleteResult::default());
        }

        debug!(count = %requests.len(), "batch deleting transactions by requests (no fetch)");
        let mut conn = self
            .get_connection(self.connections.primary(), "batch_delete_no_fetch")
            .await?;
        let mut pipe = redis::pipe();
        pipe.atomic();

        // The actual status of each transaction is unknown here, so each ID
        // is removed from every status index.
        let all_statuses = [
            TransactionStatus::Canceled,
            TransactionStatus::Pending,
            TransactionStatus::Sent,
            TransactionStatus::Submitted,
            TransactionStatus::Mined,
            TransactionStatus::Confirmed,
            TransactionStatus::Failed,
            TransactionStatus::Expired,
        ];

        for req in &requests {
            // Primary transaction record.
            let tx_key = self.tx_key(&req.relayer_id, &req.id);
            pipe.del(&tx_key);

            // Reverse tx -> relayer mapping.
            let reverse_key = self.tx_to_relayer_key(&req.id);
            pipe.del(&reverse_key);

            for status in &all_statuses {
                let status_sorted_key = self.relayer_status_sorted_key(&req.relayer_id, status);
                pipe.zrem(&status_sorted_key, &req.id);

                // Legacy set-based status index.
                let status_legacy_key = self.relayer_status_key(&req.relayer_id, status);
                pipe.srem(&status_legacy_key, &req.id);
            }

            // Nonce index entry, when the request carries a nonce.
            if let Some(nonce) = req.nonce {
                let nonce_key = self.relayer_nonce_key(&req.relayer_id, nonce);
                pipe.del(&nonce_key);
            }

            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&req.relayer_id);
            pipe.zrem(&relayer_sorted_key, &req.id);
        }

        match pipe.exec_async(&mut conn).await {
            Ok(_) => {
                let deleted_count = requests.len();
                debug!(
                    deleted_count = %deleted_count,
                    "batch delete completed"
                );
                Ok(BatchDeleteResult {
                    deleted_count,
                    failed: vec![],
                })
            }
            Err(e) => {
                // Pipeline error: treat the whole batch as failed rather than
                // guessing which commands were applied.
                error!(error = %e, "batch delete pipeline failed");
                let failed: Vec<(String, String)> = requests
                    .iter()
                    .map(|req| (req.id.clone(), format!("Redis pipeline error: {e}")))
                    .collect();
                Ok(BatchDeleteResult {
                    deleted_count: 0,
                    failed,
                })
            }
        }
    }
2140}
2141
2142#[cfg(test)]
2143mod tests {
2144 use super::*;
2145 use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
2146 use alloy::primitives::U256;
2147 use deadpool_redis::{Config, Runtime};
2148 use lazy_static::lazy_static;
2149 use std::str::FromStr;
2150 use tokio;
2151 use uuid::Uuid;
2152
2153 use tokio::sync::Mutex;
2154
2155 lazy_static! {
2157 static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
2158 }
2159
    /// Builds a pending EVM transaction fixture owned by relayer "relayer-1",
    /// with fixed timestamps and a hash derived from `id` so fixtures stay
    /// distinguishable.
    fn create_test_transaction(id: &str) -> TransactionRepoModel {
        TransactionRepoModel {
            id: id.to_string(),
            relayer_id: "relayer-1".to_string(),
            status: TransactionStatus::Pending,
            status_reason: None,
            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
            sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
            confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
            valid_until: None,
            delete_at: None,
            network_type: NetworkType::Evm,
            priced_at: None,
            hashes: vec![],
            network_data: NetworkTransactionData::Evm(EvmTransactionData {
                gas_price: Some(1000000000), // 1 gwei
                gas_limit: Some(21000),
                nonce: Some(1),
                value: U256::from_str("1000000000000000000").unwrap(), // 10^18 wei
                data: Some("0x".to_string()),
                from: "0xSender".to_string(),
                to: Some("0xRecipient".to_string()),
                chain_id: 1,
                signature: None,
                hash: Some(format!("0x{id}")), // unique per fixture id
                speed: Some(Speed::Fast),
                max_fee_per_gas: None,
                max_priority_fee_per_gas: None,
                raw: None,
            }),
            noop_count: None,
            is_canceled: Some(false),
            metadata: None,
        }
    }
2196
2197 fn create_test_transaction_with_relayer(id: &str, relayer_id: &str) -> TransactionRepoModel {
2198 let mut tx = create_test_transaction(id);
2199 tx.relayer_id = relayer_id.to_string();
2200 tx
2201 }
2202
2203 fn create_test_transaction_with_status(
2204 id: &str,
2205 relayer_id: &str,
2206 status: TransactionStatus,
2207 ) -> TransactionRepoModel {
2208 let mut tx = create_test_transaction_with_relayer(id, relayer_id);
2209 tx.status = status;
2210 tx
2211 }
2212
2213 fn create_test_transaction_with_nonce(
2214 id: &str,
2215 nonce: u64,
2216 relayer_id: &str,
2217 ) -> TransactionRepoModel {
2218 let mut tx = create_test_transaction_with_relayer(id, relayer_id);
2219 if let NetworkTransactionData::Evm(ref mut evm_data) = tx.network_data {
2220 evm_data.nonce = Some(nonce);
2221 }
2222 tx
2223 }
2224
    /// Builds a repository against a live Redis instance (REDIS_TEST_URL or
    /// localhost) with a UUID-randomized key prefix so concurrent test runs
    /// cannot collide in the shared keyspace.
    async fn setup_test_repo() -> RedisTransactionRepository {
        let redis_url = std::env::var("REDIS_TEST_URL")
            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());

        let cfg = Config::from_url(&redis_url);
        let pool = Arc::new(
            cfg.builder()
                .expect("Failed to create pool builder")
                .max_size(16)
                .runtime(Runtime::Tokio1)
                .build()
                .expect("Failed to build Redis pool"),
        );

        // Tests use one pool for both primary and reader roles.
        let connections = Arc::new(RedisConnections::new_single_pool(pool));

        // Random prefix isolates this run's keys from other test runs.
        let random_id = Uuid::new_v4().to_string();
        let key_prefix = format!("test_prefix:{random_id}");

        RedisTransactionRepository::new(connections, key_prefix)
            .expect("Failed to create RedisTransactionRepository")
    }
2249
2250 #[tokio::test]
2251 #[ignore = "Requires active Redis instance"]
2252 async fn test_new_repository_creation() {
2253 let repo = setup_test_repo().await;
2254 assert!(repo.key_prefix.contains("test_prefix"));
2255 }
2256
2257 #[tokio::test]
2258 #[ignore = "Requires active Redis instance"]
2259 async fn test_new_repository_empty_prefix_fails() {
2260 let redis_url = std::env::var("REDIS_TEST_URL")
2261 .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
2262 let cfg = Config::from_url(&redis_url);
2263 let pool = Arc::new(
2264 cfg.builder()
2265 .expect("Failed to create pool builder")
2266 .max_size(16)
2267 .runtime(Runtime::Tokio1)
2268 .build()
2269 .expect("Failed to build Redis pool"),
2270 );
2271 let connections = Arc::new(RedisConnections::new_single_pool(pool));
2272
2273 let result = RedisTransactionRepository::new(connections, "".to_string());
2274 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
2275 }
2276
2277 #[tokio::test]
2278 #[ignore = "Requires active Redis instance"]
2279 async fn test_key_generation() {
2280 let repo = setup_test_repo().await;
2281
2282 assert!(repo
2283 .tx_key("relayer-1", "test-id")
2284 .contains(":relayer:relayer-1:tx:test-id"));
2285 assert!(repo
2286 .tx_to_relayer_key("test-id")
2287 .contains(":relayer:tx_to_relayer:test-id"));
2288 assert!(repo.relayer_list_key().contains(":relayer_list"));
2289 assert!(repo
2290 .relayer_status_key("relayer-1", &TransactionStatus::Pending)
2291 .contains(":relayer:relayer-1:status:Pending"));
2292 assert!(repo
2293 .relayer_nonce_key("relayer-1", 42)
2294 .contains(":relayer:relayer-1:nonce:42"));
2295 }
2296
2297 #[tokio::test]
2298 #[ignore = "Requires active Redis instance"]
2299 async fn test_serialize_deserialize_transaction() {
2300 let repo = setup_test_repo().await;
2301 let tx = create_test_transaction("test-1");
2302
2303 let serialized = repo
2304 .serialize_entity(&tx, |t| &t.id, "transaction")
2305 .expect("Serialization should succeed");
2306 let deserialized: TransactionRepoModel = repo
2307 .deserialize_entity(&serialized, "test-1", "transaction")
2308 .expect("Deserialization should succeed");
2309
2310 assert_eq!(tx.id, deserialized.id);
2311 assert_eq!(tx.relayer_id, deserialized.relayer_id);
2312 assert_eq!(tx.status, deserialized.status);
2313 }
2314
2315 #[tokio::test]
2316 #[ignore = "Requires active Redis instance"]
2317 async fn test_extract_nonce() {
2318 let repo = setup_test_repo().await;
2319 let random_id = Uuid::new_v4().to_string();
2320 let relayer_id = Uuid::new_v4().to_string();
2321 let tx_with_nonce = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
2322
2323 let nonce = repo.extract_nonce(&tx_with_nonce.network_data);
2324 assert_eq!(nonce, Some(42));
2325 }
2326
2327 #[tokio::test]
2328 #[ignore = "Requires active Redis instance"]
2329 async fn test_create_transaction() {
2330 let repo = setup_test_repo().await;
2331 let random_id = Uuid::new_v4().to_string();
2332 let tx = create_test_transaction(&random_id);
2333
2334 let result = repo.create(tx.clone()).await.unwrap();
2335 assert_eq!(result.id, tx.id);
2336 }
2337
2338 #[tokio::test]
2339 #[ignore = "Requires active Redis instance"]
2340 async fn test_get_transaction() {
2341 let repo = setup_test_repo().await;
2342 let random_id = Uuid::new_v4().to_string();
2343 let tx = create_test_transaction(&random_id);
2344
2345 repo.create(tx.clone()).await.unwrap();
2346 let stored = repo.get_by_id(random_id.to_string()).await.unwrap();
2347 assert_eq!(stored.id, tx.id);
2348 assert_eq!(stored.relayer_id, tx.relayer_id);
2349 }
2350
2351 #[tokio::test]
2352 #[ignore = "Requires active Redis instance"]
2353 async fn test_update_transaction() {
2354 let repo = setup_test_repo().await;
2355 let random_id = Uuid::new_v4().to_string();
2356 let mut tx = create_test_transaction(&random_id);
2357
2358 repo.create(tx.clone()).await.unwrap();
2359 tx.status = TransactionStatus::Confirmed;
2360
2361 let updated = repo.update(random_id.to_string(), tx).await.unwrap();
2362 assert!(matches!(updated.status, TransactionStatus::Confirmed));
2363 }
2364
2365 #[tokio::test]
2366 #[ignore = "Requires active Redis instance"]
2367 async fn test_delete_transaction() {
2368 let repo = setup_test_repo().await;
2369 let random_id = Uuid::new_v4().to_string();
2370 let tx = create_test_transaction(&random_id);
2371
2372 repo.create(tx).await.unwrap();
2373 repo.delete_by_id(random_id.to_string()).await.unwrap();
2374
2375 let result = repo.get_by_id(random_id.to_string()).await;
2376 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2377 }
2378
2379 #[tokio::test]
2380 #[ignore = "Requires active Redis instance"]
2381 async fn test_list_all_transactions() {
2382 let repo = setup_test_repo().await;
2383 let random_id = Uuid::new_v4().to_string();
2384 let random_id2 = Uuid::new_v4().to_string();
2385
2386 let tx1 = create_test_transaction(&random_id);
2387 let tx2 = create_test_transaction(&random_id2);
2388
2389 repo.create(tx1).await.unwrap();
2390 repo.create(tx2).await.unwrap();
2391
2392 let transactions = repo.list_all().await.unwrap();
2393 assert!(transactions.len() >= 2);
2394 }
2395
2396 #[tokio::test]
2397 #[ignore = "Requires active Redis instance"]
2398 async fn test_count_transactions() {
2399 let repo = setup_test_repo().await;
2400 let random_id = Uuid::new_v4().to_string();
2401 let tx = create_test_transaction(&random_id);
2402
2403 let count = repo.count().await.unwrap();
2404 repo.create(tx).await.unwrap();
2405 assert!(repo.count().await.unwrap() > count);
2406 }
2407
2408 #[tokio::test]
2409 #[ignore = "Requires active Redis instance"]
2410 async fn test_get_nonexistent_transaction() {
2411 let repo = setup_test_repo().await;
2412 let result = repo.get_by_id("nonexistent".to_string()).await;
2413 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2414 }
2415
2416 #[tokio::test]
2417 #[ignore = "Requires active Redis instance"]
2418 async fn test_duplicate_transaction_creation() {
2419 let repo = setup_test_repo().await;
2420 let random_id = Uuid::new_v4().to_string();
2421
2422 let tx = create_test_transaction(&random_id);
2423
2424 repo.create(tx.clone()).await.unwrap();
2425 let result = repo.create(tx).await;
2426
2427 assert!(matches!(
2428 result,
2429 Err(RepositoryError::ConstraintViolation(_))
2430 ));
2431 }
2432
2433 #[tokio::test]
2434 #[ignore = "Requires active Redis instance"]
2435 async fn test_update_nonexistent_transaction() {
2436 let repo = setup_test_repo().await;
2437 let tx = create_test_transaction("test-1");
2438
2439 let result = repo.update("nonexistent".to_string(), tx).await;
2440 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2441 }
2442
    // list_paginated returns `per_page` items for an in-range page, and an
    // empty page (not an error) once past the end of the data.
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_list_paginated() {
        let repo = setup_test_repo().await;

        // Seed ten transactions so at least three full pages of 3 exist.
        for _ in 1..=10 {
            let random_id = Uuid::new_v4().to_string();
            let tx = create_test_transaction(&random_id);
            repo.create(tx).await.unwrap();
        }

        // First page: exactly per_page items; total reflects everything stored.
        let query = PaginationQuery {
            page: 1,
            per_page: 3,
        };
        let result = repo.list_paginated(query).await.unwrap();
        assert_eq!(result.items.len(), 3);
        assert!(result.total >= 10);
        assert_eq!(result.page, 1);
        assert_eq!(result.per_page, 3);

        // A page far beyond the data is empty rather than an error.
        let query = PaginationQuery {
            page: 1000,
            per_page: 3,
        };
        let result = repo.list_paginated(query).await.unwrap();
        assert_eq!(result.items.len(), 0);
    }
2474
    // find_by_relayer_id scopes results to a single relayer, and returns an
    // empty page (not an error) for unknown relayer ids.
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_find_by_relayer_id() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let random_id2 = Uuid::new_v4().to_string();
        let random_id3 = Uuid::new_v4().to_string();

        // Two transactions for relayer-1, one for relayer-2.
        let tx1 = create_test_transaction_with_relayer(&random_id, "relayer-1");
        let tx2 = create_test_transaction_with_relayer(&random_id2, "relayer-1");
        let tx3 = create_test_transaction_with_relayer(&random_id3, "relayer-2");

        repo.create(tx1).await.unwrap();
        repo.create(tx2).await.unwrap();
        repo.create(tx3).await.unwrap();

        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        // relayer-1 sees (at least) its own two transactions and nothing foreign.
        let result = repo
            .find_by_relayer_id("relayer-1", query.clone())
            .await
            .unwrap();
        assert!(result.total >= 2);
        assert!(result.items.len() >= 2);
        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));

        // relayer-2 sees its single transaction.
        let result = repo
            .find_by_relayer_id("relayer-2", query.clone())
            .await
            .unwrap();
        assert!(result.total >= 1);
        assert!(!result.items.is_empty());
        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));

        // Unknown relayer: empty result, no error.
        let result = repo
            .find_by_relayer_id("non-existent", query.clone())
            .await
            .unwrap();
        assert_eq!(result.total, 0);
        assert_eq!(result.items.len(), 0);
    }
2521
2522 #[tokio::test]
2523 #[ignore = "Requires active Redis instance"]
2524 async fn test_find_by_relayer_id_sorted_by_created_at_newest_first() {
2525 let repo = setup_test_repo().await;
2526 let relayer_id = Uuid::new_v4().to_string();
2527
2528 let mut tx1 = create_test_transaction_with_relayer("test-1", &relayer_id);
2530 tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); let mut tx2 = create_test_transaction_with_relayer("test-2", &relayer_id);
2533 tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); let mut tx3 = create_test_transaction_with_relayer("test-3", &relayer_id);
2536 tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); repo.create(tx2.clone()).await.unwrap(); repo.create(tx1.clone()).await.unwrap(); repo.create(tx3.clone()).await.unwrap(); let query = PaginationQuery {
2544 page: 1,
2545 per_page: 10,
2546 };
2547 let result = repo.find_by_relayer_id(&relayer_id, query).await.unwrap();
2548
2549 assert_eq!(result.total, 3);
2550 assert_eq!(result.items.len(), 3);
2551
2552 assert_eq!(
2554 result.items[0].id, "test-3",
2555 "First item should be newest (test-3)"
2556 );
2557 assert_eq!(
2558 result.items[0].created_at,
2559 "2025-01-27T14:00:00.000000+00:00"
2560 );
2561
2562 assert_eq!(
2563 result.items[1].id, "test-2",
2564 "Second item should be middle (test-2)"
2565 );
2566 assert_eq!(
2567 result.items[1].created_at,
2568 "2025-01-27T12:00:00.000000+00:00"
2569 );
2570
2571 assert_eq!(
2572 result.items[2].id, "test-1",
2573 "Third item should be oldest (test-1)"
2574 );
2575 assert_eq!(
2576 result.items[2].created_at,
2577 "2025-01-27T10:00:00.000000+00:00"
2578 );
2579 }
2580
    // find_by_status filters a relayer's transactions by one or more statuses;
    // an unmatched status yields an empty list.
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_find_by_status() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let random_id2 = Uuid::new_v4().to_string();
        let random_id3 = Uuid::new_v4().to_string();
        let relayer_id = Uuid::new_v4().to_string();
        // One transaction in each of three distinct statuses.
        let tx1 = create_test_transaction_with_status(
            &random_id,
            &relayer_id,
            TransactionStatus::Pending,
        );
        let tx2 =
            create_test_transaction_with_status(&random_id2, &relayer_id, TransactionStatus::Sent);
        let tx3 = create_test_transaction_with_status(
            &random_id3,
            &relayer_id,
            TransactionStatus::Confirmed,
        );

        repo.create(tx1).await.unwrap();
        repo.create(tx2).await.unwrap();
        repo.create(tx3).await.unwrap();

        // Single-status filter.
        let result = repo
            .find_by_status(&relayer_id, &[TransactionStatus::Pending])
            .await
            .unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].status, TransactionStatus::Pending);

        // Multiple statuses are OR-ed together.
        let result = repo
            .find_by_status(
                &relayer_id,
                &[TransactionStatus::Pending, TransactionStatus::Sent],
            )
            .await
            .unwrap();
        assert_eq!(result.len(), 2);

        // No transaction is in the Failed state for this relayer.
        let result = repo
            .find_by_status(&relayer_id, &[TransactionStatus::Failed])
            .await
            .unwrap();
        assert_eq!(result.len(), 0);
    }
2631
    // Exercises find_by_status_paginated across multiple pages, a multi-status
    // filter, and a status with no matches.
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_find_by_status_paginated() {
        let repo = setup_test_repo().await;
        let relayer_id = Uuid::new_v4().to_string();

        // Five Pending transactions with distinct, increasing timestamps.
        for i in 1..=5 {
            let tx_id = Uuid::new_v4().to_string();
            let mut tx = create_test_transaction_with_status(
                &tx_id,
                &relayer_id,
                TransactionStatus::Pending,
            );
            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
            repo.create(tx).await.unwrap();
        }

        // Two Confirmed transactions.
        for i in 6..=7 {
            let tx_id = Uuid::new_v4().to_string();
            let mut tx = create_test_transaction_with_status(
                &tx_id,
                &relayer_id,
                TransactionStatus::Confirmed,
            );
            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
            repo.create(tx).await.unwrap();
        }

        // Page 1 of the Pending set: full page of 2 out of 5.
        let query = PaginationQuery {
            page: 1,
            per_page: 2,
        };
        let result = repo
            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
            .await
            .unwrap();

        assert_eq!(result.total, 5);
        assert_eq!(result.items.len(), 2);
        assert_eq!(result.page, 1);
        assert_eq!(result.per_page, 2);

        // Page 2 is still full.
        let query = PaginationQuery {
            page: 2,
            per_page: 2,
        };
        let result = repo
            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
            .await
            .unwrap();

        assert_eq!(result.total, 5);
        assert_eq!(result.items.len(), 2);
        assert_eq!(result.page, 2);

        // Page 3 holds the single remaining item.
        let query = PaginationQuery {
            page: 3,
            per_page: 2,
        };
        let result = repo
            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
            .await
            .unwrap();

        assert_eq!(result.total, 5);
        assert_eq!(result.items.len(), 1);

        // Pending + Confirmed together cover all seven transactions.
        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        let result = repo
            .find_by_status_paginated(
                &relayer_id,
                &[TransactionStatus::Pending, TransactionStatus::Confirmed],
                query,
                false,
            )
            .await
            .unwrap();

        assert_eq!(result.total, 7);
        assert_eq!(result.items.len(), 7);

        // A status with no members returns an empty page.
        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        let result = repo
            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Failed], query, false)
            .await
            .unwrap();

        assert_eq!(result.total, 0);
        assert_eq!(result.items.len(), 0);
    }
2735
    // find_by_status_paginated honors the oldest_first flag: ascending
    // created_at when true, descending when false.
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_find_by_status_paginated_oldest_first() {
        let repo = setup_test_repo().await;
        let relayer_id = Uuid::new_v4().to_string();

        // Five pending transactions with strictly increasing timestamps.
        for i in 1..=5 {
            let tx_id = format!("tx{}-{}", i, Uuid::new_v4());
            let mut tx = create_test_transaction(&tx_id);
            tx.relayer_id = relayer_id.clone();
            tx.status = TransactionStatus::Pending;
            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
            repo.create(tx).await.unwrap();
        }

        let query = PaginationQuery {
            page: 1,
            per_page: 3,
        };
        let result = repo
            .find_by_status_paginated(
                &relayer_id,
                &[TransactionStatus::Pending],
                query.clone(),
                true,
            )
            .await
            .unwrap();

        assert_eq!(result.total, 5);
        assert_eq!(result.items.len(), 3);
        // oldest_first = true: ascending created_at.
        assert!(
            result.items[0].created_at < result.items[1].created_at,
            "First item should be older than second"
        );
        assert!(
            result.items[1].created_at < result.items[2].created_at,
            "Second item should be older than third"
        );

        // Same query with oldest_first = false flips the ordering.
        let result_newest = repo
            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
            .await
            .unwrap();

        assert_eq!(result_newest.items.len(), 3);
        assert!(
            result_newest.items[0].created_at > result_newest.items[1].created_at,
            "First item should be newer than second"
        );
        assert!(
            result_newest.items[1].created_at > result_newest.items[2].created_at,
            "Second item should be newer than third"
        );
    }
2796
2797 #[tokio::test]
2798 #[ignore = "Requires active Redis instance"]
2799 async fn test_find_by_status_paginated_oldest_first_single_item() {
2800 let repo = setup_test_repo().await;
2801 let relayer_id = Uuid::new_v4().to_string();
2802
2803 let timestamps = [
2805 "2025-01-27T08:00:00.000000+00:00", "2025-01-27T10:00:00.000000+00:00", "2025-01-27T12:00:00.000000+00:00", ];
2809
2810 let mut oldest_id = String::new();
2811 let mut newest_id = String::new();
2812
2813 for (i, timestamp) in timestamps.iter().enumerate() {
2814 let tx_id = format!("tx-{}-{}", i, Uuid::new_v4());
2815 if i == 0 {
2816 oldest_id = tx_id.clone();
2817 }
2818 if i == 2 {
2819 newest_id = tx_id.clone();
2820 }
2821 let mut tx = create_test_transaction(&tx_id);
2822 tx.relayer_id = relayer_id.clone();
2823 tx.status = TransactionStatus::Pending;
2824 tx.created_at = timestamp.to_string();
2825 repo.create(tx).await.unwrap();
2826 }
2827
2828 let query = PaginationQuery {
2830 page: 1,
2831 per_page: 1,
2832 };
2833 let result = repo
2834 .find_by_status_paginated(
2835 &relayer_id,
2836 &[TransactionStatus::Pending],
2837 query.clone(),
2838 true,
2839 )
2840 .await
2841 .unwrap();
2842
2843 assert_eq!(result.total, 3);
2844 assert_eq!(result.items.len(), 1);
2845 assert_eq!(
2846 result.items[0].id, oldest_id,
2847 "With oldest_first=true and per_page=1, should return the oldest transaction"
2848 );
2849
2850 let result = repo
2852 .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2853 .await
2854 .unwrap();
2855
2856 assert_eq!(result.items.len(), 1);
2857 assert_eq!(
2858 result.items[0].id, newest_id,
2859 "With oldest_first=false and per_page=1, should return the newest transaction"
2860 );
2861 }
2862
2863 #[tokio::test]
2864 #[ignore = "Requires active Redis instance"]
2865 async fn test_find_by_nonce() {
2866 let repo = setup_test_repo().await;
2867 let random_id = Uuid::new_v4().to_string();
2868 let random_id2 = Uuid::new_v4().to_string();
2869 let relayer_id = Uuid::new_v4().to_string();
2870
2871 let tx1 = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
2872 let tx2 = create_test_transaction_with_nonce(&random_id2, 43, &relayer_id);
2873
2874 repo.create(tx1.clone()).await.unwrap();
2875 repo.create(tx2).await.unwrap();
2876
2877 let result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
2879 assert!(result.is_some());
2880 assert_eq!(result.unwrap().id, random_id);
2881
2882 let result = repo.find_by_nonce(&relayer_id, 99).await.unwrap();
2884 assert!(result.is_none());
2885
2886 let result = repo.find_by_nonce("non-existent", 42).await.unwrap();
2888 assert!(result.is_none());
2889 }
2890
2891 #[tokio::test]
2892 #[ignore = "Requires active Redis instance"]
2893 async fn test_update_status() {
2894 let repo = setup_test_repo().await;
2895 let random_id = Uuid::new_v4().to_string();
2896 let tx = create_test_transaction(&random_id);
2897
2898 repo.create(tx).await.unwrap();
2899 let updated = repo
2900 .update_status(random_id.to_string(), TransactionStatus::Confirmed)
2901 .await
2902 .unwrap();
2903 assert_eq!(updated.status, TransactionStatus::Confirmed);
2904 }
2905
    // partial_update applies only the fields set in the request and leaves the
    // rest of the transaction untouched.
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_partial_update() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction(&random_id);

        repo.create(tx).await.unwrap();

        // Only status, status_reason and sent_at are being changed here.
        let update = TransactionUpdateRequest {
            status: Some(TransactionStatus::Sent),
            status_reason: Some("Transaction sent".to_string()),
            sent_at: Some("2025-01-27T16:00:00.000000+00:00".to_string()),
            confirmed_at: None,
            network_data: None,
            hashes: None,
            is_canceled: None,
            priced_at: None,
            noop_count: None,
            delete_at: None,
            metadata: None,
        };

        let updated = repo
            .partial_update(random_id.to_string(), update)
            .await
            .unwrap();
        assert_eq!(updated.status, TransactionStatus::Sent);
        assert_eq!(updated.status_reason, Some("Transaction sent".to_string()));
        assert_eq!(
            updated.sent_at,
            Some("2025-01-27T16:00:00.000000+00:00".to_string())
        );
    }
2940
2941 #[tokio::test]
2942 #[ignore = "Requires active Redis instance"]
2943 async fn test_set_sent_at() {
2944 let repo = setup_test_repo().await;
2945 let random_id = Uuid::new_v4().to_string();
2946 let tx = create_test_transaction(&random_id);
2947
2948 repo.create(tx).await.unwrap();
2949 let updated = repo
2950 .set_sent_at(
2951 random_id.to_string(),
2952 "2025-01-27T16:00:00.000000+00:00".to_string(),
2953 )
2954 .await
2955 .unwrap();
2956 assert_eq!(
2957 updated.sent_at,
2958 Some("2025-01-27T16:00:00.000000+00:00".to_string())
2959 );
2960 }
2961
2962 #[tokio::test]
2963 #[ignore = "Requires active Redis instance"]
2964 async fn test_set_confirmed_at() {
2965 let repo = setup_test_repo().await;
2966 let random_id = Uuid::new_v4().to_string();
2967 let tx = create_test_transaction(&random_id);
2968
2969 repo.create(tx).await.unwrap();
2970 let updated = repo
2971 .set_confirmed_at(
2972 random_id.to_string(),
2973 "2025-01-27T16:00:00.000000+00:00".to_string(),
2974 )
2975 .await
2976 .unwrap();
2977 assert_eq!(
2978 updated.confirmed_at,
2979 Some("2025-01-27T16:00:00.000000+00:00".to_string())
2980 );
2981 }
2982
    // update_network_data replaces the stored network payload; the new EVM
    // hash must be visible on the returned transaction.
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_network_data() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction(&random_id);

        repo.create(tx).await.unwrap();

        // Replacement EVM payload differing from the fixture.
        let new_network_data = NetworkTransactionData::Evm(EvmTransactionData {
            gas_price: Some(2000000000),
            gas_limit: Some(42000),
            nonce: Some(2),
            value: U256::from_str("2000000000000000000").unwrap(),
            data: Some("0x1234".to_string()),
            from: "0xNewSender".to_string(),
            to: Some("0xNewRecipient".to_string()),
            chain_id: 1,
            signature: None,
            hash: Some("0xnewhash".to_string()),
            speed: Some(Speed::SafeLow),
            max_fee_per_gas: None,
            max_priority_fee_per_gas: None,
            raw: None,
        });

        let updated = repo
            .update_network_data(random_id.to_string(), new_network_data.clone())
            .await
            .unwrap();
        // The stored hash now matches the replacement payload's hash.
        assert_eq!(
            updated
                .network_data
                .get_evm_transaction_data()
                .unwrap()
                .hash,
            new_network_data.get_evm_transaction_data().unwrap().hash
        );
    }
3022
3023 #[tokio::test]
3024 #[ignore = "Requires active Redis instance"]
3025 async fn test_debug_implementation() {
3026 let repo = setup_test_repo().await;
3027 let debug_str = format!("{repo:?}");
3028 assert!(debug_str.contains("RedisTransactionRepository"));
3029 assert!(debug_str.contains("test_prefix"));
3030 }
3031
3032 #[tokio::test]
3033 #[ignore = "Requires active Redis instance"]
3034 async fn test_error_handling_empty_id() {
3035 let repo = setup_test_repo().await;
3036
3037 let result = repo.get_by_id("".to_string()).await;
3038 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3039
3040 let result = repo
3041 .update("".to_string(), create_test_transaction("test"))
3042 .await;
3043 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3044
3045 let result = repo.delete_by_id("".to_string()).await;
3046 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3047 }
3048
3049 #[tokio::test]
3050 #[ignore = "Requires active Redis instance"]
3051 async fn test_pagination_validation() {
3052 let repo = setup_test_repo().await;
3053
3054 let query = PaginationQuery {
3055 page: 1,
3056 per_page: 0,
3057 };
3058 let result = repo.list_paginated(query).await;
3059 assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3060 }
3061
3062 #[tokio::test]
3063 #[ignore = "Requires active Redis instance"]
3064 async fn test_index_consistency() {
3065 let repo = setup_test_repo().await;
3066 let random_id = Uuid::new_v4().to_string();
3067 let relayer_id = Uuid::new_v4().to_string();
3068 let tx = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
3069
3070 repo.create(tx.clone()).await.unwrap();
3072
3073 let found = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
3075 assert!(found.is_some());
3076
3077 let mut updated_tx = tx.clone();
3079 if let NetworkTransactionData::Evm(ref mut evm_data) = updated_tx.network_data {
3080 evm_data.nonce = Some(43);
3081 }
3082
3083 repo.update(random_id.to_string(), updated_tx)
3084 .await
3085 .unwrap();
3086
3087 let old_nonce_result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
3089 assert!(old_nonce_result.is_none());
3090
3091 let new_nonce_result = repo.find_by_nonce(&relayer_id, 43).await.unwrap();
3093 assert!(new_nonce_result.is_some());
3094 }
3095
3096 #[tokio::test]
3097 #[ignore = "Requires active Redis instance"]
3098 async fn test_has_entries() {
3099 let repo = setup_test_repo().await;
3100 assert!(!repo.has_entries().await.unwrap());
3101
3102 let tx_id = uuid::Uuid::new_v4().to_string();
3103 let tx = create_test_transaction(&tx_id);
3104 repo.create(tx.clone()).await.unwrap();
3105
3106 assert!(repo.has_entries().await.unwrap());
3107 }
3108
3109 #[tokio::test]
3110 #[ignore = "Requires active Redis instance"]
3111 async fn test_drop_all_entries() {
3112 let repo = setup_test_repo().await;
3113 let tx_id = uuid::Uuid::new_v4().to_string();
3114 let tx = create_test_transaction(&tx_id);
3115 repo.create(tx.clone()).await.unwrap();
3116 assert!(repo.has_entries().await.unwrap());
3117
3118 repo.drop_all_entries().await.unwrap();
3119 assert!(!repo.has_entries().await.unwrap());
3120 }
3121
    // Moving a transaction into any final status must stamp delete_at roughly
    // TRANSACTION_EXPIRATION_HOURS in the future (6h here, ±5min tolerance).
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_status_sets_delete_at_for_final_statuses() {
        // Serialize env-var mutation across tests sharing the process env.
        let _lock = ENV_MUTEX.lock().await;

        use chrono::{DateTime, Duration, Utc};
        use std::env;

        env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");

        let repo = setup_test_repo().await;

        let final_statuses = [
            TransactionStatus::Canceled,
            TransactionStatus::Confirmed,
            TransactionStatus::Failed,
            TransactionStatus::Expired,
        ];

        for (i, status) in final_statuses.iter().enumerate() {
            let tx_id = format!("test-final-{}-{}", i, Uuid::new_v4());
            let mut tx = create_test_transaction(&tx_id);

            // Start from a non-final state with no expiry scheduled.
            tx.delete_at = None;
            tx.status = TransactionStatus::Pending;

            repo.create(tx).await.unwrap();

            let before_update = Utc::now();

            let updated = repo
                .update_status(tx_id.clone(), status.clone())
                .await
                .unwrap();

            assert!(
                updated.delete_at.is_some(),
                "delete_at should be set for status: {status:?}"
            );

            // delete_at must parse as RFC3339 and land ~6h after the update.
            let delete_at_str = updated.delete_at.unwrap();
            let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
                .expect("delete_at should be valid RFC3339")
                .with_timezone(&Utc);

            let duration_from_before = delete_at.signed_duration_since(before_update);
            let expected_duration = Duration::hours(6);
            let tolerance = Duration::minutes(5);

            assert!(
                duration_from_before >= expected_duration - tolerance
                    && duration_from_before <= expected_duration + tolerance,
                "delete_at should be approximately 6 hours from now for status: {status:?}. Duration: {duration_from_before:?}"
            );
        }

        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }
3187
    // Non-final statuses must NOT schedule a delete_at expiry, even when the
    // expiration env var is configured.
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
        // Serialize env-var mutation across tests sharing the process env.
        let _lock = ENV_MUTEX.lock().await;

        use std::env;

        env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");

        let repo = setup_test_repo().await;

        let non_final_statuses = [
            TransactionStatus::Pending,
            TransactionStatus::Sent,
            TransactionStatus::Submitted,
            TransactionStatus::Mined,
        ];

        for (i, status) in non_final_statuses.iter().enumerate() {
            let tx_id = format!("test-non-final-{}-{}", i, Uuid::new_v4());
            let mut tx = create_test_transaction(&tx_id);
            tx.delete_at = None;
            tx.status = TransactionStatus::Pending;

            repo.create(tx).await.unwrap();

            let updated = repo
                .update_status(tx_id.clone(), status.clone())
                .await
                .unwrap();

            assert!(
                updated.delete_at.is_none(),
                "delete_at should NOT be set for status: {status:?}"
            );
        }

        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }
3230
    // A partial_update that moves a transaction into a final status must also
    // set delete_at (~8h out here), while still applying the other fields.
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_partial_update_sets_delete_at_for_final_statuses() {
        // Serialize env-var mutation across tests sharing the process env.
        let _lock = ENV_MUTEX.lock().await;

        use chrono::{DateTime, Duration, Utc};
        use std::env;

        env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");

        let repo = setup_test_repo().await;
        let tx_id = format!("test-partial-final-{}", Uuid::new_v4());
        let mut tx = create_test_transaction(&tx_id);
        tx.delete_at = None;
        tx.status = TransactionStatus::Pending;

        repo.create(tx).await.unwrap();

        let before_update = Utc::now();

        // Move to a final status via partial update.
        let update = TransactionUpdateRequest {
            status: Some(TransactionStatus::Confirmed),
            status_reason: Some("Transaction completed".to_string()),
            confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
            ..Default::default()
        };

        let updated = repo.partial_update(tx_id.clone(), update).await.unwrap();

        assert!(
            updated.delete_at.is_some(),
            "delete_at should be set when updating to Confirmed status"
        );

        // delete_at must parse as RFC3339 and land ~8h after the update.
        let delete_at_str = updated.delete_at.unwrap();
        let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
            .expect("delete_at should be valid RFC3339")
            .with_timezone(&Utc);

        let duration_from_before = delete_at.signed_duration_since(before_update);
        let expected_duration = Duration::hours(8);
        let tolerance = Duration::minutes(5);

        assert!(
            duration_from_before >= expected_duration - tolerance
                && duration_from_before <= expected_duration + tolerance,
            "delete_at should be approximately 8 hours from now. Duration: {duration_from_before:?}"
        );

        // The explicitly requested fields were applied as well.
        assert_eq!(updated.status, TransactionStatus::Confirmed);
        assert_eq!(
            updated.status_reason,
            Some("Transaction completed".to_string())
        );
        assert_eq!(
            updated.confirmed_at,
            Some("2023-01-01T12:05:00Z".to_string())
        );

        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }
3297
    // If a transaction already carries a delete_at, moving it into a final
    // status must keep the existing timestamp instead of recomputing it.
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_status_preserves_existing_delete_at() {
        // Serialize env-var mutation across tests sharing the process env.
        let _lock = ENV_MUTEX.lock().await;

        use std::env;

        env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");

        let repo = setup_test_repo().await;
        let tx_id = format!("test-preserve-delete-at-{}", Uuid::new_v4());
        let mut tx = create_test_transaction(&tx_id);

        // Pre-set delete_at before the status transition happens.
        let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
        tx.delete_at = Some(existing_delete_at.clone());
        tx.status = TransactionStatus::Pending;

        repo.create(tx).await.unwrap();

        let updated = repo
            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
            .await
            .unwrap();

        assert_eq!(
            updated.delete_at,
            Some(existing_delete_at),
            "Existing delete_at should be preserved when updating to final status"
        );

        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }
3334 #[tokio::test]
3335 #[ignore = "Requires active Redis instance"]
3336 async fn test_partial_update_without_status_change_preserves_delete_at() {
3337 let _lock = ENV_MUTEX.lock().await;
3338
3339 use std::env;
3340
3341 env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
3342
3343 let repo = setup_test_repo().await;
3344 let tx_id = format!("test-preserve-no-status-{}", Uuid::new_v4());
3345 let mut tx = create_test_transaction(&tx_id);
3346 tx.delete_at = None;
3347 tx.status = TransactionStatus::Pending;
3348
3349 repo.create(tx).await.unwrap();
3350
3351 let updated1 = repo
3353 .update_status(tx_id.clone(), TransactionStatus::Confirmed)
3354 .await
3355 .unwrap();
3356
3357 assert!(updated1.delete_at.is_some());
3358 let original_delete_at = updated1.delete_at.clone();
3359
3360 let update = TransactionUpdateRequest {
3362 status: None, status_reason: Some("Updated reason".to_string()),
3364 confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
3365 ..Default::default()
3366 };
3367
3368 let updated2 = repo.partial_update(tx_id.clone(), update).await.unwrap();
3369
3370 assert_eq!(
3372 updated2.delete_at, original_delete_at,
3373 "delete_at should be preserved when status is not updated"
3374 );
3375
3376 assert_eq!(updated2.status, TransactionStatus::Confirmed); assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
3379 assert_eq!(
3380 updated2.confirmed_at,
3381 Some("2023-01-01T12:10:00Z".to_string())
3382 );
3383
3384 env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3386 }
3387
3388 #[tokio::test]
3391 #[ignore = "Requires active Redis instance"]
3392 async fn test_delete_by_ids_empty_list() {
3393 let repo = setup_test_repo().await;
3394 let tx_id = format!("test-empty-{}", Uuid::new_v4());
3395
3396 let tx = create_test_transaction(&tx_id);
3398 repo.create(tx).await.unwrap();
3399
3400 let result = repo.delete_by_ids(vec![]).await.unwrap();
3402
3403 assert_eq!(result.deleted_count, 0);
3404 assert!(result.failed.is_empty());
3405
3406 assert!(repo.get_by_id(tx_id).await.is_ok());
3408 }
3409
3410 #[tokio::test]
3411 #[ignore = "Requires active Redis instance"]
3412 async fn test_delete_by_ids_single_transaction() {
3413 let repo = setup_test_repo().await;
3414 let tx_id = format!("test-single-{}", Uuid::new_v4());
3415
3416 let tx = create_test_transaction(&tx_id);
3417 repo.create(tx).await.unwrap();
3418
3419 let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3420
3421 assert_eq!(result.deleted_count, 1);
3422 assert!(result.failed.is_empty());
3423
3424 assert!(repo.get_by_id(tx_id).await.is_err());
3426 }
3427
3428 #[tokio::test]
3429 #[ignore = "Requires active Redis instance"]
3430 async fn test_delete_by_ids_multiple_transactions() {
3431 let repo = setup_test_repo().await;
3432 let base_id = Uuid::new_v4();
3433
3434 let mut created_ids = Vec::new();
3436 for i in 1..=5 {
3437 let tx_id = format!("test-multi-{base_id}-{i}");
3438 let tx = create_test_transaction(&tx_id);
3439 repo.create(tx).await.unwrap();
3440 created_ids.push(tx_id);
3441 }
3442
3443 let ids_to_delete = vec![
3445 created_ids[0].clone(),
3446 created_ids[2].clone(),
3447 created_ids[4].clone(),
3448 ];
3449 let result = repo.delete_by_ids(ids_to_delete).await.unwrap();
3450
3451 assert_eq!(result.deleted_count, 3);
3452 assert!(result.failed.is_empty());
3453
3454 assert!(repo.get_by_id(created_ids[0].clone()).await.is_err());
3456 assert!(repo.get_by_id(created_ids[1].clone()).await.is_ok()); assert!(repo.get_by_id(created_ids[2].clone()).await.is_err());
3458 assert!(repo.get_by_id(created_ids[3].clone()).await.is_ok()); assert!(repo.get_by_id(created_ids[4].clone()).await.is_err());
3460 }
3461
3462 #[tokio::test]
3463 #[ignore = "Requires active Redis instance"]
3464 async fn test_delete_by_ids_nonexistent_transactions() {
3465 let repo = setup_test_repo().await;
3466 let base_id = Uuid::new_v4();
3467
3468 let ids_to_delete = vec![
3470 format!("nonexistent-{}-1", base_id),
3471 format!("nonexistent-{}-2", base_id),
3472 ];
3473 let result = repo.delete_by_ids(ids_to_delete.clone()).await.unwrap();
3474
3475 assert_eq!(result.deleted_count, 0);
3476 assert_eq!(result.failed.len(), 2);
3477
3478 let failed_ids: Vec<&String> = result.failed.iter().map(|(id, _)| id).collect();
3480 assert!(failed_ids.contains(&&ids_to_delete[0]));
3481 assert!(failed_ids.contains(&&ids_to_delete[1]));
3482 }
3483
3484 #[tokio::test]
3485 #[ignore = "Requires active Redis instance"]
3486 async fn test_delete_by_ids_mixed_existing_and_nonexistent() {
3487 let repo = setup_test_repo().await;
3488 let base_id = Uuid::new_v4();
3489
3490 let existing_ids: Vec<String> = (1..=3)
3492 .map(|i| format!("test-mixed-existing-{base_id}-{i}"))
3493 .collect();
3494
3495 for id in &existing_ids {
3496 let tx = create_test_transaction(id);
3497 repo.create(tx).await.unwrap();
3498 }
3499
3500 let nonexistent_ids: Vec<String> = (1..=2)
3501 .map(|i| format!("test-mixed-nonexistent-{base_id}-{i}"))
3502 .collect();
3503
3504 let ids_to_delete = vec![
3506 existing_ids[0].clone(),
3507 nonexistent_ids[0].clone(),
3508 existing_ids[1].clone(),
3509 nonexistent_ids[1].clone(),
3510 ];
3511 let result = repo.delete_by_ids(ids_to_delete).await.unwrap();
3512
3513 assert_eq!(result.deleted_count, 2);
3514 assert_eq!(result.failed.len(), 2);
3515
3516 assert!(repo.get_by_id(existing_ids[0].clone()).await.is_err());
3518 assert!(repo.get_by_id(existing_ids[1].clone()).await.is_err());
3519
3520 assert!(repo.get_by_id(existing_ids[2].clone()).await.is_ok());
3522 }
3523
3524 #[tokio::test]
3525 #[ignore = "Requires active Redis instance"]
3526 async fn test_delete_by_ids_removes_all_indexes() {
3527 let repo = setup_test_repo().await;
3528 let relayer_id = format!("relayer-{}", Uuid::new_v4());
3529 let tx_id = format!("test-indexes-{}", Uuid::new_v4());
3530
3531 let mut tx = create_test_transaction(&tx_id);
3533 tx.relayer_id = relayer_id.clone();
3534 tx.status = TransactionStatus::Confirmed;
3535 repo.create(tx).await.unwrap();
3536
3537 let found = repo
3539 .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
3540 .await
3541 .unwrap();
3542 assert!(found.iter().any(|t| t.id == tx_id));
3543
3544 let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3546 assert_eq!(result.deleted_count, 1);
3547
3548 let found_after = repo
3550 .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
3551 .await
3552 .unwrap();
3553 assert!(!found_after.iter().any(|t| t.id == tx_id));
3554
3555 assert!(repo.get_by_id(tx_id).await.is_err());
3557 }
3558
3559 #[tokio::test]
3560 #[ignore = "Requires active Redis instance"]
3561 async fn test_delete_by_ids_removes_nonce_index() {
3562 let repo = setup_test_repo().await;
3563 let relayer_id = format!("relayer-{}", Uuid::new_v4());
3564 let tx_id = format!("test-nonce-{}", Uuid::new_v4());
3565 let nonce = 12345u64;
3566
3567 let tx = create_test_transaction_with_nonce(&tx_id, nonce, &relayer_id);
3569 repo.create(tx).await.unwrap();
3570
3571 let found = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
3573 assert!(found.is_some());
3574 assert_eq!(found.unwrap().id, tx_id);
3575
3576 let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3578 assert_eq!(result.deleted_count, 1);
3579
3580 let found_after = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
3582 assert!(found_after.is_none());
3583 }
3584
3585 #[tokio::test]
3586 #[ignore = "Requires active Redis instance"]
3587 async fn test_delete_by_ids_large_batch() {
3588 let repo = setup_test_repo().await;
3589 let base_id = Uuid::new_v4();
3590
3591 let count = 50;
3593 let mut created_ids = Vec::new();
3594
3595 for i in 0..count {
3596 let tx_id = format!("test-large-{base_id}-{i}");
3597 let tx = create_test_transaction(&tx_id);
3598 repo.create(tx).await.unwrap();
3599 created_ids.push(tx_id);
3600 }
3601
3602 let result = repo.delete_by_ids(created_ids.clone()).await.unwrap();
3604
3605 assert_eq!(result.deleted_count, count);
3606 assert!(result.failed.is_empty());
3607
3608 for id in created_ids {
3610 assert!(repo.get_by_id(id).await.is_err());
3611 }
3612 }
3613
3614 #[tokio::test]
3615 #[ignore = "Requires active Redis instance"]
3616 async fn test_delete_by_ids_preserves_other_relayer_transactions() {
3617 let repo = setup_test_repo().await;
3618 let relayer_1 = format!("relayer-1-{}", Uuid::new_v4());
3619 let relayer_2 = format!("relayer-2-{}", Uuid::new_v4());
3620 let tx_id_1 = format!("tx-relayer-1-{}", Uuid::new_v4());
3621 let tx_id_2 = format!("tx-relayer-2-{}", Uuid::new_v4());
3622
3623 let tx1 = create_test_transaction_with_relayer(&tx_id_1, &relayer_1);
3625 let tx2 = create_test_transaction_with_relayer(&tx_id_2, &relayer_2);
3626
3627 repo.create(tx1).await.unwrap();
3628 repo.create(tx2).await.unwrap();
3629
3630 let result = repo.delete_by_ids(vec![tx_id_1.clone()]).await.unwrap();
3632
3633 assert_eq!(result.deleted_count, 1);
3634
3635 assert!(repo.get_by_id(tx_id_1).await.is_err());
3637
3638 let remaining = repo.get_by_id(tx_id_2).await.unwrap();
3640 assert_eq!(remaining.relayer_id, relayer_2);
3641 }
3642
3643 #[tokio::test]
3646 #[ignore = "Requires active Redis instance"]
3647 async fn test_increment_status_check_failures_no_prior_metadata() {
3648 let _lock = ENV_MUTEX.lock().await;
3649 let repo = setup_test_repo().await;
3650 let relayer_id = Uuid::new_v4().to_string();
3651 let tx_id = Uuid::new_v4().to_string();
3652 let mut tx =
3653 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3654 tx.metadata = None;
3655 repo.create(tx).await.unwrap();
3656
3657 let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
3658
3659 let meta = updated.metadata.expect("metadata should be set");
3660 assert_eq!(meta.consecutive_failures, 1);
3661 assert_eq!(meta.total_failures, 1);
3662 assert_eq!(meta.insufficient_fee_retries, 0);
3663 }
3664
3665 #[tokio::test]
3666 #[ignore = "Requires active Redis instance"]
3667 async fn test_increment_status_check_failures_accumulates() {
3668 let _lock = ENV_MUTEX.lock().await;
3669 let repo = setup_test_repo().await;
3670 let relayer_id = Uuid::new_v4().to_string();
3671 let tx_id = Uuid::new_v4().to_string();
3672 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3673 repo.create(tx).await.unwrap();
3674
3675 repo.increment_status_check_failures(tx_id.clone())
3676 .await
3677 .unwrap();
3678 repo.increment_status_check_failures(tx_id.clone())
3679 .await
3680 .unwrap();
3681 let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
3682
3683 let meta = updated.metadata.unwrap();
3684 assert_eq!(meta.consecutive_failures, 3);
3685 assert_eq!(meta.total_failures, 3);
3686 }
3687
3688 #[tokio::test]
3689 #[ignore = "Requires active Redis instance"]
3690 async fn test_increment_status_check_failures_noop_on_final_state() {
3691 let _lock = ENV_MUTEX.lock().await;
3692 let repo = setup_test_repo().await;
3693 let relayer_id = Uuid::new_v4().to_string();
3694 let tx_id = Uuid::new_v4().to_string();
3695 let tx =
3696 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed);
3697 repo.create(tx).await.unwrap();
3698
3699 let result = repo.increment_status_check_failures(tx_id).await.unwrap();
3700
3701 assert!(result.metadata.is_none());
3703 assert_eq!(result.status, TransactionStatus::Confirmed);
3704 }
3705
3706 #[tokio::test]
3707 #[ignore = "Requires active Redis instance"]
3708 async fn test_increment_status_check_failures_not_found() {
3709 let _lock = ENV_MUTEX.lock().await;
3710 let repo = setup_test_repo().await;
3711
3712 let result = repo
3713 .increment_status_check_failures("nonexistent".to_string())
3714 .await;
3715
3716 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3717 }
3718
3719 #[tokio::test]
3722 #[ignore = "Requires active Redis instance"]
3723 async fn test_reset_consecutive_failures() {
3724 let _lock = ENV_MUTEX.lock().await;
3725 let repo = setup_test_repo().await;
3726 let relayer_id = Uuid::new_v4().to_string();
3727 let tx_id = Uuid::new_v4().to_string();
3728 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3729 repo.create(tx).await.unwrap();
3730
3731 repo.increment_status_check_failures(tx_id.clone())
3733 .await
3734 .unwrap();
3735 repo.increment_status_check_failures(tx_id.clone())
3736 .await
3737 .unwrap();
3738
3739 let updated = repo
3740 .reset_status_check_consecutive_failures(tx_id)
3741 .await
3742 .unwrap();
3743
3744 let meta = updated.metadata.unwrap();
3745 assert_eq!(meta.consecutive_failures, 0);
3746 assert_eq!(meta.total_failures, 2);
3748 }
3749
3750 #[tokio::test]
3751 #[ignore = "Requires active Redis instance"]
3752 async fn test_reset_consecutive_failures_noop_on_final_state() {
3753 let _lock = ENV_MUTEX.lock().await;
3754 let repo = setup_test_repo().await;
3755 let relayer_id = Uuid::new_v4().to_string();
3756 let tx_id = Uuid::new_v4().to_string();
3757 let mut tx =
3758 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Failed);
3759 tx.metadata = Some(crate::models::TransactionMetadata {
3760 consecutive_failures: 5,
3761 total_failures: 10,
3762 insufficient_fee_retries: 0,
3763 try_again_later_retries: 0,
3764 });
3765 repo.create(tx).await.unwrap();
3766
3767 let result = repo
3768 .reset_status_check_consecutive_failures(tx_id)
3769 .await
3770 .unwrap();
3771
3772 let meta = result.metadata.unwrap();
3774 assert_eq!(meta.consecutive_failures, 5);
3775 }
3776
3777 #[tokio::test]
3778 #[ignore = "Requires active Redis instance"]
3779 async fn test_reset_consecutive_failures_not_found() {
3780 let _lock = ENV_MUTEX.lock().await;
3781 let repo = setup_test_repo().await;
3782
3783 let result = repo
3784 .reset_status_check_consecutive_failures("nonexistent".to_string())
3785 .await;
3786
3787 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3788 }
3789
3790 #[tokio::test]
3793 #[ignore = "Requires active Redis instance"]
3794 async fn test_record_insufficient_fee_retry() {
3795 let _lock = ENV_MUTEX.lock().await;
3796 let repo = setup_test_repo().await;
3797 let relayer_id = Uuid::new_v4().to_string();
3798 let tx_id = Uuid::new_v4().to_string();
3799 let mut tx =
3800 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3801 tx.sent_at = None;
3802 repo.create(tx).await.unwrap();
3803
3804 let updated = repo
3805 .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:00:00Z".to_string())
3806 .await
3807 .unwrap();
3808
3809 assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:00:00Z"));
3810 let meta = updated.metadata.unwrap();
3811 assert_eq!(meta.insufficient_fee_retries, 1);
3812 assert_eq!(meta.consecutive_failures, 0);
3813 assert_eq!(meta.total_failures, 0);
3814 }
3815
3816 #[tokio::test]
3817 #[ignore = "Requires active Redis instance"]
3818 async fn test_record_insufficient_fee_retry_accumulates() {
3819 let _lock = ENV_MUTEX.lock().await;
3820 let repo = setup_test_repo().await;
3821 let relayer_id = Uuid::new_v4().to_string();
3822 let tx_id = Uuid::new_v4().to_string();
3823 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3824 repo.create(tx).await.unwrap();
3825
3826 repo.record_stellar_insufficient_fee_retry(
3827 tx_id.clone(),
3828 "2025-03-18T10:00:00Z".to_string(),
3829 )
3830 .await
3831 .unwrap();
3832
3833 let updated = repo
3834 .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:01:00Z".to_string())
3835 .await
3836 .unwrap();
3837
3838 assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z"));
3839 let meta = updated.metadata.unwrap();
3840 assert_eq!(meta.insufficient_fee_retries, 2);
3841 }
3842
3843 #[tokio::test]
3844 #[ignore = "Requires active Redis instance"]
3845 async fn test_record_insufficient_fee_retry_noop_on_final_state() {
3846 let _lock = ENV_MUTEX.lock().await;
3847 let repo = setup_test_repo().await;
3848 let relayer_id = Uuid::new_v4().to_string();
3849 let tx_id = Uuid::new_v4().to_string();
3850 let mut tx =
3851 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed);
3852 tx.sent_at = Some("old-time".to_string());
3853 repo.create(tx).await.unwrap();
3854
3855 let result = repo
3856 .record_stellar_insufficient_fee_retry(tx_id, "new-time".to_string())
3857 .await
3858 .unwrap();
3859
3860 assert_eq!(result.sent_at.as_deref(), Some("old-time"));
3862 assert!(result.metadata.is_none());
3863 }
3864
3865 #[tokio::test]
3866 #[ignore = "Requires active Redis instance"]
3867 async fn test_record_insufficient_fee_retry_not_found() {
3868 let _lock = ENV_MUTEX.lock().await;
3869 let repo = setup_test_repo().await;
3870
3871 let result = repo
3872 .record_stellar_insufficient_fee_retry(
3873 "nonexistent".to_string(),
3874 "2025-03-18T10:00:00Z".to_string(),
3875 )
3876 .await;
3877
3878 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3879 }
3880
3881 #[tokio::test]
3884 #[ignore = "Requires active Redis instance"]
3885 async fn test_record_try_again_later_retry() {
3886 let _lock = ENV_MUTEX.lock().await;
3887 let repo = setup_test_repo().await;
3888 let relayer_id = Uuid::new_v4().to_string();
3889 let tx_id = Uuid::new_v4().to_string();
3890 let mut tx =
3891 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3892 tx.sent_at = None;
3893 repo.create(tx).await.unwrap();
3894
3895 let updated = repo
3896 .record_stellar_try_again_later_retry(tx_id, "2025-03-18T10:00:00Z".to_string())
3897 .await
3898 .unwrap();
3899
3900 assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:00:00Z"));
3901 let meta = updated.metadata.unwrap();
3902 assert_eq!(meta.try_again_later_retries, 1);
3903 assert_eq!(meta.consecutive_failures, 0);
3904 assert_eq!(meta.total_failures, 0);
3905 }
3906
3907 #[tokio::test]
3908 #[ignore = "Requires active Redis instance"]
3909 async fn test_record_try_again_later_retry_accumulates() {
3910 let _lock = ENV_MUTEX.lock().await;
3911 let repo = setup_test_repo().await;
3912 let relayer_id = Uuid::new_v4().to_string();
3913 let tx_id = Uuid::new_v4().to_string();
3914 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3915 repo.create(tx).await.unwrap();
3916
3917 repo.record_stellar_try_again_later_retry(
3918 tx_id.clone(),
3919 "2025-03-18T10:00:00Z".to_string(),
3920 )
3921 .await
3922 .unwrap();
3923
3924 let updated = repo
3925 .record_stellar_try_again_later_retry(tx_id, "2025-03-18T10:01:00Z".to_string())
3926 .await
3927 .unwrap();
3928
3929 assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z"));
3930 let meta = updated.metadata.unwrap();
3931 assert_eq!(meta.try_again_later_retries, 2);
3932 }
3933
3934 #[tokio::test]
3935 #[ignore = "Requires active Redis instance"]
3936 async fn test_record_try_again_later_retry_noop_on_final_state() {
3937 let _lock = ENV_MUTEX.lock().await;
3938 let repo = setup_test_repo().await;
3939 let relayer_id = Uuid::new_v4().to_string();
3940 let tx_id = Uuid::new_v4().to_string();
3941 let mut tx =
3942 create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed);
3943 tx.sent_at = Some("old-time".to_string());
3944 repo.create(tx).await.unwrap();
3945
3946 let result = repo
3947 .record_stellar_try_again_later_retry(tx_id, "new-time".to_string())
3948 .await
3949 .unwrap();
3950
3951 assert_eq!(result.sent_at.as_deref(), Some("old-time"));
3953 assert!(result.metadata.is_none());
3954 }
3955
3956 #[tokio::test]
3957 #[ignore = "Requires active Redis instance"]
3958 async fn test_record_try_again_later_retry_not_found() {
3959 let _lock = ENV_MUTEX.lock().await;
3960 let repo = setup_test_repo().await;
3961
3962 let result = repo
3963 .record_stellar_try_again_later_retry(
3964 "nonexistent".to_string(),
3965 "2025-03-18T10:00:00Z".to_string(),
3966 )
3967 .await;
3968
3969 assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3970 }
3971
3972 #[tokio::test]
3975 #[ignore = "Requires active Redis instance"]
3976 async fn test_increment_failures_preserves_try_again_later_retries() {
3977 let _lock = ENV_MUTEX.lock().await;
3978 let repo = setup_test_repo().await;
3979 let relayer_id = Uuid::new_v4().to_string();
3980 let tx_id = Uuid::new_v4().to_string();
3981 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3982 repo.create(tx).await.unwrap();
3983
3984 repo.record_stellar_try_again_later_retry(
3986 tx_id.clone(),
3987 "2025-03-18T10:00:00Z".to_string(),
3988 )
3989 .await
3990 .unwrap();
3991
3992 let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
3994
3995 let meta = updated.metadata.unwrap();
3996 assert_eq!(
3997 meta.try_again_later_retries, 1,
3998 "try_again_later_retries must survive increment_status_check_failures"
3999 );
4000 assert_eq!(meta.consecutive_failures, 1);
4001 assert_eq!(meta.total_failures, 1);
4002 }
4003
4004 #[tokio::test]
4005 #[ignore = "Requires active Redis instance"]
4006 async fn test_increment_failures_preserves_insufficient_fee_retries() {
4007 let _lock = ENV_MUTEX.lock().await;
4008 let repo = setup_test_repo().await;
4009 let relayer_id = Uuid::new_v4().to_string();
4010 let tx_id = Uuid::new_v4().to_string();
4011 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4012 repo.create(tx).await.unwrap();
4013
4014 repo.record_stellar_insufficient_fee_retry(
4016 tx_id.clone(),
4017 "2025-03-18T10:00:00Z".to_string(),
4018 )
4019 .await
4020 .unwrap();
4021
4022 let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
4024
4025 let meta = updated.metadata.unwrap();
4026 assert_eq!(
4027 meta.insufficient_fee_retries, 1,
4028 "insufficient_fee_retries must survive increment_status_check_failures"
4029 );
4030 assert_eq!(meta.consecutive_failures, 1);
4031 }
4032
4033 #[tokio::test]
4034 #[ignore = "Requires active Redis instance"]
4035 async fn test_reset_failures_preserves_retry_counters() {
4036 let _lock = ENV_MUTEX.lock().await;
4037 let repo = setup_test_repo().await;
4038 let relayer_id = Uuid::new_v4().to_string();
4039 let tx_id = Uuid::new_v4().to_string();
4040 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4041 repo.create(tx).await.unwrap();
4042
4043 repo.record_stellar_try_again_later_retry(
4045 tx_id.clone(),
4046 "2025-03-18T10:00:00Z".to_string(),
4047 )
4048 .await
4049 .unwrap();
4050 repo.record_stellar_insufficient_fee_retry(
4051 tx_id.clone(),
4052 "2025-03-18T10:01:00Z".to_string(),
4053 )
4054 .await
4055 .unwrap();
4056
4057 repo.increment_status_check_failures(tx_id.clone())
4059 .await
4060 .unwrap();
4061 let updated = repo
4062 .reset_status_check_consecutive_failures(tx_id)
4063 .await
4064 .unwrap();
4065
4066 let meta = updated.metadata.unwrap();
4067 assert_eq!(meta.consecutive_failures, 0);
4068 assert_eq!(meta.total_failures, 1);
4069 assert_eq!(
4070 meta.try_again_later_retries, 1,
4071 "try_again_later_retries must survive reset"
4072 );
4073 assert_eq!(
4074 meta.insufficient_fee_retries, 1,
4075 "insufficient_fee_retries must survive reset"
4076 );
4077 }
4078
4079 #[tokio::test]
4080 #[ignore = "Requires active Redis instance"]
4081 async fn test_fee_and_try_again_later_retries_independent() {
4082 let _lock = ENV_MUTEX.lock().await;
4083 let repo = setup_test_repo().await;
4084 let relayer_id = Uuid::new_v4().to_string();
4085 let tx_id = Uuid::new_v4().to_string();
4086 let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4087 repo.create(tx).await.unwrap();
4088
4089 repo.record_stellar_try_again_later_retry(
4091 tx_id.clone(),
4092 "2025-03-18T10:00:00Z".to_string(),
4093 )
4094 .await
4095 .unwrap();
4096 repo.record_stellar_try_again_later_retry(
4097 tx_id.clone(),
4098 "2025-03-18T10:01:00Z".to_string(),
4099 )
4100 .await
4101 .unwrap();
4102
4103 let updated = repo
4105 .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:02:00Z".to_string())
4106 .await
4107 .unwrap();
4108
4109 let meta = updated.metadata.unwrap();
4110 assert_eq!(
4111 meta.try_again_later_retries, 2,
4112 "try_again_later_retries must survive insufficient_fee_retry"
4113 );
4114 assert_eq!(meta.insufficient_fee_retries, 1);
4115 }
4116}