1use actix_web::web::ThinData;
8
9use crate::{
10 config::ServerConfig,
11 constants::{
12 SYSTEM_CLEANUP_CRON_SCHEDULE, TRANSACTION_CLEANUP_CRON_SCHEDULE,
13 WORKER_SYSTEM_CLEANUP_RETRIES, WORKER_TOKEN_SWAP_REQUEST_RETRIES,
14 WORKER_TRANSACTION_CLEANUP_RETRIES,
15 },
16 jobs::{
17 notification_handler, relayer_health_check_handler, system_cleanup_handler,
18 token_swap_cron_handler, token_swap_request_handler, transaction_cleanup_handler,
19 transaction_request_handler, transaction_status_handler, transaction_submission_handler,
20 Job, JobProducerTrait, NotificationSend, RelayerHealthCheck, SystemCleanupCronReminder,
21 TokenSwapCronReminder, TokenSwapRequest, TransactionCleanupCronReminder,
22 TransactionRequest, TransactionSend, TransactionStatusCheck,
23 },
24 models::{
25 DefaultAppState, NetworkRepoModel, NotificationRepoModel, RelayerNetworkPolicy,
26 RelayerRepoModel, SignerRepoModel, ThinDataAppState, TransactionRepoModel,
27 },
28 repositories::{
29 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
30 Repository, TransactionCounterTrait, TransactionRepository,
31 },
32};
33use apalis::prelude::*;
34
35use apalis::layers::retry::backoff::MakeBackoff;
36use apalis::layers::retry::{backoff::ExponentialBackoffMaker, RetryPolicy};
37use apalis::layers::ErrorHandlingLayer;
38
39pub use tower::util::rng::HasherRng;
41
42use apalis_cron::CronStream;
43use eyre::Result;
44use std::{str::FromStr, time::Duration};
45use tokio::signal::unix::SignalKind;
46use tracing::{debug, error, info};
47
48use crate::metrics::observe_queue_pickup_latency;
49
50use super::{filter_relayers_for_swap, QueueType, WorkerContext};
51use crate::queues::retry_config::{
52 RetryBackoffConfig, NOTIFICATION_BACKOFF, RELAYER_HEALTH_BACKOFF, STATUS_EVM_BACKOFF,
53 STATUS_GENERIC_BACKOFF, STATUS_STELLAR_BACKOFF, SYSTEM_CLEANUP_BACKOFF,
54 TOKEN_SWAP_CRON_BACKOFF, TOKEN_SWAP_REQUEST_BACKOFF, TX_CLEANUP_BACKOFF, TX_REQUEST_BACKOFF,
55 TX_SUBMISSION_BACKOFF,
56};
57
58fn observe_redis_pickup_latency(
73 available_at: Option<&String>,
74 job_timestamp: &str,
75 queue_type: &str,
76) {
77 let fallback = job_timestamp.to_string();
78 let baseline = available_at.unwrap_or(&fallback);
79 if let Ok(baseline_epoch) = baseline.parse::<i64>() {
80 let now = chrono::Utc::now().timestamp();
81 let latency_secs = (now - baseline_epoch).max(0) as f64;
82 observe_queue_pickup_latency(queue_type, "redis", latency_secs);
83 }
84}
85
86async fn apalis_transaction_request_handler(
87 job: Job<TransactionRequest>,
88 state: Data<ThinData<DefaultAppState>>,
89 attempt: Attempt,
90 task_id: TaskId,
91) -> Result<(), apalis::prelude::Error> {
92 observe_redis_pickup_latency(
93 job.available_at.as_ref(),
94 &job.timestamp,
95 "transaction-request",
96 );
97 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
98 transaction_request_handler(job, (*state).clone(), ctx)
99 .await
100 .map_err(Into::into)
101}
102
103async fn apalis_transaction_submission_handler(
104 job: Job<TransactionSend>,
105 state: Data<ThinData<DefaultAppState>>,
106 attempt: Attempt,
107 task_id: TaskId,
108) -> Result<(), apalis::prelude::Error> {
109 observe_redis_pickup_latency(
110 job.available_at.as_ref(),
111 &job.timestamp,
112 "transaction-submission",
113 );
114 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
115 transaction_submission_handler(job, (*state).clone(), ctx)
116 .await
117 .map_err(Into::into)
118}
119
120async fn apalis_transaction_status_handler(
121 job: Job<TransactionStatusCheck>,
122 state: Data<ThinData<DefaultAppState>>,
123 attempt: Attempt,
124 task_id: TaskId,
125) -> Result<(), apalis::prelude::Error> {
126 observe_redis_pickup_latency(job.available_at.as_ref(), &job.timestamp, "status-check");
127 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
128 transaction_status_handler(job, (*state).clone(), ctx)
129 .await
130 .map_err(Into::into)
131}
132
133async fn apalis_transaction_status_evm_handler(
134 job: Job<TransactionStatusCheck>,
135 state: Data<ThinData<DefaultAppState>>,
136 attempt: Attempt,
137 task_id: TaskId,
138) -> Result<(), apalis::prelude::Error> {
139 observe_redis_pickup_latency(
140 job.available_at.as_ref(),
141 &job.timestamp,
142 "status-check-evm",
143 );
144 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
145 transaction_status_handler(job, (*state).clone(), ctx)
146 .await
147 .map_err(Into::into)
148}
149
150async fn apalis_transaction_status_stellar_handler(
151 job: Job<TransactionStatusCheck>,
152 state: Data<ThinData<DefaultAppState>>,
153 attempt: Attempt,
154 task_id: TaskId,
155) -> Result<(), apalis::prelude::Error> {
156 observe_redis_pickup_latency(
157 job.available_at.as_ref(),
158 &job.timestamp,
159 "status-check-stellar",
160 );
161 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
162 transaction_status_handler(job, (*state).clone(), ctx)
163 .await
164 .map_err(Into::into)
165}
166
167async fn apalis_notification_handler(
168 job: Job<NotificationSend>,
169 state: Data<ThinData<DefaultAppState>>,
170 attempt: Attempt,
171 task_id: TaskId,
172) -> Result<(), apalis::prelude::Error> {
173 observe_redis_pickup_latency(job.available_at.as_ref(), &job.timestamp, "notification");
174 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
175 notification_handler(job, (*state).clone(), ctx)
176 .await
177 .map_err(Into::into)
178}
179
180async fn apalis_token_swap_request_handler(
181 job: Job<TokenSwapRequest>,
182 state: Data<ThinData<DefaultAppState>>,
183 attempt: Attempt,
184 task_id: TaskId,
185) -> Result<(), apalis::prelude::Error> {
186 observe_redis_pickup_latency(
187 job.available_at.as_ref(),
188 &job.timestamp,
189 "token-swap-request",
190 );
191 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
192 token_swap_request_handler(job, (*state).clone(), ctx)
193 .await
194 .map_err(Into::into)
195}
196
197async fn apalis_relayer_health_check_handler(
198 job: Job<RelayerHealthCheck>,
199 state: Data<ThinData<DefaultAppState>>,
200 attempt: Attempt,
201 task_id: TaskId,
202) -> Result<(), apalis::prelude::Error> {
203 observe_redis_pickup_latency(
204 job.available_at.as_ref(),
205 &job.timestamp,
206 "relayer-health-check",
207 );
208 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
209 relayer_health_check_handler(job, (*state).clone(), ctx)
210 .await
211 .map_err(Into::into)
212}
213
214async fn apalis_transaction_cleanup_handler(
215 _job: TransactionCleanupCronReminder,
216 state: Data<ThinData<DefaultAppState>>,
217 attempt: Attempt,
218 task_id: TaskId,
219) -> Result<(), apalis::prelude::Error> {
220 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
221 transaction_cleanup_handler(TransactionCleanupCronReminder(), (*state).clone(), ctx)
222 .await
223 .map_err(Into::into)
224}
225
226async fn apalis_system_cleanup_handler(
227 _job: SystemCleanupCronReminder,
228 state: Data<ThinData<DefaultAppState>>,
229 attempt: Attempt,
230 task_id: TaskId,
231) -> Result<(), apalis::prelude::Error> {
232 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
233 system_cleanup_handler(SystemCleanupCronReminder(), (*state).clone(), ctx)
234 .await
235 .map_err(Into::into)
236}
237
238async fn apalis_token_swap_cron_handler(
239 _job: TokenSwapCronReminder,
240 relayer_id: Data<String>,
241 state: Data<ThinData<DefaultAppState>>,
242 attempt: Attempt,
243 task_id: TaskId,
244) -> Result<(), apalis::prelude::Error> {
245 let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
246 token_swap_cron_handler(
247 TokenSwapCronReminder(),
248 (*relayer_id).clone(),
249 (*state).clone(),
250 ctx,
251 )
252 .await
253 .map_err(Into::into)
254}
255
// Worker names. These double as lookup keys for per-worker concurrency
// overrides (see `ServerConfig::get_worker_concurrency` call sites below);
// the tests at the bottom of this file assert that most of them match the
// corresponding `QueueType::concurrency_env_key` values, so keep them in
// sync when adding a queue.
const TRANSACTION_REQUEST: &str = "transaction_request";
const TRANSACTION_SENDER: &str = "transaction_sender";
const TRANSACTION_STATUS_CHECKER: &str = "transaction_status_checker";
const TRANSACTION_STATUS_CHECKER_EVM: &str = "transaction_status_checker_evm";
const TRANSACTION_STATUS_CHECKER_STELLAR: &str = "transaction_status_checker_stellar";
const NOTIFICATION_SENDER: &str = "notification_sender";
const TOKEN_SWAP_REQUEST: &str = "token_swap_request";
const TRANSACTION_CLEANUP: &str = "transaction_cleanup";
const RELAYER_HEALTH_CHECK: &str = "relayer_health_check";
const SYSTEM_CLEANUP: &str = "system_cleanup";
268
269fn create_backoff(initial_ms: u64, max_ms: u64, jitter: f64) -> Result<ExponentialBackoffMaker> {
279 let maker = ExponentialBackoffMaker::new(
280 Duration::from_millis(initial_ms),
281 Duration::from_millis(max_ms),
282 jitter,
283 HasherRng::default(),
284 )?;
285
286 Ok(maker)
287}
288
289fn create_backoff_from_config(cfg: RetryBackoffConfig) -> Result<ExponentialBackoffMaker> {
290 create_backoff(cfg.initial_ms, cfg.max_ms, cfg.jitter)
291}
292
293pub async fn initialize_redis_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
298 app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
299) -> Result<()>
300where
301 J: JobProducerTrait + Send + Sync + 'static,
302 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
303 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
304 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
305 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
306 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
307 TCR: TransactionCounterTrait + Send + Sync + 'static,
308 PR: PluginRepositoryTrait + Send + Sync + 'static,
309 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
310{
311 let queue_backend = app_state
312 .job_producer
313 .get_queue_backend()
314 .ok_or_else(|| eyre::eyre!("Queue backend is not available"))?;
315 let queue = queue_backend
316 .queue()
317 .cloned()
318 .ok_or_else(|| eyre::eyre!("Redis queue is not available for active backend"))?;
319
320 let transaction_request_queue_worker = WorkerBuilder::new(TRANSACTION_REQUEST)
321 .layer(ErrorHandlingLayer::new())
322 .retry(
323 RetryPolicy::retries(QueueType::TransactionRequest.max_retries())
324 .with_backoff(create_backoff_from_config(TX_REQUEST_BACKOFF)?.make_backoff()),
325 )
326 .enable_tracing()
327 .catch_panic()
328 .concurrency(ServerConfig::get_worker_concurrency(
329 QueueType::TransactionRequest.concurrency_env_key(),
330 QueueType::TransactionRequest.default_concurrency(),
331 ))
332 .data(app_state.clone())
333 .backend(queue.transaction_request_queue.clone())
334 .build_fn(apalis_transaction_request_handler);
335
336 let transaction_submission_queue_worker = WorkerBuilder::new(TRANSACTION_SENDER)
337 .layer(ErrorHandlingLayer::new())
338 .enable_tracing()
339 .catch_panic()
340 .retry(
341 RetryPolicy::retries(QueueType::TransactionSubmission.max_retries())
342 .with_backoff(create_backoff_from_config(TX_SUBMISSION_BACKOFF)?.make_backoff()),
343 )
344 .concurrency(ServerConfig::get_worker_concurrency(
345 QueueType::TransactionSubmission.concurrency_env_key(),
346 QueueType::TransactionSubmission.default_concurrency(),
347 ))
348 .data(app_state.clone())
349 .backend(queue.transaction_submission_queue.clone())
350 .build_fn(apalis_transaction_submission_handler);
351
352 let transaction_status_queue_worker = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER)
355 .layer(ErrorHandlingLayer::new())
356 .enable_tracing()
357 .catch_panic()
358 .retry(
359 RetryPolicy::retries(QueueType::StatusCheck.max_retries())
360 .with_backoff(create_backoff_from_config(STATUS_GENERIC_BACKOFF)?.make_backoff()),
361 )
362 .concurrency(ServerConfig::get_worker_concurrency(
363 QueueType::StatusCheck.concurrency_env_key(),
364 QueueType::StatusCheck.default_concurrency(),
365 ))
366 .data(app_state.clone())
367 .backend(queue.transaction_status_queue.clone())
368 .build_fn(apalis_transaction_status_handler);
369
370 let transaction_status_queue_worker_evm = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_EVM)
373 .layer(ErrorHandlingLayer::new())
374 .enable_tracing()
375 .catch_panic()
376 .retry(
377 RetryPolicy::retries(QueueType::StatusCheck.max_retries())
378 .with_backoff(create_backoff_from_config(STATUS_EVM_BACKOFF)?.make_backoff()),
379 )
380 .concurrency(ServerConfig::get_worker_concurrency(
381 QueueType::StatusCheckEvm.concurrency_env_key(),
382 QueueType::StatusCheckEvm.default_concurrency(),
383 ))
384 .data(app_state.clone())
385 .backend(queue.transaction_status_queue_evm.clone())
386 .build_fn(apalis_transaction_status_evm_handler);
387
388 let transaction_status_queue_worker_stellar =
391 WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_STELLAR)
392 .layer(ErrorHandlingLayer::new())
393 .enable_tracing()
394 .catch_panic()
395 .retry(
396 RetryPolicy::retries(QueueType::StatusCheckStellar.max_retries()).with_backoff(
397 create_backoff_from_config(STATUS_STELLAR_BACKOFF)?.make_backoff(),
398 ),
399 )
400 .concurrency(ServerConfig::get_worker_concurrency(
401 QueueType::StatusCheckStellar.concurrency_env_key(),
402 QueueType::StatusCheckStellar.default_concurrency(),
403 ))
404 .data(app_state.clone())
405 .backend(queue.transaction_status_queue_stellar.clone())
406 .build_fn(apalis_transaction_status_stellar_handler);
407
408 let notification_queue_worker = WorkerBuilder::new(NOTIFICATION_SENDER)
409 .layer(ErrorHandlingLayer::new())
410 .enable_tracing()
411 .catch_panic()
412 .retry(
413 RetryPolicy::retries(QueueType::Notification.max_retries())
414 .with_backoff(create_backoff_from_config(NOTIFICATION_BACKOFF)?.make_backoff()),
415 )
416 .concurrency(ServerConfig::get_worker_concurrency(
417 QueueType::Notification.concurrency_env_key(),
418 QueueType::Notification.default_concurrency(),
419 ))
420 .data(app_state.clone())
421 .backend(queue.notification_queue.clone())
422 .build_fn(apalis_notification_handler);
423
424 let token_swap_request_queue_worker = WorkerBuilder::new(TOKEN_SWAP_REQUEST)
425 .layer(ErrorHandlingLayer::new())
426 .enable_tracing()
427 .catch_panic()
428 .retry(
429 RetryPolicy::retries(QueueType::TokenSwapRequest.max_retries()).with_backoff(
430 create_backoff_from_config(TOKEN_SWAP_REQUEST_BACKOFF)?.make_backoff(),
431 ),
432 )
433 .concurrency(ServerConfig::get_worker_concurrency(
434 QueueType::TokenSwapRequest.concurrency_env_key(),
435 QueueType::TokenSwapRequest.default_concurrency(),
436 ))
437 .data(app_state.clone())
438 .backend(queue.token_swap_request_queue.clone())
439 .build_fn(apalis_token_swap_request_handler);
440
441 let transaction_cleanup_queue_worker = WorkerBuilder::new(TRANSACTION_CLEANUP)
442 .layer(ErrorHandlingLayer::new())
443 .enable_tracing()
444 .catch_panic()
445 .retry(
446 RetryPolicy::retries(WORKER_TRANSACTION_CLEANUP_RETRIES)
447 .with_backoff(create_backoff_from_config(TX_CLEANUP_BACKOFF)?.make_backoff()),
448 )
449 .concurrency(ServerConfig::get_worker_concurrency(TRANSACTION_CLEANUP, 1)) .data(app_state.clone())
451 .backend(CronStream::new(
452 apalis_cron::Schedule::from_str(TRANSACTION_CLEANUP_CRON_SCHEDULE)?,
453 ))
454 .build_fn(apalis_transaction_cleanup_handler);
455
456 let system_cleanup_queue_worker = WorkerBuilder::new(SYSTEM_CLEANUP)
457 .layer(ErrorHandlingLayer::new())
458 .enable_tracing()
459 .catch_panic()
460 .retry(
461 RetryPolicy::retries(WORKER_SYSTEM_CLEANUP_RETRIES)
462 .with_backoff(create_backoff_from_config(SYSTEM_CLEANUP_BACKOFF)?.make_backoff()),
463 )
464 .concurrency(1)
465 .data(app_state.clone())
466 .backend(CronStream::new(apalis_cron::Schedule::from_str(
467 SYSTEM_CLEANUP_CRON_SCHEDULE,
468 )?))
469 .build_fn(apalis_system_cleanup_handler);
470
471 let relayer_health_check_worker = WorkerBuilder::new(RELAYER_HEALTH_CHECK)
472 .layer(ErrorHandlingLayer::new())
473 .enable_tracing()
474 .catch_panic()
475 .retry(
476 RetryPolicy::retries(QueueType::RelayerHealthCheck.max_retries())
477 .with_backoff(create_backoff_from_config(RELAYER_HEALTH_BACKOFF)?.make_backoff()),
478 )
479 .concurrency(ServerConfig::get_worker_concurrency(
480 QueueType::RelayerHealthCheck.concurrency_env_key(),
481 QueueType::RelayerHealthCheck.default_concurrency(),
482 ))
483 .data(app_state.clone())
484 .backend(queue.relayer_health_check_queue.clone())
485 .build_fn(apalis_relayer_health_check_handler);
486
487 let monitor = Monitor::new()
488 .register(transaction_request_queue_worker)
489 .register(transaction_submission_queue_worker)
490 .register(transaction_status_queue_worker)
491 .register(transaction_status_queue_worker_evm)
492 .register(transaction_status_queue_worker_stellar)
493 .register(notification_queue_worker)
494 .register(token_swap_request_queue_worker)
495 .register(transaction_cleanup_queue_worker)
496 .register(system_cleanup_queue_worker)
497 .register(relayer_health_check_worker)
498 .on_event(monitor_handle_event)
499 .shutdown_timeout(Duration::from_millis(5000));
500
501 let monitor_future = monitor.run_with_signal(async {
502 let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
503 .map_err(|e| std::io::Error::other(format!("Failed to create SIGINT signal: {e}")))?;
504 let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
505 .map_err(|e| std::io::Error::other(format!("Failed to create SIGTERM signal: {e}")))?;
506
507 debug!("Workers monitor started");
508
509 tokio::select! {
510 _ = sigint.recv() => debug!("Received SIGINT."),
511 _ = sigterm.recv() => debug!("Received SIGTERM."),
512 };
513
514 debug!("Workers monitor shutting down");
515
516 Ok(())
517 });
518 tokio::spawn(async move {
519 if let Err(e) = monitor_future.await {
520 error!(error = %e, "monitor error");
521 }
522 });
523 debug!("Workers monitor shutdown complete");
524
525 Ok(())
526}
527
/// Builds one cron worker per swap-enabled relayer (Solana/Stellar only) and
/// spawns a monitor for them in a background task.
///
/// Relayers without a swap config or cron schedule — and all EVM relayers —
/// are skipped with a debug log. An unparsable cron schedule is logged and
/// skipped rather than failing the whole initialization. Returns early with
/// `Ok(())` when no relayer has swap enabled.
///
/// # Errors
///
/// Returns an error when listing active relayers fails or when the shared
/// swap backoff profile cannot be constructed.
pub async fn initialize_redis_token_swap_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
) -> Result<()>
where
    J: JobProducerTrait + Send + Sync + 'static,
    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
    TCR: TransactionCounterTrait + Send + Sync + 'static,
    PR: PluginRepositoryTrait + Send + Sync + 'static,
    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
{
    let active_relayers = app_state.relayer_repository.list_active().await?;
    let relayers_with_swap_enabled = filter_relayers_for_swap(active_relayers);

    if relayers_with_swap_enabled.is_empty() {
        debug!("No relayers with swap enabled");
        return Ok(());
    }
    info!(
        "Found {} relayers with swap enabled",
        relayers_with_swap_enabled.len()
    );

    let mut workers = Vec::new();

    // One backoff built up front and cloned into each worker's retry policy.
    let swap_backoff = create_backoff_from_config(TOKEN_SWAP_CRON_BACKOFF)?.make_backoff();

    for relayer in relayers_with_swap_enabled {
        debug!(relayer = ?relayer, "found relayer with swap enabled");

        // Extract the cron schedule and a network label from the relayer's
        // policy; skip the relayer when any required piece is missing.
        let (cron_schedule, network_type) = match &relayer.policies {
            RelayerNetworkPolicy::Solana(policy) => match policy.get_swap_config() {
                Some(config) => match config.cron_schedule {
                    Some(schedule) => (schedule, "solana".to_string()),
                    None => {
                        debug!(relayer_id = %relayer.id, "No cron schedule specified for Solana relayer; skipping");
                        continue;
                    }
                },
                None => {
                    debug!(relayer_id = %relayer.id, "No swap configuration specified for Solana relayer; skipping");
                    continue;
                }
            },
            RelayerNetworkPolicy::Stellar(policy) => match policy.get_swap_config() {
                Some(config) => match config.cron_schedule {
                    Some(schedule) => (schedule, "stellar".to_string()),
                    None => {
                        debug!(relayer_id = %relayer.id, "No cron schedule specified for Stellar relayer; skipping");
                        continue;
                    }
                },
                None => {
                    debug!(relayer_id = %relayer.id, "No swap configuration specified for Stellar relayer; skipping");
                    continue;
                }
            },
            RelayerNetworkPolicy::Evm(_) => {
                debug!(relayer_id = %relayer.id, "EVM relayers do not support swap; skipping");
                continue;
            }
        };

        // Bad schedules are tolerated: log and move on to the next relayer.
        let calendar_schedule = match apalis_cron::Schedule::from_str(&cron_schedule) {
            Ok(schedule) => schedule,
            Err(e) => {
                error!(relayer_id = %relayer.id, error = %e, "Failed to parse cron schedule; skipping");
                continue;
            }
        };

        // Worker name is "<network>-swap-schedule-<relayer-id>"; the relayer
        // id is also injected as Data for apalis_token_swap_cron_handler.
        let worker = WorkerBuilder::new(format!(
            "{}-swap-schedule-{}",
            network_type,
            relayer.id.clone()
        ))
        .layer(ErrorHandlingLayer::new())
        .enable_tracing()
        .catch_panic()
        .retry(
            RetryPolicy::retries(WORKER_TOKEN_SWAP_REQUEST_RETRIES)
                .with_backoff(swap_backoff.clone()),
        )
        .concurrency(1)
        .data(relayer.id.clone())
        .data(app_state.clone())
        .backend(CronStream::new(calendar_schedule))
        .build_fn(apalis_token_swap_cron_handler);

        workers.push(worker);
        debug!(
            relayer_id = %relayer.id,
            network_type = %network_type,
            "Created worker for relayer with swap enabled"
        );
    }

    let mut monitor = Monitor::new()
        .on_event(monitor_handle_event)
        .shutdown_timeout(Duration::from_millis(5000));

    for worker in workers {
        monitor = monitor.register(worker);
    }

    // Same shutdown protocol as the main worker monitor: run until
    // SIGINT/SIGTERM, in a detached background task.
    let monitor_future = monitor.run_with_signal(async {
        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
            .map_err(|e| std::io::Error::other(format!("Failed to create SIGINT signal: {e}")))?;
        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
            .map_err(|e| std::io::Error::other(format!("Failed to create SIGTERM signal: {e}")))?;

        debug!("Swap Monitor started");

        tokio::select! {
            _ = sigint.recv() => debug!("Received SIGINT."),
            _ = sigterm.recv() => debug!("Received SIGTERM."),
        };

        debug!("Swap Monitor shutting down");

        Ok(())
    });
    tokio::spawn(async move {
        if let Err(e) = monitor_future.await {
            error!(error = %e, "monitor error");
        }
    });
    Ok(())
}
664
665fn monitor_handle_event(e: Worker<Event>) {
666 let worker_id = e.id();
667 match e.inner() {
668 Event::Engage(task_id) => {
669 debug!(worker_id = %worker_id, task_id = %task_id, "worker got a job");
670 }
671 Event::Error(e) => {
672 error!(worker_id = %worker_id, error = %e, "worker encountered an error");
673 }
674 Event::Exit => {
675 debug!(worker_id = %worker_id, "worker exited");
676 }
677 Event::Idle => {
678 debug!(worker_id = %worker_id, "worker is idle");
679 }
680 Event::Start => {
681 debug!(worker_id = %worker_id, "worker started");
682 }
683 Event::Stop => {
684 debug!(worker_id = %worker_id, "worker stopped");
685 }
686 _ => {}
687 }
688}
689
// Unit tests covering: backoff construction edge cases, invariants of every
// named retry profile, worker-name constant hygiene, and the monitor event
// handler's tolerance of all event variants.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::queues::retry_config::{
        NOTIFICATION_BACKOFF, RELAYER_HEALTH_BACKOFF, STATUS_EVM_BACKOFF, STATUS_GENERIC_BACKOFF,
        STATUS_STELLAR_BACKOFF, SYSTEM_CLEANUP_BACKOFF, TOKEN_SWAP_CRON_BACKOFF,
        TOKEN_SWAP_REQUEST_BACKOFF, TX_CLEANUP_BACKOFF, TX_REQUEST_BACKOFF, TX_SUBMISSION_BACKOFF,
    };

    // --- create_backoff parameter edge cases -------------------------------

    #[test]
    fn test_create_backoff_with_valid_parameters() {
        let result = create_backoff(200, 5000, 0.99);
        assert!(
            result.is_ok(),
            "Should create backoff with valid parameters"
        );
    }

    #[test]
    fn test_create_backoff_with_zero_initial() {
        let result = create_backoff(0, 5000, 0.99);
        assert!(
            result.is_ok(),
            "Should handle zero initial delay (edge case)"
        );
    }

    #[test]
    fn test_create_backoff_with_equal_initial_and_max() {
        let result = create_backoff(1000, 1000, 0.5);
        assert!(result.is_ok(), "Should handle equal initial and max delays");
    }

    #[test]
    fn test_create_backoff_with_zero_jitter() {
        let result = create_backoff(500, 5000, 0.0);
        assert!(result.is_ok(), "Should handle zero jitter");
    }

    #[test]
    fn test_create_backoff_with_max_jitter() {
        let result = create_backoff(500, 5000, 1.0);
        assert!(result.is_ok(), "Should handle maximum jitter (1.0)");
    }

    #[test]
    fn test_create_backoff_with_small_values() {
        let result = create_backoff(1, 10, 0.5);
        assert!(result.is_ok(), "Should handle very small delay values");
    }

    #[test]
    fn test_create_backoff_with_large_values() {
        let result = create_backoff(10000, 60000, 0.99);
        assert!(result.is_ok(), "Should handle large delay values");
    }

    // --- retry-profile construction ----------------------------------------

    // Every named profile must be accepted by the backoff maker.
    #[test]
    fn test_create_backoff_from_config_profiles() {
        let profiles = [
            TX_REQUEST_BACKOFF,
            TX_SUBMISSION_BACKOFF,
            STATUS_GENERIC_BACKOFF,
            STATUS_EVM_BACKOFF,
            STATUS_STELLAR_BACKOFF,
            NOTIFICATION_BACKOFF,
            TOKEN_SWAP_REQUEST_BACKOFF,
            TX_CLEANUP_BACKOFF,
            SYSTEM_CLEANUP_BACKOFF,
            RELAYER_HEALTH_BACKOFF,
            TOKEN_SWAP_CRON_BACKOFF,
        ];

        for cfg in profiles {
            let result = create_backoff_from_config(cfg);
            assert!(
                result.is_ok(),
                "backoff profile should be constructible: {:?}",
                cfg
            );
        }
    }

    // Every profile's maker must also yield a usable backoff instance
    // (make_backoff takes &mut self, hence the mut binding).
    #[test]
    fn test_create_backoff_from_config_produces_usable_backoff() {
        let profiles = [
            TX_REQUEST_BACKOFF,
            TX_SUBMISSION_BACKOFF,
            STATUS_GENERIC_BACKOFF,
            STATUS_EVM_BACKOFF,
            STATUS_STELLAR_BACKOFF,
            NOTIFICATION_BACKOFF,
            TOKEN_SWAP_REQUEST_BACKOFF,
            TX_CLEANUP_BACKOFF,
            SYSTEM_CLEANUP_BACKOFF,
            RELAYER_HEALTH_BACKOFF,
            TOKEN_SWAP_CRON_BACKOFF,
        ];

        for cfg in profiles {
            let mut maker = create_backoff_from_config(cfg).unwrap();
            let _backoff = maker.make_backoff();
        }
    }

    #[test]
    fn test_create_backoff_with_initial_greater_than_max_errors() {
        let result = create_backoff(10000, 100, 0.5);
        assert!(
            result.is_err(),
            "initial > max should be rejected by ExponentialBackoffMaker"
        );
    }

    // --- retry-profile invariants ------------------------------------------

    #[test]
    fn test_all_backoff_configs_have_valid_initial_le_max() {
        let profiles: &[(&str, RetryBackoffConfig)] = &[
            ("TX_REQUEST", TX_REQUEST_BACKOFF),
            ("TX_SUBMISSION", TX_SUBMISSION_BACKOFF),
            ("STATUS_GENERIC", STATUS_GENERIC_BACKOFF),
            ("STATUS_EVM", STATUS_EVM_BACKOFF),
            ("STATUS_STELLAR", STATUS_STELLAR_BACKOFF),
            ("NOTIFICATION", NOTIFICATION_BACKOFF),
            ("TOKEN_SWAP_REQUEST", TOKEN_SWAP_REQUEST_BACKOFF),
            ("TX_CLEANUP", TX_CLEANUP_BACKOFF),
            ("SYSTEM_CLEANUP", SYSTEM_CLEANUP_BACKOFF),
            ("RELAYER_HEALTH", RELAYER_HEALTH_BACKOFF),
            ("TOKEN_SWAP_CRON", TOKEN_SWAP_CRON_BACKOFF),
        ];

        for (name, cfg) in profiles {
            assert!(
                cfg.initial_ms <= cfg.max_ms,
                "{name}: initial_ms ({}) must be <= max_ms ({})",
                cfg.initial_ms,
                cfg.max_ms
            );
        }
    }

    #[test]
    fn test_all_backoff_configs_have_valid_jitter_range() {
        let profiles: &[(&str, RetryBackoffConfig)] = &[
            ("TX_REQUEST", TX_REQUEST_BACKOFF),
            ("TX_SUBMISSION", TX_SUBMISSION_BACKOFF),
            ("STATUS_GENERIC", STATUS_GENERIC_BACKOFF),
            ("STATUS_EVM", STATUS_EVM_BACKOFF),
            ("STATUS_STELLAR", STATUS_STELLAR_BACKOFF),
            ("NOTIFICATION", NOTIFICATION_BACKOFF),
            ("TOKEN_SWAP_REQUEST", TOKEN_SWAP_REQUEST_BACKOFF),
            ("TX_CLEANUP", TX_CLEANUP_BACKOFF),
            ("SYSTEM_CLEANUP", SYSTEM_CLEANUP_BACKOFF),
            ("RELAYER_HEALTH", RELAYER_HEALTH_BACKOFF),
            ("TOKEN_SWAP_CRON", TOKEN_SWAP_CRON_BACKOFF),
        ];

        for (name, cfg) in profiles {
            assert!(
                (0.0..=1.0).contains(&cfg.jitter),
                "{name}: jitter ({}) must be in [0.0, 1.0]",
                cfg.jitter
            );
        }
    }

    #[test]
    fn test_all_backoff_configs_have_positive_initial_ms() {
        let profiles: &[(&str, RetryBackoffConfig)] = &[
            ("TX_REQUEST", TX_REQUEST_BACKOFF),
            ("TX_SUBMISSION", TX_SUBMISSION_BACKOFF),
            ("STATUS_GENERIC", STATUS_GENERIC_BACKOFF),
            ("STATUS_EVM", STATUS_EVM_BACKOFF),
            ("STATUS_STELLAR", STATUS_STELLAR_BACKOFF),
            ("NOTIFICATION", NOTIFICATION_BACKOFF),
            ("TOKEN_SWAP_REQUEST", TOKEN_SWAP_REQUEST_BACKOFF),
            ("TX_CLEANUP", TX_CLEANUP_BACKOFF),
            ("SYSTEM_CLEANUP", SYSTEM_CLEANUP_BACKOFF),
            ("RELAYER_HEALTH", RELAYER_HEALTH_BACKOFF),
            ("TOKEN_SWAP_CRON", TOKEN_SWAP_CRON_BACKOFF),
        ];

        for (name, cfg) in profiles {
            assert!(
                cfg.initial_ms > 0,
                "{name}: initial_ms must be positive, got {}",
                cfg.initial_ms
            );
        }
    }

    // --- worker-name constant hygiene --------------------------------------

    #[test]
    fn test_worker_name_constants_are_nonempty() {
        let names = [
            TRANSACTION_REQUEST,
            TRANSACTION_SENDER,
            TRANSACTION_STATUS_CHECKER,
            TRANSACTION_STATUS_CHECKER_EVM,
            TRANSACTION_STATUS_CHECKER_STELLAR,
            NOTIFICATION_SENDER,
            TOKEN_SWAP_REQUEST,
            TRANSACTION_CLEANUP,
            RELAYER_HEALTH_CHECK,
            SYSTEM_CLEANUP,
        ];

        for name in &names {
            assert!(!name.is_empty(), "Worker name constant must not be empty");
        }
    }

    #[test]
    fn test_worker_name_constants_are_unique() {
        let names = [
            TRANSACTION_REQUEST,
            TRANSACTION_SENDER,
            TRANSACTION_STATUS_CHECKER,
            TRANSACTION_STATUS_CHECKER_EVM,
            TRANSACTION_STATUS_CHECKER_STELLAR,
            NOTIFICATION_SENDER,
            TOKEN_SWAP_REQUEST,
            TRANSACTION_CLEANUP,
            RELAYER_HEALTH_CHECK,
            SYSTEM_CLEANUP,
        ];

        for (i, a) in names.iter().enumerate() {
            for (j, b) in names.iter().enumerate() {
                if i != j {
                    assert_ne!(
                        a, b,
                        "Worker names must be unique: '{}' at index {} and {}",
                        a, i, j
                    );
                }
            }
        }
    }

    // Worker names double as concurrency env-var lookup keys; keep them in
    // lock-step with QueueType. (The cron workers have no QueueType entry.)
    #[test]
    fn test_worker_names_match_concurrency_env_keys() {
        assert_eq!(
            TRANSACTION_REQUEST,
            QueueType::TransactionRequest.concurrency_env_key()
        );
        assert_eq!(
            TRANSACTION_SENDER,
            QueueType::TransactionSubmission.concurrency_env_key()
        );
        assert_eq!(
            TRANSACTION_STATUS_CHECKER,
            QueueType::StatusCheck.concurrency_env_key()
        );
        assert_eq!(
            TRANSACTION_STATUS_CHECKER_EVM,
            QueueType::StatusCheckEvm.concurrency_env_key()
        );
        assert_eq!(
            TRANSACTION_STATUS_CHECKER_STELLAR,
            QueueType::StatusCheckStellar.concurrency_env_key()
        );
        assert_eq!(
            NOTIFICATION_SENDER,
            QueueType::Notification.concurrency_env_key()
        );
        assert_eq!(
            TOKEN_SWAP_REQUEST,
            QueueType::TokenSwapRequest.concurrency_env_key()
        );
        assert_eq!(
            RELAYER_HEALTH_CHECK,
            QueueType::RelayerHealthCheck.concurrency_env_key()
        );
    }

    // --- monitor event handler ---------------------------------------------

    // Helper: wraps an event in a Worker carrying a fixed test worker id.
    fn make_worker_event(event: Event) -> Worker<Event> {
        let worker_id = WorkerId::from_str("test-worker").unwrap();
        Worker::new(worker_id, event)
    }

    #[test]
    fn test_monitor_handle_event_start_does_not_panic() {
        monitor_handle_event(make_worker_event(Event::Start));
    }

    #[test]
    fn test_monitor_handle_event_engage_does_not_panic() {
        let task_id = TaskId::new();
        monitor_handle_event(make_worker_event(Event::Engage(task_id)));
    }

    #[test]
    fn test_monitor_handle_event_idle_does_not_panic() {
        monitor_handle_event(make_worker_event(Event::Idle));
    }

    #[test]
    fn test_monitor_handle_event_error_does_not_panic() {
        let error: Box<dyn std::error::Error + Send + Sync> = "test error".to_string().into();
        monitor_handle_event(make_worker_event(Event::Error(error)));
    }

    #[test]
    fn test_monitor_handle_event_stop_does_not_panic() {
        monitor_handle_event(make_worker_event(Event::Stop));
    }

    #[test]
    fn test_monitor_handle_event_exit_does_not_panic() {
        monitor_handle_event(make_worker_event(Event::Exit));
    }

    // Exercises the catch-all `_` arm via the Custom variant.
    #[test]
    fn test_monitor_handle_event_custom_does_not_panic() {
        monitor_handle_event(make_worker_event(Event::Custom("test-custom".to_string())));
    }
}
1017}