openzeppelin_relayer/utils/
redis.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use color_eyre::Result;
5use deadpool_redis::{Config, Pool, Runtime};
6use tracing::{debug, info, warn};
7
8use crate::config::ServerConfig;
9
10// ============================================================================
11// Shared Timing Constants for Bootstrap Operations
12// ============================================================================
13
14/// Default lock TTL for bootstrap operations (2 minutes).
15/// Used as a safety net for crashes during initialization or config processing.
16pub const BOOTSTRAP_LOCK_TTL_SECS: u64 = 120;
17
18/// Max wait time when another instance holds the lock.
19/// Set slightly longer than lock TTL to handle edge cases.
20pub const LOCK_WAIT_MAX_SECS: u64 = 130;
21
22/// Polling interval when waiting for completion.
23pub const LOCK_POLL_INTERVAL_MS: u64 = 500;
24
/// Holds separate connection pools for read and write operations.
///
/// This struct enables optimization for Redis deployments with read replicas,
/// such as AWS ElastiCache. Write operations use the primary pool, while read
/// operations can be distributed across reader replicas.
///
/// When `REDIS_READER_URL` is not configured, both pools point to the same
/// Redis instance (the primary), maintaining backward compatibility.
///
/// Cloning is cheap: both fields are `Arc`s, so clones share the same pools.
#[derive(Clone, Debug)]
pub struct RedisConnections {
    /// Pool for write operations (connected to primary endpoint)
    primary_pool: Arc<Pool>,
    /// Pool for read operations (connected to reader endpoint, or primary if not configured)
    reader_pool: Arc<Pool>,
}
40
41impl RedisConnections {
42    /// Creates a new `RedisConnections` with a single pool used for both reads and writes.
43    ///
44    /// This is useful for:
45    /// - Testing where read/write separation is not needed
46    /// - Simple deployments without read replicas
47    /// - Backward compatibility
48    pub fn new_single_pool(pool: Arc<Pool>) -> Self {
49        Self {
50            primary_pool: pool.clone(),
51            reader_pool: pool,
52        }
53    }
54
55    /// Returns the primary pool for write operations.
56    ///
57    /// Use this for: `create`, `update`, `delete`, and any operation that
58    /// modifies data in Redis.
59    pub fn primary(&self) -> &Arc<Pool> {
60        &self.primary_pool
61    }
62
63    /// Returns the reader pool for read operations.
64    ///
65    /// Use this for: `get_by_id`, `list`, `find_by_*`, `count`, and any
66    /// operation that only reads data from Redis.
67    ///
68    /// If no reader endpoint is configured, this returns the same pool as `primary()`.
69    pub fn reader(&self) -> &Arc<Pool> {
70        &self.reader_pool
71    }
72}
73
74/// Creates a Redis connection pool with the specified URL, pool size, and configuration.
75async fn create_pool(url: &str, pool_max_size: usize, config: &ServerConfig) -> Result<Arc<Pool>> {
76    let cfg = Config::from_url(url);
77
78    let pool = cfg
79        .builder()
80        .map_err(|e| eyre::eyre!("Failed to create Redis pool builder for {}: {}", url, e))?
81        .max_size(pool_max_size)
82        .wait_timeout(Some(Duration::from_millis(config.redis_pool_timeout_ms)))
83        .create_timeout(Some(Duration::from_millis(
84            config.redis_connection_timeout_ms,
85        )))
86        .recycle_timeout(Some(Duration::from_millis(
87            config.redis_connection_timeout_ms,
88        )))
89        .runtime(Runtime::Tokio1)
90        .build()
91        .map_err(|e| eyre::eyre!("Failed to build Redis pool for {}: {}", url, e))?;
92
93    // Verify the pool is working by getting a connection
94    let conn = pool
95        .get()
96        .await
97        .map_err(|e| eyre::eyre!("Failed to get initial Redis connection from {}: {}", url, e))?;
98    drop(conn);
99
100    Ok(Arc::new(pool))
101}
102
103/// Initializes Redis connection pools for both primary and reader endpoints.
104///
105/// # Arguments
106///
107/// * `config` - The server configuration containing Redis URLs and pool settings.
108///
109/// # Returns
110///
111/// A `RedisConnections` struct containing:
112/// - `primary_pool`: Connected to `REDIS_URL` (for write operations)
113/// - `reader_pool`: Connected to `REDIS_READER_URL` if set, otherwise same as primary
114///
115/// # Features
116///
117/// - **Read/Write Separation**: When `REDIS_READER_URL` is configured, read operations
118///   can be distributed across read replicas, reducing load on the primary.
119/// - **Backward Compatible**: If `REDIS_READER_URL` is not set, both pools use
120///   the primary URL, maintaining existing behavior.
121/// - **Connection Pooling**: Both pools use deadpool-redis with configurable
122///   max size, wait timeout, and connection timeouts.
123///
124/// # Example Configuration
125///
126/// ```bash
127/// # AWS ElastiCache with read replicas
128/// REDIS_URL=redis://my-cluster.xxx.cache.amazonaws.com:6379
129/// REDIS_READER_URL=redis://my-cluster-ro.xxx.cache.amazonaws.com:6379
130/// ```
131pub async fn initialize_redis_connections(config: &ServerConfig) -> Result<Arc<RedisConnections>> {
132    let primary_pool_size = config.redis_pool_max_size;
133    let primary_pool = create_pool(&config.redis_url, primary_pool_size, config).await?;
134
135    info!(
136        primary_url = %config.redis_url,
137        primary_pool_size = %primary_pool_size,
138        "initializing primary Redis connection pool"
139    );
140    let reader_pool = match &config.redis_reader_url {
141        Some(reader_url) if !reader_url.is_empty() => {
142            let reader_pool_size = config.redis_reader_pool_max_size;
143
144            info!(
145                primary_url = %config.redis_url,
146                reader_url = %reader_url,
147                primary_pool_size = %primary_pool_size,
148                reader_pool_size = %reader_pool_size,
149                "Using separate reader endpoint for read operations"
150            );
151            create_pool(reader_url, reader_pool_size, config).await?
152        }
153        _ => {
154            debug!(
155                primary_url = %config.redis_url,
156                pool_size = %primary_pool_size,
157                "No reader URL configured, using primary for all operations"
158            );
159            primary_pool.clone()
160        }
161    };
162
163    Ok(Arc::new(RedisConnections {
164        primary_pool,
165        reader_pool,
166    }))
167}
168
/// A distributed lock implementation using Redis SET NX EX pattern.
///
/// This lock is designed for distributed systems where multiple instances
/// need to coordinate exclusive access to shared resources. It uses the
/// Redis SET command with NX (only set if not exists) and EX (expiry time)
/// options to atomically acquire locks.
///
/// # Features
/// - **Atomic acquisition**: Uses Redis SET NX to ensure only one instance can acquire the lock
/// - **Automatic expiry**: Lock automatically expires after TTL to prevent deadlocks
/// - **Unique identifiers**: Each lock acquisition gets a unique ID to prevent accidental release by other instances
/// - **Auto-release on drop**: Lock is released when the guard is dropped (best-effort via spawned task)
///
/// # TTL Considerations
/// The TTL should be set to accommodate the worst-case runtime of the protected operation.
/// If the operation runs longer than TTL, another instance may acquire the lock concurrently.
/// For long-running operations, consider:
/// - Setting a generous TTL (e.g., 2x expected runtime)
/// - Implementing lock refresh/extension during processing
/// - Breaking the operation into smaller locked sections
///
/// # Example
/// ```ignore
/// // Key format: {prefix}:lock:{name}
/// let lock_key = format!("{}:lock:{}", prefix, "my-operation");
/// let lock = DistributedLock::new(redis_client, &lock_key, Duration::from_secs(60));
/// if let Some(guard) = lock.try_acquire().await? {
///     // Do exclusive work here
///     // Lock is automatically released when guard is dropped
/// }
/// ```
#[derive(Clone)]
pub struct DistributedLock {
    /// Connection pool used to issue the lock commands.
    pool: Arc<Pool>,
    /// Exact Redis key under which the lock value is stored (no prefixing is added).
    lock_key: String,
    /// Expiry applied at acquisition time (`SET ... EX`).
    ttl: Duration,
}
206
207impl DistributedLock {
208    /// Creates a new distributed lock instance.
209    ///
210    /// # Arguments
211    /// * `pool` - Redis connection pool
212    /// * `lock_key` - Full Redis key for this lock (e.g., "myprefix:lock:cleanup")
213    /// * `ttl` - Time-to-live for the lock. Lock will automatically expire after this duration
214    ///   to prevent deadlocks if the holder crashes.
215    pub fn new(pool: Arc<Pool>, lock_key: &str, ttl: Duration) -> Self {
216        Self {
217            pool,
218            lock_key: lock_key.to_string(),
219            ttl,
220        }
221    }
222
223    /// Attempts to acquire the distributed lock.
224    ///
225    /// This is a non-blocking operation. If the lock is already held by another
226    /// instance, it returns `Ok(None)` immediately without waiting.
227    ///
228    /// # Returns
229    /// * `Ok(Some(LockGuard))` - Lock was successfully acquired
230    /// * `Ok(None)` - Lock is already held by another instance
231    /// * `Err(_)` - Redis communication error
232    ///
233    /// # Lock Semantics
234    /// The lock is implemented using Redis `SET key value NX EX ttl`:
235    /// - `NX`: Only set if key does not exist (atomic check-and-set)
236    /// - `EX`: Set expiry time in seconds
237    pub async fn try_acquire(&self) -> Result<Option<LockGuard>> {
238        let lock_value = generate_lock_value();
239        let mut conn = self.pool.get().await?;
240
241        // Use SET NX EX for atomic lock acquisition with expiry
242        let result: Option<String> = redis::cmd("SET")
243            .arg(&self.lock_key)
244            .arg(&lock_value)
245            .arg("NX")
246            .arg("EX")
247            .arg(self.ttl.as_secs())
248            .query_async(&mut conn)
249            .await?;
250
251        match result {
252            Some(_) => {
253                debug!(
254                    lock_key = %self.lock_key,
255                    ttl_secs = %self.ttl.as_secs(),
256                    "distributed lock acquired"
257                );
258                Ok(Some(LockGuard {
259                    release_info: Some(LockReleaseInfo {
260                        pool: self.pool.clone(),
261                        lock_key: self.lock_key.clone(),
262                        lock_value,
263                    }),
264                }))
265            }
266            None => {
267                debug!(
268                    lock_key = %self.lock_key,
269                    "distributed lock already held by another instance"
270                );
271                Ok(None)
272            }
273        }
274    }
275}
276
/// A guard that represents an acquired distributed lock.
///
/// The lock is automatically released when this guard is dropped. This ensures
/// the lock is released regardless of how the protected code exits (normal return,
/// early return via `?`, or panic).
///
/// The release is performed via a spawned task to avoid blocking in `Drop`.
/// If you need to confirm the release succeeded, call `release()` explicitly instead.
///
/// # Drop Behavior
/// When dropped, the guard spawns an async task to release the lock. This is
/// best-effort: if the task fails (e.g., Redis unavailable), the lock will
/// still expire after TTL. Note that the spawn requires a running Tokio
/// runtime at drop time.
pub struct LockGuard {
    /// Release info is wrapped in Option to support both explicit release and Drop.
    /// When `release()` is called, this is taken (set to None) to prevent double-release.
    /// When Drop runs, if this is Some, we spawn a release task.
    release_info: Option<LockReleaseInfo>,
}
296
/// Internal struct holding the information needed to release a lock.
struct LockReleaseInfo {
    /// Pool used to reach Redis for the release command.
    pool: Arc<Pool>,
    /// Key of the lock being released.
    lock_key: String,
    /// Unique ownership token written at acquisition; release only deletes
    /// the key if Redis still holds this exact value.
    lock_value: String,
}
303
304impl LockGuard {
305    /// Explicitly releases the lock before TTL expiry.
306    ///
307    /// This uses a Lua script to ensure we only delete the lock if we still own it.
308    /// This prevents accidentally releasing a lock that was already expired and
309    /// re-acquired by another instance.
310    ///
311    /// Calling this method consumes the guard and prevents the Drop-based release.
312    ///
313    /// # Returns
314    /// * `Ok(true)` - Lock was successfully released
315    /// * `Ok(false)` - Lock was not released (already expired or owned by another instance)
316    /// * `Err(_)` - Redis communication error
317    pub async fn release(mut self) -> Result<bool> {
318        // Take the release info to prevent Drop from also trying to release
319        let info = self
320            .release_info
321            .take()
322            .expect("release_info should always be Some before release");
323
324        Self::do_release(info).await
325    }
326
327    /// Internal async release implementation.
328    async fn do_release(info: LockReleaseInfo) -> Result<bool> {
329        let mut conn = info.pool.get().await?;
330
331        // Lua script to atomically check and delete
332        // Only delete if the value matches (we still own the lock)
333        let script = r#"
334            if redis.call("GET", KEYS[1]) == ARGV[1] then
335                return redis.call("DEL", KEYS[1])
336            else
337                return 0
338            end
339        "#;
340
341        let result: i32 = redis::Script::new(script)
342            .key(&info.lock_key)
343            .arg(&info.lock_value)
344            .invoke_async(&mut conn)
345            .await?;
346
347        if result == 1 {
348            debug!(lock_key = %info.lock_key, "distributed lock released");
349            Ok(true)
350        } else {
351            warn!(
352                lock_key = %info.lock_key,
353                "distributed lock release failed - lock already expired or owned by another instance"
354            );
355            Ok(false)
356        }
357    }
358}
359
360impl Drop for LockGuard {
361    fn drop(&mut self) {
362        // If release_info is still Some, we need to release the lock
363        if let Some(info) = self.release_info.take() {
364            // Spawn a task to release the lock asynchronously
365            // This is best-effort; if it fails, the lock will expire after TTL
366            tokio::spawn(async move {
367                if let Err(e) = LockGuard::do_release(info).await {
368                    warn!(error = %e, "failed to release distributed lock in drop, will expire after TTL");
369                }
370            });
371        }
372    }
373}
374
/// Generates a unique value for lock ownership verification.
///
/// The value is `"{pid}:{unix_nanos}:{sequence}"`. The process-wide atomic
/// sequence guarantees distinct values within a process even when two calls
/// observe the same timestamp; pid and timestamp differentiate processes.
fn generate_lock_value() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};

    static NEXT_SEQ: AtomicU64 = AtomicU64::new(0);

    let pid = std::process::id();
    // Clock-before-epoch is effectively impossible; fall back to 0 rather
    // than panic, since uniqueness is still preserved by the counter.
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |d| d.as_nanos());
    let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);

    format!("{pid}:{nanos}:{seq}")
}
393
394// ============================================================================
395// Relayer Sync Metadata Functions
396// ============================================================================
397//
398// These functions track when relayers were last synchronized/initialized.
399// This allows multiple instances to skip redundant initialization when
400// a relayer was recently synced by another instance.
401
402/// The Redis hash key suffix for storing relayer sync metadata.
403const RELAYER_SYNC_META_KEY: &str = "relayer_sync_meta";
404
405/// The hash field for global initialization completion timestamp.
406const GLOBAL_INIT_FIELD: &str = "global:init_completed";
407
408/// The Redis hash key suffix for storing bootstrap process metadata.
409const BOOTSTRAP_META_KEY: &str = "bootstrap_meta";
410
411/// Hash field storing config processing status.
412const CONFIG_PROCESSING_STATUS_FIELD: &str = "config_processing:status";
413
414/// Hash field storing config processing start timestamp.
415const CONFIG_PROCESSING_STARTED_AT_FIELD: &str = "config_processing:started_at";
416
417/// Hash field storing config processing completion timestamp.
418const CONFIG_PROCESSING_COMPLETED_AT_FIELD: &str = "config_processing:completed_at";
419
420const CONFIG_PROCESSING_STATUS_IN_PROGRESS: &str = "in_progress";
421const CONFIG_PROCESSING_STATUS_COMPLETED: &str = "completed";
422
423/// Marks config processing as in progress.
424///
425/// This explicit marker is used by distributed bootstrap coordination so
426/// waiters do not infer completion from partially written repository state.
427pub async fn set_config_processing_in_progress(pool: &Arc<Pool>, prefix: &str) -> Result<()> {
428    use chrono::Utc;
429    use redis::AsyncCommands;
430
431    let mut conn = pool.get().await?;
432    let hash_key = format!("{prefix}:{BOOTSTRAP_META_KEY}");
433    let timestamp = Utc::now().timestamp();
434
435    conn.hset_multiple::<_, _, _, ()>(
436        &hash_key,
437        &[
438            (
439                CONFIG_PROCESSING_STATUS_FIELD,
440                CONFIG_PROCESSING_STATUS_IN_PROGRESS,
441            ),
442            (CONFIG_PROCESSING_STARTED_AT_FIELD, &timestamp.to_string()),
443        ],
444    )
445    .await
446    .map_err(|e| eyre::eyre!("Failed to set config processing in progress: {}", e))?;
447
448    conn.hdel::<_, _, ()>(&hash_key, CONFIG_PROCESSING_COMPLETED_AT_FIELD)
449        .await
450        .map_err(|e| eyre::eyre!("Failed to clear config processing completion time: {}", e))?;
451
452    debug!(
453        timestamp = %timestamp,
454        "recorded config processing in-progress marker"
455    );
456
457    Ok(())
458}
459
460/// Marks config processing as completed.
461pub async fn set_config_processing_completed(pool: &Arc<Pool>, prefix: &str) -> Result<()> {
462    use chrono::Utc;
463    use redis::AsyncCommands;
464
465    let mut conn = pool.get().await?;
466    let hash_key = format!("{prefix}:{BOOTSTRAP_META_KEY}");
467    let timestamp = Utc::now().timestamp();
468
469    conn.hset_multiple::<_, _, _, ()>(
470        &hash_key,
471        &[
472            (
473                CONFIG_PROCESSING_STATUS_FIELD,
474                CONFIG_PROCESSING_STATUS_COMPLETED,
475            ),
476            (CONFIG_PROCESSING_COMPLETED_AT_FIELD, &timestamp.to_string()),
477        ],
478    )
479    .await
480    .map_err(|e| eyre::eyre!("Failed to set config processing completed: {}", e))?;
481
482    debug!(
483        timestamp = %timestamp,
484        "recorded config processing completed marker"
485    );
486
487    Ok(())
488}
489
490/// Returns whether config processing has been explicitly marked as completed.
491pub async fn is_config_processing_completed(pool: &Arc<Pool>, prefix: &str) -> Result<bool> {
492    use redis::AsyncCommands;
493
494    let mut conn = pool.get().await?;
495    let hash_key = format!("{prefix}:{BOOTSTRAP_META_KEY}");
496
497    let status: Option<String> = conn
498        .hget(&hash_key, CONFIG_PROCESSING_STATUS_FIELD)
499        .await
500        .map_err(|e| eyre::eyre!("Failed to get config processing status: {}", e))?;
501
502    Ok(matches!(
503        status.as_deref(),
504        Some(CONFIG_PROCESSING_STATUS_COMPLETED)
505    ))
506}
507
508/// Returns whether config processing is explicitly marked as in progress.
509pub async fn is_config_processing_in_progress(pool: &Arc<Pool>, prefix: &str) -> Result<bool> {
510    use redis::AsyncCommands;
511
512    let mut conn = pool.get().await?;
513    let hash_key = format!("{prefix}:{BOOTSTRAP_META_KEY}");
514
515    let status: Option<String> = conn
516        .hget(&hash_key, CONFIG_PROCESSING_STATUS_FIELD)
517        .await
518        .map_err(|e| eyre::eyre!("Failed to get config processing status: {}", e))?;
519
520    Ok(matches!(
521        status.as_deref(),
522        Some(CONFIG_PROCESSING_STATUS_IN_PROGRESS)
523    ))
524}
525
526/// Sets the last sync timestamp for a relayer to the current time.
527///
528/// This should be called after a relayer has been successfully initialized
529/// to record when the initialization occurred.
530///
531/// # Arguments
532/// * `conn` - Redis connection manager
533/// * `prefix` - Key prefix for multi-tenant support
534/// * `relayer_id` - The relayer's unique identifier
535///
536/// # Redis Key Format
537/// Hash key: `{prefix}:relayer_sync_meta`
538/// Hash field: `{relayer_id}:last_sync`
539/// Value: Unix timestamp in seconds
540pub async fn set_relayer_last_sync(pool: &Arc<Pool>, prefix: &str, relayer_id: &str) -> Result<()> {
541    use chrono::Utc;
542    use redis::AsyncCommands;
543
544    let mut conn = pool.get().await?;
545    let hash_key = format!("{prefix}:{RELAYER_SYNC_META_KEY}");
546    let field = format!("{relayer_id}:last_sync");
547    let timestamp = Utc::now().timestamp();
548
549    conn.hset::<_, _, _, ()>(&hash_key, &field, timestamp)
550        .await
551        .map_err(|e| eyre::eyre!("Failed to set relayer last sync: {}", e))?;
552
553    debug!(
554        relayer_id = %relayer_id,
555        timestamp = %timestamp,
556        "recorded relayer last sync time"
557    );
558
559    Ok(())
560}
561
562/// Gets the last sync timestamp for a relayer.
563///
564/// # Arguments
565/// * `conn` - Redis connection manager
566/// * `prefix` - Key prefix for multi-tenant support
567/// * `relayer_id` - The relayer's unique identifier
568///
569/// # Returns
570/// * `Ok(Some(DateTime))` - The last sync time if recorded
571/// * `Ok(None)` - If the relayer has never been synced
572/// * `Err(_)` - Redis communication error
573pub async fn get_relayer_last_sync(
574    pool: &Arc<Pool>,
575    prefix: &str,
576    relayer_id: &str,
577) -> Result<Option<chrono::DateTime<chrono::Utc>>> {
578    use chrono::{TimeZone, Utc};
579    use redis::AsyncCommands;
580
581    let mut conn = pool.get().await?;
582    let hash_key = format!("{prefix}:{RELAYER_SYNC_META_KEY}");
583    let field = format!("{relayer_id}:last_sync");
584
585    let timestamp: Option<i64> = conn
586        .hget(&hash_key, &field)
587        .await
588        .map_err(|e| eyre::eyre!("Failed to get relayer last sync: {}", e))?;
589
590    Ok(timestamp.and_then(|ts| Utc.timestamp_opt(ts, 0).single()))
591}
592
593/// Checks if a relayer was recently synced within the given threshold.
594///
595/// This is used to skip initialization for relayers that were recently
596/// initialized by another instance (e.g., during rolling restarts).
597///
598/// # Arguments
599/// * `conn` - Redis connection manager
600/// * `prefix` - Key prefix for multi-tenant support
601/// * `relayer_id` - The relayer's unique identifier
602/// * `threshold_secs` - Number of seconds to consider as "recent"
603///
604/// # Returns
605/// * `Ok(true)` - If the relayer was synced within the threshold
606/// * `Ok(false)` - If the relayer was not synced or sync is stale
607/// * `Err(_)` - Redis communication error
608pub async fn is_relayer_recently_synced(
609    pool: &Arc<Pool>,
610    prefix: &str,
611    relayer_id: &str,
612    threshold_secs: u64,
613) -> Result<bool> {
614    use chrono::Utc;
615
616    let last_sync = get_relayer_last_sync(pool, prefix, relayer_id).await?;
617
618    match last_sync {
619        Some(sync_time) => {
620            let elapsed = Utc::now().signed_duration_since(sync_time);
621            let is_recent = elapsed.num_seconds() < threshold_secs as i64;
622
623            if is_recent {
624                debug!(
625                    relayer_id = %relayer_id,
626                    last_sync = %sync_time.to_rfc3339(),
627                    elapsed_secs = %elapsed.num_seconds(),
628                    threshold_secs = %threshold_secs,
629                    "relayer was recently synced"
630                );
631            }
632
633            Ok(is_recent)
634        }
635        None => Ok(false),
636    }
637}
638
639// ============================================================================
640// Global Initialization Tracking Functions
641// ============================================================================
642//
643// These functions track when the global relayer initialization was last completed.
644// This allows multiple instances to skip redundant initialization when
645// initialization was recently completed by another instance.
646
647/// Sets the global initialization completion timestamp to the current time.
648///
649/// This should be called after all relayers have been successfully initialized
650/// to record when the initialization occurred.
651///
652/// # Arguments
653/// * `conn` - Redis connection manager
654/// * `prefix` - Key prefix for multi-tenant support
655///
656/// # Redis Key Format
657/// Hash key: `{prefix}:relayer_sync_meta`
658/// Hash field: `global:init_completed`
659/// Value: Unix timestamp in seconds
660pub async fn set_global_init_completed(pool: &Arc<Pool>, prefix: &str) -> Result<()> {
661    use chrono::Utc;
662    use redis::AsyncCommands;
663
664    let mut conn = pool.get().await?;
665    let hash_key = format!("{prefix}:{RELAYER_SYNC_META_KEY}");
666    let timestamp = Utc::now().timestamp();
667
668    conn.hset::<_, _, _, ()>(&hash_key, GLOBAL_INIT_FIELD, timestamp)
669        .await
670        .map_err(|e| eyre::eyre!("Failed to set global init completed: {}", e))?;
671
672    debug!(timestamp = %timestamp, "recorded global initialization completion time");
673    Ok(())
674}
675
676/// Checks if global initialization was recently completed within the given threshold.
677///
678/// This is used to skip initialization when another instance recently completed
679/// initialization (e.g., during rolling restarts).
680///
681/// # Arguments
682/// * `conn` - Redis connection manager
683/// * `prefix` - Key prefix for multi-tenant support
684/// * `threshold_secs` - Number of seconds to consider as "recent"
685///
686/// # Returns
687/// * `Ok(true)` - If initialization was completed within the threshold
688/// * `Ok(false)` - If initialization was not completed or is stale
689/// * `Err(_)` - Redis communication error
690pub async fn is_global_init_recently_completed(
691    pool: &Arc<Pool>,
692    prefix: &str,
693    threshold_secs: u64,
694) -> Result<bool> {
695    use chrono::Utc;
696    use redis::AsyncCommands;
697
698    let mut conn = pool.get().await?;
699    let hash_key = format!("{prefix}:{RELAYER_SYNC_META_KEY}");
700
701    let timestamp: Option<i64> = conn
702        .hget(&hash_key, GLOBAL_INIT_FIELD)
703        .await
704        .map_err(|e| eyre::eyre!("Failed to get global init time: {}", e))?;
705
706    match timestamp {
707        Some(ts) => {
708            let now = Utc::now().timestamp();
709            let elapsed = now - ts;
710            let is_recent = elapsed < threshold_secs as i64;
711
712            if is_recent {
713                debug!(
714                    elapsed_secs = %elapsed,
715                    threshold_secs = %threshold_secs,
716                    "global initialization recently completed"
717                );
718            }
719
720            Ok(is_recent)
721        }
722        None => Ok(false),
723    }
724}
725
726#[cfg(test)]
727mod tests {
728    use super::*;
729
    #[test]
    fn test_generate_lock_value_is_unique() {
        let value1 = generate_lock_value();
        let value2 = generate_lock_value();

        // Values should be different due to the monotonic counter, even when
        // both calls observe the same process id and timestamp.
        assert_ne!(value1, value2);
    }
738
    #[test]
    fn test_generate_lock_value_contains_process_id() {
        let value = generate_lock_value();

        // Should contain a colon separator
        assert!(value.contains(':'));

        // Should have three parts: process_id, timestamp, and counter
        // (the old comment said "two parts" but the format gained a counter)
        let parts: Vec<&str> = value.split(':').collect();
        assert_eq!(parts.len(), 3);

        // First part should be parseable as u32 (process ID)
        assert!(parts[0].parse::<u32>().is_ok());
    }
753
    #[test]
    fn test_distributed_lock_key_format() {
        // Documents the conventional lock key layout callers are expected to
        // build themselves: {prefix}:lock:{name}
        let prefix = "myrelayer";
        let lock_name = "transaction_cleanup";
        let lock_key = format!("{prefix}:lock:{lock_name}");
        assert_eq!(lock_key, "myrelayer:lock:transaction_cleanup");
    }
762
    #[test]
    fn test_distributed_lock_key_format_with_complex_prefix() {
        // Same convention as above, with a hyphenated production-style prefix
        // to show the format survives non-alphanumeric prefix characters.
        let prefix = "oz-relayer-prod";
        let lock_name = "transaction_cleanup";
        let lock_key = format!("{prefix}:lock:{lock_name}");
        assert_eq!(lock_key, "oz-relayer-prod:lock:transaction_cleanup");
    }
771
772    #[test]
773    fn test_distributed_lock_key_uses_exact_key() {
774        // DistributedLock now uses the exact key provided (no automatic prefix)
775        let lock_key = "myprefix:lock:myoperation";
776        // The lock will use this key exactly as provided
777        assert_eq!(lock_key, "myprefix:lock:myoperation");
778    }
779
780    // =========================================================================
781    // RedisConnections tests
782    // =========================================================================
783
784    mod redis_connections_tests {
785        use super::*;
786
787        #[test]
788        fn test_new_single_pool_returns_same_pool_for_both() {
789            // This test verifies the backward-compatible single pool mode
790            // When no REDIS_READER_URL is set, both pools should be the same
791            let cfg = Config::from_url("redis://127.0.0.1:6379");
792            let pool = cfg
793                .builder()
794                .expect("Failed to create pool builder")
795                .max_size(16)
796                .runtime(Runtime::Tokio1)
797                .build()
798                .expect("Failed to build pool");
799            let pool = Arc::new(pool);
800
801            let connections = RedisConnections::new_single_pool(pool.clone());
802
803            // Both primary and reader should return the same pool
804            assert!(Arc::ptr_eq(connections.primary(), &pool));
805            assert!(Arc::ptr_eq(connections.reader(), &pool));
806            assert!(Arc::ptr_eq(connections.primary(), connections.reader()));
807        }
808
809        #[test]
810        fn test_redis_connections_clone() {
811            // Verify that RedisConnections can be cloned
812            let cfg = Config::from_url("redis://127.0.0.1:6379");
813            let pool = cfg
814                .builder()
815                .expect("Failed to create pool builder")
816                .max_size(16)
817                .runtime(Runtime::Tokio1)
818                .build()
819                .expect("Failed to build pool");
820            let pool = Arc::new(pool);
821
822            let connections = RedisConnections::new_single_pool(pool);
823            let cloned = connections.clone();
824
825            // Both should point to the same pools
826            assert!(Arc::ptr_eq(connections.primary(), cloned.primary()));
827            assert!(Arc::ptr_eq(connections.reader(), cloned.reader()));
828        }
829
830        #[test]
831        fn test_redis_connections_debug() {
832            // Verify Debug implementation exists
833            let cfg = Config::from_url("redis://127.0.0.1:6379");
834            let pool = cfg
835                .builder()
836                .expect("Failed to create pool builder")
837                .max_size(16)
838                .runtime(Runtime::Tokio1)
839                .build()
840                .expect("Failed to build pool");
841            let pool = Arc::new(pool);
842
843            let connections = RedisConnections::new_single_pool(pool);
844            let debug_str = format!("{connections:?}");
845
846            assert!(debug_str.contains("RedisConnections"));
847        }
848    }
849
850    // =========================================================================
851    // Integration tests - require a running Redis instance
852    // Run with: cargo test --lib redis::tests::integration -- --ignored
853    // =========================================================================
854
855    /// Helper to create a Redis connection for integration tests.
856    /// Expects Redis to be running on localhost:6379.
857    async fn create_test_redis_connection() -> Option<Arc<Pool>> {
858        let cfg = Config::from_url("redis://127.0.0.1:6379");
859        let pool = cfg
860            .builder()
861            .ok()?
862            .max_size(16)
863            .runtime(Runtime::Tokio1)
864            .build()
865            .ok()?;
866        Some(Arc::new(pool))
867    }
868
869    mod integration {
870        use super::*;
871
872        #[tokio::test]
873        #[ignore] // Requires running Redis instance
874        async fn test_distributed_lock_acquire_and_release() {
875            let conn = create_test_redis_connection()
876                .await
877                .expect("Redis connection required for this test");
878
879            let lock =
880                DistributedLock::new(conn, "test:lock:acquire_release", Duration::from_secs(60));
881
882            // Should be able to acquire the lock
883            let guard = lock.try_acquire().await.expect("Redis error");
884            assert!(guard.is_some(), "Should acquire lock when not held");
885
886            // Release the lock
887            let released = guard.unwrap().release().await.expect("Redis error");
888            assert!(released, "Should successfully release the lock");
889        }
890
        /// Mutual exclusion: while one guard holds the key, a second
        /// `try_acquire` on the same key must return `None`, and must succeed
        /// again once the first guard is explicitly released.
        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_distributed_lock_prevents_double_acquisition() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            // Two independent lock handles pointing at the same Redis key.
            let lock1 = DistributedLock::new(
                conn.clone(),
                "test:lock:double_acquire",
                Duration::from_secs(60),
            );
            let lock2 =
                DistributedLock::new(conn, "test:lock:double_acquire", Duration::from_secs(60));

            // First acquisition should succeed
            let guard1 = lock1.try_acquire().await.expect("Redis error");
            assert!(guard1.is_some(), "First acquisition should succeed");

            // Second acquisition should fail (lock already held)
            let guard2 = lock2.try_acquire().await.expect("Redis error");
            assert!(
                guard2.is_none(),
                "Second acquisition should fail - lock already held"
            );

            // Release the first lock
            guard1.unwrap().release().await.expect("Redis error");

            // Now second acquisition should succeed
            let guard2_retry = lock2.try_acquire().await.expect("Redis error");
            assert!(guard2_retry.is_some(), "Should acquire lock after release");

            // Cleanup
            guard2_retry.unwrap().release().await.expect("Redis error");
        }
927
        /// TTL safety net: a lock acquired with a 1s TTL must be acquirable
        /// again after the TTL window has passed, without an explicit release.
        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_distributed_lock_expires_after_ttl() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            // Use a very short TTL for testing
            let lock =
                DistributedLock::new(conn.clone(), "test:lock:ttl_expiry", Duration::from_secs(1));

            // Acquire the lock
            let guard = lock.try_acquire().await.expect("Redis error");
            assert!(guard.is_some(), "Should acquire lock");

            // Don't release explicitly - let it expire.
            // NOTE(review): dropping the guard appears to spawn a background
            // release (see test_distributed_lock_drop_releases_lock), so the
            // key may actually be freed before the TTL elapses; either way,
            // the reacquisition below must succeed.
            drop(guard);

            // Wait for TTL to expire
            tokio::time::sleep(Duration::from_secs(2)).await;

            // Should be able to acquire again after expiry
            let lock2 = DistributedLock::new(conn, "test:lock:ttl_expiry", Duration::from_secs(60));
            let guard2 = lock2.try_acquire().await.expect("Redis error");
            assert!(guard2.is_some(), "Should acquire lock after TTL expiry");

            // Cleanup
            if let Some(g) = guard2 {
                g.release().await.expect("Redis error");
            }
        }
