1use std::future::Future;
7use std::panic::AssertUnwindSafe;
8use std::sync::Arc;
9use std::time::Duration;
10
11use actix_web::web::ThinData;
12use aws_sdk_sqs::error::{ProvideErrorMetadata, SdkError};
13use aws_sdk_sqs::types::{
14 DeleteMessageBatchRequestEntry, Message, MessageAttributeValue, MessageSystemAttributeName,
15};
16use futures::FutureExt;
17use serde::de::DeserializeOwned;
18use tokio::sync::watch;
19use tokio::task::{JoinHandle, JoinSet};
20use tracing::{debug, error, info, warn};
21
22use crate::metrics::observe_queue_pickup_latency;
23use crate::queues::{backoff_config_for_queue, retry_delay_secs};
24use crate::{
25 config::ServerConfig,
26 jobs::{
27 notification_handler, relayer_health_check_handler, token_swap_request_handler,
28 transaction_request_handler, transaction_status_handler, transaction_submission_handler,
29 Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
30 TransactionSend, TransactionStatusCheck,
31 },
32};
33
34use super::{HandlerError, WorkerContext};
35use super::{QueueBackendError, QueueType, WorkerHandle};
36
/// Classification of a handler failure, produced by `process_job` and
/// consumed by `process_message` to decide what happens to the message.
#[derive(Debug)]
enum ProcessingError {
    /// The failure may succeed on a later attempt; `process_message` schedules
    /// a retry for it. Carries the error text.
    Retryable(String),
    /// The failure will not be retried; `process_message` reports the message
    /// as deletable. Carries the error text.
    Permanent(String),
}
42
/// Result of `process_message`, telling the poll loop what to do with the
/// SQS message afterwards.
#[derive(Debug)]
enum MessageOutcome {
    /// The message should be deleted; the receipt handle is handed back so the
    /// poll loop can include it in a batched delete.
    Delete { receipt_handle: String },
    /// No delete is issued for the message; it stays on the queue.
    Retain,
}
53
/// Per-poller settings handed to `run_poll_loop`.
#[derive(Clone)]
struct PollLoopConfig {
    /// Queue this poller serves; also selects the job handler.
    queue_type: QueueType,
    /// Value passed as `wait_time_seconds` on each receive call.
    polling_interval: u64,
    /// Value passed as `visibility_timeout` on each receive call.
    visibility_timeout: u32,
    /// Upper bound on how long a single message handler may run.
    handler_timeout: Duration,
    /// Retry limit forwarded to `process_message`.
    max_retries: usize,
    /// Index of this poller within its queue (`0..poller_count`).
    poller_id: usize,
    /// Total number of pollers sharing the queue's semaphore.
    poller_count: usize,
}
66
67pub async fn spawn_worker_for_queue(
81 sqs_client: aws_sdk_sqs::Client,
82 queue_type: QueueType,
83 queue_url: String,
84 app_state: Arc<ThinData<crate::models::DefaultAppState>>,
85 shutdown_rx: watch::Receiver<bool>,
86) -> Result<WorkerHandle, QueueBackendError> {
87 let concurrency = get_concurrency_for_queue(queue_type);
88 let max_retries = queue_type.max_retries();
89 let polling_interval = get_wait_time_for_queue(queue_type);
90 let poller_count = get_poller_count_for_queue(queue_type);
91 let visibility_timeout = queue_type.visibility_timeout_secs();
92 let handler_timeout_secs = handler_timeout_secs(queue_type);
93 let handler_timeout = Duration::from_secs(handler_timeout_secs);
94
95 info!(
96 queue_type = ?queue_type,
97 queue_url = %queue_url,
98 concurrency = concurrency,
99 max_retries = max_retries,
100 polling_interval_secs = polling_interval,
101 poller_count = poller_count,
102 visibility_timeout_secs = visibility_timeout,
103 handler_timeout_secs = handler_timeout_secs,
104 "Spawning SQS worker"
105 );
106
107 let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency));
109
110 let handle: JoinHandle<()> = tokio::spawn(async move {
111 let mut poller_handles: JoinSet<()> = JoinSet::new();
112
113 for poller_id in 0..poller_count {
114 let client = sqs_client.clone();
115 let url = queue_url.clone();
116 let state = app_state.clone();
117 let sem = semaphore.clone();
118 let mut rx = shutdown_rx.clone();
119 let config = PollLoopConfig {
120 queue_type,
121 polling_interval,
122 visibility_timeout,
123 handler_timeout,
124 max_retries,
125 poller_id,
126 poller_count,
127 };
128
129 poller_handles.spawn(async move {
130 run_poll_loop(client, url, state, sem, &mut rx, config).await;
131 });
132 }
133
134 while let Some(join_result) = poller_handles.join_next().await {
136 if let Err(err) = join_result {
137 error!(
138 queue_type = ?queue_type,
139 error = %err,
140 "SQS poller task terminated unexpectedly"
141 );
142 }
143 }
144 info!(queue_type = ?queue_type, "SQS worker stopped");
145 });
146
147 Ok(WorkerHandle::Tokio(handle))
148}
149
/// Runs a single poller for one queue until shutdown.
///
/// Each iteration: reaps finished message tasks, batch-deletes the messages
/// they marked deletable, checks the shutdown flag, computes this poller's
/// share of free semaphore permits, long-polls SQS for up to that many
/// messages, and spawns one task per message. After the loop exits, in-flight
/// tasks are drained for up to 30 seconds and any remaining deletes are flushed.
///
/// # Parameters
/// - `sqs_client` / `queue_url`: queue to receive from and delete on.
/// - `app_state`: shared state cloned into each message task.
/// - `semaphore`: concurrency limiter shared by all pollers of this queue.
/// - `shutdown_rx`: watch receiver; a `true` value or any change notification
///   stops the poller.
/// - `config`: per-poller settings (see `PollLoopConfig`).
///
/// Returns early, without draining, if the semaphore has been closed.
async fn run_poll_loop(
    sqs_client: aws_sdk_sqs::Client,
    queue_url: String,
    app_state: Arc<ThinData<crate::models::DefaultAppState>>,
    semaphore: Arc<tokio::sync::Semaphore>,
    shutdown_rx: &mut watch::Receiver<bool>,
    config: PollLoopConfig,
) {
    let PollLoopConfig {
        queue_type,
        polling_interval,
        visibility_timeout,
        handler_timeout,
        max_retries,
        poller_id,
        poller_count,
    } = config;
    // Each task resolves to `Some(receipt_handle)` when its message should be deleted.
    let mut inflight: JoinSet<Option<String>> = JoinSet::new();
    let mut consecutive_poll_errors: u32 = 0;
    let mut pending_deletes: Vec<String> = Vec::new();

    loop {
        // Reap tasks that have already finished; `try_join_next` does not wait.
        while let Some(result) = inflight.try_join_next() {
            match result {
                Ok(Some(receipt_handle)) => pending_deletes.push(receipt_handle),
                Ok(None) => {}
                Err(e) => {
                    warn!(
                        queue_type = ?queue_type,
                        poller_id = poller_id,
                        error = %e,
                        "In-flight task failed"
                    );
                }
            }
        }

        // Delete everything collected so far. The list is cleared whether or not
        // the delete calls succeeded; the return value is not inspected here.
        if !pending_deletes.is_empty() {
            flush_delete_batch(&sqs_client, &queue_url, &pending_deletes, queue_type).await;
            pending_deletes.clear();
        }

        if *shutdown_rx.borrow() {
            info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received, stopping SQS poller");
            break;
        }

        // Split the currently free permits across pollers; the first `remainder`
        // pollers (by id) take one extra so the total is fully distributed.
        let available_permits = semaphore.available_permits();
        let base_share = available_permits / poller_count;
        let remainder = available_permits % poller_count;
        let my_share = base_share + usize::from(poller_id < remainder);
        if my_share == 0 {
            // Nothing to fetch for this poller right now: wait briefly and
            // re-evaluate, unless a shutdown notification arrives first.
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_millis(50)) => continue,
                _ = shutdown_rx.changed() => {
                    info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received, stopping SQS poller");
                    break;
                }
            }
        }

        // Request at most 10 messages per receive call.
        // NOTE(review): 10 presumably mirrors the SQS per-request maximum — confirm.
        let batch_size = my_share.min(10) as i32;

        // Race the long poll against shutdown so a pending receive does not delay exit.
        let messages_result = tokio::select! {
            result = sqs_client
                .receive_message()
                .queue_url(&queue_url)
                .max_number_of_messages(batch_size)
                .wait_time_seconds(polling_interval as i32)
                .visibility_timeout(visibility_timeout as i32)
                .message_system_attribute_names(MessageSystemAttributeName::ApproximateReceiveCount)
                .message_system_attribute_names(MessageSystemAttributeName::MessageGroupId)
                .message_system_attribute_names(MessageSystemAttributeName::SentTimestamp)
                .message_attribute_names("target_scheduled_on")
                .message_attribute_names("retry_attempt")
                .send() => result,
            _ = shutdown_rx.changed() => {
                info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received during SQS poll, stopping poller");
                break;
            }
        };

        match messages_result {
            Ok(output) => {
                if consecutive_poll_errors > 0 {
                    info!(
                        queue_type = ?queue_type,
                        poller_id = poller_id,
                        previous_errors = consecutive_poll_errors,
                        "SQS polling recovered after consecutive errors"
                    );
                }
                consecutive_poll_errors = 0;

                if let Some(messages) = output.messages {
                    if !messages.is_empty() {
                        debug!(
                            queue_type = ?queue_type,
                            poller_id = poller_id,
                            message_count = messages.len(),
                            "Received messages from SQS"
                        );

                        for message in messages {
                            // Acquire a permit before spawning; this waits if other
                            // pollers took the permits in the meantime.
                            let permit = match semaphore.clone().acquire_owned().await {
                                Ok(permit) => permit,
                                Err(err) => {
                                    error!(
                                        queue_type = ?queue_type,
                                        poller_id = poller_id,
                                        error = %err,
                                        "Semaphore closed, stopping SQS poller loop"
                                    );
                                    return;
                                }
                            };
                            let client = sqs_client.clone();
                            let url = queue_url.clone();
                            let state = app_state.clone();

                            inflight.spawn(async move {
                                // Keep the permit alive for the whole task; it is
                                // released when the task ends, however it ends.
                                let _permit = permit;
                                // Result layers, outermost first: timeout,
                                // caught panic, then `process_message`'s own result.
                                let result = tokio::time::timeout(
                                    handler_timeout,
                                    AssertUnwindSafe(process_message(
                                        client.clone(),
                                        message,
                                        queue_type,
                                        &url,
                                        state,
                                        max_retries,
                                    ))
                                    .catch_unwind(),
                                )
                                .await;

                                match result {
                                    Ok(Ok(Ok(MessageOutcome::Delete { receipt_handle }))) => {
                                        Some(receipt_handle)
                                    }
                                    Ok(Ok(Ok(MessageOutcome::Retain))) => None,
                                    Ok(Ok(Err(e))) => {
                                        error!(
                                            queue_type = ?queue_type,
                                            error = %e,
                                            "Failed to process message"
                                        );
                                        None
                                    }
                                    Ok(Err(panic_info)) => {
                                        // Panic payloads are either `String` or `&str`
                                        // for the standard panic macros.
                                        let msg = panic_info
                                            .downcast_ref::<String>()
                                            .map(|s| s.as_str())
                                            .or_else(|| {
                                                panic_info.downcast_ref::<&str>().copied()
                                            })
                                            .unwrap_or("unknown panic");
                                        error!(
                                            queue_type = ?queue_type,
                                            panic = %msg,
                                            "Message handler panicked"
                                        );
                                        None
                                    }
                                    Err(_) => {
                                        // Timed out: returning `None` means no delete is issued.
                                        error!(
                                            queue_type = ?queue_type,
                                            timeout_secs = handler_timeout.as_secs(),
                                            "Message handler timed out; message will be retried after visibility timeout"
                                        );
                                        None
                                    }
                                }
                            });
                        }
                    }
                }
            }
            Err(e) => {
                consecutive_poll_errors = consecutive_poll_errors.saturating_add(1);
                let backoff_secs = poll_error_backoff_secs(consecutive_poll_errors);
                // Pull out the SDK error category (and service code/message when
                // available) purely for structured logging.
                let (error_kind, error_code, error_message) = match &e {
                    SdkError::ServiceError(ctx) => {
                        ("service", ctx.err().code(), ctx.err().message())
                    }
                    SdkError::DispatchFailure(_) => ("dispatch", None, None),
                    SdkError::ResponseError(_) => ("response", None, None),
                    SdkError::TimeoutError(_) => ("timeout", None, None),
                    _ => ("other", None, None),
                };
                error!(
                    queue_type = ?queue_type,
                    poller_id = poller_id,
                    error_kind = error_kind,
                    error_code = error_code.unwrap_or("unknown"),
                    error_message = error_message.unwrap_or("n/a"),
                    error = %e,
                    error_debug = ?e,
                    consecutive_errors = consecutive_poll_errors,
                    backoff_secs = backoff_secs,
                    "Failed to receive messages from SQS, backing off"
                );
                // Back off before the next poll, but stay responsive to shutdown.
                tokio::select! {
                    _ = tokio::time::sleep(Duration::from_secs(backoff_secs)) => {}
                    _ = shutdown_rx.changed() => {
                        info!(queue_type = ?queue_type, poller_id = poller_id, "Shutdown signal received during backoff, stopping poller");
                        break;
                    }
                }
            }
        }
    }

    // Shutdown path: give in-flight handlers up to 30 seconds to finish.
    if !inflight.is_empty() {
        info!(
            queue_type = ?queue_type,
            poller_id = poller_id,
            count = inflight.len(),
            "Draining in-flight tasks before shutdown"
        );
        match tokio::time::timeout(Duration::from_secs(30), async {
            while let Some(result) = inflight.join_next().await {
                match result {
                    Ok(Some(receipt_handle)) => pending_deletes.push(receipt_handle),
                    Ok(None) => {}
                    Err(e) => {
                        warn!(
                            queue_type = ?queue_type,
                            poller_id = poller_id,
                            error = %e,
                            "In-flight task failed during drain"
                        );
                    }
                }
            }
        })
        .await
        {
            Ok(()) => {
                info!(queue_type = ?queue_type, poller_id = poller_id, "All in-flight tasks drained")
            }
            Err(_) => {
                warn!(
                    queue_type = ?queue_type,
                    poller_id = poller_id,
                    remaining = inflight.len(),
                    "Drain timeout, abandoning remaining tasks"
                );
                inflight.abort_all();
            }
        }
    }

    // Final flush for handles collected during the drain.
    if !pending_deletes.is_empty() {
        flush_delete_batch(&sqs_client, &queue_url, &pending_deletes, queue_type).await;
    }
}
424
425async fn process_message(
430 sqs_client: aws_sdk_sqs::Client,
431 message: Message,
432 queue_type: QueueType,
433 queue_url: &str,
434 app_state: Arc<ThinData<crate::models::DefaultAppState>>,
435 max_retries: usize,
436) -> Result<MessageOutcome, QueueBackendError> {
437 let body = message
438 .body()
439 .ok_or_else(|| QueueBackendError::QueueError("Empty message body".to_string()))?;
440
441 let receipt_handle = message
442 .receipt_handle()
443 .ok_or_else(|| QueueBackendError::QueueError("Missing receipt handle".to_string()))?;
444
445 if let Some(target_scheduled_on) = parse_target_scheduled_on(&message) {
447 let now = std::time::SystemTime::now()
448 .duration_since(std::time::SystemTime::UNIX_EPOCH)
449 .map_err(|e| QueueBackendError::QueueError(format!("System clock error: {e}")))?
450 .as_secs() as i64;
451 let remaining = target_scheduled_on - now;
452 if remaining > 0 {
453 let should_delete_original = defer_message(
454 &sqs_client,
455 queue_url,
456 body.to_string(),
457 &message,
458 target_scheduled_on,
459 remaining.min(900) as i32,
460 )
461 .await?;
462
463 debug!(
464 queue_type = ?queue_type,
465 remaining_seconds = remaining,
466 "Deferred scheduled SQS message for next delay hop"
467 );
468 return if should_delete_original {
469 Ok(MessageOutcome::Delete {
470 receipt_handle: receipt_handle.to_string(),
471 })
472 } else {
473 Ok(MessageOutcome::Retain)
474 };
475 }
476 }
477
478 let receive_count = message
480 .attributes()
481 .and_then(|attrs| attrs.get(&MessageSystemAttributeName::ApproximateReceiveCount))
482 .and_then(|count| count.parse::<usize>().ok())
483 .unwrap_or(1);
484 let attempt_number = receive_count.saturating_sub(1);
486 let logical_retry_attempt = parse_retry_attempt(&message).unwrap_or(attempt_number);
489
490 if let Some(baseline) = queue_pickup_baseline_ms(&message, receive_count) {
495 let now_ms = chrono::Utc::now().timestamp_millis();
496 let latency_secs = (now_ms - baseline).max(0) as f64 / 1000.0;
497 observe_queue_pickup_latency(queue_type.queue_name(), "sqs", latency_secs);
498 }
499
500 let sqs_message_id = message.message_id().unwrap_or("unknown").to_string();
502
503 debug!(
504 queue_type = ?queue_type,
505 message_id = %sqs_message_id,
506 attempt = attempt_number,
507 receive_count = receive_count,
508 max_retries = max_retries,
509 "Processing message"
510 );
511
512 let result = match queue_type {
514 QueueType::TransactionRequest => {
515 process_job::<TransactionRequest, _, _>(
516 body,
517 app_state,
518 attempt_number,
519 sqs_message_id,
520 "TransactionRequest",
521 transaction_request_handler,
522 )
523 .await
524 }
525 QueueType::TransactionSubmission => {
526 process_job::<TransactionSend, _, _>(
527 body,
528 app_state,
529 attempt_number,
530 sqs_message_id,
531 "TransactionSend",
532 transaction_submission_handler,
533 )
534 .await
535 }
536 QueueType::StatusCheck | QueueType::StatusCheckEvm | QueueType::StatusCheckStellar => {
537 process_job::<TransactionStatusCheck, _, _>(
538 body,
539 app_state,
540 attempt_number,
541 sqs_message_id,
542 "TransactionStatusCheck",
543 transaction_status_handler,
544 )
545 .await
546 }
547 QueueType::Notification => {
548 process_job::<NotificationSend, _, _>(
549 body,
550 app_state,
551 attempt_number,
552 sqs_message_id,
553 "NotificationSend",
554 notification_handler,
555 )
556 .await
557 }
558 QueueType::TokenSwapRequest => {
559 process_job::<TokenSwapRequest, _, _>(
560 body,
561 app_state,
562 attempt_number,
563 sqs_message_id,
564 "TokenSwapRequest",
565 token_swap_request_handler,
566 )
567 .await
568 }
569 QueueType::RelayerHealthCheck => {
570 process_job::<RelayerHealthCheck, _, _>(
571 body,
572 app_state,
573 attempt_number,
574 sqs_message_id,
575 "RelayerHealthCheck",
576 relayer_health_check_handler,
577 )
578 .await
579 }
580 };
581
582 match result {
583 Ok(()) => {
584 debug!(
585 queue_type = ?queue_type,
586 attempt = attempt_number,
587 "Message processed successfully"
588 );
589
590 Ok(MessageOutcome::Delete {
591 receipt_handle: receipt_handle.to_string(),
592 })
593 }
594 Err(ProcessingError::Permanent(e)) => {
595 error!(
596 queue_type = ?queue_type,
597 attempt = attempt_number,
598 error = %e,
599 "Permanent handler failure, message will be deleted"
600 );
601
602 Ok(MessageOutcome::Delete {
603 receipt_handle: receipt_handle.to_string(),
604 })
605 }
606 Err(ProcessingError::Retryable(e)) => {
607 if max_retries != usize::MAX && receive_count > max_retries {
609 error!(
610 queue_type = ?queue_type,
611 attempt = attempt_number,
612 receive_count = receive_count,
613 max_retries = max_retries,
614 error = %e,
615 "Max retries exceeded; message will be automatically moved to DLQ by SQS redrive policy"
616 );
617 return Ok(MessageOutcome::Retain);
618 }
619
620 let delay = if queue_type.is_status_check() {
624 compute_status_retry_delay(body, logical_retry_attempt)
625 } else {
626 retry_delay_secs(backoff_config_for_queue(queue_type), logical_retry_attempt)
627 };
628
629 if is_fifo_queue_url(queue_url) {
632 if let Err(err) = sqs_client
633 .change_message_visibility()
634 .queue_url(queue_url)
635 .receipt_handle(receipt_handle)
636 .visibility_timeout(delay.clamp(1, 900))
637 .send()
638 .await
639 {
640 error!(
641 queue_type = ?queue_type,
642 error = %err,
643 "Failed to set visibility timeout for retry; falling back to existing visibility timeout"
644 );
645 return Ok(MessageOutcome::Retain);
646 }
647
648 debug!(
649 queue_type = ?queue_type,
650 attempt = logical_retry_attempt,
651 delay_seconds = delay,
652 error = %e,
653 "Retry scheduled via visibility timeout"
654 );
655
656 return Ok(MessageOutcome::Retain);
657 }
658
659 let next_retry_attempt = logical_retry_attempt.saturating_add(1);
660
661 if let Err(send_err) = sqs_client
665 .send_message()
666 .queue_url(queue_url)
667 .message_body(body.to_string())
668 .delay_seconds(delay)
669 .message_attributes(
670 "retry_attempt",
671 MessageAttributeValue::builder()
672 .data_type("Number")
673 .string_value(next_retry_attempt.to_string())
674 .build()
675 .map_err(|err| {
676 QueueBackendError::SqsError(format!(
677 "Failed to build retry_attempt attribute: {err}"
678 ))
679 })?,
680 )
681 .send()
682 .await
683 {
684 error!(
685 queue_type = ?queue_type,
686 error = %send_err,
687 "Failed to re-enqueue message; leaving original for visibility timeout retry"
688 );
689 return Ok(MessageOutcome::Retain);
691 }
692
693 debug!(
694 queue_type = ?queue_type,
695 attempt = logical_retry_attempt,
696 delay_seconds = delay,
697 error = %e,
698 "Message re-enqueued with backoff delay"
699 );
700
701 Ok(MessageOutcome::Delete {
703 receipt_handle: receipt_handle.to_string(),
704 })
705 }
706 }
707}
708
709async fn process_job<T, F, Fut>(
712 body: &str,
713 app_state: Arc<ThinData<crate::models::DefaultAppState>>,
714 attempt: usize,
715 task_id: String,
716 type_name: &str,
717 handler: F,
718) -> Result<(), ProcessingError>
719where
720 T: DeserializeOwned,
721 F: FnOnce(Job<T>, ThinData<crate::models::DefaultAppState>, WorkerContext) -> Fut,
722 Fut: Future<Output = Result<(), HandlerError>>,
723{
724 let job: Job<T> = serde_json::from_str(body).map_err(|e| {
725 error!(error = %e, "Failed to deserialize {} job", type_name);
726 ProcessingError::Permanent(format!("Failed to deserialize {type_name} job: {e}"))
728 })?;
729
730 let ctx = WorkerContext::new(attempt, task_id);
731 handler(job, (*app_state).clone(), ctx)
732 .await
733 .map_err(map_handler_error)
734}
735
736fn map_handler_error(error: HandlerError) -> ProcessingError {
737 match error {
738 HandlerError::Abort(msg) => ProcessingError::Permanent(msg),
739 HandlerError::Retry(msg) => ProcessingError::Retryable(msg),
740 }
741}
742
743fn parse_target_scheduled_on(message: &Message) -> Option<i64> {
744 message
745 .message_attributes()
746 .and_then(|attrs| attrs.get("target_scheduled_on"))
747 .and_then(|value| value.string_value())
748 .and_then(|value| value.parse::<i64>().ok())
749}
750
751fn parse_retry_attempt(message: &Message) -> Option<usize> {
752 message
753 .message_attributes()
754 .and_then(|attrs| attrs.get("retry_attempt"))
755 .and_then(|value| value.string_value())
756 .and_then(|value| value.parse::<usize>().ok())
757}
758
759fn queue_pickup_baseline_ms(message: &Message, receive_count: usize) -> Option<i64> {
760 if receive_count != 1 {
761 return None;
762 }
763
764 parse_target_scheduled_on(message)
765 .map(|ts_secs| ts_secs * 1000)
766 .or_else(|| {
767 message
768 .attributes()
769 .and_then(|a| a.get(&MessageSystemAttributeName::SentTimestamp))
770 .and_then(|v| v.parse::<i64>().ok())
771 })
772}
773
/// Returns `true` when `queue_url` ends with the `.fifo` suffix.
fn is_fifo_queue_url(queue_url: &str) -> bool {
    queue_url.strip_suffix(".fifo").is_some()
}
777
778async fn defer_message(
779 sqs_client: &aws_sdk_sqs::Client,
780 queue_url: &str,
781 body: String,
782 message: &Message,
783 target_scheduled_on: i64,
784 delay_seconds: i32,
785) -> Result<bool, QueueBackendError> {
786 if is_fifo_queue_url(queue_url) {
787 let receipt_handle = message.receipt_handle().ok_or_else(|| {
788 QueueBackendError::QueueError(
789 "Cannot defer FIFO message: missing receipt handle".to_string(),
790 )
791 })?;
792
793 sqs_client
794 .change_message_visibility()
795 .queue_url(queue_url)
796 .receipt_handle(receipt_handle)
797 .visibility_timeout(delay_seconds.clamp(1, 900))
798 .send()
799 .await
800 .map_err(|e| {
801 QueueBackendError::SqsError(format!(
802 "Failed to defer FIFO message via visibility timeout: {e}"
803 ))
804 })?;
805
806 return Ok(false);
807 }
808
809 let request = sqs_client
812 .send_message()
813 .queue_url(queue_url)
814 .message_body(body)
815 .delay_seconds(delay_seconds.clamp(1, 900))
816 .message_attributes(
817 "target_scheduled_on",
818 MessageAttributeValue::builder()
819 .data_type("Number")
820 .string_value(target_scheduled_on.to_string())
821 .build()
822 .map_err(|e| {
823 QueueBackendError::SqsError(format!(
824 "Failed to build deferred scheduled attribute: {e}"
825 ))
826 })?,
827 );
828
829 request.send().await.map_err(|e| {
830 QueueBackendError::SqsError(format!("Failed to defer scheduled message: {e}"))
831 })?;
832
833 Ok(true)
834}
835
/// Partial view of a status-check job's `data` object: only `network_type`
/// is deserialized, and it may be absent.
#[derive(serde::Deserialize)]
struct StatusCheckData {
    /// Network the transaction belongs to, if the payload specifies one.
    network_type: Option<crate::models::NetworkType>,
}
844
/// Partial view of a status-check job envelope, used by
/// `compute_status_retry_delay` to read the network type out of a message
/// body. Only the `data` field is deserialized.
#[derive(serde::Deserialize)]
struct PartialStatusCheckJob {
    /// The job payload, reduced to the fields this module needs.
    data: StatusCheckData,
}
853
854fn compute_status_retry_delay(body: &str, attempt: usize) -> i32 {
861 let network_type = serde_json::from_str::<PartialStatusCheckJob>(body)
862 .ok()
863 .and_then(|j| j.data.network_type);
864
865 crate::queues::retry_config::status_check_retry_delay_secs(network_type, attempt)
866}
867
868fn get_wait_time_for_queue(queue_type: QueueType) -> u64 {
870 ServerConfig::get_sqs_wait_time(
871 queue_type.sqs_env_key(),
872 queue_type.default_wait_time_secs(),
873 )
874}
875
876fn get_poller_count_for_queue(queue_type: QueueType) -> usize {
878 let configured = ServerConfig::get_sqs_poller_count(
879 queue_type.sqs_env_key(),
880 queue_type.default_poller_count(),
881 );
882 if configured == 0 {
883 warn!(
884 queue_type = ?queue_type,
885 "Configured poller count is 0; clamping to 1"
886 );
887 1
888 } else {
889 configured
890 }
891}
892
893fn get_concurrency_for_queue(queue_type: QueueType) -> usize {
895 let configured = ServerConfig::get_worker_concurrency(
896 queue_type.concurrency_env_key(),
897 queue_type.default_concurrency(),
898 );
899 if configured == 0 {
900 warn!(
901 queue_type = ?queue_type,
902 "Configured concurrency is 0; clamping to 1"
903 );
904 1
905 } else {
906 configured
907 }
908}
909
910fn handler_timeout_secs(queue_type: QueueType) -> u64 {
914 u64::from(queue_type.visibility_timeout_secs().max(1))
915}
916
/// Upper bound, in seconds, on the backoff between failed polls.
const MAX_POLL_BACKOFF_SECS: u64 = 60;

