openzeppelin_relayer/queues/sqs/
worker.rs

1//! SQS worker implementation for polling and processing messages.
2//!
3//! This module provides worker tasks that poll SQS queues and process jobs
4//! using the existing handler functions.
5
6use std::future::Future;
7use std::panic::AssertUnwindSafe;
8use std::sync::Arc;
9use std::time::Duration;
10
11use actix_web::web::ThinData;
12use aws_sdk_sqs::error::{ProvideErrorMetadata, SdkError};
13use aws_sdk_sqs::types::{
14    DeleteMessageBatchRequestEntry, Message, MessageAttributeValue, MessageSystemAttributeName,
15};
16use futures::FutureExt;
17use serde::de::DeserializeOwned;
18use tokio::sync::watch;
19use tokio::task::{JoinHandle, JoinSet};
20use tracing::{debug, error, info, warn};
21
22use crate::metrics::observe_queue_pickup_latency;
23use crate::queues::{backoff_config_for_queue, retry_delay_secs};
24use crate::{
25    config::ServerConfig,
26    jobs::{
27        notification_handler, relayer_health_check_handler, token_swap_request_handler,
28        transaction_request_handler, transaction_status_handler, transaction_submission_handler,
29        Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
30        TransactionSend, TransactionStatusCheck,
31    },
32};
33
34use super::{HandlerError, WorkerContext};
35use super::{QueueBackendError, QueueType, WorkerHandle};
36
/// Classification of a handler failure, deciding the message's fate in
/// `process_message`: retryable errors are re-enqueued / left for the
/// visibility timeout, permanent errors cause the message to be deleted.
#[derive(Debug)]
enum ProcessingError {
    /// Transient failure — the message should be retried with backoff.
    Retryable(String),
    /// Non-recoverable failure (e.g. malformed payload) — delete without retry.
    Permanent(String),
}
42
/// Outcome of processing a single SQS message, used to decide whether the
/// message should be batch-deleted or left in the queue.
#[derive(Debug)]
enum MessageOutcome {
    /// Message processed successfully — should be deleted from queue.
    Delete {
        /// Receipt handle identifying this delivery for the delete call.
        receipt_handle: String,
    },
    /// Message should remain in queue (e.g. status-check retry via visibility
    /// change, or retryable error awaiting visibility timeout).
    Retain,
}
53
/// Configuration for a single SQS poll loop, bundling parameters that
/// would otherwise require too many function arguments.
#[derive(Clone)]
struct PollLoopConfig {
    /// Queue this poller serves; selects the handler and retry behavior.
    queue_type: QueueType,
    /// Long-poll wait passed as `WaitTimeSeconds` to `ReceiveMessage`.
    polling_interval: u64,
    /// Visibility timeout (seconds) applied to received messages.
    visibility_timeout: u32,
    /// Hard cap on a single handler invocation before it is abandoned.
    handler_timeout: Duration,
    /// Max delivery attempts before the message is left for the DLQ;
    /// `usize::MAX` means retry indefinitely (used by status checks).
    max_retries: usize,
    /// 0-based index of this poller within the queue's poller set.
    poller_id: usize,
    /// Total number of pollers sharing this queue's semaphore.
    poller_count: usize,
}
66
67/// Spawns a worker task for a specific SQS queue.
68///
69/// The worker continuously polls the queue, processes messages, and handles
70/// retries via SQS visibility timeout.
71///
72/// # Arguments
73/// * `sqs_client` - AWS SQS client for all operations (poll, send, delete, change visibility)
74/// * `queue_type` - Type of queue (determines handler and concurrency)
75/// * `queue_url` - SQS queue URL
76/// * `app_state` - Application state with repositories and services
77///
78/// # Returns
79/// JoinHandle to the spawned worker task
80pub async fn spawn_worker_for_queue(
81    sqs_client: aws_sdk_sqs::Client,
82    queue_type: QueueType,
83    queue_url: String,
84    app_state: Arc<ThinData<crate::models::DefaultAppState>>,
85    shutdown_rx: watch::Receiver<bool>,
86) -> Result<WorkerHandle, QueueBackendError> {
87    let concurrency = get_concurrency_for_queue(queue_type);
88    let max_retries = queue_type.max_retries();
89    let polling_interval = get_wait_time_for_queue(queue_type);
90    let poller_count = get_poller_count_for_queue(queue_type);
91    let visibility_timeout = queue_type.visibility_timeout_secs();
92    let handler_timeout_secs = handler_timeout_secs(queue_type);
93    let handler_timeout = Duration::from_secs(handler_timeout_secs);
94
95    info!(
96        queue_type = ?queue_type,
97        queue_url = %queue_url,
98        concurrency = concurrency,
99        max_retries = max_retries,
100        polling_interval_secs = polling_interval,
101        poller_count = poller_count,
102        visibility_timeout_secs = visibility_timeout,
103        handler_timeout_secs = handler_timeout_secs,
104        "Spawning SQS worker"
105    );
106
107    // All pollers share the same semaphore so total concurrency is bounded.
108    let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency));
109
110    let handle: JoinHandle<()> = tokio::spawn(async move {
111        let mut poller_handles: JoinSet<()> = JoinSet::new();
112
113        for poller_id in 0..poller_count {
114            let client = sqs_client.clone();
115            let url = queue_url.clone();
116            let state = app_state.clone();
117            let sem = semaphore.clone();
118            let mut rx = shutdown_rx.clone();
119            let config = PollLoopConfig {
120                queue_type,
121                polling_interval,
122                visibility_timeout,
123                handler_timeout,
124                max_retries,
125                poller_id,
126                poller_count,
127            };
128
129            poller_handles.spawn(async move {
130                run_poll_loop(client, url, state, sem, &mut rx, config).await;
131            });
132        }
133
134        // Wait for all pollers to finish (they exit on shutdown signal)
135        while let Some(join_result) = poller_handles.join_next().await {
136            if let Err(err) = join_result {
137                error!(
138                    queue_type = ?queue_type,
139                    error = %err,
140                    "SQS poller task terminated unexpectedly"
141                );
142            }
143        }
144        info!(queue_type = ?queue_type, "SQS worker stopped");
145    });
146
147    Ok(WorkerHandle::Tokio(handle))
148}
149
/// Runs a single SQS poll loop. Multiple instances may share the same semaphore
/// to increase pickup smoothness without exceeding handler concurrency limits.
///
/// Loop shape: reap finished handler tasks → flush their receipt handles as a
/// batch delete → honor shutdown → compute this poller's fair share of permits
/// → long-poll SQS (racing the shutdown signal) → spawn one permit-bounded
/// handler task per received message. On shutdown, in-flight tasks are
/// drained (30s cap) and any remaining deletes are flushed before returning.
async fn run_poll_loop(
    sqs_client: aws_sdk_sqs::Client,
    queue_url: String,
    app_state: Arc<ThinData<crate::models::DefaultAppState>>,
    semaphore: Arc<tokio::sync::Semaphore>,
    shutdown_rx: &mut watch::Receiver<bool>,
    config: PollLoopConfig,
) {
    let PollLoopConfig {
        queue_type,
        polling_interval,
        visibility_timeout,
        handler_timeout,
        max_retries,
        poller_id,
        poller_count,
    } = config;
    // Each task yields Some(receipt_handle) when its message should be
    // deleted, or None when the message is retained for redelivery.
    let mut inflight: JoinSet<Option<String>> = JoinSet::new();
    let mut consecutive_poll_errors: u32 = 0;
    let mut pending_deletes: Vec<String> = Vec::new();

    loop {
        // Reap completed tasks and collect receipt handles for batch delete
        while let Some(result) = inflight.try_join_next() {
            match result {
                Ok(Some(receipt_handle)) => pending_deletes.push(receipt_handle),
                Ok(None) => {} // Retained message, no delete needed
                Err(e) => {
                    // Join error means the task panicked or was cancelled; the
                    // message reappears after its visibility timeout.
                    warn!(
                        queue_type = ?queue_type,
                        poller_id = poller_id,
                        error = %e,
                        "In-flight task failed"
                    );
                }
            }
        }

        // Flush any accumulated deletes as a batch
        if !pending_deletes.is_empty() {
            flush_delete_batch(&sqs_client, &queue_url, &pending_deletes, queue_type).await;
            pending_deletes.clear();
        }

        // Check shutdown before each iteration
        if *shutdown_rx.borrow() {
            info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received, stopping SQS poller");
            break;
        }

        // Distribute available permits fairly across pollers to prevent
        // collective overfetch. Each poller gets floor(available / N)
        // messages, and the first (available % N) pollers (by poller_id)
        // each get one extra from the remainder. This ensures:
        // - No stall: at least one poller polls when any permits exist
        // - Bounded overfetch: at most poller_count extra from racing
        let available_permits = semaphore.available_permits();
        let base_share = available_permits / poller_count;
        let remainder = available_permits % poller_count;
        let my_share = base_share + usize::from(poller_id < remainder);
        if my_share == 0 {
            // No permits for this poller right now: nap briefly, but stay
            // responsive to the shutdown signal.
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_millis(50)) => continue,
                _ = shutdown_rx.changed() => {
                    info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received, stopping SQS poller");
                    break;
                }
            }
        }

        // SQS MaxNumberOfMessages must be 1-10.
        let batch_size = my_share.min(10) as i32;

        // Poll SQS for messages, racing with shutdown signal
        let messages_result = tokio::select! {
            result = sqs_client
                .receive_message()
                .queue_url(&queue_url)
                .max_number_of_messages(batch_size) // SQS max is 10
                .wait_time_seconds(polling_interval as i32)
                .visibility_timeout(visibility_timeout as i32)
                .message_system_attribute_names(MessageSystemAttributeName::ApproximateReceiveCount)
                .message_system_attribute_names(MessageSystemAttributeName::MessageGroupId)
                .message_system_attribute_names(MessageSystemAttributeName::SentTimestamp)
                .message_attribute_names("target_scheduled_on")
                .message_attribute_names("retry_attempt")
                .send() => result,
            _ = shutdown_rx.changed() => {
                info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received during SQS poll, stopping poller");
                break;
            }
        };

        match messages_result {
            Ok(output) => {
                if consecutive_poll_errors > 0 {
                    info!(
                        queue_type = ?queue_type,
                        poller_id = poller_id,
                        previous_errors = consecutive_poll_errors,
                        "SQS polling recovered after consecutive errors"
                    );
                }
                consecutive_poll_errors = 0;

                if let Some(messages) = output.messages {
                    if !messages.is_empty() {
                        debug!(
                            queue_type = ?queue_type,
                            poller_id = poller_id,
                            message_count = messages.len(),
                            "Received messages from SQS"
                        );

                        // Process messages concurrently (up to semaphore limit)
                        for message in messages {
                            let permit = match semaphore.clone().acquire_owned().await {
                                Ok(permit) => permit,
                                Err(err) => {
                                    // acquire_owned only fails if the semaphore
                                    // was closed — treat as fatal for this loop.
                                    error!(
                                        queue_type = ?queue_type,
                                        poller_id = poller_id,
                                        error = %err,
                                        "Semaphore closed, stopping SQS poller loop"
                                    );
                                    return;
                                }
                            };
                            let client = sqs_client.clone();
                            let url = queue_url.clone();
                            let state = app_state.clone();

                            inflight.spawn(async move {
                                let _permit = permit; // always dropped, even on panic

                                // Layered guards: outer timeout bounds runtime;
                                // catch_unwind converts handler panics into an
                                // Err so the permit and task are reclaimed.
                                let result = tokio::time::timeout(
                                    handler_timeout,
                                    AssertUnwindSafe(process_message(
                                        client.clone(),
                                        message,
                                        queue_type,
                                        &url,
                                        state,
                                        max_retries,
                                    ))
                                    .catch_unwind(),
                                )
                                .await;

                                // Nesting: Result<Result<Result<_, _>, panic>, Elapsed>
                                match result {
                                    Ok(Ok(Ok(MessageOutcome::Delete { receipt_handle }))) => {
                                        Some(receipt_handle)
                                    }
                                    Ok(Ok(Ok(MessageOutcome::Retain))) => None,
                                    Ok(Ok(Err(e))) => {
                                        error!(
                                            queue_type = ?queue_type,
                                            error = %e,
                                            "Failed to process message"
                                        );
                                        None
                                    }
                                    Ok(Err(panic_info)) => {
                                        // Panic payloads are usually String or &str.
                                        let msg = panic_info
                                            .downcast_ref::<String>()
                                            .map(|s| s.as_str())
                                            .or_else(|| {
                                                panic_info.downcast_ref::<&str>().copied()
                                            })
                                            .unwrap_or("unknown panic");
                                        error!(
                                            queue_type = ?queue_type,
                                            panic = %msg,
                                            "Message handler panicked"
                                        );
                                        None
                                    }
                                    Err(_) => {
                                        error!(
                                            queue_type = ?queue_type,
                                            timeout_secs = handler_timeout.as_secs(),
                                            "Message handler timed out; message will be retried after visibility timeout"
                                        );
                                        None
                                    }
                                }
                            });
                        }
                    }
                }
            }
            Err(e) => {
                consecutive_poll_errors = consecutive_poll_errors.saturating_add(1);
                let backoff_secs = poll_error_backoff_secs(consecutive_poll_errors);
                // Classify the SDK error for structured logging.
                let (error_kind, error_code, error_message) = match &e {
                    SdkError::ServiceError(ctx) => {
                        ("service", ctx.err().code(), ctx.err().message())
                    }
                    SdkError::DispatchFailure(_) => ("dispatch", None, None),
                    SdkError::ResponseError(_) => ("response", None, None),
                    SdkError::TimeoutError(_) => ("timeout", None, None),
                    _ => ("other", None, None),
                };
                error!(
                    queue_type = ?queue_type,
                    poller_id = poller_id,
                    error_kind = error_kind,
                    error_code = error_code.unwrap_or("unknown"),
                    error_message = error_message.unwrap_or("n/a"),
                    error = %e,
                    error_debug = ?e,
                    consecutive_errors = consecutive_poll_errors,
                    backoff_secs = backoff_secs,
                    "Failed to receive messages from SQS, backing off"
                );
                // Back off, but remain responsive to the shutdown signal.
                tokio::select! {
                    _ = tokio::time::sleep(Duration::from_secs(backoff_secs)) => {}
                    _ = shutdown_rx.changed() => {
                        info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received during backoff, stopping poller");
                        break;
                    }
                }
            }
        }
    }

    // Drain in-flight tasks before shutdown, collecting final deletes
    if !inflight.is_empty() {
        info!(
            queue_type = ?queue_type,
            poller_id = poller_id,
            count = inflight.len(),
            "Draining in-flight tasks before shutdown"
        );
        match tokio::time::timeout(Duration::from_secs(30), async {
            while let Some(result) = inflight.join_next().await {
                match result {
                    Ok(Some(receipt_handle)) => pending_deletes.push(receipt_handle),
                    Ok(None) => {}
                    Err(e) => {
                        warn!(
                            queue_type = ?queue_type,
                            poller_id = poller_id,
                            error = %e,
                            "In-flight task failed during drain"
                        );
                    }
                }
            }
        })
        .await
        {
            Ok(()) => {
                info!(queue_type = ?queue_type, poller_id = poller_id, "All in-flight tasks drained")
            }
            Err(_) => {
                // Abandoned messages reappear after their visibility timeout.
                warn!(
                    queue_type = ?queue_type,
                    poller_id = poller_id,
                    remaining = inflight.len(),
                    "Drain timeout, abandoning remaining tasks"
                );
                inflight.abort_all();
            }
        }
    }

    // Flush any remaining deletes accumulated during drain
    if !pending_deletes.is_empty() {
        flush_delete_batch(&sqs_client, &queue_url, &pending_deletes, queue_type).await;
    }
}
424
425/// Processes a single SQS message.
426///
427/// Routes the message to the appropriate handler based on queue type,
428/// handles success/failure, and manages message deletion/retry.
429async fn process_message(
430    sqs_client: aws_sdk_sqs::Client,
431    message: Message,
432    queue_type: QueueType,
433    queue_url: &str,
434    app_state: Arc<ThinData<crate::models::DefaultAppState>>,
435    max_retries: usize,
436) -> Result<MessageOutcome, QueueBackendError> {
437    let body = message
438        .body()
439        .ok_or_else(|| QueueBackendError::QueueError("Empty message body".to_string()))?;
440
441    let receipt_handle = message
442        .receipt_handle()
443        .ok_or_else(|| QueueBackendError::QueueError("Missing receipt handle".to_string()))?;
444
445    // For jobs with scheduling beyond SQS 15-minute max delay, keep deferring in hops.
446    if let Some(target_scheduled_on) = parse_target_scheduled_on(&message) {
447        let now = std::time::SystemTime::now()
448            .duration_since(std::time::SystemTime::UNIX_EPOCH)
449            .map_err(|e| QueueBackendError::QueueError(format!("System clock error: {e}")))?
450            .as_secs() as i64;
451        let remaining = target_scheduled_on - now;
452        if remaining > 0 {
453            let should_delete_original = defer_message(
454                &sqs_client,
455                queue_url,
456                body.to_string(),
457                &message,
458                target_scheduled_on,
459                remaining.min(900) as i32,
460            )
461            .await?;
462
463            debug!(
464                queue_type = ?queue_type,
465                remaining_seconds = remaining,
466                "Deferred scheduled SQS message for next delay hop"
467            );
468            return if should_delete_original {
469                Ok(MessageOutcome::Delete {
470                    receipt_handle: receipt_handle.to_string(),
471                })
472            } else {
473                Ok(MessageOutcome::Retain)
474            };
475        }
476    }
477
478    // Get retry attempt count from message attributes
479    let receive_count = message
480        .attributes()
481        .and_then(|attrs| attrs.get(&MessageSystemAttributeName::ApproximateReceiveCount))
482        .and_then(|count| count.parse::<usize>().ok())
483        .unwrap_or(1);
484    // SQS receive count starts at 1; Apalis Attempt starts at 0.
485    let attempt_number = receive_count.saturating_sub(1);
486    // Persisted retry attempt for self-reenqueued status checks. Falls back to receive_count-based
487    // attempt when attribute is missing.
488    let logical_retry_attempt = parse_retry_attempt(&message).unwrap_or(attempt_number);
489
490    // Observe queue pickup latency on first delivery only.
491    // For scheduled messages, measure from `target_scheduled_on` (the intended
492    // availability time) to exclude intentional scheduling delay.
493    // For immediate messages, fall back to SQS `SentTimestamp` (millis).
494    if let Some(baseline) = queue_pickup_baseline_ms(&message, receive_count) {
495        let now_ms = chrono::Utc::now().timestamp_millis();
496        let latency_secs = (now_ms - baseline).max(0) as f64 / 1000.0;
497        observe_queue_pickup_latency(queue_type.queue_name(), "sqs", latency_secs);
498    }
499
500    // Use SQS MessageId as the worker task_id for log correlation.
501    let sqs_message_id = message.message_id().unwrap_or("unknown").to_string();
502
503    debug!(
504        queue_type = ?queue_type,
505        message_id = %sqs_message_id,
506        attempt = attempt_number,
507        receive_count = receive_count,
508        max_retries = max_retries,
509        "Processing message"
510    );
511
512    // Route to appropriate handler
513    let result = match queue_type {
514        QueueType::TransactionRequest => {
515            process_job::<TransactionRequest, _, _>(
516                body,
517                app_state,
518                attempt_number,
519                sqs_message_id,
520                "TransactionRequest",
521                transaction_request_handler,
522            )
523            .await
524        }
525        QueueType::TransactionSubmission => {
526            process_job::<TransactionSend, _, _>(
527                body,
528                app_state,
529                attempt_number,
530                sqs_message_id,
531                "TransactionSend",
532                transaction_submission_handler,
533            )
534            .await
535        }
536        QueueType::StatusCheck | QueueType::StatusCheckEvm | QueueType::StatusCheckStellar => {
537            process_job::<TransactionStatusCheck, _, _>(
538                body,
539                app_state,
540                attempt_number,
541                sqs_message_id,
542                "TransactionStatusCheck",
543                transaction_status_handler,
544            )
545            .await
546        }
547        QueueType::Notification => {
548            process_job::<NotificationSend, _, _>(
549                body,
550                app_state,
551                attempt_number,
552                sqs_message_id,
553                "NotificationSend",
554                notification_handler,
555            )
556            .await
557        }
558        QueueType::TokenSwapRequest => {
559            process_job::<TokenSwapRequest, _, _>(
560                body,
561                app_state,
562                attempt_number,
563                sqs_message_id,
564                "TokenSwapRequest",
565                token_swap_request_handler,
566            )
567            .await
568        }
569        QueueType::RelayerHealthCheck => {
570            process_job::<RelayerHealthCheck, _, _>(
571                body,
572                app_state,
573                attempt_number,
574                sqs_message_id,
575                "RelayerHealthCheck",
576                relayer_health_check_handler,
577            )
578            .await
579        }
580    };
581
582    match result {
583        Ok(()) => {
584            debug!(
585                queue_type = ?queue_type,
586                attempt = attempt_number,
587                "Message processed successfully"
588            );
589
590            Ok(MessageOutcome::Delete {
591                receipt_handle: receipt_handle.to_string(),
592            })
593        }
594        Err(ProcessingError::Permanent(e)) => {
595            error!(
596                queue_type = ?queue_type,
597                attempt = attempt_number,
598                error = %e,
599                "Permanent handler failure, message will be deleted"
600            );
601
602            Ok(MessageOutcome::Delete {
603                receipt_handle: receipt_handle.to_string(),
604            })
605        }
606        Err(ProcessingError::Retryable(e)) => {
607            // Check max retries for non-infinite queues (status checks use usize::MAX)
608            if max_retries != usize::MAX && receive_count > max_retries {
609                error!(
610                    queue_type = ?queue_type,
611                    attempt = attempt_number,
612                    receive_count = receive_count,
613                    max_retries = max_retries,
614                    error = %e,
615                    "Max retries exceeded; message will be automatically moved to DLQ by SQS redrive policy"
616                );
617                return Ok(MessageOutcome::Retain);
618            }
619
620            // Compute retry delay based on queue type:
621            // - Status checks use network-type-aware backoff from the message body
622            // - All other queues use their configured backoff profile from retry_config
623            let delay = if queue_type.is_status_check() {
624                compute_status_retry_delay(body, logical_retry_attempt)
625            } else {
626                retry_delay_secs(backoff_config_for_queue(queue_type), logical_retry_attempt)
627            };
628
629            // FIFO queues do not support per-message DelaySeconds. Use visibility
630            // timeout on the in-flight message to schedule the retry.
631            if is_fifo_queue_url(queue_url) {
632                if let Err(err) = sqs_client
633                    .change_message_visibility()
634                    .queue_url(queue_url)
635                    .receipt_handle(receipt_handle)
636                    .visibility_timeout(delay.clamp(1, 900))
637                    .send()
638                    .await
639                {
640                    error!(
641                        queue_type = ?queue_type,
642                        error = %err,
643                        "Failed to set visibility timeout for retry; falling back to existing visibility timeout"
644                    );
645                    return Ok(MessageOutcome::Retain);
646                }
647
648                debug!(
649                    queue_type = ?queue_type,
650                    attempt = logical_retry_attempt,
651                    delay_seconds = delay,
652                    error = %e,
653                    "Retry scheduled via visibility timeout"
654                );
655
656                return Ok(MessageOutcome::Retain);
657            }
658
659            let next_retry_attempt = logical_retry_attempt.saturating_add(1);
660
661            // Standard queues: re-enqueue with native DelaySeconds,
662            // no group_id or dedup_id needed. Duplicate deliveries are
663            // harmless because handlers are idempotent.
664            if let Err(send_err) = sqs_client
665                .send_message()
666                .queue_url(queue_url)
667                .message_body(body.to_string())
668                .delay_seconds(delay)
669                .message_attributes(
670                    "retry_attempt",
671                    MessageAttributeValue::builder()
672                        .data_type("Number")
673                        .string_value(next_retry_attempt.to_string())
674                        .build()
675                        .map_err(|err| {
676                            QueueBackendError::SqsError(format!(
677                                "Failed to build retry_attempt attribute: {err}"
678                            ))
679                        })?,
680                )
681                .send()
682                .await
683            {
684                error!(
685                    queue_type = ?queue_type,
686                    error = %send_err,
687                    "Failed to re-enqueue message; leaving original for visibility timeout retry"
688                );
689                // Fall through — original message will retry after visibility timeout
690                return Ok(MessageOutcome::Retain);
691            }
692
693            debug!(
694                queue_type = ?queue_type,
695                attempt = logical_retry_attempt,
696                delay_seconds = delay,
697                error = %e,
698                "Message re-enqueued with backoff delay"
699            );
700
701            // Delete the original message now that the re-enqueue succeeded
702            Ok(MessageOutcome::Delete {
703                receipt_handle: receipt_handle.to_string(),
704            })
705        }
706    }
707}
708
709/// Generic job processor — deserializes `Job<T>`, creates a `WorkerContext`,
710/// and delegates to the provided handler function.
711async fn process_job<T, F, Fut>(
712    body: &str,
713    app_state: Arc<ThinData<crate::models::DefaultAppState>>,
714    attempt: usize,
715    task_id: String,
716    type_name: &str,
717    handler: F,
718) -> Result<(), ProcessingError>
719where
720    T: DeserializeOwned,
721    F: FnOnce(Job<T>, ThinData<crate::models::DefaultAppState>, WorkerContext) -> Fut,
722    Fut: Future<Output = Result<(), HandlerError>>,
723{
724    let job: Job<T> = serde_json::from_str(body).map_err(|e| {
725        error!(error = %e, "Failed to deserialize {} job", type_name);
726        // Malformed payload is not recoverable by retrying the same message body.
727        ProcessingError::Permanent(format!("Failed to deserialize {type_name} job: {e}"))
728    })?;
729
730    let ctx = WorkerContext::new(attempt, task_id);
731    handler(job, (*app_state).clone(), ctx)
732        .await
733        .map_err(map_handler_error)
734}
735
736fn map_handler_error(error: HandlerError) -> ProcessingError {
737    match error {
738        HandlerError::Abort(msg) => ProcessingError::Permanent(msg),
739        HandlerError::Retry(msg) => ProcessingError::Retryable(msg),
740    }
741}
742
743fn parse_target_scheduled_on(message: &Message) -> Option<i64> {
744    message
745        .message_attributes()
746        .and_then(|attrs| attrs.get("target_scheduled_on"))
747        .and_then(|value| value.string_value())
748        .and_then(|value| value.parse::<i64>().ok())
749}
750
751fn parse_retry_attempt(message: &Message) -> Option<usize> {
752    message
753        .message_attributes()
754        .and_then(|attrs| attrs.get("retry_attempt"))
755        .and_then(|value| value.string_value())
756        .and_then(|value| value.parse::<usize>().ok())
757}
758
759fn queue_pickup_baseline_ms(message: &Message, receive_count: usize) -> Option<i64> {
760    if receive_count != 1 {
761        return None;
762    }
763
764    parse_target_scheduled_on(message)
765        .map(|ts_secs| ts_secs * 1000)
766        .or_else(|| {
767            message
768                .attributes()
769                .and_then(|a| a.get(&MessageSystemAttributeName::SentTimestamp))
770                .and_then(|v| v.parse::<i64>().ok())
771        })
772}
773
/// FIFO queues are identified by the mandatory `.fifo` URL suffix
/// (case-sensitive, suffix position only).
fn is_fifo_queue_url(queue_url: &str) -> bool {
    queue_url.strip_suffix(".fifo").is_some()
}
777
/// Re-schedules a message for later processing.
///
/// Two strategies, chosen by queue flavor:
/// - **FIFO**: the original message is hidden again via
///   `ChangeMessageVisibility` for `delay_seconds`; returns `Ok(false)`,
///   meaning the caller must NOT delete the original (it is still in flight).
/// - **Standard**: the body is re-sent as a new message using native
///   per-message `DelaySeconds` plus a `target_scheduled_on` attribute;
///   returns `Ok(true)`, meaning the caller should delete the original.
async fn defer_message(
    sqs_client: &aws_sdk_sqs::Client,
    queue_url: &str,
    body: String,
    message: &Message,
    target_scheduled_on: i64,
    delay_seconds: i32,
) -> Result<bool, QueueBackendError> {
    if is_fifo_queue_url(queue_url) {
        // The receipt handle is required to address the in-flight message.
        let receipt_handle = message.receipt_handle().ok_or_else(|| {
            QueueBackendError::QueueError(
                "Cannot defer FIFO message: missing receipt handle".to_string(),
            )
        })?;

        // NOTE(review): clamped to 900s to mirror the DelaySeconds cap below,
        // although SQS visibility timeouts can be longer — confirm intended.
        sqs_client
            .change_message_visibility()
            .queue_url(queue_url)
            .receipt_handle(receipt_handle)
            .visibility_timeout(delay_seconds.clamp(1, 900))
            .send()
            .await
            .map_err(|e| {
                QueueBackendError::SqsError(format!(
                    "Failed to defer FIFO message via visibility timeout: {e}"
                ))
            })?;

        // `false` = original message retained; caller must not delete it.
        return Ok(false);
    }

    // Standard queues support native per-message DelaySeconds — no need for
    // group_id or dedup_id. Just re-send with the delay and scheduling attribute.
    // DelaySeconds is capped at 900s (15 minutes) by SQS, hence the clamp.
    let request = sqs_client
        .send_message()
        .queue_url(queue_url)
        .message_body(body)
        .delay_seconds(delay_seconds.clamp(1, 900))
        .message_attributes(
            "target_scheduled_on",
            MessageAttributeValue::builder()
                .data_type("Number")
                .string_value(target_scheduled_on.to_string())
                .build()
                .map_err(|e| {
                    QueueBackendError::SqsError(format!(
                        "Failed to build deferred scheduled attribute: {e}"
                    ))
                })?,
        );

    request.send().await.map_err(|e| {
        QueueBackendError::SqsError(format!("Failed to defer scheduled message: {e}"))
    })?;

    // `true` = a new delayed copy exists; caller deletes the original.
    Ok(true)
}
835
/// Partial struct for deserializing only the `network_type` field from a status check job.
///
/// Used to avoid deserializing the entire `Job<TransactionStatusCheck>` when we only
/// need the network type to determine retry delay.
#[derive(serde::Deserialize)]
struct StatusCheckData {
    // `Option` lets payloads that omit `network_type` still deserialize;
    // `compute_status_retry_delay` then falls back to the default window.
    network_type: Option<crate::models::NetworkType>,
}
844
/// Partial struct matching `Job<TransactionStatusCheck>` structure.
///
/// Used for efficient partial deserialization to extract only the `network_type`
/// field without parsing the entire job payload.
#[derive(serde::Deserialize)]
struct PartialStatusCheckJob {
    // Mirrors the `data` envelope of the full job payload; other top-level
    // fields (message_id, timestamp, ...) are ignored by serde's default
    // unknown-field handling.
    data: StatusCheckData,
}
853
854/// Extracts `network_type` from a status check payload and computes retry delay.
855///
856/// This uses hardcoded network-specific backoff windows aligned with Redis/Apalis:
857/// - EVM: 8s -> 12s cap
858/// - Stellar: 2s -> 3s cap
859/// - Solana/default: 5s -> 8s cap
860fn compute_status_retry_delay(body: &str, attempt: usize) -> i32 {
861    let network_type = serde_json::from_str::<PartialStatusCheckJob>(body)
862        .ok()
863        .and_then(|j| j.data.network_type);
864
865    crate::queues::retry_config::status_check_retry_delay_secs(network_type, attempt)
866}
867
868/// Gets the SQS long-poll wait time for a queue type from environment or default.
869fn get_wait_time_for_queue(queue_type: QueueType) -> u64 {
870    ServerConfig::get_sqs_wait_time(
871        queue_type.sqs_env_key(),
872        queue_type.default_wait_time_secs(),
873    )
874}
875
876/// Gets the number of poll loops to run for a queue type from environment or default.
877fn get_poller_count_for_queue(queue_type: QueueType) -> usize {
878    let configured = ServerConfig::get_sqs_poller_count(
879        queue_type.sqs_env_key(),
880        queue_type.default_poller_count(),
881    );
882    if configured == 0 {
883        warn!(
884            queue_type = ?queue_type,
885            "Configured poller count is 0; clamping to 1"
886        );
887        1
888    } else {
889        configured
890    }
891}
892
893/// Gets the concurrency limit for a queue type from environment.
894fn get_concurrency_for_queue(queue_type: QueueType) -> usize {
895    let configured = ServerConfig::get_worker_concurrency(
896        queue_type.concurrency_env_key(),
897        queue_type.default_concurrency(),
898    );
899    if configured == 0 {
900        warn!(
901            queue_type = ?queue_type,
902            "Configured concurrency is 0; clamping to 1"
903        );
904        1
905    } else {
906        configured
907    }
908}
909
910/// Maximum allowed wall-clock processing time per message before the handler task is canceled.
911///
912/// Keep this bounded so permits cannot be held forever by hung handlers.
913fn handler_timeout_secs(queue_type: QueueType) -> u64 {
914    u64::from(queue_type.visibility_timeout_secs().max(1))
915}
916
/// Maximum backoff duration for poll errors (1 minute).
const MAX_POLL_BACKOFF_SECS: u64 = 60;

