1use std::sync::Arc;
28
29use crate::{
30 constants::{
31 transactions::PENDING_TRANSACTION_STATUSES, EVM_SMALLEST_UNIT_NAME,
32 EVM_STATUS_CHECK_INITIAL_DELAY_SECONDS,
33 },
34 domain::{
35 relayer::{Relayer, RelayerError},
36 BalanceResponse, SignDataRequest, SignDataResponse, SignTransactionExternalResponse,
37 SignTransactionRequest, SignTypedDataRequest,
38 },
39 jobs::{
40 JobProducerTrait, RelayerHealthCheck, TransactionRequest, TransactionSend,
41 TransactionStatusCheck,
42 },
43 models::{
44 produce_relayer_disabled_payload, DeletePendingTransactionsResponse, DisabledReason,
45 EvmNetwork, HealthCheckFailure, JsonRpcRequest, JsonRpcResponse, NetworkRepoModel,
46 NetworkRpcRequest, NetworkRpcResult, NetworkTransactionRequest, NetworkType,
47 PaginationQuery, RelayerRepoModel, RelayerStatus, RepositoryError, RpcErrorCodes,
48 TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
49 },
50 repositories::{NetworkRepository, RelayerRepository, Repository, TransactionRepository},
51 services::{
52 provider::{EvmProvider, EvmProviderTrait},
53 signer::{DataSignerTrait, EvmSigner},
54 TransactionCounterService, TransactionCounterServiceTrait,
55 },
56 utils::calculate_scheduled_timestamp,
57};
58use async_trait::async_trait;
59use eyre::Result;
60use tracing::{debug, error, info, instrument, warn};
61
62use super::{create_error_response, create_success_response, EvmTransactionValidator};
63use crate::utils::{map_provider_error, sanitize_error_description};
64
65#[allow(dead_code)]
66pub struct EvmRelayer<P, RR, NR, TR, J, S, TCS>
67where
68 P: EvmProviderTrait + Send + Sync,
69 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
70 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
71 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
72 J: JobProducerTrait + Send + Sync + 'static,
73 S: DataSignerTrait + Send + Sync + 'static,
74{
75 relayer: RelayerRepoModel,
76 signer: S,
77 network: EvmNetwork,
78 provider: P,
79 relayer_repository: Arc<RR>,
80 network_repository: Arc<NR>,
81 transaction_repository: Arc<TR>,
82 job_producer: Arc<J>,
83 transaction_counter_service: Arc<TCS>,
84}
85
86#[allow(clippy::too_many_arguments)]
87impl<P, RR, NR, TR, J, S, TCS> EvmRelayer<P, RR, NR, TR, J, S, TCS>
88where
89 P: EvmProviderTrait + Send + Sync,
90 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
91 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
92 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
93 J: JobProducerTrait + Send + Sync + 'static,
94 S: DataSignerTrait + Send + Sync + 'static,
95 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
96{
97 pub fn new(
114 relayer: RelayerRepoModel,
115 signer: S,
116 provider: P,
117 network: EvmNetwork,
118 relayer_repository: Arc<RR>,
119 network_repository: Arc<NR>,
120 transaction_repository: Arc<TR>,
121 transaction_counter_service: Arc<TCS>,
122 job_producer: Arc<J>,
123 ) -> Result<Self, RelayerError> {
124 Ok(Self {
125 relayer,
126 signer,
127 network,
128 provider,
129 relayer_repository,
130 network_repository,
131 transaction_repository,
132 transaction_counter_service,
133 job_producer,
134 })
135 }
136
137 #[instrument(
143 level = "debug",
144 skip(self),
145 fields(
146 request_id = ?crate::observability::request_id::get_request_id(),
147 relayer_id = %self.relayer.id,
148 )
149 )]
150 async fn sync_nonce(&self) -> Result<(), RelayerError> {
151 let on_chain_nonce = self
152 .provider
153 .get_transaction_count(&self.relayer.address)
154 .await
155 .map_err(|e| RelayerError::ProviderError(e.to_string()))?;
156
157 let transaction_counter_nonce = self
158 .transaction_counter_service
159 .get()
160 .await
161 .ok()
162 .flatten()
163 .unwrap_or(0);
164
165 let nonce = std::cmp::max(on_chain_nonce, transaction_counter_nonce);
166
167 debug!(
168 relayer_id = %self.relayer.id,
169 on_chain_nonce = %on_chain_nonce,
170 transaction_counter_nonce = %transaction_counter_nonce,
171 "syncing nonce"
172 );
173
174 debug!(nonce = %nonce, "setting nonce for relayer");
175
176 self.transaction_counter_service.set(nonce).await?;
177
178 Ok(())
179 }
180
181 #[instrument(
187 level = "debug",
188 skip(self),
189 fields(
190 request_id = ?crate::observability::request_id::get_request_id(),
191 relayer_id = %self.relayer.id,
192 )
193 )]
194 async fn validate_rpc(&self) -> Result<(), RelayerError> {
195 self.provider
196 .health_check()
197 .await
198 .map_err(|e| RelayerError::ProviderError(e.to_string()))?;
199
200 Ok(())
201 }
202
203 #[instrument(
213 level = "debug",
214 skip(self, transaction),
215 fields(
216 request_id = ?crate::observability::request_id::get_request_id(),
217 relayer_id = %self.relayer.id,
218 tx_id = %transaction.id,
219 )
220 )]
221 async fn cancel_transaction_via_job(
222 &self,
223 transaction: TransactionRepoModel,
224 ) -> Result<(), RelayerError> {
225 let cancel_job = TransactionSend::cancel(
226 transaction.id.clone(),
227 transaction.relayer_id.clone(),
228 "Cancelled via delete_pending_transactions".to_string(),
229 );
230
231 self.job_producer
232 .produce_submit_transaction_job(cancel_job, None)
233 .await
234 .map_err(RelayerError::from)?;
235
236 Ok(())
237 }
238}
239
240pub type DefaultEvmRelayer<J, T, RR, NR, TCR> =
242 EvmRelayer<EvmProvider, RR, NR, T, J, EvmSigner, TransactionCounterService<TCR>>;
243
244#[async_trait]
245impl<P, RR, NR, TR, J, S, TCS> Relayer for EvmRelayer<P, RR, NR, TR, J, S, TCS>
246where
247 P: EvmProviderTrait + Send + Sync,
248 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
249 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
250 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
251 J: JobProducerTrait + Send + Sync + 'static,
252 S: DataSignerTrait + Send + Sync + 'static,
253 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
254{
255 #[instrument(
265 level = "debug",
266 skip(self, network_transaction),
267 fields(
268 request_id = ?crate::observability::request_id::get_request_id(),
269 relayer_id = %self.relayer.id,
270 network_type = ?self.relayer.network_type,
271 )
272 )]
273 async fn process_transaction_request(
274 &self,
275 network_transaction: NetworkTransactionRequest,
276 ) -> Result<TransactionRepoModel, RelayerError> {
277 let network_model = self
278 .network_repository
279 .get_by_name(NetworkType::Evm, &self.relayer.network)
280 .await?
281 .ok_or_else(|| {
282 RelayerError::NetworkConfiguration(format!(
283 "Network {} not found",
284 self.relayer.network
285 ))
286 })?;
287 let transaction =
288 TransactionRepoModel::try_from((&network_transaction, &self.relayer, &network_model))?;
289
290 self.transaction_repository
291 .create(transaction.clone())
292 .await
293 .map_err(|e| RepositoryError::TransactionFailure(e.to_string()))?;
294
295 if let Err(e) = self
299 .job_producer
300 .produce_check_transaction_status_job(
301 TransactionStatusCheck::new(
302 transaction.id.clone(),
303 transaction.relayer_id.clone(),
304 crate::models::NetworkType::Evm,
305 ),
306 Some(calculate_scheduled_timestamp(
307 EVM_STATUS_CHECK_INITIAL_DELAY_SECONDS,
308 )),
309 )
310 .await
311 {
312 error!(
314 relayer_id = %self.relayer.id,
315 transaction_id = %transaction.id,
316 error = %e,
317 "Status check queue push failed - marking transaction as failed"
318 );
319 if let Err(update_err) = self
320 .transaction_repository
321 .partial_update(
322 transaction.id.clone(),
323 TransactionUpdateRequest {
324 status: Some(TransactionStatus::Failed),
325 status_reason: Some("Queue unavailable".to_string()),
326 ..Default::default()
327 },
328 )
329 .await
330 {
331 warn!(
332 relayer_id = %self.relayer.id,
333 transaction_id = %transaction.id,
334 error = %update_err,
335 "Failed to mark transaction as failed after queue push failure"
336 );
337 }
338 return Err(e.into());
339 }
340
341 self.job_producer
344 .produce_transaction_request_job(
345 TransactionRequest::new(transaction.id.clone(), transaction.relayer_id.clone()),
346 None,
347 )
348 .await?;
349
350 Ok(transaction)
351 }
352
353 #[instrument(
359 level = "debug",
360 skip(self),
361 fields(
362 request_id = ?crate::observability::request_id::get_request_id(),
363 relayer_id = %self.relayer.id,
364 )
365 )]
366 async fn get_balance(&self) -> Result<BalanceResponse, RelayerError> {
367 let balance: u128 = self
368 .provider
369 .get_balance(&self.relayer.address)
370 .await
371 .map_err(|e| RelayerError::ProviderError(e.to_string()))?
372 .try_into()
373 .map_err(|_| {
374 RelayerError::ProviderError("Failed to convert balance to u128".to_string())
375 })?;
376
377 Ok(BalanceResponse {
378 balance,
379 unit: EVM_SMALLEST_UNIT_NAME.to_string(),
380 })
381 }
382
383 #[instrument(
389 level = "debug",
390 skip(self),
391 fields(
392 request_id = ?crate::observability::request_id::get_request_id(),
393 relayer_id = %self.relayer.id,
394 )
395 )]
396 async fn get_status(&self) -> Result<RelayerStatus, RelayerError> {
397 let relayer_model = &self.relayer;
398
399 let nonce = self
401 .transaction_counter_service
402 .get()
403 .await
404 .ok()
405 .flatten()
406 .unwrap_or(0);
407 let nonce_str = nonce.to_string();
408
409 let balance_response = self.get_balance().await?;
410
411 let pending_transactions_count = self
413 .transaction_repository
414 .count_by_status(&relayer_model.id, PENDING_TRANSACTION_STATUSES)
415 .await
416 .map_err(RelayerError::from)?;
417
418 let last_confirmed_transaction_timestamp = self
420 .transaction_repository
421 .find_by_status_paginated(
422 &relayer_model.id,
423 &[TransactionStatus::Confirmed],
424 PaginationQuery {
425 page: 1,
426 per_page: 1,
427 },
428 false, )
430 .await
431 .map_err(RelayerError::from)?
432 .items
433 .into_iter()
434 .next()
435 .and_then(|tx| tx.confirmed_at);
436
437 Ok(RelayerStatus::Evm {
438 balance: balance_response.balance.to_string(),
439 pending_transactions_count,
440 last_confirmed_transaction_timestamp,
441 system_disabled: relayer_model.system_disabled,
442 paused: relayer_model.paused,
443 nonce: nonce_str,
444 })
445 }
446
447 #[instrument(
454 level = "debug",
455 skip(self),
456 fields(
457 request_id = ?crate::observability::request_id::get_request_id(),
458 relayer_id = %self.relayer.id,
459 )
460 )]
461 async fn delete_pending_transactions(
462 &self,
463 ) -> Result<DeletePendingTransactionsResponse, RelayerError> {
464 let pending_statuses = [
465 TransactionStatus::Pending,
466 TransactionStatus::Sent,
467 TransactionStatus::Submitted,
468 ];
469
470 let pending_transactions = self
472 .transaction_repository
473 .find_by_status(&self.relayer.id, &pending_statuses[..])
474 .await
475 .map_err(RelayerError::from)?;
476
477 let transaction_count = pending_transactions.len();
478
479 if transaction_count == 0 {
480 info!(
481 relayer_id = %self.relayer.id,
482 "no pending transactions found for relayer"
483 );
484 return Ok(DeletePendingTransactionsResponse {
485 queued_for_cancellation_transaction_ids: vec![],
486 failed_to_queue_transaction_ids: vec![],
487 total_processed: 0,
488 });
489 }
490
491 info!(
492 relayer_id = %self.relayer.id,
493 transaction_count = %transaction_count,
494 "processing pending transactions for relayer"
495 );
496
497 let mut cancelled_transaction_ids = Vec::new();
498 let mut failed_transaction_ids = Vec::new();
499
500 for transaction in pending_transactions {
502 match self.cancel_transaction_via_job(transaction.clone()).await {
503 Ok(_) => {
504 cancelled_transaction_ids.push(transaction.id.clone());
505 info!(
506 tx_id = %transaction.id,
507 relayer_id = %self.relayer.id,
508 status = ?transaction.status,
509 "initiated cancellation for transaction"
510 );
511 }
512 Err(e) => {
513 failed_transaction_ids.push(transaction.id.clone());
514 warn!(
515 tx_id = %transaction.id,
516 relayer_id = %self.relayer.id,
517 error = %e,
518 "failed to cancel transaction"
519 );
520 }
521 }
522 }
523
524 let total_processed = cancelled_transaction_ids.len() + failed_transaction_ids.len();
525
526 debug!(
527 queued_for_cancellation = %cancelled_transaction_ids.len(),
528 failed_to_queue = %failed_transaction_ids.len(),
529 "completed processing pending transactions for relayer"
530 );
531
532 Ok(DeletePendingTransactionsResponse {
533 queued_for_cancellation_transaction_ids: cancelled_transaction_ids,
534 failed_to_queue_transaction_ids: failed_transaction_ids,
535 total_processed: total_processed as u32,
536 })
537 }
538
539 #[instrument(
549 level = "debug",
550 skip(self, request),
551 fields(
552 request_id = ?crate::observability::request_id::get_request_id(),
553 relayer_id = %self.relayer.id,
554 )
555 )]
556 async fn sign_data(&self, request: SignDataRequest) -> Result<SignDataResponse, RelayerError> {
557 let result = self.signer.sign_data(request).await?;
558
559 Ok(result)
560 }
561
562 #[instrument(
572 level = "debug",
573 skip(self, request),
574 fields(
575 request_id = ?crate::observability::request_id::get_request_id(),
576 relayer_id = %self.relayer.id,
577 )
578 )]
579 async fn sign_typed_data(
580 &self,
581 request: SignTypedDataRequest,
582 ) -> Result<SignDataResponse, RelayerError> {
583 let result = self.signer.sign_typed_data(request).await?;
584
585 Ok(result)
586 }
587
588 #[instrument(
598 level = "debug",
599 skip(self, request),
600 fields(
601 request_id = ?crate::observability::request_id::get_request_id(),
602 relayer_id = %self.relayer.id,
603 )
604 )]
605 async fn rpc(
606 &self,
607 request: JsonRpcRequest<NetworkRpcRequest>,
608 ) -> Result<JsonRpcResponse<NetworkRpcResult>, RelayerError> {
609 let evm_request = match request.params {
610 NetworkRpcRequest::Evm(evm_req) => evm_req,
611 _ => {
612 return Ok(create_error_response(
613 request.id,
614 RpcErrorCodes::INVALID_PARAMS,
615 "Invalid params",
616 "Expected EVM network request",
617 ))
618 }
619 };
620
621 let (method, params_json) = match evm_request {
623 crate::models::EvmRpcRequest::RawRpcRequest { method, params } => (method, params),
624 };
625
626 match self.provider.raw_request_dyn(&method, params_json).await {
628 Ok(result_value) => Ok(create_success_response(request.id, result_value)),
629 Err(provider_error) => {
630 tracing::error!(
632 error = %provider_error,
633 "RPC provider error occurred"
634 );
635 let (error_code, error_message) = map_provider_error(&provider_error);
636 let sanitized_description = sanitize_error_description(&provider_error);
637 Ok(create_error_response(
638 request.id,
639 error_code,
640 error_message,
641 &sanitized_description,
642 ))
643 }
644 }
645 }
646
647 #[instrument(
653 level = "debug",
654 skip(self),
655 fields(
656 request_id = ?crate::observability::request_id::get_request_id(),
657 relayer_id = %self.relayer.id,
658 )
659 )]
660 async fn validate_min_balance(&self) -> Result<(), RelayerError> {
661 let policy = self.relayer.policies.get_evm_policy();
662 EvmTransactionValidator::init_balance_validation(
663 &self.relayer.address,
664 &policy,
665 &self.provider,
666 )
667 .await
668 .map_err(|e| RelayerError::InsufficientBalanceError(e.to_string()))?;
669
670 Ok(())
671 }
672
673 #[instrument(
679 level = "debug",
680 skip(self),
681 fields(
682 request_id = ?crate::observability::request_id::get_request_id(),
683 relayer_id = %self.relayer.id,
684 )
685 )]
686 async fn check_health(&self) -> Result<(), Vec<HealthCheckFailure>> {
687 debug!("running health checks");
688
689 let nonce_sync_result = self.sync_nonce().await;
690 let validate_rpc_result = self.validate_rpc().await;
691 let validate_min_balance_result = self.validate_min_balance().await;
692
693 let failures: Vec<HealthCheckFailure> = vec![
695 nonce_sync_result
696 .err()
697 .map(|e| HealthCheckFailure::NonceSyncFailed(e.to_string())),
698 validate_rpc_result
699 .err()
700 .map(|e| HealthCheckFailure::RpcValidationFailed(e.to_string())),
701 validate_min_balance_result
702 .err()
703 .map(|e| HealthCheckFailure::BalanceCheckFailed(e.to_string())),
704 ]
705 .into_iter()
706 .flatten()
707 .collect();
708
709 if failures.is_empty() {
710 info!("all health checks passed");
711 Ok(())
712 } else {
713 warn!("health checks failed: {:?}", failures);
714 Err(failures)
715 }
716 }
717
718 #[instrument(
719 level = "debug",
720 skip(self),
721 fields(
722 request_id = ?crate::observability::request_id::get_request_id(),
723 relayer_id = %self.relayer.id,
724 )
725 )]
726 async fn initialize_relayer(&self) -> Result<(), RelayerError> {
727 debug!("initializing EVM relayer");
728
729 match self.check_health().await {
730 Ok(_) => {
731 if self.relayer.system_disabled {
733 self.relayer_repository
735 .enable_relayer(self.relayer.id.clone())
736 .await?;
737 }
738 Ok(())
739 }
740 Err(failures) => {
741 let reason = DisabledReason::from_health_failures(failures).unwrap_or_else(|| {
743 DisabledReason::RpcValidationFailed("Unknown error".to_string())
744 });
745
746 warn!(reason = %reason, "disabling relayer");
747 let updated_relayer = self
748 .relayer_repository
749 .disable_relayer(self.relayer.id.clone(), reason.clone())
750 .await?;
751
752 if let Some(notification_id) = &self.relayer.notification_id {
754 self.job_producer
755 .produce_send_notification_job(
756 produce_relayer_disabled_payload(
757 notification_id,
758 &updated_relayer,
759 &reason.safe_description(),
760 ),
761 None,
762 )
763 .await?;
764 }
765
766 self.job_producer
768 .produce_relayer_health_check_job(
769 RelayerHealthCheck::new(self.relayer.id.clone()),
770 Some(calculate_scheduled_timestamp(10)),
771 )
772 .await?;
773
774 Ok(())
775 }
776 }
777 }
778
779 #[instrument(
780 level = "debug",
781 skip(self, _request),
782 fields(
783 request_id = ?crate::observability::request_id::get_request_id(),
784 relayer_id = %self.relayer.id,
785 )
786 )]
787 async fn sign_transaction(
788 &self,
789 _request: &SignTransactionRequest,
790 ) -> Result<SignTransactionExternalResponse, RelayerError> {
791 Err(RelayerError::NotSupported(
792 "Transaction signing not supported for EVM".to_string(),
793 ))
794 }
795}
796
797#[cfg(test)]
798mod tests {
799 use super::*;
800 use crate::models::RpcConfig;
801 use crate::{
802 config::{EvmNetworkConfig, NetworkConfigCommon},
803 jobs::MockJobProducerTrait,
804 models::{
805 EvmRpcRequest, EvmRpcResult, JsonRpcId, NetworkRepoModel, NetworkType,
806 RelayerEvmPolicy, RelayerNetworkPolicy, RepositoryError, SignerError,
807 TransactionStatus, U256,
808 },
809 repositories::{MockNetworkRepository, MockRelayerRepository, MockTransactionRepository},
810 services::{
811 provider::{MockEvmProviderTrait, ProviderError},
812 MockTransactionCounterServiceTrait,
813 },
814 };
815 use mockall::predicate::*;
816 use std::future::ready;
817
818 mockall::mock! {
819 pub DataSigner {}
820
821 #[async_trait]
822 impl DataSignerTrait for DataSigner {
823 async fn sign_data(&self, request: SignDataRequest) -> Result<SignDataResponse, SignerError>;
824 async fn sign_typed_data(&self, request: SignTypedDataRequest) -> Result<SignDataResponse, SignerError>;
825 }
826 }
827
828 fn create_test_evm_network() -> EvmNetwork {
829 EvmNetwork {
830 network: "mainnet".to_string(),
831 rpc_urls: vec![RpcConfig::new(
832 "https://mainnet.infura.io/v3/YOUR_INFURA_API_KEY".to_string(),
833 )],
834 explorer_urls: None,
835 average_blocktime_ms: 12000,
836 is_testnet: false,
837 tags: vec!["mainnet".to_string()],
838 chain_id: 1,
839 required_confirmations: 1,
840 features: vec!["eip1559".to_string()],
841 symbol: "ETH".to_string(),
842 gas_price_cache: None,
843 }
844 }
845
846 fn create_test_network_repo_model() -> NetworkRepoModel {
847 let config = EvmNetworkConfig {
848 common: NetworkConfigCommon {
849 network: "mainnet".to_string(),
850 from: None,
851 rpc_urls: Some(vec![crate::models::RpcConfig::new(
852 "https://mainnet.infura.io/v3/YOUR_INFURA_API_KEY".to_string(),
853 )]),
854 explorer_urls: None,
855 average_blocktime_ms: Some(12000),
856 is_testnet: Some(false),
857 tags: Some(vec!["mainnet".to_string()]),
858 },
859 chain_id: Some(1),
860 required_confirmations: Some(1),
861 features: Some(vec!["eip1559".to_string()]),
862 symbol: Some("ETH".to_string()),
863 gas_price_cache: None,
864 };
865
866 NetworkRepoModel::new_evm(config)
867 }
868
869 fn create_test_relayer() -> RelayerRepoModel {
870 RelayerRepoModel {
871 id: "test-relayer-id".to_string(),
872 name: "Test Relayer".to_string(),
873 network: "mainnet".to_string(), address: "0xSender".to_string(),
875 paused: false,
876 system_disabled: false,
877 signer_id: "test-signer-id".to_string(),
878 notification_id: Some("test-notification-id".to_string()),
879 policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
880 min_balance: Some(100000000000000000u128), whitelist_receivers: Some(vec!["0xRecipient".to_string()]),
882 gas_price_cap: Some(100000000000), eip1559_pricing: Some(true),
884 private_transactions: Some(false),
885 gas_limit_estimation: Some(true),
886 }),
887 network_type: NetworkType::Evm,
888 custom_rpc_urls: None,
889 ..Default::default()
890 }
891 }
892
893 fn setup_mocks() -> (
894 MockEvmProviderTrait,
895 MockRelayerRepository,
896 MockNetworkRepository,
897 MockTransactionRepository,
898 MockJobProducerTrait,
899 MockDataSigner,
900 MockTransactionCounterServiceTrait,
901 ) {
902 (
903 MockEvmProviderTrait::new(),
904 MockRelayerRepository::new(),
905 MockNetworkRepository::new(),
906 MockTransactionRepository::new(),
907 MockJobProducerTrait::new(),
908 MockDataSigner::new(),
909 MockTransactionCounterServiceTrait::new(),
910 )
911 }
912
913 #[tokio::test]
914 async fn test_get_balance() {
915 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
916 setup_mocks();
917 let relayer_model = create_test_relayer();
918
919 provider
920 .expect_get_balance()
921 .with(eq("0xSender"))
922 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64))))); let relayer = EvmRelayer::new(
925 relayer_model,
926 signer,
927 provider,
928 create_test_evm_network(),
929 Arc::new(relayer_repo),
930 Arc::new(network_repo),
931 Arc::new(tx_repo),
932 Arc::new(counter),
933 Arc::new(job_producer),
934 )
935 .unwrap();
936
937 let balance = relayer.get_balance().await.unwrap();
938 assert_eq!(balance.balance, 1000000000000000000u128);
939 assert_eq!(balance.unit, EVM_SMALLEST_UNIT_NAME);
940 }
941
942 #[tokio::test]
943 async fn test_process_transaction_request() {
944 let (
945 provider,
946 relayer_repo,
947 mut network_repo,
948 mut tx_repo,
949 mut job_producer,
950 signer,
951 counter,
952 ) = setup_mocks();
953 let relayer_model = create_test_relayer();
954
955 let network_tx = NetworkTransactionRequest::Evm(crate::models::EvmTransactionRequest {
956 to: Some("0xRecipient".to_string()),
957 value: U256::from(1000000000000000000u64),
958 data: Some("0xData".to_string()),
959 gas_limit: Some(21000),
960 gas_price: Some(20000000000),
961 max_fee_per_gas: None,
962 max_priority_fee_per_gas: None,
963 speed: None,
964 valid_until: None,
965 });
966
967 network_repo
968 .expect_get_by_name()
969 .with(eq(NetworkType::Evm), eq("mainnet"))
970 .returning(|_, _| Ok(Some(create_test_network_repo_model())));
971
972 tx_repo.expect_create().returning(Ok);
973 job_producer
974 .expect_produce_transaction_request_job()
975 .returning(|_, _| Box::pin(ready(Ok(()))));
976 job_producer
977 .expect_produce_check_transaction_status_job()
978 .returning(|_, _| Box::pin(ready(Ok(()))));
979
980 let relayer = EvmRelayer::new(
981 relayer_model,
982 signer,
983 provider,
984 create_test_evm_network(),
985 Arc::new(relayer_repo),
986 Arc::new(network_repo),
987 Arc::new(tx_repo),
988 Arc::new(counter),
989 Arc::new(job_producer),
990 )
991 .unwrap();
992
993 let result = relayer.process_transaction_request(network_tx).await;
994 assert!(result.is_ok());
995 }
996
997 #[tokio::test]
998 async fn test_process_transaction_request_status_check_failure_returns_error() {
999 let (
1000 provider,
1001 relayer_repo,
1002 mut network_repo,
1003 mut tx_repo,
1004 mut job_producer,
1005 signer,
1006 counter,
1007 ) = setup_mocks();
1008 let relayer_model = create_test_relayer();
1009
1010 let network_tx = NetworkTransactionRequest::Evm(crate::models::EvmTransactionRequest {
1011 to: Some("0xRecipient".to_string()),
1012 value: U256::from(1000000000000000000u64),
1013 data: Some("0xData".to_string()),
1014 gas_limit: Some(21000),
1015 gas_price: Some(20000000000),
1016 max_fee_per_gas: None,
1017 max_priority_fee_per_gas: None,
1018 speed: None,
1019 valid_until: None,
1020 });
1021
1022 network_repo
1023 .expect_get_by_name()
1024 .with(eq(NetworkType::Evm), eq("mainnet"))
1025 .returning(|_, _| Ok(Some(create_test_network_repo_model())));
1026
1027 tx_repo.expect_create().returning(Ok);
1028 tx_repo
1030 .expect_partial_update()
1031 .returning(|_, _| Ok(TransactionRepoModel::default()));
1032
1033 job_producer
1035 .expect_produce_check_transaction_status_job()
1036 .returning(|_, _| {
1037 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
1038 "Failed to queue job".to_string(),
1039 ))))
1040 });
1041
1042 let relayer = EvmRelayer::new(
1046 relayer_model,
1047 signer,
1048 provider,
1049 create_test_evm_network(),
1050 Arc::new(relayer_repo),
1051 Arc::new(network_repo),
1052 Arc::new(tx_repo),
1053 Arc::new(counter),
1054 Arc::new(job_producer),
1055 )
1056 .unwrap();
1057
1058 let result = relayer.process_transaction_request(network_tx).await;
1059 assert!(result.is_err());
1060 }
1061
1062 #[tokio::test]
1063 async fn test_process_transaction_request_status_check_failure_marks_tx_failed() {
1064 let (
1065 provider,
1066 relayer_repo,
1067 mut network_repo,
1068 mut tx_repo,
1069 mut job_producer,
1070 signer,
1071 counter,
1072 ) = setup_mocks();
1073 let relayer_model = create_test_relayer();
1074
1075 let network_tx = NetworkTransactionRequest::Evm(crate::models::EvmTransactionRequest {
1076 to: Some("0xRecipient".to_string()),
1077 value: U256::from(1000000000000000000u64),
1078 data: Some("0xData".to_string()),
1079 gas_limit: Some(21000),
1080 gas_price: Some(20000000000),
1081 max_fee_per_gas: None,
1082 max_priority_fee_per_gas: None,
1083 speed: None,
1084 valid_until: None,
1085 });
1086
1087 network_repo
1088 .expect_get_by_name()
1089 .with(eq(NetworkType::Evm), eq("mainnet"))
1090 .returning(|_, _| Ok(Some(create_test_network_repo_model())));
1091
1092 tx_repo.expect_create().returning(Ok);
1093
1094 tx_repo
1096 .expect_partial_update()
1097 .withf(|_tx_id, update| {
1098 update.status == Some(TransactionStatus::Failed)
1099 && update.status_reason == Some("Queue unavailable".to_string())
1100 })
1101 .returning(|_, _| Ok(TransactionRepoModel::default()));
1102
1103 job_producer
1104 .expect_produce_check_transaction_status_job()
1105 .returning(|_, _| {
1106 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
1107 "Redis timeout".to_string(),
1108 ))))
1109 });
1110
1111 let relayer = EvmRelayer::new(
1112 relayer_model,
1113 signer,
1114 provider,
1115 create_test_evm_network(),
1116 Arc::new(relayer_repo),
1117 Arc::new(network_repo),
1118 Arc::new(tx_repo),
1119 Arc::new(counter),
1120 Arc::new(job_producer),
1121 )
1122 .unwrap();
1123
1124 let result = relayer.process_transaction_request(network_tx).await;
1125 assert!(result.is_err());
1126 }
1128
1129 #[tokio::test]
1130 async fn test_validate_min_balance_sufficient() {
1131 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
1132 setup_mocks();
1133 let relayer_model = create_test_relayer();
1134
1135 provider
1136 .expect_get_balance()
1137 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); let relayer = EvmRelayer::new(
1140 relayer_model,
1141 signer,
1142 provider,
1143 create_test_evm_network(),
1144 Arc::new(relayer_repo),
1145 Arc::new(network_repo),
1146 Arc::new(tx_repo),
1147 Arc::new(counter),
1148 Arc::new(job_producer),
1149 )
1150 .unwrap();
1151
1152 let result = relayer.validate_min_balance().await;
1153 assert!(result.is_ok());
1154 }
1155
1156 #[tokio::test]
1157 async fn test_validate_min_balance_insufficient() {
1158 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
1159 setup_mocks();
1160 let relayer_model = create_test_relayer();
1161
1162 provider
1163 .expect_get_balance()
1164 .returning(|_| Box::pin(ready(Ok(U256::from(50000000000000000u64))))); let relayer = EvmRelayer::new(
1167 relayer_model,
1168 signer,
1169 provider,
1170 create_test_evm_network(),
1171 Arc::new(relayer_repo),
1172 Arc::new(network_repo),
1173 Arc::new(tx_repo),
1174 Arc::new(counter),
1175 Arc::new(job_producer),
1176 )
1177 .unwrap();
1178
1179 let result = relayer.validate_min_balance().await;
1180 assert!(matches!(
1181 result,
1182 Err(RelayerError::InsufficientBalanceError(_))
1183 ));
1184 }
1185
1186 #[tokio::test]
1187 async fn test_sync_nonce() {
1188 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
1189 setup_mocks();
1190 let relayer_model = create_test_relayer();
1191
1192 provider
1193 .expect_get_transaction_count()
1194 .returning(|_| Box::pin(ready(Ok(42u64))));
1195
1196 counter
1197 .expect_set()
1198 .returning(|_nonce| Box::pin(ready(Ok(()))));
1199
1200 counter
1201 .expect_get()
1202 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
1203
1204 let relayer = EvmRelayer::new(
1205 relayer_model,
1206 signer,
1207 provider,
1208 create_test_evm_network(),
1209 Arc::new(relayer_repo),
1210 Arc::new(network_repo),
1211 Arc::new(tx_repo),
1212 Arc::new(counter),
1213 Arc::new(job_producer),
1214 )
1215 .unwrap();
1216
1217 let result = relayer.sync_nonce().await;
1218 assert!(result.is_ok());
1219 }
1220
1221 #[tokio::test]
1222 async fn test_sync_nonce_lower_on_chain_nonce() {
1223 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
1224 setup_mocks();
1225 let relayer_model = create_test_relayer();
1226
1227 provider
1228 .expect_get_transaction_count()
1229 .returning(|_| Box::pin(ready(Ok(40u64))));
1230
1231 counter
1232 .expect_set()
1233 .with(eq(42u64))
1234 .returning(|_nonce| Box::pin(ready(Ok(()))));
1235
1236 counter
1237 .expect_get()
1238 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
1239
1240 let relayer = EvmRelayer::new(
1241 relayer_model,
1242 signer,
1243 provider,
1244 create_test_evm_network(),
1245 Arc::new(relayer_repo),
1246 Arc::new(network_repo),
1247 Arc::new(tx_repo),
1248 Arc::new(counter),
1249 Arc::new(job_producer),
1250 )
1251 .unwrap();
1252
1253 let result = relayer.sync_nonce().await;
1254 assert!(result.is_ok());
1255 }
1256
1257 #[tokio::test]
1258 async fn test_sync_nonce_lower_transaction_counter_nonce() {
1259 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
1260 setup_mocks();
1261 let relayer_model = create_test_relayer();
1262
1263 provider
1264 .expect_get_transaction_count()
1265 .returning(|_| Box::pin(ready(Ok(42u64))));
1266
1267 counter
1268 .expect_set()
1269 .with(eq(42u64))
1270 .returning(|_nonce| Box::pin(ready(Ok(()))));
1271
1272 counter
1273 .expect_get()
1274 .returning(|| Box::pin(ready(Ok(Some(40u64)))));
1275
1276 let relayer = EvmRelayer::new(
1277 relayer_model,
1278 signer,
1279 provider,
1280 create_test_evm_network(),
1281 Arc::new(relayer_repo),
1282 Arc::new(network_repo),
1283 Arc::new(tx_repo),
1284 Arc::new(counter),
1285 Arc::new(job_producer),
1286 )
1287 .unwrap();
1288
1289 let result = relayer.sync_nonce().await;
1290 assert!(result.is_ok());
1291 }
1292
1293 #[tokio::test]
1294 async fn test_validate_rpc() {
1295 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
1296 setup_mocks();
1297 let relayer_model = create_test_relayer();
1298
1299 provider
1300 .expect_health_check()
1301 .returning(|| Box::pin(ready(Ok(true))));
1302
1303 let relayer = EvmRelayer::new(
1304 relayer_model,
1305 signer,
1306 provider,
1307 create_test_evm_network(),
1308 Arc::new(relayer_repo),
1309 Arc::new(network_repo),
1310 Arc::new(tx_repo),
1311 Arc::new(counter),
1312 Arc::new(job_producer),
1313 )
1314 .unwrap();
1315
1316 let result = relayer.validate_rpc().await;
1317 assert!(result.is_ok());
1318 }
1319
1320 #[tokio::test]
1321 async fn test_get_status_success() {
1322 let (
1323 mut provider,
1324 relayer_repo,
1325 network_repo,
1326 mut tx_repo,
1327 job_producer,
1328 signer,
1329 mut counter,
1330 ) = setup_mocks();
1331 let relayer_model = create_test_relayer();
1332
1333 counter
1335 .expect_get()
1336 .returning(|| Box::pin(ready(Ok(Some(10u64)))))
1337 .once();
1338 provider
1339 .expect_get_balance()
1340 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))))
1341 .once();
1342
1343 tx_repo
1345 .expect_count_by_status()
1346 .withf(|relayer_id, statuses| {
1347 relayer_id == "test-relayer-id"
1348 && statuses
1349 == [
1350 TransactionStatus::Pending,
1351 TransactionStatus::Sent,
1352 TransactionStatus::Submitted,
1353 ]
1354 })
1355 .returning(|_, _| Ok(0u64))
1356 .once();
1357
1358 let latest_confirmed_tx = TransactionRepoModel {
1360 id: "tx1".to_string(),
1361 relayer_id: relayer_model.id.clone(),
1362 status: TransactionStatus::Confirmed,
1363 confirmed_at: Some("2023-01-01T12:00:00Z".to_string()),
1364 ..TransactionRepoModel::default()
1365 };
1366 let relayer_id_clone = relayer_model.id.clone();
1367 tx_repo
1368 .expect_find_by_status_paginated()
1369 .withf(move |relayer_id, statuses, query, oldest_first| {
1370 *relayer_id == relayer_id_clone
1371 && statuses == [TransactionStatus::Confirmed]
1372 && query.page == 1
1373 && query.per_page == 1
1374 && !(*oldest_first)
1375 })
1376 .returning(move |_, _, _, _| {
1377 Ok(crate::repositories::PaginatedResult {
1378 items: vec![latest_confirmed_tx.clone()],
1379 total: 1,
1380 page: 1,
1381 per_page: 1,
1382 })
1383 })
1384 .once();
1385
1386 let relayer = EvmRelayer::new(
1387 relayer_model.clone(),
1388 signer,
1389 provider,
1390 create_test_evm_network(),
1391 Arc::new(relayer_repo),
1392 Arc::new(network_repo),
1393 Arc::new(tx_repo),
1394 Arc::new(counter),
1395 Arc::new(job_producer),
1396 )
1397 .unwrap();
1398
1399 let status = relayer.get_status().await.unwrap();
1400
1401 match status {
1402 RelayerStatus::Evm {
1403 balance,
1404 pending_transactions_count,
1405 last_confirmed_transaction_timestamp,
1406 system_disabled,
1407 paused,
1408 nonce,
1409 } => {
1410 assert_eq!(balance, "1000000000000000000");
1411 assert_eq!(pending_transactions_count, 0);
1412 assert_eq!(
1413 last_confirmed_transaction_timestamp,
1414 Some("2023-01-01T12:00:00Z".to_string())
1415 );
1416 assert_eq!(system_disabled, relayer_model.system_disabled);
1417 assert_eq!(paused, relayer_model.paused);
1418 assert_eq!(nonce, "10");
1419 }
1420 _ => panic!("Expected EVM RelayerStatus"),
1421 }
1422 }
1423
1424 #[tokio::test]
1425 async fn test_get_status_provider_nonce_error() {
1426 let (
1427 mut provider,
1428 relayer_repo,
1429 network_repo,
1430 mut tx_repo,
1431 job_producer,
1432 signer,
1433 mut counter,
1434 ) = setup_mocks();
1435 let relayer_model = create_test_relayer();
1436
1437 counter
1439 .expect_get()
1440 .returning(|| Box::pin(ready(Ok(None))))
1441 .once();
1442 provider
1443 .expect_get_balance()
1444 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))))
1445 .once();
1446
1447 tx_repo
1449 .expect_count_by_status()
1450 .returning(|_, _| Ok(0u64))
1451 .once();
1452
1453 tx_repo
1455 .expect_find_by_status_paginated()
1456 .withf(|_relayer_id, statuses, query, oldest_first| {
1457 statuses == [TransactionStatus::Confirmed]
1458 && query.page == 1
1459 && query.per_page == 1
1460 && !(*oldest_first)
1461 })
1462 .returning(|_, _, _, _| {
1463 Ok(crate::repositories::PaginatedResult {
1464 items: vec![],
1465 total: 0,
1466 page: 1,
1467 per_page: 1,
1468 })
1469 })
1470 .once();
1471
1472 let relayer = EvmRelayer::new(
1473 relayer_model.clone(),
1474 signer,
1475 provider,
1476 create_test_evm_network(),
1477 Arc::new(relayer_repo),
1478 Arc::new(network_repo),
1479 Arc::new(tx_repo),
1480 Arc::new(counter),
1481 Arc::new(job_producer),
1482 )
1483 .unwrap();
1484
1485 let status = relayer.get_status().await.unwrap();
1487 match status {
1488 RelayerStatus::Evm { nonce, .. } => {
1489 assert_eq!(nonce, "0");
1490 }
1491 _ => panic!("Expected Evm status"),
1492 }
1493 }
1494
1495 #[tokio::test]
1496 async fn test_get_status_repository_pending_error() {
1497 let (
1498 mut provider,
1499 relayer_repo,
1500 network_repo,
1501 mut tx_repo,
1502 job_producer,
1503 signer,
1504 mut counter,
1505 ) = setup_mocks();
1506 let relayer_model = create_test_relayer();
1507
1508 counter
1510 .expect_get()
1511 .returning(|| Box::pin(ready(Ok(Some(10u64)))))
1512 .once();
1513 provider
1514 .expect_get_balance()
1515 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))));
1516
1517 tx_repo
1518 .expect_count_by_status()
1519 .withf(|relayer_id, statuses| {
1520 relayer_id == "test-relayer-id"
1521 && statuses
1522 == [
1523 TransactionStatus::Pending,
1524 TransactionStatus::Sent,
1525 TransactionStatus::Submitted,
1526 ]
1527 })
1528 .returning(|_, _| Err(RepositoryError::Unknown("DB down".to_string())))
1529 .once();
1530
1531 let relayer = EvmRelayer::new(
1532 relayer_model.clone(),
1533 signer,
1534 provider,
1535 create_test_evm_network(),
1536 Arc::new(relayer_repo),
1537 Arc::new(network_repo),
1538 Arc::new(tx_repo),
1539 Arc::new(counter),
1540 Arc::new(job_producer),
1541 )
1542 .unwrap();
1543
1544 let result = relayer.get_status().await;
1545 assert!(result.is_err());
1546 match result.err().unwrap() {
1547 RelayerError::NetworkConfiguration(msg) => assert!(msg.contains("DB down")),
1549 _ => panic!("Expected NetworkConfiguration error for repo failure"),
1550 }
1551 }
1552
1553 #[tokio::test]
1554 async fn test_get_status_no_confirmed_transactions() {
1555 let (
1556 mut provider,
1557 relayer_repo,
1558 network_repo,
1559 mut tx_repo,
1560 job_producer,
1561 signer,
1562 mut counter,
1563 ) = setup_mocks();
1564 let relayer_model = create_test_relayer();
1565
1566 counter
1568 .expect_get()
1569 .returning(|| Box::pin(ready(Ok(Some(10u64)))));
1570 provider
1571 .expect_get_balance()
1572 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))));
1573 provider
1574 .expect_health_check()
1575 .returning(|| Box::pin(ready(Ok(true))));
1576
1577 tx_repo
1579 .expect_count_by_status()
1580 .withf(|relayer_id, statuses| {
1581 relayer_id == "test-relayer-id"
1582 && statuses
1583 == [
1584 TransactionStatus::Pending,
1585 TransactionStatus::Sent,
1586 TransactionStatus::Submitted,
1587 ]
1588 })
1589 .returning(|_, _| Ok(0u64))
1590 .once();
1591
1592 let relayer_id_clone = relayer_model.id.clone();
1594 tx_repo
1595 .expect_find_by_status_paginated()
1596 .withf(move |relayer_id, statuses, query, oldest_first| {
1597 *relayer_id == relayer_id_clone
1598 && statuses == [TransactionStatus::Confirmed]
1599 && query.page == 1
1600 && query.per_page == 1
1601 && !(*oldest_first)
1602 })
1603 .returning(|_, _, _, _| {
1604 Ok(crate::repositories::PaginatedResult {
1605 items: vec![],
1606 total: 0,
1607 page: 1,
1608 per_page: 1,
1609 })
1610 })
1611 .once();
1612
1613 let relayer = EvmRelayer::new(
1614 relayer_model.clone(),
1615 signer,
1616 provider,
1617 create_test_evm_network(),
1618 Arc::new(relayer_repo),
1619 Arc::new(network_repo),
1620 Arc::new(tx_repo),
1621 Arc::new(counter),
1622 Arc::new(job_producer),
1623 )
1624 .unwrap();
1625
1626 let status = relayer.get_status().await.unwrap();
1627 match status {
1628 RelayerStatus::Evm {
1629 balance,
1630 pending_transactions_count,
1631 last_confirmed_transaction_timestamp,
1632 system_disabled,
1633 paused,
1634 nonce,
1635 } => {
1636 assert_eq!(balance, "1000000000000000000");
1637 assert_eq!(pending_transactions_count, 0);
1638 assert_eq!(last_confirmed_transaction_timestamp, None);
1639 assert_eq!(system_disabled, relayer_model.system_disabled);
1640 assert_eq!(paused, relayer_model.paused);
1641 assert_eq!(nonce, "10");
1642 }
1643 _ => panic!("Expected EVM RelayerStatus"),
1644 }
1645 }
1646
1647 #[tokio::test]
1648 async fn test_cancel_transaction_via_job_success() {
1649 let (provider, relayer_repo, network_repo, tx_repo, mut job_producer, signer, counter) =
1650 setup_mocks();
1651 let relayer_model = create_test_relayer();
1652
1653 let test_transaction = TransactionRepoModel {
1654 id: "test-tx-id".to_string(),
1655 relayer_id: relayer_model.id.clone(),
1656 status: TransactionStatus::Pending,
1657 ..TransactionRepoModel::default()
1658 };
1659
1660 job_producer
1661 .expect_produce_submit_transaction_job()
1662 .withf(|job, delay| {
1663 matches!(job.command, crate::jobs::TransactionCommand::Cancel { ref reason }
1664 if job.transaction_id == "test-tx-id"
1665 && job.relayer_id == "test-relayer-id"
1666 && reason == "Cancelled via delete_pending_transactions")
1667 && delay.is_none()
1668 })
1669 .returning(|_, _| Box::pin(ready(Ok(()))))
1670 .once();
1671
1672 let relayer = EvmRelayer::new(
1673 relayer_model,
1674 signer,
1675 provider,
1676 create_test_evm_network(),
1677 Arc::new(relayer_repo),
1678 Arc::new(network_repo),
1679 Arc::new(tx_repo),
1680 Arc::new(counter),
1681 Arc::new(job_producer),
1682 )
1683 .unwrap();
1684
1685 let result = relayer.cancel_transaction_via_job(test_transaction).await;
1686 assert!(result.is_ok());
1687 }
1688
1689 #[tokio::test]
1690 async fn test_cancel_transaction_via_job_failure() {
1691 let (provider, relayer_repo, network_repo, tx_repo, mut job_producer, signer, counter) =
1692 setup_mocks();
1693 let relayer_model = create_test_relayer();
1694
1695 let test_transaction = TransactionRepoModel {
1696 id: "test-tx-id".to_string(),
1697 relayer_id: relayer_model.id.clone(),
1698 status: TransactionStatus::Pending,
1699 ..TransactionRepoModel::default()
1700 };
1701
1702 job_producer
1703 .expect_produce_submit_transaction_job()
1704 .returning(|_, _| {
1705 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
1706 "Queue is full".to_string(),
1707 ))))
1708 })
1709 .once();
1710
1711 let relayer = EvmRelayer::new(
1712 relayer_model,
1713 signer,
1714 provider,
1715 create_test_evm_network(),
1716 Arc::new(relayer_repo),
1717 Arc::new(network_repo),
1718 Arc::new(tx_repo),
1719 Arc::new(counter),
1720 Arc::new(job_producer),
1721 )
1722 .unwrap();
1723
1724 let result = relayer.cancel_transaction_via_job(test_transaction).await;
1725 assert!(result.is_err());
1726 match result.err().unwrap() {
1727 RelayerError::QueueError(_) => (),
1728 _ => panic!("Expected QueueError"),
1729 }
1730 }
1731
1732 #[tokio::test]
1733 async fn test_delete_pending_transactions_no_pending() {
1734 let (provider, relayer_repo, network_repo, mut tx_repo, job_producer, signer, counter) =
1735 setup_mocks();
1736 let relayer_model = create_test_relayer();
1737
1738 tx_repo
1739 .expect_find_by_status()
1740 .withf(|relayer_id, statuses| {
1741 relayer_id == "test-relayer-id"
1742 && statuses
1743 == [
1744 TransactionStatus::Pending,
1745 TransactionStatus::Sent,
1746 TransactionStatus::Submitted,
1747 ]
1748 })
1749 .returning(|_, _| Ok(vec![]))
1750 .once();
1751
1752 let relayer = EvmRelayer::new(
1753 relayer_model,
1754 signer,
1755 provider,
1756 create_test_evm_network(),
1757 Arc::new(relayer_repo),
1758 Arc::new(network_repo),
1759 Arc::new(tx_repo),
1760 Arc::new(counter),
1761 Arc::new(job_producer),
1762 )
1763 .unwrap();
1764
1765 let result = relayer.delete_pending_transactions().await.unwrap();
1766 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 0);
1767 assert_eq!(result.failed_to_queue_transaction_ids.len(), 0);
1768 assert_eq!(result.total_processed, 0);
1769 }
1770
1771 #[tokio::test]
1772 async fn test_delete_pending_transactions_all_successful() {
1773 let (provider, relayer_repo, network_repo, mut tx_repo, mut job_producer, signer, counter) =
1774 setup_mocks();
1775 let relayer_model = create_test_relayer();
1776
1777 let pending_transactions = vec![
1778 TransactionRepoModel {
1779 id: "tx1".to_string(),
1780 relayer_id: relayer_model.id.clone(),
1781 status: TransactionStatus::Pending,
1782 ..TransactionRepoModel::default()
1783 },
1784 TransactionRepoModel {
1785 id: "tx2".to_string(),
1786 relayer_id: relayer_model.id.clone(),
1787 status: TransactionStatus::Sent,
1788 ..TransactionRepoModel::default()
1789 },
1790 TransactionRepoModel {
1791 id: "tx3".to_string(),
1792 relayer_id: relayer_model.id.clone(),
1793 status: TransactionStatus::Submitted,
1794 ..TransactionRepoModel::default()
1795 },
1796 ];
1797
1798 tx_repo
1799 .expect_find_by_status()
1800 .withf(|relayer_id, statuses| {
1801 relayer_id == "test-relayer-id"
1802 && statuses
1803 == [
1804 TransactionStatus::Pending,
1805 TransactionStatus::Sent,
1806 TransactionStatus::Submitted,
1807 ]
1808 })
1809 .returning(move |_, _| Ok(pending_transactions.clone()))
1810 .once();
1811
1812 job_producer
1813 .expect_produce_submit_transaction_job()
1814 .returning(|_, _| Box::pin(ready(Ok(()))))
1815 .times(3);
1816
1817 let relayer = EvmRelayer::new(
1818 relayer_model,
1819 signer,
1820 provider,
1821 create_test_evm_network(),
1822 Arc::new(relayer_repo),
1823 Arc::new(network_repo),
1824 Arc::new(tx_repo),
1825 Arc::new(counter),
1826 Arc::new(job_producer),
1827 )
1828 .unwrap();
1829
1830 let result = relayer.delete_pending_transactions().await.unwrap();
1831 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 3);
1832 assert_eq!(result.failed_to_queue_transaction_ids.len(), 0);
1833 assert_eq!(result.total_processed, 3);
1834
1835 let expected_ids = vec!["tx1", "tx2", "tx3"];
1836 for id in expected_ids {
1837 assert!(result
1838 .queued_for_cancellation_transaction_ids
1839 .contains(&id.to_string()));
1840 }
1841 }
1842
1843 #[tokio::test]
1844 async fn test_delete_pending_transactions_partial_failures() {
1845 let (provider, relayer_repo, network_repo, mut tx_repo, mut job_producer, signer, counter) =
1846 setup_mocks();
1847 let relayer_model = create_test_relayer();
1848
1849 let pending_transactions = vec![
1850 TransactionRepoModel {
1851 id: "tx1".to_string(),
1852 relayer_id: relayer_model.id.clone(),
1853 status: TransactionStatus::Pending,
1854 ..TransactionRepoModel::default()
1855 },
1856 TransactionRepoModel {
1857 id: "tx2".to_string(),
1858 relayer_id: relayer_model.id.clone(),
1859 status: TransactionStatus::Sent,
1860 ..TransactionRepoModel::default()
1861 },
1862 TransactionRepoModel {
1863 id: "tx3".to_string(),
1864 relayer_id: relayer_model.id.clone(),
1865 status: TransactionStatus::Submitted,
1866 ..TransactionRepoModel::default()
1867 },
1868 ];
1869
1870 tx_repo
1871 .expect_find_by_status()
1872 .withf(|relayer_id, statuses| {
1873 relayer_id == "test-relayer-id"
1874 && statuses
1875 == [
1876 TransactionStatus::Pending,
1877 TransactionStatus::Sent,
1878 TransactionStatus::Submitted,
1879 ]
1880 })
1881 .returning(move |_, _| Ok(pending_transactions.clone()))
1882 .once();
1883
1884 job_producer
1886 .expect_produce_submit_transaction_job()
1887 .returning(|_, _| Box::pin(ready(Ok(()))))
1888 .times(1);
1889 job_producer
1890 .expect_produce_submit_transaction_job()
1891 .returning(|_, _| {
1892 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
1893 "Queue is full".to_string(),
1894 ))))
1895 })
1896 .times(1);
1897 job_producer
1898 .expect_produce_submit_transaction_job()
1899 .returning(|_, _| Box::pin(ready(Ok(()))))
1900 .times(1);
1901
1902 let relayer = EvmRelayer::new(
1903 relayer_model,
1904 signer,
1905 provider,
1906 create_test_evm_network(),
1907 Arc::new(relayer_repo),
1908 Arc::new(network_repo),
1909 Arc::new(tx_repo),
1910 Arc::new(counter),
1911 Arc::new(job_producer),
1912 )
1913 .unwrap();
1914
1915 let result = relayer.delete_pending_transactions().await.unwrap();
1916 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 2);
1917 assert_eq!(result.failed_to_queue_transaction_ids.len(), 1);
1918 assert_eq!(result.total_processed, 3);
1919 }
1920
1921 #[tokio::test]
1922 async fn test_delete_pending_transactions_repository_error() {
1923 let (provider, relayer_repo, network_repo, mut tx_repo, job_producer, signer, counter) =
1924 setup_mocks();
1925 let relayer_model = create_test_relayer();
1926
1927 tx_repo
1928 .expect_find_by_status()
1929 .withf(|relayer_id, statuses| {
1930 relayer_id == "test-relayer-id"
1931 && statuses
1932 == [
1933 TransactionStatus::Pending,
1934 TransactionStatus::Sent,
1935 TransactionStatus::Submitted,
1936 ]
1937 })
1938 .returning(|_, _| {
1939 Err(RepositoryError::Unknown(
1940 "Database connection failed".to_string(),
1941 ))
1942 })
1943 .once();
1944
1945 let relayer = EvmRelayer::new(
1946 relayer_model,
1947 signer,
1948 provider,
1949 create_test_evm_network(),
1950 Arc::new(relayer_repo),
1951 Arc::new(network_repo),
1952 Arc::new(tx_repo),
1953 Arc::new(counter),
1954 Arc::new(job_producer),
1955 )
1956 .unwrap();
1957
1958 let result = relayer.delete_pending_transactions().await;
1959 assert!(result.is_err());
1960 match result.err().unwrap() {
1961 RelayerError::NetworkConfiguration(msg) => {
1962 assert!(msg.contains("Database connection failed"))
1963 }
1964 _ => panic!("Expected NetworkConfiguration error for repository failure"),
1965 }
1966 }
1967
1968 #[tokio::test]
1969 async fn test_delete_pending_transactions_all_failures() {
1970 let (provider, relayer_repo, network_repo, mut tx_repo, mut job_producer, signer, counter) =
1971 setup_mocks();
1972 let relayer_model = create_test_relayer();
1973
1974 let pending_transactions = vec![
1975 TransactionRepoModel {
1976 id: "tx1".to_string(),
1977 relayer_id: relayer_model.id.clone(),
1978 status: TransactionStatus::Pending,
1979 ..TransactionRepoModel::default()
1980 },
1981 TransactionRepoModel {
1982 id: "tx2".to_string(),
1983 relayer_id: relayer_model.id.clone(),
1984 status: TransactionStatus::Sent,
1985 ..TransactionRepoModel::default()
1986 },
1987 ];
1988
1989 tx_repo
1990 .expect_find_by_status()
1991 .withf(|relayer_id, statuses| {
1992 relayer_id == "test-relayer-id"
1993 && statuses
1994 == [
1995 TransactionStatus::Pending,
1996 TransactionStatus::Sent,
1997 TransactionStatus::Submitted,
1998 ]
1999 })
2000 .returning(move |_, _| Ok(pending_transactions.clone()))
2001 .once();
2002
2003 job_producer
2004 .expect_produce_submit_transaction_job()
2005 .returning(|_, _| {
2006 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
2007 "Queue is full".to_string(),
2008 ))))
2009 })
2010 .times(2);
2011
2012 let relayer = EvmRelayer::new(
2013 relayer_model,
2014 signer,
2015 provider,
2016 create_test_evm_network(),
2017 Arc::new(relayer_repo),
2018 Arc::new(network_repo),
2019 Arc::new(tx_repo),
2020 Arc::new(counter),
2021 Arc::new(job_producer),
2022 )
2023 .unwrap();
2024
2025 let result = relayer.delete_pending_transactions().await.unwrap();
2026 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 0);
2027 assert_eq!(result.failed_to_queue_transaction_ids.len(), 2);
2028 assert_eq!(result.total_processed, 2);
2029
2030 let expected_failed_ids = vec!["tx1", "tx2"];
2031 for id in expected_failed_ids {
2032 assert!(result
2033 .failed_to_queue_transaction_ids
2034 .contains(&id.to_string()));
2035 }
2036 }
2037
2038 #[tokio::test]
2039 async fn test_rpc_eth_get_balance() {
2040 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2041 setup_mocks();
2042 let relayer_model = create_test_relayer();
2043
2044 provider
2045 .expect_raw_request_dyn()
2046 .withf(|method, params| {
2047 method == "eth_getBalance"
2048 && params.as_str()
2049 == Some(r#"["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]"#)
2050 })
2051 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("0xde0b6b3a7640000")) }));
2052
2053 let relayer = EvmRelayer::new(
2054 relayer_model,
2055 signer,
2056 provider,
2057 create_test_evm_network(),
2058 Arc::new(relayer_repo),
2059 Arc::new(network_repo),
2060 Arc::new(tx_repo),
2061 Arc::new(counter),
2062 Arc::new(job_producer),
2063 )
2064 .unwrap();
2065
2066 let request = JsonRpcRequest {
2067 jsonrpc: "2.0".to_string(),
2068 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2069 method: "eth_getBalance".to_string(),
2070 params: serde_json::Value::String(
2071 r#"["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]"#.to_string(),
2072 ),
2073 }),
2074 id: Some(JsonRpcId::Number(1)),
2075 };
2076
2077 let response = relayer.rpc(request).await.unwrap();
2078 assert!(response.error.is_none());
2079 assert!(response.result.is_some());
2080
2081 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
2082 assert_eq!(result, serde_json::json!("0xde0b6b3a7640000")); }
2084 }
2085
2086 #[tokio::test]
2087 async fn test_rpc_eth_block_number() {
2088 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2089 setup_mocks();
2090 let relayer_model = create_test_relayer();
2091
2092 provider
2093 .expect_raw_request_dyn()
2094 .withf(|method, params| method == "eth_blockNumber" && params.as_str() == Some("[]"))
2095 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("0x3039")) }));
2096
2097 let relayer = EvmRelayer::new(
2098 relayer_model,
2099 signer,
2100 provider,
2101 create_test_evm_network(),
2102 Arc::new(relayer_repo),
2103 Arc::new(network_repo),
2104 Arc::new(tx_repo),
2105 Arc::new(counter),
2106 Arc::new(job_producer),
2107 )
2108 .unwrap();
2109
2110 let request = JsonRpcRequest {
2111 jsonrpc: "2.0".to_string(),
2112 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2113 method: "eth_blockNumber".to_string(),
2114 params: serde_json::Value::String("[]".to_string()),
2115 }),
2116 id: Some(JsonRpcId::Number(1)),
2117 };
2118
2119 let response = relayer.rpc(request).await.unwrap();
2120 assert!(response.error.is_none());
2121 assert!(response.result.is_some());
2122
2123 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
2124 assert_eq!(result, serde_json::json!("0x3039")); }
2126 }
2127
2128 #[tokio::test]
2129 async fn test_rpc_unsupported_method() {
2130 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2131 setup_mocks();
2132 let relayer_model = create_test_relayer();
2133
2134 provider
2135 .expect_raw_request_dyn()
2136 .withf(|method, _| method == "eth_unsupportedMethod")
2137 .returning(|_, _| {
2138 Box::pin(async {
2139 Err(ProviderError::Other(
2140 "Unsupported method: eth_unsupportedMethod".to_string(),
2141 ))
2142 })
2143 });
2144
2145 let relayer = EvmRelayer::new(
2146 relayer_model,
2147 signer,
2148 provider,
2149 create_test_evm_network(),
2150 Arc::new(relayer_repo),
2151 Arc::new(network_repo),
2152 Arc::new(tx_repo),
2153 Arc::new(counter),
2154 Arc::new(job_producer),
2155 )
2156 .unwrap();
2157
2158 let request = JsonRpcRequest {
2159 jsonrpc: "2.0".to_string(),
2160 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2161 method: "eth_unsupportedMethod".to_string(),
2162 params: serde_json::Value::String("[]".to_string()),
2163 }),
2164 id: Some(JsonRpcId::Number(1)),
2165 };
2166
2167 let response = relayer.rpc(request).await.unwrap();
2168 assert!(response.result.is_none());
2169 assert!(response.error.is_some());
2170
2171 let error = response.error.unwrap();
2172 assert_eq!(error.code, -32603); }
2174
2175 #[tokio::test]
2176 async fn test_rpc_invalid_params() {
2177 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2178 setup_mocks();
2179 let relayer_model = create_test_relayer();
2180
2181 provider
2182 .expect_raw_request_dyn()
2183 .withf(|method, params| method == "eth_getBalance" && params.as_str() == Some("[]"))
2184 .returning(|_, _| {
2185 Box::pin(async {
2186 Err(ProviderError::Other(
2187 "Missing address parameter".to_string(),
2188 ))
2189 })
2190 });
2191
2192 let relayer = EvmRelayer::new(
2193 relayer_model,
2194 signer,
2195 provider,
2196 create_test_evm_network(),
2197 Arc::new(relayer_repo),
2198 Arc::new(network_repo),
2199 Arc::new(tx_repo),
2200 Arc::new(counter),
2201 Arc::new(job_producer),
2202 )
2203 .unwrap();
2204
2205 let request = JsonRpcRequest {
2206 jsonrpc: "2.0".to_string(),
2207 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2208 method: "eth_getBalance".to_string(),
2209 params: serde_json::Value::String("[]".to_string()), }),
2211 id: Some(JsonRpcId::Number(1)),
2212 };
2213
2214 let response = relayer.rpc(request).await.unwrap();
2215 assert!(response.result.is_none());
2216 assert!(response.error.is_some());
2217
2218 let error = response.error.unwrap();
2219 assert_eq!(error.code, -32603); }
2221
2222 #[tokio::test]
2223 async fn test_rpc_non_evm_request() {
2224 let (provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2225 setup_mocks();
2226 let relayer_model = create_test_relayer();
2227
2228 let relayer = EvmRelayer::new(
2229 relayer_model,
2230 signer,
2231 provider,
2232 create_test_evm_network(),
2233 Arc::new(relayer_repo),
2234 Arc::new(network_repo),
2235 Arc::new(tx_repo),
2236 Arc::new(counter),
2237 Arc::new(job_producer),
2238 )
2239 .unwrap();
2240
2241 let request = JsonRpcRequest {
2242 jsonrpc: "2.0".to_string(),
2243 params: NetworkRpcRequest::Solana(crate::models::SolanaRpcRequest::GetSupportedTokens(
2244 crate::models::SolanaGetSupportedTokensRequestParams {},
2245 )),
2246 id: Some(JsonRpcId::Number(1)),
2247 };
2248
2249 let response = relayer.rpc(request).await.unwrap();
2250 assert!(response.result.is_none());
2251 assert!(response.error.is_some());
2252
2253 let error = response.error.unwrap();
2254 assert_eq!(error.code, -32602); }
2256
2257 #[tokio::test]
2258 async fn test_rpc_raw_request_with_array_params() {
2259 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2260 setup_mocks();
2261 let relayer_model = create_test_relayer();
2262
2263 provider
2264 .expect_raw_request_dyn()
2265 .withf(|method, params| {
2266 method == "eth_getTransactionByHash"
2267 && params.as_array().is_some_and(|arr| {
2268 arr.len() == 1 && arr[0].as_str() == Some("0x1234567890abcdef")
2269 })
2270 })
2271 .returning(|_, _| {
2272 Box::pin(async {
2273 Ok(serde_json::json!({
2274 "hash": "0x1234567890abcdef",
2275 "blockNumber": "0x1",
2276 "gasUsed": "0x5208"
2277 }))
2278 })
2279 });
2280
2281 let relayer = EvmRelayer::new(
2282 relayer_model,
2283 signer,
2284 provider,
2285 create_test_evm_network(),
2286 Arc::new(relayer_repo),
2287 Arc::new(network_repo),
2288 Arc::new(tx_repo),
2289 Arc::new(counter),
2290 Arc::new(job_producer),
2291 )
2292 .unwrap();
2293
2294 let request = JsonRpcRequest {
2295 jsonrpc: "2.0".to_string(),
2296 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2297 method: "eth_getTransactionByHash".to_string(),
2298 params: serde_json::json!(["0x1234567890abcdef"]),
2299 }),
2300 id: Some(JsonRpcId::Number(42)),
2301 };
2302
2303 let response = relayer.rpc(request).await.unwrap();
2304 assert!(response.error.is_none());
2305 assert!(response.result.is_some());
2306 assert_eq!(response.id, Some(JsonRpcId::Number(42)));
2307
2308 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
2309 assert!(result.get("hash").is_some());
2310 assert!(result.get("blockNumber").is_some());
2311 }
2312 }
2313
2314 #[tokio::test]
2315 async fn test_rpc_raw_request_with_object_params() {
2316 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2317 setup_mocks();
2318 let relayer_model = create_test_relayer();
2319
2320 provider
2321 .expect_raw_request_dyn()
2322 .withf(|method, params| {
2323 method == "eth_call"
2324 && params
2325 .as_object()
2326 .is_some_and(|obj| obj.contains_key("to") && obj.contains_key("data"))
2327 })
2328 .returning(|_, _| {
2329 Box::pin(async {
2330 Ok(serde_json::json!(
2331 "0x0000000000000000000000000000000000000000000000000000000000000001"
2332 ))
2333 })
2334 });
2335
2336 let relayer = EvmRelayer::new(
2337 relayer_model,
2338 signer,
2339 provider,
2340 create_test_evm_network(),
2341 Arc::new(relayer_repo),
2342 Arc::new(network_repo),
2343 Arc::new(tx_repo),
2344 Arc::new(counter),
2345 Arc::new(job_producer),
2346 )
2347 .unwrap();
2348
2349 let request = JsonRpcRequest {
2350 jsonrpc: "2.0".to_string(),
2351 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2352 method: "eth_call".to_string(),
2353 params: serde_json::json!({
2354 "to": "0x742d35Cc6634C0532925a3b844Bc454e4438f44e",
2355 "data": "0x70a08231000000000000000000000000742d35cc6634c0532925a3b844bc454e4438f44e"
2356 }),
2357 }),
2358 id: Some(JsonRpcId::Number(123)),
2359 };
2360
2361 let response = relayer.rpc(request).await.unwrap();
2362 assert!(response.error.is_none());
2363 assert!(response.result.is_some());
2364 assert_eq!(response.id, Some(JsonRpcId::Number(123)));
2365 }
2366
2367 #[tokio::test]
2368 async fn test_rpc_generic_request_with_empty_params() {
2369 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2370 setup_mocks();
2371 let relayer_model = create_test_relayer();
2372
2373 provider
2374 .expect_raw_request_dyn()
2375 .withf(|method, params| method == "net_version" && params.as_str() == Some("[]"))
2376 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("1")) }));
2377
2378 let relayer = EvmRelayer::new(
2379 relayer_model,
2380 signer,
2381 provider,
2382 create_test_evm_network(),
2383 Arc::new(relayer_repo),
2384 Arc::new(network_repo),
2385 Arc::new(tx_repo),
2386 Arc::new(counter),
2387 Arc::new(job_producer),
2388 )
2389 .unwrap();
2390
2391 let request = JsonRpcRequest {
2392 jsonrpc: "2.0".to_string(),
2393 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2394 method: "net_version".to_string(),
2395 params: serde_json::Value::String("[]".to_string()),
2396 }),
2397 id: Some(JsonRpcId::Number(999)),
2398 };
2399
2400 let response = relayer.rpc(request).await.unwrap();
2401 assert!(response.error.is_none());
2402 assert!(response.result.is_some());
2403 assert_eq!(response.id, Some(JsonRpcId::Number(999)));
2404 }
2405
2406 #[tokio::test]
2407 async fn test_rpc_provider_invalid_address_error() {
2408 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2409 setup_mocks();
2410 let relayer_model = create_test_relayer();
2411
2412 provider.expect_raw_request_dyn().returning(|_, _| {
2413 Box::pin(async {
2414 Err(ProviderError::InvalidAddress(
2415 "Invalid address format".to_string(),
2416 ))
2417 })
2418 });
2419
2420 let relayer = EvmRelayer::new(
2421 relayer_model,
2422 signer,
2423 provider,
2424 create_test_evm_network(),
2425 Arc::new(relayer_repo),
2426 Arc::new(network_repo),
2427 Arc::new(tx_repo),
2428 Arc::new(counter),
2429 Arc::new(job_producer),
2430 )
2431 .unwrap();
2432
2433 let request = JsonRpcRequest {
2434 jsonrpc: "2.0".to_string(),
2435 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2436 method: "eth_getBalance".to_string(),
2437 params: serde_json::Value::String(r#"["invalid_address", "latest"]"#.to_string()),
2438 }),
2439 id: Some(JsonRpcId::Number(1)),
2440 };
2441
2442 let response = relayer.rpc(request).await.unwrap();
2443 assert!(response.result.is_none());
2444 assert!(response.error.is_some());
2445
2446 let error = response.error.unwrap();
2447 assert_eq!(error.code, -32602); }
2449
2450 #[tokio::test]
2451 async fn test_rpc_provider_network_configuration_error() {
2452 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2453 setup_mocks();
2454 let relayer_model = create_test_relayer();
2455
2456 provider.expect_raw_request_dyn().returning(|_, _| {
2457 Box::pin(async {
2458 Err(ProviderError::NetworkConfiguration(
2459 "Network not reachable".to_string(),
2460 ))
2461 })
2462 });
2463
2464 let relayer = EvmRelayer::new(
2465 relayer_model,
2466 signer,
2467 provider,
2468 create_test_evm_network(),
2469 Arc::new(relayer_repo),
2470 Arc::new(network_repo),
2471 Arc::new(tx_repo),
2472 Arc::new(counter),
2473 Arc::new(job_producer),
2474 )
2475 .unwrap();
2476
2477 let request = JsonRpcRequest {
2478 jsonrpc: "2.0".to_string(),
2479 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2480 method: "eth_chainId".to_string(),
2481 params: serde_json::Value::String("[]".to_string()),
2482 }),
2483 id: Some(JsonRpcId::Number(2)),
2484 };
2485
2486 let response = relayer.rpc(request).await.unwrap();
2487 assert!(response.result.is_none());
2488 assert!(response.error.is_some());
2489
2490 let error = response.error.unwrap();
2491 assert_eq!(error.code, -33004); }
2493
2494 #[tokio::test]
2495 async fn test_rpc_provider_timeout_error() {
2496 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2497 setup_mocks();
2498 let relayer_model = create_test_relayer();
2499
2500 provider
2501 .expect_raw_request_dyn()
2502 .returning(|_, _| Box::pin(async { Err(ProviderError::Timeout) }));
2503
2504 let relayer = EvmRelayer::new(
2505 relayer_model,
2506 signer,
2507 provider,
2508 create_test_evm_network(),
2509 Arc::new(relayer_repo),
2510 Arc::new(network_repo),
2511 Arc::new(tx_repo),
2512 Arc::new(counter),
2513 Arc::new(job_producer),
2514 )
2515 .unwrap();
2516
2517 let request = JsonRpcRequest {
2518 jsonrpc: "2.0".to_string(),
2519 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2520 method: "eth_blockNumber".to_string(),
2521 params: serde_json::json!([]),
2522 }),
2523 id: Some(JsonRpcId::Number(3)),
2524 };
2525
2526 let response = relayer.rpc(request).await.unwrap();
2527 assert!(response.result.is_none());
2528 assert!(response.error.is_some());
2529
2530 let error = response.error.unwrap();
2531 assert_eq!(error.code, -33000); }
2533
2534 #[tokio::test]
2535 async fn test_rpc_provider_rate_limited_error() {
2536 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2537 setup_mocks();
2538 let relayer_model = create_test_relayer();
2539
2540 provider
2541 .expect_raw_request_dyn()
2542 .returning(|_, _| Box::pin(async { Err(ProviderError::RateLimited) }));
2543
2544 let relayer = EvmRelayer::new(
2545 relayer_model,
2546 signer,
2547 provider,
2548 create_test_evm_network(),
2549 Arc::new(relayer_repo),
2550 Arc::new(network_repo),
2551 Arc::new(tx_repo),
2552 Arc::new(counter),
2553 Arc::new(job_producer),
2554 )
2555 .unwrap();
2556
2557 let request = JsonRpcRequest {
2558 jsonrpc: "2.0".to_string(),
2559 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2560 method: "eth_getBalance".to_string(),
2561 params: serde_json::Value::String(
2562 r#"["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]"#.to_string(),
2563 ),
2564 }),
2565 id: Some(JsonRpcId::Number(4)),
2566 };
2567
2568 let response = relayer.rpc(request).await.unwrap();
2569 assert!(response.result.is_none());
2570 assert!(response.error.is_some());
2571
2572 let error = response.error.unwrap();
2573 assert_eq!(error.code, -33001); }
2575
2576 #[tokio::test]
2577 async fn test_rpc_provider_bad_gateway_error() {
2578 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2579 setup_mocks();
2580 let relayer_model = create_test_relayer();
2581
2582 provider
2583 .expect_raw_request_dyn()
2584 .returning(|_, _| Box::pin(async { Err(ProviderError::BadGateway) }));
2585
2586 let relayer = EvmRelayer::new(
2587 relayer_model,
2588 signer,
2589 provider,
2590 create_test_evm_network(),
2591 Arc::new(relayer_repo),
2592 Arc::new(network_repo),
2593 Arc::new(tx_repo),
2594 Arc::new(counter),
2595 Arc::new(job_producer),
2596 )
2597 .unwrap();
2598
2599 let request = JsonRpcRequest {
2600 jsonrpc: "2.0".to_string(),
2601 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2602 method: "eth_gasPrice".to_string(),
2603 params: serde_json::json!([]),
2604 }),
2605 id: Some(JsonRpcId::Number(5)),
2606 };
2607
2608 let response = relayer.rpc(request).await.unwrap();
2609 assert!(response.result.is_none());
2610 assert!(response.error.is_some());
2611
2612 let error = response.error.unwrap();
2613 assert_eq!(error.code, -33002); }
2615
2616 #[tokio::test]
2617 async fn test_rpc_provider_request_error() {
2618 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2619 setup_mocks();
2620 let relayer_model = create_test_relayer();
2621
2622 provider.expect_raw_request_dyn().returning(|_, _| {
2623 Box::pin(async {
2624 Err(ProviderError::RequestError {
2625 error: "Bad request".to_string(),
2626 status_code: 400,
2627 })
2628 })
2629 });
2630
2631 let relayer = EvmRelayer::new(
2632 relayer_model,
2633 signer,
2634 provider,
2635 create_test_evm_network(),
2636 Arc::new(relayer_repo),
2637 Arc::new(network_repo),
2638 Arc::new(tx_repo),
2639 Arc::new(counter),
2640 Arc::new(job_producer),
2641 )
2642 .unwrap();
2643
2644 let request = JsonRpcRequest {
2645 jsonrpc: "2.0".to_string(),
2646 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2647 method: "invalid_method".to_string(),
2648 params: serde_json::Value::String("{}".to_string()),
2649 }),
2650 id: Some(JsonRpcId::Number(6)),
2651 };
2652
2653 let response = relayer.rpc(request).await.unwrap();
2654 assert!(response.result.is_none());
2655 assert!(response.error.is_some());
2656
2657 let error = response.error.unwrap();
2658 assert_eq!(error.code, -33003); }
2660
2661 #[tokio::test]
2662 async fn test_rpc_provider_other_error() {
2663 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2664 setup_mocks();
2665 let relayer_model = create_test_relayer();
2666
2667 provider.expect_raw_request_dyn().returning(|_, _| {
2668 Box::pin(async {
2669 Err(ProviderError::Other(
2670 "Unexpected error occurred".to_string(),
2671 ))
2672 })
2673 });
2674
2675 let relayer = EvmRelayer::new(
2676 relayer_model,
2677 signer,
2678 provider,
2679 create_test_evm_network(),
2680 Arc::new(relayer_repo),
2681 Arc::new(network_repo),
2682 Arc::new(tx_repo),
2683 Arc::new(counter),
2684 Arc::new(job_producer),
2685 )
2686 .unwrap();
2687
2688 let request = JsonRpcRequest {
2689 jsonrpc: "2.0".to_string(),
2690 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2691 method: "eth_getBalance".to_string(),
2692 params: serde_json::json!(["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]),
2693 }),
2694 id: Some(JsonRpcId::Number(7)),
2695 };
2696
2697 let response = relayer.rpc(request).await.unwrap();
2698 assert!(response.result.is_none());
2699 assert!(response.error.is_some());
2700
2701 let error = response.error.unwrap();
2702 assert_eq!(error.code, -32603); }
2704
2705 #[tokio::test]
2706 async fn test_rpc_response_preserves_request_id() {
2707 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2708 setup_mocks();
2709 let relayer_model = create_test_relayer();
2710
2711 provider
2712 .expect_raw_request_dyn()
2713 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("0x1")) }));
2714
2715 let relayer = EvmRelayer::new(
2716 relayer_model,
2717 signer,
2718 provider,
2719 create_test_evm_network(),
2720 Arc::new(relayer_repo),
2721 Arc::new(network_repo),
2722 Arc::new(tx_repo),
2723 Arc::new(counter),
2724 Arc::new(job_producer),
2725 )
2726 .unwrap();
2727
2728 let request_id = u64::MAX;
2729 let request = JsonRpcRequest {
2730 jsonrpc: "2.0".to_string(),
2731 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2732 method: "eth_chainId".to_string(),
2733 params: serde_json::Value::String("[]".to_string()),
2734 }),
2735 id: Some(JsonRpcId::Number(request_id as i64)),
2736 };
2737
2738 let response = relayer.rpc(request).await.unwrap();
2739 assert_eq!(response.id, Some(JsonRpcId::Number(request_id as i64)));
2740 assert_eq!(response.jsonrpc, "2.0");
2741 }
2742
2743 #[tokio::test]
2744 async fn test_rpc_handles_complex_json_response() {
2745 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2746 setup_mocks();
2747 let relayer_model = create_test_relayer();
2748
2749 let complex_response = serde_json::json!({
2750 "number": "0x1b4",
2751 "hash": "0xdc0818cf78f21a8e70579cb46a43643f78291264dda342ae31049421c82d21ae",
2752 "parentHash": "0xe99e022112df268ce40b8b654759b4f39c3cc1b8c86b2f4c7da48ba6d8a6ae8b",
2753 "transactions": [
2754 {
2755 "hash": "0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060",
2756 "from": "0xa7d9ddbe1f17865597fbd27ec712455208b6b76d",
2757 "to": "0xf02c1c8e6114b1dbe8937a39260b5b0a374432bb",
2758 "value": "0xf3dbb76162000"
2759 }
2760 ],
2761 "gasUsed": "0x5208"
2762 });
2763
2764 provider.expect_raw_request_dyn().returning(move |_, _| {
2765 let response = complex_response.clone();
2766 Box::pin(async move { Ok(response) })
2767 });
2768
2769 let relayer = EvmRelayer::new(
2770 relayer_model,
2771 signer,
2772 provider,
2773 create_test_evm_network(),
2774 Arc::new(relayer_repo),
2775 Arc::new(network_repo),
2776 Arc::new(tx_repo),
2777 Arc::new(counter),
2778 Arc::new(job_producer),
2779 )
2780 .unwrap();
2781
2782 let request = JsonRpcRequest {
2783 jsonrpc: "2.0".to_string(),
2784 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2785 method: "eth_getBlockByNumber".to_string(),
2786 params: serde_json::json!(["0x1b4", true]),
2787 }),
2788 id: Some(JsonRpcId::Number(8)),
2789 };
2790
2791 let response = relayer.rpc(request).await.unwrap();
2792 assert!(response.error.is_none());
2793 assert!(response.result.is_some());
2794
2795 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
2796 assert!(result.get("transactions").is_some());
2797 assert!(result.get("hash").is_some());
2798 assert!(result.get("gasUsed").is_some());
2799 }
2800 }
2801
2802 #[tokio::test]
2803 async fn test_initialize_relayer_disables_when_validation_fails() {
2804 let (
2805 mut provider,
2806 mut relayer_repo,
2807 network_repo,
2808 tx_repo,
2809 mut job_producer,
2810 signer,
2811 mut counter,
2812 ) = setup_mocks();
2813 let mut relayer_model = create_test_relayer();
2814 relayer_model.system_disabled = false; relayer_model.notification_id = Some("test-notification-id".to_string());
2816
2817 provider
2819 .expect_get_transaction_count()
2820 .returning(|_| Box::pin(ready(Err(ProviderError::Other("RPC error".to_string())))));
2821
2822 counter
2823 .expect_get()
2824 .returning(|| Box::pin(ready(Ok(Some(0u64)))));
2825
2826 provider
2828 .expect_get_balance()
2829 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64)))));
2830
2831 provider
2832 .expect_health_check()
2833 .returning(|| Box::pin(ready(Ok(true))));
2834
2835 let mut disabled_relayer = relayer_model.clone();
2837 disabled_relayer.system_disabled = true;
2838 relayer_repo
2839 .expect_disable_relayer()
2840 .with(eq("test-relayer-id".to_string()), always())
2841 .returning(move |_, _| Ok(disabled_relayer.clone()));
2842
2843 job_producer
2845 .expect_produce_send_notification_job()
2846 .returning(|_, _| Box::pin(ready(Ok(()))));
2847
2848 job_producer
2850 .expect_produce_relayer_health_check_job()
2851 .returning(|_, _| Box::pin(ready(Ok(()))));
2852
2853 let relayer = EvmRelayer::new(
2854 relayer_model,
2855 signer,
2856 provider,
2857 create_test_evm_network(),
2858 Arc::new(relayer_repo),
2859 Arc::new(network_repo),
2860 Arc::new(tx_repo),
2861 Arc::new(counter),
2862 Arc::new(job_producer),
2863 )
2864 .unwrap();
2865
2866 let result = relayer.initialize_relayer().await;
2867 assert!(result.is_ok());
2868 }
2869
2870 #[tokio::test]
2871 async fn test_initialize_relayer_enables_when_validation_passes_and_was_disabled() {
2872 let (
2873 mut provider,
2874 mut relayer_repo,
2875 network_repo,
2876 tx_repo,
2877 job_producer,
2878 signer,
2879 mut counter,
2880 ) = setup_mocks();
2881 let mut relayer_model = create_test_relayer();
2882 relayer_model.system_disabled = true; provider
2886 .expect_get_transaction_count()
2887 .returning(|_| Box::pin(ready(Ok(42u64))));
2888
2889 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
2890
2891 counter
2892 .expect_get()
2893 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
2894
2895 provider
2896 .expect_get_balance()
2897 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); provider
2900 .expect_health_check()
2901 .returning(|| Box::pin(ready(Ok(true))));
2902
2903 let mut enabled_relayer = relayer_model.clone();
2905 enabled_relayer.system_disabled = false;
2906 relayer_repo
2907 .expect_enable_relayer()
2908 .with(eq("test-relayer-id".to_string()))
2909 .returning(move |_| Ok(enabled_relayer.clone()));
2910
2911 let relayer = EvmRelayer::new(
2912 relayer_model,
2913 signer,
2914 provider,
2915 create_test_evm_network(),
2916 Arc::new(relayer_repo),
2917 Arc::new(network_repo),
2918 Arc::new(tx_repo),
2919 Arc::new(counter),
2920 Arc::new(job_producer),
2921 )
2922 .unwrap();
2923
2924 let result = relayer.initialize_relayer().await;
2925 assert!(result.is_ok());
2926 }
2927
2928 #[tokio::test]
2929 async fn test_initialize_relayer_no_action_when_enabled_and_validation_passes() {
2930 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
2931 setup_mocks();
2932 let mut relayer_model = create_test_relayer();
2933 relayer_model.system_disabled = false; provider
2937 .expect_get_transaction_count()
2938 .returning(|_| Box::pin(ready(Ok(42u64))));
2939
2940 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
2941
2942 counter
2943 .expect_get()
2944 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
2945
2946 provider
2947 .expect_get_balance()
2948 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); provider
2951 .expect_health_check()
2952 .returning(|| Box::pin(ready(Ok(true))));
2953
2954 let relayer = EvmRelayer::new(
2957 relayer_model,
2958 signer,
2959 provider,
2960 create_test_evm_network(),
2961 Arc::new(relayer_repo),
2962 Arc::new(network_repo),
2963 Arc::new(tx_repo),
2964 Arc::new(counter),
2965 Arc::new(job_producer),
2966 )
2967 .unwrap();
2968
2969 let result = relayer.initialize_relayer().await;
2970 assert!(result.is_ok());
2971 }
2972
2973 #[tokio::test]
2974 async fn test_initialize_relayer_sends_notification_when_disabled() {
2975 let (
2976 mut provider,
2977 mut relayer_repo,
2978 network_repo,
2979 tx_repo,
2980 mut job_producer,
2981 signer,
2982 mut counter,
2983 ) = setup_mocks();
2984 let mut relayer_model = create_test_relayer();
2985 relayer_model.system_disabled = false; relayer_model.notification_id = Some("test-notification-id".to_string());
2987
2988 provider
2990 .expect_get_transaction_count()
2991 .returning(|_| Box::pin(ready(Ok(42u64))));
2992
2993 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
2994
2995 counter
2996 .expect_get()
2997 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
2998
2999 provider
3000 .expect_get_balance()
3001 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); provider.expect_health_check().returning(|| {
3004 Box::pin(ready(Err(ProviderError::Other(
3005 "RPC validation failed".to_string(),
3006 ))))
3007 });
3008
3009 let mut disabled_relayer = relayer_model.clone();
3011 disabled_relayer.system_disabled = true;
3012 relayer_repo
3013 .expect_disable_relayer()
3014 .with(eq("test-relayer-id".to_string()), always())
3015 .returning(move |_, _| Ok(disabled_relayer.clone()));
3016
3017 job_producer
3019 .expect_produce_send_notification_job()
3020 .returning(|_, _| Box::pin(ready(Ok(()))));
3021
3022 job_producer
3024 .expect_produce_relayer_health_check_job()
3025 .returning(|_, _| Box::pin(ready(Ok(()))));
3026
3027 let relayer = EvmRelayer::new(
3028 relayer_model,
3029 signer,
3030 provider,
3031 create_test_evm_network(),
3032 Arc::new(relayer_repo),
3033 Arc::new(network_repo),
3034 Arc::new(tx_repo),
3035 Arc::new(counter),
3036 Arc::new(job_producer),
3037 )
3038 .unwrap();
3039
3040 let result = relayer.initialize_relayer().await;
3041 assert!(result.is_ok());
3042 }
3043
3044 #[tokio::test]
3045 async fn test_initialize_relayer_no_notification_when_no_notification_id() {
3046 let (
3047 mut provider,
3048 mut relayer_repo,
3049 network_repo,
3050 tx_repo,
3051 mut job_producer,
3052 signer,
3053 mut counter,
3054 ) = setup_mocks();
3055 let mut relayer_model = create_test_relayer();
3056 relayer_model.system_disabled = false; relayer_model.notification_id = None; provider
3061 .expect_get_transaction_count()
3062 .returning(|_| Box::pin(ready(Ok(42u64))));
3063
3064 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
3065
3066 counter
3067 .expect_get()
3068 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
3069
3070 provider
3071 .expect_get_balance()
3072 .returning(|_| Box::pin(ready(Ok(U256::from(50000000000000000u64))))); provider
3075 .expect_health_check()
3076 .returning(|| Box::pin(ready(Ok(true))));
3077
3078 let mut disabled_relayer = relayer_model.clone();
3080 disabled_relayer.system_disabled = true;
3081 relayer_repo
3082 .expect_disable_relayer()
3083 .with(eq("test-relayer-id".to_string()), always())
3084 .returning(move |_, _| Ok(disabled_relayer.clone()));
3085
3086 job_producer
3089 .expect_produce_relayer_health_check_job()
3090 .returning(|_, _| Box::pin(ready(Ok(()))));
3091
3092 let relayer = EvmRelayer::new(
3093 relayer_model,
3094 signer,
3095 provider,
3096 create_test_evm_network(),
3097 Arc::new(relayer_repo),
3098 Arc::new(network_repo),
3099 Arc::new(tx_repo),
3100 Arc::new(counter),
3101 Arc::new(job_producer),
3102 )
3103 .unwrap();
3104
3105 let result = relayer.initialize_relayer().await;
3106 assert!(result.is_ok());
3107 }
3108}