1mod relayer_in_memory;
23mod relayer_redis;
24
25pub use relayer_in_memory::*;
26pub use relayer_redis::*;
27
28use crate::{
29 models::UpdateRelayerRequest,
30 models::{
31 DisabledReason, PaginationQuery, RelayerNetworkPolicy, RelayerRepoModel, RepositoryError,
32 },
33 repositories::{PaginatedResult, Repository},
34 utils::RedisConnections,
35};
36use async_trait::async_trait;
37use deadpool_redis::Pool;
38use std::sync::Arc;
39
40#[async_trait]
41pub trait RelayerRepository: Repository<RelayerRepoModel, String> + Send + Sync {
42 async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
43 async fn list_by_signer_id(
44 &self,
45 signer_id: &str,
46 ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
47 async fn list_by_notification_id(
48 &self,
49 notification_id: &str,
50 ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
51 async fn partial_update(
52 &self,
53 id: String,
54 update: UpdateRelayerRequest,
55 ) -> Result<RelayerRepoModel, RepositoryError>;
56 async fn enable_relayer(&self, relayer_id: String)
57 -> Result<RelayerRepoModel, RepositoryError>;
58 async fn disable_relayer(
59 &self,
60 relayer_id: String,
61 reason: DisabledReason,
62 ) -> Result<RelayerRepoModel, RepositoryError>;
63 async fn update_policy(
64 &self,
65 id: String,
66 policy: RelayerNetworkPolicy,
67 ) -> Result<RelayerRepoModel, RepositoryError>;
68 fn is_persistent_storage(&self) -> bool;
71
72 fn connection_info(&self) -> Option<(Arc<Pool>, String)> {
82 None
83 }
84}
85
86#[cfg(test)]
87mockall::mock! {
88 pub RelayerRepository {}
89
90 #[async_trait]
91 impl Repository<RelayerRepoModel, String> for RelayerRepository {
92 async fn create(&self, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError>;
93 async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError>;
94 async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
95 async fn list_paginated(&self, query: PaginationQuery) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError>;
96 async fn update(&self, id: String, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError>;
97 async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError>;
98 async fn count(&self) -> Result<usize, RepositoryError>;
99 async fn has_entries(&self) -> Result<bool, RepositoryError>;
100 async fn drop_all_entries(&self) -> Result<(), RepositoryError>;
101 }
102
103 #[async_trait]
104 impl RelayerRepository for RelayerRepository {
105 async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
106 async fn list_by_signer_id(&self, signer_id: &str) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
107 async fn list_by_notification_id(&self, notification_id: &str) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
108 async fn partial_update(&self, id: String, update: UpdateRelayerRequest) -> Result<RelayerRepoModel, RepositoryError>;
109 async fn enable_relayer(&self, relayer_id: String) -> Result<RelayerRepoModel, RepositoryError>;
110 async fn disable_relayer(&self, relayer_id: String, reason: DisabledReason) -> Result<RelayerRepoModel, RepositoryError>;
111 async fn update_policy(&self, id: String, policy: RelayerNetworkPolicy) -> Result<RelayerRepoModel, RepositoryError>;
112 fn is_persistent_storage(&self) -> bool;
113 fn connection_info(&self) -> Option<(Arc<Pool>, String)>;
114 }
115}
116
117#[derive(Debug, Clone)]
119pub enum RelayerRepositoryStorage {
120 InMemory(InMemoryRelayerRepository),
121 Redis(RedisRelayerRepository),
122}
123
124impl RelayerRepositoryStorage {
125 pub fn new_in_memory() -> Self {
126 Self::InMemory(InMemoryRelayerRepository::new())
127 }
128
129 pub fn new_redis(
130 connections: Arc<RedisConnections>,
131 key_prefix: String,
132 ) -> Result<Self, RepositoryError> {
133 Ok(Self::Redis(RedisRelayerRepository::new(
134 connections,
135 key_prefix,
136 )?))
137 }
138
139 pub fn connection_info(&self) -> Option<(Arc<Pool>, String)> {
149 match self {
150 RelayerRepositoryStorage::InMemory(_) => None,
151 RelayerRepositoryStorage::Redis(repo) => {
152 Some((repo.connections.primary().clone(), repo.key_prefix.clone()))
153 }
154 }
155 }
156}
157
158impl Default for RelayerRepositoryStorage {
159 fn default() -> Self {
160 Self::new_in_memory()
161 }
162}
163
164#[async_trait]
165impl Repository<RelayerRepoModel, String> for RelayerRepositoryStorage {
166 async fn create(&self, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError> {
167 match self {
168 RelayerRepositoryStorage::InMemory(repo) => repo.create(entity).await,
169 RelayerRepositoryStorage::Redis(repo) => repo.create(entity).await,
170 }
171 }
172
173 async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError> {
174 match self {
175 RelayerRepositoryStorage::InMemory(repo) => repo.get_by_id(id).await,
176 RelayerRepositoryStorage::Redis(repo) => repo.get_by_id(id).await,
177 }
178 }
179
180 async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
181 match self {
182 RelayerRepositoryStorage::InMemory(repo) => repo.list_all().await,
183 RelayerRepositoryStorage::Redis(repo) => repo.list_all().await,
184 }
185 }
186
187 async fn list_paginated(
188 &self,
189 query: PaginationQuery,
190 ) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError> {
191 match self {
192 RelayerRepositoryStorage::InMemory(repo) => repo.list_paginated(query).await,
193 RelayerRepositoryStorage::Redis(repo) => repo.list_paginated(query).await,
194 }
195 }
196
197 async fn update(
198 &self,
199 id: String,
200 entity: RelayerRepoModel,
201 ) -> Result<RelayerRepoModel, RepositoryError> {
202 match self {
203 RelayerRepositoryStorage::InMemory(repo) => repo.update(id, entity).await,
204 RelayerRepositoryStorage::Redis(repo) => repo.update(id, entity).await,
205 }
206 }
207
208 async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
209 match self {
210 RelayerRepositoryStorage::InMemory(repo) => repo.delete_by_id(id).await,
211 RelayerRepositoryStorage::Redis(repo) => repo.delete_by_id(id).await,
212 }
213 }
214
215 async fn count(&self) -> Result<usize, RepositoryError> {
216 match self {
217 RelayerRepositoryStorage::InMemory(repo) => repo.count().await,
218 RelayerRepositoryStorage::Redis(repo) => repo.count().await,
219 }
220 }
221
222 async fn has_entries(&self) -> Result<bool, RepositoryError> {
223 match self {
224 RelayerRepositoryStorage::InMemory(repo) => repo.has_entries().await,
225 RelayerRepositoryStorage::Redis(repo) => repo.has_entries().await,
226 }
227 }
228
229 async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
230 match self {
231 RelayerRepositoryStorage::InMemory(repo) => repo.drop_all_entries().await,
232 RelayerRepositoryStorage::Redis(repo) => repo.drop_all_entries().await,
233 }
234 }
235}
236
237#[async_trait]
238impl RelayerRepository for RelayerRepositoryStorage {
239 async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
240 match self {
241 RelayerRepositoryStorage::InMemory(repo) => repo.list_active().await,
242 RelayerRepositoryStorage::Redis(repo) => repo.list_active().await,
243 }
244 }
245
246 async fn list_by_signer_id(
247 &self,
248 signer_id: &str,
249 ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
250 match self {
251 RelayerRepositoryStorage::InMemory(repo) => repo.list_by_signer_id(signer_id).await,
252 RelayerRepositoryStorage::Redis(repo) => repo.list_by_signer_id(signer_id).await,
253 }
254 }
255
256 async fn list_by_notification_id(
257 &self,
258 notification_id: &str,
259 ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
260 match self {
261 RelayerRepositoryStorage::InMemory(repo) => {
262 repo.list_by_notification_id(notification_id).await
263 }
264 RelayerRepositoryStorage::Redis(repo) => {
265 repo.list_by_notification_id(notification_id).await
266 }
267 }
268 }
269
270 async fn partial_update(
271 &self,
272 id: String,
273 update: UpdateRelayerRequest,
274 ) -> Result<RelayerRepoModel, RepositoryError> {
275 match self {
276 RelayerRepositoryStorage::InMemory(repo) => repo.partial_update(id, update).await,
277 RelayerRepositoryStorage::Redis(repo) => repo.partial_update(id, update).await,
278 }
279 }
280
281 async fn enable_relayer(
282 &self,
283 relayer_id: String,
284 ) -> Result<RelayerRepoModel, RepositoryError> {
285 match self {
286 RelayerRepositoryStorage::InMemory(repo) => repo.enable_relayer(relayer_id).await,
287 RelayerRepositoryStorage::Redis(repo) => repo.enable_relayer(relayer_id).await,
288 }
289 }
290
291 async fn disable_relayer(
292 &self,
293 relayer_id: String,
294 reason: DisabledReason,
295 ) -> Result<RelayerRepoModel, RepositoryError> {
296 match self {
297 RelayerRepositoryStorage::InMemory(repo) => {
298 repo.disable_relayer(relayer_id, reason).await
299 }
300 RelayerRepositoryStorage::Redis(repo) => repo.disable_relayer(relayer_id, reason).await,
301 }
302 }
303
304 async fn update_policy(
305 &self,
306 id: String,
307 policy: RelayerNetworkPolicy,
308 ) -> Result<RelayerRepoModel, RepositoryError> {
309 match self {
310 RelayerRepositoryStorage::InMemory(repo) => repo.update_policy(id, policy).await,
311 RelayerRepositoryStorage::Redis(repo) => repo.update_policy(id, policy).await,
312 }
313 }
314
315 fn is_persistent_storage(&self) -> bool {
316 match self {
317 RelayerRepositoryStorage::InMemory(_) => false,
318 RelayerRepositoryStorage::Redis(_) => true,
319 }
320 }
321
322 fn connection_info(&self) -> Option<(Arc<Pool>, String)> {
323 match self {
324 RelayerRepositoryStorage::InMemory(_) => None,
325 RelayerRepositoryStorage::Redis(repo) => {
326 Some((repo.connections.primary().clone(), repo.key_prefix.clone()))
327 }
328 }
329 }
330}
331
332#[cfg(test)]
333mod tests {
334 use super::*;
335 use crate::models::{NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy};
336
337 fn create_test_relayer(id: String) -> RelayerRepoModel {
338 RelayerRepoModel {
339 id: id.clone(),
340 name: format!("Relayer {}", id.clone()),
341 network: "TestNet".to_string(),
342 paused: false,
343 network_type: NetworkType::Evm,
344 policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
345 min_balance: Some(0),
346 gas_limit_estimation: Some(true),
347 gas_price_cap: None,
348 whitelist_receivers: None,
349 eip1559_pricing: Some(false),
350 private_transactions: Some(false),
351 }),
352 signer_id: "test".to_string(),
353 address: "0x".to_string(),
354 notification_id: None,
355 system_disabled: false,
356 custom_rpc_urls: None,
357 ..Default::default()
358 }
359 }
360
361 #[actix_web::test]
362 async fn test_in_memory_repository_impl() {
363 let impl_repo = RelayerRepositoryStorage::new_in_memory();
364 let relayer = create_test_relayer("test-relayer".to_string());
365
366 let created = impl_repo.create(relayer.clone()).await.unwrap();
368 assert_eq!(created.id, relayer.id);
369
370 let retrieved = impl_repo
372 .get_by_id("test-relayer".to_string())
373 .await
374 .unwrap();
375 assert_eq!(retrieved.id, relayer.id);
376
377 let all_relayers = impl_repo.list_all().await.unwrap();
379 assert!(!all_relayers.is_empty());
380
381 let count = impl_repo.count().await.unwrap();
383 assert!(count >= 1);
384
385 let mut updated_relayer = relayer.clone();
387 updated_relayer.name = "Updated Name".to_string();
388 let updated = impl_repo
389 .update(relayer.id.clone(), updated_relayer)
390 .await
391 .unwrap();
392 assert_eq!(updated.name, "Updated Name");
393
394 impl_repo.delete_by_id(relayer.id.clone()).await.unwrap();
396 let get_result = impl_repo.get_by_id("test-relayer".to_string()).await;
397 assert!(get_result.is_err());
398 }
399
400 #[actix_web::test]
401 async fn test_relayer_repository_trait_methods() {
402 let impl_repo = RelayerRepositoryStorage::new_in_memory();
403 let relayer = create_test_relayer("test-relayer".to_string());
404
405 impl_repo.create(relayer.clone()).await.unwrap();
407
408 let active_relayers = impl_repo.list_active().await.unwrap();
410 assert!(!active_relayers.is_empty());
411
412 let update = UpdateRelayerRequest {
414 paused: Some(true),
415 ..Default::default()
416 };
417 let updated = impl_repo
418 .partial_update(relayer.id.clone(), update)
419 .await
420 .unwrap();
421 assert!(updated.paused);
422
423 let disabled = impl_repo
425 .disable_relayer(
426 relayer.id.clone(),
427 DisabledReason::BalanceCheckFailed("Test disable reason".to_string()),
428 )
429 .await
430 .unwrap();
431 assert!(disabled.system_disabled);
432 assert_eq!(
433 disabled.disabled_reason,
434 Some(DisabledReason::BalanceCheckFailed(
435 "Test disable reason".to_string()
436 ))
437 );
438
439 let enabled = impl_repo.enable_relayer(relayer.id.clone()).await.unwrap();
440 assert!(!enabled.system_disabled);
441 assert_eq!(enabled.disabled_reason, None);
442
443 let new_policy = RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
445 min_balance: Some(1000000000000000000),
446 gas_limit_estimation: Some(true),
447 gas_price_cap: Some(50_000_000_000),
448 whitelist_receivers: None,
449 eip1559_pricing: Some(true),
450 private_transactions: Some(false),
451 });
452 let policy_updated = impl_repo
453 .update_policy(relayer.id.clone(), new_policy)
454 .await
455 .unwrap();
456
457 if let RelayerNetworkPolicy::Evm(evm_policy) = policy_updated.policies {
458 assert_eq!(evm_policy.gas_price_cap, Some(50_000_000_000));
459 assert_eq!(evm_policy.eip1559_pricing, Some(true));
460 } else {
461 panic!("Expected EVM policy");
462 }
463 }
464
465 #[actix_web::test]
466 async fn test_create_repository_in_memory() {
467 let result = RelayerRepositoryStorage::new_in_memory();
468
469 assert!(matches!(result, RelayerRepositoryStorage::InMemory(_)));
470 }
471
472 #[actix_web::test]
473 async fn test_pagination() {
474 let impl_repo = RelayerRepositoryStorage::new_in_memory();
475 let relayer1 = create_test_relayer("test-relayer-1".to_string());
476 let relayer2 = create_test_relayer("test-relayer-2".to_string());
477
478 impl_repo.create(relayer1).await.unwrap();
479 impl_repo.create(relayer2).await.unwrap();
480
481 let query = PaginationQuery {
482 page: 1,
483 per_page: 10,
484 };
485
486 let result = impl_repo.list_paginated(query).await.unwrap();
487 assert!(result.total >= 2);
488 assert_eq!(result.page, 1);
489 assert_eq!(result.per_page, 10);
490 }
491
492 #[actix_web::test]
493 async fn test_delete_relayer() {
494 let impl_repo = RelayerRepositoryStorage::new_in_memory();
495 let relayer = create_test_relayer("delete-test".to_string());
496
497 impl_repo.create(relayer.clone()).await.unwrap();
499
500 impl_repo
502 .delete_by_id("delete-test".to_string())
503 .await
504 .unwrap();
505
506 let get_result = impl_repo.get_by_id("delete-test".to_string()).await;
508 assert!(get_result.is_err());
509 assert!(matches!(
510 get_result.unwrap_err(),
511 RepositoryError::NotFound(_)
512 ));
513
514 let delete_result = impl_repo.delete_by_id("nonexistent".to_string()).await;
516 assert!(delete_result.is_err());
517 }
518
519 #[actix_web::test]
520 async fn test_has_entries() {
521 let repo = InMemoryRelayerRepository::new();
522 assert!(!repo.has_entries().await.unwrap());
523
524 let relayer = create_test_relayer("test".to_string());
525
526 repo.create(relayer.clone()).await.unwrap();
527 assert!(repo.has_entries().await.unwrap());
528
529 repo.delete_by_id(relayer.id.clone()).await.unwrap();
530 assert!(!repo.has_entries().await.unwrap());
531 }
532
533 #[actix_web::test]
534 async fn test_drop_all_entries() {
535 let repo = InMemoryRelayerRepository::new();
536 let relayer = create_test_relayer("test".to_string());
537
538 repo.create(relayer.clone()).await.unwrap();
539 assert!(repo.has_entries().await.unwrap());
540
541 repo.drop_all_entries().await.unwrap();
542 assert!(!repo.has_entries().await.unwrap());
543 }
544
545 #[tokio::test]
546 async fn test_connection_info_returns_none_for_in_memory() {
547 let storage = RelayerRepositoryStorage::new_in_memory();
548
549 assert!(storage.connection_info().is_none());
551 }
552
553 #[tokio::test]
554 async fn test_is_persistent_storage_returns_false_for_in_memory() {
555 let storage = RelayerRepositoryStorage::new_in_memory();
556
557 assert!(!storage.is_persistent_storage());
559 }
560
561 #[tokio::test]
562 async fn test_trait_connection_info_returns_none_for_in_memory() {
563 let storage = RelayerRepositoryStorage::new_in_memory();
564
565 let trait_ref: &dyn RelayerRepository = &storage;
567 assert!(trait_ref.connection_info().is_none());
568 }
569
570 #[tokio::test]
571 async fn test_struct_connection_info_returns_none_for_in_memory() {
572 let storage = RelayerRepositoryStorage::new_in_memory();
573
574 let result: Option<(Arc<Pool>, String)> = storage.connection_info();
576 assert!(result.is_none());
577 }
578}