// openzeppelin_relayer/jobs/handlers/transaction_status_handler.rs

1//! Transaction status monitoring handler.
2//!
3//! Monitors the status of submitted transactions by:
4//! - Checking transaction status on the network
5//! - Updating transaction status in storage
6//! - Tracking failure counts for circuit breaker decisions (stored in transaction metadata)
7use actix_web::web::ThinData;
8use eyre::Result;
9use tracing::{debug, info, instrument, warn};
10
11use crate::{
12    constants::{get_max_consecutive_status_failures, get_max_total_status_failures},
13    domain::{get_relayer_transaction, get_transaction_by_id, is_final_state, Transaction},
14    jobs::{Job, StatusCheckContext, TransactionStatusCheck},
15    models::{
16        ApiError, DefaultAppState, TransactionError, TransactionMetadata, TransactionRepoModel,
17    },
18    observability::request_id::set_request_id,
19    queues::{HandlerError, WorkerContext},
20    repositories::TransactionRepository,
21};
22
23#[instrument(
24    level = "debug",
25    skip(job, state, ctx),
26    fields(
27        request_id = ?job.request_id,
28        job_id = %job.message_id,
29        job_type = %job.job_type.to_string(),
30        attempt = %ctx.attempt,
31        tx_id = %job.data.transaction_id,
32        relayer_id = %job.data.relayer_id,
33        task_id = %ctx.task_id,
34    )
35)]
36pub async fn transaction_status_handler(
37    job: Job<TransactionStatusCheck>,
38    state: ThinData<DefaultAppState>,
39    ctx: WorkerContext,
40) -> Result<(), HandlerError> {
41    if let Some(request_id) = job.request_id.clone() {
42        set_request_id(request_id);
43    }
44
45    let tx_repo = state.transaction_repository();
46
47    // Execute status check - all logic moved here so errors go through handle_result
48    let req_result = handle_request(&job.data, &state, ctx.attempt, &ctx.task_id).await;
49
50    let tx_id = &job.data.transaction_id;
51
52    // Handle result and update counters via transaction repository
53    handle_result(
54        req_result.result,
55        &*tx_repo,
56        tx_id,
57        req_result.metadata,
58        req_result.should_retry_on_error,
59    )
60    .await
61}
62
63/// Handles status check results with circuit breaker tracking.
64///
65/// # Strategy
66/// - If transaction is in final state → return Ok (job completes, metadata cleaned up via delete_at)
67/// - If success but not final → Reset consecutive to 0, return Err (retries)
68/// - If error with should_retry=true → Increment counters, return Err (retries)
69/// - If error with should_retry=false → Return Ok (job completes, e.g., transaction not found)
70/// - If counters are None (early failure) → Skip counter updates
71///
72/// Counters are stored in transaction metadata, persisted via atomic Lua scripts.
73async fn handle_result<TR>(
74    result: Result<TransactionRepoModel>,
75    tx_repo: &TR,
76    tx_id: &str,
77    metadata: Option<TransactionMetadata>,
78    should_retry_on_error: bool,
79) -> Result<(), HandlerError>
80where
81    TR: TransactionRepository + Send + Sync,
82{
83    match result {
84        Ok(tx) if is_final_state(&tx.status) => {
85            // Transaction reached final state - job complete
86            // No need to clean up counters - tx will be deleted via delete_at
87            debug!(
88                tx_id = %tx.id,
89                relayer_id = %tx.relayer_id,
90                status = ?tx.status,
91                consecutive_failures = ?metadata.as_ref().map(|m| m.consecutive_failures),
92                total_failures = ?metadata.as_ref().map(|m| m.total_failures),
93                "transaction in final state, status check complete"
94            );
95
96            Ok(())
97        }
98        Ok(tx) => {
99            // Success but not final - RESET consecutive counter, keep total unchanged
100            debug!(
101                tx_id = %tx.id,
102                relayer_id = %tx.relayer_id,
103                status = ?tx.status,
104                "transaction not in final state"
105            );
106
107            // Use fresh metadata from the transaction (updated during handle_transaction_status)
108            // to decide whether a reset is needed, falling back to the pre-check snapshot.
109            let fresh_meta = tx.metadata.clone().or(metadata);
110            if let Some(meta) = fresh_meta {
111                if meta.consecutive_failures > 0 {
112                    if let Err(e) = tx_repo
113                        .reset_status_check_consecutive_failures(tx_id.to_string())
114                        .await
115                    {
116                        warn!(error = %e, tx_id = %tx_id, relayer_id = %tx.relayer_id, "failed to reset consecutive counter");
117                    }
118                }
119            }
120
121            // Return error to trigger retry
122            Err(HandlerError::Retry(format!(
123                "transaction status: {:?} - not in final state, retrying",
124                tx.status
125            )))
126        }
127        Err(e) => {
128            if e.downcast_ref::<TransactionError>()
129                .is_some_and(TransactionError::is_concurrent_update_conflict)
130            {
131                info!(
132                    error = %e,
133                    tx_id = %tx_id,
134                    "status check lost a concurrent update race, completing job without counter changes"
135                );
136                return Ok(());
137            }
138
139            // Check if this is a permanent failure that shouldn't retry
140            if !should_retry_on_error {
141                info!(
142                    error = %e,
143                    tx_id = %tx_id,
144                    "status check failed with permanent error, completing job without retry"
145                );
146                return Ok(());
147            }
148
149            // Transient error - INCREMENT both counters (only if we have metadata)
150            if let Some(meta) = metadata {
151                warn!(
152                    error = %e,
153                    tx_id = %tx_id,
154                    consecutive_failures = meta.consecutive_failures.saturating_add(1),
155                    total_failures = meta.total_failures.saturating_add(1),
156                    "status check failed, incrementing failure counters"
157                );
158
159                // Update counters via atomic transaction repository method
160                if let Err(update_err) = tx_repo
161                    .increment_status_check_failures(tx_id.to_string())
162                    .await
163                {
164                    warn!(error = %update_err, tx_id = %tx_id, "failed to update counters");
165                }
166            } else {
167                // Early failure before counters were read - skip counter update
168                warn!(
169                    error = %e,
170                    tx_id = %tx_id,
171                    "status check failed early, counters not available"
172                );
173            }
174
175            // Return error to trigger retry
176            Err(HandlerError::Retry(format!("{e}")))
177        }
178    }
179}
180
/// Result of handle_request including whether to retry on error.
///
/// Bundles the status-check outcome with the context `handle_result` needs
/// to decide between completing the job and retrying it.
struct HandleRequestResult {
    /// Outcome of the status check; `Ok` carries the refreshed transaction model.
    result: Result<TransactionRepoModel>,
    /// Transaction metadata with failure counters. None if metadata couldn't be read
    /// (e.g., transaction fetch failed early).
    metadata: Option<TransactionMetadata>,
    /// If false, errors should not trigger retry (e.g., transaction not found)
    should_retry_on_error: bool,
}
190
191/// Executes the status check logic and returns the result with counter values.
192/// Returns None for counters if they couldn't be read (e.g., transaction fetch failed early).
193/// Sets should_retry_on_error=false for permanent failures like transaction not found.
194async fn handle_request(
195    status_request: &TransactionStatusCheck,
196    state: &ThinData<DefaultAppState>,
197    attempt: usize,
198    task_id: &str,
199) -> HandleRequestResult {
200    let tx_id = &status_request.transaction_id;
201    debug!(
202        tx_id = %tx_id,
203        relayer_id = %status_request.relayer_id,
204        "handling transaction status check"
205    );
206
207    // Fetch transaction - if this fails, we can't read counters yet
208    let transaction = match get_transaction_by_id(tx_id.clone(), state).await {
209        Ok(tx) => tx,
210        Err(ApiError::NotFound(msg)) => {
211            // Transaction not found - permanent failure, don't retry
212            warn!(tx_id = %tx_id, "transaction not found, completing job without retry: {}", msg);
213            return HandleRequestResult {
214                result: Err(eyre::eyre!("Transaction not found: {}", msg)),
215                metadata: None,
216                should_retry_on_error: false,
217            };
218        }
219        Err(e) => {
220            // Other errors - should retry
221            return HandleRequestResult {
222                result: Err(e.into()),
223                metadata: None,
224                should_retry_on_error: true,
225            };
226        }
227    };
228
229    // Read failure counters from transaction metadata
230    let meta = transaction.metadata.clone().unwrap_or_default();
231
232    // Get network type from transaction (authoritative source)
233    let network_type = transaction.network_type;
234    let max_consecutive = get_max_consecutive_status_failures(network_type);
235    let max_total = get_max_total_status_failures(network_type);
236
237    debug!(
238        tx_id = %tx_id,
239        consecutive_failures = meta.consecutive_failures,
240        total_failures = meta.total_failures,
241        max_consecutive,
242        max_total,
243        attempt,
244        task_id = %task_id,
245        "handling transaction status check"
246    );
247
248    // Build circuit breaker context
249    let context = StatusCheckContext::new(
250        meta.consecutive_failures,
251        meta.total_failures,
252        attempt as u32,
253        max_consecutive,
254        max_total,
255        network_type,
256    );
257
258    // Get relayer transaction handler
259    let relayer_transaction =
260        match get_relayer_transaction(status_request.relayer_id.clone(), state).await {
261            Ok(rt) => rt,
262            Err(ApiError::NotFound(msg)) => {
263                // Relayer or signer not found - permanent failure, don't retry
264                warn!(
265                    tx_id = %tx_id,
266                    relayer_id = %status_request.relayer_id,
267                    "relayer or signer not found, completing job without retry: {}", msg
268                );
269                return HandleRequestResult {
270                    result: Err(eyre::eyre!("Relayer or signer not found: {}", msg)),
271                    metadata: Some(meta),
272                    should_retry_on_error: false,
273                };
274            }
275            Err(e) => {
276                // Other errors - should retry
277                return HandleRequestResult {
278                    result: Err(e.into()),
279                    metadata: Some(meta),
280                    should_retry_on_error: true,
281                };
282            }
283        };
284
285    // Execute status check
286    let result = relayer_transaction
287        .handle_transaction_status(transaction, Some(context))
288        .await
289        .map_err(|e| e.into());
290
291    if let Ok(tx) = result.as_ref() {
292        debug!(
293            tx_id = %tx.id,
294            status = ?tx.status,
295            "status check handled successfully"
296        );
297    }
298
299    HandleRequestResult {
300        result,
301        metadata: Some(meta),
302        should_retry_on_error: true,
303    }
304}
305
#[cfg(test)]
mod tests {
    //! Unit tests for the status handler's job payloads, circuit breaker
    //! context, final-state classification, and result handling.
    use super::*;
    use crate::{
        models::{NetworkType, TransactionStatus},
        repositories::MockTransactionRepository,
    };
    use std::collections::HashMap;

