openzeppelin_relayer/bootstrap/
initialize_relayers.rs

1//! Relayer initialization
2//!
3//! This module contains functions for initializing relayers, ensuring they are
4//! properly configured and ready for operation.
5//!
6//! ## Distributed Locking
7//!
8//! When multiple instances of the relayer service start simultaneously with
9//! `DISTRIBUTED_MODE` enabled, this module uses distributed locking to coordinate
10//! initialization and prevent duplicate work:
11//!
12//! - **Global lock**: A single lock is used for the entire initialization process,
13//!   ensuring only one instance initializes relayers at a time.
14//! - **Recent completion check**: Skips initialization if it was recently completed
15//!   (within the staleness threshold) to handle rolling restarts efficiently.
16//! - **Wait for completion**: Instances that don't acquire the lock wait for the
17//!   initializing instance to complete, then proceed without re-initializing.
18//! - **Single-instance mode**: When `DISTRIBUTED_MODE` is disabled (default) or using
19//!   in-memory storage, locking is skipped since coordination is not needed.
20use crate::{
21    config::ServerConfig,
22    domain::{get_network_relayer, Relayer},
23    jobs::JobProducerTrait,
24    models::{
25        NetworkRepoModel, NotificationRepoModel, RelayerRepoModel, SignerRepoModel,
26        ThinDataAppState, TransactionRepoModel,
27    },
28    repositories::{
29        ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
30        Repository, TransactionCounterTrait, TransactionRepository,
31    },
32    utils::{
33        is_global_init_recently_completed, is_relayer_recently_synced, poll_until,
34        set_global_init_completed, set_relayer_last_sync, DistributedLock, BOOTSTRAP_LOCK_TTL_SECS,
35        LOCK_POLL_INTERVAL_MS, LOCK_WAIT_MAX_SECS,
36    },
37};
38use color_eyre::{eyre::WrapErr, Result};
39use deadpool_redis::Pool;
40use std::sync::Arc;
41use std::time::Duration;
42use tracing::{debug, info, warn};
43
/// Staleness threshold in seconds. Initialization completed within this time is skipped.
/// Set to 5 minutes to prevent redundant initialization on rolling restarts,
/// where instances come back up within moments of each other.
const INIT_STALENESS_THRESHOLD_SECS: u64 = 300;

