openzeppelin_relayer/queues/redis/
worker.rs

1//! Redis/Apalis worker initialization.
2//!
3//! This module contains all Apalis-specific worker creation logic for the Redis
4//! queue backend, including WorkerBuilder configurations, Monitor setup,
5//! backoff strategies, and token swap cron workers.
6
7use actix_web::web::ThinData;
8
9use crate::{
10    config::ServerConfig,
11    constants::{
12        SYSTEM_CLEANUP_CRON_SCHEDULE, TRANSACTION_CLEANUP_CRON_SCHEDULE,
13        WORKER_SYSTEM_CLEANUP_RETRIES, WORKER_TOKEN_SWAP_REQUEST_RETRIES,
14        WORKER_TRANSACTION_CLEANUP_RETRIES,
15    },
16    jobs::{
17        notification_handler, relayer_health_check_handler, system_cleanup_handler,
18        token_swap_cron_handler, token_swap_request_handler, transaction_cleanup_handler,
19        transaction_request_handler, transaction_status_handler, transaction_submission_handler,
20        Job, JobProducerTrait, NotificationSend, RelayerHealthCheck, SystemCleanupCronReminder,
21        TokenSwapCronReminder, TokenSwapRequest, TransactionCleanupCronReminder,
22        TransactionRequest, TransactionSend, TransactionStatusCheck,
23    },
24    models::{
25        DefaultAppState, NetworkRepoModel, NotificationRepoModel, RelayerNetworkPolicy,
26        RelayerRepoModel, SignerRepoModel, ThinDataAppState, TransactionRepoModel,
27    },
28    repositories::{
29        ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
30        Repository, TransactionCounterTrait, TransactionRepository,
31    },
32};
33use apalis::prelude::*;
34
35use apalis::layers::retry::backoff::MakeBackoff;
36use apalis::layers::retry::{backoff::ExponentialBackoffMaker, RetryPolicy};
37use apalis::layers::ErrorHandlingLayer;
38
39/// Re-exports from [`tower::util`]
40pub use tower::util::rng::HasherRng;
41
42use apalis_cron::CronStream;
43use eyre::Result;
44use std::{str::FromStr, time::Duration};
45use tokio::signal::unix::SignalKind;
46use tracing::{debug, error, info};
47
48use crate::metrics::observe_queue_pickup_latency;
49
50use super::{filter_relayers_for_swap, QueueType, WorkerContext};
51use crate::queues::retry_config::{
52    RetryBackoffConfig, NOTIFICATION_BACKOFF, RELAYER_HEALTH_BACKOFF, STATUS_EVM_BACKOFF,
53    STATUS_GENERIC_BACKOFF, STATUS_STELLAR_BACKOFF, SYSTEM_CLEANUP_BACKOFF,
54    TOKEN_SWAP_CRON_BACKOFF, TOKEN_SWAP_REQUEST_BACKOFF, TX_CLEANUP_BACKOFF, TX_REQUEST_BACKOFF,
55    TX_SUBMISSION_BACKOFF,
56};
57
58// ---------------------------------------------------------------------------
59// Apalis adapter functions
60//
61// These thin adapters are the ONLY place where Apalis-specific handler types
62// (Data, Attempt, Worker<Context>, TaskId, RedisContext) appear. They convert
63// Apalis types → WorkerContext and HandlerError → apalis::prelude::Error,
64// keeping all handler business logic backend-neutral.
65// ---------------------------------------------------------------------------
66
67/// Observe queue pickup latency for Redis/Apalis workers.
68///
69/// Uses `available_at` (the intended availability time) when present to exclude
70/// intentional scheduling delay. Falls back to `timestamp` (job creation time)
71/// for immediate jobs. Only call on first attempt to avoid retry inflation.
72fn observe_redis_pickup_latency(
73    available_at: Option<&String>,
74    job_timestamp: &str,
75    queue_type: &str,
76) {
77    let fallback = job_timestamp.to_string();
78    let baseline = available_at.unwrap_or(&fallback);
79    if let Ok(baseline_epoch) = baseline.parse::<i64>() {
80        let now = chrono::Utc::now().timestamp();
81        let latency_secs = (now - baseline_epoch).max(0) as f64;
82        observe_queue_pickup_latency(queue_type, "redis", latency_secs);
83    }
84}
85
86async fn apalis_transaction_request_handler(
87    job: Job<TransactionRequest>,
88    state: Data<ThinData<DefaultAppState>>,
89    attempt: Attempt,
90    task_id: TaskId,
91) -> Result<(), apalis::prelude::Error> {
92    observe_redis_pickup_latency(
93        job.available_at.as_ref(),
94        &job.timestamp,
95        "transaction-request",
96    );
97    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
98    transaction_request_handler(job, (*state).clone(), ctx)
99        .await
100        .map_err(Into::into)
101}
102
103async fn apalis_transaction_submission_handler(
104    job: Job<TransactionSend>,
105    state: Data<ThinData<DefaultAppState>>,
106    attempt: Attempt,
107    task_id: TaskId,
108) -> Result<(), apalis::prelude::Error> {
109    observe_redis_pickup_latency(
110        job.available_at.as_ref(),
111        &job.timestamp,
112        "transaction-submission",
113    );
114    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
115    transaction_submission_handler(job, (*state).clone(), ctx)
116        .await
117        .map_err(Into::into)
118}
119
120async fn apalis_transaction_status_handler(
121    job: Job<TransactionStatusCheck>,
122    state: Data<ThinData<DefaultAppState>>,
123    attempt: Attempt,
124    task_id: TaskId,
125) -> Result<(), apalis::prelude::Error> {
126    observe_redis_pickup_latency(job.available_at.as_ref(), &job.timestamp, "status-check");
127    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
128    transaction_status_handler(job, (*state).clone(), ctx)
129        .await
130        .map_err(Into::into)
131}
132
133async fn apalis_transaction_status_evm_handler(
134    job: Job<TransactionStatusCheck>,
135    state: Data<ThinData<DefaultAppState>>,
136    attempt: Attempt,
137    task_id: TaskId,
138) -> Result<(), apalis::prelude::Error> {
139    observe_redis_pickup_latency(
140        job.available_at.as_ref(),
141        &job.timestamp,
142        "status-check-evm",
143    );
144    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
145    transaction_status_handler(job, (*state).clone(), ctx)
146        .await
147        .map_err(Into::into)
148}
149
150async fn apalis_transaction_status_stellar_handler(
151    job: Job<TransactionStatusCheck>,
152    state: Data<ThinData<DefaultAppState>>,
153    attempt: Attempt,
154    task_id: TaskId,
155) -> Result<(), apalis::prelude::Error> {
156    observe_redis_pickup_latency(
157        job.available_at.as_ref(),
158        &job.timestamp,
159        "status-check-stellar",
160    );
161    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
162    transaction_status_handler(job, (*state).clone(), ctx)
163        .await
164        .map_err(Into::into)
165}
166
167async fn apalis_notification_handler(
168    job: Job<NotificationSend>,
169    state: Data<ThinData<DefaultAppState>>,
170    attempt: Attempt,
171    task_id: TaskId,
172) -> Result<(), apalis::prelude::Error> {
173    observe_redis_pickup_latency(job.available_at.as_ref(), &job.timestamp, "notification");
174    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
175    notification_handler(job, (*state).clone(), ctx)
176        .await
177        .map_err(Into::into)
178}
179
180async fn apalis_token_swap_request_handler(
181    job: Job<TokenSwapRequest>,
182    state: Data<ThinData<DefaultAppState>>,
183    attempt: Attempt,
184    task_id: TaskId,
185) -> Result<(), apalis::prelude::Error> {
186    observe_redis_pickup_latency(
187        job.available_at.as_ref(),
188        &job.timestamp,
189        "token-swap-request",
190    );
191    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
192    token_swap_request_handler(job, (*state).clone(), ctx)
193        .await
194        .map_err(Into::into)
195}
196
197async fn apalis_relayer_health_check_handler(
198    job: Job<RelayerHealthCheck>,
199    state: Data<ThinData<DefaultAppState>>,
200    attempt: Attempt,
201    task_id: TaskId,
202) -> Result<(), apalis::prelude::Error> {
203    observe_redis_pickup_latency(
204        job.available_at.as_ref(),
205        &job.timestamp,
206        "relayer-health-check",
207    );
208    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
209    relayer_health_check_handler(job, (*state).clone(), ctx)
210        .await
211        .map_err(Into::into)
212}
213
214async fn apalis_transaction_cleanup_handler(
215    _job: TransactionCleanupCronReminder,
216    state: Data<ThinData<DefaultAppState>>,
217    attempt: Attempt,
218    task_id: TaskId,
219) -> Result<(), apalis::prelude::Error> {
220    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
221    transaction_cleanup_handler(TransactionCleanupCronReminder(), (*state).clone(), ctx)
222        .await
223        .map_err(Into::into)
224}
225
226async fn apalis_system_cleanup_handler(
227    _job: SystemCleanupCronReminder,
228    state: Data<ThinData<DefaultAppState>>,
229    attempt: Attempt,
230    task_id: TaskId,
231) -> Result<(), apalis::prelude::Error> {
232    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
233    system_cleanup_handler(SystemCleanupCronReminder(), (*state).clone(), ctx)
234        .await
235        .map_err(Into::into)
236}
237
238async fn apalis_token_swap_cron_handler(
239    _job: TokenSwapCronReminder,
240    relayer_id: Data<String>,
241    state: Data<ThinData<DefaultAppState>>,
242    attempt: Attempt,
243    task_id: TaskId,
244) -> Result<(), apalis::prelude::Error> {
245    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
246    token_swap_cron_handler(
247        TokenSwapCronReminder(),
248        (*relayer_id).clone(),
249        (*state).clone(),
250        ctx,
251    )
252    .await
253    .map_err(Into::into)
254}
255
// Worker names registered with the Apalis monitor; these identifiers show up
// in worker-scoped logs emitted by `monitor_handle_event`.
const TRANSACTION_REQUEST: &str = "transaction_request";
const TRANSACTION_SENDER: &str = "transaction_sender";
// Generic transaction status checker
const TRANSACTION_STATUS_CHECKER: &str = "transaction_status_checker";
// Network specific status checkers
const TRANSACTION_STATUS_CHECKER_EVM: &str = "transaction_status_checker_evm";
const TRANSACTION_STATUS_CHECKER_STELLAR: &str = "transaction_status_checker_stellar";
const NOTIFICATION_SENDER: &str = "notification_sender";
const TOKEN_SWAP_REQUEST: &str = "token_swap_request";
// Also doubles as the concurrency env-key argument for the cleanup worker
// (see `ServerConfig::get_worker_concurrency(TRANSACTION_CLEANUP, 1)`).
const TRANSACTION_CLEANUP: &str = "transaction_cleanup";
const RELAYER_HEALTH_CHECK: &str = "relayer_health_check";
const SYSTEM_CLEANUP: &str = "system_cleanup";
268
269/// Creates an exponential backoff with configurable parameters
270///
271/// # Arguments
272/// * `initial_ms` - Initial delay in milliseconds (e.g., 200)
273/// * `max_ms` - Maximum delay in milliseconds (e.g., 5000)
274/// * `jitter` - Jitter factor 0.0-1.0 (e.g., 0.99 for high jitter)
275///
276/// # Returns
277/// A configured backoff instance ready for use with RetryPolicy
278fn create_backoff(initial_ms: u64, max_ms: u64, jitter: f64) -> Result<ExponentialBackoffMaker> {
279    let maker = ExponentialBackoffMaker::new(
280        Duration::from_millis(initial_ms),
281        Duration::from_millis(max_ms),
282        jitter,
283        HasherRng::default(),
284    )?;
285
286    Ok(maker)
287}
288
289fn create_backoff_from_config(cfg: RetryBackoffConfig) -> Result<ExponentialBackoffMaker> {
290    create_backoff(cfg.initial_ms, cfg.max_ms, cfg.jitter)
291}
292
293/// Initializes Redis/Apalis workers and starts the lifecycle monitor.
294///
295/// # Arguments
296/// * `app_state` - Application state containing the job producer and configuration
297pub async fn initialize_redis_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
298    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
299) -> Result<()>
300where
301    J: JobProducerTrait + Send + Sync + 'static,
302    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
303    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
304    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
305    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
306    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
307    TCR: TransactionCounterTrait + Send + Sync + 'static,
308    PR: PluginRepositoryTrait + Send + Sync + 'static,
309    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
310{
311    let queue_backend = app_state
312        .job_producer
313        .get_queue_backend()
314        .ok_or_else(|| eyre::eyre!("Queue backend is not available"))?;
315    let queue = queue_backend
316        .queue()
317        .cloned()
318        .ok_or_else(|| eyre::eyre!("Redis queue is not available for active backend"))?;
319
320    let transaction_request_queue_worker = WorkerBuilder::new(TRANSACTION_REQUEST)
321        .layer(ErrorHandlingLayer::new())
322        .retry(
323            RetryPolicy::retries(QueueType::TransactionRequest.max_retries())
324                .with_backoff(create_backoff_from_config(TX_REQUEST_BACKOFF)?.make_backoff()),
325        )
326        .enable_tracing()
327        .catch_panic()
328        .concurrency(ServerConfig::get_worker_concurrency(
329            QueueType::TransactionRequest.concurrency_env_key(),
330            QueueType::TransactionRequest.default_concurrency(),
331        ))
332        .data(app_state.clone())
333        .backend(queue.transaction_request_queue.clone())
334        .build_fn(apalis_transaction_request_handler);
335
336    let transaction_submission_queue_worker = WorkerBuilder::new(TRANSACTION_SENDER)
337        .layer(ErrorHandlingLayer::new())
338        .enable_tracing()
339        .catch_panic()
340        .retry(
341            RetryPolicy::retries(QueueType::TransactionSubmission.max_retries())
342                .with_backoff(create_backoff_from_config(TX_SUBMISSION_BACKOFF)?.make_backoff()),
343        )
344        .concurrency(ServerConfig::get_worker_concurrency(
345            QueueType::TransactionSubmission.concurrency_env_key(),
346            QueueType::TransactionSubmission.default_concurrency(),
347        ))
348        .data(app_state.clone())
349        .backend(queue.transaction_submission_queue.clone())
350        .build_fn(apalis_transaction_submission_handler);
351
352    // Generic status checker
353    // Uses medium settings that work reasonably for most chains
354    let transaction_status_queue_worker = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER)
355        .layer(ErrorHandlingLayer::new())
356        .enable_tracing()
357        .catch_panic()
358        .retry(
359            RetryPolicy::retries(QueueType::StatusCheck.max_retries())
360                .with_backoff(create_backoff_from_config(STATUS_GENERIC_BACKOFF)?.make_backoff()),
361        )
362        .concurrency(ServerConfig::get_worker_concurrency(
363            QueueType::StatusCheck.concurrency_env_key(),
364            QueueType::StatusCheck.default_concurrency(),
365        ))
366        .data(app_state.clone())
367        .backend(queue.transaction_status_queue.clone())
368        .build_fn(apalis_transaction_status_handler);
369
370    // EVM status checker - slower retries to avoid premature resubmission
371    // EVM has longer block times (~12s) and needs time for resubmission logic
372    let transaction_status_queue_worker_evm = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_EVM)
373        .layer(ErrorHandlingLayer::new())
374        .enable_tracing()
375        .catch_panic()
376        .retry(
377            RetryPolicy::retries(QueueType::StatusCheck.max_retries())
378                .with_backoff(create_backoff_from_config(STATUS_EVM_BACKOFF)?.make_backoff()),
379        )
380        .concurrency(ServerConfig::get_worker_concurrency(
381            QueueType::StatusCheckEvm.concurrency_env_key(),
382            QueueType::StatusCheckEvm.default_concurrency(),
383        ))
384        .data(app_state.clone())
385        .backend(queue.transaction_status_queue_evm.clone())
386        .build_fn(apalis_transaction_status_evm_handler);
387
388    // Stellar status checker - fast retries for fast finality
389    // Stellar has sub-second finality, needs more frequent status checks
390    let transaction_status_queue_worker_stellar =
391        WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_STELLAR)
392            .layer(ErrorHandlingLayer::new())
393            .enable_tracing()
394            .catch_panic()
395            .retry(
396                RetryPolicy::retries(QueueType::StatusCheckStellar.max_retries()).with_backoff(
397                    create_backoff_from_config(STATUS_STELLAR_BACKOFF)?.make_backoff(),
398                ),
399            )
400            .concurrency(ServerConfig::get_worker_concurrency(
401                QueueType::StatusCheckStellar.concurrency_env_key(),
402                QueueType::StatusCheckStellar.default_concurrency(),
403            ))
404            .data(app_state.clone())
405            .backend(queue.transaction_status_queue_stellar.clone())
406            .build_fn(apalis_transaction_status_stellar_handler);
407
408    let notification_queue_worker = WorkerBuilder::new(NOTIFICATION_SENDER)
409        .layer(ErrorHandlingLayer::new())
410        .enable_tracing()
411        .catch_panic()
412        .retry(
413            RetryPolicy::retries(QueueType::Notification.max_retries())
414                .with_backoff(create_backoff_from_config(NOTIFICATION_BACKOFF)?.make_backoff()),
415        )
416        .concurrency(ServerConfig::get_worker_concurrency(
417            QueueType::Notification.concurrency_env_key(),
418            QueueType::Notification.default_concurrency(),
419        ))
420        .data(app_state.clone())
421        .backend(queue.notification_queue.clone())
422        .build_fn(apalis_notification_handler);
423
424    let token_swap_request_queue_worker = WorkerBuilder::new(TOKEN_SWAP_REQUEST)
425        .layer(ErrorHandlingLayer::new())
426        .enable_tracing()
427        .catch_panic()
428        .retry(
429            RetryPolicy::retries(QueueType::TokenSwapRequest.max_retries()).with_backoff(
430                create_backoff_from_config(TOKEN_SWAP_REQUEST_BACKOFF)?.make_backoff(),
431            ),
432        )
433        .concurrency(ServerConfig::get_worker_concurrency(
434            QueueType::TokenSwapRequest.concurrency_env_key(),
435            QueueType::TokenSwapRequest.default_concurrency(),
436        ))
437        .data(app_state.clone())
438        .backend(queue.token_swap_request_queue.clone())
439        .build_fn(apalis_token_swap_request_handler);
440
441    let transaction_cleanup_queue_worker = WorkerBuilder::new(TRANSACTION_CLEANUP)
442        .layer(ErrorHandlingLayer::new())
443        .enable_tracing()
444        .catch_panic()
445        .retry(
446            RetryPolicy::retries(WORKER_TRANSACTION_CLEANUP_RETRIES)
447                .with_backoff(create_backoff_from_config(TX_CLEANUP_BACKOFF)?.make_backoff()),
448        )
449        .concurrency(ServerConfig::get_worker_concurrency(TRANSACTION_CLEANUP, 1)) // Default to 1 to avoid DB conflicts
450        .data(app_state.clone())
451        .backend(CronStream::new(
452            apalis_cron::Schedule::from_str(TRANSACTION_CLEANUP_CRON_SCHEDULE)?,
453        ))
454        .build_fn(apalis_transaction_cleanup_handler);
455
456    let system_cleanup_queue_worker = WorkerBuilder::new(SYSTEM_CLEANUP)
457        .layer(ErrorHandlingLayer::new())
458        .enable_tracing()
459        .catch_panic()
460        .retry(
461            RetryPolicy::retries(WORKER_SYSTEM_CLEANUP_RETRIES)
462                .with_backoff(create_backoff_from_config(SYSTEM_CLEANUP_BACKOFF)?.make_backoff()),
463        )
464        .concurrency(1)
465        .data(app_state.clone())
466        .backend(CronStream::new(apalis_cron::Schedule::from_str(
467            SYSTEM_CLEANUP_CRON_SCHEDULE,
468        )?))
469        .build_fn(apalis_system_cleanup_handler);
470
471    let relayer_health_check_worker = WorkerBuilder::new(RELAYER_HEALTH_CHECK)
472        .layer(ErrorHandlingLayer::new())
473        .enable_tracing()
474        .catch_panic()
475        .retry(
476            RetryPolicy::retries(QueueType::RelayerHealthCheck.max_retries())
477                .with_backoff(create_backoff_from_config(RELAYER_HEALTH_BACKOFF)?.make_backoff()),
478        )
479        .concurrency(ServerConfig::get_worker_concurrency(
480            QueueType::RelayerHealthCheck.concurrency_env_key(),
481            QueueType::RelayerHealthCheck.default_concurrency(),
482        ))
483        .data(app_state.clone())
484        .backend(queue.relayer_health_check_queue.clone())
485        .build_fn(apalis_relayer_health_check_handler);
486
487    let monitor = Monitor::new()
488        .register(transaction_request_queue_worker)
489        .register(transaction_submission_queue_worker)
490        .register(transaction_status_queue_worker)
491        .register(transaction_status_queue_worker_evm)
492        .register(transaction_status_queue_worker_stellar)
493        .register(notification_queue_worker)
494        .register(token_swap_request_queue_worker)
495        .register(transaction_cleanup_queue_worker)
496        .register(system_cleanup_queue_worker)
497        .register(relayer_health_check_worker)
498        .on_event(monitor_handle_event)
499        .shutdown_timeout(Duration::from_millis(5000));
500
501    let monitor_future = monitor.run_with_signal(async {
502        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
503            .map_err(|e| std::io::Error::other(format!("Failed to create SIGINT signal: {e}")))?;
504        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
505            .map_err(|e| std::io::Error::other(format!("Failed to create SIGTERM signal: {e}")))?;
506
507        debug!("Workers monitor started");
508
509        tokio::select! {
510            _ = sigint.recv() => debug!("Received SIGINT."),
511            _ = sigterm.recv() => debug!("Received SIGTERM."),
512        };
513
514        debug!("Workers monitor shutting down");
515
516        Ok(())
517    });
518    tokio::spawn(async move {
519        if let Err(e) = monitor_future.await {
520            error!(error = %e, "monitor error");
521        }
522    });
523    debug!("Workers monitor shutdown complete");
524
525    Ok(())
526}
527
/// Initializes swap workers for Solana and Stellar relayers.
/// This function creates and registers workers for relayers that have swap enabled and cron schedule set.
///
/// Relayers are skipped (with a debug/error log) when they have no swap
/// configuration, no cron schedule, an unparsable schedule, or are EVM (no
/// swap support). All created workers are registered on a single monitor,
/// which is spawned in the background and shuts down on SIGINT/SIGTERM.
pub async fn initialize_redis_token_swap_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
) -> Result<()>
where
    J: JobProducerTrait + Send + Sync + 'static,
    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
    TCR: TransactionCounterTrait + Send + Sync + 'static,
    PR: PluginRepositoryTrait + Send + Sync + 'static,
    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
{
    let active_relayers = app_state.relayer_repository.list_active().await?;
    let relayers_with_swap_enabled = filter_relayers_for_swap(active_relayers);

    // Nothing to do — return early without spawning a monitor.
    if relayers_with_swap_enabled.is_empty() {
        debug!("No relayers with swap enabled");
        return Ok(());
    }
    info!(
        "Found {} relayers with swap enabled",
        relayers_with_swap_enabled.len()
    );

    let mut workers = Vec::new();

    // Built once and cloned per worker: every swap cron worker shares the same
    // retry/backoff profile.
    let swap_backoff = create_backoff_from_config(TOKEN_SWAP_CRON_BACKOFF)?.make_backoff();

    for relayer in relayers_with_swap_enabled {
        // NOTE(review): logs the whole relayer model at debug level — confirm
        // it contains nothing sensitive.
        debug!(relayer = ?relayer, "found relayer with swap enabled");

        // Extract (cron schedule, network label) or skip this relayer.
        let (cron_schedule, network_type) = match &relayer.policies {
            RelayerNetworkPolicy::Solana(policy) => match policy.get_swap_config() {
                Some(config) => match config.cron_schedule {
                    Some(schedule) => (schedule, "solana".to_string()),
                    None => {
                        debug!(relayer_id = %relayer.id, "No cron schedule specified for Solana relayer; skipping");
                        continue;
                    }
                },
                None => {
                    debug!(relayer_id = %relayer.id, "No swap configuration specified for Solana relayer; skipping");
                    continue;
                }
            },
            RelayerNetworkPolicy::Stellar(policy) => match policy.get_swap_config() {
                Some(config) => match config.cron_schedule {
                    Some(schedule) => (schedule, "stellar".to_string()),
                    None => {
                        debug!(relayer_id = %relayer.id, "No cron schedule specified for Stellar relayer; skipping");
                        continue;
                    }
                },
                None => {
                    debug!(relayer_id = %relayer.id, "No swap configuration specified for Stellar relayer; skipping");
                    continue;
                }
            },
            RelayerNetworkPolicy::Evm(_) => {
                debug!(relayer_id = %relayer.id, "EVM relayers do not support swap; skipping");
                continue;
            }
        };

        // A bad schedule disables swaps for this relayer only; others proceed.
        let calendar_schedule = match apalis_cron::Schedule::from_str(&cron_schedule) {
            Ok(schedule) => schedule,
            Err(e) => {
                error!(relayer_id = %relayer.id, error = %e, "Failed to parse cron schedule; skipping");
                continue;
            }
        };

        // Create worker and add to the workers vector
        // Worker name embeds network + relayer id, e.g. "solana-swap-schedule-<id>".
        let worker = WorkerBuilder::new(format!(
            "{}-swap-schedule-{}",
            network_type,
            relayer.id.clone()
        ))
        .layer(ErrorHandlingLayer::new())
        .enable_tracing()
        .catch_panic()
        .retry(
            RetryPolicy::retries(WORKER_TOKEN_SWAP_REQUEST_RETRIES)
                .with_backoff(swap_backoff.clone()),
        )
        // One swap at a time per relayer.
        .concurrency(1)
        // Relayer id is injected as worker Data and read by the cron handler.
        .data(relayer.id.clone())
        .data(app_state.clone())
        .backend(CronStream::new(calendar_schedule))
        .build_fn(apalis_token_swap_cron_handler);

        workers.push(worker);
        debug!(
            relayer_id = %relayer.id,
            network_type = %network_type,
            "Created worker for relayer with swap enabled"
        );
    }

    let mut monitor = Monitor::new()
        .on_event(monitor_handle_event)
        .shutdown_timeout(Duration::from_millis(5000));

    // Register all workers with the monitor
    for worker in workers {
        monitor = monitor.register(worker);
    }

    // The monitor runs until SIGINT or SIGTERM is received.
    let monitor_future = monitor.run_with_signal(async {
        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
            .map_err(|e| std::io::Error::other(format!("Failed to create SIGINT signal: {e}")))?;
        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
            .map_err(|e| std::io::Error::other(format!("Failed to create SIGTERM signal: {e}")))?;

        debug!("Swap Monitor started");

        tokio::select! {
            _ = sigint.recv() => debug!("Received SIGINT."),
            _ = sigterm.recv() => debug!("Received SIGTERM."),
        };

        debug!("Swap Monitor shutting down");

        Ok(())
    });
    // Spawn in the background so this function returns immediately.
    tokio::spawn(async move {
        if let Err(e) = monitor_future.await {
            error!(error = %e, "monitor error");
        }
    });
    Ok(())
}
664
665fn monitor_handle_event(e: Worker<Event>) {
666    let worker_id = e.id();
667    match e.inner() {
668        Event::Engage(task_id) => {
669            debug!(worker_id = %worker_id, task_id = %task_id, "worker got a job");
670        }
671        Event::Error(e) => {
672            error!(worker_id = %worker_id, error = %e, "worker encountered an error");
673        }
674        Event::Exit => {
675            debug!(worker_id = %worker_id, "worker exited");
676        }
677        Event::Idle => {
678            debug!(worker_id = %worker_id, "worker is idle");
679        }
680        Event::Start => {
681            debug!(worker_id = %worker_id, "worker started");
682        }
683        Event::Stop => {
684            debug!(worker_id = %worker_id, "worker stopped");
685        }
686        _ => {}
687    }
688}
689
690#[cfg(test)]
691mod tests {
692    use super::*;
693    use crate::queues::retry_config::{
694        NOTIFICATION_BACKOFF, RELAYER_HEALTH_BACKOFF, STATUS_EVM_BACKOFF, STATUS_GENERIC_BACKOFF,
695        STATUS_STELLAR_BACKOFF, SYSTEM_CLEANUP_BACKOFF, TOKEN_SWAP_CRON_BACKOFF,
696        TOKEN_SWAP_REQUEST_BACKOFF, TX_CLEANUP_BACKOFF, TX_REQUEST_BACKOFF, TX_SUBMISSION_BACKOFF,
697    };
698
699    // ── create_backoff tests ───────────────────────────────────────────
700
701    #[test]
702    fn test_create_backoff_with_valid_parameters() {
703        let result = create_backoff(200, 5000, 0.99);
704        assert!(
705            result.is_ok(),
706            "Should create backoff with valid parameters"
707        );
708    }
709
710    #[test]
711    fn test_create_backoff_with_zero_initial() {
712        let result = create_backoff(0, 5000, 0.99);
713        assert!(
714            result.is_ok(),
715            "Should handle zero initial delay (edge case)"
716        );
717    }
718
719    #[test]
720    fn test_create_backoff_with_equal_initial_and_max() {
721        let result = create_backoff(1000, 1000, 0.5);
722        assert!(result.is_ok(), "Should handle equal initial and max delays");
723    }
724
725    #[test]
726    fn test_create_backoff_with_zero_jitter() {
727        let result = create_backoff(500, 5000, 0.0);
728        assert!(result.is_ok(), "Should handle zero jitter");
729    }
730
731    #[test]
732    fn test_create_backoff_with_max_jitter() {
733        let result = create_backoff(500, 5000, 1.0);
734        assert!(result.is_ok(), "Should handle maximum jitter (1.0)");
735    }
736
737    #[test]
738    fn test_create_backoff_with_small_values() {
739        let result = create_backoff(1, 10, 0.5);
740        assert!(result.is_ok(), "Should handle very small delay values");
741    }
742
743    #[test]
744    fn test_create_backoff_with_large_values() {
745        let result = create_backoff(10000, 60000, 0.99);
746        assert!(result.is_ok(), "Should handle large delay values");
747    }
748
749    #[test]
750    fn test_create_backoff_from_config_profiles() {
751        let profiles = [
752            TX_REQUEST_BACKOFF,
753            TX_SUBMISSION_BACKOFF,
754            STATUS_GENERIC_BACKOFF,
755            STATUS_EVM_BACKOFF,
756            STATUS_STELLAR_BACKOFF,
757            NOTIFICATION_BACKOFF,
758            TOKEN_SWAP_REQUEST_BACKOFF,
759            TX_CLEANUP_BACKOFF,
760            SYSTEM_CLEANUP_BACKOFF,
761            RELAYER_HEALTH_BACKOFF,
762            TOKEN_SWAP_CRON_BACKOFF,
763        ];
764
765        for cfg in profiles {
766            let result = create_backoff_from_config(cfg);
767            assert!(
768                result.is_ok(),
769                "backoff profile should be constructible: {:?}",
770                cfg
771            );
772        }
773    }
774
775    #[test]
776    fn test_create_backoff_from_config_produces_usable_backoff() {
777        let profiles = [
778            TX_REQUEST_BACKOFF,
779            TX_SUBMISSION_BACKOFF,
780            STATUS_GENERIC_BACKOFF,
781            STATUS_EVM_BACKOFF,
782            STATUS_STELLAR_BACKOFF,
783            NOTIFICATION_BACKOFF,
784            TOKEN_SWAP_REQUEST_BACKOFF,
785            TX_CLEANUP_BACKOFF,
786            SYSTEM_CLEANUP_BACKOFF,
787            RELAYER_HEALTH_BACKOFF,
788            TOKEN_SWAP_CRON_BACKOFF,
789        ];
790
791        for cfg in profiles {
792            let mut maker = create_backoff_from_config(cfg).unwrap();
793            // Calling make_backoff() should not panic
794            let _backoff = maker.make_backoff();
795        }
796    }
797
798    #[test]
799    fn test_create_backoff_with_initial_greater_than_max_errors() {
800        let result = create_backoff(10000, 100, 0.5);
801        assert!(
802            result.is_err(),
803            "initial > max should be rejected by ExponentialBackoffMaker"
804        );
805    }
806
807    // ── Backoff config invariant tests ─────────────────────────────────
808
809    #[test]
810    fn test_all_backoff_configs_have_valid_initial_le_max() {
811        let profiles: &[(&str, RetryBackoffConfig)] = &[
812            ("TX_REQUEST", TX_REQUEST_BACKOFF),
813            ("TX_SUBMISSION", TX_SUBMISSION_BACKOFF),
814            ("STATUS_GENERIC", STATUS_GENERIC_BACKOFF),
815            ("STATUS_EVM", STATUS_EVM_BACKOFF),
816            ("STATUS_STELLAR", STATUS_STELLAR_BACKOFF),
817            ("NOTIFICATION", NOTIFICATION_BACKOFF),
818            ("TOKEN_SWAP_REQUEST", TOKEN_SWAP_REQUEST_BACKOFF),
819            ("TX_CLEANUP", TX_CLEANUP_BACKOFF),
820            ("SYSTEM_CLEANUP", SYSTEM_CLEANUP_BACKOFF),
821            ("RELAYER_HEALTH", RELAYER_HEALTH_BACKOFF),
822            ("TOKEN_SWAP_CRON", TOKEN_SWAP_CRON_BACKOFF),
823        ];
824
825        for (name, cfg) in profiles {
826            assert!(
827                cfg.initial_ms <= cfg.max_ms,
828                "{name}: initial_ms ({}) must be <= max_ms ({})",
829                cfg.initial_ms,
830                cfg.max_ms
831            );
832        }
833    }
834
835    #[test]
836    fn test_all_backoff_configs_have_valid_jitter_range() {
837        let profiles: &[(&str, RetryBackoffConfig)] = &[
838            ("TX_REQUEST", TX_REQUEST_BACKOFF),
839            ("TX_SUBMISSION", TX_SUBMISSION_BACKOFF),
840            ("STATUS_GENERIC", STATUS_GENERIC_BACKOFF),
841            ("STATUS_EVM", STATUS_EVM_BACKOFF),
842            ("STATUS_STELLAR", STATUS_STELLAR_BACKOFF),
843            ("NOTIFICATION", NOTIFICATION_BACKOFF),
844            ("TOKEN_SWAP_REQUEST", TOKEN_SWAP_REQUEST_BACKOFF),
845            ("TX_CLEANUP", TX_CLEANUP_BACKOFF),
846            ("SYSTEM_CLEANUP", SYSTEM_CLEANUP_BACKOFF),
847            ("RELAYER_HEALTH", RELAYER_HEALTH_BACKOFF),
848            ("TOKEN_SWAP_CRON", TOKEN_SWAP_CRON_BACKOFF),
849        ];
850
851        for (name, cfg) in profiles {
852            assert!(
853                (0.0..=1.0).contains(&cfg.jitter),
854                "{name}: jitter ({}) must be in [0.0, 1.0]",
855                cfg.jitter
856            );
857        }
858    }
859
860    #[test]
861    fn test_all_backoff_configs_have_positive_initial_ms() {
862        let profiles: &[(&str, RetryBackoffConfig)] = &[
863            ("TX_REQUEST", TX_REQUEST_BACKOFF),
864            ("TX_SUBMISSION", TX_SUBMISSION_BACKOFF),
865            ("STATUS_GENERIC", STATUS_GENERIC_BACKOFF),
866            ("STATUS_EVM", STATUS_EVM_BACKOFF),
867            ("STATUS_STELLAR", STATUS_STELLAR_BACKOFF),
868            ("NOTIFICATION", NOTIFICATION_BACKOFF),
869            ("TOKEN_SWAP_REQUEST", TOKEN_SWAP_REQUEST_BACKOFF),
870            ("TX_CLEANUP", TX_CLEANUP_BACKOFF),
871            ("SYSTEM_CLEANUP", SYSTEM_CLEANUP_BACKOFF),
872            ("RELAYER_HEALTH", RELAYER_HEALTH_BACKOFF),
873            ("TOKEN_SWAP_CRON", TOKEN_SWAP_CRON_BACKOFF),
874        ];
875
876        for (name, cfg) in profiles {
877            assert!(
878                cfg.initial_ms > 0,
879                "{name}: initial_ms must be positive, got {}",
880                cfg.initial_ms
881            );
882        }
883    }
884
885    // ── Worker name constant tests ─────────────────────────────────────
886
887    #[test]
888    fn test_worker_name_constants_are_nonempty() {
889        let names = [
890            TRANSACTION_REQUEST,
891            TRANSACTION_SENDER,
892            TRANSACTION_STATUS_CHECKER,
893            TRANSACTION_STATUS_CHECKER_EVM,
894            TRANSACTION_STATUS_CHECKER_STELLAR,
895            NOTIFICATION_SENDER,
896            TOKEN_SWAP_REQUEST,
897            TRANSACTION_CLEANUP,
898            RELAYER_HEALTH_CHECK,
899            SYSTEM_CLEANUP,
900        ];
901
902        for name in &names {
903            assert!(!name.is_empty(), "Worker name constant must not be empty");
904        }
905    }
906
907    #[test]
908    fn test_worker_name_constants_are_unique() {
909        let names = [
910            TRANSACTION_REQUEST,
911            TRANSACTION_SENDER,
912            TRANSACTION_STATUS_CHECKER,
913            TRANSACTION_STATUS_CHECKER_EVM,
914            TRANSACTION_STATUS_CHECKER_STELLAR,
915            NOTIFICATION_SENDER,
916            TOKEN_SWAP_REQUEST,
917            TRANSACTION_CLEANUP,
918            RELAYER_HEALTH_CHECK,
919            SYSTEM_CLEANUP,
920        ];
921
922        for (i, a) in names.iter().enumerate() {
923            for (j, b) in names.iter().enumerate() {
924                if i != j {
925                    assert_ne!(
926                        a, b,
927                        "Worker names must be unique: '{}' at index {} and {}",
928                        a, i, j
929                    );
930                }
931            }
932        }
933    }
934
935    #[test]
936    fn test_worker_names_match_concurrency_env_keys() {
937        // The WorkerBuilder name for each queue-type-backed worker should match
938        // the concurrency_env_key used with ServerConfig::get_worker_concurrency,
939        // so that concurrency configuration picks up the correct env var.
940        assert_eq!(
941            TRANSACTION_REQUEST,
942            QueueType::TransactionRequest.concurrency_env_key()
943        );
944        assert_eq!(
945            TRANSACTION_SENDER,
946            QueueType::TransactionSubmission.concurrency_env_key()
947        );
948        assert_eq!(
949            TRANSACTION_STATUS_CHECKER,
950            QueueType::StatusCheck.concurrency_env_key()
951        );
952        assert_eq!(
953            TRANSACTION_STATUS_CHECKER_EVM,
954            QueueType::StatusCheckEvm.concurrency_env_key()
955        );
956        assert_eq!(
957            TRANSACTION_STATUS_CHECKER_STELLAR,
958            QueueType::StatusCheckStellar.concurrency_env_key()
959        );
960        assert_eq!(
961            NOTIFICATION_SENDER,
962            QueueType::Notification.concurrency_env_key()
963        );
964        assert_eq!(
965            TOKEN_SWAP_REQUEST,
966            QueueType::TokenSwapRequest.concurrency_env_key()
967        );
968        assert_eq!(
969            RELAYER_HEALTH_CHECK,
970            QueueType::RelayerHealthCheck.concurrency_env_key()
971        );
972    }
973
974    // ── monitor_handle_event tests ─────────────────────────────────────
975
976    fn make_worker_event(event: Event) -> Worker<Event> {
977        let worker_id = WorkerId::from_str("test-worker").unwrap();
978        Worker::new(worker_id, event)
979    }
980
981    #[test]
982    fn test_monitor_handle_event_start_does_not_panic() {
983        monitor_handle_event(make_worker_event(Event::Start));
984    }
985
986    #[test]
987    fn test_monitor_handle_event_engage_does_not_panic() {
988        let task_id = TaskId::new();
989        monitor_handle_event(make_worker_event(Event::Engage(task_id)));
990    }
991
992    #[test]
993    fn test_monitor_handle_event_idle_does_not_panic() {
994        monitor_handle_event(make_worker_event(Event::Idle));
995    }
996
997    #[test]
998    fn test_monitor_handle_event_error_does_not_panic() {
999        let error: Box<dyn std::error::Error + Send + Sync> = "test error".to_string().into();
1000        monitor_handle_event(make_worker_event(Event::Error(error)));
1001    }
1002
1003    #[test]
1004    fn test_monitor_handle_event_stop_does_not_panic() {
1005        monitor_handle_event(make_worker_event(Event::Stop));
1006    }
1007
1008    #[test]
1009    fn test_monitor_handle_event_exit_does_not_panic() {
1010        monitor_handle_event(make_worker_event(Event::Exit));
1011    }
1012
1013    #[test]
1014    fn test_monitor_handle_event_custom_does_not_panic() {
1015        monitor_handle_event(make_worker_event(Event::Custom("test-custom".to_string())));
1016    }
1017}