1use std::sync::Arc;
2use std::time::Duration;
3
4use color_eyre::Result;
5use deadpool_redis::{Config, Pool, Runtime};
6use tracing::{debug, info, warn};
7
8use crate::config::ServerConfig;
9
/// TTL applied to the bootstrap distributed lock, in seconds. If the
/// holder crashes without releasing, Redis expires the key after this long.
pub const BOOTSTRAP_LOCK_TTL_SECS: u64 = 120;

/// Maximum time to wait for the bootstrap lock, in seconds. Deliberately
/// slightly longer than `BOOTSTRAP_LOCK_TTL_SECS` so a waiter can outlive
/// a crashed holder's TTL. (Used by callers outside this file — confirm.)
pub const LOCK_WAIT_MAX_SECS: u64 = 130;

/// Interval between lock-acquisition poll attempts, in milliseconds.
/// (Used by callers outside this file — confirm.)
pub const LOCK_POLL_INTERVAL_MS: u64 = 500;
24
/// The Redis connection pools used by the server: a primary pool for
/// writes and a reader pool for reads. Both fields may hold the same
/// `Arc<Pool>` when no separate reader endpoint is configured.
#[derive(Clone, Debug)]
pub struct RedisConnections {
    // Pool for the primary (writable) Redis endpoint.
    primary_pool: Arc<Pool>,
    // Pool for read operations; may be the same Arc as `primary_pool`.
    reader_pool: Arc<Pool>,
}
40
41impl RedisConnections {
42 pub fn new_single_pool(pool: Arc<Pool>) -> Self {
49 Self {
50 primary_pool: pool.clone(),
51 reader_pool: pool,
52 }
53 }
54
55 pub fn primary(&self) -> &Arc<Pool> {
60 &self.primary_pool
61 }
62
63 pub fn reader(&self) -> &Arc<Pool> {
70 &self.reader_pool
71 }
72}
73
74async fn create_pool(url: &str, pool_max_size: usize, config: &ServerConfig) -> Result<Arc<Pool>> {
76 let cfg = Config::from_url(url);
77
78 let pool = cfg
79 .builder()
80 .map_err(|e| eyre::eyre!("Failed to create Redis pool builder for {}: {}", url, e))?
81 .max_size(pool_max_size)
82 .wait_timeout(Some(Duration::from_millis(config.redis_pool_timeout_ms)))
83 .create_timeout(Some(Duration::from_millis(
84 config.redis_connection_timeout_ms,
85 )))
86 .recycle_timeout(Some(Duration::from_millis(
87 config.redis_connection_timeout_ms,
88 )))
89 .runtime(Runtime::Tokio1)
90 .build()
91 .map_err(|e| eyre::eyre!("Failed to build Redis pool for {}: {}", url, e))?;
92
93 let conn = pool
95 .get()
96 .await
97 .map_err(|e| eyre::eyre!("Failed to get initial Redis connection from {}: {}", url, e))?;
98 drop(conn);
99
100 Ok(Arc::new(pool))
101}
102
103pub async fn initialize_redis_connections(config: &ServerConfig) -> Result<Arc<RedisConnections>> {
132 let primary_pool_size = config.redis_pool_max_size;
133 let primary_pool = create_pool(&config.redis_url, primary_pool_size, config).await?;
134
135 info!(
136 primary_url = %config.redis_url,
137 primary_pool_size = %primary_pool_size,
138 "initializing primary Redis connection pool"
139 );
140 let reader_pool = match &config.redis_reader_url {
141 Some(reader_url) if !reader_url.is_empty() => {
142 let reader_pool_size = config.redis_reader_pool_max_size;
143
144 info!(
145 primary_url = %config.redis_url,
146 reader_url = %reader_url,
147 primary_pool_size = %primary_pool_size,
148 reader_pool_size = %reader_pool_size,
149 "Using separate reader endpoint for read operations"
150 );
151 create_pool(reader_url, reader_pool_size, config).await?
152 }
153 _ => {
154 debug!(
155 primary_url = %config.redis_url,
156 pool_size = %primary_pool_size,
157 "No reader URL configured, using primary for all operations"
158 );
159 primary_pool.clone()
160 }
161 };
162
163 Ok(Arc::new(RedisConnections {
164 primary_pool,
165 reader_pool,
166 }))
167}
168
/// A Redis-backed distributed lock using the `SET key value NX EX ttl`
/// pattern: the key is claimed only if absent, with a TTL so a crashed
/// holder cannot block other instances forever.
#[derive(Clone)]
pub struct DistributedLock {
    // Pool used for the lock's Redis commands.
    pool: Arc<Pool>,
    // Full Redis key under which the lock value is stored.
    lock_key: String,
    // Expiry applied at acquisition (whole seconds are sent to Redis).
    ttl: Duration,
}
206
207impl DistributedLock {
208 pub fn new(pool: Arc<Pool>, lock_key: &str, ttl: Duration) -> Self {
216 Self {
217 pool,
218 lock_key: lock_key.to_string(),
219 ttl,
220 }
221 }
222
223 pub async fn try_acquire(&self) -> Result<Option<LockGuard>> {
238 let lock_value = generate_lock_value();
239 let mut conn = self.pool.get().await?;
240
241 let result: Option<String> = redis::cmd("SET")
243 .arg(&self.lock_key)
244 .arg(&lock_value)
245 .arg("NX")
246 .arg("EX")
247 .arg(self.ttl.as_secs())
248 .query_async(&mut conn)
249 .await?;
250
251 match result {
252 Some(_) => {
253 debug!(
254 lock_key = %self.lock_key,
255 ttl_secs = %self.ttl.as_secs(),
256 "distributed lock acquired"
257 );
258 Ok(Some(LockGuard {
259 release_info: Some(LockReleaseInfo {
260 pool: self.pool.clone(),
261 lock_key: self.lock_key.clone(),
262 lock_value,
263 }),
264 }))
265 }
266 None => {
267 debug!(
268 lock_key = %self.lock_key,
269 "distributed lock already held by another instance"
270 );
271 Ok(None)
272 }
273 }
274 }
275}
276
/// Guard representing a held distributed lock. Call [`LockGuard::release`]
/// to release explicitly; if the guard is dropped instead, a best-effort
/// release is spawned in the background.
pub struct LockGuard {
    // Data needed to release the lock; taken (set to `None`) by whichever
    // of `release` or `Drop` runs first, so the lock is released at most once.
    release_info: Option<LockReleaseInfo>,
}
296
/// Everything needed to release a held lock independently of the guard.
struct LockReleaseInfo {
    pool: Arc<Pool>,
    lock_key: String,
    // Unique value written at acquisition; release deletes the key only
    // while it still holds this value (compare-and-delete).
    lock_value: String,
}
303
304impl LockGuard {
305 pub async fn release(mut self) -> Result<bool> {
318 let info = self
320 .release_info
321 .take()
322 .expect("release_info should always be Some before release");
323
324 Self::do_release(info).await
325 }
326
327 async fn do_release(info: LockReleaseInfo) -> Result<bool> {
329 let mut conn = info.pool.get().await?;
330
331 let script = r#"
334 if redis.call("GET", KEYS[1]) == ARGV[1] then
335 return redis.call("DEL", KEYS[1])
336 else
337 return 0
338 end
339 "#;
340
341 let result: i32 = redis::Script::new(script)
342 .key(&info.lock_key)
343 .arg(&info.lock_value)
344 .invoke_async(&mut conn)
345 .await?;
346
347 if result == 1 {
348 debug!(lock_key = %info.lock_key, "distributed lock released");
349 Ok(true)
350 } else {
351 warn!(
352 lock_key = %info.lock_key,
353 "distributed lock release failed - lock already expired or owned by another instance"
354 );
355 Ok(false)
356 }
357 }
358}
359
360impl Drop for LockGuard {
361 fn drop(&mut self) {
362 if let Some(info) = self.release_info.take() {
364 tokio::spawn(async move {
367 if let Err(e) = LockGuard::do_release(info).await {
368 warn!(error = %e, "failed to release distributed lock in drop, will expire after TTL");
369 }
370 });
371 }
372 }
373}
374
/// Generates a value that uniquely identifies one lock acquisition.
///
/// Combines the process id, a nanosecond UNIX timestamp, and a
/// process-wide monotonically increasing counter, formatted as
/// `"<pid>:<nanos>:<counter>"`. The counter keeps values distinct even
/// for two acquisitions in the same nanosecond within one process.
fn generate_lock_value() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};

    static LOCK_VALUE_COUNTER: AtomicU64 = AtomicU64::new(0);

    let pid = std::process::id();
    // A clock before the epoch degrades to 0 rather than panicking; the
    // counter still guarantees uniqueness within the process.
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |d| d.as_nanos());
    let seq = LOCK_VALUE_COUNTER.fetch_add(1, Ordering::Relaxed);

    format!("{pid}:{nanos}:{seq}")
}
393
/// Redis hash (per key prefix) holding relayer sync timestamps and the
/// global-init completion marker.
const RELAYER_SYNC_META_KEY: &str = "relayer_sync_meta";