/// Lock name for global initialization lock. Combined with the repository key
/// prefix (`{prefix}:lock:{GLOBAL_INIT_LOCK_NAME}`) to form the full lock key.
const GLOBAL_INIT_LOCK_NAME: &str = "relayer_init_global";
50
51/// Internal function for initializing a relayer using a provided relayer service.
52/// This allows for easier testing with mocked relayers.
53/// Uses generics for static dispatch instead of dynamic dispatch.
54async fn initialize_relayer_with_service<R>(relayer_id: &str, relayer_service: &R) -> Result<()>
55where
56    R: Relayer,
57{
58    debug!(relayer_id = %relayer_id, "initializing relayer");
59
60    relayer_service
61        .initialize_relayer()
62        .await
63        .wrap_err_with(|| format!("Failed to initialize relayer: {relayer_id}"))?;
64
65    Ok(())
66}
67
68pub async fn initialize_relayer<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
69    relayer_id: String,
70    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
71) -> Result<()>
72where
73    J: JobProducerTrait + Send + Sync + 'static,
74    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
75    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
76    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
77    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
78    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
79    TCR: TransactionCounterTrait + Send + Sync + 'static,
80    PR: PluginRepositoryTrait + Send + Sync + 'static,
81    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
82{
83    let relayer_service = get_network_relayer(relayer_id.clone(), &app_state).await?;
84
85    initialize_relayer_with_service(&relayer_id, &relayer_service).await
86}
87
88/// Collects relayer IDs that need initialization
89pub fn get_relayer_ids_to_initialize(relayers: &[RelayerRepoModel]) -> Vec<String> {
90    relayers.iter().map(|r| r.id.clone()).collect()
91}
92
93pub async fn initialize_relayers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
94    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
95) -> Result<()>
96where
97    J: JobProducerTrait + Send + Sync + 'static,
98    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
99    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
100    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
101    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
102    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
103    TCR: TransactionCounterTrait + Send + Sync + 'static,
104    PR: PluginRepositoryTrait + Send + Sync + 'static,
105    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
106{
107    let relayers = app_state.relayer_repository.list_all().await?;
108
109    // Early return for empty list - no work to do
110    if relayers.is_empty() {
111        info!("No relayers to initialize");
112        return Ok(());
113    }
114
115    info!(count = relayers.len(), "Initializing relayers");
116
117    // Check if using persistent storage with distributed coordination
118    let use_lock = ServerConfig::get_distributed_mode();
119    let connection_info = app_state.relayer_repository.connection_info();
120
121    match (use_lock, connection_info) {
122        (true, Some((conn, prefix))) => {
123            // Distributed mode: use locking to coordinate across instances
124            coordinate_with_distributed_lock(&relayers, &app_state, &conn, &prefix).await
125        }
126        _ => {
127            // Single-instance mode or in-memory storage: skip locking
128            info!("Initializing relayers without distributed locking");
129            run_initialization_batch(&relayers, &app_state).await
130        }
131    }
132}
133
134/// Coordinates relayer initialization with a distributed lock for multi-instance deployments.
135///
136/// This function handles the coordination logic for distributed initialization:
137/// 1. Check if initialization was recently completed (skip if yes)
138/// 2. Try to acquire global lock
139/// 3. If lock acquired: initialize all relayers and record completion time
140/// 4. If lock held by another instance: wait for completion
141/// 5. If wait times out: recheck state and attempt recovery (lock holder may have crashed)
142/// 6. If lock error: proceed without coordination (graceful degradation)
143async fn coordinate_with_distributed_lock<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
144    relayers: &[RelayerRepoModel],
145    app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
146    conn: &Arc<Pool>,
147    prefix: &str,
148) -> Result<()>
149where
150    J: JobProducerTrait + Send + Sync + 'static,
151    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
152    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
153    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
154    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
155    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
156    TCR: TransactionCounterTrait + Send + Sync + 'static,
157    PR: PluginRepositoryTrait + Send + Sync + 'static,
158    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
159{
160    // Step 1: Check if recently completed for this exact relayer set
161    match was_relayer_set_recently_initialized(conn, prefix, relayers).await {
162        Ok(true) => {
163            info!("Initialization recently completed by another instance, skipping");
164            return Ok(());
165        }
166        Ok(false) => {}
167        Err(e) => {
168            warn!(
169                error = %e,
170                "Failed to check recent initialization status, proceeding with initialization"
171            );
172        }
173    }
174
175    // Step 2: Try to acquire global lock
176    let lock_key = format!("{prefix}:lock:{GLOBAL_INIT_LOCK_NAME}");
177    let lock = DistributedLock::new(
178        conn.clone(),
179        &lock_key,
180        Duration::from_secs(BOOTSTRAP_LOCK_TTL_SECS),
181    );
182
183    let lock_result = lock.try_acquire().await;
184
185    // Handle lock held by another instance
186    if matches!(&lock_result, Ok(None)) {
187        info!("Another instance is initializing relayers, waiting for completion");
188        let completed = wait_for_initialization_complete(conn, prefix).await?;
189
190        if completed {
191            return Ok(());
192        }
193
194        // Timeout reached — the lock holder may have crashed without completing.
195        // Recheck: was initialization completed in the final moments?
196        warn!("Timeout waiting for initialization, rechecking state");
197
198        if is_global_init_recently_completed(conn, prefix, INIT_STALENESS_THRESHOLD_SECS)
199            .await
200            .unwrap_or(false)
201        {
202            info!("Initialization completed during timeout window");
203            return Ok(());
204        }
205
206        // Not completed. Try to acquire the lock and take over initialization.
207        return recover_after_timeout(relayers, app_state, conn, prefix).await;
208    }
209
210    // Handle lock error - graceful degradation (early return)
211    let guard = match lock_result {
212        Ok(Some(g)) => g,
213        Err(e) => {
214            warn!(
215                error = %e,
216                "Failed to acquire distributed lock, proceeding without coordination"
217            );
218            return run_initialization_batch(relayers, app_state).await;
219        }
220        Ok(None) => unreachable!(), // Already handled above
221    };
222
223    if was_relayer_set_recently_initialized(conn, prefix, relayers)
224        .await
225        .unwrap_or(false)
226    {
227        info!("Initialization completed while waiting for the lock, skipping");
228        drop(guard);
229        return Ok(());
230    }
231
232    // Lock acquired - proceed with initialization
233    info!(
234        count = relayers.len(),
235        "Acquired initialization lock, initializing relayers"
236    );
237
238    let result = run_initialization_batch(relayers, app_state).await;
239
240    if result.is_ok() {
241        if let Err(e) = record_relayer_initialization_completion(conn, prefix, relayers).await {
242            warn!(error = %e, "Failed to record initialization completion time");
243        }
244    }
245
246    drop(guard);
247    result
248}
249
250async fn was_relayer_set_recently_initialized(
251    conn: &Arc<Pool>,
252    prefix: &str,
253    relayers: &[RelayerRepoModel],
254) -> Result<bool> {
255    if !is_global_init_recently_completed(conn, prefix, INIT_STALENESS_THRESHOLD_SECS).await? {
256        return Ok(false);
257    }
258
259    for relayer in relayers {
260        if !is_relayer_recently_synced(conn, prefix, &relayer.id, INIT_STALENESS_THRESHOLD_SECS)
261            .await?
262        {
263            return Ok(false);
264        }
265    }
266
267    Ok(true)
268}
269
270async fn record_relayer_initialization_completion(
271    conn: &Arc<Pool>,
272    prefix: &str,
273    relayers: &[RelayerRepoModel],
274) -> Result<()> {
275    for relayer in relayers {
276        set_relayer_last_sync(conn, prefix, &relayer.id).await?;
277    }
278
279    set_global_init_completed(conn, prefix).await
280}
281
282/// Waits for another instance to complete initialization.
283///
284/// Polls periodically until:
285/// - Initialization is completed (detected via recent completion timestamp) → returns `Ok(true)`
286/// - Timeout is reached without completion detected → returns `Ok(false)`
287async fn wait_for_initialization_complete(conn: &Arc<Pool>, prefix: &str) -> Result<bool> {
288    let max_wait = Duration::from_secs(LOCK_WAIT_MAX_SECS);
289    let poll_interval = Duration::from_millis(LOCK_POLL_INTERVAL_MS);
290
291    // Clone values for the closure
292    let conn = conn.clone();
293    let prefix = prefix.to_string();
294
295    poll_until(
296        || is_global_init_recently_completed(&conn, &prefix, INIT_STALENESS_THRESHOLD_SECS),
297        max_wait,
298        poll_interval,
299        "initialization",
300    )
301    .await
302}
303
304/// Attempts to recover after a wait timeout by acquiring the lock and initializing.
305///
306/// This handles the case where the lock holder crashed or errored without completing
307/// initialization. After timeout:
308/// - If the lock can be acquired (previous holder's TTL expired): take over and initialize
309/// - If the lock is still held (another instance is legitimately running): wait one more
310///   bounded period for completion, then initialize as last resort
311/// - If Redis errors: graceful degradation (initialize without coordination)
312async fn recover_after_timeout<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
313    relayers: &[RelayerRepoModel],
314    app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
315    conn: &Arc<Pool>,
316    prefix: &str,
317) -> Result<()>
318where
319    J: JobProducerTrait + Send + Sync + 'static,
320    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
321    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
322    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
323    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
324    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
325    TCR: TransactionCounterTrait + Send + Sync + 'static,
326    PR: PluginRepositoryTrait + Send + Sync + 'static,
327    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
328{
329    let lock_key = format!("{prefix}:lock:{GLOBAL_INIT_LOCK_NAME}");
330    let recovery_lock = DistributedLock::new(
331        conn.clone(),
332        &lock_key,
333        Duration::from_secs(BOOTSTRAP_LOCK_TTL_SECS),
334    );
335
336    match recovery_lock.try_acquire().await {
337        Ok(Some(guard)) => {
338            // Lock expired (holder crashed) — we take over
339            warn!(
340                count = relayers.len(),
341                "Previous lock holder appears to have crashed, taking over initialization"
342            );
343            let result = run_initialization_batch(relayers, app_state).await;
344            if result.is_ok() {
345                if let Err(e) =
346                    record_relayer_initialization_completion(conn, prefix, relayers).await
347                {
348                    warn!(error = %e, "Failed to record initialization completion time");
349                }
350            }
351            drop(guard);
352            result
353        }
354        Ok(None) => {
355            // Lock still held — another instance is legitimately running.
356            // Wait one more bounded period for completion instead of failing.
357            warn!("Lock still held by another instance after timeout, waiting for completion");
358            let completed = wait_for_initialization_complete(conn, prefix).await?;
359
360            if completed {
361                info!("Initialization completed by another instance during extended wait");
362                Ok(())
363            } else {
364                // Extended wait also timed out (~260s total). Proceed with initialization
365                // as a last resort rather than failing — duplicate side effects (notifications,
366                // jobs) are a minor cost compared to no instance initializing at all.
367                warn!("Extended wait also timed out, proceeding with initialization");
368                let result = run_initialization_batch(relayers, app_state).await;
369                if result.is_ok() {
370                    if let Err(e) =
371                        record_relayer_initialization_completion(conn, prefix, relayers).await
372                    {
373                        warn!(error = %e, "Failed to record initialization completion time");
374                    }
375                }
376                result
377            }
378        }
379        Err(e) => {
380            // Redis error — graceful degradation
381            warn!(
382                error = %e,
383                "Failed to check lock after timeout, attempting initialization without coordination"
384            );
385            run_initialization_batch(relayers, app_state).await
386        }
387    }
388}
389
390/// Runs the batch initialization of all relayers concurrently.
391async fn run_initialization_batch<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
392    relayers: &[RelayerRepoModel],
393    app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
394) -> Result<()>
395where
396    J: JobProducerTrait + Send + Sync + 'static,
397    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
398    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
399    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
400    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
401    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
402    TCR: TransactionCounterTrait + Send + Sync + 'static,
403    PR: PluginRepositoryTrait + Send + Sync + 'static,
404    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
405{
406    let futures = relayers.iter().map(|relayer| {
407        let app_state = app_state.clone();
408        let relayer_id = relayer.id.clone();
409
410        async move {
411            let result = initialize_relayer(relayer_id.clone(), app_state).await;
412            (relayer_id, result)
413        }
414    });
415
416    let results = futures::future::join_all(futures).await;
417
418    // Count and report results
419    let succeeded = results.iter().filter(|(_, r)| r.is_ok()).count();
420    let failed = results.iter().filter(|(_, r)| r.is_err()).count();
421
422    info!(
423        succeeded = succeeded,
424        failed = failed,
425        "Relayer initialization completed"
426    );
427
428    // Collect failures and return error if any
429    if failed > 0 {
430        let failures: Vec<String> = results
431            .into_iter()
432            .filter_map(|(id, r)| r.err().map(|e| format!("{id}: {e}")))
433            .collect();
434
435        return Err(eyre::eyre!(
436            "Failed to initialize {} relayer(s): {}",
437            failed,
438            failures.join("; ")
439        ));
440    }
441
442    Ok(())
443}
444
445#[cfg(test)]
446mod tests {
447    use super::*;
448    use crate::{
449        jobs::MockJobProducerTrait,
450        models::{AppState, RepositoryError},
451        repositories::{
452            ApiKeyRepositoryStorage, MockRelayerRepository, NetworkRepositoryStorage,
453            NotificationRepositoryStorage, PluginRepositoryStorage, SignerRepositoryStorage,
454            TransactionCounterRepositoryStorage, TransactionRepositoryStorage,
455        },
456        utils::mocks::mockutils::{create_mock_app_state, create_mock_relayer},
457    };
458    use actix_web::web::ThinData;
459    use std::sync::Arc;
460
461    fn create_app_state_with_mock_relayer_repo(
462        mock_relayer_repo: MockRelayerRepository,
463    ) -> AppState<
464        MockJobProducerTrait,
465        MockRelayerRepository,
466        TransactionRepositoryStorage,
467        NetworkRepositoryStorage,
468        NotificationRepositoryStorage,
469        SignerRepositoryStorage,
470        TransactionCounterRepositoryStorage,
471        PluginRepositoryStorage,
472        ApiKeyRepositoryStorage,
473    > {
474        AppState {
475            relayer_repository: Arc::new(mock_relayer_repo),
476            transaction_repository: Arc::new(TransactionRepositoryStorage::new_in_memory()),
477            signer_repository: Arc::new(SignerRepositoryStorage::new_in_memory()),
478            notification_repository: Arc::new(NotificationRepositoryStorage::new_in_memory()),
479            network_repository: Arc::new(NetworkRepositoryStorage::new_in_memory()),
480            transaction_counter_store: Arc::new(
481                TransactionCounterRepositoryStorage::new_in_memory(),
482            ),
483            job_producer: Arc::new(MockJobProducerTrait::new()),
484            plugin_repository: Arc::new(PluginRepositoryStorage::new_in_memory()),
485            api_key_repository: Arc::new(ApiKeyRepositoryStorage::new_in_memory()),
486        }
487    }
488
489    #[test]
490    fn test_get_relayer_ids_with_empty_list() {
491        let relayers: Vec<RelayerRepoModel> = vec![];
492        let ids = get_relayer_ids_to_initialize(&relayers);
493
494        assert_eq!(ids.len(), 0, "Should return empty list for no relayers");
495    }
496
497    #[test]
498    fn test_get_relayer_ids_with_single_relayer() {
499        let relayers = vec![create_mock_relayer("relayer-1".to_string(), false)];
500
501        let ids = get_relayer_ids_to_initialize(&relayers);
502
503        assert_eq!(ids.len(), 1, "Should return one ID");
504        assert_eq!(ids[0], "relayer-1");
505    }
506
507    #[test]
508    fn test_get_relayer_ids_with_multiple_relayers() {
509        let relayers = vec![
510            create_mock_relayer("evm-relayer".to_string(), false),
511            create_mock_relayer("solana-relayer".to_string(), false),
512            create_mock_relayer("stellar-relayer".to_string(), false),
513        ];
514
515        let ids = get_relayer_ids_to_initialize(&relayers);
516
517        assert_eq!(ids.len(), 3, "Should return three IDs");
518        assert_eq!(ids[0], "evm-relayer");
519        assert_eq!(ids[1], "solana-relayer");
520        assert_eq!(ids[2], "stellar-relayer");
521    }
522
523    #[test]
524    fn test_get_relayer_ids_with_mixed_states() {
525        let mut relayers = vec![
526            create_mock_relayer("active-relayer".to_string(), false),
527            create_mock_relayer("paused-relayer".to_string(), false),
528            create_mock_relayer("disabled-relayer".to_string(), false),
529        ];
530
531        // Modify states
532        relayers[1].paused = true;
533        relayers[2].system_disabled = true;
534
535        let ids = get_relayer_ids_to_initialize(&relayers);
536
537        // Should include ALL relayers regardless of state (initialization handles state)
538        assert_eq!(
539            ids.len(),
540            3,
541            "Should include all relayers regardless of state"
542        );
543        assert!(ids.contains(&"active-relayer".to_string()));
544        assert!(ids.contains(&"paused-relayer".to_string()));
545        assert!(ids.contains(&"disabled-relayer".to_string()));
546    }
547
548    #[test]
549    fn test_get_relayer_ids_with_different_network_types() {
550        let relayers = vec![
551            create_mock_relayer("evm-1".to_string(), false),
552            create_mock_relayer("evm-2".to_string(), false),
553            create_mock_relayer("solana-1".to_string(), false),
554            create_mock_relayer("stellar-1".to_string(), false),
555        ];
556
557        let ids = get_relayer_ids_to_initialize(&relayers);
558
559        assert_eq!(ids.len(), 4, "Should include all network types");
560
561        // Verify all network types are included
562        assert!(ids.iter().any(|id| id.starts_with("evm-")));
563        assert!(ids.iter().any(|id| id.starts_with("solana-")));
564        assert!(ids.iter().any(|id| id.starts_with("stellar-")));
565    }
566
567    #[test]
568    fn test_concurrent_initialization_count() {
569        // This test verifies the number of concurrent initializations
570        // that would be triggered for different relayer counts
571
572        let test_cases = vec![
573            (0, 0),   // No relayers = no initializations
574            (1, 1),   // One relayer = one initialization
575            (5, 5),   // Five relayers = five concurrent initializations
576            (10, 10), // Ten relayers = ten concurrent initializations
577        ];
578
579        for (relayer_count, expected_init_count) in test_cases {
580            let relayers: Vec<RelayerRepoModel> = (0..relayer_count)
581                .map(|i| create_mock_relayer(format!("relayer-{i}"), false))
582                .collect();
583
584            let ids = get_relayer_ids_to_initialize(&relayers);
585
586            assert_eq!(
587                ids.len(),
588                expected_init_count,
589                "Should create {expected_init_count} initializations for {relayer_count} relayers"
590            );
591        }
592    }
593
594    #[tokio::test]
595    async fn test_initialize_relayer_with_service_success() {
596        use crate::domain::MockRelayer;
597
598        let mut mock_relayer = MockRelayer::new();
599        mock_relayer
600            .expect_initialize_relayer()
601            .times(1)
602            .returning(|| Box::pin(async { Ok(()) }));
603
604        let result = initialize_relayer_with_service("test-relayer", &mock_relayer).await;
605
606        assert!(result.is_ok(), "Should successfully initialize relayer");
607    }
608
609    #[tokio::test]
610    async fn test_initialize_relayer_with_service_failure() {
611        use crate::domain::MockRelayer;
612        use crate::models::RelayerError;
613
614        let mut mock_relayer = MockRelayer::new();
615        mock_relayer
616            .expect_initialize_relayer()
617            .times(1)
618            .returning(|| {
619                Box::pin(async {
620                    Err(RelayerError::ProviderError(
621                        "RPC connection failed".to_string(),
622                    ))
623                })
624            });
625
626        let result = initialize_relayer_with_service("test-relayer", &mock_relayer).await;
627
628        assert!(
629            result.is_err(),
630            "Should fail when initialize_relayer returns error"
631        );
632        let err = result.unwrap_err();
633        assert!(err
634            .to_string()
635            .contains("Failed to initialize relayer: test-relayer"));
636    }
637
638    #[tokio::test]
639    async fn test_initialize_relayer_with_service_called_once() {
640        use crate::domain::MockRelayer;
641
642        let mut mock_relayer = MockRelayer::new();
643        // Verify that initialize_relayer is called exactly once
644        mock_relayer
645            .expect_initialize_relayer()
646            .times(1)
647            .returning(|| Box::pin(async { Ok(()) }));
648
649        let _ = initialize_relayer_with_service("relayer-123", &mock_relayer).await;
650
651        // Mock will panic if expectations aren't met (called more/less than once)
652    }
653
654    #[tokio::test]
655    async fn test_initialize_relayer_with_service_multiple_relayers() {
656        use crate::domain::MockRelayer;
657
658        // Test that we can call initialize_relayer_with_service multiple times
659        let mut mock_relayer_1 = MockRelayer::new();
660        mock_relayer_1
661            .expect_initialize_relayer()
662            .times(1)
663            .returning(|| Box::pin(async { Ok(()) }));
664
665        let mut mock_relayer_2 = MockRelayer::new();
666        mock_relayer_2
667            .expect_initialize_relayer()
668            .times(1)
669            .returning(|| Box::pin(async { Ok(()) }));
670
671        let result1 = initialize_relayer_with_service("relayer-1", &mock_relayer_1).await;
672        let result2 = initialize_relayer_with_service("relayer-2", &mock_relayer_2).await;
673
674        assert!(
675            result1.is_ok(),
676            "First relayer should initialize successfully"
677        );
678        assert!(
679            result2.is_ok(),
680            "Second relayer should initialize successfully"
681        );
682    }
683
684    // Tests for constants
685    #[test]
686    fn test_lock_ttl_is_reasonable() {
687        // Lock TTL should be at least 60 seconds to handle slow initializations
688        assert!(
689            BOOTSTRAP_LOCK_TTL_SECS >= 60,
690            "Lock TTL should be at least 60 seconds"
691        );
692        // But not too long (more than 10 minutes would be excessive)
693        assert!(
694            BOOTSTRAP_LOCK_TTL_SECS <= 600,
695            "Lock TTL should not exceed 10 minutes"
696        );
697    }
698
699    #[test]
700    fn test_staleness_threshold_is_reasonable() {
701        // Staleness threshold should be at least 60 seconds
702        assert!(
703            INIT_STALENESS_THRESHOLD_SECS >= 60,
704            "Staleness threshold should be at least 60 seconds"
705        );
706        // But not too long (more than 1 hour would be excessive)
707        assert!(
708            INIT_STALENESS_THRESHOLD_SECS <= 3600,
709            "Staleness threshold should not exceed 1 hour"
710        );
711    }
712
713    #[test]
714    fn test_wait_max_duration_exceeds_lock_ttl() {
715        // Wait duration should be longer than lock TTL to handle edge cases
716        assert!(
717            LOCK_WAIT_MAX_SECS > BOOTSTRAP_LOCK_TTL_SECS,
718            "Wait duration should exceed lock TTL"
719        );
720    }
721
722    #[test]
723    fn test_poll_interval_is_reasonable() {
724        // Poll interval should be at least 100ms to avoid excessive polling
725        assert!(
726            LOCK_POLL_INTERVAL_MS >= 100,
727            "Poll interval should be at least 100ms"
728        );
729        // But not too long (more than 5 seconds would be slow)
730        assert!(
731            LOCK_POLL_INTERVAL_MS <= 5000,
732            "Poll interval should not exceed 5 seconds"
733        );
734    }
735
736    // Tests for get_relayer_ids_to_initialize edge cases
737    #[test]
738    fn test_get_relayer_ids_preserves_order() {
739        let relayers = vec![
740            create_mock_relayer("z-relayer".to_string(), false),
741            create_mock_relayer("a-relayer".to_string(), false),
742            create_mock_relayer("m-relayer".to_string(), false),
743        ];
744
745        let ids = get_relayer_ids_to_initialize(&relayers);
746
747        // Should preserve insertion order, not sort
748        assert_eq!(ids[0], "z-relayer");
749        assert_eq!(ids[1], "a-relayer");
750        assert_eq!(ids[2], "m-relayer");
751    }
752
753    #[test]
754    fn test_get_relayer_ids_with_special_characters() {
755        let relayers = vec![
756            create_mock_relayer("relayer-with-dashes".to_string(), false),
757            create_mock_relayer("relayer_with_underscores".to_string(), false),
758            create_mock_relayer("relayer.with.dots".to_string(), false),
759        ];
760
761        let ids = get_relayer_ids_to_initialize(&relayers);
762
763        assert_eq!(ids.len(), 3);
764        assert!(ids.contains(&"relayer-with-dashes".to_string()));
765        assert!(ids.contains(&"relayer_with_underscores".to_string()));
766        assert!(ids.contains(&"relayer.with.dots".to_string()));
767    }
768
769    #[test]
770    fn test_get_relayer_ids_with_large_list() {
771        let relayers: Vec<RelayerRepoModel> = (0..100)
772            .map(|i| create_mock_relayer(format!("relayer-{:03}", i), false))
773            .collect();
774
775        let ids = get_relayer_ids_to_initialize(&relayers);
776
777        assert_eq!(ids.len(), 100);
778        assert_eq!(ids[0], "relayer-000");
779        assert_eq!(ids[99], "relayer-099");
780    }
781
782    // Test error message formatting
783    #[tokio::test]
784    async fn test_initialize_relayer_with_service_error_includes_relayer_id() {
785        use crate::domain::MockRelayer;
786        use crate::models::RelayerError;
787
788        let mut mock_relayer = MockRelayer::new();
789        mock_relayer
790            .expect_initialize_relayer()
791            .times(1)
792            .returning(|| {
793                Box::pin(async {
794                    Err(RelayerError::NetworkConfiguration("bad config".to_string()))
795                })
796            });
797
798        let result = initialize_relayer_with_service("my-special-relayer-id", &mock_relayer).await;
799
800        assert!(result.is_err());
801        let err_str = result.unwrap_err().to_string();
802        assert!(
803            err_str.contains("my-special-relayer-id"),
804            "Error should contain relayer ID, got: {}",
805            err_str
806        );
807    }
808
809    #[tokio::test]
810    async fn test_initialize_relayer_with_service_provider_error() {
811        use crate::domain::MockRelayer;
812        use crate::models::RelayerError;
813
814        let mut mock_relayer = MockRelayer::new();
815        mock_relayer
816            .expect_initialize_relayer()
817            .times(1)
818            .returning(|| {
819                Box::pin(async { Err(RelayerError::ProviderError("provider failed".to_string())) })
820            });
821
822        let result = initialize_relayer_with_service("test-relayer", &mock_relayer).await;
823        assert!(result.is_err(), "Should fail for ProviderError");
824    }
825
826    #[tokio::test]
827    async fn test_initialize_relayer_with_service_network_config_error() {
828        use crate::domain::MockRelayer;
829        use crate::models::RelayerError;
830
831        let mut mock_relayer = MockRelayer::new();
832        mock_relayer
833            .expect_initialize_relayer()
834            .times(1)
835            .returning(|| {
836                Box::pin(async {
837                    Err(RelayerError::NetworkConfiguration(
838                        "network config error".to_string(),
839                    ))
840                })
841            });
842
843        let result = initialize_relayer_with_service("test-relayer", &mock_relayer).await;
844        assert!(
845            result.is_err(),
846            "Should fail for NetworkConfiguration error"
847        );
848    }
849
850    // ============================================================================
851    // Integration tests for run_initialization_batch, coordinate_with_distributed_lock,
852    // wait_for_initialization_complete, and initialize_relayers
853    // ============================================================================
854
855    /// Helper to create a Redis connection pool for integration tests.
856    async fn create_test_redis_pool() -> Option<Arc<Pool>> {
857        let cfg = deadpool_redis::Config::from_url("redis://127.0.0.1:6379");
858        let pool = cfg
859            .builder()
860            .ok()?
861            .max_size(16)
862            .runtime(deadpool_redis::Runtime::Tokio1)
863            .build()
864            .ok()?;
865        Some(Arc::new(pool))
866    }
867
868    fn create_unreachable_redis_pool() -> Arc<Pool> {
869        let cfg = deadpool_redis::Config::from_url("redis://127.0.0.1:1");
870        let pool = cfg
871            .builder()
872            .expect("should create deadpool builder")
873            .max_size(1)
874            .runtime(deadpool_redis::Runtime::Tokio1)
875            .build()
876            .expect("should build deadpool");
877        Arc::new(pool)
878    }
879
880    // --- Tests for run_initialization_batch ---
881
882    #[tokio::test]
883    async fn test_run_initialization_batch_empty_list() {
884        let app_state = create_mock_app_state(None, None, None, None, None, None).await;
885        let thin_state = ThinData(app_state);
886
887        let relayers: Vec<RelayerRepoModel> = vec![];
888        let result = run_initialization_batch(&relayers, &thin_state).await;
889
890        assert!(
891            result.is_ok(),
892            "Should succeed with empty relayer list: {:?}",
893            result
894        );
895    }
896
897    #[tokio::test]
898    async fn test_run_initialization_batch_handles_failures() {
899        let relayers = vec![create_mock_relayer("failing-relayer".to_string(), false)];
900
901        let app_state =
902            create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
903        let thin_state = ThinData(app_state);
904
905        let result = run_initialization_batch(&relayers, &thin_state).await;
906
907        assert!(result.is_err(), "Should fail due to missing signer");
908    }
909
910    #[tokio::test]
911    async fn test_run_initialization_batch_concurrent_execution() {
912        let relayers: Vec<RelayerRepoModel> = (0..5)
913            .map(|i| create_mock_relayer(format!("concurrent-relayer-{}", i), false))
914            .collect();
915
916        let app_state =
917            create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
918        let thin_state = ThinData(app_state);
919
920        // This will fail because signers aren't configured, but it tests concurrent execution
921        let result = run_initialization_batch(&relayers, &thin_state).await;
922
923        assert!(result.is_err(), "Should fail due to missing signers");
924        // The error message should mention the failed relayers
925        let err_str = result.unwrap_err().to_string();
926        assert!(
927            err_str.contains("Failed to initialize"),
928            "Error should mention initialization failure"
929        );
930    }
931
932    #[tokio::test]
933    async fn test_run_initialization_batch_reports_each_failed_relayer_id() {
934        let relayers = vec![
935            create_mock_relayer("batch-relayer-1".to_string(), false),
936            create_mock_relayer("batch-relayer-2".to_string(), false),
937        ];
938
939        let app_state =
940            create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
941        let thin_state = ThinData(app_state);
942
943        let result = run_initialization_batch(&relayers, &thin_state).await;
944
945        assert!(result.is_err(), "Should fail due to missing signers");
946
947        let err = result.unwrap_err().to_string();
948        assert!(err.contains("Failed to initialize 2 relayer(s)"));
949        assert!(err.contains("batch-relayer-1"));
950        assert!(err.contains("batch-relayer-2"));
951    }
952
953    // --- Tests for recover_after_timeout ---
954
955    #[tokio::test]
956    async fn test_recover_after_timeout_gracefully_degrades_when_redis_errors() {
957        let relayers = vec![create_mock_relayer(
958            "recovery-error-relayer".to_string(),
959            false,
960        )];
961        let app_state =
962            create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
963        let thin_state = ThinData(app_state);
964        let unreachable_pool = create_unreachable_redis_pool();
965
966        let result =
967            recover_after_timeout(&relayers, &thin_state, &unreachable_pool, "recover_err").await;
968
969        assert!(result.is_err(), "Should fall back to initialization batch");
970        let err = result.unwrap_err().to_string();
971        assert!(err.contains("Failed to initialize"));
972        assert!(err.contains("recovery-error-relayer"));
973    }
974
    /// When no lock or sync metadata exists, `recover_after_timeout` should
    /// take over the lock itself and re-attempt initialization. The attempt
    /// fails here (no signer is configured), and a failed run must not be
    /// recorded as a recent completion.
    #[tokio::test]
    #[ignore] // Requires running Redis instance
    async fn test_recover_after_timeout_acquires_lock_and_attempts_initialization() {
        let conn = create_test_redis_pool()
            .await
            .expect("Redis connection required");
        let relayers = vec![create_mock_relayer(
            "recovery-lock-relayer".to_string(),
            false,
        )];
        let app_state =
            create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
        let thin_state = ThinData(app_state);
        let prefix = "test_recover_after_timeout_acquire";

        // Pre-test cleanup: remove any sync metadata / lock left over from a
        // previous run so recovery starts from a clean slate.
        {
            let mut conn_clone = conn.get().await.expect("Failed to get connection");
            let hash_key = format!("{prefix}:relayer_sync_meta");
            let lock_key = format!("{prefix}:lock:{GLOBAL_INIT_LOCK_NAME}");
            let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &hash_key).await;
            let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &lock_key).await;
        }

        let result = recover_after_timeout(&relayers, &thin_state, &conn, prefix).await;

        // Initialization fails (no signer), so recovery must propagate the error.
        assert!(
            result.is_err(),
            "Should attempt initialization after taking over the lock"
        );

        // A failed initialization must NOT be marked as recently completed,
        // otherwise later instances would incorrectly skip their own attempt.
        let is_recent = is_global_init_recently_completed(&conn, prefix, 300)
            .await
            .expect("Should check completion");
        assert!(
            !is_recent,
            "Should not record completion time when initialization fails"
        );

        // Post-test cleanup so this test leaves no state behind in Redis.
        {
            let mut conn_clone = conn.get().await.expect("Failed to get connection");
            let hash_key = format!("{prefix}:relayer_sync_meta");
            let lock_key = format!("{prefix}:lock:{GLOBAL_INIT_LOCK_NAME}");
            let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &hash_key).await;
            let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &lock_key).await;
        }
    }