/// Number of consecutive errors between recovery probes at the backoff ceiling.
/// Once the backoff reaches `MAX_POLL_BACKOFF_SECS`, every Nth error cycle uses
/// the base interval (5s) to quickly detect when the SQS endpoint recovers.
const RECOVERY_PROBE_EVERY: u32 = 4;

/// Computes exponential backoff for consecutive poll errors with recovery probes.
///
/// Returns: 5, 10, 20, 40, 60, 60, 60, **5** (probe), 60, 60, 60, **5**, ...
fn poll_error_backoff_secs(consecutive_errors: u32) -> u64 {
    const BASE_SECS: u64 = 5;

    // Past the ceiling, periodically drop back to the base interval so a
    // recovered endpoint is noticed quickly instead of a full minute later.
    let is_recovery_probe =
        consecutive_errors >= 7 && consecutive_errors % RECOVERY_PROBE_EVERY == 0;
    if is_recovery_probe {
        return BASE_SECS;
    }

    // Bound the shift (2^16 max) so the doubling can never overflow u64.
    let shift = consecutive_errors.saturating_sub(1).min(16);
    (BASE_SECS << shift).min(MAX_POLL_BACKOFF_SECS)
}
941
942/// Deletes messages from SQS in batches of up to 10 (the SQS maximum per call).
943///
944/// Returns the total number of successfully deleted messages. Any per-entry
945/// failures are logged as warnings — SQS will redeliver those messages after
946/// the visibility timeout expires.
947async fn flush_delete_batch(
948    sqs_client: &aws_sdk_sqs::Client,
949    queue_url: &str,
950    batch: &[String],
951    queue_type: QueueType,
952) -> usize {
953    if batch.is_empty() {
954        return 0;
955    }
956
957    let mut deleted = 0;
958
959    for chunk in batch.chunks(10) {
960        let entries: Vec<DeleteMessageBatchRequestEntry> = chunk
961            .iter()
962            .enumerate()
963            .map(|(i, handle)| {
964                DeleteMessageBatchRequestEntry::builder()
965                    .id(i.to_string())
966                    .receipt_handle(handle)
967                    .build()
968                    .expect("id and receipt_handle are always set")
969            })
970            .collect();
971
972        match sqs_client
973            .delete_message_batch()
974            .queue_url(queue_url)
975            .set_entries(Some(entries))
976            .send()
977            .await
978        {
979            Ok(output) => {
980                deleted += output.successful().len();
981
982                for f in output.failed() {
983                    warn!(
984                        queue_type = ?queue_type,
985                        id = %f.id(),
986                        code = %f.code(),
987                        message = f.message().unwrap_or("unknown"),
988                        "Batch delete entry failed (message will be redelivered)"
989                    );
990                }
991            }
992            Err(e) => {
993                error!(
994                    queue_type = ?queue_type,
995                    error = %e,
996                    batch_size = chunk.len(),
997                    "Batch delete API call failed (messages will be redelivered)"
998                );
999            }
1000        }
1001    }
1002
1003    deleted
1004}
1005
1006#[cfg(test)]
1007mod tests {
1008    use super::*;
1009
    #[test]
    fn test_get_concurrency_for_queue() {
        // Test that concurrency is retrieved (exact value depends on env)
        let concurrency = get_concurrency_for_queue(QueueType::TransactionRequest);
        assert!(concurrency > 0);

        let concurrency = get_concurrency_for_queue(QueueType::StatusCheck);
        assert!(concurrency > 0);
    }

    #[test]
    fn test_handler_timeout_secs_is_positive() {
        // Every queue type must yield a non-zero timeout — handler_timeout_secs
        // clamps the visibility timeout to at least 1 second.
        let all = [
            QueueType::TransactionRequest,
            QueueType::TransactionSubmission,
            QueueType::StatusCheck,
            QueueType::StatusCheckEvm,
            QueueType::StatusCheckStellar,
            QueueType::Notification,
            QueueType::TokenSwapRequest,
            QueueType::RelayerHealthCheck,
        ];
        for queue_type in all {
            assert!(handler_timeout_secs(queue_type) > 0);
        }
    }

    #[test]
    fn test_handler_timeout_secs_uses_visibility_timeout() {
        assert_eq!(
            handler_timeout_secs(QueueType::StatusCheckEvm),
            QueueType::StatusCheckEvm.visibility_timeout_secs() as u64
        );
        assert_eq!(
            handler_timeout_secs(QueueType::Notification),
            QueueType::Notification.visibility_timeout_secs() as u64
        );
    }

    #[test]
    fn test_parse_target_scheduled_on() {
        // Test parsing target_scheduled_on from message attributes
        let message = Message::builder().build();

        // Message without attribute should return None
        assert_eq!(parse_target_scheduled_on(&message), None);

        // Message with valid attribute
        let message = Message::builder()
            .message_attributes(
                "target_scheduled_on",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("1234567890")
                    .build()
                    .unwrap(),
            )
            .build();

        assert_eq!(parse_target_scheduled_on(&message), Some(1234567890));
    }

    #[test]
    fn test_parse_retry_attempt() {
        // Missing attribute yields None rather than a default of 0.
        let message = Message::builder().build();
        assert_eq!(parse_retry_attempt(&message), None);

        let message = Message::builder()
            .message_attributes(
                "retry_attempt",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("7")
                    .build()
                    .unwrap(),
            )
            .build();
        assert_eq!(parse_retry_attempt(&message), Some(7));
    }

    #[test]
    fn test_map_handler_error() {
        // Test Abort maps to Permanent
        let error = HandlerError::Abort("Validation failed".to_string());
        let result = map_handler_error(error);
        assert!(matches!(result, ProcessingError::Permanent(_)));

        // Test Retry maps to Retryable
        let error = HandlerError::Retry("Network timeout".to_string());
        let result = map_handler_error(error);
        assert!(matches!(result, ProcessingError::Retryable(_)));
    }
1102
    #[test]
    fn test_is_fifo_queue_url() {
        assert!(is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue.fifo"
        ));
        assert!(!is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue"
        ));
    }

    #[test]
    fn test_compute_status_retry_delay_evm() {
        // NetworkType uses #[serde(rename_all = "lowercase")]
        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"evm"}}"#;
        assert_eq!(compute_status_retry_delay(body, 0), 8);
        assert_eq!(compute_status_retry_delay(body, 1), 12);
        // Delay saturates at the network-specific cap for later attempts.
        assert_eq!(compute_status_retry_delay(body, 8), 12);
    }

    #[test]
    fn test_compute_status_retry_delay_stellar() {
        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"stellar"}}"#;
        assert_eq!(compute_status_retry_delay(body, 0), 2);
        assert_eq!(compute_status_retry_delay(body, 1), 3);
        assert_eq!(compute_status_retry_delay(body, 8), 3);
    }

    #[test]
    fn test_compute_status_retry_delay_solana() {
        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"solana"}}"#;
        assert_eq!(compute_status_retry_delay(body, 0), 5);
        assert_eq!(compute_status_retry_delay(body, 1), 8);
        assert_eq!(compute_status_retry_delay(body, 8), 8);
    }

    #[test]
    fn test_compute_status_retry_delay_missing_network() {
        // No network_type field — falls back to the default (Solana-style) window.
        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1"}}"#;
        assert_eq!(compute_status_retry_delay(body, 0), 5);
        assert_eq!(compute_status_retry_delay(body, 1), 8);
        assert_eq!(compute_status_retry_delay(body, 8), 8);
    }

    #[test]
    fn test_compute_status_retry_delay_invalid_body() {
        // Unparsable payloads also use the default window rather than erroring.
        assert_eq!(compute_status_retry_delay("not json", 0), 5);
        assert_eq!(compute_status_retry_delay("not json", 1), 8);
        assert_eq!(compute_status_retry_delay("not json", 8), 8);
    }

    #[tokio::test]
    async fn test_semaphore_released_on_panic() {
        let sem = Arc::new(tokio::sync::Semaphore::new(1));
        let permit = sem.clone().acquire_owned().await.unwrap();

        let handle = tokio::spawn(async move {
            let _permit = permit; // dropped on scope exit, even after panic
            // catch_unwind contains the panic so the task completes normally
            // and the permit's Drop still runs.
            let _ = AssertUnwindSafe(async { panic!("test panic") })
                .catch_unwind()
                .await;
        });

        handle.await.unwrap();
        // Would hang forever if permit leaked
        let _p = tokio::time::timeout(Duration::from_millis(100), sem.acquire())
            .await
            .expect("permit should be available after panic");
    }