959
        /// Ownership check on release: once the lock's TTL has expired (so the
        /// key may have been claimed by another holder), `release()` must
        /// report `false` rather than deleting a key the guard no longer owns.
        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_distributed_lock_release_only_own_lock() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            // Use a short TTL
            let lock = DistributedLock::new(
                conn.clone(),
                "test:lock:release_own",
                Duration::from_secs(1),
            );

            // Acquire the lock
            let guard = lock.try_acquire().await.expect("Redis error");
            assert!(guard.is_some(), "Should acquire lock");

            // Wait for lock to expire
            tokio::time::sleep(Duration::from_secs(2)).await;

            // Try to release expired lock - should return false (not owned anymore)
            let released = guard.unwrap().release().await.expect("Redis error");
            assert!(!released, "Should not release expired lock");
        }
985
986        #[tokio::test]
987        #[ignore] // Requires running Redis instance
988        async fn test_distributed_lock_with_prefix() {
989            let conn = create_test_redis_connection()
990                .await
991                .expect("Redis connection required for this test");
992
993            // Simulate prefixed lock key like in transaction cleanup
994            // Format: {prefix}:lock:{name}
995            let prefix = "test_prefix";
996            let lock_name = "cleanup";
997            let lock_key = format!("{prefix}:lock:{lock_name}");
998
999            let lock = DistributedLock::new(conn, &lock_key, Duration::from_secs(60));
1000
1001            let guard = lock.try_acquire().await.expect("Redis error");
1002            assert!(guard.is_some(), "Should acquire prefixed lock");
1003
1004            // Cleanup
1005            guard.unwrap().release().await.expect("Redis error");
1006        }
1007
        /// RAII behavior: letting the guard fall out of scope (without calling
        /// `release()`) must still free the lock via the guard's Drop impl.
        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_distributed_lock_drop_releases_lock() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let lock_key = "test:lock:drop_release";

            // Acquire lock in inner scope
            {
                let lock = DistributedLock::new(conn.clone(), lock_key, Duration::from_secs(60));
                let guard = lock.try_acquire().await.expect("Redis error");
                assert!(guard.is_some(), "Should acquire lock");

                // Guard is dropped here without explicit release()
            }

            // Drop can't await, so the release runs on a spawned task; give it
            // a moment to complete before probing the key again.
            tokio::time::sleep(Duration::from_millis(100)).await;

            // Should be able to acquire again because Drop released the lock
            let lock2 = DistributedLock::new(conn, lock_key, Duration::from_secs(60));
            let guard2 = lock2.try_acquire().await.expect("Redis error");
            assert!(
                guard2.is_some(),
                "Should acquire lock after previous guard was dropped"
            );

            // Cleanup
            if let Some(g) = guard2 {
                g.release().await.expect("Redis error");
            }
        }