1021
1022    // --- Tests for coordinate_with_distributed_lock (requires Redis) ---
1023
1024    #[tokio::test]
1025    #[ignore] // Requires running Redis instance
1026    async fn test_coordinate_with_distributed_lock_skips_when_recently_completed() {
1027        let conn = create_test_redis_pool()
1028            .await
1029            .expect("Redis connection required");
1030
1031        let relayers = vec![create_mock_relayer(
1032            "global-lock-relayer".to_string(),
1033            false,
1034        )];
1035        let app_state =
1036            create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
1037        let thin_state = ThinData(app_state);
1038
1039        let prefix = "test_global_skip_recent";
1040
1041        // Set completion markers to simulate recent initialization of this relayer set
1042        set_relayer_last_sync(&conn, prefix, &relayers[0].id)
1043            .await
1044            .expect("Should set relayer sync time");
1045        set_global_init_completed(&conn, prefix)
1046            .await
1047            .expect("Should set completion time");
1048
1049        let result = coordinate_with_distributed_lock(&relayers, &thin_state, &conn, prefix).await;
1050
1051        // Should succeed because it skips (recently completed)
1052        assert!(
1053            result.is_ok(),
1054            "Should skip initialization when recently completed: {:?}",
1055            result
1056        );
1057
1058        // Cleanup
1059        let mut conn_clone = conn.get().await.expect("Failed to get connection");
1060        let hash_key = format!("{}:relayer_sync_meta", prefix);
1061        let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &hash_key).await;
1062    }
1063
1064    #[tokio::test]
1065    #[ignore] // Requires running Redis instance
1066    async fn test_coordinate_with_distributed_lock_does_not_skip_without_relayer_sync_markers() {
1067        let conn = create_test_redis_pool()
1068            .await
1069            .expect("Redis connection required");
1070
1071        let relayers = vec![create_mock_relayer(
1072            "global-lock-no-sync-relayer".to_string(),
1073            false,
1074        )];
1075        let app_state =
1076            create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
1077        let thin_state = ThinData(app_state);
1078
1079        let prefix = "test_global_skip_without_relayer_sync";
1080
1081        set_global_init_completed(&conn, prefix)
1082            .await
1083            .expect("Should set completion time");
1084
1085        let result = coordinate_with_distributed_lock(&relayers, &thin_state, &conn, prefix).await;
1086
1087        assert!(
1088            result.is_err(),
1089            "Should not skip initialization when relayer sync markers are missing"
1090        );
1091
1092        let mut conn_clone = conn.get().await.expect("Failed to get connection");
1093        let hash_key = format!("{prefix}:relayer_sync_meta");
1094        let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &hash_key).await;
1095    }
1096
1097    #[tokio::test]
1098    #[ignore] // Requires running Redis instance
1099    async fn test_coordinate_with_distributed_lock_acquires_lock() {
1100        let conn = create_test_redis_pool()
1101            .await
1102            .expect("Redis connection required");
1103
1104        let relayers = vec![create_mock_relayer(
1105            "lock-acquire-relayer".to_string(),
1106            false,
1107        )];
1108        let app_state =
1109            create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
1110        let thin_state = ThinData(app_state);
1111
1112        let prefix = "test_global_acquire_lock";
1113
1114        // Clear any existing state
1115        {
1116            let mut conn_clone = conn.get().await.expect("Failed to get connection");
1117            let hash_key = format!("{}:relayer_sync_meta", prefix);
1118            let lock_key = format!("{}:lock:{}", prefix, GLOBAL_INIT_LOCK_NAME);
1119            let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &hash_key).await;
1120            let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &lock_key).await;
1121        }
1122
1123        let result = coordinate_with_distributed_lock(&relayers, &thin_state, &conn, prefix).await;
1124
1125        // Will fail because signer isn't configured, but lock should have been acquired
1126        assert!(
1127            result.is_err(),
1128            "Should fail due to missing signer configuration"
1129        );
1130
1131        // Verify completion time was NOT set (because initialization failed)
1132        let is_recent = is_global_init_recently_completed(&conn, prefix, 300)
1133            .await
1134            .expect("Should check completion");
1135        assert!(!is_recent, "Should NOT record completion time on failure");
1136
1137        // Cleanup
1138        {
1139            let mut conn_clone = conn.get().await.expect("Failed to get connection");
1140            let hash_key = format!("{}:relayer_sync_meta", prefix);
1141            let lock_key = format!("{}:lock:{}", prefix, GLOBAL_INIT_LOCK_NAME);
1142            let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &hash_key).await;
1143            let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &lock_key).await;
1144        }
1145    }
1146
1147    #[tokio::test]
1148    #[ignore] // Requires running Redis instance
1149    async fn test_coordinate_with_distributed_lock_waits_when_lock_held() {
1150        let conn = create_test_redis_pool()
1151            .await
1152            .expect("Redis connection required");
1153
1154        let relayers = vec![create_mock_relayer("wait-relayer".to_string(), false)];
1155        let app_state =
1156            create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
1157        let thin_state = ThinData(app_state);
1158
1159        let prefix = "test_global_wait_lock";
1160        let hash_key = format!("{}:relayer_sync_meta", prefix);
1161        let lock_key = format!("{}:lock:{}", prefix, GLOBAL_INIT_LOCK_NAME);
1162
1163        // Clear any existing state
1164        {
1165            let mut conn_clone = conn.get().await.expect("Failed to get connection");
1166            let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &hash_key).await;
1167            let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &lock_key).await;
1168        }
1169
1170        // Acquire lock to simulate another instance initializing
1171        let lock = DistributedLock::new(conn.clone(), &lock_key, Duration::from_secs(5));
1172        let guard = lock
1173            .try_acquire()
1174            .await
1175            .expect("Should acquire lock")
1176            .expect("Lock should be available");
1177
1178        // Spawn task to release lock and set completion after a short delay
1179        let conn_for_task = conn.clone();
1180        let prefix_for_task = prefix.to_string();
1181        tokio::spawn(async move {
1182            tokio::time::sleep(Duration::from_millis(500)).await;
1183            set_relayer_last_sync(&conn_for_task, &prefix_for_task, "wait-relayer")
1184                .await
1185                .expect("Should set relayer sync");
1186            set_global_init_completed(&conn_for_task, &prefix_for_task)
1187                .await
1188                .expect("Should set completion");
1189            guard.release().await.expect("Should release lock");
1190        });
1191
1192        // This should wait and then succeed (because completion will be set)
1193        let result = coordinate_with_distributed_lock(&relayers, &thin_state, &conn, prefix).await;
1194
1195        assert!(
1196            result.is_ok(),
1197            "Should succeed after waiting for completion: {:?}",
1198            result
1199        );
1200
1201        // Cleanup
1202        {
1203            let mut conn_clone = conn.get().await.expect("Failed to get connection");
1204            let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &hash_key).await;
1205            let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &lock_key).await;
1206        }
1207    }
1208
1209    // --- Tests for initialize_relayers main function ---
1210
1211    #[tokio::test]
1212    async fn test_initialize_relayers_empty_list() {
1213        let app_state = create_mock_app_state(None, None, None, None, None, None).await;
1214        let thin_state = ThinData(app_state);
1215
1216        let result = initialize_relayers(thin_state).await;
1217
1218        assert!(
1219            result.is_ok(),
1220            "Should succeed with empty relayer list: {:?}",
1221            result
1222        );
1223    }
1224
1225    #[tokio::test]
1226    async fn test_initialize_relayers_uses_in_memory_path() {
1227        let relayers = vec![create_mock_relayer("inmem-relayer".to_string(), false)];
1228        let app_state =
1229            create_mock_app_state(None, Some(relayers.clone()), None, None, None, None).await;
1230        let thin_state = ThinData(app_state);
1231
1232        let result = initialize_relayers(thin_state).await;
1233
1234        assert!(
1235            result.is_err(),
1236            "Should fail due to missing signer configuration"
1237        );
1238    }
1239
1240    #[tokio::test]
1241    async fn test_initialize_relayer_returns_error_when_relayer_is_missing() {
1242        let app_state = create_mock_app_state(None, None, None, None, None, None).await;
1243        let thin_state = ThinData(app_state);
1244
1245        let result = initialize_relayer("missing-relayer".to_string(), thin_state).await;
1246
1247        assert!(result.is_err());
1248        assert!(result.unwrap_err().to_string().contains("missing-relayer"));
1249    }
1250
1251    #[tokio::test]
1252    async fn test_initialize_relayer_returns_error_when_signer_is_missing() {
1253        let relayers = vec![create_mock_relayer("signerless-relayer".to_string(), false)];
1254        let app_state = create_mock_app_state(None, Some(relayers), None, None, None, None).await;
1255        let thin_state = ThinData(app_state);
1256
1257        let result = initialize_relayer("signerless-relayer".to_string(), thin_state).await;
1258
1259        assert!(result.is_err());
1260        assert!(result.unwrap_err().to_string().contains("test"));
1261    }
1262
1263    #[tokio::test]
1264    async fn test_initialize_relayers_propagates_list_all_error() {
1265        let mut mock_relayer_repo = MockRelayerRepository::new();
1266        mock_relayer_repo.expect_list_all().times(1).returning(|| {
1267            Err(RepositoryError::ConnectionError(
1268                "relayer repository unavailable".to_string(),
1269            ))
1270        });
1271
1272        let thin_state = ThinData(create_app_state_with_mock_relayer_repo(mock_relayer_repo));
1273
1274        let result = initialize_relayers(thin_state).await;
1275
1276        assert!(result.is_err());
1277        assert!(result
1278            .unwrap_err()
1279            .to_string()
1280            .contains("relayer repository unavailable"));
1281    }
1282}