openzeppelin_relayer/repositories/relayer/
mod.rs

1//! Relayer Repository Module
2//!
3//! This module provides the relayer repository layer for the OpenZeppelin Relayer service.
4//! It implements the Repository pattern to abstract relayer data persistence operations,
5//! supporting both in-memory and Redis-backed storage implementations.
6//!
7//! ## Features
8//!
9//! - **CRUD Operations**: Create, read, update, and delete relayer configurations
10//! - **Status Management**: Enable/disable relayers and track their state
11//! - **Policy Management**: Update relayer network policies
12//! - **Partial Updates**: Support for partial relayer configuration updates
13//! - **Active Filtering**: Query for active (non-paused) relayers
14//! - **Pagination Support**: Efficient paginated listing of relayers
15//!
16//! ## Repository Implementations
17//!
18//! - [`InMemoryRelayerRepository`]: Fast in-memory storage for testing/development
19//! - [`RedisRelayerRepository`]: Redis-backed storage for production environments
20//!
21
22mod relayer_in_memory;
23mod relayer_redis;
24
25pub use relayer_in_memory::*;
26pub use relayer_redis::*;
27
28use crate::{
29    models::UpdateRelayerRequest,
30    models::{
31        DisabledReason, PaginationQuery, RelayerNetworkPolicy, RelayerRepoModel, RepositoryError,
32    },
33    repositories::{PaginatedResult, Repository},
34    utils::RedisConnections,
35};
36use async_trait::async_trait;
37use deadpool_redis::Pool;
38use std::sync::Arc;
39
40#[async_trait]
41pub trait RelayerRepository: Repository<RelayerRepoModel, String> + Send + Sync {
42    async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
43    async fn list_by_signer_id(
44        &self,
45        signer_id: &str,
46    ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
47    async fn list_by_notification_id(
48        &self,
49        notification_id: &str,
50    ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
51    async fn partial_update(
52        &self,
53        id: String,
54        update: UpdateRelayerRequest,
55    ) -> Result<RelayerRepoModel, RepositoryError>;
56    async fn enable_relayer(&self, relayer_id: String)
57        -> Result<RelayerRepoModel, RepositoryError>;
58    async fn disable_relayer(
59        &self,
60        relayer_id: String,
61        reason: DisabledReason,
62    ) -> Result<RelayerRepoModel, RepositoryError>;
63    async fn update_policy(
64        &self,
65        id: String,
66        policy: RelayerNetworkPolicy,
67    ) -> Result<RelayerRepoModel, RepositoryError>;
68    /// Returns true if this repository uses persistent storage (e.g., Redis).
69    /// Returns false for in-memory storage.
70    fn is_persistent_storage(&self) -> bool;
71
72    /// Returns connection info for distributed operations.
73    ///
74    /// This method provides access to the underlying connection and key prefix
75    /// when using persistent storage. This is useful for distributed locking and
76    /// other coordination operations that need direct storage access.
77    ///
78    /// # Returns
79    /// * `Some((pool, prefix))` - If using persistent storage (e.g., Redis)
80    /// * `None` - If using in-memory storage (default)
81    fn connection_info(&self) -> Option<(Arc<Pool>, String)> {
82        None
83    }
84}
85
86#[cfg(test)]
87mockall::mock! {
88    pub RelayerRepository {}
89
90    #[async_trait]
91    impl Repository<RelayerRepoModel, String> for RelayerRepository {
92        async fn create(&self, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError>;
93        async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError>;
94        async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
95        async fn list_paginated(&self, query: PaginationQuery) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError>;
96        async fn update(&self, id: String, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError>;
97        async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError>;
98        async fn count(&self) -> Result<usize, RepositoryError>;
99        async fn has_entries(&self) -> Result<bool, RepositoryError>;
100        async fn drop_all_entries(&self) -> Result<(), RepositoryError>;
101    }
102
103    #[async_trait]
104    impl RelayerRepository for RelayerRepository {
105        async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
106        async fn list_by_signer_id(&self, signer_id: &str) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
107        async fn list_by_notification_id(&self, notification_id: &str) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
108        async fn partial_update(&self, id: String, update: UpdateRelayerRequest) -> Result<RelayerRepoModel, RepositoryError>;
109        async fn enable_relayer(&self, relayer_id: String) -> Result<RelayerRepoModel, RepositoryError>;
110        async fn disable_relayer(&self, relayer_id: String, reason: DisabledReason) -> Result<RelayerRepoModel, RepositoryError>;
111        async fn update_policy(&self, id: String, policy: RelayerNetworkPolicy) -> Result<RelayerRepoModel, RepositoryError>;
112        fn is_persistent_storage(&self) -> bool;
113        fn connection_info(&self) -> Option<(Arc<Pool>, String)>;
114    }
115}
116
117/// Enum wrapper for different relayer repository implementations
118#[derive(Debug, Clone)]
119pub enum RelayerRepositoryStorage {
120    InMemory(InMemoryRelayerRepository),
121    Redis(RedisRelayerRepository),
122}
123
124impl RelayerRepositoryStorage {
125    pub fn new_in_memory() -> Self {
126        Self::InMemory(InMemoryRelayerRepository::new())
127    }
128
129    pub fn new_redis(
130        connections: Arc<RedisConnections>,
131        key_prefix: String,
132    ) -> Result<Self, RepositoryError> {
133        Ok(Self::Redis(RedisRelayerRepository::new(
134            connections,
135            key_prefix,
136        )?))
137    }
138
139    /// Returns connection info for distributed operations.
140    ///
141    /// This method provides access to the underlying Redis connection and key prefix
142    /// when using Redis-backed storage. This is useful for distributed locking and
143    /// other coordination operations that need direct Redis access.
144    ///
145    /// # Returns
146    /// * `Some((pool, prefix))` - If using persistent storage (e.g., Redis)
147    /// * `None` - If using in-memory storage
148    pub fn connection_info(&self) -> Option<(Arc<Pool>, String)> {
149        match self {
150            RelayerRepositoryStorage::InMemory(_) => None,
151            RelayerRepositoryStorage::Redis(repo) => {
152                Some((repo.connections.primary().clone(), repo.key_prefix.clone()))
153            }
154        }
155    }
156}
157
158impl Default for RelayerRepositoryStorage {
159    fn default() -> Self {
160        Self::new_in_memory()
161    }
162}
163
164#[async_trait]
165impl Repository<RelayerRepoModel, String> for RelayerRepositoryStorage {
166    async fn create(&self, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError> {
167        match self {
168            RelayerRepositoryStorage::InMemory(repo) => repo.create(entity).await,
169            RelayerRepositoryStorage::Redis(repo) => repo.create(entity).await,
170        }
171    }
172
173    async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError> {
174        match self {
175            RelayerRepositoryStorage::InMemory(repo) => repo.get_by_id(id).await,
176            RelayerRepositoryStorage::Redis(repo) => repo.get_by_id(id).await,
177        }
178    }
179
180    async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
181        match self {
182            RelayerRepositoryStorage::InMemory(repo) => repo.list_all().await,
183            RelayerRepositoryStorage::Redis(repo) => repo.list_all().await,
184        }
185    }
186
187    async fn list_paginated(
188        &self,
189        query: PaginationQuery,
190    ) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError> {
191        match self {
192            RelayerRepositoryStorage::InMemory(repo) => repo.list_paginated(query).await,
193            RelayerRepositoryStorage::Redis(repo) => repo.list_paginated(query).await,
194        }
195    }
196
197    async fn update(
198        &self,
199        id: String,
200        entity: RelayerRepoModel,
201    ) -> Result<RelayerRepoModel, RepositoryError> {
202        match self {
203            RelayerRepositoryStorage::InMemory(repo) => repo.update(id, entity).await,
204            RelayerRepositoryStorage::Redis(repo) => repo.update(id, entity).await,
205        }
206    }
207
208    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
209        match self {
210            RelayerRepositoryStorage::InMemory(repo) => repo.delete_by_id(id).await,
211            RelayerRepositoryStorage::Redis(repo) => repo.delete_by_id(id).await,
212        }
213    }
214
215    async fn count(&self) -> Result<usize, RepositoryError> {
216        match self {
217            RelayerRepositoryStorage::InMemory(repo) => repo.count().await,
218            RelayerRepositoryStorage::Redis(repo) => repo.count().await,
219        }
220    }
221
222    async fn has_entries(&self) -> Result<bool, RepositoryError> {
223        match self {
224            RelayerRepositoryStorage::InMemory(repo) => repo.has_entries().await,
225            RelayerRepositoryStorage::Redis(repo) => repo.has_entries().await,
226        }
227    }
228
229    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
230        match self {
231            RelayerRepositoryStorage::InMemory(repo) => repo.drop_all_entries().await,
232            RelayerRepositoryStorage::Redis(repo) => repo.drop_all_entries().await,
233        }
234    }
235}
236
237#[async_trait]
238impl RelayerRepository for RelayerRepositoryStorage {
239    async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
240        match self {
241            RelayerRepositoryStorage::InMemory(repo) => repo.list_active().await,
242            RelayerRepositoryStorage::Redis(repo) => repo.list_active().await,
243        }
244    }
245
246    async fn list_by_signer_id(
247        &self,
248        signer_id: &str,
249    ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
250        match self {
251            RelayerRepositoryStorage::InMemory(repo) => repo.list_by_signer_id(signer_id).await,
252            RelayerRepositoryStorage::Redis(repo) => repo.list_by_signer_id(signer_id).await,
253        }
254    }
255
256    async fn list_by_notification_id(
257        &self,
258        notification_id: &str,
259    ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
260        match self {
261            RelayerRepositoryStorage::InMemory(repo) => {
262                repo.list_by_notification_id(notification_id).await
263            }
264            RelayerRepositoryStorage::Redis(repo) => {
265                repo.list_by_notification_id(notification_id).await
266            }
267        }
268    }
269
270    async fn partial_update(
271        &self,
272        id: String,
273        update: UpdateRelayerRequest,
274    ) -> Result<RelayerRepoModel, RepositoryError> {
275        match self {
276            RelayerRepositoryStorage::InMemory(repo) => repo.partial_update(id, update).await,
277            RelayerRepositoryStorage::Redis(repo) => repo.partial_update(id, update).await,
278        }
279    }
280
281    async fn enable_relayer(
282        &self,
283        relayer_id: String,
284    ) -> Result<RelayerRepoModel, RepositoryError> {
285        match self {
286            RelayerRepositoryStorage::InMemory(repo) => repo.enable_relayer(relayer_id).await,
287            RelayerRepositoryStorage::Redis(repo) => repo.enable_relayer(relayer_id).await,
288        }
289    }
290
291    async fn disable_relayer(
292        &self,
293        relayer_id: String,
294        reason: DisabledReason,
295    ) -> Result<RelayerRepoModel, RepositoryError> {
296        match self {
297            RelayerRepositoryStorage::InMemory(repo) => {
298                repo.disable_relayer(relayer_id, reason).await
299            }
300            RelayerRepositoryStorage::Redis(repo) => repo.disable_relayer(relayer_id, reason).await,
301        }
302    }
303
304    async fn update_policy(
305        &self,
306        id: String,
307        policy: RelayerNetworkPolicy,
308    ) -> Result<RelayerRepoModel, RepositoryError> {
309        match self {
310            RelayerRepositoryStorage::InMemory(repo) => repo.update_policy(id, policy).await,
311            RelayerRepositoryStorage::Redis(repo) => repo.update_policy(id, policy).await,
312        }
313    }
314
315    fn is_persistent_storage(&self) -> bool {
316        match self {
317            RelayerRepositoryStorage::InMemory(_) => false,
318            RelayerRepositoryStorage::Redis(_) => true,
319        }
320    }
321
322    fn connection_info(&self) -> Option<(Arc<Pool>, String)> {
323        match self {
324            RelayerRepositoryStorage::InMemory(_) => None,
325            RelayerRepositoryStorage::Redis(repo) => {
326                Some((repo.connections.primary().clone(), repo.key_prefix.clone()))
327            }
328        }
329    }
330}
331
332#[cfg(test)]
333mod tests {
334    use super::*;
335    use crate::models::{NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy};
336
337    fn create_test_relayer(id: String) -> RelayerRepoModel {
338        RelayerRepoModel {
339            id: id.clone(),
340            name: format!("Relayer {}", id.clone()),
341            network: "TestNet".to_string(),
342            paused: false,
343            network_type: NetworkType::Evm,
344            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
345                min_balance: Some(0),
346                gas_limit_estimation: Some(true),
347                gas_price_cap: None,
348                whitelist_receivers: None,
349                eip1559_pricing: Some(false),
350                private_transactions: Some(false),
351            }),
352            signer_id: "test".to_string(),
353            address: "0x".to_string(),
354            notification_id: None,
355            system_disabled: false,
356            custom_rpc_urls: None,
357            ..Default::default()
358        }
359    }
360
361    #[actix_web::test]
362    async fn test_in_memory_repository_impl() {
363        let impl_repo = RelayerRepositoryStorage::new_in_memory();
364        let relayer = create_test_relayer("test-relayer".to_string());
365
366        // Test create
367        let created = impl_repo.create(relayer.clone()).await.unwrap();
368        assert_eq!(created.id, relayer.id);
369
370        // Test get
371        let retrieved = impl_repo
372            .get_by_id("test-relayer".to_string())
373            .await
374            .unwrap();
375        assert_eq!(retrieved.id, relayer.id);
376
377        // Test list all
378        let all_relayers = impl_repo.list_all().await.unwrap();
379        assert!(!all_relayers.is_empty());
380
381        // Test count
382        let count = impl_repo.count().await.unwrap();
383        assert!(count >= 1);
384
385        // Test update
386        let mut updated_relayer = relayer.clone();
387        updated_relayer.name = "Updated Name".to_string();
388        let updated = impl_repo
389            .update(relayer.id.clone(), updated_relayer)
390            .await
391            .unwrap();
392        assert_eq!(updated.name, "Updated Name");
393
394        // Test delete
395        impl_repo.delete_by_id(relayer.id.clone()).await.unwrap();
396        let get_result = impl_repo.get_by_id("test-relayer".to_string()).await;
397        assert!(get_result.is_err());
398    }
399
400    #[actix_web::test]
401    async fn test_relayer_repository_trait_methods() {
402        let impl_repo = RelayerRepositoryStorage::new_in_memory();
403        let relayer = create_test_relayer("test-relayer".to_string());
404
405        // Create the relayer first
406        impl_repo.create(relayer.clone()).await.unwrap();
407
408        // Test list_active
409        let active_relayers = impl_repo.list_active().await.unwrap();
410        assert!(!active_relayers.is_empty());
411
412        // Test partial_update
413        let update = UpdateRelayerRequest {
414            paused: Some(true),
415            ..Default::default()
416        };
417        let updated = impl_repo
418            .partial_update(relayer.id.clone(), update)
419            .await
420            .unwrap();
421        assert!(updated.paused);
422
423        // Test enable/disable
424        let disabled = impl_repo
425            .disable_relayer(
426                relayer.id.clone(),
427                DisabledReason::BalanceCheckFailed("Test disable reason".to_string()),
428            )
429            .await
430            .unwrap();
431        assert!(disabled.system_disabled);
432        assert_eq!(
433            disabled.disabled_reason,
434            Some(DisabledReason::BalanceCheckFailed(
435                "Test disable reason".to_string()
436            ))
437        );
438
439        let enabled = impl_repo.enable_relayer(relayer.id.clone()).await.unwrap();
440        assert!(!enabled.system_disabled);
441        assert_eq!(enabled.disabled_reason, None);
442
443        // Test update_policy
444        let new_policy = RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
445            min_balance: Some(1000000000000000000),
446            gas_limit_estimation: Some(true),
447            gas_price_cap: Some(50_000_000_000),
448            whitelist_receivers: None,
449            eip1559_pricing: Some(true),
450            private_transactions: Some(false),
451        });
452        let policy_updated = impl_repo
453            .update_policy(relayer.id.clone(), new_policy)
454            .await
455            .unwrap();
456
457        if let RelayerNetworkPolicy::Evm(evm_policy) = policy_updated.policies {
458            assert_eq!(evm_policy.gas_price_cap, Some(50_000_000_000));
459            assert_eq!(evm_policy.eip1559_pricing, Some(true));
460        } else {
461            panic!("Expected EVM policy");
462        }
463    }
464
465    #[actix_web::test]
466    async fn test_create_repository_in_memory() {
467        let result = RelayerRepositoryStorage::new_in_memory();
468
469        assert!(matches!(result, RelayerRepositoryStorage::InMemory(_)));
470    }
471
472    #[actix_web::test]
473    async fn test_pagination() {
474        let impl_repo = RelayerRepositoryStorage::new_in_memory();
475        let relayer1 = create_test_relayer("test-relayer-1".to_string());
476        let relayer2 = create_test_relayer("test-relayer-2".to_string());
477
478        impl_repo.create(relayer1).await.unwrap();
479        impl_repo.create(relayer2).await.unwrap();
480
481        let query = PaginationQuery {
482            page: 1,
483            per_page: 10,
484        };
485
486        let result = impl_repo.list_paginated(query).await.unwrap();
487        assert!(result.total >= 2);
488        assert_eq!(result.page, 1);
489        assert_eq!(result.per_page, 10);
490    }
491
492    #[actix_web::test]
493    async fn test_delete_relayer() {
494        let impl_repo = RelayerRepositoryStorage::new_in_memory();
495        let relayer = create_test_relayer("delete-test".to_string());
496
497        // Create relayer
498        impl_repo.create(relayer.clone()).await.unwrap();
499
500        // Delete relayer
501        impl_repo
502            .delete_by_id("delete-test".to_string())
503            .await
504            .unwrap();
505
506        // Verify deletion
507        let get_result = impl_repo.get_by_id("delete-test".to_string()).await;
508        assert!(get_result.is_err());
509        assert!(matches!(
510            get_result.unwrap_err(),
511            RepositoryError::NotFound(_)
512        ));
513
514        // Test deleting non-existent relayer
515        let delete_result = impl_repo.delete_by_id("nonexistent".to_string()).await;
516        assert!(delete_result.is_err());
517    }
518
519    #[actix_web::test]
520    async fn test_has_entries() {
521        let repo = InMemoryRelayerRepository::new();
522        assert!(!repo.has_entries().await.unwrap());
523
524        let relayer = create_test_relayer("test".to_string());
525
526        repo.create(relayer.clone()).await.unwrap();
527        assert!(repo.has_entries().await.unwrap());
528
529        repo.delete_by_id(relayer.id.clone()).await.unwrap();
530        assert!(!repo.has_entries().await.unwrap());
531    }
532
533    #[actix_web::test]
534    async fn test_drop_all_entries() {
535        let repo = InMemoryRelayerRepository::new();
536        let relayer = create_test_relayer("test".to_string());
537
538        repo.create(relayer.clone()).await.unwrap();
539        assert!(repo.has_entries().await.unwrap());
540
541        repo.drop_all_entries().await.unwrap();
542        assert!(!repo.has_entries().await.unwrap());
543    }
544
545    #[tokio::test]
546    async fn test_connection_info_returns_none_for_in_memory() {
547        let storage = RelayerRepositoryStorage::new_in_memory();
548
549        // In-memory storage should return None for connection_info
550        assert!(storage.connection_info().is_none());
551    }
552
553    #[tokio::test]
554    async fn test_is_persistent_storage_returns_false_for_in_memory() {
555        let storage = RelayerRepositoryStorage::new_in_memory();
556
557        // In-memory storage should return false for is_persistent_storage
558        assert!(!storage.is_persistent_storage());
559    }
560
561    #[tokio::test]
562    async fn test_trait_connection_info_returns_none_for_in_memory() {
563        let storage = RelayerRepositoryStorage::new_in_memory();
564
565        // Test the RelayerRepository trait's connection_info method
566        let trait_ref: &dyn RelayerRepository = &storage;
567        assert!(trait_ref.connection_info().is_none());
568    }
569
570    #[tokio::test]
571    async fn test_struct_connection_info_returns_none_for_in_memory() {
572        let storage = RelayerRepositoryStorage::new_in_memory();
573
574        // Test the struct's own connection_info method
575        let result: Option<(Arc<Pool>, String)> = storage.connection_info();
576        assert!(result.is_none());
577    }
578}