openzeppelin_relayer/repositories/relayer/
relayer_redis.rs

1//! Redis-backed implementation of the RelayerRepository.
2
3use crate::models::UpdateRelayerRequest;
4use crate::models::{
5    DisabledReason, PaginationQuery, RelayerNetworkPolicy, RelayerRepoModel, RepositoryError,
6};
7use crate::repositories::redis_base::RedisRepository;
8use crate::repositories::{BatchRetrievalResult, PaginatedResult, RelayerRepository, Repository};
9use crate::utils::RedisConnections;
10use async_trait::async_trait;
11use redis::AsyncCommands;
12use std::fmt;
13use std::sync::Arc;
14use tracing::{debug, error, warn};
15
/// Key segment placed between the repository prefix and a relayer ID.
const RELAYER_PREFIX: &str = "relayer";
/// Key suffix of the Redis set that holds the IDs of all stored relayers.
const RELAYER_LIST_KEY: &str = "relayer_list";
18
19#[derive(Clone)]
20pub struct RedisRelayerRepository {
21    pub connections: Arc<RedisConnections>,
22    pub key_prefix: String,
23}
24
25impl RedisRepository for RedisRelayerRepository {}
26
27impl RedisRelayerRepository {
28    pub fn new(
29        connections: Arc<RedisConnections>,
30        key_prefix: String,
31    ) -> Result<Self, RepositoryError> {
32        if key_prefix.is_empty() {
33            return Err(RepositoryError::InvalidData(
34                "Redis key prefix cannot be empty".to_string(),
35            ));
36        }
37
38        Ok(Self {
39            connections,
40            key_prefix,
41        })
42    }
43
44    /// Generate key for relayer data: relayer:{relayer_id}
45    fn relayer_key(&self, relayer_id: &str) -> String {
46        format!("{}:{}:{}", self.key_prefix, RELAYER_PREFIX, relayer_id)
47    }
48
49    /// Generate key for relayer list: relayer_list (set of all relayer IDs)
50    fn relayer_list_key(&self) -> String {
51        format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
52    }
53
54    /// Batch fetch relayers by IDs
55    async fn get_relayers_by_ids(
56        &self,
57        ids: &[String],
58    ) -> Result<BatchRetrievalResult<RelayerRepoModel>, RepositoryError> {
59        if ids.is_empty() {
60            debug!("no relayer IDs provided for batch fetch");
61            return Ok(BatchRetrievalResult {
62                results: vec![],
63                failed_ids: vec![],
64            });
65        }
66
67        let mut conn = self
68            .get_connection(self.connections.reader(), "batch_fetch_relayers")
69            .await?;
70        let keys: Vec<String> = ids.iter().map(|id| self.relayer_key(id)).collect();
71
72        debug!(count = %keys.len(), "batch fetching relayer data");
73
74        let values: Vec<Option<String>> = conn
75            .mget(&keys)
76            .await
77            .map_err(|e| self.map_redis_error(e, "batch_fetch_relayers"))?;
78
79        let mut relayers = Vec::new();
80        let mut failed_count = 0;
81        let mut failed_ids = Vec::new();
82        for (i, value) in values.into_iter().enumerate() {
83            match value {
84                Some(json) => {
85                    match self.deserialize_entity(&json, &ids[i], "relayer") {
86                        Ok(relayer) => relayers.push(relayer),
87                        Err(e) => {
88                            failed_count += 1;
89                            error!(relayer_id = %ids[i], error = %e, "failed to deserialize relayer");
90                            failed_ids.push(ids[i].clone());
91                            // Continue processing other relayers
92                        }
93                    }
94                }
95                None => {
96                    warn!(relayer_id = %ids[i], "relayer not found in batch fetch");
97                }
98            }
99        }
100
101        if failed_count > 0 {
102            warn!(failed_count = %failed_count, total_count = %ids.len(), "failed to deserialize relayers in batch");
103        }
104
105        debug!(count = %relayers.len(), "successfully fetched relayers");
106        Ok(BatchRetrievalResult {
107            results: relayers,
108            failed_ids,
109        })
110    }
111}
112
113impl fmt::Debug for RedisRelayerRepository {
114    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115        f.debug_struct("RedisRelayerRepository")
116            .field("connections", &"<RedisConnections>")
117            .field("key_prefix", &self.key_prefix)
118            .finish()
119    }
120}
121
122#[async_trait]
123impl Repository<RelayerRepoModel, String> for RedisRelayerRepository {
124    async fn create(&self, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError> {
125        if entity.id.is_empty() {
126            return Err(RepositoryError::InvalidData(
127                "Relayer ID cannot be empty".to_string(),
128            ));
129        }
130
131        if entity.name.is_empty() {
132            return Err(RepositoryError::InvalidData(
133                "Relayer name cannot be empty".to_string(),
134            ));
135        }
136
137        let mut conn = self
138            .get_connection(self.connections.primary(), "create")
139            .await?;
140        let relayer_key = self.relayer_key(&entity.id);
141
142        // Check if relayer already exists
143        let exists: bool = conn
144            .exists(&relayer_key)
145            .await
146            .map_err(|e| self.map_redis_error(e, "create_relayer_exists_check"))?;
147
148        if exists {
149            return Err(RepositoryError::ConstraintViolation(format!(
150                "Relayer with ID {} already exists",
151                entity.id
152            )));
153        }
154
155        let serialized = self.serialize_entity(&entity, |r| &r.id, "relayer")?;
156
157        // Use pipeline for atomic operations
158        let mut pipe = redis::pipe();
159        pipe.atomic();
160        pipe.set(&relayer_key, &serialized);
161        pipe.sadd(self.relayer_list_key(), &entity.id);
162
163        pipe.exec_async(&mut conn)
164            .await
165            .map_err(|e| self.map_redis_error(e, "create_relayer_pipeline"))?;
166
167        debug!(relayer_id = %entity.id, "created relayer");
168        Ok(entity)
169    }
170
171    async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError> {
172        if id.is_empty() {
173            return Err(RepositoryError::InvalidData(
174                "Relayer ID cannot be empty".to_string(),
175            ));
176        }
177
178        let mut conn = self
179            .get_connection(self.connections.reader(), "get_by_id")
180            .await?;
181        let relayer_key = self.relayer_key(&id);
182
183        debug!(relayer_id = %id, "fetching relayer");
184
185        let json: Option<String> = conn
186            .get(&relayer_key)
187            .await
188            .map_err(|e| self.map_redis_error(e, "get_relayer_by_id"))?;
189
190        match json {
191            Some(json) => {
192                debug!(relayer_id = %id, "found relayer");
193                self.deserialize_entity(&json, &id, "relayer")
194            }
195            None => {
196                debug!(relayer_id = %id, "relayer not found");
197                Err(RepositoryError::NotFound(format!(
198                    "Relayer with ID {id} not found"
199                )))
200            }
201        }
202    }
203
204    async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
205        let relayer_ids = {
206            let mut conn = self
207                .get_connection(self.connections.reader(), "list_all")
208                .await?;
209            let relayer_list_key = self.relayer_list_key();
210
211            debug!("listing all relayers");
212
213            let ids: Vec<String> = conn
214                .smembers(&relayer_list_key)
215                .await
216                .map_err(|e| self.map_redis_error(e, "list_all_relayers"))?;
217
218            debug!(count = %ids.len(), "found relayers in index");
219            ids
220            // Connection dropped here before nested call to avoid connection doubling
221        };
222
223        let relayers = self.get_relayers_by_ids(&relayer_ids).await?;
224        Ok(relayers.results)
225    }
226
227    async fn list_paginated(
228        &self,
229        query: PaginationQuery,
230    ) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError> {
231        if query.page == 0 {
232            return Err(RepositoryError::InvalidData(
233                "Page number must be greater than 0".to_string(),
234            ));
235        }
236
237        if query.per_page == 0 {
238            return Err(RepositoryError::InvalidData(
239                "Per page count must be greater than 0".to_string(),
240            ));
241        }
242
243        let (total, page_ids) = {
244            let mut conn = self
245                .get_connection(self.connections.reader(), "list_paginated")
246                .await?;
247            let relayer_list_key = self.relayer_list_key();
248
249            // Get total count
250            let total: u64 = conn
251                .scard(&relayer_list_key)
252                .await
253                .map_err(|e| self.map_redis_error(e, "list_paginated_count"))?;
254
255            if total == 0 {
256                return Ok(PaginatedResult {
257                    items: vec![],
258                    total: 0,
259                    page: query.page,
260                    per_page: query.per_page,
261                });
262            }
263
264            // Get all IDs and paginate in memory
265            let all_ids: Vec<String> = conn
266                .smembers(&relayer_list_key)
267                .await
268                .map_err(|e| self.map_redis_error(e, "list_paginated_members"))?;
269
270            let start = ((query.page - 1) * query.per_page) as usize;
271            let end = (start + query.per_page as usize).min(all_ids.len());
272
273            (total, all_ids[start..end].to_vec())
274            // Connection dropped here before nested call to avoid connection doubling
275        };
276
277        let items = self.get_relayers_by_ids(&page_ids).await?;
278
279        Ok(PaginatedResult {
280            items: items.results.clone(),
281            total,
282            page: query.page,
283            per_page: query.per_page,
284        })
285    }
286
287    async fn update(
288        &self,
289        id: String,
290        entity: RelayerRepoModel,
291    ) -> Result<RelayerRepoModel, RepositoryError> {
292        if id.is_empty() {
293            return Err(RepositoryError::InvalidData(
294                "Relayer ID cannot be empty".to_string(),
295            ));
296        }
297
298        if entity.name.is_empty() {
299            return Err(RepositoryError::InvalidData(
300                "Relayer name cannot be empty".to_string(),
301            ));
302        }
303
304        let mut conn = self
305            .get_connection(self.connections.primary(), "update")
306            .await?;
307        let relayer_key = self.relayer_key(&id);
308
309        // Check if relayer exists
310        let exists: bool = conn
311            .exists(&relayer_key)
312            .await
313            .map_err(|e| self.map_redis_error(e, "update_relayer_exists_check"))?;
314
315        if !exists {
316            return Err(RepositoryError::NotFound(format!(
317                "Relayer with ID {id} not found"
318            )));
319        }
320
321        // Ensure we preserve the original ID
322        let mut updated_entity = entity;
323        updated_entity.id = id.clone();
324
325        let serialized = self.serialize_entity(&updated_entity, |r| &r.id, "relayer")?;
326
327        // Use pipeline for atomic operations
328        let mut pipe = redis::pipe();
329        pipe.atomic();
330        pipe.set(&relayer_key, &serialized);
331        pipe.sadd(self.relayer_list_key(), &id);
332
333        pipe.exec_async(&mut conn)
334            .await
335            .map_err(|e| self.map_redis_error(e, "update_relayer_pipeline"))?;
336
337        debug!(relayer_id = %id, "updated relayer");
338        Ok(updated_entity)
339    }
340
341    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
342        if id.is_empty() {
343            return Err(RepositoryError::InvalidData(
344                "Relayer ID cannot be empty".to_string(),
345            ));
346        }
347
348        let mut conn = self
349            .get_connection(self.connections.primary(), "delete_by_id")
350            .await?;
351        let relayer_key = self.relayer_key(&id);
352
353        // Check if relayer exists
354        let exists: bool = conn
355            .exists(&relayer_key)
356            .await
357            .map_err(|e| self.map_redis_error(e, "delete_relayer_exists_check"))?;
358
359        if !exists {
360            return Err(RepositoryError::NotFound(format!(
361                "Relayer with ID {id} not found"
362            )));
363        }
364
365        // Use pipeline for atomic operations
366        let mut pipe = redis::pipe();
367        pipe.atomic();
368        pipe.del(&relayer_key);
369        pipe.srem(self.relayer_list_key(), &id);
370
371        pipe.exec_async(&mut conn)
372            .await
373            .map_err(|e| self.map_redis_error(e, "delete_relayer_pipeline"))?;
374
375        debug!(relayer_id = %id, "deleted relayer");
376        Ok(())
377    }
378
379    async fn count(&self) -> Result<usize, RepositoryError> {
380        let mut conn = self
381            .get_connection(self.connections.reader(), "count")
382            .await?;
383        let relayer_list_key = self.relayer_list_key();
384
385        let count: u64 = conn
386            .scard(&relayer_list_key)
387            .await
388            .map_err(|e| self.map_redis_error(e, "count_relayers"))?;
389
390        Ok(count as usize)
391    }
392
393    async fn has_entries(&self) -> Result<bool, RepositoryError> {
394        let mut conn = self
395            .get_connection(self.connections.reader(), "has_entries")
396            .await?;
397        let relayer_list_key = self.relayer_list_key();
398
399        debug!("checking if relayer entries exist");
400
401        let exists: bool = conn
402            .exists(&relayer_list_key)
403            .await
404            .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
405
406        debug!(exists = %exists, "relayer entries exist");
407        Ok(exists)
408    }
409
410    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
411        let mut conn = self
412            .get_connection(self.connections.primary(), "drop_all_entries")
413            .await?;
414        let relayer_list_key = self.relayer_list_key();
415
416        debug!("dropping all relayer entries");
417
418        // Get all relayer IDs first
419        let relayer_ids: Vec<String> = conn
420            .smembers(&relayer_list_key)
421            .await
422            .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_ids"))?;
423
424        if relayer_ids.is_empty() {
425            debug!("no relayer entries to drop");
426            return Ok(());
427        }
428
429        // Use pipeline for atomic operations
430        let mut pipe = redis::pipe();
431        pipe.atomic();
432
433        // Delete all individual relayer entries
434        for relayer_id in &relayer_ids {
435            let relayer_key = self.relayer_key(relayer_id);
436            pipe.del(&relayer_key);
437        }
438
439        // Delete the relayer list key
440        pipe.del(&relayer_list_key);
441
442        pipe.exec_async(&mut conn)
443            .await
444            .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
445
446        debug!(count = %relayer_ids.len(), "dropped relayer entries");
447        Ok(())
448    }
449}
450
451#[async_trait]
452impl RelayerRepository for RedisRelayerRepository {
453    async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
454        let all_relayers = self.list_all().await?;
455        let active_relayers: Vec<RelayerRepoModel> = all_relayers
456            .into_iter()
457            .filter(|relayer| !relayer.paused)
458            .collect();
459
460        debug!(count = %active_relayers.len(), "found active relayers");
461        Ok(active_relayers)
462    }
463
464    async fn list_by_signer_id(
465        &self,
466        signer_id: &str,
467    ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
468        let all_relayers = self.list_all().await?;
469        let relayers_with_signer: Vec<RelayerRepoModel> = all_relayers
470            .into_iter()
471            .filter(|relayer| relayer.signer_id == signer_id)
472            .collect();
473
474        debug!(count = %relayers_with_signer.len(), signer_id = %signer_id, "found relayers using signer");
475        Ok(relayers_with_signer)
476    }
477
478    async fn list_by_notification_id(
479        &self,
480        notification_id: &str,
481    ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
482        let all_relayers = self.list_all().await?;
483        let relayers_with_notification: Vec<RelayerRepoModel> = all_relayers
484            .into_iter()
485            .filter(|relayer| {
486                relayer
487                    .notification_id
488                    .as_ref()
489                    .is_some_and(|id| id == notification_id)
490            })
491            .collect();
492
493        debug!(count = %relayers_with_notification.len(), notification_id = %notification_id, "found relayers using notification");
494        Ok(relayers_with_notification)
495    }
496
497    async fn partial_update(
498        &self,
499        id: String,
500        update: UpdateRelayerRequest,
501    ) -> Result<RelayerRepoModel, RepositoryError> {
502        // First get the current relayer
503        let mut relayer = self.get_by_id(id.clone()).await?;
504
505        // Apply the partial update
506        if let Some(paused) = update.paused {
507            relayer.paused = paused;
508        }
509
510        // Update the relayer
511        self.update(id, relayer).await
512    }
513
514    async fn enable_relayer(
515        &self,
516        relayer_id: String,
517    ) -> Result<RelayerRepoModel, RepositoryError> {
518        // First get the current relayer
519        let mut relayer = self.get_by_id(relayer_id.clone()).await?;
520
521        // Update the system_disabled flag and clear reason
522        relayer.system_disabled = false;
523        relayer.disabled_reason = None;
524
525        // Update the relayer
526        self.update(relayer_id, relayer).await
527    }
528
529    async fn disable_relayer(
530        &self,
531        relayer_id: String,
532        reason: DisabledReason,
533    ) -> Result<RelayerRepoModel, RepositoryError> {
534        // First get the current relayer
535        let mut relayer = self.get_by_id(relayer_id.clone()).await?;
536
537        // Update the system_disabled flag and set reason
538        relayer.system_disabled = true;
539        relayer.disabled_reason = Some(reason);
540
541        // Update the relayer
542        self.update(relayer_id, relayer).await
543    }
544
545    async fn update_policy(
546        &self,
547        id: String,
548        policy: RelayerNetworkPolicy,
549    ) -> Result<RelayerRepoModel, RepositoryError> {
550        // First get the current relayer
551        let mut relayer = self.get_by_id(id.clone()).await?;
552
553        // Update the policy
554        relayer.policies = policy;
555
556        // Update the relayer
557        self.update(id, relayer).await
558    }
559
560    fn is_persistent_storage(&self) -> bool {
561        true
562    }
563
564    fn connection_info(&self) -> Option<(Arc<deadpool_redis::Pool>, String)> {
565        Some((self.connections.primary().clone(), self.key_prefix.clone()))
566    }
567}
568
569#[cfg(test)]
570mod tests {
571    use super::*;
572    use crate::models::{NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy};
573    use deadpool_redis::{Config, Runtime};
574    use std::sync::Arc;
575
576    fn create_test_relayer(id: &str) -> RelayerRepoModel {
577        RelayerRepoModel {
578            id: id.to_string(),
579            name: format!("Test Relayer {id}"),
580            network: "ethereum".to_string(),
581            paused: false,
582            network_type: NetworkType::Evm,
583            signer_id: "test-signer".to_string(),
584            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
585            address: "0x742d35Cc6634C0532925a3b844Bc454e4438f44e".to_string(),
586            notification_id: None,
587            system_disabled: false,
588            disabled_reason: None,
589            custom_rpc_urls: None,
590        }
591    }
592
593    fn create_test_relayer_with_pause(id: &str, paused: bool) -> RelayerRepoModel {
594        let mut relayer = create_test_relayer(id);
595        relayer.paused = paused;
596        relayer
597    }
598
599    async fn setup_test_repo() -> RedisRelayerRepository {
600        let redis_url =
601            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());
602        let cfg = Config::from_url(&redis_url);
603        let pool = Arc::new(
604            cfg.builder()
605                .expect("Failed to create pool builder")
606                .max_size(16)
607                .runtime(Runtime::Tokio1)
608                .build()
609                .expect("Failed to build Redis pool"),
610        );
611        let connections = Arc::new(RedisConnections::new_single_pool(pool));
612
613        let random_id = uuid::Uuid::new_v4().to_string();
614        let key_prefix = format!("test_prefix:{random_id}");
615
616        RedisRelayerRepository::new(connections, key_prefix)
617            .expect("Failed to create Redis relayer repository")
618    }
619
620    #[ignore = "Requires active Redis instance"]
621    #[tokio::test]
622    async fn test_new_repository_creation() {
623        let repo = setup_test_repo().await;
624        assert!(repo.key_prefix.contains("test_prefix"));
625    }
626
627    #[ignore = "Requires active Redis instance"]
628    #[tokio::test]
629    async fn test_new_repository_empty_prefix_fails() {
630        let redis_url =
631            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());
632        let cfg = Config::from_url(&redis_url);
633        let pool = Arc::new(
634            cfg.builder()
635                .expect("Failed to create pool builder")
636                .max_size(16)
637                .runtime(Runtime::Tokio1)
638                .build()
639                .expect("Failed to build Redis pool"),
640        );
641        let connections = Arc::new(RedisConnections::new_single_pool(pool));
642
643        let result = RedisRelayerRepository::new(connections, "".to_string());
644        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
645    }
646
647    #[ignore = "Requires active Redis instance"]
648    #[tokio::test]
649    async fn test_key_generation() {
650        let repo = setup_test_repo().await;
651
652        let relayer_key = repo.relayer_key("test-relayer");
653        assert!(relayer_key.contains(":relayer:test-relayer"));
654
655        let list_key = repo.relayer_list_key();
656        assert!(list_key.contains(":relayer_list"));
657    }
658
659    #[ignore = "Requires active Redis instance"]
660    #[tokio::test]
661    async fn test_serialize_deserialize_relayer() {
662        let repo = setup_test_repo().await;
663        let relayer = create_test_relayer("test-relayer");
664
665        let serialized = repo
666            .serialize_entity(&relayer, |r| &r.id, "relayer")
667            .unwrap();
668        let deserialized: RelayerRepoModel = repo
669            .deserialize_entity(&serialized, &relayer.id, "relayer")
670            .unwrap();
671
672        assert_eq!(relayer.id, deserialized.id);
673        assert_eq!(relayer.name, deserialized.name);
674        assert_eq!(relayer.network, deserialized.network);
675        assert_eq!(relayer.paused, deserialized.paused);
676        assert_eq!(relayer.network_type, deserialized.network_type);
677        assert_eq!(relayer.signer_id, deserialized.signer_id);
678        assert_eq!(relayer.address, deserialized.address);
679        assert_eq!(relayer.notification_id, deserialized.notification_id);
680        assert_eq!(relayer.system_disabled, deserialized.system_disabled);
681        assert_eq!(relayer.custom_rpc_urls, deserialized.custom_rpc_urls);
682    }
683
684    #[ignore = "Requires active Redis instance"]
685    #[tokio::test]
686    async fn test_create_relayer() {
687        let repo = setup_test_repo().await;
688        let relayer_id = uuid::Uuid::new_v4().to_string();
689        let relayer = create_test_relayer(&relayer_id);
690
691        let result = repo.create(relayer.clone()).await;
692        assert!(result.is_ok());
693
694        let created_relayer = result.unwrap();
695        assert_eq!(created_relayer.id, relayer_id);
696        assert_eq!(created_relayer.name, relayer.name);
697    }
698
699    #[ignore = "Requires active Redis instance"]
700    #[tokio::test]
701    async fn test_get_relayer() {
702        let repo = setup_test_repo().await;
703        let relayer_id = uuid::Uuid::new_v4().to_string();
704        let relayer = create_test_relayer(&relayer_id);
705
706        repo.create(relayer.clone()).await.unwrap();
707
708        let retrieved = repo.get_by_id(relayer_id).await.unwrap();
709        assert_eq!(retrieved.id, relayer.id);
710        assert_eq!(retrieved.name, relayer.name);
711    }
712
713    #[ignore = "Requires active Redis instance"]
714    #[tokio::test]
715    async fn test_list_all_relayers() {
716        let repo = setup_test_repo().await;
717        let relayer1_id = uuid::Uuid::new_v4().to_string();
718        let relayer2_id = uuid::Uuid::new_v4().to_string();
719        let relayer1 = create_test_relayer(&relayer1_id);
720        let relayer2 = create_test_relayer(&relayer2_id);
721
722        repo.create(relayer1).await.unwrap();
723        repo.create(relayer2).await.unwrap();
724
725        let all_relayers = repo.list_all().await.unwrap();
726        assert!(all_relayers.len() >= 2);
727    }
728
729    #[ignore = "Requires active Redis instance"]
730    #[tokio::test]
731    async fn test_list_active_relayers() {
732        let repo = setup_test_repo().await;
733        let relayer1_id = uuid::Uuid::new_v4().to_string();
734        let relayer2_id = uuid::Uuid::new_v4().to_string();
735        let relayer1 = create_test_relayer_with_pause(&relayer1_id, false);
736        let relayer2 = create_test_relayer_with_pause(&relayer2_id, true);
737
738        repo.create(relayer1).await.unwrap();
739        repo.create(relayer2).await.unwrap();
740
741        let active_relayers = repo.list_active().await.unwrap();
742        // Should have at least 1 active relayer
743        assert!(!active_relayers.is_empty());
744        // All returned relayers should be active
745        assert!(active_relayers.iter().all(|r| !r.paused));
746    }
747
748    #[ignore = "Requires active Redis instance"]
749    #[tokio::test]
750    async fn test_count_relayers() {
751        let repo = setup_test_repo().await;
752        let relayer_id = uuid::Uuid::new_v4().to_string();
753        let relayer = create_test_relayer(&relayer_id);
754
755        repo.create(relayer).await.unwrap();
756
757        let count = repo.count().await.unwrap();
758        assert!(count >= 1);
759    }
760
761    #[ignore = "Requires active Redis instance"]
762    #[tokio::test]
763    async fn test_get_nonexistent_relayer() {
764        let repo = setup_test_repo().await;
765
766        let result = repo.get_by_id("nonexistent-relayer".to_string()).await;
767        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
768    }
769
770    #[ignore = "Requires active Redis instance"]
771    #[tokio::test]
772    async fn test_duplicate_relayer_creation() {
773        let repo = setup_test_repo().await;
774        let relayer_id = uuid::Uuid::new_v4().to_string();
775        let relayer = create_test_relayer(&relayer_id);
776
777        repo.create(relayer.clone()).await.unwrap();
778
779        let duplicate_result = repo.create(relayer).await;
780        assert!(matches!(
781            duplicate_result,
782            Err(RepositoryError::ConstraintViolation(_))
783        ));
784    }
785
786    #[ignore = "Requires active Redis instance"]
787    #[tokio::test]
788    async fn test_update_relayer() {
789        let repo = setup_test_repo().await;
790        let relayer_id = uuid::Uuid::new_v4().to_string();
791        let relayer = create_test_relayer(&relayer_id);
792
793        repo.create(relayer.clone()).await.unwrap();
794
795        let mut updated_relayer = relayer.clone();
796        updated_relayer.name = "Updated Relayer Name".to_string();
797
798        let result = repo.update(relayer.id.clone(), updated_relayer).await;
799        assert!(result.is_ok());
800
801        let updated = result.unwrap();
802        assert_eq!(updated.name, "Updated Relayer Name");
803        assert_eq!(updated.id, relayer.id);
804    }
805
806    #[ignore = "Requires active Redis instance"]
807    #[tokio::test]
808    async fn test_delete_relayer() {
809        let repo = setup_test_repo().await;
810        let relayer_id = uuid::Uuid::new_v4().to_string();
811        let relayer = create_test_relayer(&relayer_id);
812
813        repo.create(relayer.clone()).await.unwrap();
814
815        let delete_result = repo.delete_by_id(relayer.id.clone()).await;
816        assert!(delete_result.is_ok());
817
818        let get_result = repo.get_by_id(relayer.id).await;
819        assert!(matches!(get_result, Err(RepositoryError::NotFound(_))));
820    }
821
822    #[ignore = "Requires active Redis instance"]
823    #[tokio::test]
824    async fn test_list_paginated() {
825        let repo = setup_test_repo().await;
826        let relayer1_id = uuid::Uuid::new_v4().to_string();
827        let relayer2_id = uuid::Uuid::new_v4().to_string();
828        let relayer1 = create_test_relayer(&relayer1_id);
829        let relayer2 = create_test_relayer(&relayer2_id);
830
831        repo.create(relayer1).await.unwrap();
832        repo.create(relayer2).await.unwrap();
833
834        let query = PaginationQuery {
835            page: 1,
836            per_page: 10,
837        };
838
839        let result = repo.list_paginated(query).await.unwrap();
840        assert!(result.total >= 2);
841        assert_eq!(result.page, 1);
842        assert_eq!(result.per_page, 10);
843    }
844
845    #[ignore = "Requires active Redis instance"]
846    #[tokio::test]
847    async fn test_partial_update_relayer() {
848        let repo = setup_test_repo().await;
849        let relayer_id = uuid::Uuid::new_v4().to_string();
850        let relayer = create_test_relayer(&relayer_id);
851
852        repo.create(relayer.clone()).await.unwrap();
853
854        let update = UpdateRelayerRequest {
855            paused: Some(true),
856            ..Default::default()
857        };
858        let result = repo.partial_update(relayer.id.clone(), update).await;
859        assert!(result.is_ok());
860
861        let updated = result.unwrap();
862        assert_eq!(updated.id, relayer.id);
863        assert!(updated.paused);
864    }
865
866    #[ignore = "Requires active Redis instance"]
867    #[tokio::test]
868    async fn test_enable_disable_relayer() {
869        let repo = setup_test_repo().await;
870        let relayer_id = uuid::Uuid::new_v4().to_string();
871        let relayer = create_test_relayer(&relayer_id);
872
873        repo.create(relayer.clone()).await.unwrap();
874
875        // Test disable
876        let disabled = repo
877            .disable_relayer(
878                relayer.id.clone(),
879                DisabledReason::BalanceCheckFailed("test reason".to_string()),
880            )
881            .await
882            .unwrap();
883        assert!(disabled.system_disabled);
884
885        // Test enable
886        let enabled = repo.enable_relayer(relayer.id.clone()).await.unwrap();
887        assert!(!enabled.system_disabled);
888    }
889
890    #[ignore = "Requires active Redis instance"]
891    #[tokio::test]
892    async fn test_update_policy() {
893        let repo = setup_test_repo().await;
894        let relayer_id = uuid::Uuid::new_v4().to_string();
895        let relayer = create_test_relayer(&relayer_id);
896
897        repo.create(relayer.clone()).await.unwrap();
898
899        let new_policy = RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
900            gas_price_cap: Some(50_000_000_000),
901            whitelist_receivers: Some(vec!["0x123".to_string()]),
902            eip1559_pricing: Some(true),
903            private_transactions: Some(true),
904            min_balance: Some(1000000000000000000),
905            gas_limit_estimation: Some(true),
906        });
907
908        let result = repo.update_policy(relayer.id.clone(), new_policy).await;
909        assert!(result.is_ok());
910
911        let updated = result.unwrap();
912        if let RelayerNetworkPolicy::Evm(evm_policy) = updated.policies {
913            assert_eq!(evm_policy.gas_price_cap, Some(50_000_000_000));
914            assert_eq!(
915                evm_policy.whitelist_receivers,
916                Some(vec!["0x123".to_string()])
917            );
918            assert_eq!(evm_policy.eip1559_pricing, Some(true));
919            assert!(evm_policy.private_transactions.unwrap_or(false));
920            assert_eq!(evm_policy.min_balance, Some(1000000000000000000));
921        } else {
922            panic!("Expected EVM policy");
923        }
924    }
925
926    #[ignore = "Requires active Redis instance"]
927    #[tokio::test]
928    async fn test_debug_implementation() {
929        let repo = setup_test_repo().await;
930        let debug_str = format!("{repo:?}");
931        assert!(debug_str.contains("RedisRelayerRepository"));
932        assert!(debug_str.contains("key_prefix"));
933    }
934
935    #[ignore = "Requires active Redis instance"]
936    #[tokio::test]
937    async fn test_error_handling_empty_id() {
938        let repo = setup_test_repo().await;
939
940        let create_result = repo
941            .create(RelayerRepoModel {
942                id: "".to_string(),
943                ..create_test_relayer("test")
944            })
945            .await;
946        assert!(matches!(
947            create_result,
948            Err(RepositoryError::InvalidData(_))
949        ));
950
951        let get_result = repo.get_by_id("".to_string()).await;
952        assert!(matches!(get_result, Err(RepositoryError::InvalidData(_))));
953
954        let update_result = repo
955            .update("".to_string(), create_test_relayer("test"))
956            .await;
957        assert!(matches!(
958            update_result,
959            Err(RepositoryError::InvalidData(_))
960        ));
961
962        let delete_result = repo.delete_by_id("".to_string()).await;
963        assert!(matches!(
964            delete_result,
965            Err(RepositoryError::InvalidData(_))
966        ));
967    }
968
969    #[ignore = "Requires active Redis instance"]
970    #[tokio::test]
971    async fn test_error_handling_empty_name() {
972        let repo = setup_test_repo().await;
973
974        let create_result = repo
975            .create(RelayerRepoModel {
976                name: "".to_string(),
977                ..create_test_relayer("test")
978            })
979            .await;
980        assert!(matches!(
981            create_result,
982            Err(RepositoryError::InvalidData(_))
983        ));
984    }
985
986    #[ignore = "Requires active Redis instance"]
987    #[tokio::test]
988    async fn test_pagination_validation() {
989        let repo = setup_test_repo().await;
990
991        let invalid_page = PaginationQuery {
992            page: 0,
993            per_page: 10,
994        };
995        let result = repo.list_paginated(invalid_page).await;
996        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
997
998        let invalid_per_page = PaginationQuery {
999            page: 1,
1000            per_page: 0,
1001        };
1002        let result = repo.list_paginated(invalid_per_page).await;
1003        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1004    }
1005
1006    #[ignore = "Requires active Redis instance"]
1007    #[tokio::test]
1008    async fn test_update_nonexistent_relayer() {
1009        let repo = setup_test_repo().await;
1010        let relayer = create_test_relayer("nonexistent-relayer");
1011
1012        let result = repo
1013            .update("nonexistent-relayer".to_string(), relayer)
1014            .await;
1015        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1016    }
1017
1018    #[ignore = "Requires active Redis instance"]
1019    #[tokio::test]
1020    async fn test_delete_nonexistent_relayer() {
1021        let repo = setup_test_repo().await;
1022
1023        let result = repo.delete_by_id("nonexistent-relayer".to_string()).await;
1024        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1025    }
1026
1027    #[tokio::test]
1028    #[ignore = "Requires active Redis instance"]
1029    async fn test_has_entries() {
1030        let repo = setup_test_repo().await;
1031        assert!(!repo.has_entries().await.unwrap());
1032
1033        let relayer_id = uuid::Uuid::new_v4().to_string();
1034        let relayer = create_test_relayer(&relayer_id);
1035        repo.create(relayer.clone()).await.unwrap();
1036        assert!(repo.has_entries().await.unwrap());
1037    }
1038
1039    #[tokio::test]
1040    #[ignore = "Requires active Redis instance"]
1041    async fn test_drop_all_entries() {
1042        let repo = setup_test_repo().await;
1043        let relayer_id = uuid::Uuid::new_v4().to_string();
1044        let relayer = create_test_relayer(&relayer_id);
1045        repo.create(relayer.clone()).await.unwrap();
1046        assert!(repo.has_entries().await.unwrap());
1047
1048        repo.drop_all_entries().await.unwrap();
1049        assert!(!repo.has_entries().await.unwrap());
1050    }
1051
1052    #[ignore = "Requires active Redis instance"]
1053    #[tokio::test]
1054    async fn test_list_by_signer_id() {
1055        let repo = setup_test_repo().await;
1056
1057        let relayer1_id = uuid::Uuid::new_v4().to_string();
1058        let relayer2_id = uuid::Uuid::new_v4().to_string();
1059        let relayer3_id = uuid::Uuid::new_v4().to_string();
1060        let signer1_id = uuid::Uuid::new_v4().to_string();
1061        let signer2_id = uuid::Uuid::new_v4().to_string();
1062
1063        let mut relayer1 = create_test_relayer(&relayer1_id);
1064        relayer1.signer_id = signer1_id.clone();
1065        repo.create(relayer1).await.unwrap();
1066
1067        let mut relayer2 = create_test_relayer(&relayer2_id);
1068
1069        relayer2.signer_id = signer2_id.clone();
1070        repo.create(relayer2).await.unwrap();
1071
1072        let mut relayer3 = create_test_relayer(&relayer3_id);
1073        relayer3.signer_id = signer1_id.clone();
1074        repo.create(relayer3).await.unwrap();
1075
1076        let result = repo.list_by_signer_id(&signer1_id).await.unwrap();
1077        assert_eq!(result.len(), 2);
1078        let ids: Vec<_> = result.iter().map(|r| r.id.clone()).collect();
1079        assert!(ids.contains(&relayer1_id));
1080        assert!(ids.contains(&relayer3_id));
1081
1082        let result = repo.list_by_signer_id(&signer2_id).await.unwrap();
1083        assert_eq!(result.len(), 1);
1084
1085        let result = repo.list_by_signer_id("nonexistent").await.unwrap();
1086        assert_eq!(result.len(), 0);
1087    }
1088
1089    #[ignore = "Requires active Redis instance"]
1090    #[tokio::test]
1091    async fn test_list_by_notification_id() {
1092        let repo = setup_test_repo().await;
1093
1094        let relayer1_id = uuid::Uuid::new_v4().to_string();
1095        let mut relayer1 = create_test_relayer(&relayer1_id);
1096        relayer1.notification_id = Some("notif1".to_string());
1097        repo.create(relayer1).await.unwrap();
1098
1099        let relayer2_id = uuid::Uuid::new_v4().to_string();
1100        let mut relayer2 = create_test_relayer(&relayer2_id);
1101        relayer2.notification_id = Some("notif2".to_string());
1102        repo.create(relayer2).await.unwrap();
1103
1104        let relayer3_id = uuid::Uuid::new_v4().to_string();
1105        let mut relayer3 = create_test_relayer(&relayer3_id);
1106        relayer3.notification_id = Some("notif1".to_string());
1107        repo.create(relayer3).await.unwrap();
1108
1109        let relayer4_id = uuid::Uuid::new_v4().to_string();
1110        let mut relayer4 = create_test_relayer(&relayer4_id);
1111        relayer4.notification_id = None;
1112        repo.create(relayer4).await.unwrap();
1113
1114        let result = repo.list_by_notification_id("notif1").await.unwrap();
1115        assert_eq!(result.len(), 2);
1116        let ids: Vec<_> = result.iter().map(|r| r.id.clone()).collect();
1117        assert!(ids.contains(&relayer1_id));
1118        assert!(ids.contains(&relayer3_id));
1119
1120        let result = repo.list_by_notification_id("notif2").await.unwrap();
1121        assert_eq!(result.len(), 1);
1122
1123        let result = repo.list_by_notification_id("nonexistent").await.unwrap();
1124        assert_eq!(result.len(), 0);
1125    }
1126}