1use std::sync::Arc;
17use std::time::Duration;
18
19use crate::{
20 config::{Config, RepositoryStorageType, ServerConfig},
21 jobs::JobProducerTrait,
22 models::{
23 ApiKeyRepoModel, NetworkRepoModel, NotificationRepoModel, PluginModel, Relayer,
24 RelayerRepoModel, Signer as SignerDomainModel, SignerFileConfig, SignerRepoModel,
25 ThinDataAppState, TransactionRepoModel,
26 },
27 repositories::{
28 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
29 Repository, TransactionCounterTrait, TransactionRepository,
30 },
31 services::signer::{Signer as SignerService, SignerFactory},
32 utils::{
33 is_config_processing_completed, is_config_processing_in_progress, poll_until,
34 set_config_processing_completed, set_config_processing_in_progress, DistributedLock,
35 BOOTSTRAP_LOCK_TTL_SECS, LOCK_POLL_INTERVAL_MS, LOCK_WAIT_MAX_SECS,
36 },
37};
38use color_eyre::{eyre::WrapErr, Report, Result};
39use deadpool_redis::Pool;
40use futures::future::try_join_all;
41use tracing::{info, warn};
42
/// Name of the distributed lock that guards config processing.
///
/// It is combined with the repository key prefix as `{prefix}:lock:{name}` to build
/// the full lock key.
const CONFIG_PROCESSING_LOCK_NAME: &str = "config_processing";
45
/// Result of comparing the contents of the bootstrap-managed repositories with what
/// the config file is expected to populate.
#[derive(Debug, PartialEq, Eq)]
enum ConfigBootstrapState {
    /// None of the checked repositories contains any entry.
    Empty,
    /// At least one repository has entries and none of the expected ones is empty.
    Complete,
    /// At least one repository has entries, but some expected ones are empty;
    /// `missing` holds the human-readable names of those repositories.
    Incomplete { missing: Vec<String> },
}
52
53async fn process_api_key<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
54 server_config: &ServerConfig,
55 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
56) -> Result<()>
57where
58 J: JobProducerTrait + Send + Sync + 'static,
59 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
60 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
61 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
62 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
63 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
64 TCR: TransactionCounterTrait + Send + Sync + 'static,
65 PR: PluginRepositoryTrait + Send + Sync + 'static,
66 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
67{
68 let api_key_model = ApiKeyRepoModel::new(
69 "default".to_string(),
70 server_config.api_key.clone(),
71 vec!["*".to_string()],
72 vec!["*".to_string()],
73 );
74
75 app_state
76 .api_key_repository
77 .create(api_key_model)
78 .await
79 .wrap_err("Failed to create api key repository entry")?;
80
81 Ok(())
82}
83
84async fn process_plugins<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
86 config_file: &Config,
87 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
88) -> Result<()>
89where
90 J: JobProducerTrait + Send + Sync + 'static,
91 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
92 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
93 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
94 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
95 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
96 TCR: TransactionCounterTrait + Send + Sync + 'static,
97 PR: PluginRepositoryTrait + Send + Sync + 'static,
98 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
99{
100 if let Some(plugins) = &config_file.plugins {
101 let plugin_futures = plugins.iter().map(|plugin| async {
102 let plugin_model = PluginModel::try_from(plugin.clone())
103 .wrap_err("Failed to convert plugin config")?;
104 app_state
105 .plugin_repository
106 .add(plugin_model)
107 .await
108 .wrap_err("Failed to create plugin repository entry")?;
109 Ok::<(), Report>(())
110 });
111
112 try_join_all(plugin_futures)
113 .await
114 .wrap_err("Failed to initialize plugin repository")?;
115 Ok(())
116 } else {
117 Ok(())
118 }
119}
120
121async fn process_signer(signer: &SignerFileConfig) -> Result<SignerRepoModel> {
123 let domain_signer = SignerDomainModel::try_from(signer.clone())
125 .wrap_err("Failed to convert signer config to domain model")?;
126
127 let signer_repo_model = SignerRepoModel::from(domain_signer);
129
130 Ok(signer_repo_model)
131}
132
133async fn process_signers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
141 config_file: &Config,
142 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
143) -> Result<()>
144where
145 J: JobProducerTrait + Send + Sync + 'static,
146 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
147 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
148 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
149 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
150 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
151 TCR: TransactionCounterTrait + Send + Sync + 'static,
152 PR: PluginRepositoryTrait + Send + Sync + 'static,
153 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
154{
155 let signer_futures = config_file.signers.iter().map(|signer| async {
156 let signer_repo_model = process_signer(signer).await?;
157
158 app_state
159 .signer_repository
160 .create(signer_repo_model)
161 .await
162 .wrap_err("Failed to create signer repository entry")?;
163 Ok::<(), Report>(())
164 });
165
166 try_join_all(signer_futures)
167 .await
168 .wrap_err("Failed to initialize signer repository")?;
169 Ok(())
170}
171
172async fn process_notifications<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
180 config_file: &Config,
181 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
182) -> Result<()>
183where
184 J: JobProducerTrait + Send + Sync + 'static,
185 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
186 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
187 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
188 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
189 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
190 TCR: TransactionCounterTrait + Send + Sync + 'static,
191 PR: PluginRepositoryTrait + Send + Sync + 'static,
192 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
193{
194 let notification_futures = config_file.notifications.iter().map(|notification| async {
195 let notification_repo_model = NotificationRepoModel::try_from(notification.clone())
196 .wrap_err("Failed to convert notification config")?;
197
198 app_state
199 .notification_repository
200 .create(notification_repo_model)
201 .await
202 .wrap_err("Failed to create notification repository entry")?;
203 Ok::<(), Report>(())
204 });
205
206 try_join_all(notification_futures)
207 .await
208 .wrap_err("Failed to initialize notification repository")?;
209 Ok(())
210}
211
212async fn process_networks<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
220 config_file: &Config,
221 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
222) -> Result<()>
223where
224 J: JobProducerTrait + Send + Sync + 'static,
225 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
226 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
227 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
228 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
229 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
230 TCR: TransactionCounterTrait + Send + Sync + 'static,
231 PR: PluginRepositoryTrait + Send + Sync + 'static,
232 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
233{
234 let network_futures = config_file.networks.iter().map(|network| async move {
235 let network_repo_model = NetworkRepoModel::try_from(network.clone())?;
236
237 app_state
238 .network_repository
239 .create(network_repo_model)
240 .await
241 .wrap_err("Failed to create network repository entry")?;
242 Ok::<(), Report>(())
243 });
244
245 try_join_all(network_futures)
246 .await
247 .wrap_err("Failed to initialize network repository")?;
248 Ok(())
249}
250
251async fn process_relayers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
262 config_file: &Config,
263 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
264) -> Result<()>
265where
266 J: JobProducerTrait + Send + Sync + 'static,
267 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
268 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
269 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
270 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
271 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
272 TCR: TransactionCounterTrait + Send + Sync + 'static,
273 PR: PluginRepositoryTrait + Send + Sync + 'static,
274 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
275{
276 let signers = app_state.signer_repository.list_all().await?;
277
278 let relayer_futures = config_file.relayers.iter().map(|relayer| async {
279 let domain_relayer = Relayer::try_from(relayer.clone())
281 .wrap_err("Failed to convert relayer config to domain model")?;
282 let mut repo_model = RelayerRepoModel::from(domain_relayer);
283 let signer_model = signers
284 .iter()
285 .find(|s| s.id == repo_model.signer_id)
286 .ok_or_else(|| eyre::eyre!("Signer not found"))?;
287
288 let network_type = repo_model.network_type;
289 let signer_service = SignerFactory::create_signer(
290 &network_type,
291 &SignerDomainModel::from(signer_model.clone()),
292 )
293 .await
294 .wrap_err("Failed to create signer service")?;
295
296 let address = signer_service.address().await?;
297 repo_model.address = address.to_string();
298
299 app_state
300 .relayer_repository
301 .create(repo_model)
302 .await
303 .wrap_err("Failed to create relayer repository entry")?;
304 Ok::<(), Report>(())
305 });
306
307 try_join_all(relayer_futures)
308 .await
309 .wrap_err("Failed to initialize relayer repository")?;
310 Ok(())
311}
312
313async fn validate_config_bootstrap_state<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
314 config_file: &Config,
315 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
316) -> Result<ConfigBootstrapState>
317where
318 J: JobProducerTrait + Send + Sync + 'static,
319 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
320 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
321 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
322 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
323 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
324 TCR: TransactionCounterTrait + Send + Sync + 'static,
325 PR: PluginRepositoryTrait + Send + Sync + 'static,
326 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
327{
328 let mut missing = Vec::new();
329 let relayer_has_entries = app_state.relayer_repository.has_entries().await?;
330 let signer_has_entries = app_state.signer_repository.has_entries().await?;
331 let notification_has_entries = app_state.notification_repository.has_entries().await?;
332 let network_has_entries = app_state.network_repository.has_entries().await?;
333 let plugin_has_entries = app_state.plugin_repository.has_entries().await?;
334 let api_key_has_entries = app_state.api_key_repository.has_entries().await?;
335
336 let has_any_entries = relayer_has_entries
337 || signer_has_entries
338 || notification_has_entries
339 || network_has_entries
340 || plugin_has_entries
341 || api_key_has_entries;
342
343 if !has_any_entries {
344 return Ok(ConfigBootstrapState::Empty);
345 }
346
347 if config_file
348 .plugins
349 .as_ref()
350 .is_some_and(|plugins| !plugins.is_empty())
351 && !plugin_has_entries
352 {
353 missing.push("plugin repository".to_string());
354 }
355
356 if !config_file.signers.is_empty() && !signer_has_entries {
357 missing.push("signer repository".to_string());
358 }
359
360 if !config_file.notifications.is_empty() && !notification_has_entries {
361 missing.push("notification repository".to_string());
362 }
363
364 if !config_file.networks.is_empty() && !network_has_entries {
365 missing.push("network repository".to_string());
366 }
367
368 if !config_file.relayers.is_empty() && !relayer_has_entries {
369 missing.push("relayer repository".to_string());
370 }
371
372 if !api_key_has_entries {
373 missing.push("api key repository".to_string());
374 }
375
376 if missing.is_empty() {
377 Ok(ConfigBootstrapState::Complete)
378 } else {
379 Ok(ConfigBootstrapState::Incomplete { missing })
380 }
381}
382
383fn format_incomplete_bootstrap_error(missing: &[String]) -> Report {
384 eyre::eyre!(
385 "Redis contains incomplete bootstrap-managed config state without completion marker (missing: {})",
386 missing.join(", ")
387 )
388}
389
390pub async fn process_config_file<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
404 config_file: Config,
405 server_config: Arc<ServerConfig>,
406 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
407) -> Result<()>
408where
409 J: JobProducerTrait + Send + Sync + 'static,
410 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
411 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
412 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
413 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
414 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
415 TCR: TransactionCounterTrait + Send + Sync + 'static,
416 PR: PluginRepositoryTrait + Send + Sync + 'static,
417 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
418{
419 match server_config.repository_storage_type {
420 RepositoryStorageType::InMemory => {
421 execute_config_processing(&config_file, &server_config, app_state).await
423 }
424 RepositoryStorageType::Redis => {
425 let use_lock = ServerConfig::get_distributed_mode();
427 let connection_info = app_state.relayer_repository.connection_info();
428
429 match (use_lock, connection_info) {
430 (true, Some((conn, prefix))) => {
431 coordinate_config_with_lock(
433 &config_file,
434 &server_config,
435 app_state,
436 &conn,
437 &prefix,
438 )
439 .await
440 }
441 _ => {
442 if server_config.reset_storage_on_start {
444 return execute_config_processing(&config_file, &server_config, app_state)
445 .await;
446 }
447
448 match validate_config_bootstrap_state(&config_file, app_state).await? {
449 ConfigBootstrapState::Empty => {
450 execute_config_processing(&config_file, &server_config, app_state).await
451 }
452 ConfigBootstrapState::Complete => {
453 info!(
454 "Skipping config file processing - bootstrap-managed Redis state is complete"
455 );
456 Ok(())
457 }
458 ConfigBootstrapState::Incomplete { missing } => {
459 Err(format_incomplete_bootstrap_error(&missing))
460 }
461 }
462 }
463 }
464 }
465 }
466}
467
468async fn coordinate_config_with_lock<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
476 config_file: &Config,
477 server_config: &ServerConfig,
478 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
479 conn: &Arc<Pool>,
480 prefix: &str,
481) -> Result<()>
482where
483 J: JobProducerTrait + Send + Sync + 'static,
484 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
485 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
486 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
487 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
488 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
489 TCR: TransactionCounterTrait + Send + Sync + 'static,
490 PR: PluginRepositoryTrait + Send + Sync + 'static,
491 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
492{
493 let lock_key = format!("{prefix}:lock:{CONFIG_PROCESSING_LOCK_NAME}");
494 let lock = DistributedLock::new(
495 conn.clone(),
496 &lock_key,
497 Duration::from_secs(BOOTSTRAP_LOCK_TTL_SECS),
498 );
499
500 match lock.try_acquire().await {
501 Ok(Some(guard)) => {
502 info!("Acquired config processing lock");
504
505 let result =
506 process_if_needed_after_lock(config_file, server_config, app_state, conn, prefix)
507 .await;
508
509 drop(guard); result
511 }
512 Ok(None) => {
513 info!("Another instance is processing config, waiting for completion");
515 let completed = wait_for_config_processing_complete(conn, prefix).await?;
516
517 if completed {
518 return Ok(());
519 }
520
521 warn!("Timeout waiting for config processing, rechecking state");
522
523 if is_config_processing_completed(conn, prefix)
524 .await
525 .unwrap_or(false)
526 {
527 info!("Config processing completed during timeout window");
528 return Ok(());
529 }
530
531 recover_config_processing_after_timeout(
532 config_file,
533 server_config,
534 app_state,
535 conn,
536 prefix,
537 )
538 .await
539 }
540 Err(e) => Err(eyre::eyre!(
541 "Failed to acquire config processing lock in distributed mode: {}",
542 e
543 )),
544 }
545}
546
547async fn process_if_needed_after_lock<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
551 config_file: &Config,
552 server_config: &ServerConfig,
553 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
554 conn: &Arc<Pool>,
555 prefix: &str,
556) -> Result<()>
557where
558 J: JobProducerTrait + Send + Sync + 'static,
559 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
560 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
561 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
562 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
563 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
564 TCR: TransactionCounterTrait + Send + Sync + 'static,
565 PR: PluginRepositoryTrait + Send + Sync + 'static,
566 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
567{
568 let already_completed = is_config_processing_completed(conn, prefix).await?;
569 let in_progress = is_config_processing_in_progress(conn, prefix).await?;
570 let bootstrap_state = validate_config_bootstrap_state(config_file, app_state).await?;
571
572 if server_config.reset_storage_on_start {
573 execute_config_processing_with_marker(config_file, server_config, app_state, conn, prefix)
575 .await
576 } else if already_completed {
577 info!("Config processing already completed, skipping config file processing");
579 Ok(())
580 } else {
581 match bootstrap_state {
582 ConfigBootstrapState::Empty => {
583 execute_config_processing_with_marker(
584 config_file,
585 server_config,
586 app_state,
587 conn,
588 prefix,
589 )
590 .await
591 }
592 ConfigBootstrapState::Complete if !in_progress => {
593 info!(
594 "Bootstrap-managed Redis state is complete without marker, backfilling completion marker"
595 );
596 set_config_processing_completed(conn, prefix).await?;
597 Ok(())
598 }
599 ConfigBootstrapState::Complete => {
600 info!("Bootstrap-managed Redis state is complete, restoring completion marker");
601 set_config_processing_completed(conn, prefix).await?;
602 Ok(())
603 }
604 ConfigBootstrapState::Incomplete { missing } => {
605 Err(format_incomplete_bootstrap_error(&missing))
606 }
607 }
608 }
609}
610
611async fn wait_for_config_processing_complete(conn: &Arc<Pool>, prefix: &str) -> Result<bool> {
615 let max_wait = Duration::from_secs(LOCK_WAIT_MAX_SECS);
616 let poll_interval = Duration::from_millis(LOCK_POLL_INTERVAL_MS);
617
618 let conn = conn.clone();
619 let prefix = prefix.to_string();
620
621 let completed = poll_until(
622 || is_config_processing_completed(&conn, &prefix),
623 max_wait,
624 poll_interval,
625 "config processing",
626 )
627 .await?;
628
629 Ok(completed)
630}
631
632async fn recover_config_processing_after_timeout<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
639 config_file: &Config,
640 server_config: &ServerConfig,
641 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
642 conn: &Arc<Pool>,
643 prefix: &str,
644) -> Result<()>
645where
646 J: JobProducerTrait + Send + Sync + 'static,
647 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
648 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
649 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
650 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
651 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
652 TCR: TransactionCounterTrait + Send + Sync + 'static,
653 PR: PluginRepositoryTrait + Send + Sync + 'static,
654 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
655{
656 let lock_key = format!("{prefix}:lock:{CONFIG_PROCESSING_LOCK_NAME}");
657 let recovery_lock = DistributedLock::new(
658 conn.clone(),
659 &lock_key,
660 Duration::from_secs(BOOTSTRAP_LOCK_TTL_SECS),
661 );
662
663 match recovery_lock.try_acquire().await {
664 Ok(Some(guard)) => {
665 warn!("Previous config-processing lock holder appears to have crashed, taking over");
666 let result =
667 process_if_needed_after_lock(config_file, server_config, app_state, conn, prefix)
668 .await;
669 drop(guard);
670 result
671 }
672 Ok(None) => {
673 warn!("Config-processing lock still held after timeout, waiting again for completion");
677 let completed = wait_for_config_processing_complete(conn, prefix).await?;
678
679 if completed {
680 info!("Config processing completed by another instance during extended wait");
681 Ok(())
682 } else {
683 Err(eyre::eyre!(
684 "Timed out waiting for config processing and could not acquire recovery lock"
685 ))
686 }
687 }
688 Err(e) => {
689 warn!(
690 error = %e,
691 "Failed to acquire recovery lock for config processing"
692 );
693 Err(e)
694 }
695 }
696}
697
698async fn execute_config_processing_with_marker<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
700 config_file: &Config,
701 server_config: &ServerConfig,
702 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
703 conn: &Arc<Pool>,
704 prefix: &str,
705) -> Result<()>
706where
707 J: JobProducerTrait + Send + Sync + 'static,
708 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
709 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
710 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
711 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
712 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
713 TCR: TransactionCounterTrait + Send + Sync + 'static,
714 PR: PluginRepositoryTrait + Send + Sync + 'static,
715 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
716{
717 set_config_processing_in_progress(conn, prefix).await?;
718
719 let result = execute_config_processing(config_file, server_config, app_state).await;
720
721 if result.is_ok() {
722 set_config_processing_completed(conn, prefix).await?;
723 }
724
725 result
726}
727
/// Loads the config file into the repositories, optionally wiping them first.
///
/// When `server_config.reset_storage_on_start` is set, all entries are dropped from
/// every repository and from the transaction counter store. Afterwards the config
/// sections are processed strictly in sequence: plugins, signers, notifications,
/// networks, relayers, and finally the API key.
///
/// # Errors
/// Stops at and returns the first error from a reset or processing step.
async fn execute_config_processing<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
    config_file: &Config,
    server_config: &ServerConfig,
    app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
) -> Result<()>
where
    J: JobProducerTrait + Send + Sync + 'static,
    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
    TCR: TransactionCounterTrait + Send + Sync + 'static,
    PR: PluginRepositoryTrait + Send + Sync + 'static,
    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
{
    // The reset must run before any seeding below so new entries are not wiped.
    if server_config.reset_storage_on_start {
        info!("Resetting storage on start due to server config flag RESET_STORAGE_ON_START = true");
        app_state.relayer_repository.drop_all_entries().await?;
        app_state.transaction_repository.drop_all_entries().await?;
        app_state.signer_repository.drop_all_entries().await?;
        app_state.notification_repository.drop_all_entries().await?;
        app_state.network_repository.drop_all_entries().await?;
        app_state.plugin_repository.drop_all_entries().await?;
        app_state.api_key_repository.drop_all_entries().await?;
        app_state
            .transaction_counter_store
            .drop_all_entries()
            .await?;
    }

    info!("Processing config file");
    process_plugins(config_file, app_state).await?;
    // Signers must be stored before relayers: relayer processing resolves each
    // relayer's signer from the signer repository.
    process_signers(config_file, app_state).await?;
    process_notifications(config_file, app_state).await?;
    process_networks(config_file, app_state).await?;
    process_relayers(config_file, app_state).await?;
    process_api_key(server_config, app_state).await?;

    Ok(())
}
770
771#[cfg(test)]
772mod tests {
773 use super::*;
774 use crate::{
775 config::{ConfigFileNetworkType, NetworksFileConfig, PluginFileConfig},
776 constants::DEFAULT_PLUGIN_TIMEOUT_SECONDS,
777 jobs::MockJobProducerTrait,
778 models::{
779 relayer::RelayerFileConfig, AppState, AwsKmsSignerFileConfig,
780 GoogleCloudKmsKeyFileConfig, GoogleCloudKmsServiceAccountFileConfig,
781 GoogleCloudKmsSignerFileConfig, LocalSignerFileConfig, NetworkType, NotificationConfig,
782 NotificationType, PaginationQuery, PlainOrEnvValue, SecretString, SignerConfigStorage,
783 SignerFileConfig, SignerFileConfigEnum, VaultSignerFileConfig,
784 VaultTransitSignerFileConfig,
785 },
786 repositories::{
787 ApiKeyRepositoryStorage, InMemoryApiKeyRepository, InMemoryNetworkRepository,
788 InMemoryNotificationRepository, InMemoryPluginRepository, InMemorySignerRepository,
789 InMemoryTransactionCounter, InMemoryTransactionRepository, NetworkRepositoryStorage,
790 NotificationRepositoryStorage, PluginRepositoryStorage, RelayerRepositoryStorage,
791 SignerRepositoryStorage, TransactionCounterRepositoryStorage,
792 TransactionRepositoryStorage,
793 },
794 utils::mocks::mockutils::{
795 create_mock_relayer, create_mock_signer, create_test_server_config,
796 },
797 };
798 use actix_web::web::ThinData;
799 use mockito;
800 use serde_json::json;
801 use std::{sync::Arc, time::Duration};
802
803 fn create_test_app_state() -> AppState<
804 MockJobProducerTrait,
805 RelayerRepositoryStorage,
806 TransactionRepositoryStorage,
807 NetworkRepositoryStorage,
808 NotificationRepositoryStorage,
809 SignerRepositoryStorage,
810 TransactionCounterRepositoryStorage,
811 PluginRepositoryStorage,
812 ApiKeyRepositoryStorage,
813 > {
814 let mut mock_job_producer = MockJobProducerTrait::new();
816
817 mock_job_producer
819 .expect_produce_transaction_request_job()
820 .returning(|_, _| Box::pin(async { Ok(()) }));
821
822 mock_job_producer
823 .expect_produce_submit_transaction_job()
824 .returning(|_, _| Box::pin(async { Ok(()) }));
825
826 mock_job_producer
827 .expect_produce_check_transaction_status_job()
828 .returning(|_, _| Box::pin(async { Ok(()) }));
829
830 mock_job_producer
831 .expect_produce_send_notification_job()
832 .returning(|_, _| Box::pin(async { Ok(()) }));
833
834 AppState {
835 relayer_repository: Arc::new(RelayerRepositoryStorage::new_in_memory()),
836 transaction_repository: Arc::new(TransactionRepositoryStorage::new_in_memory()),
837 signer_repository: Arc::new(SignerRepositoryStorage::new_in_memory()),
838 notification_repository: Arc::new(NotificationRepositoryStorage::new_in_memory()),
839 network_repository: Arc::new(NetworkRepositoryStorage::new_in_memory()),
840 transaction_counter_store: Arc::new(
841 TransactionCounterRepositoryStorage::new_in_memory(),
842 ),
843 job_producer: Arc::new(mock_job_producer),
844 plugin_repository: Arc::new(PluginRepositoryStorage::new_in_memory()),
845 api_key_repository: Arc::new(ApiKeyRepositoryStorage::new_in_memory()),
846 }
847 }
848
849 #[tokio::test]
850 async fn test_process_signer_test() {
851 let signer = SignerFileConfig {
852 id: "test-signer".to_string(),
853 config: SignerFileConfigEnum::Local(LocalSignerFileConfig {
854 path: "tests/utils/test_keys/unit-test-local-signer.json".to_string(),
855 passphrase: PlainOrEnvValue::Plain {
856 value: SecretString::new("test"),
857 },
858 }),
859 };
860
861 let result = process_signer(&signer).await;
862
863 assert!(
864 result.is_ok(),
865 "Failed to process test signer: {:?}",
866 result.err()
867 );
868 let model = result.unwrap();
869
870 assert_eq!(model.id, "test-signer");
871
872 match model.config {
873 SignerConfigStorage::Local(config) => {
874 assert!(!config.raw_key.is_empty());
875 assert_eq!(config.raw_key.len(), 32);
876 }
877 _ => panic!("Expected Local config"),
878 }
879 }
880
881 #[tokio::test]
882 async fn test_process_signer_vault_transit() -> Result<()> {
883 let signer = SignerFileConfig {
884 id: "vault-transit-signer".to_string(),
885 config: SignerFileConfigEnum::VaultTransit(VaultTransitSignerFileConfig {
886 key_name: "test-transit-key".to_string(),
887 address: "https://vault.example.com".to_string(),
888 namespace: Some("test-namespace".to_string()),
889 role_id: PlainOrEnvValue::Plain {
890 value: SecretString::new("test-role"),
891 },
892 secret_id: PlainOrEnvValue::Plain {
893 value: SecretString::new("test-secret"),
894 },
895 pubkey: "test-pubkey".to_string(),
896 mount_point: Some("transit".to_string()),
897 }),
898 };
899
900 let result = process_signer(&signer).await;
901
902 assert!(
903 result.is_ok(),
904 "Failed to process vault transit signer: {:?}",
905 result.err()
906 );
907 let model = result.unwrap();
908
909 assert_eq!(model.id, "vault-transit-signer");
910
911 match model.config {
912 SignerConfigStorage::VaultTransit(config) => {
913 assert_eq!(config.key_name, "test-transit-key");
914 assert_eq!(config.address, "https://vault.example.com");
915 assert_eq!(config.namespace, Some("test-namespace".to_string()));
916 assert_eq!(config.role_id.to_str().as_str(), "test-role");
917 assert_eq!(config.secret_id.to_str().as_str(), "test-secret");
918 assert_eq!(config.pubkey, "test-pubkey");
919 assert_eq!(config.mount_point, Some("transit".to_string()));
920 }
921 _ => panic!("Expected VaultTransit config"),
922 }
923
924 Ok(())
925 }
926
927 #[tokio::test]
928 async fn test_process_signer_aws_kms() -> Result<()> {
929 let signer = SignerFileConfig {
930 id: "aws-kms-signer".to_string(),
931 config: SignerFileConfigEnum::AwsKms(AwsKmsSignerFileConfig {
932 region: "us-east-1".to_string(),
933 key_id: "test-key-id".to_string(),
934 }),
935 };
936
937 let result = process_signer(&signer).await;
938
939 assert!(
940 result.is_ok(),
941 "Failed to process AWS KMS signer: {:?}",
942 result.err()
943 );
944 let model = result.unwrap();
945
946 assert_eq!(model.id, "aws-kms-signer");
947
948 match model.config {
949 SignerConfigStorage::AwsKms(_) => {}
950 _ => panic!("Expected AwsKms config"),
951 }
952
953 Ok(())
954 }
955
956 async fn setup_mock_approle_login(
958 mock_server: &mut mockito::ServerGuard,
959 role_id: &str,
960 secret_id: &str,
961 token: &str,
962 ) -> mockito::Mock {
963 mock_server
964 .mock("POST", "/v1/auth/approle/login")
965 .match_body(mockito::Matcher::Json(json!({
966 "role_id": role_id,
967 "secret_id": secret_id
968 })))
969 .with_status(200)
970 .with_header("content-type", "application/json")
971 .with_body(
972 serde_json::to_string(&json!({
973 "request_id": "test-request-id",
974 "lease_id": "",
975 "renewable": false,
976 "lease_duration": 0,
977 "data": null,
978 "wrap_info": null,
979 "warnings": null,
980 "auth": {
981 "client_token": token,
982 "accessor": "test-accessor",
983 "policies": ["default"],
984 "token_policies": ["default"],
985 "metadata": {
986 "role_name": "test-role"
987 },
988 "lease_duration": 3600,
989 "renewable": true,
990 "entity_id": "test-entity-id",
991 "token_type": "service",
992 "orphan": true
993 }
994 }))
995 .unwrap(),
996 )
997 .create_async()
998 .await
999 }
1000
1001 #[tokio::test]
1002 async fn test_process_signer_vault() -> Result<()> {
1003 let mut mock_server = mockito::Server::new_async().await;
1004
1005 let _login_mock = setup_mock_approle_login(
1006 &mut mock_server,
1007 "test-role-id",
1008 "test-secret-id",
1009 "test-token",
1010 )
1011 .await;
1012
1013 let _secret_mock = mock_server
1014 .mock("GET", "/v1/secret/data/test-key")
1015 .match_header("X-Vault-Token", "test-token")
1016 .with_status(200)
1017 .with_header("content-type", "application/json")
1018 .with_body(serde_json::to_string(&json!({
1019 "request_id": "test-request-id",
1020 "lease_id": "",
1021 "renewable": false,
1022 "lease_duration": 0,
1023 "data": {
1024 "data": {
1025 "value": "C5ACE14AB163556747F02C1110911537578FBE335FB74D18FBF82990AD70C3B9"
1026 },
1027 "metadata": {
1028 "created_time": "2024-01-01T00:00:00Z",
1029 "deletion_time": "",
1030 "destroyed": false,
1031 "version": 1
1032 }
1033 },
1034 "wrap_info": null,
1035 "warnings": null,
1036 "auth": null
1037 })).unwrap())
1038 .create_async()
1039 .await;
1040
1041 let signer = SignerFileConfig {
1042 id: "vault-signer".to_string(),
1043 config: SignerFileConfigEnum::Vault(VaultSignerFileConfig {
1044 key_name: "test-key".to_string(),
1045 address: mock_server.url(),
1046 namespace: Some("test-namespace".to_string()),
1047 role_id: PlainOrEnvValue::Plain {
1048 value: SecretString::new("test-role-id"),
1049 },
1050 secret_id: PlainOrEnvValue::Plain {
1051 value: SecretString::new("test-secret-id"),
1052 },
1053 mount_point: Some("secret".to_string()),
1054 }),
1055 };
1056
1057 let result = process_signer(&signer).await;
1058
1059 assert!(
1060 result.is_ok(),
1061 "Failed to process Vault signer: {:?}",
1062 result.err()
1063 );
1064 let model = result.unwrap();
1065
1066 assert_eq!(model.id, "vault-signer");
1067
1068 match model.config {
1069 SignerConfigStorage::Vault(_) => {}
1070 _ => panic!("Expected Vault config"),
1071 }
1072
1073 Ok(())
1074 }
1075
1076 #[tokio::test]
1077 async fn test_process_signers() -> Result<()> {
1078 let signers = vec![
1080 SignerFileConfig {
1081 id: "test-signer-1".to_string(),
1082 config: SignerFileConfigEnum::Local(LocalSignerFileConfig {
1083 path: "tests/utils/test_keys/unit-test-local-signer.json".to_string(),
1084 passphrase: PlainOrEnvValue::Plain {
1085 value: SecretString::new("test"),
1086 },
1087 }),
1088 },
1089 SignerFileConfig {
1090 id: "test-signer-2".to_string(),
1091 config: SignerFileConfigEnum::Local(LocalSignerFileConfig {
1092 path: "tests/utils/test_keys/unit-test-local-signer.json".to_string(),
1093 passphrase: PlainOrEnvValue::Plain {
1094 value: SecretString::new("test"),
1095 },
1096 }),
1097 },
1098 ];
1099
1100 let config = Config {
1102 signers,
1103 relayers: vec![],
1104 notifications: vec![],
1105 networks: NetworksFileConfig::new(vec![]).unwrap(),
1106 plugins: Some(vec![]),
1107 };
1108
1109 let app_state = ThinData(create_test_app_state());
1111
1112 process_signers(&config, &app_state).await?;
1114
1115 let stored_signers = app_state.signer_repository.list_all().await?;
1117 assert_eq!(stored_signers.len(), 2);
1118 assert!(stored_signers.iter().any(|s| s.id == "test-signer-1"));
1119 assert!(stored_signers.iter().any(|s| s.id == "test-signer-2"));
1120
1121 Ok(())
1122 }
1123
1124 #[tokio::test]
1125 async fn test_process_notifications() -> Result<()> {
1126 let notifications = vec![
1128 NotificationConfig {
1129 id: "test-notification-1".to_string(),
1130 r#type: NotificationType::Webhook,
1131 url: "https://hooks.slack.com/test1".to_string(),
1132 signing_key: None,
1133 },
1134 NotificationConfig {
1135 id: "test-notification-2".to_string(),
1136 r#type: NotificationType::Webhook,
1137 url: "https://hooks.slack.com/test2".to_string(),
1138 signing_key: None,
1139 },
1140 ];
1141
1142 let config = Config {
1144 signers: vec![],
1145 relayers: vec![],
1146 notifications,
1147 networks: NetworksFileConfig::new(vec![]).unwrap(),
1148 plugins: Some(vec![]),
1149 };
1150
1151 let app_state = ThinData(create_test_app_state());
1153
1154 process_notifications(&config, &app_state).await?;
1156
1157 let stored_notifications = app_state.notification_repository.list_all().await?;
1159 assert_eq!(stored_notifications.len(), 2);
1160 assert!(stored_notifications
1161 .iter()
1162 .any(|n| n.id == "test-notification-1"));
1163 assert!(stored_notifications
1164 .iter()
1165 .any(|n| n.id == "test-notification-2"));
1166
1167 Ok(())
1168 }
1169
1170 #[tokio::test]
1171 async fn test_process_networks_empty() -> Result<()> {
1172 let config = Config {
1173 signers: vec![],
1174 relayers: vec![],
1175 notifications: vec![],
1176 networks: NetworksFileConfig::new(vec![]).unwrap(),
1177 plugins: Some(vec![]),
1178 };
1179
1180 let app_state = ThinData(create_test_app_state());
1181
1182 process_networks(&config, &app_state).await?;
1183
1184 let stored_networks = app_state.network_repository.list_all().await?;
1185 assert_eq!(stored_networks.len(), 0);
1186
1187 Ok(())
1188 }
1189
1190 #[tokio::test]
1191 async fn test_process_networks_single_evm() -> Result<()> {
1192 use crate::config::network::test_utils::*;
1193
1194 let networks = vec![create_evm_network_wrapped("mainnet")];
1195
1196 let config = Config {
1197 signers: vec![],
1198 relayers: vec![],
1199 notifications: vec![],
1200 networks: NetworksFileConfig::new(networks).unwrap(),
1201 plugins: Some(vec![]),
1202 };
1203
1204 let app_state = ThinData(create_test_app_state());
1205
1206 process_networks(&config, &app_state).await?;
1207
1208 let stored_networks = app_state.network_repository.list_all().await?;
1209 assert_eq!(stored_networks.len(), 1);
1210 assert_eq!(stored_networks[0].name, "mainnet");
1211 assert_eq!(stored_networks[0].network_type, NetworkType::Evm);
1212
1213 Ok(())
1214 }
1215
1216 #[tokio::test]
1217 async fn test_process_networks_single_solana() -> Result<()> {
1218 use crate::config::network::test_utils::*;
1219
1220 let networks = vec![create_solana_network_wrapped("devnet")];
1221
1222 let config = Config {
1223 signers: vec![],
1224 relayers: vec![],
1225 notifications: vec![],
1226 networks: NetworksFileConfig::new(networks).unwrap(),
1227 plugins: Some(vec![]),
1228 };
1229
1230 let app_state = ThinData(create_test_app_state());
1231
1232 process_networks(&config, &app_state).await?;
1233
1234 let stored_networks = app_state.network_repository.list_all().await?;
1235 assert_eq!(stored_networks.len(), 1);
1236 assert_eq!(stored_networks[0].name, "devnet");
1237 assert_eq!(stored_networks[0].network_type, NetworkType::Solana);
1238
1239 Ok(())
1240 }
1241
1242 #[tokio::test]
1243 async fn test_process_networks_multiple_mixed() -> Result<()> {
1244 use crate::config::network::test_utils::*;
1245
1246 let networks = vec![
1247 create_evm_network_wrapped("mainnet"),
1248 create_solana_network_wrapped("devnet"),
1249 create_evm_network_wrapped("sepolia"),
1250 create_solana_network_wrapped("testnet"),
1251 ];
1252
1253 let config = Config {
1254 signers: vec![],
1255 relayers: vec![],
1256 notifications: vec![],
1257 networks: NetworksFileConfig::new(networks).unwrap(),
1258 plugins: Some(vec![]),
1259 };
1260
1261 let app_state = ThinData(create_test_app_state());
1262
1263 process_networks(&config, &app_state).await?;
1264
1265 let stored_networks = app_state.network_repository.list_all().await?;
1266 assert_eq!(stored_networks.len(), 4);
1267
1268 let evm_networks: Vec<_> = stored_networks
1269 .iter()
1270 .filter(|n| n.network_type == NetworkType::Evm)
1271 .collect();
1272 assert_eq!(evm_networks.len(), 2);
1273 assert!(evm_networks.iter().any(|n| n.name == "mainnet"));
1274 assert!(evm_networks.iter().any(|n| n.name == "sepolia"));
1275
1276 let solana_networks: Vec<_> = stored_networks
1277 .iter()
1278 .filter(|n| n.network_type == NetworkType::Solana)
1279 .collect();
1280 assert_eq!(solana_networks.len(), 2);
1281 assert!(solana_networks.iter().any(|n| n.name == "devnet"));
1282 assert!(solana_networks.iter().any(|n| n.name == "testnet"));
1283
1284 Ok(())
1285 }
1286
1287 #[tokio::test]
1288 async fn test_process_networks_many_networks() -> Result<()> {
1289 use crate::config::network::test_utils::*;
1290
1291 let networks = (0..10)
1292 .map(|i| create_evm_network_wrapped(&format!("network-{i}")))
1293 .collect();
1294
1295 let config = Config {
1296 signers: vec![],
1297 relayers: vec![],
1298 notifications: vec![],
1299 networks: NetworksFileConfig::new(networks).unwrap(),
1300 plugins: Some(vec![]),
1301 };
1302
1303 let app_state = ThinData(create_test_app_state());
1304
1305 process_networks(&config, &app_state).await?;
1306
1307 let stored_networks = app_state.network_repository.list_all().await?;
1308 assert_eq!(stored_networks.len(), 10);
1309
1310 for i in 0..10 {
1311 let expected_name = format!("network-{i}");
1312 assert!(
1313 stored_networks.iter().any(|n| n.name == expected_name),
1314 "Network {expected_name} not found"
1315 );
1316 }
1317
1318 Ok(())
1319 }
1320
1321 #[tokio::test]
1322 async fn test_process_networks_duplicate_names() -> Result<()> {
1323 use crate::config::network::test_utils::*;
1324
1325 let networks = vec![
1326 create_evm_network_wrapped("mainnet"),
1327 create_solana_network_wrapped("mainnet"),
1328 ];
1329
1330 let config = Config {
1331 signers: vec![],
1332 relayers: vec![],
1333 notifications: vec![],
1334 networks: NetworksFileConfig::new(networks).unwrap(),
1335 plugins: Some(vec![]),
1336 };
1337
1338 let app_state = ThinData(create_test_app_state());
1339
1340 process_networks(&config, &app_state).await?;
1341
1342 let stored_networks = app_state.network_repository.list_all().await?;
1343 assert_eq!(stored_networks.len(), 2);
1344
1345 let mainnet_networks: Vec<_> = stored_networks
1346 .iter()
1347 .filter(|n| n.name == "mainnet")
1348 .collect();
1349 assert_eq!(mainnet_networks.len(), 2);
1350 assert!(mainnet_networks
1351 .iter()
1352 .any(|n| n.network_type == NetworkType::Evm));
1353 assert!(mainnet_networks
1354 .iter()
1355 .any(|n| n.network_type == NetworkType::Solana));
1356
1357 Ok(())
1358 }
1359
1360 #[tokio::test]
1361 async fn test_process_networks() -> Result<()> {
1362 use crate::config::network::test_utils::*;
1363
1364 let networks = vec![
1365 create_evm_network_wrapped("mainnet"),
1366 create_solana_network_wrapped("devnet"),
1367 ];
1368
1369 let config = Config {
1370 signers: vec![],
1371 relayers: vec![],
1372 notifications: vec![],
1373 networks: NetworksFileConfig::new(networks).unwrap(),
1374 plugins: Some(vec![]),
1375 };
1376
1377 let app_state = ThinData(create_test_app_state());
1378
1379 process_networks(&config, &app_state).await?;
1380
1381 let stored_networks = app_state.network_repository.list_all().await?;
1382 assert_eq!(stored_networks.len(), 2);
1383 assert!(stored_networks
1384 .iter()
1385 .any(|n| n.name == "mainnet" && n.network_type == NetworkType::Evm));
1386 assert!(stored_networks
1387 .iter()
1388 .any(|n| n.name == "devnet" && n.network_type == NetworkType::Solana));
1389
1390 Ok(())
1391 }
1392
1393 #[tokio::test]
1394 async fn test_process_relayers() -> Result<()> {
1395 let signers = vec![SignerFileConfig {
1397 id: "test-signer-1".to_string(),
1398 config: SignerFileConfigEnum::Local(LocalSignerFileConfig {
1399 path: "tests/utils/test_keys/unit-test-local-signer.json".to_string(),
1400 passphrase: PlainOrEnvValue::Plain {
1401 value: SecretString::new("test"),
1402 },
1403 }),
1404 }];
1405
1406 let relayers = vec![RelayerFileConfig {
1408 id: "test-relayer-1".to_string(),
1409 network_type: ConfigFileNetworkType::Evm,
1410 signer_id: "test-signer-1".to_string(),
1411 name: "test-relayer-1".to_string(),
1412 network: "test-network".to_string(),
1413 paused: false,
1414 policies: None,
1415 notification_id: None,
1416 custom_rpc_urls: None,
1417 }];
1418
1419 let config = Config {
1421 signers: signers.clone(),
1422 relayers,
1423 notifications: vec![],
1424 networks: NetworksFileConfig::new(vec![]).unwrap(),
1425 plugins: Some(vec![]),
1426 };
1427
1428 let app_state = ThinData(create_test_app_state());
1430
1431 process_signers(&config, &app_state).await?;
1433
1434 process_relayers(&config, &app_state).await?;
1436
1437 let stored_relayers = app_state.relayer_repository.list_all().await?;
1439 assert_eq!(stored_relayers.len(), 1);
1440 assert_eq!(stored_relayers[0].id, "test-relayer-1");
1441 assert_eq!(stored_relayers[0].signer_id, "test-signer-1");
1442 assert!(!stored_relayers[0].address.is_empty()); Ok(())
1445 }
1446
1447 #[tokio::test]
1448 async fn test_process_plugins() -> Result<()> {
1449 let plugins = vec![
1451 PluginFileConfig {
1452 id: "test-plugin-1".to_string(),
1453 path: "/app/plugins/test.ts".to_string(),
1454 timeout: None,
1455 emit_logs: false,
1456 emit_traces: false,
1457 config: None,
1458 raw_response: false,
1459 allow_get_invocation: false,
1460 forward_logs: false,
1461 },
1462 PluginFileConfig {
1463 id: "test-plugin-2".to_string(),
1464 path: "/app/plugins/test2.ts".to_string(),
1465 timeout: Some(12),
1466 emit_logs: false,
1467 emit_traces: false,
1468 config: None,
1469 raw_response: false,
1470 allow_get_invocation: false,
1471 forward_logs: false,
1472 },
1473 ];
1474
1475 let config = Config {
1477 signers: vec![],
1478 relayers: vec![],
1479 notifications: vec![],
1480 networks: NetworksFileConfig::new(vec![]).unwrap(),
1481 plugins: Some(plugins),
1482 };
1483
1484 let app_state = ThinData(create_test_app_state());
1486
1487 process_plugins(&config, &app_state).await?;
1489
1490 let plugin_1 = app_state
1492 .plugin_repository
1493 .get_by_id("test-plugin-1")
1494 .await?;
1495 let plugin_2 = app_state
1496 .plugin_repository
1497 .get_by_id("test-plugin-2")
1498 .await?;
1499
1500 assert!(plugin_1.is_some());
1501 assert!(plugin_2.is_some());
1502
1503 let plugin_1 = plugin_1.unwrap();
1504 let plugin_2 = plugin_2.unwrap();
1505
1506 assert_eq!(plugin_1.path, "/app/plugins/test.ts");
1507 assert_eq!(plugin_2.path, "/app/plugins/test2.ts");
1508
1509 assert_eq!(
1511 plugin_1.timeout.as_secs(),
1512 Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS).as_secs()
1513 );
1514 assert_eq!(
1515 plugin_2.timeout.as_secs(),
1516 Duration::from_secs(12).as_secs()
1517 );
1518
1519 Ok(())
1520 }
1521
1522 #[tokio::test]
1523 async fn test_process_api_key() -> Result<()> {
1524 let server_config = Arc::new(crate::utils::mocks::mockutils::create_test_server_config(
1525 RepositoryStorageType::InMemory,
1526 ));
1527 let app_state = ThinData(create_test_app_state());
1528
1529 process_api_key(&server_config, &app_state).await?;
1530
1531 let pagination_query = PaginationQuery {
1532 page: 1,
1533 per_page: 10,
1534 };
1535
1536 let stored_api_keys = app_state
1537 .api_key_repository
1538 .list_paginated(pagination_query)
1539 .await?;
1540 assert_eq!(stored_api_keys.items.len(), 1);
1541 assert_eq!(stored_api_keys.items[0].name, "default");
1542
1543 Ok(())
1544 }
1545
1546 #[tokio::test]
1547 async fn test_process_config_file() -> Result<()> {
1548 let signers = vec![SignerFileConfig {
1550 id: "test-signer-1".to_string(),
1551 config: SignerFileConfigEnum::Local(LocalSignerFileConfig {
1552 path: "tests/utils/test_keys/unit-test-local-signer.json".to_string(),
1553 passphrase: PlainOrEnvValue::Plain {
1554 value: SecretString::new("test"),
1555 },
1556 }),
1557 }];
1558
1559 let relayers = vec![RelayerFileConfig {
1560 id: "test-relayer-1".to_string(),
1561 network_type: ConfigFileNetworkType::Evm,
1562 signer_id: "test-signer-1".to_string(),
1563 name: "test-relayer-1".to_string(),
1564 network: "test-network".to_string(),
1565 paused: false,
1566 policies: None,
1567 notification_id: None,
1568 custom_rpc_urls: None,
1569 }];
1570
1571 let notifications = vec![NotificationConfig {
1572 id: "test-notification-1".to_string(),
1573 r#type: NotificationType::Webhook,
1574 url: "https://hooks.slack.com/test1".to_string(),
1575 signing_key: None,
1576 }];
1577
1578 let plugins = vec![PluginFileConfig {
1579 id: "test-plugin-1".to_string(),
1580 path: "/app/plugins/test.ts".to_string(),
1581 timeout: None,
1582 emit_logs: false,
1583 emit_traces: false,
1584 allow_get_invocation: false,
1585 config: None,
1586 raw_response: false,
1587 forward_logs: false,
1588 }];
1589
1590 let config = Config {
1592 signers,
1593 relayers,
1594 notifications,
1595 networks: NetworksFileConfig::new(vec![]).unwrap(),
1596 plugins: Some(plugins),
1597 };
1598
1599 let signer_repo = Arc::new(InMemorySignerRepository::default());
1601 let relayer_repo = Arc::new(RelayerRepositoryStorage::new_in_memory());
1602 let notification_repo = Arc::new(InMemoryNotificationRepository::default());
1603 let network_repo = Arc::new(InMemoryNetworkRepository::default());
1604 let transaction_repo = Arc::new(TransactionRepositoryStorage::InMemory(
1605 InMemoryTransactionRepository::new(),
1606 ));
1607 let transaction_counter = Arc::new(InMemoryTransactionCounter::default());
1608 let plugin_repo = Arc::new(InMemoryPluginRepository::default());
1609 let api_key_repo = Arc::new(InMemoryApiKeyRepository::default());
1610
1611 let mut mock_job_producer = MockJobProducerTrait::new();
1613 mock_job_producer
1614 .expect_produce_transaction_request_job()
1615 .returning(|_, _| Box::pin(async { Ok(()) }));
1616 mock_job_producer
1617 .expect_produce_submit_transaction_job()
1618 .returning(|_, _| Box::pin(async { Ok(()) }));
1619 mock_job_producer
1620 .expect_produce_check_transaction_status_job()
1621 .returning(|_, _| Box::pin(async { Ok(()) }));
1622 mock_job_producer
1623 .expect_produce_send_notification_job()
1624 .returning(|_, _| Box::pin(async { Ok(()) }));
1625 let job_producer = Arc::new(mock_job_producer);
1626
1627 let app_state = ThinData(AppState {
1629 signer_repository: signer_repo.clone(),
1630 relayer_repository: relayer_repo.clone(),
1631 notification_repository: notification_repo.clone(),
1632 network_repository: network_repo.clone(),
1633 transaction_repository: transaction_repo.clone(),
1634 transaction_counter_store: transaction_counter.clone(),
1635 job_producer: job_producer.clone(),
1636 plugin_repository: plugin_repo.clone(),
1637 api_key_repository: api_key_repo.clone(),
1638 });
1639
1640 let server_config = Arc::new(crate::utils::mocks::mockutils::create_test_server_config(
1642 RepositoryStorageType::InMemory,
1643 ));
1644 process_config_file(config, server_config, &app_state).await?;
1645
1646 let stored_signers = signer_repo.list_all().await?;
1648 assert_eq!(stored_signers.len(), 1);
1649 assert_eq!(stored_signers[0].id, "test-signer-1");
1650
1651 let stored_relayers = relayer_repo.list_all().await?;
1652 assert_eq!(stored_relayers.len(), 1);
1653 assert_eq!(stored_relayers[0].id, "test-relayer-1");
1654 assert_eq!(stored_relayers[0].signer_id, "test-signer-1");
1655
1656 let stored_notifications = notification_repo.list_all().await?;
1657 assert_eq!(stored_notifications.len(), 1);
1658 assert_eq!(stored_notifications[0].id, "test-notification-1");
1659
1660 let stored_plugin = plugin_repo.get_by_id("test-plugin-1").await?;
1661 assert_eq!(stored_plugin.unwrap().path, "/app/plugins/test.ts");
1662
1663 Ok(())
1664 }
1665
1666 #[tokio::test]
1667 async fn test_process_signer_google_cloud_kms() {
1668 use crate::models::SecretString;
1669
1670 let signer = SignerFileConfig {
1671 id: "gcp-kms-signer".to_string(),
1672 config: SignerFileConfigEnum::GoogleCloudKms(GoogleCloudKmsSignerFileConfig {
1673 service_account: GoogleCloudKmsServiceAccountFileConfig {
1674 private_key: PlainOrEnvValue::Plain {
1675 value: SecretString::new("-----BEGIN EXAMPLE PRIVATE KEY-----\nFAKEKEYDATA\n-----END EXAMPLE PRIVATE KEY-----\n"),
1676 },
1677 client_email: PlainOrEnvValue::Plain {
1678 value: SecretString::new("test-service-account@example.com"),
1679 },
1680 private_key_id: PlainOrEnvValue::Plain {
1681 value: SecretString::new("fake-private-key-id"),
1682 },
1683 client_id: "fake-client-id".to_string(),
1684 project_id: "fake-project-id".to_string(),
1685 auth_uri: "https://accounts.google.com/o/oauth2/auth".to_string(),
1686 token_uri: "https://oauth2.googleapis.com/token".to_string(),
1687 client_x509_cert_url: "https://www.googleapis.com/robot/v1/metadata/x509/test-service-account%40example.com".to_string(),
1688 auth_provider_x509_cert_url: "https://www.googleapis.com/oauth2/v1/certs".to_string(),
1689 universe_domain: "googleapis.com".to_string(),
1690 },
1691 key: GoogleCloudKmsKeyFileConfig {
1692 location: "global".to_string(),
1693 key_id: "fake-key-id".to_string(),
1694 key_ring_id: "fake-key-ring-id".to_string(),
1695 key_version: 1,
1696 },
1697 }),
1698 };
1699
1700 let result = process_signer(&signer).await;
1701
1702 assert!(
1703 result.is_ok(),
1704 "Failed to process Google Cloud KMS signer: {:?}",
1705 result.err()
1706 );
1707 let model = result.unwrap();
1708
1709 assert_eq!(model.id, "gcp-kms-signer");
1710 }
1711
1712 fn create_test_server_config_with_settings(
1714 storage_type: RepositoryStorageType,
1715 reset_storage_on_start: bool,
1716 ) -> ServerConfig {
1717 ServerConfig {
1718 repository_storage_type: storage_type.clone(),
1719 reset_storage_on_start,
1720 ..create_test_server_config(storage_type)
1721 }
1722 }
1723
1724 async fn create_test_redis_pool() -> Option<Arc<Pool>> {
1725 let cfg = deadpool_redis::Config::from_url("redis://127.0.0.1:6379");
1726 let pool = cfg
1727 .builder()
1728 .ok()?
1729 .max_size(16)
1730 .runtime(deadpool_redis::Runtime::Tokio1)
1731 .build()
1732 .ok()?;
1733 Some(Arc::new(pool))
1734 }
1735
1736 fn create_minimal_test_config() -> Config {
1738 Config {
1739 signers: vec![SignerFileConfig {
1740 id: "test-signer-1".to_string(),
1741 config: SignerFileConfigEnum::Local(LocalSignerFileConfig {
1742 path: "tests/utils/test_keys/unit-test-local-signer.json".to_string(),
1743 passphrase: PlainOrEnvValue::Plain {
1744 value: SecretString::new("test"),
1745 },
1746 }),
1747 }],
1748 relayers: vec![RelayerFileConfig {
1749 id: "test-relayer-1".to_string(),
1750 network_type: ConfigFileNetworkType::Evm,
1751 signer_id: "test-signer-1".to_string(),
1752 name: "test-relayer-1".to_string(),
1753 network: "test-network".to_string(),
1754 paused: false,
1755 policies: None,
1756 notification_id: None,
1757 custom_rpc_urls: None,
1758 }],
1759 notifications: vec![NotificationConfig {
1760 id: "test-notification-1".to_string(),
1761 r#type: NotificationType::Webhook,
1762 url: "https://hooks.slack.com/test1".to_string(),
1763 signing_key: None,
1764 }],
1765 networks: NetworksFileConfig::new(vec![]).unwrap(),
1766 plugins: None,
1767 }
1768 }
1769
1770 #[tokio::test]
1771 async fn test_should_process_config_file_inmemory_storage() -> Result<()> {
1772 let config = create_minimal_test_config();
1773
1774 let server_config = Arc::new(create_test_server_config_with_settings(
1776 RepositoryStorageType::InMemory,
1777 false,
1778 ));
1779
1780 let app_state = ThinData(create_test_app_state());
1781 process_config_file(config.clone(), server_config.clone(), &app_state).await?;
1782
1783 let stored_relayers = app_state.relayer_repository.list_all().await?;
1784 assert_eq!(stored_relayers.len(), 1);
1785 assert_eq!(stored_relayers[0].id, "test-relayer-1");
1786
1787 let server_config2 = Arc::new(create_test_server_config_with_settings(
1789 RepositoryStorageType::InMemory,
1790 true,
1791 ));
1792
1793 let app_state2 = ThinData(create_test_app_state());
1794 process_config_file(config.clone(), server_config2, &app_state2).await?;
1795
1796 let stored_relayers = app_state2.relayer_repository.list_all().await?;
1797 assert_eq!(stored_relayers.len(), 1);
1798 assert_eq!(stored_relayers[0].id, "test-relayer-1");
1799
1800 Ok(())
1801 }
1802
1803 #[tokio::test]
1804 async fn test_reset_storage_on_start_clears_transaction_counter() -> Result<()> {
1805 let config = create_minimal_test_config();
1806 let server_config = Arc::new(create_test_server_config_with_settings(
1807 RepositoryStorageType::InMemory,
1808 true,
1809 ));
1810
1811 let app_state = ThinData(create_test_app_state());
1812
1813 app_state
1815 .transaction_counter_store
1816 .set("test-relayer-1", "0xABC", 999)
1817 .await
1818 .unwrap();
1819 assert_eq!(
1820 app_state
1821 .transaction_counter_store
1822 .get("test-relayer-1", "0xABC")
1823 .await
1824 .unwrap(),
1825 Some(999)
1826 );
1827
1828 process_config_file(config, server_config, &app_state).await?;
1830
1831 assert_eq!(
1833 app_state
1834 .transaction_counter_store
1835 .get("test-relayer-1", "0xABC")
1836 .await
1837 .unwrap(),
1838 None,
1839 "Transaction counter should be cleared when RESET_STORAGE_ON_START is true"
1840 );
1841
1842 Ok(())
1843 }
1844
1845 #[tokio::test]
1846 async fn test_should_process_config_file_redis_storage_empty_repositories() -> Result<()> {
1847 let config = create_minimal_test_config();
1848 let server_config = Arc::new(create_test_server_config_with_settings(
1849 RepositoryStorageType::Redis,
1850 false,
1851 ));
1852
1853 let app_state = ThinData(create_test_app_state());
1854 process_config_file(config, server_config, &app_state).await?;
1855
1856 let stored_relayers = app_state.relayer_repository.list_all().await?;
1857 assert_eq!(stored_relayers.len(), 1);
1858 assert_eq!(stored_relayers[0].id, "test-relayer-1");
1859
1860 Ok(())
1861 }
1862
1863 #[tokio::test]
1864 async fn test_validate_config_bootstrap_state_empty() -> Result<()> {
1865 let config = create_minimal_test_config();
1866 let app_state = ThinData(create_test_app_state());
1867
1868 let state = validate_config_bootstrap_state(&config, &app_state).await?;
1869
1870 assert_eq!(state, ConfigBootstrapState::Empty);
1871 Ok(())
1872 }
1873
1874 #[tokio::test]
1875 async fn test_validate_config_bootstrap_state_complete_without_marker() -> Result<()> {
1876 let config = create_minimal_test_config();
1877 let app_state = ThinData(create_test_app_state());
1878 let server_config =
1879 create_test_server_config_with_settings(RepositoryStorageType::Redis, false);
1880
1881 process_plugins(&config, &app_state).await?;
1882 process_signers(&config, &app_state).await?;
1883 process_notifications(&config, &app_state).await?;
1884 process_networks(&config, &app_state).await?;
1885 process_relayers(&config, &app_state).await?;
1886 process_api_key(&server_config, &app_state).await?;
1887
1888 let state = validate_config_bootstrap_state(&config, &app_state).await?;
1889
1890 assert_eq!(state, ConfigBootstrapState::Complete);
1891 Ok(())
1892 }
1893
1894 #[tokio::test]
1895 async fn test_validate_config_bootstrap_state_incomplete_without_marker() -> Result<()> {
1896 let config = create_minimal_test_config();
1897 let app_state = ThinData(create_test_app_state());
1898
1899 process_plugins(&config, &app_state).await?;
1900 process_signers(&config, &app_state).await?;
1901 process_notifications(&config, &app_state).await?;
1902 process_networks(&config, &app_state).await?;
1903
1904 let state = validate_config_bootstrap_state(&config, &app_state).await?;
1905
1906 match state {
1907 ConfigBootstrapState::Incomplete { missing } => {
1908 assert!(missing.iter().any(|entry| entry == "relayer repository"));
1909 assert!(missing.iter().any(|entry| entry == "api key repository"));
1910 }
1911 other => panic!("Expected incomplete bootstrap state, got {other:?}"),
1912 }
1913
1914 Ok(())
1915 }
1916
1917 #[tokio::test]
1918 async fn test_process_config_file_errors_when_redis_bootstrap_state_is_incomplete() -> Result<()>
1919 {
1920 let config = create_minimal_test_config();
1921 let server_config = Arc::new(create_test_server_config_with_settings(
1922 RepositoryStorageType::Redis,
1923 false,
1924 ));
1925
1926 let app_state = ThinData(create_test_app_state());
1927
1928 app_state
1929 .relayer_repository
1930 .create(create_mock_relayer("existing-relayer".to_string(), false))
1931 .await?;
1932
1933 let err = process_config_file(config, server_config, &app_state)
1934 .await
1935 .expect_err("partial bootstrap state should fail closed");
1936 assert!(err
1937 .to_string()
1938 .contains("incomplete bootstrap-managed config state"));
1939
1940 Ok(())
1941 }
1942
1943 #[tokio::test]
1944 async fn test_should_error_for_partial_config_file_redis_storage_populated_repositories(
1945 ) -> Result<()> {
1946 let config = create_minimal_test_config();
1947 let server_config = Arc::new(create_test_server_config_with_settings(
1948 RepositoryStorageType::Redis,
1949 false,
1950 ));
1951
1952 let app_state1 = ThinData(create_test_app_state());
1954 let app_state2 = ThinData(create_test_app_state());
1955
1956 let existing_relayer1 = create_mock_relayer("existing-relayer".to_string(), false);
1958 let existing_relayer2 = create_mock_relayer("existing-relayer".to_string(), false);
1959 app_state1
1960 .relayer_repository
1961 .create(existing_relayer1)
1962 .await?;
1963 app_state2
1964 .relayer_repository
1965 .create(existing_relayer2)
1966 .await?;
1967
1968 assert!(app_state1.relayer_repository.has_entries().await?);
1970 assert!(!app_state1.signer_repository.has_entries().await?);
1971
1972 let err = process_config_file(config, server_config, &app_state2)
1973 .await
1974 .expect_err("partial bootstrap state should fail closed");
1975 assert!(err
1976 .to_string()
1977 .contains("incomplete bootstrap-managed config state"));
1978
1979 Ok(())
1980 }
1981
1982 #[tokio::test]
1983 async fn test_should_process_config_file_redis_storage_with_reset_flag() -> Result<()> {
1984 let config = create_minimal_test_config();
1985 let server_config = Arc::new(create_test_server_config_with_settings(
1986 RepositoryStorageType::Redis,
1987 true, ));
1989
1990 let app_state = ThinData(create_test_app_state());
1991
1992 let existing_relayer = create_mock_relayer("existing-relayer".to_string(), false);
1994 let existing_signer = create_mock_signer();
1995 app_state
1996 .relayer_repository
1997 .create(existing_relayer)
1998 .await?;
1999 app_state.signer_repository.create(existing_signer).await?;
2000
2001 process_config_file(config, server_config, &app_state).await?;
2003
2004 let stored_relayer = app_state
2005 .relayer_repository
2006 .get_by_id("existing-relayer".to_string())
2007 .await;
2008 assert!(
2009 stored_relayer.is_err(),
2010 "Existing relayer should not be found"
2011 );
2012
2013 let stored_signer = app_state
2014 .signer_repository
2015 .get_by_id("existing-signer".to_string())
2016 .await;
2017 assert!(
2018 stored_signer.is_err(),
2019 "Existing signer should not be found"
2020 );
2021
2022 Ok(())
2023 }
2024
2025 #[tokio::test]
2026 #[ignore]
2027 async fn test_process_if_needed_after_lock_skips_populated_redis_without_marker() -> Result<()>
2028 {
2029 let conn = create_test_redis_pool()
2030 .await
2031 .expect("Redis connection required");
2032 let prefix = "test_config_processing_skip_populated_without_marker";
2033 let bootstrap_meta_key = format!("{prefix}:bootstrap_meta");
2034
2035 {
2036 let mut conn_clone = conn.get().await.expect("Failed to get connection");
2037 let _: Result<(), _> =
2038 redis::AsyncCommands::del(&mut conn_clone, &bootstrap_meta_key).await;
2039 }
2040
2041 let config = create_minimal_test_config();
2042 let server_config =
2043 create_test_server_config_with_settings(RepositoryStorageType::Redis, false);
2044 let app_state = ThinData(create_test_app_state());
2045
2046 process_signers(&config, &app_state).await?;
2047 process_notifications(&config, &app_state).await?;
2048 process_networks(&config, &app_state).await?;
2049 process_relayers(&config, &app_state).await?;
2050 process_api_key(&server_config, &app_state).await?;
2051
2052 process_if_needed_after_lock(&config, &server_config, &app_state, &conn, prefix).await?;
2053
2054 let stored_relayers = app_state.relayer_repository.list_all().await?;
2055 assert_eq!(stored_relayers.len(), 1);
2056 assert_eq!(stored_relayers[0].id, "test-relayer-1");
2057 assert!(is_config_processing_completed(&conn, prefix).await?);
2058
2059 {
2060 let mut conn_clone = conn.get().await.expect("Failed to get connection");
2061 let _: Result<(), _> =
2062 redis::AsyncCommands::del(&mut conn_clone, &bootstrap_meta_key).await;
2063 }
2064
2065 Ok(())
2066 }
2067
2068 #[tokio::test]
2069 #[ignore]
2070 async fn test_process_if_needed_after_lock_recovers_when_in_progress_marker_exists(
2071 ) -> Result<()> {
2072 let conn = create_test_redis_pool()
2073 .await
2074 .expect("Redis connection required");
2075 let prefix = "test_config_processing_recover_in_progress";
2076 let bootstrap_meta_key = format!("{prefix}:bootstrap_meta");
2077
2078 {
2079 let mut conn_clone = conn.get().await.expect("Failed to get connection");
2080 let _: Result<(), _> =
2081 redis::AsyncCommands::del(&mut conn_clone, &bootstrap_meta_key).await;
2082 }
2083
2084 let config = create_minimal_test_config();
2085 let server_config =
2086 create_test_server_config_with_settings(RepositoryStorageType::Redis, false);
2087 let app_state = ThinData(create_test_app_state());
2088
2089 process_signers(&config, &app_state).await?;
2090 process_notifications(&config, &app_state).await?;
2091 process_networks(&config, &app_state).await?;
2092 process_relayers(&config, &app_state).await?;
2093 process_api_key(&server_config, &app_state).await?;
2094 set_config_processing_in_progress(&conn, prefix).await?;
2095
2096 process_if_needed_after_lock(&config, &server_config, &app_state, &conn, prefix).await?;
2097
2098 let stored_relayers = app_state.relayer_repository.list_all().await?;
2099 assert_eq!(stored_relayers.len(), 1);
2100 assert_eq!(stored_relayers[0].id, "test-relayer-1");
2101 assert!(is_config_processing_completed(&conn, prefix).await?);
2102
2103 {
2104 let mut conn_clone = conn.get().await.expect("Failed to get connection");
2105 let _: Result<(), _> =
2106 redis::AsyncCommands::del(&mut conn_clone, &bootstrap_meta_key).await;
2107 }
2108
2109 Ok(())
2110 }
2111
2112 #[tokio::test]
2113 #[ignore]
2114 async fn test_recover_config_processing_after_timeout_waits_for_extended_completion() {
2115 let conn = create_test_redis_pool()
2116 .await
2117 .expect("Redis connection required");
2118 let prefix = "test_config_recovery_extended_wait";
2119 let lock_key = format!("{prefix}:lock:{CONFIG_PROCESSING_LOCK_NAME}");
2120 let bootstrap_meta_key = format!("{prefix}:bootstrap_meta");
2121
2122 {
2123 let mut conn_clone = conn.get().await.expect("Failed to get connection");
2124 let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &lock_key).await;
2125 let _: Result<(), _> =
2126 redis::AsyncCommands::del(&mut conn_clone, &bootstrap_meta_key).await;
2127 }
2128
2129 let config = create_minimal_test_config();
2130 let server_config =
2131 create_test_server_config_with_settings(RepositoryStorageType::Redis, false);
2132 let app_state = ThinData(create_test_app_state());
2133
2134 let lock = DistributedLock::new(
2135 conn.clone(),
2136 &lock_key,
2137 Duration::from_secs(BOOTSTRAP_LOCK_TTL_SECS),
2138 );
2139 let guard = lock
2140 .try_acquire()
2141 .await
2142 .expect("Should acquire lock")
2143 .expect("Lock should be available");
2144
2145 let conn_for_task = conn.clone();
2146 let prefix_for_task = prefix.to_string();
2147 tokio::spawn(async move {
2148 tokio::time::sleep(Duration::from_millis(500)).await;
2149 set_config_processing_completed(&conn_for_task, &prefix_for_task)
2150 .await
2151 .expect("Should set config processing completed");
2152 guard.release().await.expect("Should release lock");
2153 });
2154
2155 let result = recover_config_processing_after_timeout(
2156 &config,
2157 &server_config,
2158 &app_state,
2159 &conn,
2160 prefix,
2161 )
2162 .await;
2163
2164 assert!(
2165 result.is_ok(),
2166 "Should succeed when completion is observed during extended wait: {:?}",
2167 result
2168 );
2169
2170 {
2171 let mut conn_clone = conn.get().await.expect("Failed to get connection");
2172 let _: Result<(), _> = redis::AsyncCommands::del(&mut conn_clone, &lock_key).await;
2173 let _: Result<(), _> =
2174 redis::AsyncCommands::del(&mut conn_clone, &bootstrap_meta_key).await;
2175 }
2176 }
2177}