1171
    #[test]
    fn test_poll_error_backoff_secs() {
        // First error: 5s
        assert_eq!(poll_error_backoff_secs(1), 5);
        // Second: 10s
        assert_eq!(poll_error_backoff_secs(2), 10);
        // Third: 20s
        assert_eq!(poll_error_backoff_secs(3), 20);
        // Fourth: 40s
        assert_eq!(poll_error_backoff_secs(4), 40);
        // Capped at MAX_POLL_BACKOFF_SECS (60)
        assert_eq!(poll_error_backoff_secs(5), 60);
        assert_eq!(poll_error_backoff_secs(6), 60);
        assert_eq!(poll_error_backoff_secs(7), 60);
        // Recovery probe: base interval at multiples of RECOVERY_PROBE_EVERY (>= 7)
        assert_eq!(poll_error_backoff_secs(8), 5);
        assert_eq!(poll_error_backoff_secs(9), 60);
        assert_eq!(poll_error_backoff_secs(12), 5); // next probe
    }

    #[test]
    fn test_poll_error_backoff_zero_errors() {
        // Zero consecutive errors should still produce a reasonable value
        assert_eq!(poll_error_backoff_secs(0), 5);
    }

    #[test]
    fn test_poll_error_backoff_recovery_probes() {
        // Verify probes repeat at regular intervals once past threshold
        for i in (8..=100).step_by(RECOVERY_PROBE_EVERY as usize) {
            assert_eq!(
                poll_error_backoff_secs(i as u32),
                5,
                "Expected recovery probe at error {i}"
            );
        }
    }

    #[test]
    fn test_message_outcome_delete_carries_receipt_handle() {
        // Delete must carry the exact receipt handle so the batch deleter
        // can address the right message.
        let handle = "test-receipt-handle-123".to_string();
        let outcome = MessageOutcome::Delete {
            receipt_handle: handle.clone(),
        };
        match outcome {
            MessageOutcome::Delete { receipt_handle } => {
                assert_eq!(receipt_handle, handle);
            }
            MessageOutcome::Retain => panic!("Expected Delete variant"),
        }
    }

    #[test]
    fn test_message_outcome_retain() {
        let outcome = MessageOutcome::Retain;
        assert!(matches!(outcome, MessageOutcome::Retain));
    }

    #[test]
    fn test_batch_delete_entry_builder() {
        // Verify DeleteMessageBatchRequestEntry builds correctly with sequential IDs,
        // matching the pattern used in flush_delete_batch.
        let handles = vec![
            "receipt-0".to_string(),
            "receipt-1".to_string(),
            "receipt-2".to_string(),
        ];
        let entries: Vec<DeleteMessageBatchRequestEntry> = handles
            .iter()
            .enumerate()
            .map(|(i, handle)| {
                DeleteMessageBatchRequestEntry::builder()
                    .id(i.to_string())
                    .receipt_handle(handle)
                    .build()
                    .expect("id and receipt_handle are set")
            })
            .collect();

        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].id(), "0");
        assert_eq!(entries[0].receipt_handle(), "receipt-0");
        assert_eq!(entries[2].id(), "2");
        assert_eq!(entries[2].receipt_handle(), "receipt-2");
    }

    #[test]
    fn test_batch_chunking_logic() {
        // Verify that chunks(10) correctly splits receipt handles,
        // matching the pattern used in flush_delete_batch.
        let handles: Vec<String> = (0..25).map(|i| format!("receipt-{i}")).collect();
        let chunks: Vec<&[String]> = handles.chunks(10).collect();

        assert_eq!(chunks.len(), 3);
        assert_eq!(chunks[0].len(), 10);
        assert_eq!(chunks[1].len(), 10);
        assert_eq!(chunks[2].len(), 5);
    }

    #[test]
    fn test_outcome_collection_pattern() {
        // Verify the pattern used in the main loop to collect receipt handles
        // from a mix of Delete and Retain outcomes.
        let outcomes = vec![
            Some("receipt-1".to_string()), // Delete
            None,                          // Retain
            Some("receipt-2".to_string()), // Delete
            None,                          // Retain
            Some("receipt-3".to_string()), // Delete
        ];

        let pending_deletes: Vec<String> = outcomes.into_iter().flatten().collect();

        assert_eq!(pending_deletes.len(), 3);
        assert_eq!(pending_deletes[0], "receipt-1");
        assert_eq!(pending_deletes[1], "receipt-2");
        assert_eq!(pending_deletes[2], "receipt-3");
    }