1042
        /// Explicit release must not conflict with Drop: `release()` consumes
        /// the guard, so Drop never runs afterwards and the key stays free for
        /// the next acquirer (no double-release / re-lock artifacts).
        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_distributed_lock_explicit_release_prevents_double_release() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let lock_key = "test:lock:no_double_release";
            let lock = DistributedLock::new(conn.clone(), lock_key, Duration::from_secs(60));

            let guard = lock.try_acquire().await.expect("Redis error");
            assert!(guard.is_some(), "Should acquire lock");

            // Explicitly release
            let released = guard.unwrap().release().await.expect("Redis error");
            assert!(released, "Should release successfully");

            // Guard is now consumed by release(), Drop won't run
            // Verify lock is still released (not double-locked)
            tokio::time::sleep(Duration::from_millis(50)).await;

            let lock2 = DistributedLock::new(conn, lock_key, Duration::from_secs(60));
            let guard2 = lock2.try_acquire().await.expect("Redis error");
            assert!(
                guard2.is_some(),
                "Should acquire lock - no double-release issue"
            );

            // Cleanup
            if let Some(g) = guard2 {
                g.release().await.expect("Redis error");
            }
        }
1076
        /// Drop must also fire on the `?`-style early-return path: when the
        /// function holding the guard bails out with an error, the lock is
        /// still freed for the next caller.
        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_distributed_lock_drop_on_early_return() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let lock_key = "test:lock:early_return";

            // Simulate a function that returns early (like ? operator)
            async fn simulated_work_with_early_return(
                conn: Arc<Pool>,
                lock_key: &str,
            ) -> Result<(), &'static str> {
                let lock = DistributedLock::new(conn, lock_key, Duration::from_secs(60));
                let _guard = lock
                    .try_acquire()
                    .await
                    .map_err(|_| "lock error")?
                    .ok_or("lock held")?;

                // Simulate early return (error path)
                Err("simulated error")

                // _guard is dropped here due to early return
            }

            // Call the function that returns early
            let result = simulated_work_with_early_return(conn.clone(), lock_key).await;
            assert!(result.is_err(), "Should have returned early with error");

            // Give the spawned release task time to complete
            tokio::time::sleep(Duration::from_millis(100)).await;

            // Lock should be released despite the early return
            let lock2 = DistributedLock::new(conn, lock_key, Duration::from_secs(60));
            let guard2 = lock2.try_acquire().await.expect("Redis error");
            assert!(
                guard2.is_some(),
                "Should acquire lock after early return released it"
            );

            // Cleanup
            if let Some(g) = guard2 {
                g.release().await.expect("Redis error");
            }
        }
