1use crate::{
21 config::ServerConfig,
22 domain::{get_network_relayer, Relayer},
23 jobs::JobProducerTrait,
24 models::{
25 NetworkRepoModel, NotificationRepoModel, RelayerRepoModel, SignerRepoModel,
26 ThinDataAppState, TransactionRepoModel,
27 },
28 repositories::{
29 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
30 Repository, TransactionCounterTrait, TransactionRepository,
31 },
32 utils::{
33 is_global_init_recently_completed, is_relayer_recently_synced, poll_until,
34 set_global_init_completed, set_relayer_last_sync, DistributedLock, BOOTSTRAP_LOCK_TTL_SECS,
35 LOCK_POLL_INTERVAL_MS, LOCK_WAIT_MAX_SECS,
36 },
37};
38use color_eyre::{eyre::WrapErr, Result};
39use deadpool_redis::Pool;
40use std::sync::Arc;
41use std::time::Duration;
42use tracing::{debug, info, warn};
43
/// Maximum age, in seconds, at which the global "initialization completed" marker and
/// the per-relayer sync markers still count as recent. While they are recent, a starting
/// instance skips re-initializing relayers that were already initialized.
const INIT_STALENESS_THRESHOLD_SECS: u64 = 5 * 60;

/// Name of the distributed lock that serializes relayer initialization across instances.
/// The full lock key is `{prefix}:lock:{GLOBAL_INIT_LOCK_NAME}`.
const GLOBAL_INIT_LOCK_NAME: &str = "relayer_init_global";
50
51async fn initialize_relayer_with_service<R>(relayer_id: &str, relayer_service: &R) -> Result<()>
55where
56 R: Relayer,
57{
58 debug!(relayer_id = %relayer_id, "initializing relayer");
59
60 relayer_service
61 .initialize_relayer()
62 .await
63 .wrap_err_with(|| format!("Failed to initialize relayer: {relayer_id}"))?;
64
65 Ok(())
66}
67
68pub async fn initialize_relayer<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
69 relayer_id: String,
70 app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
71) -> Result<()>
72where
73 J: JobProducerTrait + Send + Sync + 'static,
74 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
75 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
76 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
77 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
78 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
79 TCR: TransactionCounterTrait + Send + Sync + 'static,
80 PR: PluginRepositoryTrait + Send + Sync + 'static,
81 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
82{
83 let relayer_service = get_network_relayer(relayer_id.clone(), &app_state).await?;
84
85 initialize_relayer_with_service(&relayer_id, &relayer_service).await
86}
87
88pub fn get_relayer_ids_to_initialize(relayers: &[RelayerRepoModel]) -> Vec<String> {
90 relayers.iter().map(|r| r.id.clone()).collect()
91}
92
93pub async fn initialize_relayers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
94 app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
95) -> Result<()>
96where
97 J: JobProducerTrait + Send + Sync + 'static,
98 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
99 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
100 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
101 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
102 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
103 TCR: TransactionCounterTrait + Send + Sync + 'static,
104 PR: PluginRepositoryTrait + Send + Sync + 'static,
105 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
106{
107 let relayers = app_state.relayer_repository.list_all().await?;
108
109 if relayers.is_empty() {
111 info!("No relayers to initialize");
112 return Ok(());
113 }
114
115 info!(count = relayers.len(), "Initializing relayers");
116
117 let use_lock = ServerConfig::get_distributed_mode();
119 let connection_info = app_state.relayer_repository.connection_info();
120
121 match (use_lock, connection_info) {
122 (true, Some((conn, prefix))) => {
123 coordinate_with_distributed_lock(&relayers, &app_state, &conn, &prefix).await
125 }
126 _ => {
127 info!("Initializing relayers without distributed locking");
129 run_initialization_batch(&relayers, &app_state).await
130 }
131 }
132}
133
134async fn coordinate_with_distributed_lock<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
144 relayers: &[RelayerRepoModel],
145 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
146 conn: &Arc<Pool>,
147 prefix: &str,
148) -> Result<()>
149where
150 J: JobProducerTrait + Send + Sync + 'static,
151 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
152 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
153 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
154 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
155 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
156 TCR: TransactionCounterTrait + Send + Sync + 'static,
157 PR: PluginRepositoryTrait + Send + Sync + 'static,
158 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
159{
160 match was_relayer_set_recently_initialized(conn, prefix, relayers).await {
162 Ok(true) => {
163 info!("Initialization recently completed by another instance, skipping");
164 return Ok(());
165 }
166 Ok(false) => {}
167 Err(e) => {
168 warn!(
169 error = %e,
170 "Failed to check recent initialization status, proceeding with initialization"
171 );
172 }
173 }
174
175 let lock_key = format!("{prefix}:lock:{GLOBAL_INIT_LOCK_NAME}");
177 let lock = DistributedLock::new(
178 conn.clone(),
179 &lock_key,
180 Duration::from_secs(BOOTSTRAP_LOCK_TTL_SECS),
181 );
182
183 let lock_result = lock.try_acquire().await;
184
185 if matches!(&lock_result, Ok(None)) {
187 info!("Another instance is initializing relayers, waiting for completion");
188 let completed = wait_for_initialization_complete(conn, prefix).await?;
189
190 if completed {
191 return Ok(());
192 }
193
194 warn!("Timeout waiting for initialization, rechecking state");
197
198 if is_global_init_recently_completed(conn, prefix, INIT_STALENESS_THRESHOLD_SECS)
199 .await
200 .unwrap_or(false)
201 {
202 info!("Initialization completed during timeout window");
203 return Ok(());
204 }
205
206 return recover_after_timeout(relayers, app_state, conn, prefix).await;
208 }
209
210 let guard = match lock_result {
212 Ok(Some(g)) => g,
213 Err(e) => {
214 warn!(
215 error = %e,
216 "Failed to acquire distributed lock, proceeding without coordination"
217 );
218 return run_initialization_batch(relayers, app_state).await;
219 }
220 Ok(None) => unreachable!(), };
222
223 if was_relayer_set_recently_initialized(conn, prefix, relayers)
224 .await
225 .unwrap_or(false)
226 {
227 info!("Initialization completed while waiting for the lock, skipping");
228 drop(guard);
229 return Ok(());
230 }
231
232 info!(
234 count = relayers.len(),
235 "Acquired initialization lock, initializing relayers"
236 );
237
238 let result = run_initialization_batch(relayers, app_state).await;
239
240 if result.is_ok() {
241 if let Err(e) = record_relayer_initialization_completion(conn, prefix, relayers).await {
242 warn!(error = %e, "Failed to record initialization completion time");
243 }
244 }
245
246 drop(guard);
247 result
248}
249
250async fn was_relayer_set_recently_initialized(
251 conn: &Arc<Pool>,
252 prefix: &str,
253 relayers: &[RelayerRepoModel],
254) -> Result<bool> {
255 if !is_global_init_recently_completed(conn, prefix, INIT_STALENESS_THRESHOLD_SECS).await? {
256 return Ok(false);
257 }
258
259 for relayer in relayers {
260 if !is_relayer_recently_synced(conn, prefix, &relayer.id, INIT_STALENESS_THRESHOLD_SECS)
261 .await?
262 {
263 return Ok(false);
264 }
265 }
266
267 Ok(true)
268}
269
270async fn record_relayer_initialization_completion(
271 conn: &Arc<Pool>,
272 prefix: &str,
273 relayers: &[RelayerRepoModel],
274) -> Result<()> {
275 for relayer in relayers {
276 set_relayer_last_sync(conn, prefix, &relayer.id).await?;
277 }
278
279 set_global_init_completed(conn, prefix).await
280}
281
282async fn wait_for_initialization_complete(conn: &Arc<Pool>, prefix: &str) -> Result<bool> {
288 let max_wait = Duration::from_secs(LOCK_WAIT_MAX_SECS);
289 let poll_interval = Duration::from_millis(LOCK_POLL_INTERVAL_MS);
290
291 let conn = conn.clone();
293 let prefix = prefix.to_string();
294
295 poll_until(
296 || is_global_init_recently_completed(&conn, &prefix, INIT_STALENESS_THRESHOLD_SECS),
297 max_wait,
298 poll_interval,
299 "initialization",
300 )
301 .await
302}
303
304async fn recover_after_timeout<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
313 relayers: &[RelayerRepoModel],
314 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
315 conn: &Arc<Pool>,
316 prefix: &str,
317) -> Result<()>
318where
319 J: JobProducerTrait + Send + Sync + 'static,
320 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
321 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
322 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
323 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
324 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
325 TCR: TransactionCounterTrait + Send + Sync + 'static,
326 PR: PluginRepositoryTrait + Send + Sync + 'static,
327 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
328{
329 let lock_key = format!("{prefix}:lock:{GLOBAL_INIT_LOCK_NAME}");
330 let recovery_lock = DistributedLock::new(
331 conn.clone(),
332 &lock_key,
333 Duration::from_secs(BOOTSTRAP_LOCK_TTL_SECS),
334 );
335
336 match recovery_lock.try_acquire().await {
337 Ok(Some(guard)) => {
338 warn!(
340 count = relayers.len(),
341 "Previous lock holder appears to have crashed, taking over initialization"
342 );
343 let result = run_initialization_batch(relayers, app_state).await;
344 if result.is_ok() {
345 if let Err(e) =
346 record_relayer_initialization_completion(conn, prefix, relayers).await
347 {
348 warn!(error = %e, "Failed to record initialization completion time");
349 }
350 }
351 drop(guard);
352 result
353 }
354 Ok(None) => {
355 warn!("Lock still held by another instance after timeout, waiting for completion");
358 let completed = wait_for_initialization_complete(conn, prefix).await?;
359
360 if completed {
361 info!("Initialization completed by another instance during extended wait");
362 Ok(())
363 } else {
364 warn!("Extended wait also timed out, proceeding with initialization");
368 let result = run_initialization_batch(relayers, app_state).await;
369 if result.is_ok() {
370 if let Err(e) =
371 record_relayer_initialization_completion(conn, prefix, relayers).await
372 {
373 warn!(error = %e, "Failed to record initialization completion time");
374 }
375 }
376 result
377 }
378 }
379 Err(e) => {
380 warn!(
382 error = %e,
383 "Failed to check lock after timeout, attempting initialization without coordination"
384 );
385 run_initialization_batch(relayers, app_state).await
386 }
387 }
388}
389
390async fn run_initialization_batch<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
392 relayers: &[RelayerRepoModel],
393 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
394) -> Result<()>
395where
396 J: JobProducerTrait + Send + Sync + 'static,
397 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
398 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
399 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
400 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
401 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
402 TCR: TransactionCounterTrait + Send + Sync + 'static,
403 PR: PluginRepositoryTrait + Send + Sync + 'static,
404 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
405{
406 let futures = relayers.iter().map(|relayer| {
407 let app_state = app_state.clone();
408 let relayer_id = relayer.id.clone();
409
410 async move {
411 let result = initialize_relayer(relayer_id.clone(), app_state).await;
412 (relayer_id, result)
413 }
414 });
415
416 let results = futures::future::join_all(futures).await;
417
418 let succeeded = results.iter().filter(|(_, r)| r.is_ok()).count();
420 let failed = results.iter().filter(|(_, r)| r.is_err()).count();
421
422 info!(
423 succeeded = succeeded,
424 failed = failed,
425 "Relayer initialization completed"
426 );
427
428 if failed > 0 {
430 let failures: Vec<String> = results
431 .into_iter()
432 .filter_map(|(id, r)| r.err().map(|e| format!("{id}: {e}")))
433 .collect();
434
435 return Err(eyre::eyre!(
436 "Failed to initialize {} relayer(s): {}",
437 failed,
438 failures.join("; ")
439 ));
440 }
441
442 Ok(())
443}
444
445#[cfg(test)]
446mod tests {
447 use super::*;
448 use crate::{
449 jobs::MockJobProducerTrait,
450 models::{AppState, RepositoryError},
451 repositories::{
452 ApiKeyRepositoryStorage, MockRelayerRepository, NetworkRepositoryStorage,
453 NotificationRepositoryStorage, PluginRepositoryStorage, SignerRepositoryStorage,
454 TransactionCounterRepositoryStorage, TransactionRepositoryStorage,
455 },
456 utils::mocks::mockutils::{create_mock_app_state, create_mock_relayer},
457 };
458 use actix_web::web::ThinData;
459 use std::sync::Arc;
460
461 fn create_app_state_with_mock_relayer_repo(
462 mock_relayer_repo: MockRelayerRepository,
463 ) -> AppState<
464 MockJobProducerTrait,
465 MockRelayerRepository,
466 TransactionRepositoryStorage,
467 NetworkRepositoryStorage,
468 NotificationRepositoryStorage,
469 SignerRepositoryStorage,
470 TransactionCounterRepositoryStorage,
471 PluginRepositoryStorage,
472 ApiKeyRepositoryStorage,
473 > {
474 AppState {
475 relayer_repository: Arc::new(mock_relayer_repo),
476 transaction_repository: Arc::new(TransactionRepositoryStorage::new_in_memory()),
477 signer_repository: Arc::new(SignerRepositoryStorage::new_in_memory()),
478 notification_repository: Arc::new(NotificationRepositoryStorage::new_in_memory()),
479 network_repository: Arc::new(NetworkRepositoryStorage::new_in_memory()),
480 transaction_counter_store: Arc::new(
481 TransactionCounterRepositoryStorage::new_in_memory(),
482 ),
483 job_producer: Arc::new(MockJobProducerTrait::new()),
484 plugin_repository: Arc::new(PluginRepositoryStorage::new_in_memory()),
485 api_key_repository: Arc::new(ApiKeyRepositoryStorage::new_in_memory()),
486 }
487 }
488
489 #[test]
490 fn test_get_relayer_ids_with_empty_list() {
491 let relayers: Vec<RelayerRepoModel> = vec![];
492 let ids = get_relayer_ids_to_initialize(&relayers);
493
494 assert_eq!(ids.len(), 0, "Should return empty list for no relayers");
495 }
496
497 #[test]
498 fn test_get_relayer_ids_with_single_relayer() {
499 let relayers = vec![create_mock_relayer("relayer-1".to_string(), false)];
500
501 let ids = get_relayer_ids_to_initialize(&relayers);
502
503 assert_eq!(ids.len(), 1, "Should return one ID");
504 assert_eq!(ids[0], "relayer-1");
505 }
506
507 #[test]
508 fn test_get_relayer_ids_with_multiple_relayers() {
509 let relayers = vec![
510 create_mock_relayer("evm-relayer".to_string(), false),
511 create_mock_relayer("solana-relayer".to_string(), false),
512 create_mock_relayer("stellar-relayer".to_string(), false),
513 ];
514
515 let ids = get_relayer_ids_to_initialize(&relayers);
516
517 assert_eq!(ids.len(), 3, "Should return three IDs");
518 assert_eq!(ids[0], "evm-relayer");
519 assert_eq!(ids[1], "solana-relayer");
520 assert_eq!(ids[2], "stellar-relayer");
521 }
522
523 #[test]
524 fn test_get_relayer_ids_with_mixed_states() {
525 let mut relayers = vec![
526 create_mock_relayer("active-relayer".to_string(), false),
527 create_mock_relayer("paused-relayer".to_string(), false),
528 create_mock_relayer("disabled-relayer".to_string(), false),
529 ];
530
531 relayers[1].paused = true;
533 relayers[2].system_disabled = true;
534
535 let ids = get_relayer_ids_to_initialize(&relayers);
536
537 assert_eq!(
539 ids.len(),
540 3,
541 "Should include all relayers regardless of state"
542 );
543 assert!(ids.contains(&"active-relayer".to_string()));
544 assert!(ids.contains(&"paused-relayer".to_string()));
545 assert!(ids.contains(&"disabled-relayer".to_string()));
546 }
547
548 #[test]
549 fn test_get_relayer_ids_with_different_network_types() {
550 let relayers = vec![
551 create_mock_relayer("evm-1".to_string(), false),
552 create_mock_relayer("evm-2".to_string(), false),
553 create_mock_relayer("solana-1".to_string(), false),
554 create_mock_relayer("stellar-1".to_string(), false),
555 ];
556
557 let ids = get_relayer_ids_to_initialize(&relayers);
558
559 assert_eq!(ids.len(), 4, "Should include all network types");
560
561 assert!(ids.iter().any(|id| id.starts_with("evm-")));
563 assert!(ids.iter().any(|id| id.starts_with("solana-")));
564 assert!(ids.iter().any(|id| id.starts_with("stellar-")));
565 }
566
567 #[test]
568 fn test_concurrent_initialization_count() {
569 let test_cases = vec![
573 (0, 0), (1, 1), (5, 5), (10, 10), ];
578
579 for (relayer_count, expected_init_count) in test_cases {
580 let relayers: Vec<RelayerRepoModel> = (0..relayer_count)
581 .map(|i| create_mock_relayer(format!("relayer-{i}"), false))
582 .collect();
583
584 let ids = get_relayer_ids_to_initialize(&relayers);
585
586 assert_eq!(
587 ids.len(),
588 expected_init_count,
589 "Should create {expected_init_count} initializations for {relayer_count} relayers"
590 );
591 }
592 }
593
594 #[tokio::test]
595 async fn test_initialize_relayer_with_service_success() {
596 use crate::domain::MockRelayer;
597
598 let mut mock_relayer = MockRelayer::new();
599 mock_relayer
600 .expect_initialize_relayer()
601 .times(1)
602 .returning(|| Box::pin(async { Ok(()) }));
603
604 let result = initialize_relayer_with_service("test-relayer", &mock_relayer).await;
605
606 assert!(result.is_ok(), "Should successfully initialize relayer");
607 }
608
609 #[tokio::test]
610 async fn test_initialize_relayer_with_service_failure() {
611 use crate::domain::MockRelayer;
612 use crate::models::RelayerError;
613
614 let mut mock_relayer = MockRelayer::new();
615 mock_relayer
616 .expect_initialize_relayer()
617 .times(1)
618 .returning(|| {
619 Box::pin(async {
620 Err(RelayerError::ProviderError(
621 "RPC connection failed".to_string(),
622 ))
623 })
624 });
625
626 let result = initialize_relayer_with_service("test-relayer", &mock_relayer).await;
627
628 assert!(
629 result.is_err(),
630 "Should fail when initialize_relayer returns error"
631 );
632 let err = result.unwrap_err();
633 assert!(err
634 .to_string()
635 .contains("Failed to initialize relayer: test-relayer"));
636 }
637
638 #[tokio::test]
639 async fn test_initialize_relayer_with_service_called_once() {
640 use crate::domain::MockRelayer;
641
642 let mut mock_relayer = MockRelayer::new();
643 mock_relayer
645 .expect_initialize_relayer()
646 .times(1)
647 .returning(|| Box::pin(async { Ok(()) }));
648
649 let _ = initialize_relayer_with_service("relayer-123", &mock_relayer).await;
650
651 }
653
654 #[tokio::test]
655 async fn test_initialize_relayer_with_service_multiple_relayers() {
656 use crate::domain::MockRelayer;
657
658 let mut mock_relayer_1 = MockRelayer::new();
660 mock_relayer_1
661 .expect_initialize_relayer()
662 .times(1)
663 .returning(|| Box::pin(async { Ok(()) }));
664
665 let mut mock_relayer_2 = MockRelayer::new();
666 mock_relayer_2
667 .expect_initialize_relayer()
668 .times(1)
669 .returning(|| Box::pin(async { Ok(()) }));
670
671 let result1 = initialize_relayer_with_service("relayer-1", &mock_relayer_1).await;
672 let result2 = initialize_relayer_with_service("relayer-2", &mock_relayer_2).await;
673
674 assert!(
675 result1.is_ok(),
676 "First relayer should initialize successfully"
677 );
678 assert!(
679 result2.is_ok(),
680 "Second relayer should initialize successfully"
681 );
682 }
683
684 #[test]
686 fn test_lock_ttl_is_reasonable() {
687 assert!(
689 BOOTSTRAP_LOCK_TTL_SECS >= 60,
690 "Lock TTL should be at least 60 seconds"
691 );
692 assert!(
694 BOOTSTRAP_LOCK_TTL_SECS <= 600,
695 "Lock TTL should not exceed 10 minutes"
696 );
697 }
698
699 #[test]
700 fn test_staleness_threshold_is_reasonable() {
701 assert!(
703 INIT_STALENESS_THRESHOLD_SECS >= 60,
704 "Staleness threshold should be at least 60 seconds"
705 );
706 assert!(
708 INIT_STALENESS_THRESHOLD_SECS <= 3600,
709 "Staleness threshold should not exceed 1 hour"
710 );
711 }
712
713 #[test]
714 fn test_wait_max_duration_exceeds_lock_ttl() {
715 assert!(
717 LOCK_WAIT_MAX_SECS > BOOTSTRAP_LOCK_TTL_SECS,
718 "Wait duration should exceed lock TTL"
719 );
720 }
721
722 #[test]
723 fn test_poll_interval_is_reasonable() {
724 assert!(
726 LOCK_POLL_INTERVAL_MS >= 100,
727 "Poll interval should be at least 100ms"
728 );
729 assert!(
731 LOCK_POLL_INTERVAL_MS <= 5000,
732 "Poll interval should not exceed 5 seconds"
733 );
734 }
735
736 #[test]
738 fn test_get_relayer_ids_preserves_order() {
739 let relayers = vec![
740 create_mock_relayer("z-relayer".to_string(), false),
741 create_mock_relayer("a-relayer".to_string(), false),
742 create_mock_relayer("m-relayer".to_string(), false),
743 ];
744
745 let ids = get_relayer_ids_to_initialize(&relayers);
746
747 assert_eq!(ids[0], "z-relayer");
749 assert_eq!(ids[1], "a-relayer");
750 assert_eq!(ids[2], "m-relayer");
751 }
752
753 #[test]
754 fn test_get_relayer_ids_with_special_characters() {
755 let relayers = vec![
756 create_mock_relayer("relayer-with-dashes".to_string(), false),
757 create_mock_relayer("relayer_with_underscores".to_string(), false),
758 create_mock_relayer("relayer.with.dots".to_string(), false),
759 ];
760
761 let ids = get_relayer_ids_to_initialize(&relayers);
762
763 assert_eq!(ids.len(), 3);
764 assert!(ids.contains(&"relayer-with-dashes".to_string()));
765 assert!(ids.contains(&"relayer_with_underscores".to_string()));
766 assert!(ids.contains(&"relayer.with.dots".to_string()));
767 }
768
769 #[test]
770 fn test_get_relayer_ids_with_large_list() {
771 let relayers: Vec<RelayerRepoModel> = (0..100)
772 .map(|i| create_mock_relayer(format!("relayer-{:03}", i), false))
773 .collect();
774
775 let ids = get_relayer_ids_to_initialize(&relayers);
776
777 assert_eq!(ids.len(), 100);
778 assert_eq!(ids[0], "relayer-000");
779 assert_eq!(ids[99], "relayer-099");
780 }
781
782 #[tokio::test]
784 async fn test_initialize_relayer_with_service_error_includes_relayer_id() {
785 use crate::domain::MockRelayer;
786 use crate::models::RelayerError;
787
788 let mut mock_relayer = MockRelayer::new();
789 mock_relayer
790 .expect_initialize_relayer()
791 .times(1)
792 .returning(|| {
793 Box::pin(async {
794 Err(RelayerError::NetworkConfiguration("bad config".to_string()))
795 })
796 });
797
798 let result = initialize_relayer_with_service("my-special-relayer-id", &mock_relayer).await;
799
800 assert!(result.is_err());
801 let err_str = result.unwrap_err().to_string();
802 assert!(
803 err_str.contains("my-special-relayer-id"),
804 "Error should contain relayer ID, got: {}",
805 err_str
806 );
807 }
808
809 #[tokio::test]
810 async fn test_initialize_relayer_with_service_provider_error() {
811 use crate::domain::MockRelayer;
812 use crate::models::RelayerError;
813
814 let mut mock_relayer = MockRelayer::new();
815 mock_relayer
816 .expect_initialize_relayer()
817 .times(1)
818 .returning(|| {
819 Box::pin(async { Err(RelayerError::ProviderError("provider failed".to_string())) })
820 });
821
822 let result = initialize_relayer_with_service("test-relayer", &mock_relayer).await;
823 assert!(result.is_err(), "Should fail for ProviderError");
824 }
825
826 #[tokio::test]
827 async fn test_initialize_relayer_with_service_network_config_error() {
828 use crate::domain::MockRelayer;
829 use crate::models::RelayerError;
830
831 let mut mock_relayer = MockRelayer::new();
832 mock_relayer
833 .expect_initialize_relayer()
834 .times(1)
835 .returning(|| {
836 Box::pin(async {
837 Err(RelayerError::NetworkConfiguration(
838 "network config error".to_string(),
839 ))
840 })
841 });
842
843 let result = initialize_relayer_with_service("test-relayer", &mock_relayer).await;
844 assert!(
845 result.is_err(),
846 "Should fail for NetworkConfiguration error"
847 );
848 }
849
850 async fn create_test_redis_pool() -> Option<Arc<Pool>> {
857 let cfg = deadpool_redis::Config::from_url("redis://127.0.0.1:6379");
858 let pool = cfg
859 .builder()
860 .ok()?
861 .max_size(16)
862 .runtime(deadpool_redis::Runtime::Tokio1)
863 .build()
864 .ok()?;
865 Some(Arc::new(pool))
866 }
867
868 fn create_unreachable_redis_pool() -> Arc<Pool> {
869 let cfg = deadpool_redis::Config::from_url("redis://127.0.0.1:1");
870 let pool = cfg
871 .builder()
872 .expect("should create deadpool builder")
873 .max_size(1)
874 .runtime(deadpool_redis::Runtime::Tokio1)
875 .build()
876 .expect("should build deadpool");
877 Arc::new(pool)
878 }
879
880 #[tokio::test]
883 async fn test_run_initialization_batch_empty_list() {
884 let app_state = create_mock_app_state(None, None, None, None, None, None).await;
885 let thin_state = ThinData(app_state);
886
887 let relayers: Vec<RelayerRepoModel> = vec![];
888 let result = run_initialization_batch(&relayers, &thin_state).await;
889
890 assert!(
891 result.is_ok(),
892 "Should succeed with empty relayer list: {:?}",
893 result
894 );
895 }
896
897 #[tokio::test]
898 async fn test_run_initialization_batch_handles_failures() {
899 let relayers = vec![create_mock_relayer("failing-relayer".to_string(), false)];
900
901 let app_state =
902 create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
903 let thin_state = ThinData(app_state);
904
905 let result = run_initialization_batch(&relayers, &thin_state).await;
906
907 assert!(result.is_err(), "Should fail due to missing signer");
908 }
909
910 #[tokio::test]
911 async fn test_run_initialization_batch_concurrent_execution() {
912 let relayers: Vec<RelayerRepoModel> = (0..5)
913 .map(|i| create_mock_relayer(format!("concurrent-relayer-{}", i), false))
914 .collect();
915
916 let app_state =
917 create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
918 let thin_state = ThinData(app_state);
919
920 let result = run_initialization_batch(&relayers, &thin_state).await;
922
923 assert!(result.is_err(), "Should fail due to missing signers");
924 let err_str = result.unwrap_err().to_string();
926 assert!(
927 err_str.contains("Failed to initialize"),
928 "Error should mention initialization failure"
929 );
930 }
931
932 #[tokio::test]
933 async fn test_run_initialization_batch_reports_each_failed_relayer_id() {
934 let relayers = vec![
935 create_mock_relayer("batch-relayer-1".to_string(), false),
936 create_mock_relayer("batch-relayer-2".to_string(), false),
937 ];
938
939 let app_state =
940 create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
941 let thin_state = ThinData(app_state);
942
943 let result = run_initialization_batch(&relayers, &thin_state).await;
944
945 assert!(result.is_err(), "Should fail due to missing signers");
946
947 let err = result.unwrap_err().to_string();
948 assert!(err.contains("Failed to initialize 2 relayer(s)"));
949 assert!(err.contains("batch-relayer-1"));
950 assert!(err.contains("batch-relayer-2"));
951 }
952
953 #[tokio::test]
956 async fn test_recover_after_timeout_gracefully_degrades_when_redis_errors() {
957 let relayers = vec![create_mock_relayer(
958 "recovery-error-relayer".to_string(),
959 false,
960 )];
961 let app_state =
962 create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
963 let thin_state = ThinData(app_state);
964 let unreachable_pool = create_unreachable_redis_pool();
965
966 let result =
967 recover_after_timeout(&relayers, &thin_state, &unreachable_pool, "recover_err").await;
968
969 assert!(result.is_err(), "Should fall back to initialization batch");
970 let err = result.unwrap_err().to_string();
971 assert!(err.contains("Failed to initialize"));
972 assert!(err.contains("recovery-error-relayer"));
973 }
974
975 #[tokio::test]
976 #[ignore] async fn test_recover_after_timeout_acquires_lock_and_attempts_initialization() {
978 let conn = create_test_redis_pool()
979 .await
980 .expect("Redis connection required");
981 let relayers = vec![create_mock_relayer(
982 "recovery-lock-relayer".to_string(),
983 false,
984 )];
985 let app_state =
986 create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
987 let thin_state = ThinData(app_state);
988 let prefix = "test_recover_after_timeout_acquire";
989
990 {
991 let mut conn_clone = conn.get().await.expect("Failed to get connection");
992 let hash_key = format!("{prefix}:relayer_sync_meta");
993 let lock_key = format!("{prefix}:lock:{GLOBAL_INIT_LOCK_NAME}");
994 let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &hash_key).await;
995 let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &lock_key).await;
996 }
997
998 let result = recover_after_timeout(&relayers, &thin_state, &conn, prefix).await;
999
1000 assert!(
1001 result.is_err(),
1002 "Should attempt initialization after taking over the lock"
1003 );
1004
1005 let is_recent = is_global_init_recently_completed(&conn, prefix, 300)
1006 .await
1007 .expect("Should check completion");
1008 assert!(
1009 !is_recent,
1010 "Should not record completion time when initialization fails"
1011 );
1012
1013 {
1014 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1015 let hash_key = format!("{prefix}:relayer_sync_meta");
1016 let lock_key = format!("{prefix}:lock:{GLOBAL_INIT_LOCK_NAME}");
1017 let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &hash_key).await;
1018 let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &lock_key).await;
1019 }
1020 }
1021
1022 #[tokio::test]
1025 #[ignore] async fn test_coordinate_with_distributed_lock_skips_when_recently_completed() {
1027 let conn = create_test_redis_pool()
1028 .await
1029 .expect("Redis connection required");
1030
1031 let relayers = vec![create_mock_relayer(
1032 "global-lock-relayer".to_string(),
1033 false,
1034 )];
1035 let app_state =
1036 create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
1037 let thin_state = ThinData(app_state);
1038
1039 let prefix = "test_global_skip_recent";
1040
1041 set_relayer_last_sync(&conn, prefix, &relayers[0].id)
1043 .await
1044 .expect("Should set relayer sync time");
1045 set_global_init_completed(&conn, prefix)
1046 .await
1047 .expect("Should set completion time");
1048
1049 let result = coordinate_with_distributed_lock(&relayers, &thin_state, &conn, prefix).await;
1050
1051 assert!(
1053 result.is_ok(),
1054 "Should skip initialization when recently completed: {:?}",
1055 result
1056 );
1057
1058 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1060 let hash_key = format!("{}:relayer_sync_meta", prefix);
1061 let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &hash_key).await;
1062 }
1063
1064 #[tokio::test]
1065 #[ignore] async fn test_coordinate_with_distributed_lock_does_not_skip_without_relayer_sync_markers() {
1067 let conn = create_test_redis_pool()
1068 .await
1069 .expect("Redis connection required");
1070
1071 let relayers = vec![create_mock_relayer(
1072 "global-lock-no-sync-relayer".to_string(),
1073 false,
1074 )];
1075 let app_state =
1076 create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
1077 let thin_state = ThinData(app_state);
1078
1079 let prefix = "test_global_skip_without_relayer_sync";
1080
1081 set_global_init_completed(&conn, prefix)
1082 .await
1083 .expect("Should set completion time");
1084
1085 let result = coordinate_with_distributed_lock(&relayers, &thin_state, &conn, prefix).await;
1086
1087 assert!(
1088 result.is_err(),
1089 "Should not skip initialization when relayer sync markers are missing"
1090 );
1091
1092 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1093 let hash_key = format!("{prefix}:relayer_sync_meta");
1094 let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &hash_key).await;
1095 }
1096
1097 #[tokio::test]
1098 #[ignore] async fn test_coordinate_with_distributed_lock_acquires_lock() {
1100 let conn = create_test_redis_pool()
1101 .await
1102 .expect("Redis connection required");
1103
1104 let relayers = vec![create_mock_relayer(
1105 "lock-acquire-relayer".to_string(),
1106 false,
1107 )];
1108 let app_state =
1109 create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
1110 let thin_state = ThinData(app_state);
1111
1112 let prefix = "test_global_acquire_lock";
1113
1114 {
1116 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1117 let hash_key = format!("{}:relayer_sync_meta", prefix);
1118 let lock_key = format!("{}:lock:{}", prefix, GLOBAL_INIT_LOCK_NAME);
1119 let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &hash_key).await;
1120 let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &lock_key).await;
1121 }
1122
1123 let result = coordinate_with_distributed_lock(&relayers, &thin_state, &conn, prefix).await;
1124
1125 assert!(
1127 result.is_err(),
1128 "Should fail due to missing signer configuration"
1129 );
1130
1131 let is_recent = is_global_init_recently_completed(&conn, prefix, 300)
1133 .await
1134 .expect("Should check completion");
1135 assert!(!is_recent, "Should NOT record completion time on failure");
1136
1137 {
1139 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1140 let hash_key = format!("{}:relayer_sync_meta", prefix);
1141 let lock_key = format!("{}:lock:{}", prefix, GLOBAL_INIT_LOCK_NAME);
1142 let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &hash_key).await;
1143 let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &lock_key).await;
1144 }
1145 }
1146
1147 #[tokio::test]
1148 #[ignore] async fn test_coordinate_with_distributed_lock_waits_when_lock_held() {
1150 let conn = create_test_redis_pool()
1151 .await
1152 .expect("Redis connection required");
1153
1154 let relayers = vec![create_mock_relayer("wait-relayer".to_string(), false)];
1155 let app_state =
1156 create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
1157 let thin_state = ThinData(app_state);
1158
1159 let prefix = "test_global_wait_lock";
1160 let hash_key = format!("{}:relayer_sync_meta", prefix);
1161 let lock_key = format!("{}:lock:{}", prefix, GLOBAL_INIT_LOCK_NAME);
1162
1163 {
1165 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1166 let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &hash_key).await;
1167 let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &lock_key).await;
1168 }
1169
1170 let lock = DistributedLock::new(conn.clone(), &lock_key, Duration::from_secs(5));
1172 let guard = lock
1173 .try_acquire()
1174 .await
1175 .expect("Should acquire lock")
1176 .expect("Lock should be available");
1177
1178 let conn_for_task = conn.clone();
1180 let prefix_for_task = prefix.to_string();
1181 tokio::spawn(async move {
1182 tokio::time::sleep(Duration::from_millis(500)).await;
1183 set_relayer_last_sync(&conn_for_task, &prefix_for_task, "wait-relayer")
1184 .await
1185 .expect("Should set relayer sync");
1186 set_global_init_completed(&conn_for_task, &prefix_for_task)
1187 .await
1188 .expect("Should set completion");
1189 guard.release().await.expect("Should release lock");
1190 });
1191
1192 let result = coordinate_with_distributed_lock(&relayers, &thin_state, &conn, prefix).await;
1194
1195 assert!(
1196 result.is_ok(),
1197 "Should succeed after waiting for completion: {:?}",
1198 result
1199 );
1200
1201 {
1203 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1204 let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &hash_key).await;
1205 let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &lock_key).await;
1206 }
1207 }
1208
1209 #[tokio::test]
1212 async fn test_initialize_relayers_empty_list() {
1213 let app_state = create_mock_app_state(None, None, None, None, None, None).await;
1214 let thin_state = ThinData(app_state);
1215
1216 let result = initialize_relayers(thin_state).await;
1217
1218 assert!(
1219 result.is_ok(),
1220 "Should succeed with empty relayer list: {:?}",
1221 result
1222 );
1223 }
1224
1225 #[tokio::test]
1226 async fn test_initialize_relayers_uses_in_memory_path() {
1227 let relayers = vec![create_mock_relayer("inmem-relayer".to_string(), false)];
1228 let app_state =
1229 create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
1230 let thin_state = ThinData(app_state);
1231
1232 let result = initialize_relayers(thin_state).await;
1233
1234 assert!(
1235 result.is_err(),
1236 "Should fail due to missing signer configuration"
1237 );
1238 }
1239
1240 #[tokio::test]
1241 async fn test_initialize_relayer_returns_error_when_relayer_is_missing() {
1242 let app_state = create_mock_app_state(None, None, None, None, None, None).await;
1243 let thin_state = ThinData(app_state);
1244
1245 let result = initialize_relayer("missing-relayer".to_string(), thin_state).await;
1246
1247 assert!(result.is_err());
1248 assert!(result.unwrap_err().to_string().contains("missing-relayer"));
1249 }
1250
1251 #[tokio::test]
1252 async fn test_initialize_relayer_returns_error_when_signer_is_missing() {
1253 let relayers = vec![create_mock_relayer("signerless-relayer".to_string(), false)];
1254 let app_state = create_mock_app_state(None, Some(relayers), None, None, None, None).await;
1255 let thin_state = ThinData(app_state);
1256
1257 let result = initialize_relayer("signerless-relayer".to_string(), thin_state).await;
1258
1259 assert!(result.is_err());
1260 assert!(result.unwrap_err().to_string().contains("test"));
1261 }
1262
1263 #[tokio::test]
1264 async fn test_initialize_relayers_propagates_list_all_error() {
1265 let mut mock_relayer_repo = MockRelayerRepository::new();
1266 mock_relayer_repo.expect_list_all().times(1).returning(|| {
1267 Err(RepositoryError::ConnectionError(
1268 "relayer repository unavailable".to_string(),
1269 ))
1270 });
1271
1272 let thin_state = ThinData(create_app_state_with_mock_relayer_repo(mock_relayer_repo));
1273
1274 let result = initialize_relayers(thin_state).await;
1275
1276 assert!(result.is_err());
1277 assert!(result
1278 .unwrap_err()
1279 .to_string()
1280 .contains("relayer repository unavailable"));
1281 }
1282}