/// Field in the relayer-sync hash recording when global initialization
/// last completed (UNIX seconds).
const GLOBAL_INIT_FIELD: &str = "global:init_completed";

/// Redis hash (per key prefix) holding bootstrap/config-processing state.
const BOOTSTRAP_META_KEY: &str = "bootstrap_meta";

/// Field holding the current config-processing status string.
const CONFIG_PROCESSING_STATUS_FIELD: &str = "config_processing:status";

/// Field holding when config processing last started (UNIX seconds).
const CONFIG_PROCESSING_STARTED_AT_FIELD: &str = "config_processing:started_at";

/// Field holding when config processing last completed (UNIX seconds).
const CONFIG_PROCESSING_COMPLETED_AT_FIELD: &str = "config_processing:completed_at";

/// Status value written while config processing is running.
const CONFIG_PROCESSING_STATUS_IN_PROGRESS: &str = "in_progress";
/// Status value written once config processing has finished.
const CONFIG_PROCESSING_STATUS_COMPLETED: &str = "completed";
422
423pub async fn set_config_processing_in_progress(pool: &Arc<Pool>, prefix: &str) -> Result<()> {
428 use chrono::Utc;
429 use redis::AsyncCommands;
430
431 let mut conn = pool.get().await?;
432 let hash_key = format!("{prefix}:{BOOTSTRAP_META_KEY}");
433 let timestamp = Utc::now().timestamp();
434
435 conn.hset_multiple::<_, _, _, ()>(
436 &hash_key,
437 &[
438 (
439 CONFIG_PROCESSING_STATUS_FIELD,
440 CONFIG_PROCESSING_STATUS_IN_PROGRESS,
441 ),
442 (CONFIG_PROCESSING_STARTED_AT_FIELD, ×tamp.to_string()),
443 ],
444 )
445 .await
446 .map_err(|e| eyre::eyre!("Failed to set config processing in progress: {}", e))?;
447
448 conn.hdel::<_, _, ()>(&hash_key, CONFIG_PROCESSING_COMPLETED_AT_FIELD)
449 .await
450 .map_err(|e| eyre::eyre!("Failed to clear config processing completion time: {}", e))?;
451
452 debug!(
453 timestamp = %timestamp,
454 "recorded config processing in-progress marker"
455 );
456
457 Ok(())
458}
459
460pub async fn set_config_processing_completed(pool: &Arc<Pool>, prefix: &str) -> Result<()> {
462 use chrono::Utc;
463 use redis::AsyncCommands;
464
465 let mut conn = pool.get().await?;
466 let hash_key = format!("{prefix}:{BOOTSTRAP_META_KEY}");
467 let timestamp = Utc::now().timestamp();
468
469 conn.hset_multiple::<_, _, _, ()>(
470 &hash_key,
471 &[
472 (
473 CONFIG_PROCESSING_STATUS_FIELD,
474 CONFIG_PROCESSING_STATUS_COMPLETED,
475 ),
476 (CONFIG_PROCESSING_COMPLETED_AT_FIELD, ×tamp.to_string()),
477 ],
478 )
479 .await
480 .map_err(|e| eyre::eyre!("Failed to set config processing completed: {}", e))?;
481
482 debug!(
483 timestamp = %timestamp,
484 "recorded config processing completed marker"
485 );
486
487 Ok(())
488}
489
490pub async fn is_config_processing_completed(pool: &Arc<Pool>, prefix: &str) -> Result<bool> {
492 use redis::AsyncCommands;
493
494 let mut conn = pool.get().await?;
495 let hash_key = format!("{prefix}:{BOOTSTRAP_META_KEY}");
496
497 let status: Option<String> = conn
498 .hget(&hash_key, CONFIG_PROCESSING_STATUS_FIELD)
499 .await
500 .map_err(|e| eyre::eyre!("Failed to get config processing status: {}", e))?;
501
502 Ok(matches!(
503 status.as_deref(),
504 Some(CONFIG_PROCESSING_STATUS_COMPLETED)
505 ))
506}
507
508pub async fn is_config_processing_in_progress(pool: &Arc<Pool>, prefix: &str) -> Result<bool> {
510 use redis::AsyncCommands;
511
512 let mut conn = pool.get().await?;
513 let hash_key = format!("{prefix}:{BOOTSTRAP_META_KEY}");
514
515 let status: Option<String> = conn
516 .hget(&hash_key, CONFIG_PROCESSING_STATUS_FIELD)
517 .await
518 .map_err(|e| eyre::eyre!("Failed to get config processing status: {}", e))?;
519
520 Ok(matches!(
521 status.as_deref(),
522 Some(CONFIG_PROCESSING_STATUS_IN_PROGRESS)
523 ))
524}
525
526pub async fn set_relayer_last_sync(pool: &Arc<Pool>, prefix: &str, relayer_id: &str) -> Result<()> {
541 use chrono::Utc;
542 use redis::AsyncCommands;
543
544 let mut conn = pool.get().await?;
545 let hash_key = format!("{prefix}:{RELAYER_SYNC_META_KEY}");
546 let field = format!("{relayer_id}:last_sync");
547 let timestamp = Utc::now().timestamp();
548
549 conn.hset::<_, _, _, ()>(&hash_key, &field, timestamp)
550 .await
551 .map_err(|e| eyre::eyre!("Failed to set relayer last sync: {}", e))?;
552
553 debug!(
554 relayer_id = %relayer_id,
555 timestamp = %timestamp,
556 "recorded relayer last sync time"
557 );
558
559 Ok(())
560}
561
562pub async fn get_relayer_last_sync(
574 pool: &Arc<Pool>,
575 prefix: &str,
576 relayer_id: &str,
577) -> Result<Option<chrono::DateTime<chrono::Utc>>> {
578 use chrono::{TimeZone, Utc};
579 use redis::AsyncCommands;
580
581 let mut conn = pool.get().await?;
582 let hash_key = format!("{prefix}:{RELAYER_SYNC_META_KEY}");
583 let field = format!("{relayer_id}:last_sync");
584
585 let timestamp: Option<i64> = conn
586 .hget(&hash_key, &field)
587 .await
588 .map_err(|e| eyre::eyre!("Failed to get relayer last sync: {}", e))?;
589
590 Ok(timestamp.and_then(|ts| Utc.timestamp_opt(ts, 0).single()))
591}
592
593pub async fn is_relayer_recently_synced(
609 pool: &Arc<Pool>,
610 prefix: &str,
611 relayer_id: &str,
612 threshold_secs: u64,
613) -> Result<bool> {
614 use chrono::Utc;
615
616 let last_sync = get_relayer_last_sync(pool, prefix, relayer_id).await?;
617
618 match last_sync {
619 Some(sync_time) => {
620 let elapsed = Utc::now().signed_duration_since(sync_time);
621 let is_recent = elapsed.num_seconds() < threshold_secs as i64;
622
623 if is_recent {
624 debug!(
625 relayer_id = %relayer_id,
626 last_sync = %sync_time.to_rfc3339(),
627 elapsed_secs = %elapsed.num_seconds(),
628 threshold_secs = %threshold_secs,
629 "relayer was recently synced"
630 );
631 }
632
633 Ok(is_recent)
634 }
635 None => Ok(false),
636 }
637}
638
639pub async fn set_global_init_completed(pool: &Arc<Pool>, prefix: &str) -> Result<()> {
661 use chrono::Utc;
662 use redis::AsyncCommands;
663
664 let mut conn = pool.get().await?;
665 let hash_key = format!("{prefix}:{RELAYER_SYNC_META_KEY}");
666 let timestamp = Utc::now().timestamp();
667
668 conn.hset::<_, _, _, ()>(&hash_key, GLOBAL_INIT_FIELD, timestamp)
669 .await
670 .map_err(|e| eyre::eyre!("Failed to set global init completed: {}", e))?;
671
672 debug!(timestamp = %timestamp, "recorded global initialization completion time");
673 Ok(())
674}
675
676pub async fn is_global_init_recently_completed(
691 pool: &Arc<Pool>,
692 prefix: &str,
693 threshold_secs: u64,
694) -> Result<bool> {
695 use chrono::Utc;
696 use redis::AsyncCommands;
697
698 let mut conn = pool.get().await?;
699 let hash_key = format!("{prefix}:{RELAYER_SYNC_META_KEY}");
700
701 let timestamp: Option<i64> = conn
702 .hget(&hash_key, GLOBAL_INIT_FIELD)
703 .await
704 .map_err(|e| eyre::eyre!("Failed to get global init time: {}", e))?;
705
706 match timestamp {
707 Some(ts) => {
708 let now = Utc::now().timestamp();
709 let elapsed = now - ts;
710 let is_recent = elapsed < threshold_secs as i64;
711
712 if is_recent {
713 debug!(
714 elapsed_secs = %elapsed,
715 threshold_secs = %threshold_secs,
716 "global initialization recently completed"
717 );
718 }
719
720 Ok(is_recent)
721 }
722 None => Ok(false),
723 }
724}
725
726#[cfg(test)]
727mod tests {
728 use super::*;
729
730 #[test]
731 fn test_generate_lock_value_is_unique() {
732 let value1 = generate_lock_value();
733 let value2 = generate_lock_value();
734
735 assert_ne!(value1, value2);
737 }
738
739 #[test]
740 fn test_generate_lock_value_contains_process_id() {
741 let value = generate_lock_value();
742
743 assert!(value.contains(':'));
745
746 let parts: Vec<&str> = value.split(':').collect();
748 assert_eq!(parts.len(), 3);
749
750 assert!(parts[0].parse::<u32>().is_ok());
752 }
753
754 #[test]
755 fn test_distributed_lock_key_format() {
756 let prefix = "myrelayer";
758 let lock_name = "transaction_cleanup";
759 let lock_key = format!("{prefix}:lock:{lock_name}");
760 assert_eq!(lock_key, "myrelayer:lock:transaction_cleanup");
761 }
762
763 #[test]
764 fn test_distributed_lock_key_format_with_complex_prefix() {
765 let prefix = "oz-relayer-prod";
767 let lock_name = "transaction_cleanup";
768 let lock_key = format!("{prefix}:lock:{lock_name}");
769 assert_eq!(lock_key, "oz-relayer-prod:lock:transaction_cleanup");
770 }
771
772 #[test]
773 fn test_distributed_lock_key_uses_exact_key() {
774 let lock_key = "myprefix:lock:myoperation";
776 assert_eq!(lock_key, "myprefix:lock:myoperation");
778 }
779
    /// Unit tests for `RedisConnections` that build pools without ever
    /// connecting (deadpool pools connect lazily), so they run offline.
    mod redis_connections_tests {
        use super::*;

        // Both accessors of a single-pool `RedisConnections` must return
        // the exact same Arc the caller passed in.
        #[test]
        fn test_new_single_pool_returns_same_pool_for_both() {
            let cfg = Config::from_url("redis://127.0.0.1:6379");
            let pool = cfg
                .builder()
                .expect("Failed to create pool builder")
                .max_size(16)
                .runtime(Runtime::Tokio1)
                .build()
                .expect("Failed to build pool");
            let pool = Arc::new(pool);

            let connections = RedisConnections::new_single_pool(pool.clone());

            assert!(Arc::ptr_eq(connections.primary(), &pool));
            assert!(Arc::ptr_eq(connections.reader(), &pool));
            assert!(Arc::ptr_eq(connections.primary(), connections.reader()));
        }

        // Cloning `RedisConnections` must clone the Arcs, not the pools:
        // clone and original share the same underlying pool objects.
        #[test]
        fn test_redis_connections_clone() {
            let cfg = Config::from_url("redis://127.0.0.1:6379");
            let pool = cfg
                .builder()
                .expect("Failed to create pool builder")
                .max_size(16)
                .runtime(Runtime::Tokio1)
                .build()
                .expect("Failed to build pool");
            let pool = Arc::new(pool);

            let connections = RedisConnections::new_single_pool(pool);
            let cloned = connections.clone();

            assert!(Arc::ptr_eq(connections.primary(), cloned.primary()));
            assert!(Arc::ptr_eq(connections.reader(), cloned.reader()));
        }

        // The derived Debug impl must render the struct name.
        #[test]
        fn test_redis_connections_debug() {
            let cfg = Config::from_url("redis://127.0.0.1:6379");
            let pool = cfg
                .builder()
                .expect("Failed to create pool builder")
                .max_size(16)
                .runtime(Runtime::Tokio1)
                .build()
                .expect("Failed to build pool");
            let pool = Arc::new(pool);

            let connections = RedisConnections::new_single_pool(pool);
            let debug_str = format!("{connections:?}");

            assert!(debug_str.contains("RedisConnections"));
        }
    }
849
850 async fn create_test_redis_connection() -> Option<Arc<Pool>> {
858 let cfg = Config::from_url("redis://127.0.0.1:6379");
859 let pool = cfg
860 .builder()
861 .ok()?
862 .max_size(16)
863 .runtime(Runtime::Tokio1)
864 .build()
865 .ok()?;
866 Some(Arc::new(pool))
867 }
868
869 mod integration {
870 use super::*;
871
872 #[tokio::test]
873 #[ignore] async fn test_distributed_lock_acquire_and_release() {
875 let conn = create_test_redis_connection()
876 .await
877 .expect("Redis connection required for this test");
878
879 let lock =
880 DistributedLock::new(conn, "test:lock:acquire_release", Duration::from_secs(60));
881
882 let guard = lock.try_acquire().await.expect("Redis error");
884 assert!(guard.is_some(), "Should acquire lock when not held");
885
886 let released = guard.unwrap().release().await.expect("Redis error");
888 assert!(released, "Should successfully release the lock");
889 }
890
891 #[tokio::test]
892 #[ignore] async fn test_distributed_lock_prevents_double_acquisition() {
894 let conn = create_test_redis_connection()
895 .await
896 .expect("Redis connection required for this test");
897
898 let lock1 = DistributedLock::new(
899 conn.clone(),
900 "test:lock:double_acquire",
901 Duration::from_secs(60),
902 );
903 let lock2 =
904 DistributedLock::new(conn, "test:lock:double_acquire", Duration::from_secs(60));
905
906 let guard1 = lock1.try_acquire().await.expect("Redis error");
908 assert!(guard1.is_some(), "First acquisition should succeed");
909
910 let guard2 = lock2.try_acquire().await.expect("Redis error");
912 assert!(
913 guard2.is_none(),
914 "Second acquisition should fail - lock already held"
915 );
916
917 guard1.unwrap().release().await.expect("Redis error");
919
920 let guard2_retry = lock2.try_acquire().await.expect("Redis error");
922 assert!(guard2_retry.is_some(), "Should acquire lock after release");
923
924 guard2_retry.unwrap().release().await.expect("Redis error");
926 }
927
928 #[tokio::test]
929 #[ignore] async fn test_distributed_lock_expires_after_ttl() {
931 let conn = create_test_redis_connection()
932 .await
933 .expect("Redis connection required for this test");
934
935 let lock =
937 DistributedLock::new(conn.clone(), "test:lock:ttl_expiry", Duration::from_secs(1));
938
939 let guard = lock.try_acquire().await.expect("Redis error");
941 assert!(guard.is_some(), "Should acquire lock");
942
943 drop(guard);
945
946 tokio::time::sleep(Duration::from_secs(2)).await;
948
949 let lock2 = DistributedLock::new(conn, "test:lock:ttl_expiry", Duration::from_secs(60));
951 let guard2 = lock2.try_acquire().await.expect("Redis error");
952 assert!(guard2.is_some(), "Should acquire lock after TTL expiry");
953
954 if let Some(g) = guard2 {
956 g.release().await.expect("Redis error");
957 }
958 }
959
960 #[tokio::test]
961 #[ignore] async fn test_distributed_lock_release_only_own_lock() {
963 let conn = create_test_redis_connection()
964 .await
965 .expect("Redis connection required for this test");
966
967 let lock = DistributedLock::new(
969 conn.clone(),
970 "test:lock:release_own",
971 Duration::from_secs(1),
972 );
973
974 let guard = lock.try_acquire().await.expect("Redis error");
976 assert!(guard.is_some(), "Should acquire lock");
977
978 tokio::time::sleep(Duration::from_secs(2)).await;
980
981 let released = guard.unwrap().release().await.expect("Redis error");
983 assert!(!released, "Should not release expired lock");
984 }
985
986 #[tokio::test]
987 #[ignore] async fn test_distributed_lock_with_prefix() {
989 let conn = create_test_redis_connection()
990 .await
991 .expect("Redis connection required for this test");
992
993 let prefix = "test_prefix";
996 let lock_name = "cleanup";
997 let lock_key = format!("{prefix}:lock:{lock_name}");
998
999 let lock = DistributedLock::new(conn, &lock_key, Duration::from_secs(60));
1000
1001 let guard = lock.try_acquire().await.expect("Redis error");
1002 assert!(guard.is_some(), "Should acquire prefixed lock");
1003
1004 guard.unwrap().release().await.expect("Redis error");
1006 }
1007
1008 #[tokio::test]
1009 #[ignore] async fn test_distributed_lock_drop_releases_lock() {
1011 let conn = create_test_redis_connection()
1012 .await
1013 .expect("Redis connection required for this test");
1014
1015 let lock_key = "test:lock:drop_release";
1016
1017 {
1019 let lock = DistributedLock::new(conn.clone(), lock_key, Duration::from_secs(60));
1020 let guard = lock.try_acquire().await.expect("Redis error");
1021 assert!(guard.is_some(), "Should acquire lock");
1022
1023 }
1025
1026 tokio::time::sleep(Duration::from_millis(100)).await;
1028
1029 let lock2 = DistributedLock::new(conn, lock_key, Duration::from_secs(60));
1031 let guard2 = lock2.try_acquire().await.expect("Redis error");
1032 assert!(
1033 guard2.is_some(),
1034 "Should acquire lock after previous guard was dropped"
1035 );
1036
1037 if let Some(g) = guard2 {
1039 g.release().await.expect("Redis error");
1040 }
1041 }
1042
1043 #[tokio::test]
1044 #[ignore] async fn test_distributed_lock_explicit_release_prevents_double_release() {
1046 let conn = create_test_redis_connection()
1047 .await
1048 .expect("Redis connection required for this test");
1049
1050 let lock_key = "test:lock:no_double_release";
1051 let lock = DistributedLock::new(conn.clone(), lock_key, Duration::from_secs(60));
1052
1053 let guard = lock.try_acquire().await.expect("Redis error");
1054 assert!(guard.is_some(), "Should acquire lock");
1055
1056 let released = guard.unwrap().release().await.expect("Redis error");
1058 assert!(released, "Should release successfully");
1059
1060 tokio::time::sleep(Duration::from_millis(50)).await;
1063
1064 let lock2 = DistributedLock::new(conn, lock_key, Duration::from_secs(60));
1065 let guard2 = lock2.try_acquire().await.expect("Redis error");
1066 assert!(
1067 guard2.is_some(),
1068 "Should acquire lock - no double-release issue"
1069 );
1070
1071 if let Some(g) = guard2 {
1073 g.release().await.expect("Redis error");
1074 }
1075 }
1076
1077 #[tokio::test]
1078 #[ignore] async fn test_distributed_lock_drop_on_early_return() {
1080 let conn = create_test_redis_connection()
1081 .await
1082 .expect("Redis connection required for this test");
1083
1084 let lock_key = "test:lock:early_return";
1085
1086 async fn simulated_work_with_early_return(
1088 conn: Arc<Pool>,
1089 lock_key: &str,
1090 ) -> Result<(), &'static str> {
1091 let lock = DistributedLock::new(conn, lock_key, Duration::from_secs(60));
1092 let _guard = lock
1093 .try_acquire()
1094 .await
1095 .map_err(|_| "lock error")?
1096 .ok_or("lock held")?;
1097
1098 Err("simulated error")
1100
1101 }
1103
1104 let result = simulated_work_with_early_return(conn.clone(), lock_key).await;
1106 assert!(result.is_err(), "Should have returned early with error");
1107
1108 tokio::time::sleep(Duration::from_millis(100)).await;
1110
1111 let lock2 = DistributedLock::new(conn, lock_key, Duration::from_secs(60));
1113 let guard2 = lock2.try_acquire().await.expect("Redis error");
1114 assert!(
1115 guard2.is_some(),
1116 "Should acquire lock after early return released it"
1117 );
1118
1119 if let Some(g) = guard2 {
1121 g.release().await.expect("Redis error");
1122 }
1123 }
1124
1125 #[tokio::test]
1130 #[ignore] async fn test_redis_connections_single_pool_operations() {
1132 let pool = create_test_redis_connection()
1133 .await
1134 .expect("Redis connection required for this test");
1135
1136 let connections = RedisConnections::new_single_pool(pool);
1137
1138 let mut primary_conn = connections
1140 .primary()
1141 .get()
1142 .await
1143 .expect("Failed to get primary connection");
1144 let mut reader_conn = connections
1145 .reader()
1146 .get()
1147 .await
1148 .expect("Failed to get reader connection");
1149
1150 let _: () = redis::cmd("SET")
1152 .arg("test:connections:key")
1153 .arg("test_value")
1154 .query_async(&mut primary_conn)
1155 .await
1156 .expect("Failed to SET");
1157
1158 let value: String = redis::cmd("GET")
1159 .arg("test:connections:key")
1160 .query_async(&mut reader_conn)
1161 .await
1162 .expect("Failed to GET");
1163
1164 assert_eq!(value, "test_value");
1165
1166 let _: () = redis::cmd("DEL")
1168 .arg("test:connections:key")
1169 .query_async(&mut primary_conn)
1170 .await
1171 .expect("Failed to DEL");
1172 }
1173
1174 #[tokio::test]
1175 #[ignore] async fn test_redis_connections_backward_compatible() {
1177 let pool = create_test_redis_connection()
1179 .await
1180 .expect("Redis connection required for this test");
1181
1182 let connections = Arc::new(RedisConnections::new_single_pool(pool));
1183
1184 let conn1 = connections.clone();
1186 let conn2 = connections.clone();
1187
1188 let _primary1 = conn1.primary().get().await.expect("Failed to get primary");
1190 let _reader1 = conn1.reader().get().await.expect("Failed to get reader");
1191 let _primary2 = conn2.primary().get().await.expect("Failed to get primary");
1192 let _reader2 = conn2.reader().get().await.expect("Failed to get reader");
1193
1194 }
1196 #[tokio::test]
1201 #[ignore] async fn test_set_and_get_relayer_last_sync() {
1203 let conn = create_test_redis_connection()
1204 .await
1205 .expect("Redis connection required for this test");
1206
1207 let prefix = "test_sync";
1208 let relayer_id = "test-relayer-sync";
1209
1210 set_relayer_last_sync(&conn, prefix, relayer_id)
1212 .await
1213 .expect("Should set last sync time");
1214
1215 let last_sync = get_relayer_last_sync(&conn, prefix, relayer_id)
1217 .await
1218 .expect("Should get last sync time");
1219
1220 assert!(last_sync.is_some(), "Should have a last sync time");
1221
1222 let elapsed = chrono::Utc::now()
1224 .signed_duration_since(last_sync.unwrap())
1225 .num_seconds();
1226 assert!(elapsed < 60, "Last sync should be within last minute");
1227
1228 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1230 let hash_key = format!("{prefix}:relayer_sync_meta");
1231 let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1232 .await
1233 .expect("Cleanup failed");
1234 }
1235
1236 #[tokio::test]
1237 #[ignore] async fn test_get_relayer_last_sync_returns_none_when_not_set() {
1239 let conn = create_test_redis_connection()
1240 .await
1241 .expect("Redis connection required for this test");
1242
1243 let prefix = "test_sync_none";
1244 let relayer_id = "nonexistent-relayer";
1245
1246 let last_sync = get_relayer_last_sync(&conn, prefix, relayer_id)
1248 .await
1249 .expect("Should not error");
1250
1251 assert!(
1252 last_sync.is_none(),
1253 "Should return None for unsynced relayer"
1254 );
1255 }
1256
1257 #[tokio::test]
1258 #[ignore] async fn test_is_relayer_recently_synced_returns_true_for_recent_sync() {
1260 let conn = create_test_redis_connection()
1261 .await
1262 .expect("Redis connection required for this test");
1263
1264 let prefix = "test_recent_sync";
1265 let relayer_id = "recent-relayer";
1266
1267 set_relayer_last_sync(&conn, prefix, relayer_id)
1269 .await
1270 .expect("Should set last sync time");
1271
1272 let is_recent = is_relayer_recently_synced(&conn, prefix, relayer_id, 300)
1274 .await
1275 .expect("Should check recent sync");
1276
1277 assert!(is_recent, "Should be recently synced");
1278
1279 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1281 let hash_key = format!("{prefix}:relayer_sync_meta");
1282 let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1283 .await
1284 .expect("Cleanup failed");
1285 }
1286
1287 #[tokio::test]
1288 #[ignore] async fn test_is_relayer_recently_synced_returns_false_when_not_set() {
1290 let conn = create_test_redis_connection()
1291 .await
1292 .expect("Redis connection required for this test");
1293
1294 let prefix = "test_not_synced";
1295 let relayer_id = "never-synced-relayer";
1296
1297 let is_recent = is_relayer_recently_synced(&conn, prefix, relayer_id, 300)
1299 .await
1300 .expect("Should check recent sync");
1301
1302 assert!(!is_recent, "Should not be recently synced");
1303 }
1304
1305 #[tokio::test]
1306 #[ignore] async fn test_is_relayer_recently_synced_returns_false_for_stale_sync() {
1308 let conn = create_test_redis_connection()
1309 .await
1310 .expect("Redis connection required for this test");
1311
1312 let prefix = "test_stale_sync";
1313 let relayer_id = "stale-relayer";
1314
1315 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1317 let hash_key = format!("{prefix}:relayer_sync_meta");
1318 let field = format!("{relayer_id}:last_sync");
1319 let old_timestamp = chrono::Utc::now().timestamp() - 600; redis::AsyncCommands::hset::<_, _, _, ()>(
1322 &mut conn_clone,
1323 &hash_key,
1324 &field,
1325 old_timestamp,
1326 )
1327 .await
1328 .expect("Should set old timestamp");
1329
1330 let is_recent = is_relayer_recently_synced(&conn, prefix, relayer_id, 300)
1332 .await
1333 .expect("Should check recent sync");
1334
1335 assert!(!is_recent, "Should not be recently synced (stale)");
1336
1337 let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1339 .await
1340 .expect("Cleanup failed");
1341 }
1342
1343 #[tokio::test]
1344 #[ignore] async fn test_get_relayer_last_sync_multiple_relayers() {
1346 let conn = create_test_redis_connection()
1347 .await
1348 .expect("Redis connection required for this test");
1349
1350 let prefix = "test_multi_sync";
1351
1352 set_relayer_last_sync(&conn, prefix, "relayer-1")
1354 .await
1355 .expect("Should set sync time");
1356 tokio::time::sleep(Duration::from_millis(10)).await;
1357 set_relayer_last_sync(&conn, prefix, "relayer-2")
1358 .await
1359 .expect("Should set sync time");
1360
1361 let sync1 = get_relayer_last_sync(&conn, prefix, "relayer-1")
1362 .await
1363 .expect("Should get sync time");
1364 let sync2 = get_relayer_last_sync(&conn, prefix, "relayer-2")
1365 .await
1366 .expect("Should get sync time");
1367
1368 assert!(sync1.is_some(), "Relayer 1 should have sync time");
1369 assert!(sync2.is_some(), "Relayer 2 should have sync time");
1370 assert!(
1371 sync2.unwrap() >= sync1.unwrap(),
1372 "Relayer 2 should be synced at same time or after relayer 1"
1373 );
1374
1375 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1377 let hash_key = format!("{prefix}:relayer_sync_meta");
1378 let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1379 .await
1380 .expect("Cleanup failed");
1381 }
1382
1383 #[tokio::test]
1384 #[ignore] async fn test_get_relayer_last_sync_update_existing() {
1386 let conn = create_test_redis_connection()
1387 .await
1388 .expect("Redis connection required for this test");
1389
1390 let prefix = "test_update_sync";
1391 let relayer_id = "update-relayer";
1392
1393 set_relayer_last_sync(&conn, prefix, relayer_id)
1395 .await
1396 .expect("Should set sync time");
1397
1398 let first_sync = get_relayer_last_sync(&conn, prefix, relayer_id)
1399 .await
1400 .expect("Should get sync time")
1401 .expect("Should have sync time");
1402
1403 tokio::time::sleep(Duration::from_millis(100)).await;
1405
1406 set_relayer_last_sync(&conn, prefix, relayer_id)
1407 .await
1408 .expect("Should update sync time");
1409
1410 let second_sync = get_relayer_last_sync(&conn, prefix, relayer_id)
1411 .await
1412 .expect("Should get sync time")
1413 .expect("Should have sync time");
1414
1415 assert!(
1416 second_sync >= first_sync,
1417 "Updated sync time should be at the same time or later than first"
1418 );
1419
1420 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1422 let hash_key = format!("{prefix}:relayer_sync_meta");
1423 let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1424 .await
1425 .expect("Cleanup failed");
1426 }
1427
1428 #[tokio::test]
1429 #[ignore] async fn test_is_relayer_recently_synced_threshold_boundary() {
1431 let conn = create_test_redis_connection()
1432 .await
1433 .expect("Redis connection required for this test");
1434
1435 let prefix = "test_boundary_sync";
1436 let relayer_id = "boundary-relayer";
1437
1438 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1440 let hash_key = format!("{prefix}:relayer_sync_meta");
1441 let field = format!("{relayer_id}:last_sync");
1442 let threshold_secs: u64 = 60;
1443 let boundary_timestamp = chrono::Utc::now().timestamp() - (threshold_secs as i64);
1444
1445 redis::AsyncCommands::hset::<_, _, _, ()>(
1446 &mut conn_clone,
1447 &hash_key,
1448 &field,
1449 boundary_timestamp,
1450 )
1451 .await
1452 .expect("Should set boundary timestamp");
1453
1454 let is_recent = is_relayer_recently_synced(&conn, prefix, relayer_id, threshold_secs)
1455 .await
1456 .expect("Should check recent sync");
1457
1458 assert!(!is_recent, "Should not be recent at exact threshold");
1459
1460 let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1462 .await
1463 .expect("Cleanup failed");
1464 }
1465
1466 #[tokio::test]
1467 #[ignore] async fn test_is_relayer_recently_synced_just_before_threshold() {
1469 let conn = create_test_redis_connection()
1470 .await
1471 .expect("Redis connection required for this test");
1472
1473 let prefix = "test_just_before_sync";
1474 let relayer_id = "just-before-relayer";
1475
1476 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1478 let hash_key = format!("{prefix}:relayer_sync_meta");
1479 let field = format!("{relayer_id}:last_sync");
1480 let threshold_secs: u64 = 60;
1481 let just_before_timestamp =
1482 chrono::Utc::now().timestamp() - (threshold_secs as i64) + 5;
1483
1484 redis::AsyncCommands::hset::<_, _, _, ()>(
1485 &mut conn_clone,
1486 &hash_key,
1487 &field,
1488 just_before_timestamp,
1489 )
1490 .await
1491 .expect("Should set just-before timestamp");
1492
1493 let is_recent = is_relayer_recently_synced(&conn, prefix, relayer_id, threshold_secs)
1494 .await
1495 .expect("Should check recent sync");
1496
1497 assert!(is_recent, "Should be recent just before threshold");
1498
1499 let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1501 .await
1502 .expect("Cleanup failed");
1503 }
1504
1505 #[tokio::test]
1506 #[ignore] async fn test_is_relayer_recently_synced_different_prefixes() {
1508 let conn = create_test_redis_connection()
1509 .await
1510 .expect("Redis connection required for this test");
1511
1512 let relayer_id = "shared-relayer";
1513
1514 set_relayer_last_sync(&conn, "prefix1", relayer_id)
1516 .await
1517 .expect("Should set sync time");
1518
1519 let is_recent_prefix1 = is_relayer_recently_synced(&conn, "prefix1", relayer_id, 300)
1520 .await
1521 .expect("Should check sync");
1522 let is_recent_prefix2 = is_relayer_recently_synced(&conn, "prefix2", relayer_id, 300)
1523 .await
1524 .expect("Should check sync");
1525
1526 assert!(is_recent_prefix1, "Should be recent for prefix1");
1527 assert!(!is_recent_prefix2, "Should not be recent for prefix2");
1528
1529 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1531 let hash_key = "prefix1:relayer_sync_meta";
1532 let _: () = redis::AsyncCommands::del(&mut conn_clone, hash_key)
1533 .await
1534 .expect("Cleanup failed");
1535 }
1536
1537 #[tokio::test]
1538 #[ignore] async fn test_is_relayer_recently_synced_zero_threshold() {
1540 let conn = create_test_redis_connection()
1541 .await
1542 .expect("Redis connection required for this test");
1543
1544 let prefix = "test_zero_threshold";
1545 let relayer_id = "zero-threshold-relayer";
1546
1547 set_relayer_last_sync(&conn, prefix, relayer_id)
1548 .await
1549 .expect("Should set sync time");
1550
1551 let is_recent = is_relayer_recently_synced(&conn, prefix, relayer_id, 0)
1553 .await
1554 .expect("Should check sync");
1555
1556 assert!(!is_recent, "Should not be recent with zero threshold");
1557
1558 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1560 let hash_key = format!("{prefix}:relayer_sync_meta");
1561 let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1562 .await
1563 .expect("Cleanup failed");
1564 }
1565
1566 #[tokio::test]
1571 #[ignore] async fn test_set_and_check_global_init_completed() {
1573 let conn = create_test_redis_connection()
1574 .await
1575 .expect("Redis connection required for this test");
1576
1577 let prefix = "test_global_init";
1578
1579 set_global_init_completed(&conn, prefix)
1581 .await
1582 .expect("Should set global init completed");
1583
1584 let is_recent = is_global_init_recently_completed(&conn, prefix, 300)
1586 .await
1587 .expect("Should check global init");
1588
1589 assert!(is_recent, "Should be recently completed");
1590
1591 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1593 let hash_key = format!("{prefix}:relayer_sync_meta");
1594 let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1595 .await
1596 .expect("Cleanup failed");
1597 }
1598
1599 #[tokio::test]
1600 #[ignore] async fn test_is_global_init_recently_completed_returns_false_when_not_set() {
1602 let conn = create_test_redis_connection()
1603 .await
1604 .expect("Redis connection required for this test");
1605
1606 let prefix = "test_global_init_not_set";
1607
1608 let is_recent = is_global_init_recently_completed(&conn, prefix, 300)
1610 .await
1611 .expect("Should check global init");
1612
1613 assert!(!is_recent, "Should not be recently completed when not set");
1614 }
1615
1616 #[tokio::test]
1617 #[ignore] async fn test_is_global_init_recently_completed_returns_false_when_stale() {
1619 let conn = create_test_redis_connection()
1620 .await
1621 .expect("Redis connection required for this test");
1622
1623 let prefix = "test_global_init_stale";
1624
1625 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1627 let hash_key = format!("{prefix}:relayer_sync_meta");
1628 let old_timestamp = chrono::Utc::now().timestamp() - 600; redis::AsyncCommands::hset::<_, _, _, ()>(
1631 &mut conn_clone,
1632 &hash_key,
1633 "global:init_completed",
1634 old_timestamp,
1635 )
1636 .await
1637 .expect("Should set old timestamp");
1638
1639 let is_recent = is_global_init_recently_completed(&conn, prefix, 300)
1641 .await
1642 .expect("Should check global init");
1643
1644 assert!(!is_recent, "Should not be recent when stale");
1645
1646 let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1648 .await
1649 .expect("Cleanup failed");
1650 }
1651
1652 #[tokio::test]
1653 #[ignore] async fn test_global_init_different_prefixes() {
1655 let conn = create_test_redis_connection()
1656 .await
1657 .expect("Redis connection required for this test");
1658
1659 set_global_init_completed(&conn, "global_prefix1")
1661 .await
1662 .expect("Should set global init");
1663
1664 let is_recent_prefix1 = is_global_init_recently_completed(&conn, "global_prefix1", 300)
1665 .await
1666 .expect("Should check global init");
1667 let is_recent_prefix2 = is_global_init_recently_completed(&conn, "global_prefix2", 300)
1668 .await
1669 .expect("Should check global init");
1670
1671 assert!(is_recent_prefix1, "Should be recent for prefix1");
1672 assert!(!is_recent_prefix2, "Should not be recent for prefix2");
1673
1674 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1676 let hash_key = "global_prefix1:relayer_sync_meta";
1677 let _: () = redis::AsyncCommands::del(&mut conn_clone, hash_key)
1678 .await
1679 .expect("Cleanup failed");
1680 }
1681
1682 #[tokio::test]
1683 #[ignore] async fn test_global_init_update_existing() {
1685 let conn = create_test_redis_connection()
1686 .await
1687 .expect("Redis connection required for this test");
1688
1689 let prefix = "test_global_init_update";
1690
1691 set_global_init_completed(&conn, prefix)
1693 .await
1694 .expect("Should set global init");
1695
1696 tokio::time::sleep(Duration::from_millis(100)).await;
1698
1699 set_global_init_completed(&conn, prefix)
1701 .await
1702 .expect("Should update global init");
1703
1704 let is_recent = is_global_init_recently_completed(&conn, prefix, 300)
1706 .await
1707 .expect("Should check global init");
1708
1709 assert!(is_recent, "Should be recent after update");
1710
1711 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1713 let hash_key = format!("{prefix}:relayer_sync_meta");
1714 let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1715 .await
1716 .expect("Cleanup failed");
1717 }
1718
1719 #[tokio::test]
1720 #[ignore] async fn test_global_init_coexists_with_relayer_sync() {
1722 let conn = create_test_redis_connection()
1723 .await
1724 .expect("Redis connection required for this test");
1725
1726 let prefix = "test_coexist";
1727 let relayer_id = "test-relayer";
1728
1729 set_global_init_completed(&conn, prefix)
1731 .await
1732 .expect("Should set global init");
1733 set_relayer_last_sync(&conn, prefix, relayer_id)
1734 .await
1735 .expect("Should set relayer sync");
1736
1737 let global_recent = is_global_init_recently_completed(&conn, prefix, 300)
1739 .await
1740 .expect("Should check global init");
1741 let relayer_recent = is_relayer_recently_synced(&conn, prefix, relayer_id, 300)
1742 .await
1743 .expect("Should check relayer sync");
1744
1745 assert!(global_recent, "Global init should be recent");
1746 assert!(relayer_recent, "Relayer sync should be recent");
1747
1748 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1750 let hash_key = format!("{prefix}:relayer_sync_meta");
1751 let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1752 .await
1753 .expect("Cleanup failed");
1754 }
1755
1756 #[tokio::test]
1761 #[ignore] async fn test_config_processing_in_progress_is_not_completed() {
1763 let conn = create_test_redis_connection()
1764 .await
1765 .expect("Redis connection required for this test");
1766
1767 let prefix = "test_config_processing_in_progress";
1768
1769 set_config_processing_in_progress(&conn, prefix)
1770 .await
1771 .expect("Should set in-progress marker");
1772
1773 let completed = is_config_processing_completed(&conn, prefix)
1774 .await
1775 .expect("Should read config processing status");
1776
1777 assert!(
1778 !completed,
1779 "In-progress config processing must not be treated as completed"
1780 );
1781
1782 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1783 let hash_key = format!("{prefix}:bootstrap_meta");
1784 let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1785 .await
1786 .expect("Cleanup failed");
1787 }
1788
1789 #[tokio::test]
1790 #[ignore] async fn test_config_processing_completed_requires_explicit_completion_marker() {
1792 let conn = create_test_redis_connection()
1793 .await
1794 .expect("Redis connection required for this test");
1795
1796 let prefix = "test_config_processing_completed";
1797
1798 set_config_processing_in_progress(&conn, prefix)
1799 .await
1800 .expect("Should set in-progress marker");
1801 set_config_processing_completed(&conn, prefix)
1802 .await
1803 .expect("Should set completed marker");
1804
1805 let completed = is_config_processing_completed(&conn, prefix)
1806 .await
1807 .expect("Should read config processing status");
1808
1809 assert!(
1810 completed,
1811 "Completed config processing must require the explicit completion marker"
1812 );
1813
1814 let mut conn_clone = conn.get().await.expect("Failed to get connection");
1815 let hash_key = format!("{prefix}:bootstrap_meta");
1816 let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1817 .await
1818 .expect("Cleanup failed");
1819 }
1820 }
1821}