1124
1125        // =========================================================================
1126        // RedisConnections integration tests
1127        // =========================================================================
1128
        /// In single pool mode a value written through `primary()` must be
        /// readable through `reader()`, since both resolve to the same pool
        /// and therefore the same Redis instance.
        ///
        /// NOTE(review): if an assertion fails mid-test, the trailing DEL is
        /// skipped and `test:connections:key` is left behind in Redis.
        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_redis_connections_single_pool_operations() {
            let pool = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let connections = RedisConnections::new_single_pool(pool);

            // Test that we can get connections from both pools
            let mut primary_conn = connections
                .primary()
                .get()
                .await
                .expect("Failed to get primary connection");
            let mut reader_conn = connections
                .reader()
                .get()
                .await
                .expect("Failed to get reader connection");

            // Write via primary, read back via reader.
            let _: () = redis::cmd("SET")
                .arg("test:connections:key")
                .arg("test_value")
                .query_async(&mut primary_conn)
                .await
                .expect("Failed to SET");

            let value: String = redis::cmd("GET")
                .arg("test:connections:key")
                .query_async(&mut reader_conn)
                .await
                .expect("Failed to GET");

            assert_eq!(value, "test_value");

            // Cleanup
            let _: () = redis::cmd("DEL")
                .arg("test:connections:key")
                .query_async(&mut primary_conn)
                .await
                .expect("Failed to DEL");
        }