/// Once the error streak reaches 7, every error count divisible by this value
/// uses the short base delay instead of the capped backoff.
const RECOVERY_PROBE_EVERY: u32 = 4;

/// Returns how many seconds to wait after `consecutive_errors` failed polls in a row.
///
/// The delay starts at 5 seconds and doubles per additional error, capped at
/// `MAX_POLL_BACKOFF_SECS`. From the 7th consecutive error onward, counts that
/// are a multiple of `RECOVERY_PROBE_EVERY` return the 5-second base delay.
/// A count of 0 is treated like a count of 1.
fn poll_error_backoff_secs(consecutive_errors: u32) -> u64 {
    const BASE_SECS: u64 = 5;

    let is_recovery_probe =
        consecutive_errors >= 7 && consecutive_errors % RECOVERY_PROBE_EVERY == 0;
    if is_recovery_probe {
        return BASE_SECS;
    }

    // Doublings are capped at 16, so the shift and multiplication stay far
    // below `u64::MAX` before the final cap is applied.
    let doublings = consecutive_errors.saturating_sub(1).min(16);
    let uncapped = BASE_SECS.saturating_mul(1_u64 << doublings);
    uncapped.min(MAX_POLL_BACKOFF_SECS)
}
941
942async fn flush_delete_batch(
948 sqs_client: &aws_sdk_sqs::Client,
949 queue_url: &str,
950 batch: &[String],
951 queue_type: QueueType,
952) -> usize {
953 if batch.is_empty() {
954 return 0;
955 }
956
957 let mut deleted = 0;
958
959 for chunk in batch.chunks(10) {
960 let entries: Vec<DeleteMessageBatchRequestEntry> = chunk
961 .iter()
962 .enumerate()
963 .map(|(i, handle)| {
964 DeleteMessageBatchRequestEntry::builder()
965 .id(i.to_string())
966 .receipt_handle(handle)
967 .build()
968 .expect("id and receipt_handle are always set")
969 })
970 .collect();
971
972 match sqs_client
973 .delete_message_batch()
974 .queue_url(queue_url)
975 .set_entries(Some(entries))
976 .send()
977 .await
978 {
979 Ok(output) => {
980 deleted += output.successful().len();
981
982 for f in output.failed() {
983 warn!(
984 queue_type = ?queue_type,
985 id = %f.id(),
986 code = %f.code(),
987 message = f.message().unwrap_or("unknown"),
988 "Batch delete entry failed (message will be redelivered)"
989 );
990 }
991 }
992 Err(e) => {
993 error!(
994 queue_type = ?queue_type,
995 error = %e,
996 batch_size = chunk.len(),
997 "Batch delete API call failed (messages will be redelivered)"
998 );
999 }
1000 }
1001 }
1002
1003 deleted
1004}
1005
1006#[cfg(test)]
1007mod tests {
1008 use super::*;
1009
1010 #[test]
1011 fn test_get_concurrency_for_queue() {
1012 let concurrency = get_concurrency_for_queue(QueueType::TransactionRequest);
1014 assert!(concurrency > 0);
1015
1016 let concurrency = get_concurrency_for_queue(QueueType::StatusCheck);
1017 assert!(concurrency > 0);
1018 }
1019
1020 #[test]
1021 fn test_handler_timeout_secs_is_positive() {
1022 let all = [
1023 QueueType::TransactionRequest,
1024 QueueType::TransactionSubmission,
1025 QueueType::StatusCheck,
1026 QueueType::StatusCheckEvm,
1027 QueueType::StatusCheckStellar,
1028 QueueType::Notification,
1029 QueueType::TokenSwapRequest,
1030 QueueType::RelayerHealthCheck,
1031 ];
1032 for queue_type in all {
1033 assert!(handler_timeout_secs(queue_type) > 0);
1034 }
1035 }
1036
1037 #[test]
1038 fn test_handler_timeout_secs_uses_visibility_timeout() {
1039 assert_eq!(
1040 handler_timeout_secs(QueueType::StatusCheckEvm),
1041 QueueType::StatusCheckEvm.visibility_timeout_secs() as u64
1042 );
1043 assert_eq!(
1044 handler_timeout_secs(QueueType::Notification),
1045 QueueType::Notification.visibility_timeout_secs() as u64
1046 );
1047 }
1048
1049 #[test]
1050 fn test_parse_target_scheduled_on() {
1051 let message = Message::builder().build();
1053
1054 assert_eq!(parse_target_scheduled_on(&message), None);
1056
1057 let message = Message::builder()
1059 .message_attributes(
1060 "target_scheduled_on",
1061 MessageAttributeValue::builder()
1062 .data_type("Number")
1063 .string_value("1234567890")
1064 .build()
1065 .unwrap(),
1066 )
1067 .build();
1068
1069 assert_eq!(parse_target_scheduled_on(&message), Some(1234567890));
1070 }
1071
1072 #[test]
1073 fn test_parse_retry_attempt() {
1074 let message = Message::builder().build();
1075 assert_eq!(parse_retry_attempt(&message), None);
1076
1077 let message = Message::builder()
1078 .message_attributes(
1079 "retry_attempt",
1080 MessageAttributeValue::builder()
1081 .data_type("Number")
1082 .string_value("7")
1083 .build()
1084 .unwrap(),
1085 )
1086 .build();
1087 assert_eq!(parse_retry_attempt(&message), Some(7));
1088 }
1089
1090 #[test]
1091 fn test_map_handler_error() {
1092 let error = HandlerError::Abort("Validation failed".to_string());
1094 let result = map_handler_error(error);
1095 assert!(matches!(result, ProcessingError::Permanent(_)));
1096
1097 let error = HandlerError::Retry("Network timeout".to_string());
1099 let result = map_handler_error(error);
1100 assert!(matches!(result, ProcessingError::Retryable(_)));
1101 }
1102
1103 #[test]
1104 fn test_is_fifo_queue_url() {
1105 assert!(is_fifo_queue_url(
1106 "https://sqs.us-east-1.amazonaws.com/123/queue.fifo"
1107 ));
1108 assert!(!is_fifo_queue_url(
1109 "https://sqs.us-east-1.amazonaws.com/123/queue"
1110 ));
1111 }
1112
1113 #[test]
1114 fn test_compute_status_retry_delay_evm() {
1115 let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"evm"}}"#;
1117 assert_eq!(compute_status_retry_delay(body, 0), 8);
1118 assert_eq!(compute_status_retry_delay(body, 1), 12);
1119 assert_eq!(compute_status_retry_delay(body, 8), 12);
1120 }
1121
1122 #[test]
1123 fn test_compute_status_retry_delay_stellar() {
1124 let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"stellar"}}"#;
1125 assert_eq!(compute_status_retry_delay(body, 0), 2);
1126 assert_eq!(compute_status_retry_delay(body, 1), 3);
1127 assert_eq!(compute_status_retry_delay(body, 8), 3);
1128 }
1129
1130 #[test]
1131 fn test_compute_status_retry_delay_solana() {
1132 let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"solana"}}"#;
1133 assert_eq!(compute_status_retry_delay(body, 0), 5);
1134 assert_eq!(compute_status_retry_delay(body, 1), 8);
1135 assert_eq!(compute_status_retry_delay(body, 8), 8);
1136 }
1137
1138 #[test]
1139 fn test_compute_status_retry_delay_missing_network() {
1140 let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1"}}"#;
1141 assert_eq!(compute_status_retry_delay(body, 0), 5);
1142 assert_eq!(compute_status_retry_delay(body, 1), 8);
1143 assert_eq!(compute_status_retry_delay(body, 8), 8);
1144 }
1145
1146 #[test]
1147 fn test_compute_status_retry_delay_invalid_body() {
1148 assert_eq!(compute_status_retry_delay("not json", 0), 5);
1149 assert_eq!(compute_status_retry_delay("not json", 1), 8);
1150 assert_eq!(compute_status_retry_delay("not json", 8), 8);
1151 }
1152
1153 #[tokio::test]
1154 async fn test_semaphore_released_on_panic() {
1155 let sem = Arc::new(tokio::sync::Semaphore::new(1));
1156 let permit = sem.clone().acquire_owned().await.unwrap();
1157
1158 let handle = tokio::spawn(async move {
1159 let _permit = permit; let _ = AssertUnwindSafe(async { panic!("test panic") })
1161 .catch_unwind()
1162 .await;
1163 });
1164
1165 handle.await.unwrap();
1166 let _p = tokio::time::timeout(Duration::from_millis(100), sem.acquire())
1168 .await
1169 .expect("permit should be available after panic");
1170 }
1171
1172 #[test]
1173 fn test_poll_error_backoff_secs() {
1174 assert_eq!(poll_error_backoff_secs(1), 5);
1176 assert_eq!(poll_error_backoff_secs(2), 10);
1178 assert_eq!(poll_error_backoff_secs(3), 20);
1180 assert_eq!(poll_error_backoff_secs(4), 40);
1182 assert_eq!(poll_error_backoff_secs(5), 60);
1184 assert_eq!(poll_error_backoff_secs(6), 60);
1185 assert_eq!(poll_error_backoff_secs(7), 60);
1186 assert_eq!(poll_error_backoff_secs(8), 5);
1188 assert_eq!(poll_error_backoff_secs(9), 60);
1189 assert_eq!(poll_error_backoff_secs(12), 5); }
1191
1192 #[test]
1193 fn test_poll_error_backoff_zero_errors() {
1194 assert_eq!(poll_error_backoff_secs(0), 5);
1196 }
1197
1198 #[test]
1199 fn test_poll_error_backoff_recovery_probes() {
1200 for i in (8..=100).step_by(RECOVERY_PROBE_EVERY as usize) {
1202 assert_eq!(
1203 poll_error_backoff_secs(i as u32),
1204 5,
1205 "Expected recovery probe at error {i}"
1206 );
1207 }
1208 }
1209
1210 #[test]
1211 fn test_message_outcome_delete_carries_receipt_handle() {
1212 let handle = "test-receipt-handle-123".to_string();
1213 let outcome = MessageOutcome::Delete {
1214 receipt_handle: handle.clone(),
1215 };
1216 match outcome {
1217 MessageOutcome::Delete { receipt_handle } => {
1218 assert_eq!(receipt_handle, handle);
1219 }
1220 MessageOutcome::Retain => panic!("Expected Delete variant"),
1221 }
1222 }
1223
1224 #[test]
1225 fn test_message_outcome_retain() {
1226 let outcome = MessageOutcome::Retain;
1227 assert!(matches!(outcome, MessageOutcome::Retain));
1228 }
1229
1230 #[test]
1231 fn test_batch_delete_entry_builder() {
1232 let handles = vec![
1235 "receipt-0".to_string(),
1236 "receipt-1".to_string(),
1237 "receipt-2".to_string(),
1238 ];
1239 let entries: Vec<DeleteMessageBatchRequestEntry> = handles
1240 .iter()
1241 .enumerate()
1242 .map(|(i, handle)| {
1243 DeleteMessageBatchRequestEntry::builder()
1244 .id(i.to_string())
1245 .receipt_handle(handle)
1246 .build()
1247 .expect("id and receipt_handle are set")
1248 })
1249 .collect();
1250
1251 assert_eq!(entries.len(), 3);
1252 assert_eq!(entries[0].id(), "0");
1253 assert_eq!(entries[0].receipt_handle(), "receipt-0");
1254 assert_eq!(entries[2].id(), "2");
1255 assert_eq!(entries[2].receipt_handle(), "receipt-2");
1256 }
1257
/// Splitting 25 receipt handles into chunks of 10 must give three chunks of
/// sizes 10, 10 and 5.
#[test]
fn test_batch_chunking_logic() {
    let mut handles: Vec<String> = Vec::with_capacity(25);
    for i in 0..25 {
        handles.push(format!("receipt-{i}"));
    }

    let chunk_sizes: Vec<usize> = handles.chunks(10).map(|chunk| chunk.len()).collect();

    assert_eq!(chunk_sizes.len(), 3);
    assert_eq!(chunk_sizes[0], 10);
    assert_eq!(chunk_sizes[1], 10);
    assert_eq!(chunk_sizes[2], 5);
}
1270
/// Collecting only the `Some` receipt handles from a mixed list must keep
/// them in their original order and drop every `None`.
#[test]
fn test_outcome_collection_pattern() {
    let outcomes = vec![
        Some("receipt-1".to_string()),
        None,
        Some("receipt-2".to_string()),
        None,
        Some("receipt-3".to_string()),
    ];

    let mut pending_deletes: Vec<String> = Vec::new();
    for outcome in outcomes {
        // Extending with an `Option` appends its value, or nothing for `None`.
        pending_deletes.extend(outcome);
    }

    let expected = ["receipt-1", "receipt-2", "receipt-3"];
    assert_eq!(pending_deletes.len(), 3);
    for (index, want) in expected.iter().enumerate() {
        assert_eq!(pending_deletes[index], *want);
    }
}
1290
/// A `target_scheduled_on` attribute whose value is not a number is expected
/// to parse as `None`.
#[test]
fn test_parse_target_scheduled_on_non_numeric_string() {
    let attribute = MessageAttributeValue::builder()
        .data_type("String")
        .string_value("not-a-number")
        .build()
        .unwrap();
    let message = Message::builder()
        .message_attributes("target_scheduled_on", attribute)
        .build();

    let parsed = parse_target_scheduled_on(&message);
    assert_eq!(parsed, None);
}
1307
/// An empty `target_scheduled_on` value is expected to parse as `None`.
#[test]
fn test_parse_target_scheduled_on_empty_string() {
    let attribute = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("")
        .build()
        .unwrap();
    let message = Message::builder()
        .message_attributes("target_scheduled_on", attribute)
        .build();

    let parsed = parse_target_scheduled_on(&message);
    assert_eq!(parsed, None);
}
1322
/// A negative `target_scheduled_on` value ("-1000") is expected to be
/// returned as-is (`Some(-1000)`).
#[test]
fn test_parse_target_scheduled_on_negative_value() {
    let attribute = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("-1000")
        .build()
        .unwrap();
    let message = Message::builder()
        .message_attributes("target_scheduled_on", attribute)
        .build();

    let parsed = parse_target_scheduled_on(&message);
    assert_eq!(parsed, Some(-1000));
}
1338
/// A fractional `target_scheduled_on` value ("1234567890.5") is expected to
/// parse as `None`.
#[test]
fn test_parse_target_scheduled_on_float_string() {
    let attribute = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("1234567890.5")
        .build()
        .unwrap();
    let message = Message::builder()
        .message_attributes("target_scheduled_on", attribute)
        .build();

    let parsed = parse_target_scheduled_on(&message);
    assert_eq!(parsed, None);
}
1354
/// A `target_scheduled_on` value of "0" is expected to parse as `Some(0)`.
#[test]
fn test_parse_target_scheduled_on_zero() {
    let attribute = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("0")
        .build()
        .unwrap();
    let message = Message::builder()
        .message_attributes("target_scheduled_on", attribute)
        .build();

    let parsed = parse_target_scheduled_on(&message);
    assert_eq!(parsed, Some(0));
}
1369
/// A numeric attribute stored under a different key ("wrong_key") must not be
/// picked up: the parse result is expected to be `None`.
#[test]
fn test_parse_target_scheduled_on_wrong_attribute_name() {
    let attribute = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("1234567890")
        .build()
        .unwrap();
    let message = Message::builder()
        .message_attributes("wrong_key", attribute)
        .build();

    let parsed = parse_target_scheduled_on(&message);
    assert_eq!(parsed, None);
}
1385
/// A `retry_attempt` attribute holding "abc" is expected to parse as `None`.
#[test]
fn test_parse_retry_attempt_non_numeric_string() {
    let attribute = MessageAttributeValue::builder()
        .data_type("String")
        .string_value("abc")
        .build()
        .unwrap();
    let message = Message::builder()
        .message_attributes("retry_attempt", attribute)
        .build();

    let attempt = parse_retry_attempt(&message);
    assert_eq!(attempt, None);
}
1402
/// A negative `retry_attempt` value ("-1") is expected to parse as `None`.
#[test]
fn test_parse_retry_attempt_negative_value() {
    let attribute = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("-1")
        .build()
        .unwrap();
    let message = Message::builder()
        .message_attributes("retry_attempt", attribute)
        .build();

    let attempt = parse_retry_attempt(&message);
    assert_eq!(attempt, None);
}
1418
/// A `retry_attempt` value of "0" is expected to parse as `Some(0)`.
#[test]
fn test_parse_retry_attempt_zero() {
    let attribute = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("0")
        .build()
        .unwrap();
    let message = Message::builder()
        .message_attributes("retry_attempt", attribute)
        .build();

    let attempt = parse_retry_attempt(&message);
    assert_eq!(attempt, Some(0));
}
1433
/// A large `retry_attempt` value ("999999") is expected to parse to the same
/// number.
#[test]
fn test_parse_retry_attempt_large_value() {
    let attribute = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("999999")
        .build()
        .unwrap();
    let message = Message::builder()
        .message_attributes("retry_attempt", attribute)
        .build();

    let attempt = parse_retry_attempt(&message);
    assert_eq!(attempt, Some(999999));
}
1448
/// When a message carries both `target_scheduled_on` ("123") and a
/// `SentTimestamp` ("999999"), the baseline for receive count 1 is expected
/// to be `Some(123_000)`.
#[test]
fn test_queue_pickup_baseline_ms_uses_scheduled_time_on_first_delivery() {
    let scheduled_on = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("123")
        .build()
        .unwrap();
    let system_attributes = std::collections::HashMap::from([(
        MessageSystemAttributeName::SentTimestamp,
        "999999".to_string(),
    )]);
    let message = Message::builder()
        .message_attributes("target_scheduled_on", scheduled_on)
        .set_attributes(Some(system_attributes))
        .build();

    let baseline = queue_pickup_baseline_ms(&message, 1);
    assert_eq!(baseline, Some(123_000));
}
1468
/// Without a `target_scheduled_on` attribute, the baseline for receive count
/// 1 is expected to equal the `SentTimestamp` system attribute.
#[test]
fn test_queue_pickup_baseline_ms_falls_back_to_sent_timestamp() {
    let system_attributes = std::collections::HashMap::from([(
        MessageSystemAttributeName::SentTimestamp,
        "123456".to_string(),
    )]);
    let message = Message::builder()
        .set_attributes(Some(system_attributes))
        .build();

    let baseline = queue_pickup_baseline_ms(&message, 1);
    assert_eq!(baseline, Some(123456));
}
1480
/// With a receive count of 2, no baseline is expected (`None`) even though
/// both the scheduled time and the sent timestamp are present.
#[test]
fn test_queue_pickup_baseline_ms_skips_retries() {
    let scheduled_on = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("123")
        .build()
        .unwrap();
    let system_attributes = std::collections::HashMap::from([(
        MessageSystemAttributeName::SentTimestamp,
        "123456".to_string(),
    )]);
    let message = Message::builder()
        .message_attributes("target_scheduled_on", scheduled_on)
        .set_attributes(Some(system_attributes))
        .build();

    let baseline = queue_pickup_baseline_ms(&message, 2);
    assert_eq!(baseline, None);
}
1500
/// An empty URL must not be classified as a FIFO queue.
#[test]
fn test_is_fifo_queue_url_empty_string() {
    let detected = is_fifo_queue_url("");
    assert!(!detected);
}
1507
/// A bare queue name ending in `.fifo` must be classified as FIFO.
#[test]
fn test_is_fifo_queue_url_just_fifo_suffix() {
    let detected = is_fifo_queue_url("my-queue.fifo");
    assert!(detected);
}
1512
/// `.fifo` appearing as a path segment rather than at the end of the URL must
/// not count as a FIFO queue.
#[test]
fn test_is_fifo_queue_url_fifo_in_middle() {
    let url = "https://sqs.us-east-1.amazonaws.com/123/.fifo/queue";
    let detected = is_fifo_queue_url(url);
    assert!(!detected);
}
1520
/// Upper- or mixed-case variants of the suffix (`.FIFO`, `.Fifo`) must not be
/// classified as FIFO.
#[test]
fn test_is_fifo_queue_url_case_sensitive() {
    let wrong_case_urls = [
        "https://sqs.us-east-1.amazonaws.com/123/queue.FIFO",
        "https://sqs.us-east-1.amazonaws.com/123/queue.Fifo",
    ];
    for url in wrong_case_urls {
        assert!(!is_fifo_queue_url(url));
    }
}
1530
/// Several standard (non-`.fifo`) queue URLs, including a localhost one, must
/// all be classified as non-FIFO.
#[test]
fn test_is_fifo_queue_url_standard_queue_variations() {
    let standard_urls = [
        "https://sqs.us-east-1.amazonaws.com/123456789/my-queue",
        "https://sqs.eu-west-1.amazonaws.com/123456789/relayer-tx-request",
        "http://localhost:4566/000000000000/test-queue",
    ];
    for url in standard_urls {
        assert!(!is_fifo_queue_url(url));
    }
}
1543
/// A localhost queue URL ending in `.fifo` must be classified as FIFO.
#[test]
fn test_is_fifo_queue_url_localstack() {
    let url = "http://localhost:4566/000000000000/test-queue.fifo";
    let detected = is_fifo_queue_url(url);
    assert!(detected);
}
1551
/// `HandlerError::Abort` must map to `ProcessingError::Permanent` carrying the
/// unchanged message.
#[test]
fn test_map_handler_error_preserves_abort_message() {
    let msg = "Validation failed: invalid nonce";
    let mapped = map_handler_error(HandlerError::Abort(msg.to_string()));

    let ProcessingError::Permanent(text) = mapped else {
        panic!("Expected Permanent");
    };
    assert_eq!(text, msg);
}
1563
/// `HandlerError::Retry` must map to `ProcessingError::Retryable` carrying the
/// unchanged message.
#[test]
fn test_map_handler_error_preserves_retry_message() {
    let msg = "RPC timeout after 30s";
    let mapped = map_handler_error(HandlerError::Retry(msg.to_string()));

    let ProcessingError::Retryable(text) = mapped else {
        panic!("Expected Retryable");
    };
    assert_eq!(text, msg);
}
1573
/// An `Abort` with an empty message must still map to `Permanent`, and the
/// message must stay empty.
#[test]
fn test_map_handler_error_empty_message() {
    let mapped = map_handler_error(HandlerError::Abort(String::new()));

    let ProcessingError::Permanent(text) = mapped else {
        panic!("Expected Permanent");
    };
    assert!(text.is_empty());
}
1582
/// For every listed queue type, `handler_timeout_secs` must equal the queue's
/// visibility timeout clamped to at least 1, widened to `u64`.
#[test]
fn test_handler_timeout_secs_matches_visibility_timeout_for_all_queues() {
    for qt in [
        QueueType::TransactionRequest,
        QueueType::TransactionSubmission,
        QueueType::StatusCheck,
        QueueType::StatusCheckEvm,
        QueueType::StatusCheckStellar,
        QueueType::Notification,
        QueueType::TokenSwapRequest,
        QueueType::RelayerHealthCheck,
    ] {
        let actual = handler_timeout_secs(qt);
        let expected = qt.visibility_timeout_secs().max(1) as u64;
        assert_eq!(
            actual, expected,
            "{qt:?}: handler timeout should equal max(visibility_timeout, 1)"
        );
    }
}
1605
/// Every listed queue type must report a concurrency greater than zero.
#[test]
fn test_get_concurrency_for_queue_all_types_positive() {
    for qt in [
        QueueType::TransactionRequest,
        QueueType::TransactionSubmission,
        QueueType::StatusCheck,
        QueueType::StatusCheckEvm,
        QueueType::StatusCheckStellar,
        QueueType::Notification,
        QueueType::TokenSwapRequest,
        QueueType::RelayerHealthCheck,
    ] {
        let concurrency = get_concurrency_for_queue(qt);
        assert!(
            concurrency > 0,
            "{qt:?}: concurrency must be positive (clamped to at least 1)"
        );
    }
}
1627
/// For error counts 0 through 199 the backoff must stay at or below
/// `MAX_POLL_BACKOFF_SECS`.
#[test]
fn test_poll_error_backoff_never_exceeds_max() {
    (0_u32..200).for_each(|i| {
        let backoff = poll_error_backoff_secs(i);
        assert!(
            backoff <= MAX_POLL_BACKOFF_SECS,
            "Error count {i}: backoff {backoff}s exceeds MAX {MAX_POLL_BACKOFF_SECS}s"
        );
    });
}
1640
/// Even at `u32::MAX` consecutive errors the backoff must be positive and no
/// larger than `MAX_POLL_BACKOFF_SECS`.
#[test]
fn test_poll_error_backoff_u32_max_does_not_overflow() {
    let extreme = poll_error_backoff_secs(u32::MAX);
    assert!(extreme <= MAX_POLL_BACKOFF_SECS);
    assert!(extreme > 0);
}
1647
/// For error counts 0 through 199 the backoff must never be zero.
#[test]
fn test_poll_error_backoff_always_positive() {
    (0_u32..200).for_each(|i| {
        let backoff = poll_error_backoff_secs(i);
        assert!(backoff > 0, "Error count {i}: backoff must be positive");
    });
}
1657
/// Across error counts 0 through 4 each backoff must be at least as large as
/// the one before it.
#[test]
fn test_poll_error_backoff_monotonic_before_cap() {
    // Thread the previous backoff through the fold; the final value is not needed.
    let _ = (1_u32..=4).fold(poll_error_backoff_secs(0), |prev, i| {
        let curr = poll_error_backoff_secs(i);
        assert!(
            curr >= prev,
            "Backoff should be non-decreasing before cap: {prev} -> {curr} at error {i}"
        );
        curr
    });
}
1671
/// Sanity-checks that `MAX_POLL_BACKOFF_SECS` lies between 10 and 300
/// seconds inclusive.
#[test]
fn test_max_poll_backoff_is_reasonable() {
    let cap = MAX_POLL_BACKOFF_SECS;
    assert!(
        cap >= 10,
        "Max backoff should be at least 10s to avoid tight error loops"
    );
    assert!(
        cap <= 300,
        "Max backoff should be at most 5 minutes to detect recovery promptly"
    );
}
1685
/// Sanity-checks that `RECOVERY_PROBE_EVERY` lies between 2 and 10 inclusive.
#[test]
fn test_recovery_probe_every_is_valid() {
    let probe_every = RECOVERY_PROBE_EVERY;
    assert!(
        probe_every >= 2,
        "Recovery probe interval must be at least 2 to avoid probing every attempt"
    );
    assert!(
        probe_every <= 10,
        "Recovery probe interval should not be too large or recovery detection is slow"
    );
}
1697
/// For an EVM status-check job body, very large attempt numbers (1000 and
/// `usize::MAX`) are both expected to yield a delay of 12.
#[test]
fn test_compute_status_retry_delay_very_high_attempt() {
    let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"evm"}}"#;
    for attempt in [1000, usize::MAX] {
        let delay = compute_status_retry_delay(body, attempt);
        assert_eq!(delay, 12);
    }
}
1707
/// An empty body and an empty JSON object are both expected to yield a delay
/// of 5 at attempt 0.
#[test]
fn test_compute_status_retry_delay_empty_body() {
    for body in ["", "{}"] {
        let delay = compute_status_retry_delay(body, 0);
        assert_eq!(delay, 5);
    }
}
1714
/// At attempt 0, a body whose `data` object is empty is expected to yield 5,
/// while a body that only names the EVM network type is expected to yield 8.
#[test]
fn test_compute_status_retry_delay_partial_json() {
    let cases = [
        (r#"{"data":{}}"#, 5),
        (r#"{"data":{"network_type":"evm"}}"#, 8),
    ];
    for (body, expected) in cases {
        let delay = compute_status_retry_delay(body, 0);
        assert_eq!(delay, expected);
    }
}
1724
/// Deserializing a body with `network_type: "evm"` plus an unrelated field
/// must succeed and expose `Some(NetworkType::Evm)`.
#[test]
fn test_partial_status_check_job_deserializes_network_type() {
    let body = r#"{"data":{"network_type":"evm","extra_field":"ignored"}}"#;
    let job = serde_json::from_str::<PartialStatusCheckJob>(body).unwrap();

    let expected = Some(crate::models::NetworkType::Evm);
    assert_eq!(job.data.network_type, expected);
}
1736
/// A `data` object without `network_type` must still deserialize, leaving the
/// field as `None`.
#[test]
fn test_partial_status_check_job_handles_missing_network_type() {
    let body = r#"{"data":{"transaction_id":"tx1"}}"#;
    let job = serde_json::from_str::<PartialStatusCheckJob>(body).unwrap();

    assert_eq!(job.data.network_type, None);
}
1743
/// A body without a `data` key must fail to deserialize.
#[test]
fn test_partial_status_check_job_rejects_missing_data() {
    let outcome = serde_json::from_str::<PartialStatusCheckJob>(r#"{"not_data":{}}"#);
    assert!(outcome.is_err());
}
1750
/// The same status-check queue URL must be classified as standard without the
/// `.fifo` suffix and as FIFO with it.
#[test]
fn test_fifo_detection_consistent_with_defer_and_retry_logic() {
    let standard_is_fifo =
        is_fifo_queue_url("https://sqs.us-east-1.amazonaws.com/123/relayer-status-check");
    assert!(!standard_is_fifo);

    let fifo_is_fifo =
        is_fifo_queue_url("https://sqs.us-east-1.amazonaws.com/123/relayer-status-check.fifo");
    assert!(fifo_is_fifo);
}
1765
/// Every listed queue type must report a wait time of at most 20 seconds.
///
/// Note: despite the test's name, the only condition asserted here is the
/// upper bound of 20.
#[test]
fn test_get_wait_time_for_queue_returns_positive() {
    for qt in [
        QueueType::TransactionRequest,
        QueueType::TransactionSubmission,
        QueueType::StatusCheck,
        QueueType::StatusCheckEvm,
        QueueType::StatusCheckStellar,
        QueueType::Notification,
        QueueType::TokenSwapRequest,
        QueueType::RelayerHealthCheck,
    ] {
        let wt = get_wait_time_for_queue(qt);
        assert!(wt <= 20, "{qt:?}: wait time {wt} exceeds SQS maximum of 20s");
    }
}
1788
/// For the transaction-request and status-check queues,
/// `get_wait_time_for_queue` must equal `default_wait_time_secs()`.
///
/// Serialized with the other `#[serial_test::serial]` tests: the env-override
/// tests in this module set `SQS_*_WAIT_TIME_SECONDS` for the status-check
/// queue in the process environment. Without serialization this test could
/// run while that override is set and fail intermittently.
#[test]
#[serial_test::serial]
fn test_get_wait_time_for_queue_matches_defaults() {
    assert_eq!(
        get_wait_time_for_queue(QueueType::TransactionRequest),
        QueueType::TransactionRequest.default_wait_time_secs()
    );
    assert_eq!(
        get_wait_time_for_queue(QueueType::StatusCheck),
        QueueType::StatusCheck.default_wait_time_secs()
    );
}
1801
/// Setting `SQS_<key>_WAIT_TIME_SECONDS` to "12" for the status-check queue
/// must make `get_wait_time_for_queue` return 12.
///
/// The variable is removed before the assertion runs, so a failing assertion
/// cannot leave the override in the process environment for later tests.
#[test]
#[serial_test::serial]
fn test_get_wait_time_for_queue_respects_env_override() {
    let env_var = format!(
        "SQS_{}_WAIT_TIME_SECONDS",
        QueueType::StatusCheck.sqs_env_key()
    );
    std::env::set_var(&env_var, "12");
    let observed = get_wait_time_for_queue(QueueType::StatusCheck);
    // Clean up first: `assert_eq!` panics on mismatch and would skip the removal.
    std::env::remove_var(&env_var);

    assert_eq!(observed, 12);
}
1814
/// Setting `SQS_<key>_WAIT_TIME_SECONDS` to "99" for the notification queue
/// must yield a wait time clamped to 20.
///
/// The variable is removed before the assertion runs, so a failing assertion
/// cannot leave the override in the process environment for later tests.
#[test]
#[serial_test::serial]
fn test_get_wait_time_for_queue_env_override_clamped_to_20() {
    let env_var = format!(
        "SQS_{}_WAIT_TIME_SECONDS",
        QueueType::Notification.sqs_env_key()
    );
    std::env::set_var(&env_var, "99");
    let observed = get_wait_time_for_queue(QueueType::Notification);
    // Clean up first: `assert_eq!` panics on mismatch and would skip the removal.
    std::env::remove_var(&env_var);

    assert_eq!(observed, 20, "Should clamp to SQS maximum of 20");
}
1830
/// Every listed queue type must report a poller count of at least 1.
#[test]
fn test_get_poller_count_for_queue_all_types_positive() {
    for qt in [
        QueueType::TransactionRequest,
        QueueType::TransactionSubmission,
        QueueType::StatusCheck,
        QueueType::StatusCheckEvm,
        QueueType::StatusCheckStellar,
        QueueType::Notification,
        QueueType::TokenSwapRequest,
        QueueType::RelayerHealthCheck,
    ] {
        let pollers = get_poller_count_for_queue(qt);
        assert!(pollers >= 1, "{qt:?}: poller count must be at least 1");
    }
}
1852
/// For the transaction-request and notification queues,
/// `get_poller_count_for_queue` must equal `default_poller_count().max(1)`.
///
/// Serialized with the other `#[serial_test::serial]` tests: the env-override
/// tests in this module set `SQS_*_POLLER_COUNT` for the notification queue in
/// the process environment. Without serialization this test could run while
/// that override is set and fail intermittently.
#[test]
#[serial_test::serial]
fn test_get_poller_count_for_queue_matches_defaults() {
    assert_eq!(
        get_poller_count_for_queue(QueueType::TransactionRequest),
        QueueType::TransactionRequest.default_poller_count().max(1)
    );
    assert_eq!(
        get_poller_count_for_queue(QueueType::Notification),
        QueueType::Notification.default_poller_count().max(1)
    );
}
1865
/// Setting `SQS_<key>_POLLER_COUNT` to "5" for the notification queue must
/// make `get_poller_count_for_queue` return 5.
///
/// The variable is removed before the assertion runs, so a failing assertion
/// cannot leave the override in the process environment for later tests.
#[test]
#[serial_test::serial]
fn test_get_poller_count_for_queue_respects_env_override() {
    let env_var = format!("SQS_{}_POLLER_COUNT", QueueType::Notification.sqs_env_key());
    std::env::set_var(&env_var, "5");
    let observed = get_poller_count_for_queue(QueueType::Notification);
    // Clean up first: `assert_eq!` panics on mismatch and would skip the removal.
    std::env::remove_var(&env_var);

    assert_eq!(observed, 5);
}
1874
/// Setting `SQS_<key>_POLLER_COUNT` to "0" for the status-check queue must
/// yield a poller count clamped up to 1.
///
/// The variable is removed before the assertion runs, so a failing assertion
/// cannot leave the override in the process environment for later tests.
#[test]
#[serial_test::serial]
fn test_get_poller_count_for_queue_env_zero_clamped_to_1() {
    let env_var = format!("SQS_{}_POLLER_COUNT", QueueType::StatusCheck.sqs_env_key());
    std::env::set_var(&env_var, "0");
    let observed = get_poller_count_for_queue(QueueType::StatusCheck);
    // Clean up first: `assert_eq!` panics on mismatch and would skip the removal.
    std::env::remove_var(&env_var);

    assert_eq!(
        observed, 1,
        "Zero poller count from env should be clamped to 1"
    );
}
1887
/// Cloning a `PollLoopConfig` must preserve the polling interval, poller id,
/// poller count and max-retries fields.
#[test]
fn test_poll_loop_config_clone() {
    let original = PollLoopConfig {
        queue_type: QueueType::TransactionRequest,
        polling_interval: 15,
        visibility_timeout: 120,
        handler_timeout: Duration::from_secs(120),
        max_retries: 3,
        poller_id: 0,
        poller_count: 2,
    };

    // Destructure the clone so each checked field is named exactly once.
    let PollLoopConfig {
        polling_interval,
        poller_id,
        poller_count,
        max_retries,
        ..
    } = original.clone();

    assert_eq!(polling_interval, 15);
    assert_eq!(poller_id, 0);
    assert_eq!(poller_count, 2);
    assert_eq!(max_retries, 3);
}
1907}