openzeppelin_relayer/bootstrap/
config_processor.rs

1//! This module provides functionality for processing configuration files and populating
2//! repositories.
3//!
4//! ## Distributed Locking for Config Processing
5//!
6//! When multiple instances of the relayer service start simultaneously with Redis storage
7//! and `DISTRIBUTED_MODE` is enabled, this module uses distributed locking to coordinate
8//! config processing and prevent race conditions:
9//!
10//! - **Global lock**: A single lock is used for the entire config processing,
11//!   ensuring only one instance processes the config at a time.
12//! - **Post-lock population check**: After acquiring the lock, checks if Redis is already
13//!   populated (by another instance that held the lock first), and skips if so.
14//! - **Single-instance mode**: When `DISTRIBUTED_MODE` is disabled (default) or using
15//!   in-memory storage, locking is skipped since coordination is not needed.
16use std::sync::Arc;
17use std::time::Duration;
18
19use crate::{
20    config::{Config, RepositoryStorageType, ServerConfig},
21    jobs::JobProducerTrait,
22    models::{
23        ApiKeyRepoModel, NetworkRepoModel, NotificationRepoModel, PluginModel, Relayer,
24        RelayerRepoModel, Signer as SignerDomainModel, SignerFileConfig, SignerRepoModel,
25        ThinDataAppState, TransactionRepoModel,
26    },
27    repositories::{
28        ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
29        Repository, TransactionCounterTrait, TransactionRepository,
30    },
31    services::signer::{Signer as SignerService, SignerFactory},
32    utils::{
33        is_config_processing_completed, is_config_processing_in_progress, poll_until,
34        set_config_processing_completed, set_config_processing_in_progress, DistributedLock,
35        BOOTSTRAP_LOCK_TTL_SECS, LOCK_POLL_INTERVAL_MS, LOCK_WAIT_MAX_SECS,
36    },
37};
38use color_eyre::{eyre::WrapErr, Report, Result};
39use deadpool_redis::Pool;
40use futures::future::try_join_all;
41use tracing::{info, warn};
42
43/// Lock name for config processing lock.
44const CONFIG_PROCESSING_LOCK_NAME: &str = "config_processing";
45
46#[derive(Debug, PartialEq, Eq)]
47enum ConfigBootstrapState {
48    Empty,
49    Complete,
50    Incomplete { missing: Vec<String> },
51}
52
53async fn process_api_key<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
54    server_config: &ServerConfig,
55    app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
56) -> Result<()>
57where
58    J: JobProducerTrait + Send + Sync + 'static,
59    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
60    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
61    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
62    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
63    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
64    TCR: TransactionCounterTrait + Send + Sync + 'static,
65    PR: PluginRepositoryTrait + Send + Sync + 'static,
66    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
67{
68    let api_key_model = ApiKeyRepoModel::new(
69        "default".to_string(),
70        server_config.api_key.clone(),
71        vec!["*".to_string()],
72        vec!["*".to_string()],
73    );
74
75    app_state
76        .api_key_repository
77        .create(api_key_model)
78        .await
79        .wrap_err("Failed to create api key repository entry")?;
80
81    Ok(())
82}
83
84/// Process all plugins from the config file and store them in the repository.
85async fn process_plugins<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
86    config_file: &Config,
87    app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
88) -> Result<()>
89where
90    J: JobProducerTrait + Send + Sync + 'static,
91    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
92    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
93    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
94    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
95    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
96    TCR: TransactionCounterTrait + Send + Sync + 'static,
97    PR: PluginRepositoryTrait + Send + Sync + 'static,
98    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
99{
100    if let Some(plugins) = &config_file.plugins {
101        let plugin_futures = plugins.iter().map(|plugin| async {
102            let plugin_model = PluginModel::try_from(plugin.clone())
103                .wrap_err("Failed to convert plugin config")?;
104            app_state
105                .plugin_repository
106                .add(plugin_model)
107                .await
108                .wrap_err("Failed to create plugin repository entry")?;
109            Ok::<(), Report>(())
110        });
111
112        try_join_all(plugin_futures)
113            .await
114            .wrap_err("Failed to initialize plugin repository")?;
115        Ok(())
116    } else {
117        Ok(())
118    }
119}
120
121/// Process a signer configuration from the config file and convert it into a `SignerRepoModel`.
122async fn process_signer(signer: &SignerFileConfig) -> Result<SignerRepoModel> {
123    // Convert config to domain model (this validates and applies business logic)
124    let domain_signer = SignerDomainModel::try_from(signer.clone())
125        .wrap_err("Failed to convert signer config to domain model")?;
126
127    // Convert domain model to repository model for storage
128    let signer_repo_model = SignerRepoModel::from(domain_signer);
129
130    Ok(signer_repo_model)
131}
132
133/// Process all signers from the config file and store them in the repository.
134///
135/// For each signer in the config file:
136/// 1. Process it using `process_signer` (config -> domain -> repository)
137/// 2. Store the resulting repository model
138///
139/// This function processes signers in parallel using futures.
140async fn process_signers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
141    config_file: &Config,
142    app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
143) -> Result<()>
144where
145    J: JobProducerTrait + Send + Sync + 'static,
146    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
147    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
148    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
149    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
150    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
151    TCR: TransactionCounterTrait + Send + Sync + 'static,
152    PR: PluginRepositoryTrait + Send + Sync + 'static,
153    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
154{
155    let signer_futures = config_file.signers.iter().map(|signer| async {
156        let signer_repo_model = process_signer(signer).await?;
157
158        app_state
159            .signer_repository
160            .create(signer_repo_model)
161            .await
162            .wrap_err("Failed to create signer repository entry")?;
163        Ok::<(), Report>(())
164    });
165
166    try_join_all(signer_futures)
167        .await
168        .wrap_err("Failed to initialize signer repository")?;
169    Ok(())
170}
171
172/// Process all notification configurations from the config file and store them in the repository.
173///
174/// For each notification in the config file:
175/// 1. Convert it to a repository model
176/// 2. Store the resulting model in the repository
177///
178/// This function processes notifications in parallel using futures.
179async fn process_notifications<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
180    config_file: &Config,
181    app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
182) -> Result<()>
183where
184    J: JobProducerTrait + Send + Sync + 'static,
185    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
186    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
187    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
188    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
189    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
190    TCR: TransactionCounterTrait + Send + Sync + 'static,
191    PR: PluginRepositoryTrait + Send + Sync + 'static,
192    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
193{
194    let notification_futures = config_file.notifications.iter().map(|notification| async {
195        let notification_repo_model = NotificationRepoModel::try_from(notification.clone())
196            .wrap_err("Failed to convert notification config")?;
197
198        app_state
199            .notification_repository
200            .create(notification_repo_model)
201            .await
202            .wrap_err("Failed to create notification repository entry")?;
203        Ok::<(), Report>(())
204    });
205
206    try_join_all(notification_futures)
207        .await
208        .wrap_err("Failed to initialize notification repository")?;
209    Ok(())
210}
211
212/// Process all network configurations from the config file and store them in the repository.
213///
214/// For each network in the config file:
215/// 1. Convert it to a repository model using TryFrom
216/// 2. Store the resulting model in the repository
217///
218/// This function processes networks in parallel using futures.
219async fn process_networks<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
220    config_file: &Config,
221    app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
222) -> Result<()>
223where
224    J: JobProducerTrait + Send + Sync + 'static,
225    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
226    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
227    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
228    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
229    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
230    TCR: TransactionCounterTrait + Send + Sync + 'static,
231    PR: PluginRepositoryTrait + Send + Sync + 'static,
232    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
233{
234    let network_futures = config_file.networks.iter().map(|network| async move {
235        let network_repo_model = NetworkRepoModel::try_from(network.clone())?;
236
237        app_state
238            .network_repository
239            .create(network_repo_model)
240            .await
241            .wrap_err("Failed to create network repository entry")?;
242        Ok::<(), Report>(())
243    });
244
245    try_join_all(network_futures)
246        .await
247        .wrap_err("Failed to initialize network repository")?;
248    Ok(())
249}
250
251/// Process all relayer configurations from the config file and store them in the repository.
252///
253/// For each relayer in the config file:
254/// 1. Convert it to a repository model
255/// 2. Retrieve the associated signer
256/// 3. Create a signer service
257/// 4. Get the signer's address and add it to the relayer model
258/// 5. Store the resulting model in the repository
259///
260/// This function processes relayers in parallel using futures.
261async fn process_relayers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
262    config_file: &Config,
263    app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
264) -> Result<()>
265where
266    J: JobProducerTrait + Send + Sync + 'static,
267    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
268    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
269    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
270    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
271    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
272    TCR: TransactionCounterTrait + Send + Sync + 'static,
273    PR: PluginRepositoryTrait + Send + Sync + 'static,
274    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
275{
276    let signers = app_state.signer_repository.list_all().await?;
277
278    let relayer_futures = config_file.relayers.iter().map(|relayer| async {
279        // Convert config to domain model first, then to repository model
280        let domain_relayer = Relayer::try_from(relayer.clone())
281            .wrap_err("Failed to convert relayer config to domain model")?;
282        let mut repo_model = RelayerRepoModel::from(domain_relayer);
283        let signer_model = signers
284            .iter()
285            .find(|s| s.id == repo_model.signer_id)
286            .ok_or_else(|| eyre::eyre!("Signer not found"))?;
287
288        let network_type = repo_model.network_type;
289        let signer_service = SignerFactory::create_signer(
290            &network_type,
291            &SignerDomainModel::from(signer_model.clone()),
292        )
293        .await
294        .wrap_err("Failed to create signer service")?;
295
296        let address = signer_service.address().await?;
297        repo_model.address = address.to_string();
298
299        app_state
300            .relayer_repository
301            .create(repo_model)
302            .await
303            .wrap_err("Failed to create relayer repository entry")?;
304        Ok::<(), Report>(())
305    });
306
307    try_join_all(relayer_futures)
308        .await
309        .wrap_err("Failed to initialize relayer repository")?;
310    Ok(())
311}
312
313async fn validate_config_bootstrap_state<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
314    config_file: &Config,
315    app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
316) -> Result<ConfigBootstrapState>
317where
318    J: JobProducerTrait + Send + Sync + 'static,
319    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
320    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
321    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
322    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
323    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
324    TCR: TransactionCounterTrait + Send + Sync + 'static,
325    PR: PluginRepositoryTrait + Send + Sync + 'static,
326    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
327{
328    let mut missing = Vec::new();
329    let relayer_has_entries = app_state.relayer_repository.has_entries().await?;
330    let signer_has_entries = app_state.signer_repository.has_entries().await?;
331    let notification_has_entries = app_state.notification_repository.has_entries().await?;
332    let network_has_entries = app_state.network_repository.has_entries().await?;
333    let plugin_has_entries = app_state.plugin_repository.has_entries().await?;
334    let api_key_has_entries = app_state.api_key_repository.has_entries().await?;
335
336    let has_any_entries = relayer_has_entries
337        || signer_has_entries
338        || notification_has_entries
339        || network_has_entries
340        || plugin_has_entries
341        || api_key_has_entries;
342
343    if !has_any_entries {
344        return Ok(ConfigBootstrapState::Empty);
345    }
346
347    if config_file
348        .plugins
349        .as_ref()
350        .is_some_and(|plugins| !plugins.is_empty())
351        && !plugin_has_entries
352    {
353        missing.push("plugin repository".to_string());
354    }
355
356    if !config_file.signers.is_empty() && !signer_has_entries {
357        missing.push("signer repository".to_string());
358    }
359
360    if !config_file.notifications.is_empty() && !notification_has_entries {
361        missing.push("notification repository".to_string());
362    }
363
364    if !config_file.networks.is_empty() && !network_has_entries {
365        missing.push("network repository".to_string());
366    }
367
368    if !config_file.relayers.is_empty() && !relayer_has_entries {
369        missing.push("relayer repository".to_string());
370    }
371
372    if !api_key_has_entries {
373        missing.push("api key repository".to_string());
374    }
375
376    if missing.is_empty() {
377        Ok(ConfigBootstrapState::Complete)
378    } else {
379        Ok(ConfigBootstrapState::Incomplete { missing })
380    }
381}
382
383fn format_incomplete_bootstrap_error(missing: &[String]) -> Report {
384    eyre::eyre!(
385        "Redis contains incomplete bootstrap-managed config state without completion marker (missing: {})",
386        missing.join(", ")
387    )
388}
389
390/// Process a complete configuration file by initializing all repositories.
391///
392/// This function processes the entire configuration file in the following order:
393/// 1. Process plugins
394/// 2. Process signers
395/// 3. Process notifications
396/// 4. Process networks
397/// 5. Process relayers
398/// 6. Process API key
399///
400/// When using Redis storage with `DISTRIBUTED_MODE` enabled, this function uses distributed
401/// locking to prevent race conditions when multiple instances start simultaneously
402/// (especially with `RESET_STORAGE_ON_START=true`).
403pub async fn process_config_file<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
404    config_file: Config,
405    server_config: Arc<ServerConfig>,
406    app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
407) -> Result<()>
408where
409    J: JobProducerTrait + Send + Sync + 'static,
410    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
411    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
412    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
413    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
414    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
415    TCR: TransactionCounterTrait + Send + Sync + 'static,
416    PR: PluginRepositoryTrait + Send + Sync + 'static,
417    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
418{
419    match server_config.repository_storage_type {
420        RepositoryStorageType::InMemory => {
421            // In-memory mode: no locking needed, process directly
422            execute_config_processing(&config_file, &server_config, app_state).await
423        }
424        RepositoryStorageType::Redis => {
425            // Check if distributed locking is needed
426            let use_lock = ServerConfig::get_distributed_mode();
427            let connection_info = app_state.relayer_repository.connection_info();
428
429            match (use_lock, connection_info) {
430                (true, Some((conn, prefix))) => {
431                    // Distributed mode: use locking to coordinate across instances
432                    coordinate_config_with_lock(
433                        &config_file,
434                        &server_config,
435                        app_state,
436                        &conn,
437                        &prefix,
438                    )
439                    .await
440                }
441                _ => {
442                    // Single-instance mode or no connection info: validate state directly.
443                    if server_config.reset_storage_on_start {
444                        return execute_config_processing(&config_file, &server_config, app_state)
445                            .await;
446                    }
447
448                    match validate_config_bootstrap_state(&config_file, app_state).await? {
449                        ConfigBootstrapState::Empty => {
450                            execute_config_processing(&config_file, &server_config, app_state).await
451                        }
452                        ConfigBootstrapState::Complete => {
453                            info!(
454                                "Skipping config file processing - bootstrap-managed Redis state is complete"
455                            );
456                            Ok(())
457                        }
458                        ConfigBootstrapState::Incomplete { missing } => {
459                            Err(format_incomplete_bootstrap_error(&missing))
460                        }
461                    }
462                }
463            }
464        }
465    }
466}
467
468/// Process config file with distributed locking for Redis storage.
469///
470/// Flow:
471/// 1. Try to acquire global lock for config processing
472/// 2. If lock acquired: check for an explicit completion marker, process if needed
473/// 3. If lock held: wait for the completion marker to appear
474/// 4. If wait times out: recheck state and attempt takeover after lock expiry
475async fn coordinate_config_with_lock<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
476    config_file: &Config,
477    server_config: &ServerConfig,
478    app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
479    conn: &Arc<Pool>,
480    prefix: &str,
481) -> Result<()>
482where
483    J: JobProducerTrait + Send + Sync + 'static,
484    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
485    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
486    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
487    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
488    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
489    TCR: TransactionCounterTrait + Send + Sync + 'static,
490    PR: PluginRepositoryTrait + Send + Sync + 'static,
491    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
492{
493    let lock_key = format!("{prefix}:lock:{CONFIG_PROCESSING_LOCK_NAME}");
494    let lock = DistributedLock::new(
495        conn.clone(),
496        &lock_key,
497        Duration::from_secs(BOOTSTRAP_LOCK_TTL_SECS),
498    );
499
500    match lock.try_acquire().await {
501        Ok(Some(guard)) => {
502            // We got the lock - check if we need to process
503            info!("Acquired config processing lock");
504
505            let result =
506                process_if_needed_after_lock(config_file, server_config, app_state, conn, prefix)
507                    .await;
508
509            drop(guard); // Release lock
510            result
511        }
512        Ok(None) => {
513            // Lock held by another instance - wait for it to complete
514            info!("Another instance is processing config, waiting for completion");
515            let completed = wait_for_config_processing_complete(conn, prefix).await?;
516
517            if completed {
518                return Ok(());
519            }
520
521            warn!("Timeout waiting for config processing, rechecking state");
522
523            if is_config_processing_completed(conn, prefix)
524                .await
525                .unwrap_or(false)
526            {
527                info!("Config processing completed during timeout window");
528                return Ok(());
529            }
530
531            recover_config_processing_after_timeout(
532                config_file,
533                server_config,
534                app_state,
535                conn,
536                prefix,
537            )
538            .await
539        }
540        Err(e) => Err(eyre::eyre!(
541            "Failed to acquire config processing lock in distributed mode: {}",
542            e
543        )),
544    }
545}
546
547/// Process config after successfully acquiring the lock.
548///
549/// Checks if config processing was already completed and only processes if needed.
550async fn process_if_needed_after_lock<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
551    config_file: &Config,
552    server_config: &ServerConfig,
553    app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
554    conn: &Arc<Pool>,
555    prefix: &str,
556) -> Result<()>
557where
558    J: JobProducerTrait + Send + Sync + 'static,
559    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
560    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
561    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
562    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
563    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
564    TCR: TransactionCounterTrait + Send + Sync + 'static,
565    PR: PluginRepositoryTrait + Send + Sync + 'static,
566    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
567{
568    let already_completed = is_config_processing_completed(conn, prefix).await?;
569    let in_progress = is_config_processing_in_progress(conn, prefix).await?;
570    let bootstrap_state = validate_config_bootstrap_state(config_file, app_state).await?;
571
572    if server_config.reset_storage_on_start {
573        // With reset flag: always reset and process (we have the lock)
574        execute_config_processing_with_marker(config_file, server_config, app_state, conn, prefix)
575            .await
576    } else if already_completed {
577        // No reset flag and already completed: skip
578        info!("Config processing already completed, skipping config file processing");
579        Ok(())
580    } else {
581        match bootstrap_state {
582            ConfigBootstrapState::Empty => {
583                execute_config_processing_with_marker(
584                    config_file,
585                    server_config,
586                    app_state,
587                    conn,
588                    prefix,
589                )
590                .await
591            }
592            ConfigBootstrapState::Complete if !in_progress => {
593                info!(
594                    "Bootstrap-managed Redis state is complete without marker, backfilling completion marker"
595                );
596                set_config_processing_completed(conn, prefix).await?;
597                Ok(())
598            }
599            ConfigBootstrapState::Complete => {
600                info!("Bootstrap-managed Redis state is complete, restoring completion marker");
601                set_config_processing_completed(conn, prefix).await?;
602                Ok(())
603            }
604            ConfigBootstrapState::Incomplete { missing } => {
605                Err(format_incomplete_bootstrap_error(&missing))
606            }
607        }
608    }
609}
610
611/// Waits for another instance to complete config processing.
612///
613/// Polls periodically until the explicit completion marker is set or timeout is reached.
614async fn wait_for_config_processing_complete(conn: &Arc<Pool>, prefix: &str) -> Result<bool> {
615    let max_wait = Duration::from_secs(LOCK_WAIT_MAX_SECS);
616    let poll_interval = Duration::from_millis(LOCK_POLL_INTERVAL_MS);
617
618    let conn = conn.clone();
619    let prefix = prefix.to_string();
620
621    let completed = poll_until(
622        || is_config_processing_completed(&conn, &prefix),
623        max_wait,
624        poll_interval,
625        "config processing",
626    )
627    .await?;
628
629    Ok(completed)
630}
631
632/// Attempts to recover config processing after a wait timeout.
633///
634/// This is the config-processing analogue to relayer initialization recovery:
635/// if the original lock holder died after setting the in-progress marker but
636/// before completion, a waiting instance should take over once the lock TTL
637/// expires instead of failing the whole rollout.
638async fn recover_config_processing_after_timeout<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
639    config_file: &Config,
640    server_config: &ServerConfig,
641    app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
642    conn: &Arc<Pool>,
643    prefix: &str,
644) -> Result<()>
645where
646    J: JobProducerTrait + Send + Sync + 'static,
647    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
648    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
649    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
650    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
651    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
652    TCR: TransactionCounterTrait + Send + Sync + 'static,
653    PR: PluginRepositoryTrait + Send + Sync + 'static,
654    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
655{
656    let lock_key = format!("{prefix}:lock:{CONFIG_PROCESSING_LOCK_NAME}");
657    let recovery_lock = DistributedLock::new(
658        conn.clone(),
659        &lock_key,
660        Duration::from_secs(BOOTSTRAP_LOCK_TTL_SECS),
661    );
662
663    match recovery_lock.try_acquire().await {
664        Ok(Some(guard)) => {
665            warn!("Previous config-processing lock holder appears to have crashed, taking over");
666            let result =
667                process_if_needed_after_lock(config_file, server_config, app_state, conn, prefix)
668                    .await;
669            drop(guard);
670            result
671        }
672        Ok(None) => {
673            // Another instance may still be processing config.
674            // Wait one more bounded period for the explicit completion marker
675            // before giving up on this instance.
676            warn!("Config-processing lock still held after timeout, waiting again for completion");
677            let completed = wait_for_config_processing_complete(conn, prefix).await?;
678
679            if completed {
680                info!("Config processing completed by another instance during extended wait");
681                Ok(())
682            } else {
683                Err(eyre::eyre!(
684                    "Timed out waiting for config processing and could not acquire recovery lock"
685                ))
686            }
687        }
688        Err(e) => {
689            warn!(
690                error = %e,
691                "Failed to acquire recovery lock for config processing"
692            );
693            Err(e)
694        }
695    }
696}
697
698/// Internal function that performs the actual config processing.
699async fn execute_config_processing_with_marker<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
700    config_file: &Config,
701    server_config: &ServerConfig,
702    app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
703    conn: &Arc<Pool>,
704    prefix: &str,
705) -> Result<()>
706where
707    J: JobProducerTrait + Send + Sync + 'static,
708    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
709    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
710    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
711    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
712    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
713    TCR: TransactionCounterTrait + Send + Sync + 'static,
714    PR: PluginRepositoryTrait + Send + Sync + 'static,
715    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
716{
717    set_config_processing_in_progress(conn, prefix).await?;
718
719    let result = execute_config_processing(config_file, server_config, app_state).await;
720
721    if result.is_ok() {
722        set_config_processing_completed(conn, prefix).await?;
723    }
724
725    result
726}
727
728/// Internal function that performs the actual config processing work.
729async fn execute_config_processing<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
730    config_file: &Config,
731    server_config: &ServerConfig,
732    app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
733) -> Result<()>
734where
735    J: JobProducerTrait + Send + Sync + 'static,
736    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
737    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
738    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
739    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
740    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
741    TCR: TransactionCounterTrait + Send + Sync + 'static,
742    PR: PluginRepositoryTrait + Send + Sync + 'static,
743    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
744{
745    if server_config.reset_storage_on_start {
746        info!("Resetting storage on start due to server config flag RESET_STORAGE_ON_START = true");
747        app_state.relayer_repository.drop_all_entries().await?;
748        app_state.transaction_repository.drop_all_entries().await?;
749        app_state.signer_repository.drop_all_entries().await?;
750        app_state.notification_repository.drop_all_entries().await?;
751        app_state.network_repository.drop_all_entries().await?;
752        app_state.plugin_repository.drop_all_entries().await?;
753        app_state.api_key_repository.drop_all_entries().await?;
754        app_state
755            .transaction_counter_store
756            .drop_all_entries()
757            .await?;
758    }
759
760    info!("Processing config file");
761    process_plugins(config_file, app_state).await?;
762    process_signers(config_file, app_state).await?;
763    process_notifications(config_file, app_state).await?;
764    process_networks(config_file, app_state).await?;
765    process_relayers(config_file, app_state).await?;
766    process_api_key(server_config, app_state).await?;
767
768    Ok(())
769}
770
771#[cfg(test)]
772mod tests {
773    use super::*;
774    use crate::{
775        config::{ConfigFileNetworkType, NetworksFileConfig, PluginFileConfig},
776        constants::DEFAULT_PLUGIN_TIMEOUT_SECONDS,
777        jobs::MockJobProducerTrait,
778        models::{
779            relayer::RelayerFileConfig, AppState, AwsKmsSignerFileConfig,
780            GoogleCloudKmsKeyFileConfig, GoogleCloudKmsServiceAccountFileConfig,
781            GoogleCloudKmsSignerFileConfig, LocalSignerFileConfig, NetworkType, NotificationConfig,
782            NotificationType, PaginationQuery, PlainOrEnvValue, SecretString, SignerConfigStorage,
783            SignerFileConfig, SignerFileConfigEnum, VaultSignerFileConfig,
784            VaultTransitSignerFileConfig,
785        },
786        repositories::{
787            ApiKeyRepositoryStorage, InMemoryApiKeyRepository, InMemoryNetworkRepository,
788            InMemoryNotificationRepository, InMemoryPluginRepository, InMemorySignerRepository,
789            InMemoryTransactionCounter, InMemoryTransactionRepository, NetworkRepositoryStorage,
790            NotificationRepositoryStorage, PluginRepositoryStorage, RelayerRepositoryStorage,
791            SignerRepositoryStorage, TransactionCounterRepositoryStorage,
792            TransactionRepositoryStorage,
793        },
794        utils::mocks::mockutils::{
795            create_mock_relayer, create_mock_signer, create_test_server_config,
796        },
797    };
798    use actix_web::web::ThinData;
799    use mockito;
800    use serde_json::json;
801    use std::{sync::Arc, time::Duration};
802
803    fn create_test_app_state() -> AppState<
804        MockJobProducerTrait,
805        RelayerRepositoryStorage,
806        TransactionRepositoryStorage,
807        NetworkRepositoryStorage,
808        NotificationRepositoryStorage,
809        SignerRepositoryStorage,
810        TransactionCounterRepositoryStorage,
811        PluginRepositoryStorage,
812        ApiKeyRepositoryStorage,
813    > {
814        // Create a mock job producer
815        let mut mock_job_producer = MockJobProducerTrait::new();
816
817        // Set up expectations for the mock
818        mock_job_producer
819            .expect_produce_transaction_request_job()
820            .returning(|_, _| Box::pin(async { Ok(()) }));
821
822        mock_job_producer
823            .expect_produce_submit_transaction_job()
824            .returning(|_, _| Box::pin(async { Ok(()) }));
825
826        mock_job_producer
827            .expect_produce_check_transaction_status_job()
828            .returning(|_, _| Box::pin(async { Ok(()) }));
829
830        mock_job_producer
831            .expect_produce_send_notification_job()
832            .returning(|_, _| Box::pin(async { Ok(()) }));
833
834        AppState {
835            relayer_repository: Arc::new(RelayerRepositoryStorage::new_in_memory()),
836            transaction_repository: Arc::new(TransactionRepositoryStorage::new_in_memory()),
837            signer_repository: Arc::new(SignerRepositoryStorage::new_in_memory()),
838            notification_repository: Arc::new(NotificationRepositoryStorage::new_in_memory()),
839            network_repository: Arc::new(NetworkRepositoryStorage::new_in_memory()),
840            transaction_counter_store: Arc::new(
841                TransactionCounterRepositoryStorage::new_in_memory(),
842            ),
843            job_producer: Arc::new(mock_job_producer),
844            plugin_repository: Arc::new(PluginRepositoryStorage::new_in_memory()),
845            api_key_repository: Arc::new(ApiKeyRepositoryStorage::new_in_memory()),
846        }
847    }
848
849    #[tokio::test]
850    async fn test_process_signer_test() {
851        let signer = SignerFileConfig {
852            id: "test-signer".to_string(),
853            config: SignerFileConfigEnum::Local(LocalSignerFileConfig {
854                path: "tests/utils/test_keys/unit-test-local-signer.json".to_string(),
855                passphrase: PlainOrEnvValue::Plain {
856                    value: SecretString::new("test"),
857                },
858            }),
859        };
860
861        let result = process_signer(&signer).await;
862
863        assert!(
864            result.is_ok(),
865            "Failed to process test signer: {:?}",
866            result.err()
867        );
868        let model = result.unwrap();
869
870        assert_eq!(model.id, "test-signer");
871
872        match model.config {
873            SignerConfigStorage::Local(config) => {
874                assert!(!config.raw_key.is_empty());
875                assert_eq!(config.raw_key.len(), 32);
876            }
877            _ => panic!("Expected Local config"),
878        }
879    }
880
881    #[tokio::test]
882    async fn test_process_signer_vault_transit() -> Result<()> {
883        let signer = SignerFileConfig {
884            id: "vault-transit-signer".to_string(),
885            config: SignerFileConfigEnum::VaultTransit(VaultTransitSignerFileConfig {
886                key_name: "test-transit-key".to_string(),
887                address: "https://vault.example.com".to_string(),
888                namespace: Some("test-namespace".to_string()),
889                role_id: PlainOrEnvValue::Plain {
890                    value: SecretString::new("test-role"),
891                },
892                secret_id: PlainOrEnvValue::Plain {
893                    value: SecretString::new("test-secret"),
894                },
895                pubkey: "test-pubkey".to_string(),
896                mount_point: Some("transit".to_string()),
897            }),
898        };
899
900        let result = process_signer(&signer).await;
901
902        assert!(
903            result.is_ok(),
904            "Failed to process vault transit signer: {:?}",
905            result.err()
906        );
907        let model = result.unwrap();
908
909        assert_eq!(model.id, "vault-transit-signer");
910
911        match model.config {
912            SignerConfigStorage::VaultTransit(config) => {
913                assert_eq!(config.key_name, "test-transit-key");
914                assert_eq!(config.address, "https://vault.example.com");
915                assert_eq!(config.namespace, Some("test-namespace".to_string()));
916                assert_eq!(config.role_id.to_str().as_str(), "test-role");
917                assert_eq!(config.secret_id.to_str().as_str(), "test-secret");
918                assert_eq!(config.pubkey, "test-pubkey");
919                assert_eq!(config.mount_point, Some("transit".to_string()));
920            }
921            _ => panic!("Expected VaultTransit config"),
922        }
923
924        Ok(())
925    }
926
927    #[tokio::test]
928    async fn test_process_signer_aws_kms() -> Result<()> {
929        let signer = SignerFileConfig {
930            id: "aws-kms-signer".to_string(),
931            config: SignerFileConfigEnum::AwsKms(AwsKmsSignerFileConfig {
932                region: "us-east-1".to_string(),
933                key_id: "test-key-id".to_string(),
934            }),
935        };
936
937        let result = process_signer(&signer).await;
938
939        assert!(
940            result.is_ok(),
941            "Failed to process AWS KMS signer: {:?}",
942            result.err()
943        );
944        let model = result.unwrap();
945
946        assert_eq!(model.id, "aws-kms-signer");
947
948        match model.config {
949            SignerConfigStorage::AwsKms(_) => {}
950            _ => panic!("Expected AwsKms config"),
951        }
952
953        Ok(())
954    }
955
956    // utility function to setup a mock AppRole login response
957    async fn setup_mock_approle_login(
958        mock_server: &mut mockito::ServerGuard,
959        role_id: &str,
960        secret_id: &str,
961        token: &str,
962    ) -> mockito::Mock {
963        mock_server
964            .mock("POST", "/v1/auth/approle/login")
965            .match_body(mockito::Matcher::Json(json!({
966                "role_id": role_id,
967                "secret_id": secret_id
968            })))
969            .with_status(200)
970            .with_header("content-type", "application/json")
971            .with_body(
972                serde_json::to_string(&json!({
973                    "request_id": "test-request-id",
974                    "lease_id": "",
975                    "renewable": false,
976                    "lease_duration": 0,
977                    "data": null,
978                    "wrap_info": null,
979                    "warnings": null,
980                    "auth": {
981                        "client_token": token,
982                        "accessor": "test-accessor",
983                        "policies": ["default"],
984                        "token_policies": ["default"],
985                        "metadata": {
986                            "role_name": "test-role"
987                        },
988                        "lease_duration": 3600,
989                        "renewable": true,
990                        "entity_id": "test-entity-id",
991                        "token_type": "service",
992                        "orphan": true
993                    }
994                }))
995                .unwrap(),
996            )
997            .create_async()
998            .await
999    }
1000
1001    #[tokio::test]
1002    async fn test_process_signer_vault() -> Result<()> {
1003        let mut mock_server = mockito::Server::new_async().await;
1004
1005        let _login_mock = setup_mock_approle_login(
1006            &mut mock_server,
1007            "test-role-id",
1008            "test-secret-id",
1009            "test-token",
1010        )
1011        .await;
1012
1013        let _secret_mock = mock_server
1014            .mock("GET", "/v1/secret/data/test-key")
1015            .match_header("X-Vault-Token", "test-token")
1016            .with_status(200)
1017            .with_header("content-type", "application/json")
1018            .with_body(serde_json::to_string(&json!({
1019                "request_id": "test-request-id",
1020                "lease_id": "",
1021                "renewable": false,
1022                "lease_duration": 0,
1023                "data": {
1024                    "data": {
1025                        "value": "C5ACE14AB163556747F02C1110911537578FBE335FB74D18FBF82990AD70C3B9"
1026                    },
1027                    "metadata": {
1028                        "created_time": "2024-01-01T00:00:00Z",
1029                        "deletion_time": "",
1030                        "destroyed": false,
1031                        "version": 1
1032                    }
1033                },
1034                "wrap_info": null,
1035                "warnings": null,
1036                "auth": null
1037            })).unwrap())
1038            .create_async()
1039            .await;
1040
1041        let signer = SignerFileConfig {
1042            id: "vault-signer".to_string(),
1043            config: SignerFileConfigEnum::Vault(VaultSignerFileConfig {
1044                key_name: "test-key".to_string(),
1045                address: mock_server.url(),
1046                namespace: Some("test-namespace".to_string()),
1047                role_id: PlainOrEnvValue::Plain {
1048                    value: SecretString::new("test-role-id"),
1049                },
1050                secret_id: PlainOrEnvValue::Plain {
1051                    value: SecretString::new("test-secret-id"),
1052                },
1053                mount_point: Some("secret".to_string()),
1054            }),
1055        };
1056
1057        let result = process_signer(&signer).await;
1058
1059        assert!(
1060            result.is_ok(),
1061            "Failed to process Vault signer: {:?}",
1062            result.err()
1063        );
1064        let model = result.unwrap();
1065
1066        assert_eq!(model.id, "vault-signer");
1067
1068        match model.config {
1069            SignerConfigStorage::Vault(_) => {}
1070            _ => panic!("Expected Vault config"),
1071        }
1072
1073        Ok(())
1074    }
1075
1076    #[tokio::test]
1077    async fn test_process_signers() -> Result<()> {
1078        // Create test signers
1079        let signers = vec![
1080            SignerFileConfig {
1081                id: "test-signer-1".to_string(),
1082                config: SignerFileConfigEnum::Local(LocalSignerFileConfig {
1083                    path: "tests/utils/test_keys/unit-test-local-signer.json".to_string(),
1084                    passphrase: PlainOrEnvValue::Plain {
1085                        value: SecretString::new("test"),
1086                    },
1087                }),
1088            },
1089            SignerFileConfig {
1090                id: "test-signer-2".to_string(),
1091                config: SignerFileConfigEnum::Local(LocalSignerFileConfig {
1092                    path: "tests/utils/test_keys/unit-test-local-signer.json".to_string(),
1093                    passphrase: PlainOrEnvValue::Plain {
1094                        value: SecretString::new("test"),
1095                    },
1096                }),
1097            },
1098        ];
1099
1100        // Create config
1101        let config = Config {
1102            signers,
1103            relayers: vec![],
1104            notifications: vec![],
1105            networks: NetworksFileConfig::new(vec![]).unwrap(),
1106            plugins: Some(vec![]),
1107        };
1108
1109        // Create app state
1110        let app_state = ThinData(create_test_app_state());
1111
1112        // Process signers
1113        process_signers(&config, &app_state).await?;
1114
1115        // Verify signers were created
1116        let stored_signers = app_state.signer_repository.list_all().await?;
1117        assert_eq!(stored_signers.len(), 2);
1118        assert!(stored_signers.iter().any(|s| s.id == "test-signer-1"));
1119        assert!(stored_signers.iter().any(|s| s.id == "test-signer-2"));
1120
1121        Ok(())
1122    }
1123
1124    #[tokio::test]
1125    async fn test_process_notifications() -> Result<()> {
1126        // Create test notifications
1127        let notifications = vec![
1128            NotificationConfig {
1129                id: "test-notification-1".to_string(),
1130                r#type: NotificationType::Webhook,
1131                url: "https://hooks.slack.com/test1".to_string(),
1132                signing_key: None,
1133            },
1134            NotificationConfig {
1135                id: "test-notification-2".to_string(),
1136                r#type: NotificationType::Webhook,
1137                url: "https://hooks.slack.com/test2".to_string(),
1138                signing_key: None,
1139            },
1140        ];
1141
1142        // Create config
1143        let config = Config {
1144            signers: vec![],
1145            relayers: vec![],
1146            notifications,
1147            networks: NetworksFileConfig::new(vec![]).unwrap(),
1148            plugins: Some(vec![]),
1149        };
1150
1151        // Create app state
1152        let app_state = ThinData(create_test_app_state());
1153
1154        // Process notifications
1155        process_notifications(&config, &app_state).await?;
1156
1157        // Verify notifications were created
1158        let stored_notifications = app_state.notification_repository.list_all().await?;
1159        assert_eq!(stored_notifications.len(), 2);
1160        assert!(stored_notifications
1161            .iter()
1162            .any(|n| n.id == "test-notification-1"));
1163        assert!(stored_notifications
1164            .iter()
1165            .any(|n| n.id == "test-notification-2"));
1166
1167        Ok(())
1168    }
1169
1170    #[tokio::test]
1171    async fn test_process_networks_empty() -> Result<()> {
1172        let config = Config {
1173            signers: vec![],
1174            relayers: vec![],
1175            notifications: vec![],
1176            networks: NetworksFileConfig::new(vec![]).unwrap(),
1177            plugins: Some(vec![]),
1178        };
1179
1180        let app_state = ThinData(create_test_app_state());
1181
1182        process_networks(&config, &app_state).await?;
1183
1184        let stored_networks = app_state.network_repository.list_all().await?;
1185        assert_eq!(stored_networks.len(), 0);
1186
1187        Ok(())
1188    }
1189
1190    #[tokio::test]
1191    async fn test_process_networks_single_evm() -> Result<()> {
1192        use crate::config::network::test_utils::*;
1193
1194        let networks = vec![create_evm_network_wrapped("mainnet")];
1195
1196        let config = Config {
1197            signers: vec![],
1198            relayers: vec![],
1199            notifications: vec![],
1200            networks: NetworksFileConfig::new(networks).unwrap(),
1201            plugins: Some(vec![]),
1202        };
1203
1204        let app_state = ThinData(create_test_app_state());
1205
1206        process_networks(&config, &app_state).await?;
1207
1208        let stored_networks = app_state.network_repository.list_all().await?;
1209        assert_eq!(stored_networks.len(), 1);
1210        assert_eq!(stored_networks[0].name, "mainnet");
1211        assert_eq!(stored_networks[0].network_type, NetworkType::Evm);
1212
1213        Ok(())
1214    }
1215
1216    #[tokio::test]
1217    async fn test_process_networks_single_solana() -> Result<()> {
1218        use crate::config::network::test_utils::*;
1219
1220        let networks = vec![create_solana_network_wrapped("devnet")];
1221
1222        let config = Config {
1223            signers: vec![],
1224            relayers: vec![],
1225            notifications: vec![],
1226            networks: NetworksFileConfig::new(networks).unwrap(),
1227            plugins: Some(vec![]),
1228        };
1229
1230        let app_state = ThinData(create_test_app_state());
1231
1232        process_networks(&config, &app_state).await?;
1233
1234        let stored_networks = app_state.network_repository.list_all().await?;
1235        assert_eq!(stored_networks.len(), 1);
1236        assert_eq!(stored_networks[0].name, "devnet");
1237        assert_eq!(stored_networks[0].network_type, NetworkType::Solana);
1238
1239        Ok(())
1240    }
1241
1242    #[tokio::test]
1243    async fn test_process_networks_multiple_mixed() -> Result<()> {
1244        use crate::config::network::test_utils::*;
1245
1246        let networks = vec![
1247            create_evm_network_wrapped("mainnet"),
1248            create_solana_network_wrapped("devnet"),
1249            create_evm_network_wrapped("sepolia"),
1250            create_solana_network_wrapped("testnet"),
1251        ];
1252
1253        let config = Config {
1254            signers: vec![],
1255            relayers: vec![],
1256            notifications: vec![],
1257            networks: NetworksFileConfig::new(networks).unwrap(),
1258            plugins: Some(vec![]),
1259        };
1260
1261        let app_state = ThinData(create_test_app_state());
1262
1263        process_networks(&config, &app_state).await?;
1264
1265        let stored_networks = app_state.network_repository.list_all().await?;
1266        assert_eq!(stored_networks.len(), 4);
1267
1268        let evm_networks: Vec<_> = stored_networks
1269            .iter()
1270            .filter(|n| n.network_type == NetworkType::Evm)
1271            .collect();
1272        assert_eq!(evm_networks.len(), 2);
1273        assert!(evm_networks.iter().any(|n| n.name == "mainnet"));
1274        assert!(evm_networks.iter().any(|n| n.name == "sepolia"));
1275
1276        let solana_networks: Vec<_> = stored_networks
1277            .iter()
1278            .filter(|n| n.network_type == NetworkType::Solana)
1279            .collect();
1280        assert_eq!(solana_networks.len(), 2);
1281        assert!(solana_networks.iter().any(|n| n.name == "devnet"));
1282        assert!(solana_networks.iter().any(|n| n.name == "testnet"));
1283
1284        Ok(())
1285    }
1286
1287    #[tokio::test]
1288    async fn test_process_networks_many_networks() -> Result<()> {
1289        use crate::config::network::test_utils::*;
1290
1291        let networks = (0..10)
1292            .map(|i| create_evm_network_wrapped(&format!("network-{i}")))
1293            .collect();
1294
1295        let config = Config {
1296            signers: vec![],
1297            relayers: vec![],
1298            notifications: vec![],
1299            networks: NetworksFileConfig::new(networks).unwrap(),
1300            plugins: Some(vec![]),
1301        };
1302
1303        let app_state = ThinData(create_test_app_state());
1304
1305        process_networks(&config, &app_state).await?;
1306
1307        let stored_networks = app_state.network_repository.list_all().await?;
1308        assert_eq!(stored_networks.len(), 10);
1309
1310        for i in 0..10 {
1311            let expected_name = format!("network-{i}");
1312            assert!(
1313                stored_networks.iter().any(|n| n.name == expected_name),
1314                "Network {expected_name} not found"
1315            );
1316        }
1317
1318        Ok(())
1319    }
1320
1321    #[tokio::test]
1322    async fn test_process_networks_duplicate_names() -> Result<()> {
1323        use crate::config::network::test_utils::*;
1324
1325        let networks = vec![
1326            create_evm_network_wrapped("mainnet"),
1327            create_solana_network_wrapped("mainnet"),
1328        ];
1329
1330        let config = Config {
1331            signers: vec![],
1332            relayers: vec![],
1333            notifications: vec![],
1334            networks: NetworksFileConfig::new(networks).unwrap(),
1335            plugins: Some(vec![]),
1336        };
1337
1338        let app_state = ThinData(create_test_app_state());
1339
1340        process_networks(&config, &app_state).await?;
1341
1342        let stored_networks = app_state.network_repository.list_all().await?;
1343        assert_eq!(stored_networks.len(), 2);
1344
1345        let mainnet_networks: Vec<_> = stored_networks
1346            .iter()
1347            .filter(|n| n.name == "mainnet")
1348            .collect();
1349        assert_eq!(mainnet_networks.len(), 2);
1350        assert!(mainnet_networks
1351            .iter()
1352            .any(|n| n.network_type == NetworkType::Evm));
1353        assert!(mainnet_networks
1354            .iter()
1355            .any(|n| n.network_type == NetworkType::Solana));
1356
1357        Ok(())
1358    }
1359
1360    #[tokio::test]
1361    async fn test_process_networks() -> Result<()> {
1362        use crate::config::network::test_utils::*;
1363
1364        let networks = vec![
1365            create_evm_network_wrapped("mainnet"),
1366            create_solana_network_wrapped("devnet"),
1367        ];
1368
1369        let config = Config {
1370            signers: vec![],
1371            relayers: vec![],
1372            notifications: vec![],
1373            networks: NetworksFileConfig::new(networks).unwrap(),
1374            plugins: Some(vec![]),
1375        };
1376
1377        let app_state = ThinData(create_test_app_state());
1378
1379        process_networks(&config, &app_state).await?;
1380
1381        let stored_networks = app_state.network_repository.list_all().await?;
1382        assert_eq!(stored_networks.len(), 2);
1383        assert!(stored_networks
1384            .iter()
1385            .any(|n| n.name == "mainnet" && n.network_type == NetworkType::Evm));
1386        assert!(stored_networks
1387            .iter()
1388            .any(|n| n.name == "devnet" && n.network_type == NetworkType::Solana));
1389
1390        Ok(())
1391    }
1392
1393    #[tokio::test]
1394    async fn test_process_relayers() -> Result<()> {
1395        // Create test signers
1396        let signers = vec![SignerFileConfig {
1397            id: "test-signer-1".to_string(),
1398            config: SignerFileConfigEnum::Local(LocalSignerFileConfig {
1399                path: "tests/utils/test_keys/unit-test-local-signer.json".to_string(),
1400                passphrase: PlainOrEnvValue::Plain {
1401                    value: SecretString::new("test"),
1402                },
1403            }),
1404        }];
1405
1406        // Create test relayers
1407        let relayers = vec![RelayerFileConfig {
1408            id: "test-relayer-1".to_string(),
1409            network_type: ConfigFileNetworkType::Evm,
1410            signer_id: "test-signer-1".to_string(),
1411            name: "test-relayer-1".to_string(),
1412            network: "test-network".to_string(),
1413            paused: false,
1414            policies: None,
1415            notification_id: None,
1416            custom_rpc_urls: None,
1417        }];
1418
1419        // Create config
1420        let config = Config {
1421            signers: signers.clone(),
1422            relayers,
1423            notifications: vec![],
1424            networks: NetworksFileConfig::new(vec![]).unwrap(),
1425            plugins: Some(vec![]),
1426        };
1427
1428        // Create app state
1429        let app_state = ThinData(create_test_app_state());
1430
1431        // First process signers (required for relayers)
1432        process_signers(&config, &app_state).await?;
1433
1434        // Process relayers
1435        process_relayers(&config, &app_state).await?;
1436
1437        // Verify relayers were created
1438        let stored_relayers = app_state.relayer_repository.list_all().await?;
1439        assert_eq!(stored_relayers.len(), 1);
1440        assert_eq!(stored_relayers[0].id, "test-relayer-1");
1441        assert_eq!(stored_relayers[0].signer_id, "test-signer-1");
1442        assert!(!stored_relayers[0].address.is_empty()); // Address should be populated
1443
1444        Ok(())
1445    }
1446
1447    #[tokio::test]
1448    async fn test_process_plugins() -> Result<()> {
1449        // Create test plugins
1450        let plugins = vec![
1451            PluginFileConfig {
1452                id: "test-plugin-1".to_string(),
1453                path: "/app/plugins/test.ts".to_string(),
1454                timeout: None,
1455                emit_logs: false,
1456                emit_traces: false,
1457                config: None,
1458                raw_response: false,
1459                allow_get_invocation: false,
1460                forward_logs: false,
1461            },
1462            PluginFileConfig {
1463                id: "test-plugin-2".to_string(),
1464                path: "/app/plugins/test2.ts".to_string(),
1465                timeout: Some(12),
1466                emit_logs: false,
1467                emit_traces: false,
1468                config: None,
1469                raw_response: false,
1470                allow_get_invocation: false,
1471                forward_logs: false,
1472            },
1473        ];
1474
1475        // Create config
1476        let config = Config {
1477            signers: vec![],
1478            relayers: vec![],
1479            notifications: vec![],
1480            networks: NetworksFileConfig::new(vec![]).unwrap(),
1481            plugins: Some(plugins),
1482        };
1483
1484        // Create app state
1485        let app_state = ThinData(create_test_app_state());
1486
1487        // Process plugins
1488        process_plugins(&config, &app_state).await?;
1489
1490        // Verify plugins were created
1491        let plugin_1 = app_state
1492            .plugin_repository
1493            .get_by_id("test-plugin-1")
1494            .await?;
1495        let plugin_2 = app_state
1496            .plugin_repository
1497            .get_by_id("test-plugin-2")
1498            .await?;
1499
1500        assert!(plugin_1.is_some());
1501        assert!(plugin_2.is_some());
1502
1503        let plugin_1 = plugin_1.unwrap();
1504        let plugin_2 = plugin_2.unwrap();
1505
1506        assert_eq!(plugin_1.path, "/app/plugins/test.ts");
1507        assert_eq!(plugin_2.path, "/app/plugins/test2.ts");
1508
1509        // check that the timeout is set to the default value when not provided.
1510        assert_eq!(
1511            plugin_1.timeout.as_secs(),
1512            Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS).as_secs()
1513        );
1514        assert_eq!(
1515            plugin_2.timeout.as_secs(),
1516            Duration::from_secs(12).as_secs()
1517        );
1518
1519        Ok(())
1520    }
1521
1522    #[tokio::test]
1523    async fn test_process_api_key() -> Result<()> {
1524        let server_config = Arc::new(crate::utils::mocks::mockutils::create_test_server_config(
1525            RepositoryStorageType::InMemory,
1526        ));
1527        let app_state = ThinData(create_test_app_state());
1528
1529        process_api_key(&server_config, &app_state).await?;
1530
1531        let pagination_query = PaginationQuery {
1532            page: 1,
1533            per_page: 10,
1534        };
1535
1536        let stored_api_keys = app_state
1537            .api_key_repository
1538            .list_paginated(pagination_query)
1539            .await?;
1540        assert_eq!(stored_api_keys.items.len(), 1);
1541        assert_eq!(stored_api_keys.items[0].name, "default");
1542
1543        Ok(())
1544    }
1545
1546    #[tokio::test]
1547    async fn test_process_config_file() -> Result<()> {
1548        // Create test signers, relayers, and notifications
1549        let signers = vec![SignerFileConfig {
1550            id: "test-signer-1".to_string(),
1551            config: SignerFileConfigEnum::Local(LocalSignerFileConfig {
1552                path: "tests/utils/test_keys/unit-test-local-signer.json".to_string(),
1553                passphrase: PlainOrEnvValue::Plain {
1554                    value: SecretString::new("test"),
1555                },
1556            }),
1557        }];
1558
1559        let relayers = vec![RelayerFileConfig {
1560            id: "test-relayer-1".to_string(),
1561            network_type: ConfigFileNetworkType::Evm,
1562            signer_id: "test-signer-1".to_string(),
1563            name: "test-relayer-1".to_string(),
1564            network: "test-network".to_string(),
1565            paused: false,
1566            policies: None,
1567            notification_id: None,
1568            custom_rpc_urls: None,
1569        }];
1570
1571        let notifications = vec![NotificationConfig {
1572            id: "test-notification-1".to_string(),
1573            r#type: NotificationType::Webhook,
1574            url: "https://hooks.slack.com/test1".to_string(),
1575            signing_key: None,
1576        }];
1577
1578        let plugins = vec![PluginFileConfig {
1579            id: "test-plugin-1".to_string(),
1580            path: "/app/plugins/test.ts".to_string(),
1581            timeout: None,
1582            emit_logs: false,
1583            emit_traces: false,
1584            allow_get_invocation: false,
1585            config: None,
1586            raw_response: false,
1587            forward_logs: false,
1588        }];
1589
1590        // Create config
1591        let config = Config {
1592            signers,
1593            relayers,
1594            notifications,
1595            networks: NetworksFileConfig::new(vec![]).unwrap(),
1596            plugins: Some(plugins),
1597        };
1598
1599        // Create shared repositories
1600        let signer_repo = Arc::new(InMemorySignerRepository::default());
1601        let relayer_repo = Arc::new(RelayerRepositoryStorage::new_in_memory());
1602        let notification_repo = Arc::new(InMemoryNotificationRepository::default());
1603        let network_repo = Arc::new(InMemoryNetworkRepository::default());
1604        let transaction_repo = Arc::new(TransactionRepositoryStorage::InMemory(
1605            InMemoryTransactionRepository::new(),
1606        ));
1607        let transaction_counter = Arc::new(InMemoryTransactionCounter::default());
1608        let plugin_repo = Arc::new(InMemoryPluginRepository::default());
1609        let api_key_repo = Arc::new(InMemoryApiKeyRepository::default());
1610
1611        // Create a mock job producer
1612        let mut mock_job_producer = MockJobProducerTrait::new();
1613        mock_job_producer
1614            .expect_produce_transaction_request_job()
1615            .returning(|_, _| Box::pin(async { Ok(()) }));
1616        mock_job_producer
1617            .expect_produce_submit_transaction_job()
1618            .returning(|_, _| Box::pin(async { Ok(()) }));
1619        mock_job_producer
1620            .expect_produce_check_transaction_status_job()
1621            .returning(|_, _| Box::pin(async { Ok(()) }));
1622        mock_job_producer
1623            .expect_produce_send_notification_job()
1624            .returning(|_, _| Box::pin(async { Ok(()) }));
1625        let job_producer = Arc::new(mock_job_producer);
1626
1627        // Create app state
1628        let app_state = ThinData(AppState {
1629            signer_repository: signer_repo.clone(),
1630            relayer_repository: relayer_repo.clone(),
1631            notification_repository: notification_repo.clone(),
1632            network_repository: network_repo.clone(),
1633            transaction_repository: transaction_repo.clone(),
1634            transaction_counter_store: transaction_counter.clone(),
1635            job_producer: job_producer.clone(),
1636            plugin_repository: plugin_repo.clone(),
1637            api_key_repository: api_key_repo.clone(),
1638        });
1639
1640        // Process the entire config file
1641        let server_config = Arc::new(crate::utils::mocks::mockutils::create_test_server_config(
1642            RepositoryStorageType::InMemory,
1643        ));
1644        process_config_file(config, server_config, &app_state).await?;
1645
1646        // Verify all repositories were populated
1647        let stored_signers = signer_repo.list_all().await?;
1648        assert_eq!(stored_signers.len(), 1);
1649        assert_eq!(stored_signers[0].id, "test-signer-1");
1650
1651        let stored_relayers = relayer_repo.list_all().await?;
1652        assert_eq!(stored_relayers.len(), 1);
1653        assert_eq!(stored_relayers[0].id, "test-relayer-1");
1654        assert_eq!(stored_relayers[0].signer_id, "test-signer-1");
1655
1656        let stored_notifications = notification_repo.list_all().await?;
1657        assert_eq!(stored_notifications.len(), 1);
1658        assert_eq!(stored_notifications[0].id, "test-notification-1");
1659
1660        let stored_plugin = plugin_repo.get_by_id("test-plugin-1").await?;
1661        assert_eq!(stored_plugin.unwrap().path, "/app/plugins/test.ts");
1662
1663        Ok(())
1664    }
1665
1666    #[tokio::test]
1667    async fn test_process_signer_google_cloud_kms() {
1668        use crate::models::SecretString;
1669
1670        let signer = SignerFileConfig {
1671            id: "gcp-kms-signer".to_string(),
1672            config: SignerFileConfigEnum::GoogleCloudKms(GoogleCloudKmsSignerFileConfig {
1673                service_account: GoogleCloudKmsServiceAccountFileConfig {
1674                    private_key: PlainOrEnvValue::Plain {
1675                        value: SecretString::new("-----BEGIN EXAMPLE PRIVATE KEY-----\nFAKEKEYDATA\n-----END EXAMPLE PRIVATE KEY-----\n"),
1676                    },
1677                    client_email: PlainOrEnvValue::Plain {
1678                        value: SecretString::new("test-service-account@example.com"),
1679                    },
1680                    private_key_id: PlainOrEnvValue::Plain {
1681                        value: SecretString::new("fake-private-key-id"),
1682                    },
1683                    client_id: "fake-client-id".to_string(),
1684                    project_id: "fake-project-id".to_string(),
1685                    auth_uri: "https://accounts.google.com/o/oauth2/auth".to_string(),
1686                    token_uri: "https://oauth2.googleapis.com/token".to_string(),
1687                    client_x509_cert_url: "https://www.googleapis.com/robot/v1/metadata/x509/test-service-account%40example.com".to_string(),
1688                    auth_provider_x509_cert_url: "https://www.googleapis.com/oauth2/v1/certs".to_string(),
1689                    universe_domain: "googleapis.com".to_string(),
1690                },
1691                key: GoogleCloudKmsKeyFileConfig {
1692                    location: "global".to_string(),
1693                    key_id: "fake-key-id".to_string(),
1694                    key_ring_id: "fake-key-ring-id".to_string(),
1695                    key_version: 1,
1696                },
1697            }),
1698        };
1699
1700        let result = process_signer(&signer).await;
1701
1702        assert!(
1703            result.is_ok(),
1704            "Failed to process Google Cloud KMS signer: {:?}",
1705            result.err()
1706        );
1707        let model = result.unwrap();
1708
1709        assert_eq!(model.id, "gcp-kms-signer");
1710    }
1711
1712    // Helper function to create test server config with specific settings
1713    fn create_test_server_config_with_settings(
1714        storage_type: RepositoryStorageType,
1715        reset_storage_on_start: bool,
1716    ) -> ServerConfig {
1717        ServerConfig {
1718            repository_storage_type: storage_type.clone(),
1719            reset_storage_on_start,
1720            ..create_test_server_config(storage_type)
1721        }
1722    }
1723
1724    async fn create_test_redis_pool() -> Option<Arc<Pool>> {
1725        let cfg = deadpool_redis::Config::from_url("redis://127.0.0.1:6379");
1726        let pool = cfg
1727            .builder()
1728            .ok()?
1729            .max_size(16)
1730            .runtime(deadpool_redis::Runtime::Tokio1)
1731            .build()
1732            .ok()?;
1733        Some(Arc::new(pool))
1734    }
1735
1736    // Helper function to create minimal test config
1737    fn create_minimal_test_config() -> Config {
1738        Config {
1739            signers: vec![SignerFileConfig {
1740                id: "test-signer-1".to_string(),
1741                config: SignerFileConfigEnum::Local(LocalSignerFileConfig {
1742                    path: "tests/utils/test_keys/unit-test-local-signer.json".to_string(),
1743                    passphrase: PlainOrEnvValue::Plain {
1744                        value: SecretString::new("test"),
1745                    },
1746                }),
1747            }],
1748            relayers: vec![RelayerFileConfig {
1749                id: "test-relayer-1".to_string(),
1750                network_type: ConfigFileNetworkType::Evm,
1751                signer_id: "test-signer-1".to_string(),
1752                name: "test-relayer-1".to_string(),
1753                network: "test-network".to_string(),
1754                paused: false,
1755                policies: None,
1756                notification_id: None,
1757                custom_rpc_urls: None,
1758            }],
1759            notifications: vec![NotificationConfig {
1760                id: "test-notification-1".to_string(),
1761                r#type: NotificationType::Webhook,
1762                url: "https://hooks.slack.com/test1".to_string(),
1763                signing_key: None,
1764            }],
1765            networks: NetworksFileConfig::new(vec![]).unwrap(),
1766            plugins: None,
1767        }
1768    }
1769
1770    #[tokio::test]
1771    async fn test_should_process_config_file_inmemory_storage() -> Result<()> {
1772        let config = create_minimal_test_config();
1773
1774        // Test 1: InMemory storage with reset_storage_on_start = false
1775        let server_config = Arc::new(create_test_server_config_with_settings(
1776            RepositoryStorageType::InMemory,
1777            false,
1778        ));
1779
1780        let app_state = ThinData(create_test_app_state());
1781        process_config_file(config.clone(), server_config.clone(), &app_state).await?;
1782
1783        let stored_relayers = app_state.relayer_repository.list_all().await?;
1784        assert_eq!(stored_relayers.len(), 1);
1785        assert_eq!(stored_relayers[0].id, "test-relayer-1");
1786
1787        // Test 2: InMemory storage with reset_storage_on_start = true
1788        let server_config2 = Arc::new(create_test_server_config_with_settings(
1789            RepositoryStorageType::InMemory,
1790            true,
1791        ));
1792
1793        let app_state2 = ThinData(create_test_app_state());
1794        process_config_file(config.clone(), server_config2, &app_state2).await?;
1795
1796        let stored_relayers = app_state2.relayer_repository.list_all().await?;
1797        assert_eq!(stored_relayers.len(), 1);
1798        assert_eq!(stored_relayers[0].id, "test-relayer-1");
1799
1800        Ok(())
1801    }
1802
1803    #[tokio::test]
1804    async fn test_reset_storage_on_start_clears_transaction_counter() -> Result<()> {
1805        let config = create_minimal_test_config();
1806        let server_config = Arc::new(create_test_server_config_with_settings(
1807            RepositoryStorageType::InMemory,
1808            true,
1809        ));
1810
1811        let app_state = ThinData(create_test_app_state());
1812
1813        // Seed transaction counter with a value that simulates an inflated nonce
1814        app_state
1815            .transaction_counter_store
1816            .set("test-relayer-1", "0xABC", 999)
1817            .await
1818            .unwrap();
1819        assert_eq!(
1820            app_state
1821                .transaction_counter_store
1822                .get("test-relayer-1", "0xABC")
1823                .await
1824                .unwrap(),
1825            Some(999)
1826        );
1827
1828        // Process config with reset_storage_on_start = true
1829        process_config_file(config, server_config, &app_state).await?;
1830
1831        // Transaction counter should have been cleared
1832        assert_eq!(
1833            app_state
1834                .transaction_counter_store
1835                .get("test-relayer-1", "0xABC")
1836                .await
1837                .unwrap(),
1838            None,
1839            "Transaction counter should be cleared when RESET_STORAGE_ON_START is true"
1840        );
1841
1842        Ok(())
1843    }
1844
1845    #[tokio::test]
1846    async fn test_should_process_config_file_redis_storage_empty_repositories() -> Result<()> {
1847        let config = create_minimal_test_config();
1848        let server_config = Arc::new(create_test_server_config_with_settings(
1849            RepositoryStorageType::Redis,
1850            false,
1851        ));
1852
1853        let app_state = ThinData(create_test_app_state());
1854        process_config_file(config, server_config, &app_state).await?;
1855
1856        let stored_relayers = app_state.relayer_repository.list_all().await?;
1857        assert_eq!(stored_relayers.len(), 1);
1858        assert_eq!(stored_relayers[0].id, "test-relayer-1");
1859
1860        Ok(())
1861    }
1862
1863    #[tokio::test]
1864    async fn test_validate_config_bootstrap_state_empty() -> Result<()> {
1865        let config = create_minimal_test_config();
1866        let app_state = ThinData(create_test_app_state());
1867
1868        let state = validate_config_bootstrap_state(&config, &app_state).await?;
1869
1870        assert_eq!(state, ConfigBootstrapState::Empty);
1871        Ok(())
1872    }
1873
1874    #[tokio::test]
1875    async fn test_validate_config_bootstrap_state_complete_without_marker() -> Result<()> {
1876        let config = create_minimal_test_config();
1877        let app_state = ThinData(create_test_app_state());
1878        let server_config =
1879            create_test_server_config_with_settings(RepositoryStorageType::Redis, false);
1880
1881        process_plugins(&config, &app_state).await?;
1882        process_signers(&config, &app_state).await?;
1883        process_notifications(&config, &app_state).await?;
1884        process_networks(&config, &app_state).await?;
1885        process_relayers(&config, &app_state).await?;
1886        process_api_key(&server_config, &app_state).await?;
1887
1888        let state = validate_config_bootstrap_state(&config, &app_state).await?;
1889
1890        assert_eq!(state, ConfigBootstrapState::Complete);
1891        Ok(())
1892    }
1893
1894    #[tokio::test]
1895    async fn test_validate_config_bootstrap_state_incomplete_without_marker() -> Result<()> {
1896        let config = create_minimal_test_config();
1897        let app_state = ThinData(create_test_app_state());
1898
1899        process_plugins(&config, &app_state).await?;
1900        process_signers(&config, &app_state).await?;
1901        process_notifications(&config, &app_state).await?;
1902        process_networks(&config, &app_state).await?;
1903
1904        let state = validate_config_bootstrap_state(&config, &app_state).await?;
1905
1906        match state {
1907            ConfigBootstrapState::Incomplete { missing } => {
1908                assert!(missing.iter().any(|entry| entry == "relayer repository"));
1909                assert!(missing.iter().any(|entry| entry == "api key repository"));
1910            }
1911            other => panic!("Expected incomplete bootstrap state, got {other:?}"),
1912        }
1913
1914        Ok(())
1915    }
1916
1917    #[tokio::test]
1918    async fn test_process_config_file_errors_when_redis_bootstrap_state_is_incomplete() -> Result<()>
1919    {
1920        let config = create_minimal_test_config();
1921        let server_config = Arc::new(create_test_server_config_with_settings(
1922            RepositoryStorageType::Redis,
1923            false,
1924        ));
1925
1926        let app_state = ThinData(create_test_app_state());
1927
1928        app_state
1929            .relayer_repository
1930            .create(create_mock_relayer("existing-relayer".to_string(), false))
1931            .await?;
1932
1933        let err = process_config_file(config, server_config, &app_state)
1934            .await
1935            .expect_err("partial bootstrap state should fail closed");
1936        assert!(err
1937            .to_string()
1938            .contains("incomplete bootstrap-managed config state"));
1939
1940        Ok(())
1941    }
1942
1943    #[tokio::test]
1944    async fn test_should_error_for_partial_config_file_redis_storage_populated_repositories(
1945    ) -> Result<()> {
1946        let config = create_minimal_test_config();
1947        let server_config = Arc::new(create_test_server_config_with_settings(
1948            RepositoryStorageType::Redis,
1949            false,
1950        ));
1951
1952        // Create two identical app states to test the decision logic
1953        let app_state1 = ThinData(create_test_app_state());
1954        let app_state2 = ThinData(create_test_app_state());
1955
1956        // Pre-populate repositories to simulate Redis already having data
1957        let existing_relayer1 = create_mock_relayer("existing-relayer".to_string(), false);
1958        let existing_relayer2 = create_mock_relayer("existing-relayer".to_string(), false);
1959        app_state1
1960            .relayer_repository
1961            .create(existing_relayer1)
1962            .await?;
1963        app_state2
1964            .relayer_repository
1965            .create(existing_relayer2)
1966            .await?;
1967
1968        // Check initial state
1969        assert!(app_state1.relayer_repository.has_entries().await?);
1970        assert!(!app_state1.signer_repository.has_entries().await?);
1971
1972        let err = process_config_file(config, server_config, &app_state2)
1973            .await
1974            .expect_err("partial bootstrap state should fail closed");
1975        assert!(err
1976            .to_string()
1977            .contains("incomplete bootstrap-managed config state"));
1978
1979        Ok(())
1980    }
1981
1982    #[tokio::test]
1983    async fn test_should_process_config_file_redis_storage_with_reset_flag() -> Result<()> {
1984        let config = create_minimal_test_config();
1985        let server_config = Arc::new(create_test_server_config_with_settings(
1986            RepositoryStorageType::Redis,
1987            true, // reset_storage_on_start = true
1988        ));
1989
1990        let app_state = ThinData(create_test_app_state());
1991
1992        // Pre-populate repositories to simulate Redis already having data
1993        let existing_relayer = create_mock_relayer("existing-relayer".to_string(), false);
1994        let existing_signer = create_mock_signer();
1995        app_state
1996            .relayer_repository
1997            .create(existing_relayer)
1998            .await?;
1999        app_state.signer_repository.create(existing_signer).await?;
2000
2001        // Should process config file because reset_storage_on_start = true
2002        process_config_file(config, server_config, &app_state).await?;
2003
2004        let stored_relayer = app_state
2005            .relayer_repository
2006            .get_by_id("existing-relayer".to_string())
2007            .await;
2008        assert!(
2009            stored_relayer.is_err(),
2010            "Existing relayer should not be found"
2011        );
2012
2013        let stored_signer = app_state
2014            .signer_repository
2015            .get_by_id("existing-signer".to_string())
2016            .await;
2017        assert!(
2018            stored_signer.is_err(),
2019            "Existing signer should not be found"
2020        );
2021
2022        Ok(())
2023    }
2024
2025    #[tokio::test]
2026    #[ignore]
2027    async fn test_process_if_needed_after_lock_skips_populated_redis_without_marker() -> Result<()>
2028    {
2029        let conn = create_test_redis_pool()
2030            .await
2031            .expect("Redis connection required");
2032        let prefix = "test_config_processing_skip_populated_without_marker";
2033        let bootstrap_meta_key = format!("{prefix}:bootstrap_meta");
2034
2035        {
2036            let mut conn_clone = conn.get().await.expect("Failed to get connection");
2037            let _: Result<(), _> =
2038                redis::AsyncCommands::del(&mut conn_clone, &bootstrap_meta_key).await;
2039        }
2040
2041        let config = create_minimal_test_config();
2042        let server_config =
2043            create_test_server_config_with_settings(RepositoryStorageType::Redis, false);
2044        let app_state = ThinData(create_test_app_state());
2045
2046        process_signers(&config, &app_state).await?;
2047        process_notifications(&config, &app_state).await?;
2048        process_networks(&config, &app_state).await?;
2049        process_relayers(&config, &app_state).await?;
2050        process_api_key(&server_config, &app_state).await?;
2051
2052        process_if_needed_after_lock(&config, &server_config, &app_state, &conn, prefix).await?;
2053
2054        let stored_relayers = app_state.relayer_repository.list_all().await?;
2055        assert_eq!(stored_relayers.len(), 1);
2056        assert_eq!(stored_relayers[0].id, "test-relayer-1");
2057        assert!(is_config_processing_completed(&conn, prefix).await?);
2058
2059        {
2060            let mut conn_clone = conn.get().await.expect("Failed to get connection");
2061            let _: Result<(), _> =
2062                redis::AsyncCommands::del(&mut conn_clone, &bootstrap_meta_key).await;
2063        }
2064
2065        Ok(())
2066    }
2067
2068    #[tokio::test]
2069    #[ignore]
2070    async fn test_process_if_needed_after_lock_recovers_when_in_progress_marker_exists(
2071    ) -> Result<()> {
2072        let conn = create_test_redis_pool()
2073            .await
2074            .expect("Redis connection required");
2075        let prefix = "test_config_processing_recover_in_progress";
2076        let bootstrap_meta_key = format!("{prefix}:bootstrap_meta");
2077
2078        {
2079            let mut conn_clone = conn.get().await.expect("Failed to get connection");
2080            let _: Result<(), _> =
2081                redis::AsyncCommands::del(&mut conn_clone, &bootstrap_meta_key).await;
2082        }
2083
2084        let config = create_minimal_test_config();
2085        let server_config =
2086            create_test_server_config_with_settings(RepositoryStorageType::Redis, false);
2087        let app_state = ThinData(create_test_app_state());
2088
2089        process_signers(&config, &app_state).await?;
2090        process_notifications(&config, &app_state).await?;
2091        process_networks(&config, &app_state).await?;
2092        process_relayers(&config, &app_state).await?;
2093        process_api_key(&server_config, &app_state).await?;
2094        set_config_processing_in_progress(&conn, prefix).await?;
2095
2096        process_if_needed_after_lock(&config, &server_config, &app_state, &conn, prefix).await?;
2097
2098        let stored_relayers = app_state.relayer_repository.list_all().await?;
2099        assert_eq!(stored_relayers.len(), 1);
2100        assert_eq!(stored_relayers[0].id, "test-relayer-1");
2101        assert!(is_config_processing_completed(&conn, prefix).await?);
2102
2103        {
2104            let mut conn_clone = conn.get().await.expect("Failed to get connection");
2105            let _: Result<(), _> =
2106                redis::AsyncCommands::del(&mut conn_clone, &bootstrap_meta_key).await;
2107        }
2108
2109        Ok(())
2110    }
2111
2112    #[tokio::test]
2113    #[ignore]
2114    async fn test_recover_config_processing_after_timeout_waits_for_extended_completion() {
2115        let conn = create_test_redis_pool()
2116            .await
2117            .expect("Redis connection required");
2118        let prefix = "test_config_recovery_extended_wait";
2119        let lock_key = format!("{prefix}:lock:{CONFIG_PROCESSING_LOCK_NAME}");
2120        let bootstrap_meta_key = format!("{prefix}:bootstrap_meta");
2121
2122        {
2123            let mut conn_clone = conn.get().await.expect("Failed to get connection");
2124            let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &lock_key).await;
2125            let _: Result<(), _> =
2126                redis::AsyncCommands::del(&mut conn_clone, &bootstrap_meta_key).await;
2127        }
2128
2129        let config = create_minimal_test_config();
2130        let server_config =
2131            create_test_server_config_with_settings(RepositoryStorageType::Redis, false);
2132        let app_state = ThinData(create_test_app_state());
2133
2134        let lock = DistributedLock::new(
2135            conn.clone(),
2136            &lock_key,
2137            Duration::from_secs(BOOTSTRAP_LOCK_TTL_SECS),
2138        );
2139        let guard = lock
2140            .try_acquire()
2141            .await
2142            .expect("Should acquire lock")
2143            .expect("Lock should be available");
2144
2145        let conn_for_task = conn.clone();
2146        let prefix_for_task = prefix.to_string();
2147        tokio::spawn(async move {
2148            tokio::time::sleep(Duration::from_millis(500)).await;
2149            set_config_processing_completed(&conn_for_task, &prefix_for_task)
2150                .await
2151                .expect("Should set config processing completed");
2152            guard.release().await.expect("Should release lock");
2153        });
2154
2155        let result = recover_config_processing_after_timeout(
2156            &config,
2157            &server_config,
2158            &app_state,
2159            &conn,
2160            prefix,
2161        )
2162        .await;
2163
2164        assert!(
2165            result.is_ok(),
2166            "Should succeed when completion is observed during extended wait: {:?}",
2167            result
2168        );
2169
2170        {
2171            let mut conn_clone = conn.get().await.expect("Failed to get connection");
2172            let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &lock_key).await;
2173            let _: Result<(), _> =
2174                redis::AsyncCommands::del(&mut conn_clone, &bootstrap_meta_key).await;
2175        }
2176    }
2177}