1290
    // ── parse_target_scheduled_on: edge cases ─────────────────────────

    #[test]
    fn test_parse_target_scheduled_on_non_numeric_string() {
        let message = Message::builder()
            .message_attributes(
                "target_scheduled_on",
                MessageAttributeValue::builder()
                    .data_type("String")
                    .string_value("not-a-number")
                    .build()
                    .unwrap(),
            )
            .build();
        assert_eq!(parse_target_scheduled_on(&message), None);
    }

    #[test]
    fn test_parse_target_scheduled_on_empty_string() {
        let message = Message::builder()
            .message_attributes(
                "target_scheduled_on",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("")
                    .build()
                    .unwrap(),
            )
            .build();
        assert_eq!(parse_target_scheduled_on(&message), None);
    }

    #[test]
    fn test_parse_target_scheduled_on_negative_value() {
        let message = Message::builder()
            .message_attributes(
                "target_scheduled_on",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("-1000")
                    .build()
                    .unwrap(),
            )
            .build();
        // Negative values parse fine as i64
        assert_eq!(parse_target_scheduled_on(&message), Some(-1000));
    }

    #[test]
    fn test_parse_target_scheduled_on_float_string() {
        let message = Message::builder()
            .message_attributes(
                "target_scheduled_on",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("1234567890.5")
                    .build()
                    .unwrap(),
            )
            .build();
        // Floats can't parse as i64
        assert_eq!(parse_target_scheduled_on(&message), None);
    }

    #[test]
    fn test_parse_target_scheduled_on_zero() {
        let message = Message::builder()
            .message_attributes(
                "target_scheduled_on",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("0")
                    .build()
                    .unwrap(),
            )
            .build();
        assert_eq!(parse_target_scheduled_on(&message), Some(0));
    }

    #[test]
    fn test_parse_target_scheduled_on_wrong_attribute_name() {
        // Attribute exists but under a different key
        let message = Message::builder()
            .message_attributes(
                "wrong_key",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("1234567890")
                    .build()
                    .unwrap(),
            )
            .build();
        assert_eq!(parse_target_scheduled_on(&message), None);
    }

    // ── parse_retry_attempt: edge cases ───────────────────────────────

    #[test]
    fn test_parse_retry_attempt_non_numeric_string() {
        let message = Message::builder()
            .message_attributes(
                "retry_attempt",
                MessageAttributeValue::builder()
                    .data_type("String")
                    .string_value("abc")
                    .build()
                    .unwrap(),
            )
            .build();
        assert_eq!(parse_retry_attempt(&message), None);
    }

    #[test]
    fn test_parse_retry_attempt_negative_value() {
        let message = Message::builder()
            .message_attributes(
                "retry_attempt",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("-1")
                    .build()
                    .unwrap(),
            )
            .build();
        // Negative values can't parse as usize
        assert_eq!(parse_retry_attempt(&message), None);
    }

    #[test]
    fn test_parse_retry_attempt_zero() {
        let message = Message::builder()
            .message_attributes(
                "retry_attempt",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("0")
                    .build()
                    .unwrap(),
            )
            .build();
        assert_eq!(parse_retry_attempt(&message), Some(0));
    }

    #[test]
    fn test_parse_retry_attempt_large_value() {
        let message = Message::builder()
            .message_attributes(
                "retry_attempt",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("999999")
                    .build()
                    .unwrap(),
            )
            .build();
        assert_eq!(parse_retry_attempt(&message), Some(999999));
    }

    #[test]
    fn test_queue_pickup_baseline_ms_uses_scheduled_time_on_first_delivery() {
        // target_scheduled_on (seconds) takes precedence over SentTimestamp (ms)
        // and is converted to milliseconds.
        let message = Message::builder()
            .message_attributes(
                "target_scheduled_on",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("123")
                    .build()
                    .unwrap(),
            )
            .set_attributes(Some(std::collections::HashMap::from([(
                MessageSystemAttributeName::SentTimestamp,
                "999999".to_string(),
            )])))
            .build();

        assert_eq!(queue_pickup_baseline_ms(&message, 1), Some(123_000));
    }

    #[test]
    fn test_queue_pickup_baseline_ms_falls_back_to_sent_timestamp() {
        let message = Message::builder()
            .set_attributes(Some(std::collections::HashMap::from([(
                MessageSystemAttributeName::SentTimestamp,
                "123456".to_string(),
            )])))
            .build();

        assert_eq!(queue_pickup_baseline_ms(&message, 1), Some(123456));
    }

    #[test]
    fn test_queue_pickup_baseline_ms_skips_retries() {
        // receive_count > 1 means a redelivery — no baseline should be emitted.
        let message = Message::builder()
            .message_attributes(
                "target_scheduled_on",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("123")
                    .build()
                    .unwrap(),
            )
            .set_attributes(Some(std::collections::HashMap::from([(
                MessageSystemAttributeName::SentTimestamp,
                "123456".to_string(),
            )])))
            .build();

        assert_eq!(queue_pickup_baseline_ms(&message, 2), None);
    }
