openzeppelin_relayer/repositories/transaction/
transaction_redis.rs

1//! Redis-backed implementation of the TransactionRepository.
2
3use crate::config::ServerConfig;
4use crate::constants::FINAL_TRANSACTION_STATUSES;
5use crate::domain::transaction::common::is_final_state;
6use crate::metrics::{
7    TRANSACTIONS_BY_STATUS, TRANSACTIONS_CREATED, TRANSACTIONS_FAILED,
8    TRANSACTIONS_INSUFFICIENT_FEE_FAILED, TRANSACTIONS_INSUFFICIENT_FEE_SUCCESS,
9    TRANSACTIONS_SUBMITTED, TRANSACTIONS_SUCCESS, TRANSACTIONS_TRY_AGAIN_LATER_FAILED,
10    TRANSACTIONS_TRY_AGAIN_LATER_SUCCESS, TRANSACTION_PROCESSING_TIME,
11};
12use crate::models::{
13    NetworkTransactionData, PaginationQuery, RepositoryError, TransactionRepoModel,
14    TransactionStatus, TransactionUpdateRequest,
15};
16use crate::repositories::redis_base::RedisRepository;
17use crate::repositories::{
18    BatchDeleteResult, BatchRetrievalResult, PaginatedResult, Repository, TransactionDeleteRequest,
19    TransactionRepository,
20};
21use crate::utils::RedisConnections;
22use async_trait::async_trait;
23use chrono::Utc;
24use redis::{AsyncCommands, Script};
25use std::fmt;
26use std::sync::Arc;
27use tracing::{debug, error, warn};
28
29const RELAYER_PREFIX: &str = "relayer";
30const TX_PREFIX: &str = "tx";
31const STATUS_PREFIX: &str = "status";
32const STATUS_SORTED_PREFIX: &str = "status_sorted";
33const NONCE_PREFIX: &str = "nonce";
34const TX_TO_RELAYER_PREFIX: &str = "tx_to_relayer";
35const RELAYER_LIST_KEY: &str = "relayer_list";
36const TX_BY_CREATED_AT_PREFIX: &str = "tx_by_created_at";
37
38#[derive(Clone)]
39pub struct RedisTransactionRepository {
40    pub connections: Arc<RedisConnections>,
41    pub key_prefix: String,
42}
43
44impl RedisRepository for RedisTransactionRepository {}
45
46impl RedisTransactionRepository {
47    pub fn new(
48        connections: Arc<RedisConnections>,
49        key_prefix: String,
50    ) -> Result<Self, RepositoryError> {
51        if key_prefix.is_empty() {
52            return Err(RepositoryError::InvalidData(
53                "Redis key prefix cannot be empty".to_string(),
54            ));
55        }
56
57        Ok(Self {
58            connections,
59            key_prefix,
60        })
61    }
62
63    /// Generate key for transaction data: relayer:{relayer_id}:tx:{tx_id}
64    fn tx_key(&self, relayer_id: &str, tx_id: &str) -> String {
65        format!(
66            "{}:{}:{}:{}:{}",
67            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX, tx_id
68        )
69    }
70
71    /// Generate key for reverse lookup: tx_to_relayer:{tx_id}
72    fn tx_to_relayer_key(&self, tx_id: &str) -> String {
73        format!(
74            "{}:{}:{}:{}",
75            self.key_prefix, RELAYER_PREFIX, TX_TO_RELAYER_PREFIX, tx_id
76        )
77    }
78
79    /// Generate key for relayer status index (legacy SET): relayer:{relayer_id}:status:{status}
80    fn relayer_status_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
81        format!(
82            "{}:{}:{}:{}:{}",
83            self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_PREFIX, status
84        )
85    }
86
87    /// Generate key for relayer status sorted index (SORTED SET): relayer:{relayer_id}:status_sorted:{status}
88    /// Score is created_at timestamp in milliseconds for efficient ordering.
89    fn relayer_status_sorted_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
90        format!(
91            "{}:{}:{}:{}:{}",
92            self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_SORTED_PREFIX, status
93        )
94    }
95
96    /// Generate key for relayer nonce index: relayer:{relayer_id}:nonce:{nonce}
97    fn relayer_nonce_key(&self, relayer_id: &str, nonce: u64) -> String {
98        format!(
99            "{}:{}:{}:{}:{}",
100            self.key_prefix, RELAYER_PREFIX, relayer_id, NONCE_PREFIX, nonce
101        )
102    }
103
104    /// Generate key for relayer list: relayer_list (set of all relayer IDs)
105    fn relayer_list_key(&self) -> String {
106        format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
107    }
108
109    /// Generate key for relayer's sorted set by created_at: relayer:{relayer_id}:tx_by_created_at
110    fn relayer_tx_by_created_at_key(&self, relayer_id: &str) -> String {
111        format!(
112            "{}:{}:{}:{}",
113            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_BY_CREATED_AT_PREFIX
114        )
115    }
116
117    /// Returns the components needed for Lua scripts to resolve a tx key from
118    /// only the tx_id: (tx_to_relayer lookup key, key prefix, key suffix).
119    /// The Lua script does: `GET KEYS[1]` to get the relayer_id, then
120    /// constructs the tx key as `ARGV[1] .. relayer_id .. ARGV[2]`.
121    fn tx_key_parts(&self, tx_id: &str) -> (String, String, String) {
122        let lookup_key = self.tx_to_relayer_key(tx_id);
123        let key_prefix = format!("{}:{}:", self.key_prefix, RELAYER_PREFIX);
124        let key_suffix = format!(":{TX_PREFIX}:{tx_id}");
125        (lookup_key, key_prefix, key_suffix)
126    }
127
128    /// Executes an atomic Lua script with retry/backoff for transient Redis failures.
129    ///
130    /// Every script receives `KEYS[1]` = tx_to_relayer lookup key and
131    /// `ARGV[1..2]` = key prefix/suffix. `extra_args` are appended as `ARGV[3..]`.
132    /// The script must return the (possibly updated) JSON string or `false` for
133    /// not-found.
134    async fn run_atomic_script(
135        &self,
136        lua: &str,
137        tx_id: &str,
138        extra_args: &[&str],
139        op_name: &str,
140    ) -> Result<TransactionRepoModel, RepositoryError> {
141        const MAX_RETRIES: u32 = 3;
142        const BASE_BACKOFF_MS: u64 = 100;
143
144        let (lookup_key, key_prefix, key_suffix) = self.tx_key_parts(tx_id);
145        let script = Script::new(lua);
146        let mut last_error = None;
147
148        for attempt in 0..MAX_RETRIES {
149            let backoff = BASE_BACKOFF_MS * 2u64.pow(attempt);
150
151            let mut conn = match self
152                .get_connection(self.connections.primary(), op_name)
153                .await
154            {
155                Ok(conn) => conn,
156                Err(e) => {
157                    last_error = Some(e);
158                    if attempt < MAX_RETRIES - 1 {
159                        warn!(tx_id = %tx_id, attempt, op = %op_name, "connection failed, retrying");
160                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
161                        continue;
162                    }
163                    return Err(last_error.unwrap());
164                }
165            };
166
167            let mut invocation = script.prepare_invoke();
168            invocation
169                .key(&lookup_key)
170                .arg(&key_prefix)
171                .arg(&key_suffix);
172            for arg in extra_args {
173                invocation.arg(*arg);
174            }
175
176            match invocation.invoke_async::<Option<String>>(&mut conn).await {
177                Ok(result) => {
178                    let json = result.ok_or_else(|| {
179                        RepositoryError::NotFound(format!("Transaction with ID {tx_id} not found"))
180                    })?;
181                    return self.deserialize_entity::<TransactionRepoModel>(
182                        &json,
183                        tx_id,
184                        "transaction",
185                    );
186                }
187                Err(e) => {
188                    last_error = Some(self.map_redis_error(e, op_name));
189                    if attempt < MAX_RETRIES - 1 {
190                        warn!(
191                            tx_id = %tx_id, attempt, op = %op_name,
192                            "atomic script failed, retrying"
193                        );
194                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
195                        continue;
196                    }
197                    return Err(last_error.unwrap());
198                }
199            }
200        }
201        Err(last_error.unwrap_or_else(|| {
202            RepositoryError::UnexpectedError(format!("retry loop exhausted for {op_name}"))
203        }))
204    }
205
206    /// Executes a Lua script with retry/backoff, returning a Vec<String> result
207    /// (for scripts that return Lua tables / multi-bulk replies).
208    /// Returns `Ok(None)` when the script returns `false`.
209    async fn run_script_with_retry_vec(
210        &self,
211        script: &Script,
212        lookup_key: &str,
213        key_prefix: &str,
214        key_suffix: &str,
215        extra_args: &[&str],
216        op_name: &str,
217    ) -> Result<Option<Vec<String>>, RepositoryError> {
218        const MAX_RETRIES: u32 = 3;
219        const BASE_BACKOFF_MS: u64 = 100;
220
221        let mut last_error = None;
222
223        for attempt in 0..MAX_RETRIES {
224            let backoff = BASE_BACKOFF_MS * 2u64.pow(attempt);
225
226            let mut conn = match self
227                .get_connection(self.connections.primary(), op_name)
228                .await
229            {
230                Ok(conn) => conn,
231                Err(e) => {
232                    last_error = Some(e);
233                    if attempt < MAX_RETRIES - 1 {
234                        warn!(op = %op_name, attempt, "connection failed, retrying");
235                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
236                        continue;
237                    }
238                    return Err(last_error.unwrap());
239                }
240            };
241
242            let mut invocation = script.prepare_invoke();
243            invocation.key(lookup_key).arg(key_prefix).arg(key_suffix);
244            for arg in extra_args {
245                invocation.arg(*arg);
246            }
247
248            // Redis returns `false` from Lua as a Nil bulk reply, which
249            // redis-rs maps to `None` for `Option<Vec<String>>`.
250            match invocation
251                .invoke_async::<Option<Vec<String>>>(&mut conn)
252                .await
253            {
254                Ok(result) => return Ok(result),
255                Err(e) => {
256                    last_error = Some(self.map_redis_error(e, op_name));
257                    if attempt < MAX_RETRIES - 1 {
258                        warn!(op = %op_name, attempt, "script failed, retrying");
259                        tokio::time::sleep(tokio::time::Duration::from_millis(backoff)).await;
260                        continue;
261                    }
262                    return Err(last_error.unwrap());
263                }
264            }
265        }
266        Err(last_error.unwrap_or_else(|| {
267            RepositoryError::UnexpectedError(format!("retry loop exhausted for {op_name}"))
268        }))
269    }
270
271    /// Parse timestamp string to score for sorted set (milliseconds since epoch)
272    fn timestamp_to_score(&self, timestamp: &str) -> f64 {
273        chrono::DateTime::parse_from_rfc3339(timestamp)
274            .map(|dt| dt.timestamp_millis() as f64)
275            .unwrap_or_else(|_| {
276                warn!(timestamp = %timestamp, "failed to parse timestamp, using 0");
277                0.0
278            })
279    }
280
281    /// Compute the appropriate score for a transaction's status sorted set.
282    /// - For Confirmed status: use confirmed_at (on-chain confirmation order)
283    /// - For all other statuses: use created_at (queue/processing order)
284    fn status_sorted_score(&self, tx: &TransactionRepoModel) -> f64 {
285        if tx.status == TransactionStatus::Confirmed {
286            // For Confirmed, prefer confirmed_at for accurate on-chain ordering
287            if let Some(ref confirmed_at) = tx.confirmed_at {
288                return self.timestamp_to_score(confirmed_at);
289            }
290            // Fallback to created_at if confirmed_at not set (shouldn't happen)
291            warn!(tx_id = %tx.id, "Confirmed transaction missing confirmed_at, using created_at");
292        }
293        self.timestamp_to_score(&tx.created_at)
294    }
295
296    /// Batch fetch transactions by IDs using reverse lookup
297    async fn get_transactions_by_ids(
298        &self,
299        ids: &[String],
300    ) -> Result<BatchRetrievalResult<TransactionRepoModel>, RepositoryError> {
301        if ids.is_empty() {
302            debug!("no transaction IDs provided for batch fetch");
303            return Ok(BatchRetrievalResult {
304                results: vec![],
305                failed_ids: vec![],
306            });
307        }
308
309        let mut conn = self
310            .get_connection(self.connections.reader(), "batch_fetch_transactions")
311            .await?;
312
313        let reverse_keys: Vec<String> = ids.iter().map(|id| self.tx_to_relayer_key(id)).collect();
314
315        debug!(count = %ids.len(), "fetching relayer IDs for transactions");
316
317        let relayer_ids: Vec<Option<String>> = conn
318            .mget(&reverse_keys)
319            .await
320            .map_err(|e| self.map_redis_error(e, "batch_fetch_relayer_ids"))?;
321
322        let mut tx_keys = Vec::new();
323        let mut valid_ids = Vec::new();
324        let mut failed_ids = Vec::new();
325        for (i, relayer_id) in relayer_ids.into_iter().enumerate() {
326            match relayer_id {
327                Some(relayer_id) => {
328                    tx_keys.push(self.tx_key(&relayer_id, &ids[i]));
329                    valid_ids.push(ids[i].clone());
330                }
331                None => {
332                    warn!(tx_id = %ids[i], "no relayer found for transaction");
333                    failed_ids.push(ids[i].clone());
334                }
335            }
336        }
337
338        if tx_keys.is_empty() {
339            debug!("no valid transactions found for batch fetch");
340            return Ok(BatchRetrievalResult {
341                results: vec![],
342                failed_ids,
343            });
344        }
345
346        debug!(count = %tx_keys.len(), "batch fetching transaction data");
347
348        let values: Vec<Option<String>> = conn
349            .mget(&tx_keys)
350            .await
351            .map_err(|e| self.map_redis_error(e, "batch_fetch_transactions"))?;
352
353        let mut transactions = Vec::new();
354        let mut failed_count = 0;
355        for (i, value) in values.into_iter().enumerate() {
356            match value {
357                Some(json) => {
358                    match self.deserialize_entity::<TransactionRepoModel>(
359                        &json,
360                        &valid_ids[i],
361                        "transaction",
362                    ) {
363                        Ok(tx) => transactions.push(tx),
364                        Err(e) => {
365                            failed_count += 1;
366                            error!(tx_id = %valid_ids[i], error = %e, "failed to deserialize transaction");
367                            // Continue processing other transactions
368                        }
369                    }
370                }
371                None => {
372                    warn!(tx_id = %valid_ids[i], "transaction not found in batch fetch");
373                    failed_ids.push(valid_ids[i].clone());
374                }
375            }
376        }
377
378        if failed_count > 0 {
379            warn!(failed_count = %failed_count, total_count = %valid_ids.len(), "failed to deserialize transactions in batch");
380        }
381
382        debug!(count = %transactions.len(), "successfully fetched transactions");
383        Ok(BatchRetrievalResult {
384            results: transactions,
385            failed_ids,
386        })
387    }
388
389    /// Extract nonce from EVM transaction data
390    fn extract_nonce(&self, network_data: &NetworkTransactionData) -> Option<u64> {
391        match network_data.get_evm_transaction_data() {
392            Ok(tx_data) => tx_data.nonce,
393            Err(_) => {
394                debug!("no EVM transaction data available for nonce extraction");
395                None
396            }
397        }
398    }
399
400    /// Ensures the status sorted set exists, migrating from legacy SET if needed.
401    ///
402    /// This handles the transition from unordered SETs to sorted SETs for status indexing.
403    /// If the sorted set is empty but the legacy set has data, it migrates the data
404    /// by looking up each transaction's created_at timestamp to compute the score.
405    ///
406    /// # Concurrency
407    /// This function is safe for concurrent calls. If multiple calls race to migrate
408    /// the same status set:
409    /// - ZADD is idempotent (same member + score = no-op)
410    /// - DEL on non-existent key is safe (returns 0)
411    /// - After first successful migration, subsequent calls hit the fast path (ZCARD > 0)
412    ///
413    /// The only downside of concurrent migrations is wasted work, not data corruption.
414    ///
415    /// Returns the count of items in the sorted set after migration.
416    async fn ensure_status_sorted_set(
417        &self,
418        relayer_id: &str,
419        status: &TransactionStatus,
420    ) -> Result<u64, RepositoryError> {
421        let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
422        let legacy_key = self.relayer_status_key(relayer_id, status);
423
424        // Phase 1: Check if migration is needed
425        let legacy_ids = {
426            let mut conn = self
427                .get_connection(self.connections.primary(), "ensure_status_sorted_set_check")
428                .await?;
429
430            // Always check if legacy set has data that needs migration
431            let legacy_count: u64 = conn
432                .scard(&legacy_key)
433                .await
434                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_scard"))?;
435
436            if legacy_count == 0 {
437                // No legacy data to migrate, return current ZSET count
438                let sorted_count: u64 = conn
439                    .zcard(&sorted_key)
440                    .await
441                    .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_zcard"))?;
442                return Ok(sorted_count);
443            }
444
445            // Migration needed: get all IDs from legacy set
446            debug!(
447                relayer_id = %relayer_id,
448                status = %status,
449                legacy_count = %legacy_count,
450                "migrating status set to sorted set"
451            );
452
453            let ids: Vec<String> = conn
454                .smembers(&legacy_key)
455                .await
456                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_smembers"))?;
457
458            ids
459            // Connection dropped here before nested call to avoid connection doubling
460        };
461
462        if legacy_ids.is_empty() {
463            return Ok(0);
464        }
465
466        // Phase 2: Fetch transactions (uses its own connection internally)
467        let transactions = self.get_transactions_by_ids(&legacy_ids).await?;
468
469        // Phase 3: Perform migration with a new connection
470        let mut conn = self
471            .get_connection(
472                self.connections.primary(),
473                "ensure_status_sorted_set_migrate",
474            )
475            .await?;
476
477        if transactions.results.is_empty() {
478            // All transactions were stale/deleted, clean up legacy set
479            let _: () = conn
480                .del(&legacy_key)
481                .await
482                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_del_stale"))?;
483            return Ok(0);
484        }
485
486        // Build sorted set entries and migrate atomically
487        // Use status-aware scoring: confirmed_at for Confirmed, created_at for others
488        let mut pipe = redis::pipe();
489        pipe.atomic();
490
491        for tx in &transactions.results {
492            let score = self.status_sorted_score(tx);
493            pipe.zadd(&sorted_key, &tx.id, score);
494        }
495
496        // Delete legacy set after migration
497        pipe.del(&legacy_key);
498
499        pipe.query_async::<()>(&mut conn)
500            .await
501            .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_migrate"))?;
502
503        let migrated_count = transactions.results.len() as u64;
504        debug!(
505            relayer_id = %relayer_id,
506            status = %status,
507            migrated_count = %migrated_count,
508            "completed migration of status set to sorted set"
509        );
510
511        Ok(migrated_count)
512    }
513
514    /// Update indexes atomically with comprehensive error handling
515    async fn update_indexes(
516        &self,
517        tx: &TransactionRepoModel,
518        old_tx: Option<&TransactionRepoModel>,
519    ) -> Result<(), RepositoryError> {
520        let mut conn = self
521            .get_connection(self.connections.primary(), "update_indexes")
522            .await?;
523        let mut pipe = redis::pipe();
524        pipe.atomic();
525
526        debug!(tx_id = %tx.id, "updating indexes for transaction");
527
528        // Add relayer to the global relayer list
529        let relayer_list_key = self.relayer_list_key();
530        pipe.sadd(&relayer_list_key, &tx.relayer_id);
531
532        // Compute scores for sorted sets
533        // Status sorted set: uses confirmed_at for Confirmed status, created_at for others
534        let status_score = self.status_sorted_score(tx);
535        // Global tx_by_created_at: always uses created_at for consistent ordering
536        let created_at_score = self.timestamp_to_score(&tx.created_at);
537
538        // Handle status index updates - write to SORTED SET (new format)
539        let new_status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, &tx.status);
540        pipe.zadd(&new_status_sorted_key, &tx.id, status_score);
541        debug!(tx_id = %tx.id, status = %tx.status, score = %status_score, "added transaction to status sorted set");
542
543        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
544            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
545            pipe.set(&nonce_key, &tx.id);
546            debug!(tx_id = %tx.id, nonce = %nonce, "added nonce index for transaction");
547        }
548
549        // Add to per-relayer sorted set by created_at (for efficient sorted pagination)
550        let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
551        pipe.zadd(&relayer_sorted_key, &tx.id, created_at_score);
552        debug!(tx_id = %tx.id, score = %created_at_score, "added transaction to sorted set by created_at");
553
554        // Remove old indexes if updating
555        if let Some(old) = old_tx {
556            if old.status != tx.status {
557                // Remove from old status sorted set (new format)
558                let old_status_sorted_key =
559                    self.relayer_status_sorted_key(&old.relayer_id, &old.status);
560                pipe.zrem(&old_status_sorted_key, &tx.id);
561
562                // Also clean up legacy SET if it exists (for migration cleanup)
563                let old_status_legacy_key = self.relayer_status_key(&old.relayer_id, &old.status);
564                pipe.srem(&old_status_legacy_key, &tx.id);
565
566                debug!(tx_id = %tx.id, old_status = %old.status, new_status = %tx.status, "removing old status indexes for transaction");
567            }
568
569            // Handle nonce index cleanup
570            if let Some(old_nonce) = self.extract_nonce(&old.network_data) {
571                let new_nonce = self.extract_nonce(&tx.network_data);
572                if Some(old_nonce) != new_nonce {
573                    let old_nonce_key = self.relayer_nonce_key(&old.relayer_id, old_nonce);
574                    pipe.del(&old_nonce_key);
575                    debug!(tx_id = %tx.id, old_nonce = %old_nonce, new_nonce = ?new_nonce, "removing old nonce index for transaction");
576                }
577            }
578        }
579
580        // Execute all operations in a single pipeline
581        pipe.exec_async(&mut conn).await.map_err(|e| {
582            error!(tx_id = %tx.id, error = %e, "index update pipeline failed for transaction");
583            self.map_redis_error(e, &format!("update_indexes_for_tx_{}", tx.id))
584        })?;
585
586        debug!(tx_id = %tx.id, "successfully updated indexes for transaction");
587        Ok(())
588    }
589
590    /// Remove all indexes with error recovery
591    async fn remove_all_indexes(&self, tx: &TransactionRepoModel) -> Result<(), RepositoryError> {
592        let mut conn = self
593            .get_connection(self.connections.primary(), "remove_all_indexes")
594            .await?;
595        let mut pipe = redis::pipe();
596        pipe.atomic();
597
598        debug!(tx_id = %tx.id, "removing all indexes for transaction");
599
600        // Remove from ALL possible status indexes to ensure complete cleanup
601        // This handles cases where a transaction might be in multiple status sets
602        // due to race conditions, partial failures, or bugs
603        for status in &[
604            TransactionStatus::Canceled,
605            TransactionStatus::Pending,
606            TransactionStatus::Sent,
607            TransactionStatus::Submitted,
608            TransactionStatus::Mined,
609            TransactionStatus::Confirmed,
610            TransactionStatus::Failed,
611            TransactionStatus::Expired,
612        ] {
613            // Remove from sorted status set (new format)
614            let status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, status);
615            pipe.zrem(&status_sorted_key, &tx.id);
616
617            // Remove from legacy status set (for migration cleanup)
618            let status_legacy_key = self.relayer_status_key(&tx.relayer_id, status);
619            pipe.srem(&status_legacy_key, &tx.id);
620        }
621
622        // Remove nonce index if exists
623        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
624            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
625            pipe.del(&nonce_key);
626            debug!(tx_id = %tx.id, nonce = %nonce, "removing nonce index for transaction");
627        }
628
629        // Remove from per-relayer sorted set by created_at
630        let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
631        pipe.zrem(&relayer_sorted_key, &tx.id);
632        debug!(tx_id = %tx.id, "removing transaction from sorted set by created_at");
633
634        // Remove reverse lookup
635        let reverse_key = self.tx_to_relayer_key(&tx.id);
636        pipe.del(&reverse_key);
637
638        pipe.exec_async(&mut conn).await.map_err(|e| {
639            error!(tx_id = %tx.id, error = %e, "index removal failed for transaction");
640            self.map_redis_error(e, &format!("remove_indexes_for_tx_{}", tx.id))
641        })?;
642
643        debug!(tx_id = %tx.id, "successfully removed all indexes for transaction");
644        Ok(())
645    }
646
647    /// Track Prometheus metrics when a transaction status changes.
648    fn track_status_change_metrics(
649        &self,
650        _original_tx: &TransactionRepoModel,
651        updated_tx: &TransactionRepoModel,
652        old_status: &TransactionStatus,
653        new_status: &TransactionStatus,
654    ) {
655        let network_type = format!("{:?}", updated_tx.network_type).to_lowercase();
656        let relayer_id = updated_tx.relayer_id.as_str();
657
658        // Track submission (when status changes to Submitted)
659        if *old_status != TransactionStatus::Submitted
660            && *new_status == TransactionStatus::Submitted
661        {
662            TRANSACTIONS_SUBMITTED
663                .with_label_values(&[relayer_id, &network_type])
664                .inc();
665
666            if let Ok(created_time) = chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at) {
667                let processing_seconds =
668                    (Utc::now() - created_time.with_timezone(&Utc)).num_seconds() as f64;
669                TRANSACTION_PROCESSING_TIME
670                    .with_label_values(&[relayer_id, &network_type, "creation_to_submission"])
671                    .observe(processing_seconds);
672            }
673        }
674
675        // Track status distribution (update gauge when status changes)
676        if old_status != new_status {
677            let old_status_str = format!("{old_status:?}").to_lowercase();
678            let old_status_gauge = TRANSACTIONS_BY_STATUS.with_label_values(&[
679                relayer_id,
680                &network_type,
681                &old_status_str,
682            ]);
683            let clamped_value = (old_status_gauge.get() - 1.0).max(0.0);
684            old_status_gauge.set(clamped_value);
685
686            let new_status_str = format!("{new_status:?}").to_lowercase();
687            TRANSACTIONS_BY_STATUS
688                .with_label_values(&[relayer_id, &network_type, &new_status_str])
689                .inc();
690        }
691
692        // Track metrics for final transaction states
693        let was_final = is_final_state(old_status);
694        let is_final = is_final_state(new_status);
695
696        if !was_final && is_final {
697            let previous_status = format!("{old_status:?}").to_lowercase();
698            let meta = updated_tx.metadata.as_ref();
699            let had_insufficient_fee = meta.is_some_and(|m| m.insufficient_fee_retries > 0);
700            let had_try_again_later = meta.is_some_and(|m| m.try_again_later_retries > 0);
701
702            match new_status {
703                TransactionStatus::Confirmed => {
704                    TRANSACTIONS_SUCCESS
705                        .with_label_values(&[relayer_id, &network_type])
706                        .inc();
707                    if had_insufficient_fee {
708                        TRANSACTIONS_INSUFFICIENT_FEE_SUCCESS
709                            .with_label_values(&[relayer_id, &network_type])
710                            .inc();
711                    }
712                    if had_try_again_later {
713                        TRANSACTIONS_TRY_AGAIN_LATER_SUCCESS
714                            .with_label_values(&[relayer_id, &network_type])
715                            .inc();
716                    }
717
718                    if let (Some(sent_at_str), Some(confirmed_at_str)) =
719                        (&updated_tx.sent_at, &updated_tx.confirmed_at)
720                    {
721                        if let (Ok(sent_time), Ok(confirmed_time)) = (
722                            chrono::DateTime::parse_from_rfc3339(sent_at_str),
723                            chrono::DateTime::parse_from_rfc3339(confirmed_at_str),
724                        ) {
725                            let processing_seconds = (confirmed_time.with_timezone(&Utc)
726                                - sent_time.with_timezone(&Utc))
727                            .num_seconds()
728                                as f64;
729                            TRANSACTION_PROCESSING_TIME
730                                .with_label_values(&[
731                                    relayer_id,
732                                    &network_type,
733                                    "submission_to_confirmation",
734                                ])
735                                .observe(processing_seconds);
736                        }
737                    }
738
739                    if let Ok(created_time) =
740                        chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at)
741                    {
742                        if let Some(confirmed_at_str) = &updated_tx.confirmed_at {
743                            if let Ok(confirmed_time) =
744                                chrono::DateTime::parse_from_rfc3339(confirmed_at_str)
745                            {
746                                let processing_seconds = (confirmed_time.with_timezone(&Utc)
747                                    - created_time.with_timezone(&Utc))
748                                .num_seconds()
749                                    as f64;
750                                TRANSACTION_PROCESSING_TIME
751                                    .with_label_values(&[
752                                        relayer_id,
753                                        &network_type,
754                                        "creation_to_confirmation",
755                                    ])
756                                    .observe(processing_seconds);
757                            }
758                        }
759                    }
760                }
761                TransactionStatus::Failed => {
762                    let failure_reason = updated_tx
763                        .status_reason
764                        .as_deref()
765                        .map(|reason| {
766                            if reason.starts_with("Submission failed:") {
767                                "submission_failed"
768                            } else if reason.starts_with("Preparation failed:") {
769                                "preparation_failed"
770                            } else {
771                                "failed"
772                            }
773                        })
774                        .unwrap_or("failed");
775                    TRANSACTIONS_FAILED
776                        .with_label_values(&[
777                            relayer_id,
778                            &network_type,
779                            failure_reason,
780                            &previous_status,
781                        ])
782                        .inc();
783                }
784                TransactionStatus::Expired => {
785                    TRANSACTIONS_FAILED
786                        .with_label_values(&[
787                            relayer_id,
788                            &network_type,
789                            "expired",
790                            &previous_status,
791                        ])
792                        .inc();
793                }
794                TransactionStatus::Canceled => {
795                    TRANSACTIONS_FAILED
796                        .with_label_values(&[
797                            relayer_id,
798                            &network_type,
799                            "canceled",
800                            &previous_status,
801                        ])
802                        .inc();
803                }
804                _ => {}
805            }
806
807            // Track retry-related failure metrics for all non-success final states
808            if *new_status != TransactionStatus::Confirmed {
809                if had_insufficient_fee {
810                    TRANSACTIONS_INSUFFICIENT_FEE_FAILED
811                        .with_label_values(&[relayer_id, &network_type])
812                        .inc();
813                }
814                if had_try_again_later {
815                    TRANSACTIONS_TRY_AGAIN_LATER_FAILED
816                        .with_label_values(&[relayer_id, &network_type])
817                        .inc();
818                }
819            }
820        }
821    }
822}
823
824impl fmt::Debug for RedisTransactionRepository {
825    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
826        f.debug_struct("RedisTransactionRepository")
827            .field("connections", &"<RedisConnections>")
828            .field("key_prefix", &self.key_prefix)
829            .finish()
830    }
831}
832
833#[async_trait]
834impl Repository<TransactionRepoModel, String> for RedisTransactionRepository {
835    async fn create(
836        &self,
837        entity: TransactionRepoModel,
838    ) -> Result<TransactionRepoModel, RepositoryError> {
839        if entity.id.is_empty() {
840            return Err(RepositoryError::InvalidData(
841                "Transaction ID cannot be empty".to_string(),
842            ));
843        }
844
845        let key = self.tx_key(&entity.relayer_id, &entity.id);
846        let reverse_key = self.tx_to_relayer_key(&entity.id);
847        let mut conn = self
848            .get_connection(self.connections.primary(), "create")
849            .await?;
850
851        debug!(tx_id = %entity.id, "creating transaction");
852
853        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
854
855        // Check if transaction already exists by checking reverse lookup
856        let existing: Option<String> = conn
857            .get(&reverse_key)
858            .await
859            .map_err(|e| self.map_redis_error(e, "create_transaction_check"))?;
860
861        if existing.is_some() {
862            return Err(RepositoryError::ConstraintViolation(format!(
863                "Transaction with ID {} already exists",
864                entity.id
865            )));
866        }
867
868        // Use atomic pipeline for consistency
869        let mut pipe = redis::pipe();
870        pipe.atomic();
871        pipe.set(&key, &value);
872        pipe.set(&reverse_key, &entity.relayer_id);
873
874        pipe.exec_async(&mut conn)
875            .await
876            .map_err(|e| self.map_redis_error(e, "create_transaction"))?;
877
878        // Update indexes separately to handle partial failures gracefully
879        if let Err(e) = self.update_indexes(&entity, None).await {
880            error!(tx_id = %entity.id, error = %e, "failed to update indexes for new transaction");
881            return Err(e);
882        }
883
884        // Track transaction creation metric
885        let network_type = format!("{:?}", entity.network_type).to_lowercase();
886        let relayer_id = entity.relayer_id.as_str();
887        TRANSACTIONS_CREATED
888            .with_label_values(&[relayer_id, &network_type])
889            .inc();
890
891        // Track initial status distribution (Pending)
892        let status = &entity.status;
893        let status_str = format!("{status:?}").to_lowercase();
894        TRANSACTIONS_BY_STATUS
895            .with_label_values(&[relayer_id, &network_type, &status_str])
896            .inc();
897
898        debug!(tx_id = %entity.id, "successfully created transaction");
899        Ok(entity)
900    }
901
902    async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
903        if id.is_empty() {
904            return Err(RepositoryError::InvalidData(
905                "Transaction ID cannot be empty".to_string(),
906            ));
907        }
908
909        let mut conn = self
910            .get_connection(self.connections.reader(), "get_by_id")
911            .await?;
912
913        debug!(tx_id = %id, "fetching transaction");
914
915        let reverse_key = self.tx_to_relayer_key(&id);
916        let relayer_id: Option<String> = conn
917            .get(&reverse_key)
918            .await
919            .map_err(|e| self.map_redis_error(e, "get_transaction_reverse_lookup"))?;
920
921        let relayer_id = match relayer_id {
922            Some(relayer_id) => relayer_id,
923            None => {
924                debug!(tx_id = %id, "transaction not found (no reverse lookup)");
925                return Err(RepositoryError::NotFound(format!(
926                    "Transaction with ID {id} not found"
927                )));
928            }
929        };
930
931        let key = self.tx_key(&relayer_id, &id);
932        let value: Option<String> = conn
933            .get(&key)
934            .await
935            .map_err(|e| self.map_redis_error(e, "get_transaction_by_id"))?;
936
937        match value {
938            Some(json) => {
939                let tx =
940                    self.deserialize_entity::<TransactionRepoModel>(&json, &id, "transaction")?;
941                debug!(tx_id = %id, "successfully fetched transaction");
942                Ok(tx)
943            }
944            None => {
945                debug!(tx_id = %id, "transaction not found");
946                Err(RepositoryError::NotFound(format!(
947                    "Transaction with ID {id} not found"
948                )))
949            }
950        }
951    }
952
953    // Unoptimized implementation of list_paginated. Rarely used. find_by_relayer_id is preferred.
954    async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
955        let mut conn = self
956            .get_connection(self.connections.reader(), "list_all")
957            .await?;
958
959        debug!("fetching all transactions sorted by created_at (newest first)");
960
961        // Get all relayer IDs
962        let relayer_list_key = self.relayer_list_key();
963        let relayer_ids: Vec<String> = conn
964            .smembers(&relayer_list_key)
965            .await
966            .map_err(|e| self.map_redis_error(e, "list_all_relayer_ids"))?;
967
968        debug!(count = %relayer_ids.len(), "found relayers");
969
970        // Collect all transaction IDs from all relayers using their sorted sets
971        let mut all_tx_ids = Vec::new();
972        for relayer_id in relayer_ids {
973            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
974            let tx_ids: Vec<String> = redis::cmd("ZRANGE")
975                .arg(&relayer_sorted_key)
976                .arg(0)
977                .arg(-1)
978                .arg("REV")
979                .query_async(&mut conn)
980                .await
981                .map_err(|e| self.map_redis_error(e, "list_all_relayer_sorted"))?;
982
983            all_tx_ids.extend(tx_ids);
984        }
985
986        // Release connection before nested call to avoid connection doubling
987        drop(conn);
988
989        // Batch fetch all transactions at once
990        let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
991        let mut all_transactions = batch_result.results;
992
993        // Sort all transactions by created_at (newest first)
994        all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
995
996        debug!(count = %all_transactions.len(), "found transactions");
997        Ok(all_transactions)
998    }
999
1000    // Unoptimized implementation of list_paginated. Rarely used. find_by_relayer_id is preferred.
1001    async fn list_paginated(
1002        &self,
1003        query: PaginationQuery,
1004    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
1005        if query.per_page == 0 {
1006            return Err(RepositoryError::InvalidData(
1007                "per_page must be greater than 0".to_string(),
1008            ));
1009        }
1010
1011        let mut conn = self
1012            .get_connection(self.connections.reader(), "list_paginated")
1013            .await?;
1014
1015        debug!(page = %query.page, per_page = %query.per_page, "fetching paginated transactions sorted by created_at (newest first)");
1016
1017        // Get all relayer IDs
1018        let relayer_list_key = self.relayer_list_key();
1019        let relayer_ids: Vec<String> = conn
1020            .smembers(&relayer_list_key)
1021            .await
1022            .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_ids"))?;
1023
1024        // Collect all transaction IDs from all relayers using their sorted sets
1025        let mut all_tx_ids = Vec::new();
1026        for relayer_id in relayer_ids {
1027            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
1028            let tx_ids: Vec<String> = redis::cmd("ZRANGE")
1029                .arg(&relayer_sorted_key)
1030                .arg(0)
1031                .arg(-1)
1032                .arg("REV")
1033                .query_async(&mut conn)
1034                .await
1035                .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_sorted"))?;
1036
1037            all_tx_ids.extend(tx_ids);
1038        }
1039
1040        // Release connection before nested call to avoid connection doubling
1041        drop(conn);
1042
1043        // Batch fetch all transactions at once
1044        let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
1045        let mut all_transactions = batch_result.results;
1046
1047        // Sort all transactions by created_at (newest first)
1048        all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
1049
1050        let total = all_transactions.len() as u64;
1051        let start = ((query.page - 1) * query.per_page) as usize;
1052        let end = (start + query.per_page as usize).min(all_transactions.len());
1053
1054        if start >= all_transactions.len() {
1055            debug!(page = %query.page, total = %total, "page is beyond available data");
1056            return Ok(PaginatedResult {
1057                items: vec![],
1058                total,
1059                page: query.page,
1060                per_page: query.per_page,
1061            });
1062        }
1063
1064        let items = all_transactions[start..end].to_vec();
1065
1066        debug!(count = %items.len(), page = %query.page, "successfully fetched transactions for page");
1067
1068        Ok(PaginatedResult {
1069            items,
1070            total,
1071            page: query.page,
1072            per_page: query.per_page,
1073        })
1074    }
1075
1076    async fn update(
1077        &self,
1078        id: String,
1079        entity: TransactionRepoModel,
1080    ) -> Result<TransactionRepoModel, RepositoryError> {
1081        if id.is_empty() {
1082            return Err(RepositoryError::InvalidData(
1083                "Transaction ID cannot be empty".to_string(),
1084            ));
1085        }
1086
1087        debug!(tx_id = %id, "updating transaction");
1088
1089        // Get the old transaction for index cleanup
1090        let old_tx = self.get_by_id(id.clone()).await?;
1091
1092        let key = self.tx_key(&entity.relayer_id, &id);
1093        let mut conn = self
1094            .get_connection(self.connections.primary(), "update")
1095            .await?;
1096
1097        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
1098
1099        // Update transaction
1100        let _: () = conn
1101            .set(&key, value)
1102            .await
1103            .map_err(|e| self.map_redis_error(e, "update_transaction"))?;
1104
1105        // Update indexes
1106        self.update_indexes(&entity, Some(&old_tx)).await?;
1107
1108        debug!(tx_id = %id, "successfully updated transaction");
1109        Ok(entity)
1110    }
1111
1112    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
1113        if id.is_empty() {
1114            return Err(RepositoryError::InvalidData(
1115                "Transaction ID cannot be empty".to_string(),
1116            ));
1117        }
1118
1119        debug!(tx_id = %id, "deleting transaction");
1120
1121        // Get transaction first for index cleanup
1122        let tx = self.get_by_id(id.clone()).await?;
1123
1124        let key = self.tx_key(&tx.relayer_id, &id);
1125        let reverse_key = self.tx_to_relayer_key(&id);
1126        let mut conn = self
1127            .get_connection(self.connections.primary(), "delete_by_id")
1128            .await?;
1129
1130        let mut pipe = redis::pipe();
1131        pipe.atomic();
1132        pipe.del(&key);
1133        pipe.del(&reverse_key);
1134
1135        pipe.exec_async(&mut conn)
1136            .await
1137            .map_err(|e| self.map_redis_error(e, "delete_transaction"))?;
1138
1139        // Remove indexes (log errors but don't fail the delete)
1140        if let Err(e) = self.remove_all_indexes(&tx).await {
1141            error!(tx_id = %id, error = %e, "failed to remove indexes for deleted transaction");
1142        }
1143
1144        debug!(tx_id = %id, "successfully deleted transaction");
1145        Ok(())
1146    }
1147
1148    // Unoptimized implementation of count. Rarely used. find_by_relayer_id is preferred.
1149    async fn count(&self) -> Result<usize, RepositoryError> {
1150        let mut conn = self
1151            .get_connection(self.connections.reader(), "count")
1152            .await?;
1153
1154        debug!("counting transactions");
1155
1156        // Get all relayer IDs and sum their sorted set counts
1157        let relayer_list_key = self.relayer_list_key();
1158        let relayer_ids: Vec<String> = conn
1159            .smembers(&relayer_list_key)
1160            .await
1161            .map_err(|e| self.map_redis_error(e, "count_relayer_ids"))?;
1162
1163        let mut total_count = 0usize;
1164        for relayer_id in relayer_ids {
1165            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
1166            let count: usize = conn
1167                .zcard(&relayer_sorted_key)
1168                .await
1169                .map_err(|e| self.map_redis_error(e, "count_relayer_transactions"))?;
1170            total_count += count;
1171        }
1172
1173        debug!(count = %total_count, "transaction count");
1174        Ok(total_count)
1175    }
1176
1177    async fn has_entries(&self) -> Result<bool, RepositoryError> {
1178        let mut conn = self
1179            .get_connection(self.connections.reader(), "has_entries")
1180            .await?;
1181        let relayer_list_key = self.relayer_list_key();
1182
1183        debug!("checking if transaction entries exist");
1184
1185        let exists: bool = conn
1186            .exists(&relayer_list_key)
1187            .await
1188            .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
1189
1190        debug!(exists = %exists, "transaction entries exist");
1191        Ok(exists)
1192    }
1193
1194    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
1195        let mut conn = self
1196            .get_connection(self.connections.primary(), "drop_all_entries")
1197            .await?;
1198        let relayer_list_key = self.relayer_list_key();
1199
1200        debug!("dropping all transaction entries");
1201
1202        // Get all relayer IDs first
1203        let relayer_ids: Vec<String> = conn
1204            .smembers(&relayer_list_key)
1205            .await
1206            .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_relayer_ids"))?;
1207
1208        if relayer_ids.is_empty() {
1209            debug!("no transaction entries to drop");
1210            return Ok(());
1211        }
1212
1213        // Use pipeline for atomic operations
1214        let mut pipe = redis::pipe();
1215        pipe.atomic();
1216
1217        // Delete all transactions and their indexes for each relayer
1218        for relayer_id in &relayer_ids {
1219            // Get all transaction IDs for this relayer
1220            let pattern = format!(
1221                "{}:{}:{}:{}:*",
1222                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
1223            );
1224            let mut cursor = 0;
1225            let mut tx_ids = Vec::new();
1226
1227            loop {
1228                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
1229                    .cursor_arg(cursor)
1230                    .arg("MATCH")
1231                    .arg(&pattern)
1232                    .query_async(&mut conn)
1233                    .await
1234                    .map_err(|e| self.map_redis_error(e, "drop_all_entries_scan"))?;
1235
1236                // Extract transaction IDs from keys and delete keys
1237                for key in keys {
1238                    pipe.del(&key);
1239                    if let Some(tx_id) = key.split(':').next_back() {
1240                        tx_ids.push(tx_id.to_string());
1241                    }
1242                }
1243
1244                cursor = next_cursor;
1245                if cursor == 0 {
1246                    break;
1247                }
1248            }
1249
1250            // Delete reverse lookup keys and indexes
1251            for tx_id in tx_ids {
1252                let reverse_key = self.tx_to_relayer_key(&tx_id);
1253                pipe.del(&reverse_key);
1254
1255                // Delete status indexes (we can't know the specific status, so we'll clean up all possible ones)
1256                // This ensures complete cleanup even if there are orphaned entries
1257                for status in &[
1258                    TransactionStatus::Canceled,
1259                    TransactionStatus::Pending,
1260                    TransactionStatus::Sent,
1261                    TransactionStatus::Submitted,
1262                    TransactionStatus::Mined,
1263                    TransactionStatus::Confirmed,
1264                    TransactionStatus::Failed,
1265                    TransactionStatus::Expired,
1266                ] {
1267                    // Remove from sorted status set (new format)
1268                    let status_sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1269                    pipe.zrem(&status_sorted_key, &tx_id);
1270
1271                    // Remove from legacy status set (for migration cleanup)
1272                    let status_key = self.relayer_status_key(relayer_id, status);
1273                    pipe.srem(&status_key, &tx_id);
1274                }
1275            }
1276
1277            // Delete the relayer's sorted set by created_at
1278            let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
1279            pipe.del(&relayer_sorted_key);
1280        }
1281
1282        // Delete the relayer list key
1283        pipe.del(&relayer_list_key);
1284
1285        pipe.exec_async(&mut conn)
1286            .await
1287            .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
1288
1289        debug!(count = %relayer_ids.len(), "dropped all transaction entries for relayers");
1290        Ok(())
1291    }
1292}
1293
1294#[async_trait]
1295impl TransactionRepository for RedisTransactionRepository {
1296    async fn find_by_relayer_id(
1297        &self,
1298        relayer_id: &str,
1299        query: PaginationQuery,
1300    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
1301        let mut conn = self
1302            .get_connection(self.connections.reader(), "find_by_relayer_id")
1303            .await?;
1304
1305        debug!(relayer_id = %relayer_id, page = %query.page, per_page = %query.per_page, "fetching transactions for relayer sorted by created_at (newest first)");
1306
1307        let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
1308
1309        // Get total count from relayer's sorted set
1310        let sorted_set_count: u64 = conn
1311            .zcard(&relayer_sorted_key)
1312            .await
1313            .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_count"))?;
1314
1315        // If sorted set is empty, return empty result immediately
1316        // All new transactions are automatically added to the sorted set
1317        if sorted_set_count == 0 {
1318            debug!(relayer_id = %relayer_id, "no transactions found for relayer (sorted set is empty)");
1319            return Ok(PaginatedResult {
1320                items: vec![],
1321                total: 0,
1322                page: query.page,
1323                per_page: query.per_page,
1324            });
1325        }
1326
1327        let total = sorted_set_count;
1328
1329        // Calculate pagination range (0-indexed for Redis ZRANGE with REV)
1330        let start = ((query.page - 1) * query.per_page) as isize;
1331        let end = start + query.per_page as isize - 1;
1332
1333        if start as u64 >= total {
1334            debug!(relayer_id = %relayer_id, page = %query.page, total = %total, "page is beyond available data");
1335            return Ok(PaginatedResult {
1336                items: vec![],
1337                total,
1338                page: query.page,
1339                per_page: query.per_page,
1340            });
1341        }
1342
1343        // Get page of transaction IDs from sorted set (newest first using ZRANGE with REV)
1344        let page_ids: Vec<String> = redis::cmd("ZRANGE")
1345            .arg(&relayer_sorted_key)
1346            .arg(start)
1347            .arg(end)
1348            .arg("REV")
1349            .query_async(&mut conn)
1350            .await
1351            .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_sorted"))?;
1352
1353        // Release connection before nested call to avoid connection doubling
1354        drop(conn);
1355
1356        let items = self.get_transactions_by_ids(&page_ids).await?;
1357
1358        debug!(relayer_id = %relayer_id, count = %items.results.len(), page = %query.page, "successfully fetched transactions for relayer");
1359
1360        Ok(PaginatedResult {
1361            items: items.results,
1362            total,
1363            page: query.page,
1364            per_page: query.per_page,
1365        })
1366    }
1367
1368    // Unoptimized implementation of find_by_status. Rarely used. find_by_status_paginated is preferred.
1369    async fn find_by_status(
1370        &self,
1371        relayer_id: &str,
1372        statuses: &[TransactionStatus],
1373    ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
1374        // Ensure all status sorted sets are migrated first (releases connection after each)
1375        for status in statuses {
1376            self.ensure_status_sorted_set(relayer_id, status).await?;
1377        }
1378
1379        // Now get a connection and collect all IDs
1380        let mut conn = self
1381            .get_connection(self.connections.reader(), "find_by_status")
1382            .await?;
1383
1384        let mut all_ids: Vec<String> = Vec::new();
1385        for status in statuses {
1386            // Get IDs from sorted set (already ordered by created_at)
1387            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1388            let ids: Vec<String> = redis::cmd("ZRANGE")
1389                .arg(&sorted_key)
1390                .arg(0)
1391                .arg(-1)
1392                .arg("REV") // Newest first
1393                .query_async(&mut conn)
1394                .await
1395                .map_err(|e| self.map_redis_error(e, "find_by_status"))?;
1396
1397            all_ids.extend(ids);
1398        }
1399
1400        // Release connection before nested call to avoid connection doubling
1401        drop(conn);
1402
1403        if all_ids.is_empty() {
1404            return Ok(vec![]);
1405        }
1406
1407        // Remove duplicates (can happen if a transaction is in multiple status sets due to partial failures)
1408        all_ids.sort();
1409        all_ids.dedup();
1410
1411        // Fetch all transactions and sort by created_at (newest first)
1412        let mut transactions = self.get_transactions_by_ids(&all_ids).await?;
1413
1414        // Sort by created_at descending (newest first)
1415        transactions
1416            .results
1417            .sort_by(|a, b| b.created_at.cmp(&a.created_at));
1418
1419        Ok(transactions.results)
1420    }
1421
1422    async fn find_by_status_paginated(
1423        &self,
1424        relayer_id: &str,
1425        statuses: &[TransactionStatus],
1426        query: PaginationQuery,
1427        oldest_first: bool,
1428    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
1429        // Ensure all status sorted sets are migrated first (releases connection after each)
1430        for status in statuses {
1431            self.ensure_status_sorted_set(relayer_id, status).await?;
1432        }
1433
1434        let mut conn = self
1435            .get_connection(self.connections.reader(), "find_by_status_paginated")
1436            .await?;
1437
1438        // For single status, we can paginate directly from the sorted set
1439        if statuses.len() == 1 {
1440            let sorted_key = self.relayer_status_sorted_key(relayer_id, &statuses[0]);
1441
1442            // Get total count
1443            let total: u64 = conn
1444                .zcard(&sorted_key)
1445                .await
1446                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_count"))?;
1447
1448            if total == 0 {
1449                return Ok(PaginatedResult {
1450                    items: vec![],
1451                    total: 0,
1452                    page: query.page,
1453                    per_page: query.per_page,
1454                });
1455            }
1456
1457            // Calculate pagination bounds
1458            let start = ((query.page.saturating_sub(1)) * query.per_page) as isize;
1459            let end = start + query.per_page as isize - 1;
1460
1461            // Get page of IDs directly from sorted set
1462            // REV = newest first (descending), no REV = oldest first (ascending)
1463            let mut cmd = redis::cmd("ZRANGE");
1464            cmd.arg(&sorted_key).arg(start).arg(end);
1465            if !oldest_first {
1466                cmd.arg("REV");
1467            }
1468            let page_ids: Vec<String> = cmd
1469                .query_async(&mut conn)
1470                .await
1471                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated"))?;
1472
1473            // Release connection before nested call to avoid connection doubling
1474            drop(conn);
1475
1476            let transactions = self.get_transactions_by_ids(&page_ids).await?;
1477
1478            debug!(
1479                relayer_id = %relayer_id,
1480                status = %statuses[0],
1481                total = %total,
1482                page = %query.page,
1483                page_size = %transactions.results.len(),
1484                "fetched paginated transactions by single status"
1485            );
1486
1487            return Ok(PaginatedResult {
1488                items: transactions.results,
1489                total,
1490                page: query.page,
1491                per_page: query.per_page,
1492            });
1493        }
1494
1495        // For multiple statuses, collect all IDs and merge
1496        let mut all_ids: Vec<(String, f64)> = Vec::new();
1497        for status in statuses {
1498            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1499
1500            // Get IDs with scores for proper sorting
1501            let ids_with_scores: Vec<(String, f64)> = redis::cmd("ZRANGE")
1502                .arg(&sorted_key)
1503                .arg(0)
1504                .arg(-1)
1505                .arg("WITHSCORES")
1506                .query_async(&mut conn)
1507                .await
1508                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_multi"))?;
1509
1510            all_ids.extend(ids_with_scores);
1511        }
1512
1513        // Release connection before nested call to avoid connection doubling
1514        drop(conn);
1515
1516        // Remove duplicates (keep highest/lowest score based on sort order)
1517        let mut id_map: std::collections::HashMap<String, f64> = std::collections::HashMap::new();
1518        for (id, score) in all_ids {
1519            id_map
1520                .entry(id)
1521                .and_modify(|s| {
1522                    // For oldest_first, keep the lowest score; otherwise keep highest
1523                    if oldest_first {
1524                        if score < *s {
1525                            *s = score
1526                        }
1527                    } else if score > *s {
1528                        *s = score
1529                    }
1530                })
1531                .or_insert(score);
1532        }
1533
1534        // Sort by score: descending for newest first, ascending for oldest first
1535        let mut sorted_ids: Vec<(String, f64)> = id_map.into_iter().collect();
1536        if oldest_first {
1537            sorted_ids.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
1538        } else {
1539            sorted_ids.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1540        }
1541
1542        let total = sorted_ids.len() as u64;
1543
1544        if total == 0 {
1545            return Ok(PaginatedResult {
1546                items: vec![],
1547                total: 0,
1548                page: query.page,
1549                per_page: query.per_page,
1550            });
1551        }
1552
1553        // Apply pagination
1554        let start = ((query.page.saturating_sub(1)) * query.per_page) as usize;
1555        let page_ids: Vec<String> = sorted_ids
1556            .into_iter()
1557            .skip(start)
1558            .take(query.per_page as usize)
1559            .map(|(id, _)| id)
1560            .collect();
1561
1562        // Fetch only the transactions for this page
1563        let transactions = self.get_transactions_by_ids(&page_ids).await?;
1564
1565        debug!(
1566            relayer_id = %relayer_id,
1567            total = %total,
1568            page = %query.page,
1569            page_size = %transactions.results.len(),
1570            "fetched paginated transactions by status"
1571        );
1572
1573        Ok(PaginatedResult {
1574            items: transactions.results,
1575            total,
1576            page: query.page,
1577            per_page: query.per_page,
1578        })
1579    }
1580
1581    async fn find_by_nonce(
1582        &self,
1583        relayer_id: &str,
1584        nonce: u64,
1585    ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
1586        let mut conn = self
1587            .get_connection(self.connections.reader(), "find_by_nonce")
1588            .await?;
1589        let nonce_key = self.relayer_nonce_key(relayer_id, nonce);
1590
1591        // Get transaction ID with this nonce for this relayer (should be single value)
1592        let tx_id: Option<String> = conn
1593            .get(nonce_key)
1594            .await
1595            .map_err(|e| self.map_redis_error(e, "find_by_nonce"))?;
1596
1597        match tx_id {
1598            Some(tx_id) => {
1599                match self.get_by_id(tx_id.clone()).await {
1600                    Ok(tx) => Ok(Some(tx)),
1601                    Err(RepositoryError::NotFound(_)) => {
1602                        // Transaction was deleted but index wasn't cleaned up
1603                        warn!(relayer_id = %relayer_id, nonce = %nonce, "stale nonce index found for relayer");
1604                        Ok(None)
1605                    }
1606                    Err(e) => Err(e),
1607                }
1608            }
1609            None => Ok(None),
1610        }
1611    }
1612
1613    async fn update_status(
1614        &self,
1615        tx_id: String,
1616        status: TransactionStatus,
1617    ) -> Result<TransactionRepoModel, RepositoryError> {
1618        let update = TransactionUpdateRequest {
1619            status: Some(status),
1620            ..Default::default()
1621        };
1622        self.partial_update(tx_id, update).await
1623    }
1624
1625    async fn partial_update(
1626        &self,
1627        tx_id: String,
1628        update: TransactionUpdateRequest,
1629    ) -> Result<TransactionRepoModel, RepositoryError> {
1630        // Serialize only the non-None fields as a JSON patch.
1631        let patch_json = serde_json::to_string(&update).map_err(|e| {
1632            RepositoryError::InvalidData(format!("Failed to serialize update patch: {e}"))
1633        })?;
1634
1635        // If the update sets a final status, compute delete_at in Rust (depends on server config)
1636        // and include it in the patch so the Lua script applies it atomically.
1637        let delete_at_value = if let Some(ref status) = update.status {
1638            if FINAL_TRANSACTION_STATUSES.contains(status) {
1639                let expiration_hours = ServerConfig::get_transaction_expiration_hours();
1640                let seconds = (expiration_hours * 3600.0) as i64;
1641                let delete_time = Utc::now() + chrono::Duration::seconds(seconds);
1642                Some(delete_time.to_rfc3339())
1643            } else {
1644                None
1645            }
1646        } else {
1647            None
1648        };
1649        let delete_at_arg = delete_at_value.as_deref().unwrap_or("");
1650
1651        let (lookup_key, key_prefix, key_suffix) = self.tx_key_parts(&tx_id);
1652
1653        // Lua script: atomically applies a JSON patch to the stored transaction.
1654        // Guards: rejects status changes on already-finalized transactions.
1655        // Returns a two-element array {old_json, new_json} so Rust has the full
1656        // pre-update state for index cleanup and metrics.
1657        // Returns false if tx not found.
1658        let patch_script = Script::new(
1659            r#"
1660            local relayer_id = redis.call('GET', KEYS[1])
1661            if not relayer_id then return false end
1662
1663            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
1664            local current = redis.call('GET', tx_key)
1665            if not current then return false end
1666
1667            local tx = cjson.decode(current)
1668            local patch = cjson.decode(ARGV[3])
1669
1670            -- Guard: reject status changes on finalized transactions.
1671            -- A stale worker must not resurrect a tx that another worker
1672            -- already moved to a terminal state.
1673            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
1674            if final_states[tx["status"]] and patch["status"] then
1675                return {current, current}
1676            end
1677
1678            local old_snapshot = current
1679
1680            -- lua-cjson cannot distinguish empty Lua tables from empty
1681            -- arrays, so a decode/encode round-trip turns [] into {}.
1682            -- Record which keys held [] in the stored doc and the patch
1683            -- so we can restore them after cjson.encode.
1684            -- NOTE: this relies on each array-typed field having a unique key
1685            -- name across the entire JSON document (including nested objects).
1686            -- If the model ever introduces duplicate key names at different
1687            -- nesting levels (e.g. metadata.hashes), the gsub below could
1688            -- restore the wrong occurrence.
1689            local empty_arrs = {}
1690            for k in string.gmatch(current, '"([^"]+)"%s*:%s*%[%s*%]') do
1691                empty_arrs[k] = true
1692            end
1693            for k in string.gmatch(ARGV[3], '"([^"]+)"%s*:%s*%[%s*%]') do
1694                empty_arrs[k] = true
1695            end
1696
1697            for k, v in pairs(patch) do
1698                tx[k] = v
1699            end
1700
1701            -- Apply delete_at if transitioning to a final state and not already set
1702            if ARGV[4] ~= '' and (not tx["delete_at"] or tx["delete_at"] == cjson.null) then
1703                tx["delete_at"] = ARGV[4]
1704            end
1705
1706            local updated = cjson.encode(tx)
1707
1708            -- Restore empty arrays that cjson.encode converted to {}
1709            for k, _ in pairs(empty_arrs) do
1710                updated = string.gsub(
1711                    updated, '"'..k..'"%s*:%s*{}', '"'..k..'":[]', 1
1712                )
1713            end
1714
1715            redis.call('SET', tx_key, updated)
1716            return {old_snapshot, updated}
1717            "#,
1718        );
1719
1720        let result: Option<Vec<String>> = self
1721            .run_script_with_retry_vec(
1722                &patch_script,
1723                &lookup_key,
1724                &key_prefix,
1725                &key_suffix,
1726                &[&patch_json, delete_at_arg],
1727                "partial_update",
1728            )
1729            .await?;
1730
1731        let parts = result.ok_or_else(|| {
1732            RepositoryError::NotFound(format!("Transaction with ID {tx_id} not found"))
1733        })?;
1734
1735        if parts.len() != 2 {
1736            return Err(RepositoryError::UnexpectedError(format!(
1737                "partial_update script returned {} elements, expected 2",
1738                parts.len()
1739            )));
1740        }
1741
1742        let old_json = &parts[0];
1743        let new_json = &parts[1];
1744
1745        let original_tx =
1746            self.deserialize_entity::<TransactionRepoModel>(old_json, &tx_id, "transaction")?;
1747        let updated_tx =
1748            self.deserialize_entity::<TransactionRepoModel>(new_json, &tx_id, "transaction")?;
1749
1750        // Update indexes using the full pre-update state (status, network_data, nonce, etc.)
1751        self.update_indexes(&updated_tx, Some(&original_tx)).await?;
1752
1753        debug!(tx_id = %tx_id, "successfully updated transaction via patch");
1754
1755        // Track metrics only when the persisted status actually changed.
1756        // The Lua script may silently reject a status patch on already-final
1757        // transactions, so we compare the deserialized before/after states.
1758        if original_tx.status != updated_tx.status {
1759            self.track_status_change_metrics(
1760                &original_tx,
1761                &updated_tx,
1762                &original_tx.status,
1763                &updated_tx.status,
1764            );
1765        }
1766
1767        Ok(updated_tx)
1768    }
1769
1770    async fn update_network_data(
1771        &self,
1772        tx_id: String,
1773        network_data: NetworkTransactionData,
1774    ) -> Result<TransactionRepoModel, RepositoryError> {
1775        let update = TransactionUpdateRequest {
1776            network_data: Some(network_data),
1777            ..Default::default()
1778        };
1779        self.partial_update(tx_id, update).await
1780    }
1781
1782    async fn set_sent_at(
1783        &self,
1784        tx_id: String,
1785        sent_at: String,
1786    ) -> Result<TransactionRepoModel, RepositoryError> {
1787        let update = TransactionUpdateRequest {
1788            sent_at: Some(sent_at),
1789            ..Default::default()
1790        };
1791        self.partial_update(tx_id, update).await
1792    }
1793
1794    async fn increment_status_check_failures(
1795        &self,
1796        tx_id: String,
1797    ) -> Result<TransactionRepoModel, RepositoryError> {
1798        self.run_atomic_script(
1799            r#"
1800            local function set_obj(json, key, tbl)
1801                local enc = cjson.encode(tbl)
1802                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
1803                if n > 0 then return r end
1804                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1805                if n > 0 then return r end
1806                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1807            end
1808
1809            local relayer_id = redis.call('GET', KEYS[1])
1810            if not relayer_id then return false end
1811
1812            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
1813            local current = redis.call('GET', tx_key)
1814            if not current then return false end
1815
1816            local tx = cjson.decode(current)
1817            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
1818            if final_states[tx["status"]] then return current end
1819
1820            local metadata = tx["metadata"]
1821            if type(metadata) ~= 'table' then metadata = {} end
1822            metadata["consecutive_failures"] = (metadata["consecutive_failures"] or 0) + 1
1823            metadata["total_failures"] = (metadata["total_failures"] or 0) + 1
1824
1825            local updated = set_obj(current, "metadata", metadata)
1826            redis.call('SET', tx_key, updated)
1827            return updated
1828            "#,
1829            &tx_id,
1830            &[],
1831            "increment_status_check_failures",
1832        )
1833        .await
1834    }
1835
1836    async fn reset_status_check_consecutive_failures(
1837        &self,
1838        tx_id: String,
1839    ) -> Result<TransactionRepoModel, RepositoryError> {
1840        self.run_atomic_script(
1841            r#"
1842            local function set_obj(json, key, tbl)
1843                local enc = cjson.encode(tbl)
1844                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
1845                if n > 0 then return r end
1846                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1847                if n > 0 then return r end
1848                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1849            end
1850
1851            local relayer_id = redis.call('GET', KEYS[1])
1852            if not relayer_id then return false end
1853
1854            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
1855            local current = redis.call('GET', tx_key)
1856            if not current then return false end
1857
1858            local tx = cjson.decode(current)
1859            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
1860            if final_states[tx["status"]] then return current end
1861
1862            local metadata = tx["metadata"]
1863            if type(metadata) ~= 'table' then metadata = {} end
1864            metadata["consecutive_failures"] = 0
1865
1866            local updated = set_obj(current, "metadata", metadata)
1867            redis.call('SET', tx_key, updated)
1868            return updated
1869            "#,
1870            &tx_id,
1871            &[],
1872            "reset_status_check_consecutive_failures",
1873        )
1874        .await
1875    }
1876
1877    async fn record_stellar_insufficient_fee_retry(
1878        &self,
1879        tx_id: String,
1880        sent_at: String,
1881    ) -> Result<TransactionRepoModel, RepositoryError> {
1882        self.run_atomic_script(
1883            r#"
1884            local function set_str(json, key, val)
1885                local enc = cjson.encode(val)
1886                local r, n = string.gsub(json, '"'..key..'"%s*:%s*"[^"]*"', '"'..key..'":'..enc, 1)
1887                if n > 0 then return r end
1888                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1889                if n > 0 then return r end
1890                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1891            end
1892            local function set_obj(json, key, tbl)
1893                local enc = cjson.encode(tbl)
1894                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
1895                if n > 0 then return r end
1896                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1897                if n > 0 then return r end
1898                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1899            end
1900
1901            local relayer_id = redis.call('GET', KEYS[1])
1902            if not relayer_id then return false end
1903
1904            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
1905            local current = redis.call('GET', tx_key)
1906            if not current then return false end
1907
1908            local tx = cjson.decode(current)
1909            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
1910            if final_states[tx["status"]] then return current end
1911
1912            local metadata = tx["metadata"]
1913            if type(metadata) ~= 'table' then metadata = {} end
1914            metadata["insufficient_fee_retries"] = (metadata["insufficient_fee_retries"] or 0) + 1
1915
1916            local updated = set_str(current, "sent_at", ARGV[3])
1917            updated = set_obj(updated, "metadata", metadata)
1918            redis.call('SET', tx_key, updated)
1919            return updated
1920            "#,
1921            &tx_id,
1922            &[&sent_at],
1923            "record_stellar_insufficient_fee_retry",
1924        )
1925        .await
1926    }
1927
1928    async fn record_stellar_try_again_later_retry(
1929        &self,
1930        tx_id: String,
1931        sent_at: String,
1932    ) -> Result<TransactionRepoModel, RepositoryError> {
1933        self.run_atomic_script(
1934            r#"
1935            local function set_str(json, key, val)
1936                local enc = cjson.encode(val)
1937                local r, n = string.gsub(json, '"'..key..'"%s*:%s*"[^"]*"', '"'..key..'":'..enc, 1)
1938                if n > 0 then return r end
1939                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1940                if n > 0 then return r end
1941                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1942            end
1943            local function set_obj(json, key, tbl)
1944                local enc = cjson.encode(tbl)
1945                local r, n = string.gsub(json, '"'..key..'"%s*:%s*%b{}', '"'..key..'":'..enc, 1)
1946                if n > 0 then return r end
1947                r, n = string.gsub(json, '"'..key..'"%s*:%s*null', '"'..key..'":'..enc, 1)
1948                if n > 0 then return r end
1949                return string.gsub(json, '}%s*$', ',"'..key..'":'..enc..'}', 1)
1950            end
1951
1952            local relayer_id = redis.call('GET', KEYS[1])
1953            if not relayer_id then return false end
1954
1955            local tx_key = ARGV[1] .. relayer_id .. ARGV[2]
1956            local current = redis.call('GET', tx_key)
1957            if not current then return false end
1958
1959            local tx = cjson.decode(current)
1960            local final_states = {confirmed=true, failed=true, expired=true, canceled=true}
1961            if final_states[tx["status"]] then return current end
1962
1963            local metadata = tx["metadata"]
1964            if type(metadata) ~= 'table' then metadata = {} end
1965            metadata["try_again_later_retries"] = (metadata["try_again_later_retries"] or 0) + 1
1966
1967            local updated = set_str(current, "sent_at", ARGV[3])
1968            updated = set_obj(updated, "metadata", metadata)
1969            redis.call('SET', tx_key, updated)
1970            return updated
1971            "#,
1972            &tx_id,
1973            &[&sent_at],
1974            "record_stellar_try_again_later_retry",
1975        )
1976        .await
1977    }
1978
1979    async fn set_confirmed_at(
1980        &self,
1981        tx_id: String,
1982        confirmed_at: String,
1983    ) -> Result<TransactionRepoModel, RepositoryError> {
1984        let update = TransactionUpdateRequest {
1985            confirmed_at: Some(confirmed_at),
1986            ..Default::default()
1987        };
1988        self.partial_update(tx_id, update).await
1989    }
1990
1991    /// Count transactions by status using Redis ZCARD (O(1) per sorted set).
1992    /// Much more efficient than find_by_status when you only need the count.
1993    /// Triggers migration from legacy SETs if needed.
1994    async fn count_by_status(
1995        &self,
1996        relayer_id: &str,
1997        statuses: &[TransactionStatus],
1998    ) -> Result<u64, RepositoryError> {
1999        let mut conn = self
2000            .get_connection(self.connections.reader(), "count_by_status")
2001            .await?;
2002        let mut total_count: u64 = 0;
2003
2004        for status in statuses {
2005            // Ensure sorted set is migrated
2006            self.ensure_status_sorted_set(relayer_id, status).await?;
2007
2008            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
2009            let count: u64 = conn
2010                .zcard(&sorted_key)
2011                .await
2012                .map_err(|e| self.map_redis_error(e, "count_by_status"))?;
2013            total_count += count;
2014        }
2015
2016        debug!(relayer_id = %relayer_id, count = %total_count, "counted transactions by status");
2017        Ok(total_count)
2018    }
2019
2020    async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError> {
2021        if ids.is_empty() {
2022            debug!("no transaction IDs provided for batch delete");
2023            return Ok(BatchDeleteResult::default());
2024        }
2025
2026        debug!(count = %ids.len(), "batch deleting transactions by IDs (with fetch)");
2027
2028        // Fetch transactions to get their data for index cleanup
2029        let batch_result = self.get_transactions_by_ids(&ids).await?;
2030
2031        // Convert to delete requests
2032        let requests: Vec<TransactionDeleteRequest> = batch_result
2033            .results
2034            .iter()
2035            .map(|tx| TransactionDeleteRequest {
2036                id: tx.id.clone(),
2037                relayer_id: tx.relayer_id.clone(),
2038                nonce: self.extract_nonce(&tx.network_data),
2039            })
2040            .collect();
2041
2042        // Track IDs that weren't found
2043        let mut result = self.delete_by_requests(requests).await?;
2044
2045        // Add the IDs that weren't found during fetch
2046        for id in batch_result.failed_ids {
2047            result
2048                .failed
2049                .push((id.clone(), format!("Transaction with ID {id} not found")));
2050        }
2051
2052        Ok(result)
2053    }
2054
2055    async fn delete_by_requests(
2056        &self,
2057        requests: Vec<TransactionDeleteRequest>,
2058    ) -> Result<BatchDeleteResult, RepositoryError> {
2059        if requests.is_empty() {
2060            debug!("no delete requests provided for batch delete");
2061            return Ok(BatchDeleteResult::default());
2062        }
2063
2064        debug!(count = %requests.len(), "batch deleting transactions by requests (no fetch)");
2065        let mut conn = self
2066            .get_connection(self.connections.primary(), "batch_delete_no_fetch")
2067            .await?;
2068        let mut pipe = redis::pipe();
2069        pipe.atomic();
2070
2071        // All possible statuses for index cleanup
2072        let all_statuses = [
2073            TransactionStatus::Canceled,
2074            TransactionStatus::Pending,
2075            TransactionStatus::Sent,
2076            TransactionStatus::Submitted,
2077            TransactionStatus::Mined,
2078            TransactionStatus::Confirmed,
2079            TransactionStatus::Failed,
2080            TransactionStatus::Expired,
2081        ];
2082
2083        // Build pipeline for all deletions and index removals
2084        for req in &requests {
2085            // Delete transaction data
2086            let tx_key = self.tx_key(&req.relayer_id, &req.id);
2087            pipe.del(&tx_key);
2088
2089            // Delete reverse lookup
2090            let reverse_key = self.tx_to_relayer_key(&req.id);
2091            pipe.del(&reverse_key);
2092
2093            // Remove from all possible status indexes
2094            for status in &all_statuses {
2095                let status_sorted_key = self.relayer_status_sorted_key(&req.relayer_id, status);
2096                pipe.zrem(&status_sorted_key, &req.id);
2097
2098                let status_legacy_key = self.relayer_status_key(&req.relayer_id, status);
2099                pipe.srem(&status_legacy_key, &req.id);
2100            }
2101
2102            // Remove nonce index if exists
2103            if let Some(nonce) = req.nonce {
2104                let nonce_key = self.relayer_nonce_key(&req.relayer_id, nonce);
2105                pipe.del(&nonce_key);
2106            }
2107
2108            // Remove from per-relayer sorted set by created_at
2109            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&req.relayer_id);
2110            pipe.zrem(&relayer_sorted_key, &req.id);
2111        }
2112
2113        // Execute the entire pipeline in one round-trip
2114        match pipe.exec_async(&mut conn).await {
2115            Ok(_) => {
2116                let deleted_count = requests.len();
2117                debug!(
2118                    deleted_count = %deleted_count,
2119                    "batch delete completed"
2120                );
2121                Ok(BatchDeleteResult {
2122                    deleted_count,
2123                    failed: vec![],
2124                })
2125            }
2126            Err(e) => {
2127                error!(error = %e, "batch delete pipeline failed");
2128                // Mark all requests as failed
2129                let failed: Vec<(String, String)> = requests
2130                    .iter()
2131                    .map(|req| (req.id.clone(), format!("Redis pipeline error: {e}")))
2132                    .collect();
2133                Ok(BatchDeleteResult {
2134                    deleted_count: 0,
2135                    failed,
2136                })
2137            }
2138        }
2139    }
2140}
2141
2142#[cfg(test)]
2143mod tests {
2144    use super::*;
2145    use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
2146    use alloy::primitives::U256;
2147    use deadpool_redis::{Config, Runtime};
2148    use lazy_static::lazy_static;
2149    use std::str::FromStr;
2150    use tokio;
2151    use uuid::Uuid;
2152
2153    use tokio::sync::Mutex;
2154
2155    // Use a mutex to ensure tests don't run in parallel when modifying env vars
2156    lazy_static! {
2157        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
2158    }
2159
2160    // Helper function to create test transactions
2161    fn create_test_transaction(id: &str) -> TransactionRepoModel {
2162        TransactionRepoModel {
2163            id: id.to_string(),
2164            relayer_id: "relayer-1".to_string(),
2165            status: TransactionStatus::Pending,
2166            status_reason: None,
2167            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
2168            sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
2169            confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
2170            valid_until: None,
2171            delete_at: None,
2172            network_type: NetworkType::Evm,
2173            priced_at: None,
2174            hashes: vec![],
2175            network_data: NetworkTransactionData::Evm(EvmTransactionData {
2176                gas_price: Some(1000000000),
2177                gas_limit: Some(21000),
2178                nonce: Some(1),
2179                value: U256::from_str("1000000000000000000").unwrap(),
2180                data: Some("0x".to_string()),
2181                from: "0xSender".to_string(),
2182                to: Some("0xRecipient".to_string()),
2183                chain_id: 1,
2184                signature: None,
2185                hash: Some(format!("0x{id}")),
2186                speed: Some(Speed::Fast),
2187                max_fee_per_gas: None,
2188                max_priority_fee_per_gas: None,
2189                raw: None,
2190            }),
2191            noop_count: None,
2192            is_canceled: Some(false),
2193            metadata: None,
2194        }
2195    }
2196
2197    fn create_test_transaction_with_relayer(id: &str, relayer_id: &str) -> TransactionRepoModel {
2198        let mut tx = create_test_transaction(id);
2199        tx.relayer_id = relayer_id.to_string();
2200        tx
2201    }
2202
2203    fn create_test_transaction_with_status(
2204        id: &str,
2205        relayer_id: &str,
2206        status: TransactionStatus,
2207    ) -> TransactionRepoModel {
2208        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
2209        tx.status = status;
2210        tx
2211    }
2212
2213    fn create_test_transaction_with_nonce(
2214        id: &str,
2215        nonce: u64,
2216        relayer_id: &str,
2217    ) -> TransactionRepoModel {
2218        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
2219        if let NetworkTransactionData::Evm(ref mut evm_data) = tx.network_data {
2220            evm_data.nonce = Some(nonce);
2221        }
2222        tx
2223    }
2224
2225    async fn setup_test_repo() -> RedisTransactionRepository {
2226        // Use a mock Redis URL - in real integration tests, this would connect to a test Redis instance
2227        let redis_url = std::env::var("REDIS_TEST_URL")
2228            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
2229
2230        let cfg = Config::from_url(&redis_url);
2231        let pool = Arc::new(
2232            cfg.builder()
2233                .expect("Failed to create pool builder")
2234                .max_size(16)
2235                .runtime(Runtime::Tokio1)
2236                .build()
2237                .expect("Failed to build Redis pool"),
2238        );
2239
2240        // Create RedisConnections with same pool for both primary and reader (for testing)
2241        let connections = Arc::new(RedisConnections::new_single_pool(pool));
2242
2243        let random_id = Uuid::new_v4().to_string();
2244        let key_prefix = format!("test_prefix:{random_id}");
2245
2246        RedisTransactionRepository::new(connections, key_prefix)
2247            .expect("Failed to create RedisTransactionRepository")
2248    }
2249
2250    #[tokio::test]
2251    #[ignore = "Requires active Redis instance"]
2252    async fn test_new_repository_creation() {
2253        let repo = setup_test_repo().await;
2254        assert!(repo.key_prefix.contains("test_prefix"));
2255    }
2256
2257    #[tokio::test]
2258    #[ignore = "Requires active Redis instance"]
2259    async fn test_new_repository_empty_prefix_fails() {
2260        let redis_url = std::env::var("REDIS_TEST_URL")
2261            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
2262        let cfg = Config::from_url(&redis_url);
2263        let pool = Arc::new(
2264            cfg.builder()
2265                .expect("Failed to create pool builder")
2266                .max_size(16)
2267                .runtime(Runtime::Tokio1)
2268                .build()
2269                .expect("Failed to build Redis pool"),
2270        );
2271        let connections = Arc::new(RedisConnections::new_single_pool(pool));
2272
2273        let result = RedisTransactionRepository::new(connections, "".to_string());
2274        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
2275    }
2276
2277    #[tokio::test]
2278    #[ignore = "Requires active Redis instance"]
2279    async fn test_key_generation() {
2280        let repo = setup_test_repo().await;
2281
2282        assert!(repo
2283            .tx_key("relayer-1", "test-id")
2284            .contains(":relayer:relayer-1:tx:test-id"));
2285        assert!(repo
2286            .tx_to_relayer_key("test-id")
2287            .contains(":relayer:tx_to_relayer:test-id"));
2288        assert!(repo.relayer_list_key().contains(":relayer_list"));
2289        assert!(repo
2290            .relayer_status_key("relayer-1", &TransactionStatus::Pending)
2291            .contains(":relayer:relayer-1:status:Pending"));
2292        assert!(repo
2293            .relayer_nonce_key("relayer-1", 42)
2294            .contains(":relayer:relayer-1:nonce:42"));
2295    }
2296
2297    #[tokio::test]
2298    #[ignore = "Requires active Redis instance"]
2299    async fn test_serialize_deserialize_transaction() {
2300        let repo = setup_test_repo().await;
2301        let tx = create_test_transaction("test-1");
2302
2303        let serialized = repo
2304            .serialize_entity(&tx, |t| &t.id, "transaction")
2305            .expect("Serialization should succeed");
2306        let deserialized: TransactionRepoModel = repo
2307            .deserialize_entity(&serialized, "test-1", "transaction")
2308            .expect("Deserialization should succeed");
2309
2310        assert_eq!(tx.id, deserialized.id);
2311        assert_eq!(tx.relayer_id, deserialized.relayer_id);
2312        assert_eq!(tx.status, deserialized.status);
2313    }
2314
2315    #[tokio::test]
2316    #[ignore = "Requires active Redis instance"]
2317    async fn test_extract_nonce() {
2318        let repo = setup_test_repo().await;
2319        let random_id = Uuid::new_v4().to_string();
2320        let relayer_id = Uuid::new_v4().to_string();
2321        let tx_with_nonce = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
2322
2323        let nonce = repo.extract_nonce(&tx_with_nonce.network_data);
2324        assert_eq!(nonce, Some(42));
2325    }
2326
2327    #[tokio::test]
2328    #[ignore = "Requires active Redis instance"]
2329    async fn test_create_transaction() {
2330        let repo = setup_test_repo().await;
2331        let random_id = Uuid::new_v4().to_string();
2332        let tx = create_test_transaction(&random_id);
2333
2334        let result = repo.create(tx.clone()).await.unwrap();
2335        assert_eq!(result.id, tx.id);
2336    }
2337
2338    #[tokio::test]
2339    #[ignore = "Requires active Redis instance"]
2340    async fn test_get_transaction() {
2341        let repo = setup_test_repo().await;
2342        let random_id = Uuid::new_v4().to_string();
2343        let tx = create_test_transaction(&random_id);
2344
2345        repo.create(tx.clone()).await.unwrap();
2346        let stored = repo.get_by_id(random_id.to_string()).await.unwrap();
2347        assert_eq!(stored.id, tx.id);
2348        assert_eq!(stored.relayer_id, tx.relayer_id);
2349    }
2350
2351    #[tokio::test]
2352    #[ignore = "Requires active Redis instance"]
2353    async fn test_update_transaction() {
2354        let repo = setup_test_repo().await;
2355        let random_id = Uuid::new_v4().to_string();
2356        let mut tx = create_test_transaction(&random_id);
2357
2358        repo.create(tx.clone()).await.unwrap();
2359        tx.status = TransactionStatus::Confirmed;
2360
2361        let updated = repo.update(random_id.to_string(), tx).await.unwrap();
2362        assert!(matches!(updated.status, TransactionStatus::Confirmed));
2363    }
2364
2365    #[tokio::test]
2366    #[ignore = "Requires active Redis instance"]
2367    async fn test_delete_transaction() {
2368        let repo = setup_test_repo().await;
2369        let random_id = Uuid::new_v4().to_string();
2370        let tx = create_test_transaction(&random_id);
2371
2372        repo.create(tx).await.unwrap();
2373        repo.delete_by_id(random_id.to_string()).await.unwrap();
2374
2375        let result = repo.get_by_id(random_id.to_string()).await;
2376        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2377    }
2378
2379    #[tokio::test]
2380    #[ignore = "Requires active Redis instance"]
2381    async fn test_list_all_transactions() {
2382        let repo = setup_test_repo().await;
2383        let random_id = Uuid::new_v4().to_string();
2384        let random_id2 = Uuid::new_v4().to_string();
2385
2386        let tx1 = create_test_transaction(&random_id);
2387        let tx2 = create_test_transaction(&random_id2);
2388
2389        repo.create(tx1).await.unwrap();
2390        repo.create(tx2).await.unwrap();
2391
2392        let transactions = repo.list_all().await.unwrap();
2393        assert!(transactions.len() >= 2);
2394    }
2395
2396    #[tokio::test]
2397    #[ignore = "Requires active Redis instance"]
2398    async fn test_count_transactions() {
2399        let repo = setup_test_repo().await;
2400        let random_id = Uuid::new_v4().to_string();
2401        let tx = create_test_transaction(&random_id);
2402
2403        let count = repo.count().await.unwrap();
2404        repo.create(tx).await.unwrap();
2405        assert!(repo.count().await.unwrap() > count);
2406    }
2407
2408    #[tokio::test]
2409    #[ignore = "Requires active Redis instance"]
2410    async fn test_get_nonexistent_transaction() {
2411        let repo = setup_test_repo().await;
2412        let result = repo.get_by_id("nonexistent".to_string()).await;
2413        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2414    }
2415
2416    #[tokio::test]
2417    #[ignore = "Requires active Redis instance"]
2418    async fn test_duplicate_transaction_creation() {
2419        let repo = setup_test_repo().await;
2420        let random_id = Uuid::new_v4().to_string();
2421
2422        let tx = create_test_transaction(&random_id);
2423
2424        repo.create(tx.clone()).await.unwrap();
2425        let result = repo.create(tx).await;
2426
2427        assert!(matches!(
2428            result,
2429            Err(RepositoryError::ConstraintViolation(_))
2430        ));
2431    }
2432
2433    #[tokio::test]
2434    #[ignore = "Requires active Redis instance"]
2435    async fn test_update_nonexistent_transaction() {
2436        let repo = setup_test_repo().await;
2437        let tx = create_test_transaction("test-1");
2438
2439        let result = repo.update("nonexistent".to_string(), tx).await;
2440        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2441    }
2442
2443    #[tokio::test]
2444    #[ignore = "Requires active Redis instance"]
2445    async fn test_list_paginated() {
2446        let repo = setup_test_repo().await;
2447
2448        // Create multiple transactions
2449        for _ in 1..=10 {
2450            let random_id = Uuid::new_v4().to_string();
2451            let tx = create_test_transaction(&random_id);
2452            repo.create(tx).await.unwrap();
2453        }
2454
2455        // Test first page with 3 items per page
2456        let query = PaginationQuery {
2457            page: 1,
2458            per_page: 3,
2459        };
2460        let result = repo.list_paginated(query).await.unwrap();
2461        assert_eq!(result.items.len(), 3);
2462        assert!(result.total >= 10);
2463        assert_eq!(result.page, 1);
2464        assert_eq!(result.per_page, 3);
2465
2466        // Test empty page (beyond total items)
2467        let query = PaginationQuery {
2468            page: 1000,
2469            per_page: 3,
2470        };
2471        let result = repo.list_paginated(query).await.unwrap();
2472        assert_eq!(result.items.len(), 0);
2473    }
2474
2475    #[tokio::test]
2476    #[ignore = "Requires active Redis instance"]
2477    async fn test_find_by_relayer_id() {
2478        let repo = setup_test_repo().await;
2479        let random_id = Uuid::new_v4().to_string();
2480        let random_id2 = Uuid::new_v4().to_string();
2481        let random_id3 = Uuid::new_v4().to_string();
2482
2483        let tx1 = create_test_transaction_with_relayer(&random_id, "relayer-1");
2484        let tx2 = create_test_transaction_with_relayer(&random_id2, "relayer-1");
2485        let tx3 = create_test_transaction_with_relayer(&random_id3, "relayer-2");
2486
2487        repo.create(tx1).await.unwrap();
2488        repo.create(tx2).await.unwrap();
2489        repo.create(tx3).await.unwrap();
2490
2491        // Test finding transactions for relayer-1
2492        let query = PaginationQuery {
2493            page: 1,
2494            per_page: 10,
2495        };
2496        let result = repo
2497            .find_by_relayer_id("relayer-1", query.clone())
2498            .await
2499            .unwrap();
2500        assert!(result.total >= 2);
2501        assert!(result.items.len() >= 2);
2502        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
2503
2504        // Test finding transactions for relayer-2
2505        let result = repo
2506            .find_by_relayer_id("relayer-2", query.clone())
2507            .await
2508            .unwrap();
2509        assert!(result.total >= 1);
2510        assert!(!result.items.is_empty());
2511        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
2512
2513        // Test finding transactions for non-existent relayer
2514        let result = repo
2515            .find_by_relayer_id("non-existent", query.clone())
2516            .await
2517            .unwrap();
2518        assert_eq!(result.total, 0);
2519        assert_eq!(result.items.len(), 0);
2520    }
2521
2522    #[tokio::test]
2523    #[ignore = "Requires active Redis instance"]
2524    async fn test_find_by_relayer_id_sorted_by_created_at_newest_first() {
2525        let repo = setup_test_repo().await;
2526        let relayer_id = Uuid::new_v4().to_string();
2527
2528        // Create transactions with different created_at timestamps
2529        let mut tx1 = create_test_transaction_with_relayer("test-1", &relayer_id);
2530        tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); // Oldest
2531
2532        let mut tx2 = create_test_transaction_with_relayer("test-2", &relayer_id);
2533        tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); // Middle
2534
2535        let mut tx3 = create_test_transaction_with_relayer("test-3", &relayer_id);
2536        tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); // Newest
2537
2538        // Create transactions in non-chronological order to ensure sorting works
2539        repo.create(tx2.clone()).await.unwrap(); // Middle first
2540        repo.create(tx1.clone()).await.unwrap(); // Oldest second
2541        repo.create(tx3.clone()).await.unwrap(); // Newest last
2542
2543        let query = PaginationQuery {
2544            page: 1,
2545            per_page: 10,
2546        };
2547        let result = repo.find_by_relayer_id(&relayer_id, query).await.unwrap();
2548
2549        assert_eq!(result.total, 3);
2550        assert_eq!(result.items.len(), 3);
2551
2552        // Verify transactions are sorted by created_at descending (newest first)
2553        assert_eq!(
2554            result.items[0].id, "test-3",
2555            "First item should be newest (test-3)"
2556        );
2557        assert_eq!(
2558            result.items[0].created_at,
2559            "2025-01-27T14:00:00.000000+00:00"
2560        );
2561
2562        assert_eq!(
2563            result.items[1].id, "test-2",
2564            "Second item should be middle (test-2)"
2565        );
2566        assert_eq!(
2567            result.items[1].created_at,
2568            "2025-01-27T12:00:00.000000+00:00"
2569        );
2570
2571        assert_eq!(
2572            result.items[2].id, "test-1",
2573            "Third item should be oldest (test-1)"
2574        );
2575        assert_eq!(
2576            result.items[2].created_at,
2577            "2025-01-27T10:00:00.000000+00:00"
2578        );
2579    }
2580
2581    #[tokio::test]
2582    #[ignore = "Requires active Redis instance"]
2583    async fn test_find_by_status() {
2584        let repo = setup_test_repo().await;
2585        let random_id = Uuid::new_v4().to_string();
2586        let random_id2 = Uuid::new_v4().to_string();
2587        let random_id3 = Uuid::new_v4().to_string();
2588        let relayer_id = Uuid::new_v4().to_string();
2589        let tx1 = create_test_transaction_with_status(
2590            &random_id,
2591            &relayer_id,
2592            TransactionStatus::Pending,
2593        );
2594        let tx2 =
2595            create_test_transaction_with_status(&random_id2, &relayer_id, TransactionStatus::Sent);
2596        let tx3 = create_test_transaction_with_status(
2597            &random_id3,
2598            &relayer_id,
2599            TransactionStatus::Confirmed,
2600        );
2601
2602        repo.create(tx1).await.unwrap();
2603        repo.create(tx2).await.unwrap();
2604        repo.create(tx3).await.unwrap();
2605
2606        // Test finding pending transactions
2607        let result = repo
2608            .find_by_status(&relayer_id, &[TransactionStatus::Pending])
2609            .await
2610            .unwrap();
2611        assert_eq!(result.len(), 1);
2612        assert_eq!(result[0].status, TransactionStatus::Pending);
2613
2614        // Test finding multiple statuses
2615        let result = repo
2616            .find_by_status(
2617                &relayer_id,
2618                &[TransactionStatus::Pending, TransactionStatus::Sent],
2619            )
2620            .await
2621            .unwrap();
2622        assert_eq!(result.len(), 2);
2623
2624        // Test finding non-existent status
2625        let result = repo
2626            .find_by_status(&relayer_id, &[TransactionStatus::Failed])
2627            .await
2628            .unwrap();
2629        assert_eq!(result.len(), 0);
2630    }
2631
2632    #[tokio::test]
2633    #[ignore = "Requires active Redis instance"]
2634    async fn test_find_by_status_paginated() {
2635        let repo = setup_test_repo().await;
2636        let relayer_id = Uuid::new_v4().to_string();
2637
2638        // Create 5 pending transactions with different timestamps
2639        for i in 1..=5 {
2640            let tx_id = Uuid::new_v4().to_string();
2641            let mut tx = create_test_transaction_with_status(
2642                &tx_id,
2643                &relayer_id,
2644                TransactionStatus::Pending,
2645            );
2646            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2647            repo.create(tx).await.unwrap();
2648        }
2649
2650        // Create 2 confirmed transactions
2651        for i in 6..=7 {
2652            let tx_id = Uuid::new_v4().to_string();
2653            let mut tx = create_test_transaction_with_status(
2654                &tx_id,
2655                &relayer_id,
2656                TransactionStatus::Confirmed,
2657            );
2658            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2659            repo.create(tx).await.unwrap();
2660        }
2661
2662        // Test first page (2 items per page)
2663        let query = PaginationQuery {
2664            page: 1,
2665            per_page: 2,
2666        };
2667        let result = repo
2668            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2669            .await
2670            .unwrap();
2671
2672        assert_eq!(result.total, 5);
2673        assert_eq!(result.items.len(), 2);
2674        assert_eq!(result.page, 1);
2675        assert_eq!(result.per_page, 2);
2676
2677        // Test second page
2678        let query = PaginationQuery {
2679            page: 2,
2680            per_page: 2,
2681        };
2682        let result = repo
2683            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2684            .await
2685            .unwrap();
2686
2687        assert_eq!(result.total, 5);
2688        assert_eq!(result.items.len(), 2);
2689        assert_eq!(result.page, 2);
2690
2691        // Test last page (partial)
2692        let query = PaginationQuery {
2693            page: 3,
2694            per_page: 2,
2695        };
2696        let result = repo
2697            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2698            .await
2699            .unwrap();
2700
2701        assert_eq!(result.total, 5);
2702        assert_eq!(result.items.len(), 1);
2703
2704        // Test multiple statuses
2705        let query = PaginationQuery {
2706            page: 1,
2707            per_page: 10,
2708        };
2709        let result = repo
2710            .find_by_status_paginated(
2711                &relayer_id,
2712                &[TransactionStatus::Pending, TransactionStatus::Confirmed],
2713                query,
2714                false,
2715            )
2716            .await
2717            .unwrap();
2718
2719        assert_eq!(result.total, 7);
2720        assert_eq!(result.items.len(), 7);
2721
2722        // Test empty result
2723        let query = PaginationQuery {
2724            page: 1,
2725            per_page: 10,
2726        };
2727        let result = repo
2728            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Failed], query, false)
2729            .await
2730            .unwrap();
2731
2732        assert_eq!(result.total, 0);
2733        assert_eq!(result.items.len(), 0);
2734    }
2735
2736    #[tokio::test]
2737    #[ignore = "Requires active Redis instance"]
2738    async fn test_find_by_status_paginated_oldest_first() {
2739        let repo = setup_test_repo().await;
2740        let relayer_id = Uuid::new_v4().to_string();
2741
2742        // Create 5 pending transactions with ascending timestamps
2743        for i in 1..=5 {
2744            let tx_id = format!("tx{}-{}", i, Uuid::new_v4());
2745            let mut tx = create_test_transaction(&tx_id);
2746            tx.relayer_id = relayer_id.clone();
2747            tx.status = TransactionStatus::Pending;
2748            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2749            repo.create(tx).await.unwrap();
2750        }
2751
2752        // Test oldest_first: true - should return oldest transactions first
2753        let query = PaginationQuery {
2754            page: 1,
2755            per_page: 3,
2756        };
2757        let result = repo
2758            .find_by_status_paginated(
2759                &relayer_id,
2760                &[TransactionStatus::Pending],
2761                query.clone(),
2762                true,
2763            )
2764            .await
2765            .unwrap();
2766
2767        assert_eq!(result.total, 5);
2768        assert_eq!(result.items.len(), 3);
2769        // Verify ordering: oldest first (11:00, 12:00, 13:00)
2770        assert!(
2771            result.items[0].created_at < result.items[1].created_at,
2772            "First item should be older than second"
2773        );
2774        assert!(
2775            result.items[1].created_at < result.items[2].created_at,
2776            "Second item should be older than third"
2777        );
2778
2779        // Contrast with oldest_first: false - should return newest first
2780        let result_newest = repo
2781            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2782            .await
2783            .unwrap();
2784
2785        assert_eq!(result_newest.items.len(), 3);
2786        // Verify ordering: newest first (15:00, 14:00, 13:00)
2787        assert!(
2788            result_newest.items[0].created_at > result_newest.items[1].created_at,
2789            "First item should be newer than second"
2790        );
2791        assert!(
2792            result_newest.items[1].created_at > result_newest.items[2].created_at,
2793            "Second item should be newer than third"
2794        );
2795    }
2796
2797    #[tokio::test]
2798    #[ignore = "Requires active Redis instance"]
2799    async fn test_find_by_status_paginated_oldest_first_single_item() {
2800        let repo = setup_test_repo().await;
2801        let relayer_id = Uuid::new_v4().to_string();
2802
2803        // Create transactions with specific timestamps
2804        let timestamps = [
2805            "2025-01-27T08:00:00.000000+00:00", // oldest
2806            "2025-01-27T10:00:00.000000+00:00", // middle
2807            "2025-01-27T12:00:00.000000+00:00", // newest
2808        ];
2809
2810        let mut oldest_id = String::new();
2811        let mut newest_id = String::new();
2812
2813        for (i, timestamp) in timestamps.iter().enumerate() {
2814            let tx_id = format!("tx-{}-{}", i, Uuid::new_v4());
2815            if i == 0 {
2816                oldest_id = tx_id.clone();
2817            }
2818            if i == 2 {
2819                newest_id = tx_id.clone();
2820            }
2821            let mut tx = create_test_transaction(&tx_id);
2822            tx.relayer_id = relayer_id.clone();
2823            tx.status = TransactionStatus::Pending;
2824            tx.created_at = timestamp.to_string();
2825            repo.create(tx).await.unwrap();
2826        }
2827
2828        // Request just 1 item with oldest_first: true
2829        let query = PaginationQuery {
2830            page: 1,
2831            per_page: 1,
2832        };
2833        let result = repo
2834            .find_by_status_paginated(
2835                &relayer_id,
2836                &[TransactionStatus::Pending],
2837                query.clone(),
2838                true,
2839            )
2840            .await
2841            .unwrap();
2842
2843        assert_eq!(result.total, 3);
2844        assert_eq!(result.items.len(), 1);
2845        assert_eq!(
2846            result.items[0].id, oldest_id,
2847            "With oldest_first=true and per_page=1, should return the oldest transaction"
2848        );
2849
2850        // Contrast with oldest_first: false
2851        let result = repo
2852            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2853            .await
2854            .unwrap();
2855
2856        assert_eq!(result.items.len(), 1);
2857        assert_eq!(
2858            result.items[0].id, newest_id,
2859            "With oldest_first=false and per_page=1, should return the newest transaction"
2860        );
2861    }
2862
2863    #[tokio::test]
2864    #[ignore = "Requires active Redis instance"]
2865    async fn test_find_by_nonce() {
2866        let repo = setup_test_repo().await;
2867        let random_id = Uuid::new_v4().to_string();
2868        let random_id2 = Uuid::new_v4().to_string();
2869        let relayer_id = Uuid::new_v4().to_string();
2870
2871        let tx1 = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
2872        let tx2 = create_test_transaction_with_nonce(&random_id2, 43, &relayer_id);
2873
2874        repo.create(tx1.clone()).await.unwrap();
2875        repo.create(tx2).await.unwrap();
2876
2877        // Test finding existing nonce
2878        let result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
2879        assert!(result.is_some());
2880        assert_eq!(result.unwrap().id, random_id);
2881
2882        // Test finding non-existent nonce
2883        let result = repo.find_by_nonce(&relayer_id, 99).await.unwrap();
2884        assert!(result.is_none());
2885
2886        // Test finding nonce for non-existent relayer
2887        let result = repo.find_by_nonce("non-existent", 42).await.unwrap();
2888        assert!(result.is_none());
2889    }
2890
2891    #[tokio::test]
2892    #[ignore = "Requires active Redis instance"]
2893    async fn test_update_status() {
2894        let repo = setup_test_repo().await;
2895        let random_id = Uuid::new_v4().to_string();
2896        let tx = create_test_transaction(&random_id);
2897
2898        repo.create(tx).await.unwrap();
2899        let updated = repo
2900            .update_status(random_id.to_string(), TransactionStatus::Confirmed)
2901            .await
2902            .unwrap();
2903        assert_eq!(updated.status, TransactionStatus::Confirmed);
2904    }
2905
2906    #[tokio::test]
2907    #[ignore = "Requires active Redis instance"]
2908    async fn test_partial_update() {
2909        let repo = setup_test_repo().await;
2910        let random_id = Uuid::new_v4().to_string();
2911        let tx = create_test_transaction(&random_id);
2912
2913        repo.create(tx).await.unwrap();
2914
2915        let update = TransactionUpdateRequest {
2916            status: Some(TransactionStatus::Sent),
2917            status_reason: Some("Transaction sent".to_string()),
2918            sent_at: Some("2025-01-27T16:00:00.000000+00:00".to_string()),
2919            confirmed_at: None,
2920            network_data: None,
2921            hashes: None,
2922            is_canceled: None,
2923            priced_at: None,
2924            noop_count: None,
2925            delete_at: None,
2926            metadata: None,
2927        };
2928
2929        let updated = repo
2930            .partial_update(random_id.to_string(), update)
2931            .await
2932            .unwrap();
2933        assert_eq!(updated.status, TransactionStatus::Sent);
2934        assert_eq!(updated.status_reason, Some("Transaction sent".to_string()));
2935        assert_eq!(
2936            updated.sent_at,
2937            Some("2025-01-27T16:00:00.000000+00:00".to_string())
2938        );
2939    }
2940
2941    #[tokio::test]
2942    #[ignore = "Requires active Redis instance"]
2943    async fn test_set_sent_at() {
2944        let repo = setup_test_repo().await;
2945        let random_id = Uuid::new_v4().to_string();
2946        let tx = create_test_transaction(&random_id);
2947
2948        repo.create(tx).await.unwrap();
2949        let updated = repo
2950            .set_sent_at(
2951                random_id.to_string(),
2952                "2025-01-27T16:00:00.000000+00:00".to_string(),
2953            )
2954            .await
2955            .unwrap();
2956        assert_eq!(
2957            updated.sent_at,
2958            Some("2025-01-27T16:00:00.000000+00:00".to_string())
2959        );
2960    }
2961
2962    #[tokio::test]
2963    #[ignore = "Requires active Redis instance"]
2964    async fn test_set_confirmed_at() {
2965        let repo = setup_test_repo().await;
2966        let random_id = Uuid::new_v4().to_string();
2967        let tx = create_test_transaction(&random_id);
2968
2969        repo.create(tx).await.unwrap();
2970        let updated = repo
2971            .set_confirmed_at(
2972                random_id.to_string(),
2973                "2025-01-27T16:00:00.000000+00:00".to_string(),
2974            )
2975            .await
2976            .unwrap();
2977        assert_eq!(
2978            updated.confirmed_at,
2979            Some("2025-01-27T16:00:00.000000+00:00".to_string())
2980        );
2981    }
2982
2983    #[tokio::test]
2984    #[ignore = "Requires active Redis instance"]
2985    async fn test_update_network_data() {
2986        let repo = setup_test_repo().await;
2987        let random_id = Uuid::new_v4().to_string();
2988        let tx = create_test_transaction(&random_id);
2989
2990        repo.create(tx).await.unwrap();
2991
2992        let new_network_data = NetworkTransactionData::Evm(EvmTransactionData {
2993            gas_price: Some(2000000000),
2994            gas_limit: Some(42000),
2995            nonce: Some(2),
2996            value: U256::from_str("2000000000000000000").unwrap(),
2997            data: Some("0x1234".to_string()),
2998            from: "0xNewSender".to_string(),
2999            to: Some("0xNewRecipient".to_string()),
3000            chain_id: 1,
3001            signature: None,
3002            hash: Some("0xnewhash".to_string()),
3003            speed: Some(Speed::SafeLow),
3004            max_fee_per_gas: None,
3005            max_priority_fee_per_gas: None,
3006            raw: None,
3007        });
3008
3009        let updated = repo
3010            .update_network_data(random_id.to_string(), new_network_data.clone())
3011            .await
3012            .unwrap();
3013        assert_eq!(
3014            updated
3015                .network_data
3016                .get_evm_transaction_data()
3017                .unwrap()
3018                .hash,
3019            new_network_data.get_evm_transaction_data().unwrap().hash
3020        );
3021    }
3022
3023    #[tokio::test]
3024    #[ignore = "Requires active Redis instance"]
3025    async fn test_debug_implementation() {
3026        let repo = setup_test_repo().await;
3027        let debug_str = format!("{repo:?}");
3028        assert!(debug_str.contains("RedisTransactionRepository"));
3029        assert!(debug_str.contains("test_prefix"));
3030    }
3031
3032    #[tokio::test]
3033    #[ignore = "Requires active Redis instance"]
3034    async fn test_error_handling_empty_id() {
3035        let repo = setup_test_repo().await;
3036
3037        let result = repo.get_by_id("".to_string()).await;
3038        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3039
3040        let result = repo
3041            .update("".to_string(), create_test_transaction("test"))
3042            .await;
3043        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3044
3045        let result = repo.delete_by_id("".to_string()).await;
3046        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3047    }
3048
3049    #[tokio::test]
3050    #[ignore = "Requires active Redis instance"]
3051    async fn test_pagination_validation() {
3052        let repo = setup_test_repo().await;
3053
3054        let query = PaginationQuery {
3055            page: 1,
3056            per_page: 0,
3057        };
3058        let result = repo.list_paginated(query).await;
3059        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
3060    }
3061
3062    #[tokio::test]
3063    #[ignore = "Requires active Redis instance"]
3064    async fn test_index_consistency() {
3065        let repo = setup_test_repo().await;
3066        let random_id = Uuid::new_v4().to_string();
3067        let relayer_id = Uuid::new_v4().to_string();
3068        let tx = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
3069
3070        // Create transaction
3071        repo.create(tx.clone()).await.unwrap();
3072
3073        // Verify it can be found by nonce
3074        let found = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
3075        assert!(found.is_some());
3076
3077        // Update the transaction with a new nonce
3078        let mut updated_tx = tx.clone();
3079        if let NetworkTransactionData::Evm(ref mut evm_data) = updated_tx.network_data {
3080            evm_data.nonce = Some(43);
3081        }
3082
3083        repo.update(random_id.to_string(), updated_tx)
3084            .await
3085            .unwrap();
3086
3087        // Verify old nonce index is cleaned up
3088        let old_nonce_result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
3089        assert!(old_nonce_result.is_none());
3090
3091        // Verify new nonce index works
3092        let new_nonce_result = repo.find_by_nonce(&relayer_id, 43).await.unwrap();
3093        assert!(new_nonce_result.is_some());
3094    }
3095
3096    #[tokio::test]
3097    #[ignore = "Requires active Redis instance"]
3098    async fn test_has_entries() {
3099        let repo = setup_test_repo().await;
3100        assert!(!repo.has_entries().await.unwrap());
3101
3102        let tx_id = uuid::Uuid::new_v4().to_string();
3103        let tx = create_test_transaction(&tx_id);
3104        repo.create(tx.clone()).await.unwrap();
3105
3106        assert!(repo.has_entries().await.unwrap());
3107    }
3108
3109    #[tokio::test]
3110    #[ignore = "Requires active Redis instance"]
3111    async fn test_drop_all_entries() {
3112        let repo = setup_test_repo().await;
3113        let tx_id = uuid::Uuid::new_v4().to_string();
3114        let tx = create_test_transaction(&tx_id);
3115        repo.create(tx.clone()).await.unwrap();
3116        assert!(repo.has_entries().await.unwrap());
3117
3118        repo.drop_all_entries().await.unwrap();
3119        assert!(!repo.has_entries().await.unwrap());
3120    }
3121
3122    // Tests for delete_at field setting on final status updates
3123    #[tokio::test]
3124    #[ignore = "Requires active Redis instance"]
3125    async fn test_update_status_sets_delete_at_for_final_statuses() {
3126        let _lock = ENV_MUTEX.lock().await;
3127
3128        use chrono::{DateTime, Duration, Utc};
3129        use std::env;
3130
3131        // Use a unique test environment variable to avoid conflicts
3132        env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
3133
3134        let repo = setup_test_repo().await;
3135
3136        let final_statuses = [
3137            TransactionStatus::Canceled,
3138            TransactionStatus::Confirmed,
3139            TransactionStatus::Failed,
3140            TransactionStatus::Expired,
3141        ];
3142
3143        for (i, status) in final_statuses.iter().enumerate() {
3144            let tx_id = format!("test-final-{}-{}", i, Uuid::new_v4());
3145            let mut tx = create_test_transaction(&tx_id);
3146
3147            // Ensure transaction has no delete_at initially and is in pending state
3148            tx.delete_at = None;
3149            tx.status = TransactionStatus::Pending;
3150
3151            repo.create(tx).await.unwrap();
3152
3153            let before_update = Utc::now();
3154
3155            // Update to final status
3156            let updated = repo
3157                .update_status(tx_id.clone(), status.clone())
3158                .await
3159                .unwrap();
3160
3161            // Should have delete_at set
3162            assert!(
3163                updated.delete_at.is_some(),
3164                "delete_at should be set for status: {status:?}"
3165            );
3166
3167            // Verify the timestamp is reasonable (approximately 6 hours from now)
3168            let delete_at_str = updated.delete_at.unwrap();
3169            let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
3170                .expect("delete_at should be valid RFC3339")
3171                .with_timezone(&Utc);
3172
3173            let duration_from_before = delete_at.signed_duration_since(before_update);
3174            let expected_duration = Duration::hours(6);
3175            let tolerance = Duration::minutes(5);
3176
3177            assert!(
3178                duration_from_before >= expected_duration - tolerance
3179                    && duration_from_before <= expected_duration + tolerance,
3180                "delete_at should be approximately 6 hours from now for status: {status:?}. Duration: {duration_from_before:?}"
3181            );
3182        }
3183
3184        // Cleanup
3185        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3186    }
3187
3188    #[tokio::test]
3189    #[ignore = "Requires active Redis instance"]
3190    async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
3191        let _lock = ENV_MUTEX.lock().await;
3192
3193        use std::env;
3194
3195        env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
3196
3197        let repo = setup_test_repo().await;
3198
3199        let non_final_statuses = [
3200            TransactionStatus::Pending,
3201            TransactionStatus::Sent,
3202            TransactionStatus::Submitted,
3203            TransactionStatus::Mined,
3204        ];
3205
3206        for (i, status) in non_final_statuses.iter().enumerate() {
3207            let tx_id = format!("test-non-final-{}-{}", i, Uuid::new_v4());
3208            let mut tx = create_test_transaction(&tx_id);
3209            tx.delete_at = None;
3210            tx.status = TransactionStatus::Pending;
3211
3212            repo.create(tx).await.unwrap();
3213
3214            // Update to non-final status
3215            let updated = repo
3216                .update_status(tx_id.clone(), status.clone())
3217                .await
3218                .unwrap();
3219
3220            // Should NOT have delete_at set
3221            assert!(
3222                updated.delete_at.is_none(),
3223                "delete_at should NOT be set for status: {status:?}"
3224            );
3225        }
3226
3227        // Cleanup
3228        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3229    }
3230
3231    #[tokio::test]
3232    #[ignore = "Requires active Redis instance"]
3233    async fn test_partial_update_sets_delete_at_for_final_statuses() {
3234        let _lock = ENV_MUTEX.lock().await;
3235
3236        use chrono::{DateTime, Duration, Utc};
3237        use std::env;
3238
3239        env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
3240
3241        let repo = setup_test_repo().await;
3242        let tx_id = format!("test-partial-final-{}", Uuid::new_v4());
3243        let mut tx = create_test_transaction(&tx_id);
3244        tx.delete_at = None;
3245        tx.status = TransactionStatus::Pending;
3246
3247        repo.create(tx).await.unwrap();
3248
3249        let before_update = Utc::now();
3250
3251        // Use partial_update to set status to Confirmed (final status)
3252        let update = TransactionUpdateRequest {
3253            status: Some(TransactionStatus::Confirmed),
3254            status_reason: Some("Transaction completed".to_string()),
3255            confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
3256            ..Default::default()
3257        };
3258
3259        let updated = repo.partial_update(tx_id.clone(), update).await.unwrap();
3260
3261        // Should have delete_at set
3262        assert!(
3263            updated.delete_at.is_some(),
3264            "delete_at should be set when updating to Confirmed status"
3265        );
3266
3267        // Verify the timestamp is reasonable (approximately 8 hours from now)
3268        let delete_at_str = updated.delete_at.unwrap();
3269        let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
3270            .expect("delete_at should be valid RFC3339")
3271            .with_timezone(&Utc);
3272
3273        let duration_from_before = delete_at.signed_duration_since(before_update);
3274        let expected_duration = Duration::hours(8);
3275        let tolerance = Duration::minutes(5);
3276
3277        assert!(
3278            duration_from_before >= expected_duration - tolerance
3279                && duration_from_before <= expected_duration + tolerance,
3280            "delete_at should be approximately 8 hours from now. Duration: {duration_from_before:?}"
3281        );
3282
3283        // Also verify other fields were updated
3284        assert_eq!(updated.status, TransactionStatus::Confirmed);
3285        assert_eq!(
3286            updated.status_reason,
3287            Some("Transaction completed".to_string())
3288        );
3289        assert_eq!(
3290            updated.confirmed_at,
3291            Some("2023-01-01T12:05:00Z".to_string())
3292        );
3293
3294        // Cleanup
3295        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3296    }
3297
3298    #[tokio::test]
3299    #[ignore = "Requires active Redis instance"]
3300    async fn test_update_status_preserves_existing_delete_at() {
3301        let _lock = ENV_MUTEX.lock().await;
3302
3303        use std::env;
3304
3305        env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
3306
3307        let repo = setup_test_repo().await;
3308        let tx_id = format!("test-preserve-delete-at-{}", Uuid::new_v4());
3309        let mut tx = create_test_transaction(&tx_id);
3310
3311        // Set an existing delete_at value
3312        let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
3313        tx.delete_at = Some(existing_delete_at.clone());
3314        tx.status = TransactionStatus::Pending;
3315
3316        repo.create(tx).await.unwrap();
3317
3318        // Update to final status
3319        let updated = repo
3320            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
3321            .await
3322            .unwrap();
3323
3324        // Should preserve the existing delete_at value
3325        assert_eq!(
3326            updated.delete_at,
3327            Some(existing_delete_at),
3328            "Existing delete_at should be preserved when updating to final status"
3329        );
3330
3331        // Cleanup
3332        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3333    }
3334    #[tokio::test]
3335    #[ignore = "Requires active Redis instance"]
3336    async fn test_partial_update_without_status_change_preserves_delete_at() {
3337        let _lock = ENV_MUTEX.lock().await;
3338
3339        use std::env;
3340
3341        env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
3342
3343        let repo = setup_test_repo().await;
3344        let tx_id = format!("test-preserve-no-status-{}", Uuid::new_v4());
3345        let mut tx = create_test_transaction(&tx_id);
3346        tx.delete_at = None;
3347        tx.status = TransactionStatus::Pending;
3348
3349        repo.create(tx).await.unwrap();
3350
3351        // First, update to final status to set delete_at
3352        let updated1 = repo
3353            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
3354            .await
3355            .unwrap();
3356
3357        assert!(updated1.delete_at.is_some());
3358        let original_delete_at = updated1.delete_at.clone();
3359
3360        // Now update other fields without changing status
3361        let update = TransactionUpdateRequest {
3362            status: None, // No status change
3363            status_reason: Some("Updated reason".to_string()),
3364            confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
3365            ..Default::default()
3366        };
3367
3368        let updated2 = repo.partial_update(tx_id.clone(), update).await.unwrap();
3369
3370        // delete_at should be preserved
3371        assert_eq!(
3372            updated2.delete_at, original_delete_at,
3373            "delete_at should be preserved when status is not updated"
3374        );
3375
3376        // Other fields should be updated
3377        assert_eq!(updated2.status, TransactionStatus::Confirmed); // Unchanged
3378        assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
3379        assert_eq!(
3380            updated2.confirmed_at,
3381            Some("2023-01-01T12:10:00Z".to_string())
3382        );
3383
3384        // Cleanup
3385        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3386    }
3387
3388    // Tests for delete_by_ids batch delete functionality
3389
3390    #[tokio::test]
3391    #[ignore = "Requires active Redis instance"]
3392    async fn test_delete_by_ids_empty_list() {
3393        let repo = setup_test_repo().await;
3394        let tx_id = format!("test-empty-{}", Uuid::new_v4());
3395
3396        // Create a transaction to ensure repo is not empty
3397        let tx = create_test_transaction(&tx_id);
3398        repo.create(tx).await.unwrap();
3399
3400        // Delete with empty list should succeed and not affect existing data
3401        let result = repo.delete_by_ids(vec![]).await.unwrap();
3402
3403        assert_eq!(result.deleted_count, 0);
3404        assert!(result.failed.is_empty());
3405
3406        // Original transaction should still exist
3407        assert!(repo.get_by_id(tx_id).await.is_ok());
3408    }
3409
3410    #[tokio::test]
3411    #[ignore = "Requires active Redis instance"]
3412    async fn test_delete_by_ids_single_transaction() {
3413        let repo = setup_test_repo().await;
3414        let tx_id = format!("test-single-{}", Uuid::new_v4());
3415
3416        let tx = create_test_transaction(&tx_id);
3417        repo.create(tx).await.unwrap();
3418
3419        let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3420
3421        assert_eq!(result.deleted_count, 1);
3422        assert!(result.failed.is_empty());
3423
3424        // Verify transaction was deleted
3425        assert!(repo.get_by_id(tx_id).await.is_err());
3426    }
3427
3428    #[tokio::test]
3429    #[ignore = "Requires active Redis instance"]
3430    async fn test_delete_by_ids_multiple_transactions() {
3431        let repo = setup_test_repo().await;
3432        let base_id = Uuid::new_v4();
3433
3434        // Create multiple transactions
3435        let mut created_ids = Vec::new();
3436        for i in 1..=5 {
3437            let tx_id = format!("test-multi-{base_id}-{i}");
3438            let tx = create_test_transaction(&tx_id);
3439            repo.create(tx).await.unwrap();
3440            created_ids.push(tx_id);
3441        }
3442
3443        // Delete 3 of them
3444        let ids_to_delete = vec![
3445            created_ids[0].clone(),
3446            created_ids[2].clone(),
3447            created_ids[4].clone(),
3448        ];
3449        let result = repo.delete_by_ids(ids_to_delete).await.unwrap();
3450
3451        assert_eq!(result.deleted_count, 3);
3452        assert!(result.failed.is_empty());
3453
3454        // Verify correct transactions were deleted
3455        assert!(repo.get_by_id(created_ids[0].clone()).await.is_err());
3456        assert!(repo.get_by_id(created_ids[1].clone()).await.is_ok()); // Not deleted
3457        assert!(repo.get_by_id(created_ids[2].clone()).await.is_err());
3458        assert!(repo.get_by_id(created_ids[3].clone()).await.is_ok()); // Not deleted
3459        assert!(repo.get_by_id(created_ids[4].clone()).await.is_err());
3460    }
3461
3462    #[tokio::test]
3463    #[ignore = "Requires active Redis instance"]
3464    async fn test_delete_by_ids_nonexistent_transactions() {
3465        let repo = setup_test_repo().await;
3466        let base_id = Uuid::new_v4();
3467
3468        // Try to delete transactions that don't exist
3469        let ids_to_delete = vec![
3470            format!("nonexistent-{}-1", base_id),
3471            format!("nonexistent-{}-2", base_id),
3472        ];
3473        let result = repo.delete_by_ids(ids_to_delete.clone()).await.unwrap();
3474
3475        assert_eq!(result.deleted_count, 0);
3476        assert_eq!(result.failed.len(), 2);
3477
3478        // Verify error messages contain the IDs
3479        let failed_ids: Vec<&String> = result.failed.iter().map(|(id, _)| id).collect();
3480        assert!(failed_ids.contains(&&ids_to_delete[0]));
3481        assert!(failed_ids.contains(&&ids_to_delete[1]));
3482    }
3483
3484    #[tokio::test]
3485    #[ignore = "Requires active Redis instance"]
3486    async fn test_delete_by_ids_mixed_existing_and_nonexistent() {
3487        let repo = setup_test_repo().await;
3488        let base_id = Uuid::new_v4();
3489
3490        // Create some transactions
3491        let existing_ids: Vec<String> = (1..=3)
3492            .map(|i| format!("test-mixed-existing-{base_id}-{i}"))
3493            .collect();
3494
3495        for id in &existing_ids {
3496            let tx = create_test_transaction(id);
3497            repo.create(tx).await.unwrap();
3498        }
3499
3500        let nonexistent_ids: Vec<String> = (1..=2)
3501            .map(|i| format!("test-mixed-nonexistent-{base_id}-{i}"))
3502            .collect();
3503
3504        // Try to delete mix of existing and non-existing
3505        let ids_to_delete = vec![
3506            existing_ids[0].clone(),
3507            nonexistent_ids[0].clone(),
3508            existing_ids[1].clone(),
3509            nonexistent_ids[1].clone(),
3510        ];
3511        let result = repo.delete_by_ids(ids_to_delete).await.unwrap();
3512
3513        assert_eq!(result.deleted_count, 2);
3514        assert_eq!(result.failed.len(), 2);
3515
3516        // Verify existing transactions were deleted
3517        assert!(repo.get_by_id(existing_ids[0].clone()).await.is_err());
3518        assert!(repo.get_by_id(existing_ids[1].clone()).await.is_err());
3519
3520        // Verify remaining transaction still exists
3521        assert!(repo.get_by_id(existing_ids[2].clone()).await.is_ok());
3522    }
3523
3524    #[tokio::test]
3525    #[ignore = "Requires active Redis instance"]
3526    async fn test_delete_by_ids_removes_all_indexes() {
3527        let repo = setup_test_repo().await;
3528        let relayer_id = format!("relayer-{}", Uuid::new_v4());
3529        let tx_id = format!("test-indexes-{}", Uuid::new_v4());
3530
3531        // Create a transaction with specific status
3532        let mut tx = create_test_transaction(&tx_id);
3533        tx.relayer_id = relayer_id.clone();
3534        tx.status = TransactionStatus::Confirmed;
3535        repo.create(tx).await.unwrap();
3536
3537        // Verify transaction exists and is indexed
3538        let found = repo
3539            .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
3540            .await
3541            .unwrap();
3542        assert!(found.iter().any(|t| t.id == tx_id));
3543
3544        // Delete the transaction
3545        let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3546        assert_eq!(result.deleted_count, 1);
3547
3548        // Verify transaction is no longer in status index
3549        let found_after = repo
3550            .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
3551            .await
3552            .unwrap();
3553        assert!(!found_after.iter().any(|t| t.id == tx_id));
3554
3555        // Verify transaction cannot be found
3556        assert!(repo.get_by_id(tx_id).await.is_err());
3557    }
3558
3559    #[tokio::test]
3560    #[ignore = "Requires active Redis instance"]
3561    async fn test_delete_by_ids_removes_nonce_index() {
3562        let repo = setup_test_repo().await;
3563        let relayer_id = format!("relayer-{}", Uuid::new_v4());
3564        let tx_id = format!("test-nonce-{}", Uuid::new_v4());
3565        let nonce = 12345u64;
3566
3567        // Create a transaction with a specific nonce
3568        let tx = create_test_transaction_with_nonce(&tx_id, nonce, &relayer_id);
3569        repo.create(tx).await.unwrap();
3570
3571        // Verify nonce index works
3572        let found = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
3573        assert!(found.is_some());
3574        assert_eq!(found.unwrap().id, tx_id);
3575
3576        // Delete the transaction
3577        let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3578        assert_eq!(result.deleted_count, 1);
3579
3580        // Verify nonce index was cleaned up
3581        let found_after = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
3582        assert!(found_after.is_none());
3583    }
3584
3585    #[tokio::test]
3586    #[ignore = "Requires active Redis instance"]
3587    async fn test_delete_by_ids_large_batch() {
3588        let repo = setup_test_repo().await;
3589        let base_id = Uuid::new_v4();
3590
3591        // Create many transactions to test batch performance
3592        let count = 50;
3593        let mut created_ids = Vec::new();
3594
3595        for i in 0..count {
3596            let tx_id = format!("test-large-{base_id}-{i}");
3597            let tx = create_test_transaction(&tx_id);
3598            repo.create(tx).await.unwrap();
3599            created_ids.push(tx_id);
3600        }
3601
3602        // Delete all of them in one batch
3603        let result = repo.delete_by_ids(created_ids.clone()).await.unwrap();
3604
3605        assert_eq!(result.deleted_count, count);
3606        assert!(result.failed.is_empty());
3607
3608        // Verify all were deleted
3609        for id in created_ids {
3610            assert!(repo.get_by_id(id).await.is_err());
3611        }
3612    }
3613
3614    #[tokio::test]
3615    #[ignore = "Requires active Redis instance"]
3616    async fn test_delete_by_ids_preserves_other_relayer_transactions() {
3617        let repo = setup_test_repo().await;
3618        let relayer_1 = format!("relayer-1-{}", Uuid::new_v4());
3619        let relayer_2 = format!("relayer-2-{}", Uuid::new_v4());
3620        let tx_id_1 = format!("tx-relayer-1-{}", Uuid::new_v4());
3621        let tx_id_2 = format!("tx-relayer-2-{}", Uuid::new_v4());
3622
3623        // Create transactions for different relayers
3624        let tx1 = create_test_transaction_with_relayer(&tx_id_1, &relayer_1);
3625        let tx2 = create_test_transaction_with_relayer(&tx_id_2, &relayer_2);
3626
3627        repo.create(tx1).await.unwrap();
3628        repo.create(tx2).await.unwrap();
3629
3630        // Delete only relayer-1's transaction
3631        let result = repo.delete_by_ids(vec![tx_id_1.clone()]).await.unwrap();
3632
3633        assert_eq!(result.deleted_count, 1);
3634
3635        // relayer-1's transaction should be deleted
3636        assert!(repo.get_by_id(tx_id_1).await.is_err());
3637
3638        // relayer-2's transaction should still exist
3639        let remaining = repo.get_by_id(tx_id_2).await.unwrap();
3640        assert_eq!(remaining.relayer_id, relayer_2);
3641    }
3642
3643    // ── increment_status_check_failures ─────────────────────────────
3644
3645    #[tokio::test]
3646    #[ignore = "Requires active Redis instance"]
3647    async fn test_increment_status_check_failures_no_prior_metadata() {
3648        let _lock = ENV_MUTEX.lock().await;
3649        let repo = setup_test_repo().await;
3650        let relayer_id = Uuid::new_v4().to_string();
3651        let tx_id = Uuid::new_v4().to_string();
3652        let mut tx =
3653            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3654        tx.metadata = None;
3655        repo.create(tx).await.unwrap();
3656
3657        let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
3658
3659        let meta = updated.metadata.expect("metadata should be set");
3660        assert_eq!(meta.consecutive_failures, 1);
3661        assert_eq!(meta.total_failures, 1);
3662        assert_eq!(meta.insufficient_fee_retries, 0);
3663    }
3664
3665    #[tokio::test]
3666    #[ignore = "Requires active Redis instance"]
3667    async fn test_increment_status_check_failures_accumulates() {
3668        let _lock = ENV_MUTEX.lock().await;
3669        let repo = setup_test_repo().await;
3670        let relayer_id = Uuid::new_v4().to_string();
3671        let tx_id = Uuid::new_v4().to_string();
3672        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3673        repo.create(tx).await.unwrap();
3674
3675        repo.increment_status_check_failures(tx_id.clone())
3676            .await
3677            .unwrap();
3678        repo.increment_status_check_failures(tx_id.clone())
3679            .await
3680            .unwrap();
3681        let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
3682
3683        let meta = updated.metadata.unwrap();
3684        assert_eq!(meta.consecutive_failures, 3);
3685        assert_eq!(meta.total_failures, 3);
3686    }
3687
3688    #[tokio::test]
3689    #[ignore = "Requires active Redis instance"]
3690    async fn test_increment_status_check_failures_noop_on_final_state() {
3691        let _lock = ENV_MUTEX.lock().await;
3692        let repo = setup_test_repo().await;
3693        let relayer_id = Uuid::new_v4().to_string();
3694        let tx_id = Uuid::new_v4().to_string();
3695        let tx =
3696            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed);
3697        repo.create(tx).await.unwrap();
3698
3699        let result = repo.increment_status_check_failures(tx_id).await.unwrap();
3700
3701        // Should return unchanged — no metadata mutation on final state
3702        assert!(result.metadata.is_none());
3703        assert_eq!(result.status, TransactionStatus::Confirmed);
3704    }
3705
3706    #[tokio::test]
3707    #[ignore = "Requires active Redis instance"]
3708    async fn test_increment_status_check_failures_not_found() {
3709        let _lock = ENV_MUTEX.lock().await;
3710        let repo = setup_test_repo().await;
3711
3712        let result = repo
3713            .increment_status_check_failures("nonexistent".to_string())
3714            .await;
3715
3716        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3717    }
3718
3719    // ── reset_status_check_consecutive_failures ─────────────────────
3720
3721    #[tokio::test]
3722    #[ignore = "Requires active Redis instance"]
3723    async fn test_reset_consecutive_failures() {
3724        let _lock = ENV_MUTEX.lock().await;
3725        let repo = setup_test_repo().await;
3726        let relayer_id = Uuid::new_v4().to_string();
3727        let tx_id = Uuid::new_v4().to_string();
3728        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3729        repo.create(tx).await.unwrap();
3730
3731        // Increment a few times first
3732        repo.increment_status_check_failures(tx_id.clone())
3733            .await
3734            .unwrap();
3735        repo.increment_status_check_failures(tx_id.clone())
3736            .await
3737            .unwrap();
3738
3739        let updated = repo
3740            .reset_status_check_consecutive_failures(tx_id)
3741            .await
3742            .unwrap();
3743
3744        let meta = updated.metadata.unwrap();
3745        assert_eq!(meta.consecutive_failures, 0);
3746        // total_failures should be preserved
3747        assert_eq!(meta.total_failures, 2);
3748    }
3749
3750    #[tokio::test]
3751    #[ignore = "Requires active Redis instance"]
3752    async fn test_reset_consecutive_failures_noop_on_final_state() {
3753        let _lock = ENV_MUTEX.lock().await;
3754        let repo = setup_test_repo().await;
3755        let relayer_id = Uuid::new_v4().to_string();
3756        let tx_id = Uuid::new_v4().to_string();
3757        let mut tx =
3758            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Failed);
3759        tx.metadata = Some(crate::models::TransactionMetadata {
3760            consecutive_failures: 5,
3761            total_failures: 10,
3762            insufficient_fee_retries: 0,
3763            try_again_later_retries: 0,
3764        });
3765        repo.create(tx).await.unwrap();
3766
3767        let result = repo
3768            .reset_status_check_consecutive_failures(tx_id)
3769            .await
3770            .unwrap();
3771
3772        // Should return unchanged on final state
3773        let meta = result.metadata.unwrap();
3774        assert_eq!(meta.consecutive_failures, 5);
3775    }
3776
3777    #[tokio::test]
3778    #[ignore = "Requires active Redis instance"]
3779    async fn test_reset_consecutive_failures_not_found() {
3780        let _lock = ENV_MUTEX.lock().await;
3781        let repo = setup_test_repo().await;
3782
3783        let result = repo
3784            .reset_status_check_consecutive_failures("nonexistent".to_string())
3785            .await;
3786
3787        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3788    }
3789
3790    // ── record_stellar_insufficient_fee_retry ───────────────────────
3791
3792    #[tokio::test]
3793    #[ignore = "Requires active Redis instance"]
3794    async fn test_record_insufficient_fee_retry() {
3795        let _lock = ENV_MUTEX.lock().await;
3796        let repo = setup_test_repo().await;
3797        let relayer_id = Uuid::new_v4().to_string();
3798        let tx_id = Uuid::new_v4().to_string();
3799        let mut tx =
3800            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3801        tx.sent_at = None;
3802        repo.create(tx).await.unwrap();
3803
3804        let updated = repo
3805            .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:00:00Z".to_string())
3806            .await
3807            .unwrap();
3808
3809        assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:00:00Z"));
3810        let meta = updated.metadata.unwrap();
3811        assert_eq!(meta.insufficient_fee_retries, 1);
3812        assert_eq!(meta.consecutive_failures, 0);
3813        assert_eq!(meta.total_failures, 0);
3814    }
3815
3816    #[tokio::test]
3817    #[ignore = "Requires active Redis instance"]
3818    async fn test_record_insufficient_fee_retry_accumulates() {
3819        let _lock = ENV_MUTEX.lock().await;
3820        let repo = setup_test_repo().await;
3821        let relayer_id = Uuid::new_v4().to_string();
3822        let tx_id = Uuid::new_v4().to_string();
3823        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3824        repo.create(tx).await.unwrap();
3825
3826        repo.record_stellar_insufficient_fee_retry(
3827            tx_id.clone(),
3828            "2025-03-18T10:00:00Z".to_string(),
3829        )
3830        .await
3831        .unwrap();
3832
3833        let updated = repo
3834            .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:01:00Z".to_string())
3835            .await
3836            .unwrap();
3837
3838        assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z"));
3839        let meta = updated.metadata.unwrap();
3840        assert_eq!(meta.insufficient_fee_retries, 2);
3841    }
3842
3843    #[tokio::test]
3844    #[ignore = "Requires active Redis instance"]
3845    async fn test_record_insufficient_fee_retry_noop_on_final_state() {
3846        let _lock = ENV_MUTEX.lock().await;
3847        let repo = setup_test_repo().await;
3848        let relayer_id = Uuid::new_v4().to_string();
3849        let tx_id = Uuid::new_v4().to_string();
3850        let mut tx =
3851            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed);
3852        tx.sent_at = Some("old-time".to_string());
3853        repo.create(tx).await.unwrap();
3854
3855        let result = repo
3856            .record_stellar_insufficient_fee_retry(tx_id, "new-time".to_string())
3857            .await
3858            .unwrap();
3859
3860        // Should return unchanged on final state
3861        assert_eq!(result.sent_at.as_deref(), Some("old-time"));
3862        assert!(result.metadata.is_none());
3863    }
3864
3865    #[tokio::test]
3866    #[ignore = "Requires active Redis instance"]
3867    async fn test_record_insufficient_fee_retry_not_found() {
3868        let _lock = ENV_MUTEX.lock().await;
3869        let repo = setup_test_repo().await;
3870
3871        let result = repo
3872            .record_stellar_insufficient_fee_retry(
3873                "nonexistent".to_string(),
3874                "2025-03-18T10:00:00Z".to_string(),
3875            )
3876            .await;
3877
3878        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3879    }
3880
3881    // ── record_stellar_try_again_later_retry ────────────────────────
3882
3883    #[tokio::test]
3884    #[ignore = "Requires active Redis instance"]
3885    async fn test_record_try_again_later_retry() {
3886        let _lock = ENV_MUTEX.lock().await;
3887        let repo = setup_test_repo().await;
3888        let relayer_id = Uuid::new_v4().to_string();
3889        let tx_id = Uuid::new_v4().to_string();
3890        let mut tx =
3891            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3892        tx.sent_at = None;
3893        repo.create(tx).await.unwrap();
3894
3895        let updated = repo
3896            .record_stellar_try_again_later_retry(tx_id, "2025-03-18T10:00:00Z".to_string())
3897            .await
3898            .unwrap();
3899
3900        assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:00:00Z"));
3901        let meta = updated.metadata.unwrap();
3902        assert_eq!(meta.try_again_later_retries, 1);
3903        assert_eq!(meta.consecutive_failures, 0);
3904        assert_eq!(meta.total_failures, 0);
3905    }
3906
3907    #[tokio::test]
3908    #[ignore = "Requires active Redis instance"]
3909    async fn test_record_try_again_later_retry_accumulates() {
3910        let _lock = ENV_MUTEX.lock().await;
3911        let repo = setup_test_repo().await;
3912        let relayer_id = Uuid::new_v4().to_string();
3913        let tx_id = Uuid::new_v4().to_string();
3914        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3915        repo.create(tx).await.unwrap();
3916
3917        repo.record_stellar_try_again_later_retry(
3918            tx_id.clone(),
3919            "2025-03-18T10:00:00Z".to_string(),
3920        )
3921        .await
3922        .unwrap();
3923
3924        let updated = repo
3925            .record_stellar_try_again_later_retry(tx_id, "2025-03-18T10:01:00Z".to_string())
3926            .await
3927            .unwrap();
3928
3929        assert_eq!(updated.sent_at.as_deref(), Some("2025-03-18T10:01:00Z"));
3930        let meta = updated.metadata.unwrap();
3931        assert_eq!(meta.try_again_later_retries, 2);
3932    }
3933
3934    #[tokio::test]
3935    #[ignore = "Requires active Redis instance"]
3936    async fn test_record_try_again_later_retry_noop_on_final_state() {
3937        let _lock = ENV_MUTEX.lock().await;
3938        let repo = setup_test_repo().await;
3939        let relayer_id = Uuid::new_v4().to_string();
3940        let tx_id = Uuid::new_v4().to_string();
3941        let mut tx =
3942            create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Confirmed);
3943        tx.sent_at = Some("old-time".to_string());
3944        repo.create(tx).await.unwrap();
3945
3946        let result = repo
3947            .record_stellar_try_again_later_retry(tx_id, "new-time".to_string())
3948            .await
3949            .unwrap();
3950
3951        // Should return unchanged on final state
3952        assert_eq!(result.sent_at.as_deref(), Some("old-time"));
3953        assert!(result.metadata.is_none());
3954    }
3955
3956    #[tokio::test]
3957    #[ignore = "Requires active Redis instance"]
3958    async fn test_record_try_again_later_retry_not_found() {
3959        let _lock = ENV_MUTEX.lock().await;
3960        let repo = setup_test_repo().await;
3961
3962        let result = repo
3963            .record_stellar_try_again_later_retry(
3964                "nonexistent".to_string(),
3965                "2025-03-18T10:00:00Z".to_string(),
3966            )
3967            .await;
3968
3969        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
3970    }
3971
3972    // ── metadata preservation across operations ─────────────────────
3973
3974    #[tokio::test]
3975    #[ignore = "Requires active Redis instance"]
3976    async fn test_increment_failures_preserves_try_again_later_retries() {
3977        let _lock = ENV_MUTEX.lock().await;
3978        let repo = setup_test_repo().await;
3979        let relayer_id = Uuid::new_v4().to_string();
3980        let tx_id = Uuid::new_v4().to_string();
3981        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
3982        repo.create(tx).await.unwrap();
3983
3984        // Set try_again_later_retries = 1
3985        repo.record_stellar_try_again_later_retry(
3986            tx_id.clone(),
3987            "2025-03-18T10:00:00Z".to_string(),
3988        )
3989        .await
3990        .unwrap();
3991
3992        // Now increment failures — should NOT clobber try_again_later_retries
3993        let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
3994
3995        let meta = updated.metadata.unwrap();
3996        assert_eq!(
3997            meta.try_again_later_retries, 1,
3998            "try_again_later_retries must survive increment_status_check_failures"
3999        );
4000        assert_eq!(meta.consecutive_failures, 1);
4001        assert_eq!(meta.total_failures, 1);
4002    }
4003
4004    #[tokio::test]
4005    #[ignore = "Requires active Redis instance"]
4006    async fn test_increment_failures_preserves_insufficient_fee_retries() {
4007        let _lock = ENV_MUTEX.lock().await;
4008        let repo = setup_test_repo().await;
4009        let relayer_id = Uuid::new_v4().to_string();
4010        let tx_id = Uuid::new_v4().to_string();
4011        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4012        repo.create(tx).await.unwrap();
4013
4014        // Set insufficient_fee_retries = 1
4015        repo.record_stellar_insufficient_fee_retry(
4016            tx_id.clone(),
4017            "2025-03-18T10:00:00Z".to_string(),
4018        )
4019        .await
4020        .unwrap();
4021
4022        // Now increment failures — should NOT clobber insufficient_fee_retries
4023        let updated = repo.increment_status_check_failures(tx_id).await.unwrap();
4024
4025        let meta = updated.metadata.unwrap();
4026        assert_eq!(
4027            meta.insufficient_fee_retries, 1,
4028            "insufficient_fee_retries must survive increment_status_check_failures"
4029        );
4030        assert_eq!(meta.consecutive_failures, 1);
4031    }
4032
4033    #[tokio::test]
4034    #[ignore = "Requires active Redis instance"]
4035    async fn test_reset_failures_preserves_retry_counters() {
4036        let _lock = ENV_MUTEX.lock().await;
4037        let repo = setup_test_repo().await;
4038        let relayer_id = Uuid::new_v4().to_string();
4039        let tx_id = Uuid::new_v4().to_string();
4040        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4041        repo.create(tx).await.unwrap();
4042
4043        // Set both retry counters
4044        repo.record_stellar_try_again_later_retry(
4045            tx_id.clone(),
4046            "2025-03-18T10:00:00Z".to_string(),
4047        )
4048        .await
4049        .unwrap();
4050        repo.record_stellar_insufficient_fee_retry(
4051            tx_id.clone(),
4052            "2025-03-18T10:01:00Z".to_string(),
4053        )
4054        .await
4055        .unwrap();
4056
4057        // Increment then reset consecutive failures
4058        repo.increment_status_check_failures(tx_id.clone())
4059            .await
4060            .unwrap();
4061        let updated = repo
4062            .reset_status_check_consecutive_failures(tx_id)
4063            .await
4064            .unwrap();
4065
4066        let meta = updated.metadata.unwrap();
4067        assert_eq!(meta.consecutive_failures, 0);
4068        assert_eq!(meta.total_failures, 1);
4069        assert_eq!(
4070            meta.try_again_later_retries, 1,
4071            "try_again_later_retries must survive reset"
4072        );
4073        assert_eq!(
4074            meta.insufficient_fee_retries, 1,
4075            "insufficient_fee_retries must survive reset"
4076        );
4077    }
4078
4079    #[tokio::test]
4080    #[ignore = "Requires active Redis instance"]
4081    async fn test_fee_and_try_again_later_retries_independent() {
4082        let _lock = ENV_MUTEX.lock().await;
4083        let repo = setup_test_repo().await;
4084        let relayer_id = Uuid::new_v4().to_string();
4085        let tx_id = Uuid::new_v4().to_string();
4086        let tx = create_test_transaction_with_status(&tx_id, &relayer_id, TransactionStatus::Sent);
4087        repo.create(tx).await.unwrap();
4088
4089        // Set try_again_later_retries = 2
4090        repo.record_stellar_try_again_later_retry(
4091            tx_id.clone(),
4092            "2025-03-18T10:00:00Z".to_string(),
4093        )
4094        .await
4095        .unwrap();
4096        repo.record_stellar_try_again_later_retry(
4097            tx_id.clone(),
4098            "2025-03-18T10:01:00Z".to_string(),
4099        )
4100        .await
4101        .unwrap();
4102
4103        // Set insufficient_fee_retries = 1 — should NOT clobber try_again_later_retries
4104        let updated = repo
4105            .record_stellar_insufficient_fee_retry(tx_id, "2025-03-18T10:02:00Z".to_string())
4106            .await
4107            .unwrap();
4108
4109        let meta = updated.metadata.unwrap();
4110        assert_eq!(
4111            meta.try_again_later_retries, 2,
4112            "try_again_later_retries must survive insufficient_fee_retry"
4113        );
4114        assert_eq!(meta.insufficient_fee_retries, 1);
4115    }
4116}