    // Job payload carries through the transaction/relayer ids and starts
    // with no metadata.
    #[tokio::test]
    async fn test_status_check_job_validation() {
        let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
        let job = Job::new(crate::jobs::JobType::TransactionStatusCheck, check_job);

        assert_eq!(job.data.transaction_id, "tx123");
        assert_eq!(job.data.relayer_id, "relayer-1");
        assert!(job.data.metadata.is_none());
    }

    // with_metadata attaches an arbitrary key/value map to the job payload.
    #[tokio::test]
    async fn test_status_check_with_metadata() {
        let mut metadata = HashMap::new();
        metadata.insert("retry_count".to_string(), "2".to_string());
        metadata.insert("last_status".to_string(), "pending".to_string());

        let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm)
            .with_metadata(metadata.clone());

        assert!(check_job.metadata.is_some());
        let job_metadata = check_job.metadata.unwrap();
        assert_eq!(job_metadata.get("retry_count").unwrap(), "2");
        assert_eq!(job_metadata.get("last_status").unwrap(), "pending");
    }

    #[test]
    fn test_status_check_network_type_required() {
        // Jobs should always have network_type set
        let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
        assert!(check_job.network_type.is_some());

        // Verify different network types are preserved
        let solana_job = TransactionStatusCheck::new("tx456", "relayer-2", NetworkType::Solana);
        assert_eq!(solana_job.network_type, Some(NetworkType::Solana));

        let stellar_job = TransactionStatusCheck::new("tx789", "relayer-3", NetworkType::Stellar);
        assert_eq!(stellar_job.network_type, Some(NetworkType::Stellar));
    }

    // StatusCheckContext args: (consecutive, total, attempt, max_consecutive,
    // max_total, network_type). Force-finalize triggers when either counter
    // reaches its threshold.
    mod context_tests {
        use super::*;

        #[test]
        fn test_context_should_force_finalize_below_threshold() {
            let ctx = StatusCheckContext::new(5, 10, 15, 25, 75, NetworkType::Evm);
            assert!(!ctx.should_force_finalize());
        }

        #[test]
        fn test_context_should_force_finalize_consecutive_at_threshold() {
            let ctx = StatusCheckContext::new(25, 30, 35, 25, 75, NetworkType::Evm);
            assert!(ctx.should_force_finalize());
        }

        #[test]
        fn test_context_should_force_finalize_total_at_threshold() {
            let ctx = StatusCheckContext::new(10, 75, 80, 25, 75, NetworkType::Evm);
            assert!(ctx.should_force_finalize());
        }
    }

    // Exhaustive checks of is_final_state over every TransactionStatus the
    // handler's complete/retry decision depends on.
    mod final_state_tests {
        use super::*;

        fn verify_final_state(status: TransactionStatus) {
            assert!(is_final_state(&status));
        }

        fn verify_not_final_state(status: TransactionStatus) {
            assert!(!is_final_state(&status));
        }

        #[test]
        fn test_confirmed_is_final() {
            verify_final_state(TransactionStatus::Confirmed);
        }

        #[test]
        fn test_failed_is_final() {
            verify_final_state(TransactionStatus::Failed);
        }

        #[test]
        fn test_canceled_is_final() {
            verify_final_state(TransactionStatus::Canceled);
        }

        #[test]
        fn test_expired_is_final() {
            verify_final_state(TransactionStatus::Expired);
        }

        #[test]
        fn test_pending_is_not_final() {
            verify_not_final_state(TransactionStatus::Pending);
        }

        #[test]
        fn test_sent_is_not_final() {
            verify_not_final_state(TransactionStatus::Sent);
        }

        #[test]
        fn test_submitted_is_not_final() {
            verify_not_final_state(TransactionStatus::Submitted);
        }

        #[test]
        fn test_mined_is_not_final() {
            verify_not_final_state(TransactionStatus::Mined);
        }
    }

    mod handle_result_tests {
        use super::*;

        /// Tests that counter increment uses saturating_add to prevent overflow
        // NOTE(review): this (and the next three tests) exercise local
        // arithmetic/constants rather than calling handle_result itself, so
        // they document intent but can't regress with the handler. Consider
        // replacing with tests that drive handle_result via
        // MockTransactionRepository.
        #[test]
        fn test_counter_increment_saturating() {
            let consecutive: u32 = u32::MAX;
            let total: u32 = u32::MAX;

            let new_consecutive = consecutive.saturating_add(1);
            let new_total = total.saturating_add(1);

            // Should not overflow, stays at MAX
            assert_eq!(new_consecutive, u32::MAX);
            assert_eq!(new_total, u32::MAX);
        }

        /// Tests normal counter increment
        #[test]
        fn test_counter_increment_normal() {
            let consecutive: u32 = 5;
            let total: u32 = 10;

            let new_consecutive = consecutive.saturating_add(1);
            let new_total = total.saturating_add(1);

            assert_eq!(new_consecutive, 6);
            assert_eq!(new_total, 11);
        }

        /// Tests that consecutive counter resets to 0 on success (non-final)
        #[test]
        fn test_consecutive_reset_on_success() {
            // When status check succeeds but tx is not final,
            // consecutive should reset to 0, total stays unchanged
            let total: u32 = 20;

            // On success, consecutive resets
            let new_consecutive = 0;
            let new_total = total; // unchanged

            assert_eq!(new_consecutive, 0);
            assert_eq!(new_total, 20);
        }

        /// Tests that final states are correctly identified for cleanup
        #[test]
        fn test_final_state_triggers_cleanup() {
            let final_states = vec![
                TransactionStatus::Confirmed,
                TransactionStatus::Failed,
                TransactionStatus::Canceled,
                TransactionStatus::Expired,
            ];

            for status in final_states {
                assert!(
                    is_final_state(&status),
                    "Expected {status:?} to be a final state"
                );
            }
        }

        /// Tests that non-final states trigger retry
        #[test]
        fn test_non_final_state_triggers_retry() {
            let non_final_states = vec![
                TransactionStatus::Pending,
                TransactionStatus::Sent,
                TransactionStatus::Submitted,
                TransactionStatus::Mined,
            ];

            for status in non_final_states {
                assert!(
                    !is_final_state(&status),
                    "Expected {status:?} to NOT be a final state"
                );
            }
        }
    }

    mod handle_request_result_tests {
        use super::*;

        // A concurrent-update conflict must complete the job (Ok) without
        // touching counters — the mock repo would panic on any unexpected
        // repository call.
        #[tokio::test]
        async fn test_handle_result_ignores_concurrent_update_conflict() {
            let tx_repo = MockTransactionRepository::new();

            let result = handle_result(
                Err(TransactionError::ConcurrentUpdateConflict("tx race".to_string()).into()),
                &tx_repo,
                "tx-1",
                Some(TransactionMetadata {
                    consecutive_failures: 2,
                    total_failures: 5,
                    ..Default::default()
                }),
                true,
            )
            .await;

            assert!(result.is_ok());
        }

        // Struct construction smoke test: counters round-trip through the
        // metadata field.
        #[test]
        fn test_handle_request_result_with_counters() {
            let result = HandleRequestResult {
                result: Ok(TransactionRepoModel::default()),
                metadata: Some(TransactionMetadata {
                    consecutive_failures: 5,
                    total_failures: 10,
                    insufficient_fee_retries: 2,
                    try_again_later_retries: 1,
                }),
                should_retry_on_error: true,
            };

            assert!(result.result.is_ok());
            let meta = result.metadata.unwrap();
            assert_eq!(meta.consecutive_failures, 5);
            assert_eq!(meta.total_failures, 10);
            assert!(result.should_retry_on_error);
        }

        #[test]
        fn test_handle_request_result_without_counters() {
            // Early failure before counters could be read
            let result = HandleRequestResult {
                result: Err(eyre::eyre!("Transaction not found")),
                metadata: None,
                should_retry_on_error: false,
            };

            assert!(result.result.is_err());
            assert!(result.metadata.is_none());
            assert!(!result.should_retry_on_error);
        }

        #[test]
        fn test_permanent_error_should_not_retry() {
            // NotFound errors are permanent - should not retry
            let result = HandleRequestResult {
                result: Err(eyre::eyre!("Transaction not found")),
                metadata: None,
                should_retry_on_error: false,
            };

            // Permanent errors have should_retry_on_error = false
            assert!(!result.should_retry_on_error);
        }

        #[test]
        fn test_transient_error_should_retry() {
            // Network/connection errors are transient - should retry
            let result = HandleRequestResult {
                result: Err(eyre::eyre!("Connection timeout")),
                metadata: Some(TransactionMetadata {
                    consecutive_failures: 3,
                    total_failures: 7,
                    insufficient_fee_retries: 1,
                    try_again_later_retries: 0,
                }),
                should_retry_on_error: true,
            };

            // Transient errors have should_retry_on_error = true
            assert!(result.should_retry_on_error);
        }
    }
}