1173
1174        #[tokio::test]
1175        #[ignore] // Requires running Redis instance
1176        async fn test_redis_connections_backward_compatible() {
1177            // Verify that single pool mode (no reader URL) works correctly
1178            let pool = create_test_redis_connection()
1179                .await
1180                .expect("Redis connection required for this test");
1181
1182            let connections = Arc::new(RedisConnections::new_single_pool(pool));
1183
1184            // Multiple repositories should be able to share the connections
1185            let conn1 = connections.clone();
1186            let conn2 = connections.clone();
1187
1188            // Both should be able to get connections
1189            let _primary1 = conn1.primary().get().await.expect("Failed to get primary");
1190            let _reader1 = conn1.reader().get().await.expect("Failed to get reader");
1191            let _primary2 = conn2.primary().get().await.expect("Failed to get primary");
1192            let _reader2 = conn2.reader().get().await.expect("Failed to get reader");
1193
1194            // All should work without issues (backward compatible)
1195        }
1196        // =====================================================================
1197        // Relayer Sync Metadata Tests
1198        // =====================================================================
1199
        /// Round-trip for relayer sync metadata: after `set_relayer_last_sync`
        /// the stored timestamp must be readable via `get_relayer_last_sync`
        /// and be no older than a minute.
        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_set_and_get_relayer_last_sync() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let prefix = "test_sync";
            let relayer_id = "test-relayer-sync";

            // Set the last sync time
            set_relayer_last_sync(&conn, prefix, relayer_id)
                .await
                .expect("Should set last sync time");

            // Get the last sync time
            let last_sync = get_relayer_last_sync(&conn, prefix, relayer_id)
                .await
                .expect("Should get last sync time");

            assert!(last_sync.is_some(), "Should have a last sync time");

            // Verify the timestamp is recent (within last minute)
            let elapsed = chrono::Utc::now()
                .signed_duration_since(last_sync.unwrap())
                .num_seconds();
            assert!(elapsed < 60, "Last sync should be within last minute");

            // Cleanup: delete the per-prefix sync-metadata hash
            let mut conn_clone = conn.get().await.expect("Failed to get connection");
            let hash_key = format!("{prefix}:relayer_sync_meta");
            let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
                .await
                .expect("Cleanup failed");
        }
