1use actix_web::web::ThinData;
8use eyre::Result;
9use tracing::{debug, info, instrument, warn};
10
11use crate::{
12 constants::{get_max_consecutive_status_failures, get_max_total_status_failures},
13 domain::{get_relayer_transaction, get_transaction_by_id, is_final_state, Transaction},
14 jobs::{Job, StatusCheckContext, TransactionStatusCheck},
15 models::{
16 ApiError, DefaultAppState, TransactionError, TransactionMetadata, TransactionRepoModel,
17 },
18 observability::request_id::set_request_id,
19 queues::{HandlerError, WorkerContext},
20 repositories::TransactionRepository,
21};
22
23#[instrument(
24 level = "debug",
25 skip(job, state, ctx),
26 fields(
27 request_id = ?job.request_id,
28 job_id = %job.message_id,
29 job_type = %job.job_type.to_string(),
30 attempt = %ctx.attempt,
31 tx_id = %job.data.transaction_id,
32 relayer_id = %job.data.relayer_id,
33 task_id = %ctx.task_id,
34 )
35)]
36pub async fn transaction_status_handler(
37 job: Job<TransactionStatusCheck>,
38 state: ThinData<DefaultAppState>,
39 ctx: WorkerContext,
40) -> Result<(), HandlerError> {
41 if let Some(request_id) = job.request_id.clone() {
42 set_request_id(request_id);
43 }
44
45 let tx_repo = state.transaction_repository();
46
47 let req_result = handle_request(&job.data, &state, ctx.attempt, &ctx.task_id).await;
49
50 let tx_id = &job.data.transaction_id;
51
52 handle_result(
54 req_result.result,
55 &*tx_repo,
56 tx_id,
57 req_result.metadata,
58 req_result.should_retry_on_error,
59 )
60 .await
61}
62
63async fn handle_result<TR>(
74 result: Result<TransactionRepoModel>,
75 tx_repo: &TR,
76 tx_id: &str,
77 metadata: Option<TransactionMetadata>,
78 should_retry_on_error: bool,
79) -> Result<(), HandlerError>
80where
81 TR: TransactionRepository + Send + Sync,
82{
83 match result {
84 Ok(tx) if is_final_state(&tx.status) => {
85 debug!(
88 tx_id = %tx.id,
89 relayer_id = %tx.relayer_id,
90 status = ?tx.status,
91 consecutive_failures = ?metadata.as_ref().map(|m| m.consecutive_failures),
92 total_failures = ?metadata.as_ref().map(|m| m.total_failures),
93 "transaction in final state, status check complete"
94 );
95
96 Ok(())
97 }
98 Ok(tx) => {
99 debug!(
101 tx_id = %tx.id,
102 relayer_id = %tx.relayer_id,
103 status = ?tx.status,
104 "transaction not in final state"
105 );
106
107 let fresh_meta = tx.metadata.clone().or(metadata);
110 if let Some(meta) = fresh_meta {
111 if meta.consecutive_failures > 0 {
112 if let Err(e) = tx_repo
113 .reset_status_check_consecutive_failures(tx_id.to_string())
114 .await
115 {
116 warn!(error = %e, tx_id = %tx_id, relayer_id = %tx.relayer_id, "failed to reset consecutive counter");
117 }
118 }
119 }
120
121 Err(HandlerError::Retry(format!(
123 "transaction status: {:?} - not in final state, retrying",
124 tx.status
125 )))
126 }
127 Err(e) => {
128 if e.downcast_ref::<TransactionError>()
129 .is_some_and(TransactionError::is_concurrent_update_conflict)
130 {
131 info!(
132 error = %e,
133 tx_id = %tx_id,
134 "status check lost a concurrent update race, completing job without counter changes"
135 );
136 return Ok(());
137 }
138
139 if !should_retry_on_error {
141 info!(
142 error = %e,
143 tx_id = %tx_id,
144 "status check failed with permanent error, completing job without retry"
145 );
146 return Ok(());
147 }
148
149 if let Some(meta) = metadata {
151 warn!(
152 error = %e,
153 tx_id = %tx_id,
154 consecutive_failures = meta.consecutive_failures.saturating_add(1),
155 total_failures = meta.total_failures.saturating_add(1),
156 "status check failed, incrementing failure counters"
157 );
158
159 if let Err(update_err) = tx_repo
161 .increment_status_check_failures(tx_id.to_string())
162 .await
163 {
164 warn!(error = %update_err, tx_id = %tx_id, "failed to update counters");
165 }
166 } else {
167 warn!(
169 error = %e,
170 tx_id = %tx_id,
171 "status check failed early, counters not available"
172 );
173 }
174
175 Err(HandlerError::Retry(format!("{e}")))
177 }
178 }
179}
180
/// Outcome of `handle_request`, consumed by `handle_result`.
struct HandleRequestResult {
    /// Result of the status check: the transaction returned by the check, or
    /// the error that stopped it.
    result: Result<TransactionRepoModel>,
    /// Failure counters read from the transaction before the check ran.
    /// `None` when the transaction itself could not be loaded.
    metadata: Option<TransactionMetadata>,
    /// Whether an `Err` in `result` should lead to a retry. `handle_request`
    /// sets this to `false` for not-found errors and `true` otherwise.
    should_retry_on_error: bool,
}
190
191async fn handle_request(
195 status_request: &TransactionStatusCheck,
196 state: &ThinData<DefaultAppState>,
197 attempt: usize,
198 task_id: &str,
199) -> HandleRequestResult {
200 let tx_id = &status_request.transaction_id;
201 debug!(
202 tx_id = %tx_id,
203 relayer_id = %status_request.relayer_id,
204 "handling transaction status check"
205 );
206
207 let transaction = match get_transaction_by_id(tx_id.clone(), state).await {
209 Ok(tx) => tx,
210 Err(ApiError::NotFound(msg)) => {
211 warn!(tx_id = %tx_id, "transaction not found, completing job without retry: {}", msg);
213 return HandleRequestResult {
214 result: Err(eyre::eyre!("Transaction not found: {}", msg)),
215 metadata: None,
216 should_retry_on_error: false,
217 };
218 }
219 Err(e) => {
220 return HandleRequestResult {
222 result: Err(e.into()),
223 metadata: None,
224 should_retry_on_error: true,
225 };
226 }
227 };
228
229 let meta = transaction.metadata.clone().unwrap_or_default();
231
232 let network_type = transaction.network_type;
234 let max_consecutive = get_max_consecutive_status_failures(network_type);
235 let max_total = get_max_total_status_failures(network_type);
236
237 debug!(
238 tx_id = %tx_id,
239 consecutive_failures = meta.consecutive_failures,
240 total_failures = meta.total_failures,
241 max_consecutive,
242 max_total,
243 attempt,
244 task_id = %task_id,
245 "handling transaction status check"
246 );
247
248 let context = StatusCheckContext::new(
250 meta.consecutive_failures,
251 meta.total_failures,
252 attempt as u32,
253 max_consecutive,
254 max_total,
255 network_type,
256 );
257
258 let relayer_transaction =
260 match get_relayer_transaction(status_request.relayer_id.clone(), state).await {
261 Ok(rt) => rt,
262 Err(ApiError::NotFound(msg)) => {
263 warn!(
265 tx_id = %tx_id,
266 relayer_id = %status_request.relayer_id,
267 "relayer or signer not found, completing job without retry: {}", msg
268 );
269 return HandleRequestResult {
270 result: Err(eyre::eyre!("Relayer or signer not found: {}", msg)),
271 metadata: Some(meta),
272 should_retry_on_error: false,
273 };
274 }
275 Err(e) => {
276 return HandleRequestResult {
278 result: Err(e.into()),
279 metadata: Some(meta),
280 should_retry_on_error: true,
281 };
282 }
283 };
284
285 let result = relayer_transaction
287 .handle_transaction_status(transaction, Some(context))
288 .await
289 .map_err(|e| e.into());
290
291 if let Ok(tx) = result.as_ref() {
292 debug!(
293 tx_id = %tx.id,
294 status = ?tx.status,
295 "status check handled successfully"
296 );
297 }
298
299 HandleRequestResult {
300 result,
301 metadata: Some(meta),
302 should_retry_on_error: true,
303 }
304}
305
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        models::{NetworkType, TransactionStatus},
        repositories::MockTransactionRepository,
    };
    use std::collections::HashMap;

    /// A freshly built job exposes the ids it was created with and carries no
    /// metadata.
    #[tokio::test]
    async fn test_status_check_job_validation() {
        let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
        let job = Job::new(crate::jobs::JobType::TransactionStatusCheck, check_job);

        assert_eq!(job.data.transaction_id, "tx123");
        assert_eq!(job.data.relayer_id, "relayer-1");
        assert!(job.data.metadata.is_none());
    }

    /// Metadata attached with `with_metadata` is stored on the job payload.
    #[tokio::test]
    async fn test_status_check_with_metadata() {
        let mut metadata = HashMap::new();
        metadata.insert("retry_count".to_string(), "2".to_string());
        metadata.insert("last_status".to_string(), "pending".to_string());

        let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm)
            .with_metadata(metadata);

        let job_metadata = check_job
            .metadata
            .expect("metadata was attached with with_metadata");
        assert_eq!(
            job_metadata.get("retry_count").map(String::as_str),
            Some("2")
        );
        assert_eq!(
            job_metadata.get("last_status").map(String::as_str),
            Some("pending")
        );
    }

    /// `TransactionStatusCheck::new` always records the network type.
    #[test]
    fn test_status_check_network_type_required() {
        let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
        assert!(check_job.network_type.is_some());

        let solana_job = TransactionStatusCheck::new("tx456", "relayer-2", NetworkType::Solana);
        assert_eq!(solana_job.network_type, Some(NetworkType::Solana));

        let stellar_job = TransactionStatusCheck::new("tx789", "relayer-3", NetworkType::Stellar);
        assert_eq!(stellar_job.network_type, Some(NetworkType::Stellar));
    }

    mod context_tests {
        use super::*;

        #[test]
        fn test_context_should_force_finalize_below_threshold() {
            let ctx = StatusCheckContext::new(5, 10, 15, 25, 75, NetworkType::Evm);
            assert!(!ctx.should_force_finalize());
        }

        #[test]
        fn test_context_should_force_finalize_consecutive_at_threshold() {
            let ctx = StatusCheckContext::new(25, 30, 35, 25, 75, NetworkType::Evm);
            assert!(ctx.should_force_finalize());
        }

        #[test]
        fn test_context_should_force_finalize_total_at_threshold() {
            let ctx = StatusCheckContext::new(10, 75, 80, 25, 75, NetworkType::Evm);
            assert!(ctx.should_force_finalize());
        }
    }

    mod final_state_tests {
        use super::*;

        fn verify_final_state(status: TransactionStatus) {
            assert!(is_final_state(&status));
        }

        fn verify_not_final_state(status: TransactionStatus) {
            assert!(!is_final_state(&status));
        }

        #[test]
        fn test_confirmed_is_final() {
            verify_final_state(TransactionStatus::Confirmed);
        }

        #[test]
        fn test_failed_is_final() {
            verify_final_state(TransactionStatus::Failed);
        }

        #[test]
        fn test_canceled_is_final() {
            verify_final_state(TransactionStatus::Canceled);
        }

        #[test]
        fn test_expired_is_final() {
            verify_final_state(TransactionStatus::Expired);
        }

        #[test]
        fn test_pending_is_not_final() {
            verify_not_final_state(TransactionStatus::Pending);
        }

        #[test]
        fn test_sent_is_not_final() {
            verify_not_final_state(TransactionStatus::Sent);
        }

        #[test]
        fn test_submitted_is_not_final() {
            verify_not_final_state(TransactionStatus::Submitted);
        }

        #[test]
        fn test_mined_is_not_final() {
            verify_not_final_state(TransactionStatus::Mined);
        }
    }

    mod handle_result_tests {
        use super::*;

        /// Mirrors the `saturating_add(1)` used when logging the next counter
        /// values: at `u32::MAX` the value must stay put instead of wrapping.
        #[test]
        fn test_counter_increment_saturating() {
            let consecutive: u32 = u32::MAX;
            let total: u32 = u32::MAX;

            let new_consecutive = consecutive.saturating_add(1);
            let new_total = total.saturating_add(1);

            assert_eq!(new_consecutive, u32::MAX);
            assert_eq!(new_total, u32::MAX);
        }

        #[test]
        fn test_counter_increment_normal() {
            let consecutive: u32 = 5;
            let total: u32 = 10;

            let new_consecutive = consecutive.saturating_add(1);
            let new_total = total.saturating_add(1);

            assert_eq!(new_consecutive, 6);
            assert_eq!(new_total, 11);
        }

        #[test]
        fn test_consecutive_reset_on_success() {
            let total: u32 = 20;

            let new_consecutive: u32 = 0;
            let new_total = total;

            assert_eq!(new_consecutive, 0);
            assert_eq!(new_total, 20);
        }

        #[test]
        fn test_final_state_triggers_cleanup() {
            let final_states = [
                TransactionStatus::Confirmed,
                TransactionStatus::Failed,
                TransactionStatus::Canceled,
                TransactionStatus::Expired,
            ];

            for status in final_states {
                assert!(
                    is_final_state(&status),
                    "Expected {status:?} to be a final state"
                );
            }
        }

        #[test]
        fn test_non_final_state_triggers_retry() {
            let non_final_states = [
                TransactionStatus::Pending,
                TransactionStatus::Sent,
                TransactionStatus::Submitted,
                TransactionStatus::Mined,
            ];

            for status in non_final_states {
                assert!(
                    !is_final_state(&status),
                    "Expected {status:?} to NOT be a final state"
                );
            }
        }

        // The tests below drive `handle_result` itself. Each one exercises a
        // path that performs no repository call, so the mock needs no
        // expectations.

        #[tokio::test]
        async fn test_handle_result_final_state_completes_job() {
            let tx_repo = MockTransactionRepository::new();
            let tx = TransactionRepoModel {
                status: TransactionStatus::Confirmed,
                ..Default::default()
            };

            let result = handle_result(Ok(tx), &tx_repo, "tx-1", None, true).await;

            assert!(result.is_ok());
        }

        #[tokio::test]
        async fn test_handle_result_non_final_state_requests_retry() {
            let tx_repo = MockTransactionRepository::new();
            let tx = TransactionRepoModel {
                status: TransactionStatus::Pending,
                metadata: None,
                ..Default::default()
            };

            let result = handle_result(Ok(tx), &tx_repo, "tx-1", None, true).await;

            assert!(matches!(result, Err(HandlerError::Retry(_))));
        }

        #[tokio::test]
        async fn test_handle_result_permanent_error_completes_job() {
            let tx_repo = MockTransactionRepository::new();

            let result = handle_result(
                Err(eyre::eyre!("Transaction not found")),
                &tx_repo,
                "tx-1",
                None,
                false,
            )
            .await;

            assert!(result.is_ok());
        }

        #[tokio::test]
        async fn test_handle_result_transient_error_without_metadata_requests_retry() {
            let tx_repo = MockTransactionRepository::new();

            let result = handle_result(
                Err(eyre::eyre!("Connection timeout")),
                &tx_repo,
                "tx-1",
                None,
                true,
            )
            .await;

            assert!(matches!(result, Err(HandlerError::Retry(_))));
        }
    }

    mod handle_request_result_tests {
        use super::*;

        /// A concurrent-update conflict completes the job and must not touch
        /// the repository, even when metadata and the retry flag are set.
        #[tokio::test]
        async fn test_handle_result_ignores_concurrent_update_conflict() {
            let tx_repo = MockTransactionRepository::new();

            let result = handle_result(
                Err(TransactionError::ConcurrentUpdateConflict("tx race".to_string()).into()),
                &tx_repo,
                "tx-1",
                Some(TransactionMetadata {
                    consecutive_failures: 2,
                    total_failures: 5,
                    ..Default::default()
                }),
                true,
            )
            .await;

            assert!(result.is_ok());
        }

        #[test]
        fn test_handle_request_result_with_counters() {
            let result = HandleRequestResult {
                result: Ok(TransactionRepoModel::default()),
                metadata: Some(TransactionMetadata {
                    consecutive_failures: 5,
                    total_failures: 10,
                    insufficient_fee_retries: 2,
                    try_again_later_retries: 1,
                }),
                should_retry_on_error: true,
            };

            assert!(result.result.is_ok());
            let meta = result.metadata.expect("metadata was set above");
            assert_eq!(meta.consecutive_failures, 5);
            assert_eq!(meta.total_failures, 10);
            assert!(result.should_retry_on_error);
        }

        #[test]
        fn test_handle_request_result_without_counters() {
            let result = HandleRequestResult {
                result: Err(eyre::eyre!("Transaction not found")),
                metadata: None,
                should_retry_on_error: false,
            };

            assert!(result.result.is_err());
            assert!(result.metadata.is_none());
            assert!(!result.should_retry_on_error);
        }

        #[test]
        fn test_permanent_error_should_not_retry() {
            let result = HandleRequestResult {
                result: Err(eyre::eyre!("Transaction not found")),
                metadata: None,
                should_retry_on_error: false,
            };

            assert!(!result.should_retry_on_error);
        }

        #[test]
        fn test_transient_error_should_retry() {
            let result = HandleRequestResult {
                result: Err(eyre::eyre!("Connection timeout")),
                metadata: Some(TransactionMetadata {
                    consecutive_failures: 3,
                    total_failures: 7,
                    insufficient_fee_retries: 1,
                    try_again_later_retries: 0,
                }),
                should_retry_on_error: true,
            };

            assert!(result.should_retry_on_error);
        }
    }
}