1500
    // ── is_fifo_queue_url: comprehensive cases ────────────────────────

    #[test]
    fn test_is_fifo_queue_url_empty_string() {
        assert!(!is_fifo_queue_url(""));
    }

    #[test]
    fn test_is_fifo_queue_url_just_fifo_suffix() {
        assert!(is_fifo_queue_url("my-queue.fifo"));
    }

    #[test]
    fn test_is_fifo_queue_url_fifo_in_middle() {
        // .fifo appearing in the path but not as suffix
        assert!(!is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/.fifo/queue"
        ));
    }

    #[test]
    fn test_is_fifo_queue_url_case_sensitive() {
        assert!(!is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue.FIFO"
        ));
        assert!(!is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue.Fifo"
        ));
    }

    #[test]
    fn test_is_fifo_queue_url_standard_queue_variations() {
        assert!(!is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123456789/my-queue"
        ));
        assert!(!is_fifo_queue_url(
            "https://sqs.eu-west-1.amazonaws.com/123456789/relayer-tx-request"
        ));
        assert!(!is_fifo_queue_url(
            "http://localhost:4566/000000000000/test-queue"
        ));
    }

    #[test]
    fn test_is_fifo_queue_url_localstack() {
        // LocalStack FIFO queue URL format
        assert!(is_fifo_queue_url(
            "http://localhost:4566/000000000000/test-queue.fifo"
        ));
    }

    // ── map_handler_error: message preservation ───────────────────────

    #[test]
    fn test_map_handler_error_preserves_abort_message() {
        let msg = "Validation failed: invalid nonce";
        let error = HandlerError::Abort(msg.to_string());
        match map_handler_error(error) {
            ProcessingError::Permanent(s) => assert_eq!(s, msg),
            ProcessingError::Retryable(_) => panic!("Expected Permanent"),
        }
    }

    #[test]
    fn test_map_handler_error_preserves_retry_message() {
        let msg = "RPC timeout after 30s";
        let error = HandlerError::Retry(msg.to_string());
        match map_handler_error(error) {
            ProcessingError::Retryable(s) => assert_eq!(s, msg),
            ProcessingError::Permanent(_) => panic!("Expected Retryable"),
        }
    }

    #[test]
    fn test_map_handler_error_empty_message() {
        // An empty message is passed through untouched, not replaced.
        let error = HandlerError::Abort(String::new());
        match map_handler_error(error) {
            ProcessingError::Permanent(s) => assert!(s.is_empty()),
            ProcessingError::Retryable(_) => panic!("Expected Permanent"),
        }
    }

    // ── handler_timeout_secs: all queue types ─────────────────────────

    #[test]
    fn test_handler_timeout_secs_matches_visibility_timeout_for_all_queues() {
        let all = [
            QueueType::TransactionRequest,
            QueueType::TransactionSubmission,
            QueueType::StatusCheck,
            QueueType::StatusCheckEvm,
            QueueType::StatusCheckStellar,
            QueueType::Notification,
            QueueType::TokenSwapRequest,
            QueueType::RelayerHealthCheck,
        ];
        for qt in all {
            assert_eq!(
                handler_timeout_secs(qt),
                qt.visibility_timeout_secs().max(1) as u64,
                "{qt:?}: handler timeout should equal max(visibility_timeout, 1)"
            );
        }
    }

    // ── get_concurrency_for_queue: all queue types ────────────────────

    #[test]
    fn test_get_concurrency_for_queue_all_types_positive() {
        let all = [
            QueueType::TransactionRequest,
            QueueType::TransactionSubmission,
            QueueType::StatusCheck,
            QueueType::StatusCheckEvm,
            QueueType::StatusCheckStellar,
            QueueType::Notification,
            QueueType::TokenSwapRequest,
            QueueType::RelayerHealthCheck,
        ];
        for qt in all {
            assert!(
                get_concurrency_for_queue(qt) > 0,
                "{qt:?}: concurrency must be positive (clamped to at least 1)"
            );
        }
    }

    // ── poll_error_backoff_secs: overflow and invariants ───────────────

    #[test]
    fn test_poll_error_backoff_never_exceeds_max() {
        for i in 0..200 {
            let backoff = poll_error_backoff_secs(i);
            assert!(
                backoff <= MAX_POLL_BACKOFF_SECS,
                "Error count {i}: backoff {backoff}s exceeds MAX {MAX_POLL_BACKOFF_SECS}s"
            );
        }
    }

    #[test]
    fn test_poll_error_backoff_u32_max_does_not_overflow() {
        // saturating_sub + min keep the exponent bounded, so no overflow panic.
        let backoff = poll_error_backoff_secs(u32::MAX);
        assert!(backoff <= MAX_POLL_BACKOFF_SECS);
        assert!(backoff > 0);
    }

    #[test]
    fn test_poll_error_backoff_always_positive() {
        for i in 0..200 {
            assert!(
                poll_error_backoff_secs(i) > 0,
                "Error count {i}: backoff must be positive"
            );
        }
    }