1235
1236        #[tokio::test]
1237        #[ignore] // Requires running Redis instance
1238        async fn test_get_relayer_last_sync_returns_none_when_not_set() {
1239            let conn = create_test_redis_connection()
1240                .await
1241                .expect("Redis connection required for this test");
1242
1243            let prefix = "test_sync_none";
1244            let relayer_id = "nonexistent-relayer";
1245
1246            // Get the last sync time for a relayer that hasn't been synced
1247            let last_sync = get_relayer_last_sync(&conn, prefix, relayer_id)
1248                .await
1249                .expect("Should not error");
1250
1251            assert!(
1252                last_sync.is_none(),
1253                "Should return None for unsynced relayer"
1254            );
1255        }
1256
        /// After recording a sync, `is_relayer_recently_synced` with a generous
        /// 5-minute threshold must report the relayer as recently synced.
        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_is_relayer_recently_synced_returns_true_for_recent_sync() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let prefix = "test_recent_sync";
            let relayer_id = "recent-relayer";

            // Set the last sync time
            set_relayer_last_sync(&conn, prefix, relayer_id)
                .await
                .expect("Should set last sync time");

            // Check if recently synced (within 5 minutes)
            let is_recent = is_relayer_recently_synced(&conn, prefix, relayer_id, 300)
                .await
                .expect("Should check recent sync");

            assert!(is_recent, "Should be recently synced");

            // Cleanup: remove the whole sync-metadata hash for this prefix
            let mut conn_clone = conn.get().await.expect("Failed to get connection");
            let hash_key = format!("{prefix}:relayer_sync_meta");
            let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
                .await
                .expect("Cleanup failed");
        }
1286
1287        #[tokio::test]
1288        #[ignore] // Requires running Redis instance
1289        async fn test_is_relayer_recently_synced_returns_false_when_not_set() {
1290            let conn = create_test_redis_connection()
1291                .await
1292                .expect("Redis connection required for this test");
1293
1294            let prefix = "test_not_synced";
1295            let relayer_id = "never-synced-relayer";
1296
1297            // Check if recently synced for a relayer that hasn't been synced
1298            let is_recent = is_relayer_recently_synced(&conn, prefix, relayer_id, 300)
1299                .await
1300                .expect("Should check recent sync");
1301
1302            assert!(!is_recent, "Should not be recently synced");
1303        }
1304
        /// A sync timestamp older than the threshold must be reported stale.
        /// The old timestamp is written directly into the raw hash field the
        /// helpers use (`{prefix}:relayer_sync_meta` / `{id}:last_sync`) to
        /// simulate a sync that happened 10 minutes ago.
        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_is_relayer_recently_synced_returns_false_for_stale_sync() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let prefix = "test_stale_sync";
            let relayer_id = "stale-relayer";

            // Manually set an old timestamp (10 minutes ago)
            let mut conn_clone = conn.get().await.expect("Failed to get connection");
            let hash_key = format!("{prefix}:relayer_sync_meta");
            let field = format!("{relayer_id}:last_sync");
            let old_timestamp = chrono::Utc::now().timestamp() - 600; // 10 minutes ago

            redis::AsyncCommands::hset::<_, _, _, ()>(
                &mut conn_clone,
                &hash_key,
                &field,
                old_timestamp,
            )
            .await
            .expect("Should set old timestamp");

            // Check if recently synced with 5 minute threshold
            let is_recent = is_relayer_recently_synced(&conn, prefix, relayer_id, 300)
                .await
                .expect("Should check recent sync");

            assert!(!is_recent, "Should not be recently synced (stale)");

            // Cleanup
            let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
                .await
                .expect("Cleanup failed");
        }