1657
1658    #[test]
1659    fn test_poll_error_backoff_monotonic_before_cap() {
1660        // Before hitting the cap, backoff should be non-decreasing
1661        let mut prev = poll_error_backoff_secs(0);
1662        for i in 1..=4 {
1663            let curr = poll_error_backoff_secs(i);
1664            assert!(
1665                curr >= prev,
1666                "Backoff should be non-decreasing before cap: {prev} -> {curr} at error {i}"
1667            );
1668            prev = curr;
1669        }
1670    }
1671
1672    // ── Constants validation ──────────────────────────────────────────
1673
1674    #[test]
1675    fn test_max_poll_backoff_is_reasonable() {
1676        assert!(
1677            MAX_POLL_BACKOFF_SECS >= 10,
1678            "Max backoff should be at least 10s to avoid tight error loops"
1679        );
1680        assert!(
1681            MAX_POLL_BACKOFF_SECS <= 300,
1682            "Max backoff should be at most 5 minutes to detect recovery promptly"
1683        );
1684    }
1685
1686    #[test]
1687    fn test_recovery_probe_every_is_valid() {
1688        assert!(
1689            RECOVERY_PROBE_EVERY >= 2,
1690            "Recovery probe interval must be at least 2 to avoid probing every attempt"
1691        );
1692        assert!(
1693            RECOVERY_PROBE_EVERY <= 10,
1694            "Recovery probe interval should not be too large or recovery detection is slow"
1695        );
1696    }
1697
1698    // ── compute_status_retry_delay: edge cases ────────────────────────
1699
1700    #[test]
1701    fn test_compute_status_retry_delay_very_high_attempt() {
1702        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"evm"}}"#;
1703        // Very high attempts should stay capped at the max (12s for EVM)
1704        assert_eq!(compute_status_retry_delay(body, 1000), 12);
1705        assert_eq!(compute_status_retry_delay(body, usize::MAX), 12);
1706    }
1707
1708    #[test]
1709    fn test_compute_status_retry_delay_empty_body() {
1710        // Empty JSON body should fall back to generic/Solana defaults
1711        assert_eq!(compute_status_retry_delay("", 0), 5);
1712        assert_eq!(compute_status_retry_delay("{}", 0), 5);
1713    }
1714
1715    #[test]
1716    fn test_compute_status_retry_delay_partial_json() {
1717        // JSON with missing inner structure
1718        assert_eq!(compute_status_retry_delay(r#"{"data":{}}"#, 0), 5);
1719        assert_eq!(
1720            compute_status_retry_delay(r#"{"data":{"network_type":"evm"}}"#, 0),
1721            8
1722        );
1723    }
1724
1725    // ── PartialStatusCheckJob deserialization ──────────────────────────
1726
1727    #[test]
1728    fn test_partial_status_check_job_deserializes_network_type() {
1729        let body = r#"{"data":{"network_type":"evm","extra_field":"ignored"}}"#;
1730        let parsed: PartialStatusCheckJob = serde_json::from_str(body).unwrap();
1731        assert_eq!(
1732            parsed.data.network_type,
1733            Some(crate::models::NetworkType::Evm)
1734        );
1735    }
1736
1737    #[test]
1738    fn test_partial_status_check_job_handles_missing_network_type() {
1739        let body = r#"{"data":{"transaction_id":"tx1"}}"#;
1740        let parsed: PartialStatusCheckJob = serde_json::from_str(body).unwrap();
1741        assert_eq!(parsed.data.network_type, None);
1742    }
1743
1744    #[test]
1745    fn test_partial_status_check_job_rejects_missing_data() {
1746        let body = r#"{"not_data":{}}"#;
1747        let result = serde_json::from_str::<PartialStatusCheckJob>(body);
1748        assert!(result.is_err());
1749    }
1750
1751    // ── is_fifo_queue_url used consistently ───────────────────────────
1752
1753    #[test]
1754    fn test_fifo_detection_consistent_with_defer_and_retry_logic() {
1755        // Both defer_message and the retry path in process_message use
1756        // is_fifo_queue_url to decide between visibility-timeout vs re-enqueue.
1757        // Verify our standard and FIFO URLs are classified identically by both
1758        // call sites (they both call the same function).
1759        let standard = "https://sqs.us-east-1.amazonaws.com/123/relayer-status-check";
1760        let fifo = "https://sqs.us-east-1.amazonaws.com/123/relayer-status-check.fifo";
1761
1762        assert!(!is_fifo_queue_url(standard));
1763        assert!(is_fifo_queue_url(fifo));
1764    }
1765
1766    // ── get_wait_time_for_queue ──────────────────────────────────────────
1767
1768    #[test]
1769    fn test_get_wait_time_for_queue_returns_positive() {
1770        let all = [
1771            QueueType::TransactionRequest,
1772            QueueType::TransactionSubmission,
1773            QueueType::StatusCheck,
1774            QueueType::StatusCheckEvm,
1775            QueueType::StatusCheckStellar,
1776            QueueType::Notification,
1777            QueueType::TokenSwapRequest,
1778            QueueType::RelayerHealthCheck,
1779        ];
1780        for qt in all {
1781            let wt = get_wait_time_for_queue(qt);
1782            assert!(
1783                wt <= 20,
1784                "{qt:?}: wait time {wt} exceeds SQS maximum of 20s"
1785            );
1786        }
1787    }
1788
1789    #[test]
1790    fn test_get_wait_time_for_queue_matches_defaults() {
1791        // Without env overrides the helper should return the queue's default
1792        assert_eq!(
1793            get_wait_time_for_queue(QueueType::TransactionRequest),
1794            QueueType::TransactionRequest.default_wait_time_secs()
1795        );
1796        assert_eq!(
1797            get_wait_time_for_queue(QueueType::StatusCheck),
1798            QueueType::StatusCheck.default_wait_time_secs()
1799        );
1800    }
1801
1802    #[test]
1803    #[serial_test::serial]
1804    fn test_get_wait_time_for_queue_respects_env_override() {
1805        // StatusCheck default is 5; override to 12 via the real env var path
1806        let env_var = format!(
1807            "SQS_{}_WAIT_TIME_SECONDS",
1808            QueueType::StatusCheck.sqs_env_key()
1809        );
1810        std::env::set_var(&env_var, "12");
1811        assert_eq!(get_wait_time_for_queue(QueueType::StatusCheck), 12);
1812        std::env::remove_var(&env_var);
1813    }
1814
1815    #[test]
1816    #[serial_test::serial]
1817    fn test_get_wait_time_for_queue_env_override_clamped_to_20() {
1818        let env_var = format!(
1819            "SQS_{}_WAIT_TIME_SECONDS",
1820            QueueType::Notification.sqs_env_key()
1821        );
1822        std::env::set_var(&env_var, "99");
1823        assert_eq!(
1824            get_wait_time_for_queue(QueueType::Notification),
1825            20,
1826            "Should clamp to SQS maximum of 20"
1827        );
1828        std::env::remove_var(&env_var);
1829    }
1830
1831    // ── get_poller_count_for_queue ───────────────────────────────────────
1832
1833    #[test]
1834    fn test_get_poller_count_for_queue_all_types_positive() {
1835        let all = [
1836            QueueType::TransactionRequest,
1837            QueueType::TransactionSubmission,
1838            QueueType::StatusCheck,
1839            QueueType::StatusCheckEvm,
1840            QueueType::StatusCheckStellar,
1841            QueueType::Notification,
1842            QueueType::TokenSwapRequest,
1843            QueueType::RelayerHealthCheck,
1844        ];
1845        for qt in all {
1846            assert!(
1847                get_poller_count_for_queue(qt) >= 1,
1848                "{qt:?}: poller count must be at least 1"
1849            );
1850        }
1851    }
1852
1853    #[test]
1854    fn test_get_poller_count_for_queue_matches_defaults() {
1855        // Without env overrides the helper should return the queue's default (clamped to >= 1)
1856        assert_eq!(
1857            get_poller_count_for_queue(QueueType::TransactionRequest),
1858            QueueType::TransactionRequest.default_poller_count().max(1)
1859        );
1860        assert_eq!(
1861            get_poller_count_for_queue(QueueType::Notification),
1862            QueueType::Notification.default_poller_count().max(1)
1863        );
1864    }
1865
1866    #[test]
1867    #[serial_test::serial]
1868    fn test_get_poller_count_for_queue_respects_env_override() {
1869        let env_var = format!("SQS_{}_POLLER_COUNT", QueueType::Notification.sqs_env_key());
1870        std::env::set_var(&env_var, "5");
1871        assert_eq!(get_poller_count_for_queue(QueueType::Notification), 5);
1872        std::env::remove_var(&env_var);
1873    }
1874
1875    #[test]
1876    #[serial_test::serial]
1877    fn test_get_poller_count_for_queue_env_zero_clamped_to_1() {
1878        let env_var = format!("SQS_{}_POLLER_COUNT", QueueType::StatusCheck.sqs_env_key());
1879        std::env::set_var(&env_var, "0");
1880        assert_eq!(
1881            get_poller_count_for_queue(QueueType::StatusCheck),
1882            1,
1883            "Zero poller count from env should be clamped to 1"
1884        );
1885        std::env::remove_var(&env_var);
1886    }
1887
1888    // ── PollLoopConfig ──────────────────────────────────────────────────
1889
1890    #[test]
1891    fn test_poll_loop_config_clone() {
1892        let config = PollLoopConfig {
1893            queue_type: QueueType::TransactionRequest,
1894            polling_interval: 15,
1895            visibility_timeout: 120,
1896            handler_timeout: Duration::from_secs(120),
1897            max_retries: 3,
1898            poller_id: 0,
1899            poller_count: 2,
1900        };
1901        let cloned = config.clone();
1902        assert_eq!(cloned.polling_interval, 15);
1903        assert_eq!(cloned.poller_id, 0);
1904        assert_eq!(cloned.poller_count, 2);
1905        assert_eq!(cloned.max_retries, 3);
1906    }
1907}