1342
        /// Sync timestamps for different relayers share the same per-prefix
        /// hash but must be tracked independently and preserve ordering.
        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_get_relayer_last_sync_multiple_relayers() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let prefix = "test_multi_sync";

            // Set sync times for multiple relayers
            set_relayer_last_sync(&conn, prefix, "relayer-1")
                .await
                .expect("Should set sync time");
            // Small delay so relayer-2's timestamp is >= relayer-1's.
            tokio::time::sleep(Duration::from_millis(10)).await;
            set_relayer_last_sync(&conn, prefix, "relayer-2")
                .await
                .expect("Should set sync time");

            let sync1 = get_relayer_last_sync(&conn, prefix, "relayer-1")
                .await
                .expect("Should get sync time");
            let sync2 = get_relayer_last_sync(&conn, prefix, "relayer-2")
                .await
                .expect("Should get sync time");

            assert!(sync1.is_some(), "Relayer 1 should have sync time");
            assert!(sync2.is_some(), "Relayer 2 should have sync time");
            assert!(
                sync2.unwrap() >= sync1.unwrap(),
                "Relayer 2 should be synced at same time or after relayer 1"
            );

            // Cleanup
            let mut conn_clone = conn.get().await.expect("Failed to get connection");
            let hash_key = format!("{prefix}:relayer_sync_meta");
            let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
                .await
                .expect("Cleanup failed");
        }
1382
        /// Calling `set_relayer_last_sync` twice for the same relayer must
        /// overwrite (not duplicate) the entry, moving the timestamp forward.
        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_get_relayer_last_sync_update_existing() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let prefix = "test_update_sync";
            let relayer_id = "update-relayer";

            // Set initial sync time
            set_relayer_last_sync(&conn, prefix, relayer_id)
                .await
                .expect("Should set sync time");

            let first_sync = get_relayer_last_sync(&conn, prefix, relayer_id)
                .await
                .expect("Should get sync time")
                .expect("Should have sync time");

            // Wait and update
            tokio::time::sleep(Duration::from_millis(100)).await;

            set_relayer_last_sync(&conn, prefix, relayer_id)
                .await
                .expect("Should update sync time");

            let second_sync = get_relayer_last_sync(&conn, prefix, relayer_id)
                .await
                .expect("Should get sync time")
                .expect("Should have sync time");

            assert!(
                second_sync >= first_sync,
                "Updated sync time should be at the same time or later than first"
            );

            // Cleanup
            let mut conn_clone = conn.get().await.expect("Failed to get connection");
            let hash_key = format!("{prefix}:relayer_sync_meta");
            let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
                .await
                .expect("Cleanup failed");
        }
1427
        /// Boundary semantics: a timestamp exactly `threshold_secs` old is
        /// expected to count as stale — i.e. "recent" requires the age to be
        /// strictly below the threshold.
        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_is_relayer_recently_synced_threshold_boundary() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let prefix = "test_boundary_sync";
            let relayer_id = "boundary-relayer";

            // Set timestamp exactly at threshold (should be considered NOT recent)
            let mut conn_clone = conn.get().await.expect("Failed to get connection");
            let hash_key = format!("{prefix}:relayer_sync_meta");
            let field = format!("{relayer_id}:last_sync");
            let threshold_secs: u64 = 60;
            let boundary_timestamp = chrono::Utc::now().timestamp() - (threshold_secs as i64);

            redis::AsyncCommands::hset::<_, _, _, ()>(
                &mut conn_clone,
                &hash_key,
                &field,
                boundary_timestamp,
            )
            .await
            .expect("Should set boundary timestamp");

            let is_recent = is_relayer_recently_synced(&conn, prefix, relayer_id, threshold_secs)
                .await
                .expect("Should check recent sync");

            assert!(!is_recent, "Should not be recent at exact threshold");

            // Cleanup
            let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
                .await
                .expect("Cleanup failed");
        }
1465
        /// Counterpart to the boundary test: a timestamp 5 seconds newer than
        /// the threshold must still count as recent.
        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_is_relayer_recently_synced_just_before_threshold() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let prefix = "test_just_before_sync";
            let relayer_id = "just-before-relayer";

            // Set timestamp just before threshold (should be considered recent)
            let mut conn_clone = conn.get().await.expect("Failed to get connection");
            let hash_key = format!("{prefix}:relayer_sync_meta");
            let field = format!("{relayer_id}:last_sync");
            let threshold_secs: u64 = 60;
            let just_before_timestamp =
                chrono::Utc::now().timestamp() - (threshold_secs as i64) + 5;

            redis::AsyncCommands::hset::<_, _, _, ()>(
                &mut conn_clone,
                &hash_key,
                &field,
                just_before_timestamp,
            )
            .await
            .expect("Should set just-before timestamp");

            let is_recent = is_relayer_recently_synced(&conn, prefix, relayer_id, threshold_secs)
                .await
                .expect("Should check recent sync");

            assert!(is_recent, "Should be recent just before threshold");

            // Cleanup
            let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
                .await
                .expect("Cleanup failed");
        }
1504
        /// Sync metadata is namespaced by prefix: recording a sync under
        /// `prefix1` must not make the same relayer appear synced under
        /// `prefix2`.
        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_is_relayer_recently_synced_different_prefixes() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let relayer_id = "shared-relayer";

            // Set sync for prefix1 only
            set_relayer_last_sync(&conn, "prefix1", relayer_id)
                .await
                .expect("Should set sync time");

            let is_recent_prefix1 = is_relayer_recently_synced(&conn, "prefix1", relayer_id, 300)
                .await
                .expect("Should check sync");
            let is_recent_prefix2 = is_relayer_recently_synced(&conn, "prefix2", relayer_id, 300)
                .await
                .expect("Should check sync");

            assert!(is_recent_prefix1, "Should be recent for prefix1");
            assert!(!is_recent_prefix2, "Should not be recent for prefix2");

            // Cleanup
            let mut conn_clone = conn.get().await.expect("Failed to get connection");
            let hash_key = "prefix1:relayer_sync_meta";
            let _: () = redis::AsyncCommands::del(&mut conn_clone, hash_key)
                .await
                .expect("Cleanup failed");
        }
1536
        /// Degenerate threshold: with `threshold_secs == 0` nothing qualifies
        /// as recent, even a sync recorded moments ago.
        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_is_relayer_recently_synced_zero_threshold() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let prefix = "test_zero_threshold";
            let relayer_id = "zero-threshold-relayer";

            set_relayer_last_sync(&conn, prefix, relayer_id)
                .await
                .expect("Should set sync time");

            // With zero threshold, even immediate sync should be considered stale
            let is_recent = is_relayer_recently_synced(&conn, prefix, relayer_id, 0)
                .await
                .expect("Should check sync");

            assert!(!is_recent, "Should not be recent with zero threshold");

            // Cleanup
            let mut conn_clone = conn.get().await.expect("Failed to get connection");
            let hash_key = format!("{prefix}:relayer_sync_meta");
            let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
                .await
                .expect("Cleanup failed");
        }
1565
1566        // =====================================================================
1567        // Global Initialization Tracking Tests
1568        // =====================================================================
1569
1570        #[tokio::test]
1571        #[ignore] // Requires running Redis instance
1572        async fn test_set_and_check_global_init_completed() {
1573            let conn = create_test_redis_connection()
1574                .await
1575                .expect("Redis connection required for this test");
1576
1577            let prefix = "test_global_init";
1578
1579            // Set global init completed
1580            set_global_init_completed(&conn, prefix)
1581                .await
1582                .expect("Should set global init completed");
1583
1584            // Check if recently completed (within 5 minutes)
1585            let is_recent = is_global_init_recently_completed(&conn, prefix, 300)
1586                .await
1587                .expect("Should check global init");
1588
1589            assert!(is_recent, "Should be recently completed");
1590
1591            // Cleanup
1592            let mut conn_clone = conn.get().await.expect("Failed to get connection");
1593            let hash_key = format!("{prefix}:relayer_sync_meta");
1594            let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1595                .await
1596                .expect("Cleanup failed");
1597        }
1598
1599        #[tokio::test]
1600        #[ignore] // Requires running Redis instance
1601        async fn test_is_global_init_recently_completed_returns_false_when_not_set() {
1602            let conn = create_test_redis_connection()
1603                .await
1604                .expect("Redis connection required for this test");
1605
1606            let prefix = "test_global_init_not_set";
1607
1608            // Check without setting - should return false
1609            let is_recent = is_global_init_recently_completed(&conn, prefix, 300)
1610                .await
1611                .expect("Should check global init");
1612
1613            assert!(!is_recent, "Should not be recently completed when not set");
1614        }
1615
1616        #[tokio::test]
1617        #[ignore] // Requires running Redis instance
1618        async fn test_is_global_init_recently_completed_returns_false_when_stale() {
1619            let conn = create_test_redis_connection()
1620                .await
1621                .expect("Redis connection required for this test");
1622
1623            let prefix = "test_global_init_stale";
1624
1625            // Manually set an old timestamp (10 minutes ago)
1626            let mut conn_clone = conn.get().await.expect("Failed to get connection");
1627            let hash_key = format!("{prefix}:relayer_sync_meta");
1628            let old_timestamp = chrono::Utc::now().timestamp() - 600; // 10 minutes ago
1629
1630            redis::AsyncCommands::hset::<_, _, _, ()>(
1631                &mut conn_clone,
1632                &hash_key,
1633                "global:init_completed",
1634                old_timestamp,
1635            )
1636            .await
1637            .expect("Should set old timestamp");
1638
1639            // Check with 5 minute threshold - should be stale
1640            let is_recent = is_global_init_recently_completed(&conn, prefix, 300)
1641                .await
1642                .expect("Should check global init");
1643
1644            assert!(!is_recent, "Should not be recent when stale");
1645
1646            // Cleanup
1647            let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1648                .await
1649                .expect("Cleanup failed");
1650        }
1651
1652        #[tokio::test]
1653        #[ignore] // Requires running Redis instance
1654        async fn test_global_init_different_prefixes() {
1655            let conn = create_test_redis_connection()
1656                .await
1657                .expect("Redis connection required for this test");
1658
1659            // Set global init for prefix1 only
1660            set_global_init_completed(&conn, "global_prefix1")
1661                .await
1662                .expect("Should set global init");
1663
1664            let is_recent_prefix1 = is_global_init_recently_completed(&conn, "global_prefix1", 300)
1665                .await
1666                .expect("Should check global init");
1667            let is_recent_prefix2 = is_global_init_recently_completed(&conn, "global_prefix2", 300)
1668                .await
1669                .expect("Should check global init");
1670
1671            assert!(is_recent_prefix1, "Should be recent for prefix1");
1672            assert!(!is_recent_prefix2, "Should not be recent for prefix2");
1673
1674            // Cleanup
1675            let mut conn_clone = conn.get().await.expect("Failed to get connection");
1676            let hash_key = "global_prefix1:relayer_sync_meta";
1677            let _: () = redis::AsyncCommands::del(&mut conn_clone, hash_key)
1678                .await
1679                .expect("Cleanup failed");
1680        }
1681
1682        #[tokio::test]
1683        #[ignore] // Requires running Redis instance
1684        async fn test_global_init_update_existing() {
1685            let conn = create_test_redis_connection()
1686                .await
1687                .expect("Redis connection required for this test");
1688
1689            let prefix = "test_global_init_update";
1690
1691            // Set initial timestamp
1692            set_global_init_completed(&conn, prefix)
1693                .await
1694                .expect("Should set global init");
1695
1696            // Wait a bit
1697            tokio::time::sleep(Duration::from_millis(100)).await;
1698
1699            // Update timestamp
1700            set_global_init_completed(&conn, prefix)
1701                .await
1702                .expect("Should update global init");
1703
1704            // Should still be recent
1705            let is_recent = is_global_init_recently_completed(&conn, prefix, 300)
1706                .await
1707                .expect("Should check global init");
1708
1709            assert!(is_recent, "Should be recent after update");
1710
1711            // Cleanup
1712            let mut conn_clone = conn.get().await.expect("Failed to get connection");
1713            let hash_key = format!("{prefix}:relayer_sync_meta");
1714            let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1715                .await
1716                .expect("Cleanup failed");
1717        }
1718
1719        #[tokio::test]
1720        #[ignore] // Requires running Redis instance
1721        async fn test_global_init_coexists_with_relayer_sync() {
1722            let conn = create_test_redis_connection()
1723                .await
1724                .expect("Redis connection required for this test");
1725
1726            let prefix = "test_coexist";
1727            let relayer_id = "test-relayer";
1728
1729            // Set both global init and relayer sync
1730            set_global_init_completed(&conn, prefix)
1731                .await
1732                .expect("Should set global init");
1733            set_relayer_last_sync(&conn, prefix, relayer_id)
1734                .await
1735                .expect("Should set relayer sync");
1736
1737            // Both should be checkable
1738            let global_recent = is_global_init_recently_completed(&conn, prefix, 300)
1739                .await
1740                .expect("Should check global init");
1741            let relayer_recent = is_relayer_recently_synced(&conn, prefix, relayer_id, 300)
1742                .await
1743                .expect("Should check relayer sync");
1744
1745            assert!(global_recent, "Global init should be recent");
1746            assert!(relayer_recent, "Relayer sync should be recent");
1747
1748            // Cleanup
1749            let mut conn_clone = conn.get().await.expect("Failed to get connection");
1750            let hash_key = format!("{prefix}:relayer_sync_meta");
1751            let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1752                .await
1753                .expect("Cleanup failed");
1754        }
1755
1756        // =====================================================================
1757        // Config Processing Marker Tests
1758        // =====================================================================
1759
1760        #[tokio::test]
1761        #[ignore] // Requires running Redis instance
1762        async fn test_config_processing_in_progress_is_not_completed() {
1763            let conn = create_test_redis_connection()
1764                .await
1765                .expect("Redis connection required for this test");
1766
1767            let prefix = "test_config_processing_in_progress";
1768
1769            set_config_processing_in_progress(&conn, prefix)
1770                .await
1771                .expect("Should set in-progress marker");
1772
1773            let completed = is_config_processing_completed(&conn, prefix)
1774                .await
1775                .expect("Should read config processing status");
1776
1777            assert!(
1778                !completed,
1779                "In-progress config processing must not be treated as completed"
1780            );
1781
1782            let mut conn_clone = conn.get().await.expect("Failed to get connection");
1783            let hash_key = format!("{prefix}:bootstrap_meta");
1784            let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1785                .await
1786                .expect("Cleanup failed");
1787        }
1788
1789        #[tokio::test]
1790        #[ignore] // Requires running Redis instance
1791        async fn test_config_processing_completed_requires_explicit_completion_marker() {
1792            let conn = create_test_redis_connection()
1793                .await
1794                .expect("Redis connection required for this test");
1795
1796            let prefix = "test_config_processing_completed";
1797
1798            set_config_processing_in_progress(&conn, prefix)
1799                .await
1800                .expect("Should set in-progress marker");
1801            set_config_processing_completed(&conn, prefix)
1802                .await
1803                .expect("Should set completed marker");
1804
1805            let completed = is_config_processing_completed(&conn, prefix)
1806                .await
1807                .expect("Should read config processing status");
1808
1809            assert!(
1810                completed,
1811                "Completed config processing must require the explicit completion marker"
1812            );
1813
1814            let mut conn_clone = conn.get().await.expect("Failed to get connection");
1815            let hash_key = format!("{prefix}:bootstrap_meta");
1816            let _: () = redis::AsyncCommands::del(&mut conn_clone, &hash_key)
1817                .await
1818                .expect("Cleanup failed");
1819        }
1820    }
1821}