1use crate::{
10 jobs::{
11 Job, NotificationSend, RelayerHealthCheck, TransactionRequest, TransactionSend,
12 TransactionStatusCheck,
13 },
14 models::RelayerError,
15 observability::request_id::get_request_id,
16 queues::{QueueBackend, QueueBackendStorage, QueueBackendType},
17};
18use async_trait::async_trait;
19use serde::Serialize;
20use std::sync::Arc;
21use thiserror::Error;
22use tracing::{debug, error};
23
24use super::{JobType, TokenSwapRequest};
25
26#[cfg(test)]
27use mockall::automock;
28
29#[derive(Debug, Error, Serialize, Clone)]
30pub enum JobProducerError {
31 #[error("Queue error: {0}")]
32 QueueError(String),
33}
34
35impl From<JobProducerError> for RelayerError {
36 fn from(err: JobProducerError) -> Self {
37 RelayerError::QueueError(err.to_string())
38 }
39}
40
41#[derive(Debug, Clone)]
43pub struct JobProducer {
44 queue_backend: Arc<QueueBackendStorage>,
45}
46
47#[async_trait]
48#[cfg_attr(test, automock)]
49pub trait JobProducerTrait: Send + Sync {
50 async fn produce_transaction_request_job(
51 &self,
52 transaction_process_job: TransactionRequest,
53 scheduled_on: Option<i64>,
54 ) -> Result<(), JobProducerError>;
55
56 async fn produce_submit_transaction_job(
57 &self,
58 transaction_submit_job: TransactionSend,
59 scheduled_on: Option<i64>,
60 ) -> Result<(), JobProducerError>;
61
62 async fn produce_check_transaction_status_job(
63 &self,
64 transaction_status_check_job: TransactionStatusCheck,
65 scheduled_on: Option<i64>,
66 ) -> Result<(), JobProducerError>;
67
68 async fn produce_send_notification_job(
69 &self,
70 notification_send_job: NotificationSend,
71 scheduled_on: Option<i64>,
72 ) -> Result<(), JobProducerError>;
73
74 async fn produce_token_swap_request_job(
75 &self,
76 swap_request_job: TokenSwapRequest,
77 scheduled_on: Option<i64>,
78 ) -> Result<(), JobProducerError>;
79
80 async fn produce_relayer_health_check_job(
81 &self,
82 relayer_health_check_job: RelayerHealthCheck,
83 scheduled_on: Option<i64>,
84 ) -> Result<(), JobProducerError>;
85
86 fn get_queue_backend(&self) -> Option<Arc<QueueBackendStorage>> {
88 None
89 }
90
91 fn backend_type(&self) -> QueueBackendType {
93 QueueBackendType::Redis
94 }
95}
96
97impl JobProducer {
98 pub fn new(queue_backend: Arc<QueueBackendStorage>) -> Self {
99 Self { queue_backend }
100 }
101
102 pub fn queue_backend(&self) -> Arc<QueueBackendStorage> {
103 self.queue_backend.clone()
104 }
105}
106
107#[async_trait]
108impl JobProducerTrait for JobProducer {
109 fn get_queue_backend(&self) -> Option<Arc<QueueBackendStorage>> {
110 Some(self.queue_backend())
111 }
112
113 fn backend_type(&self) -> QueueBackendType {
114 self.queue_backend.backend_type()
115 }
116
117 async fn produce_transaction_request_job(
118 &self,
119 transaction_process_job: TransactionRequest,
120 scheduled_on: Option<i64>,
121 ) -> Result<(), JobProducerError> {
122 debug!(
123 "Producing transaction request job: {:?}",
124 transaction_process_job
125 );
126 let job = Job::new(JobType::TransactionRequest, transaction_process_job)
127 .with_request_id(get_request_id())
128 .with_scheduled_on(scheduled_on);
129 let request_id = job.request_id.clone();
130 let tx_id = job.data.transaction_id.clone();
131 let relayer_id = job.data.relayer_id.clone();
132
133 let backend = self.queue_backend();
134 let job_id = backend
135 .produce_transaction_request(job, scheduled_on)
136 .await
137 .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
138
139 debug!(
140 job_type = %JobType::TransactionRequest,
141 backend = %backend.backend_type(),
142 job_id = %job_id,
143 request_id = ?request_id,
144 tx_id = %tx_id,
145 relayer_id = %relayer_id,
146 scheduled_on = ?scheduled_on,
147 "transaction request job produced"
148 );
149
150 Ok(())
151 }
152
153 async fn produce_submit_transaction_job(
154 &self,
155 transaction_submit_job: TransactionSend,
156 scheduled_on: Option<i64>,
157 ) -> Result<(), JobProducerError> {
158 let job = Job::new(JobType::TransactionSend, transaction_submit_job)
159 .with_request_id(get_request_id())
160 .with_scheduled_on(scheduled_on);
161 let request_id = job.request_id.clone();
162 let tx_id = job.data.transaction_id.clone();
163 let relayer_id = job.data.relayer_id.clone();
164 let command = job.data.command.clone();
165
166 let backend = self.queue_backend();
167 let job_id = backend
168 .produce_transaction_submission(job, scheduled_on)
169 .await
170 .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
171
172 debug!(
173 job_type = %JobType::TransactionSend,
174 backend = %backend.backend_type(),
175 job_id = %job_id,
176 request_id = ?request_id,
177 tx_id = %tx_id,
178 relayer_id = %relayer_id,
179 command = ?command,
180 scheduled_on = ?scheduled_on,
181 "transaction submission job produced"
182 );
183
184 Ok(())
185 }
186
187 async fn produce_check_transaction_status_job(
188 &self,
189 transaction_status_check_job: TransactionStatusCheck,
190 scheduled_on: Option<i64>,
191 ) -> Result<(), JobProducerError> {
192 let job = Job::new(
193 JobType::TransactionStatusCheck,
194 transaction_status_check_job.clone(),
195 )
196 .with_request_id(get_request_id())
197 .with_scheduled_on(scheduled_on);
198 let request_id = job.request_id.clone();
199 let tx_id = job.data.transaction_id.clone();
200 let relayer_id = job.data.relayer_id.clone();
201
202 let backend = self.queue_backend();
203 let job_id = backend
204 .produce_transaction_status_check(job, scheduled_on)
205 .await
206 .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
207
208 debug!(
209 job_type = %JobType::TransactionStatusCheck,
210 backend = %backend.backend_type(),
211 job_id = %job_id,
212 request_id = ?request_id,
213 tx_id = %tx_id,
214 relayer_id = %relayer_id,
215 network_type = ?transaction_status_check_job.network_type,
216 scheduled_on = ?scheduled_on,
217 "Transaction Status Check job produced successfully"
218 );
219 Ok(())
220 }
221
222 async fn produce_send_notification_job(
223 &self,
224 notification_send_job: NotificationSend,
225 scheduled_on: Option<i64>,
226 ) -> Result<(), JobProducerError> {
227 let job = Job::new(JobType::NotificationSend, notification_send_job)
228 .with_request_id(get_request_id())
229 .with_scheduled_on(scheduled_on);
230 let request_id = job.request_id.clone();
231 let notification_id = job.data.notification_id.clone();
232
233 let backend = self.queue_backend();
234 let job_id = backend
235 .produce_notification(job, scheduled_on)
236 .await
237 .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
238
239 debug!(
240 job_type = %JobType::NotificationSend,
241 backend = %backend.backend_type(),
242 job_id = %job_id,
243 request_id = ?request_id,
244 notification_id = %notification_id,
245 scheduled_on = ?scheduled_on,
246 "notification send job produced"
247 );
248 Ok(())
249 }
250
251 async fn produce_token_swap_request_job(
252 &self,
253 swap_request_job: TokenSwapRequest,
254 scheduled_on: Option<i64>,
255 ) -> Result<(), JobProducerError> {
256 let job = Job::new(JobType::TokenSwapRequest, swap_request_job)
257 .with_request_id(get_request_id())
258 .with_scheduled_on(scheduled_on);
259 let request_id = job.request_id.clone();
260 let relayer_id = job.data.relayer_id.clone();
261 let backend = self.queue_backend();
262 let job_id = backend
263 .produce_token_swap_request(job, scheduled_on)
264 .await
265 .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
266
267 debug!(
268 job_type = %JobType::TokenSwapRequest,
269 backend = %backend.backend_type(),
270 job_id = %job_id,
271 request_id = ?request_id,
272 relayer_id = %relayer_id,
273 scheduled_on = ?scheduled_on,
274 "token swap job produced"
275 );
276 Ok(())
277 }
278
279 async fn produce_relayer_health_check_job(
280 &self,
281 relayer_health_check_job: RelayerHealthCheck,
282 scheduled_on: Option<i64>,
283 ) -> Result<(), JobProducerError> {
284 let job = Job::new(
285 JobType::RelayerHealthCheck,
286 relayer_health_check_job.clone(),
287 )
288 .with_request_id(get_request_id())
289 .with_scheduled_on(scheduled_on);
290 let request_id = job.request_id.clone();
291 let relayer_id = job.data.relayer_id.clone();
292 let backend = self.queue_backend();
293 let job_id = backend
294 .produce_relayer_health_check(job, scheduled_on)
295 .await
296 .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
297
298 debug!(
299 job_type = %JobType::RelayerHealthCheck,
300 backend = %backend.backend_type(),
301 job_id = %job_id,
302 request_id = ?request_id,
303 relayer_id = %relayer_id,
304 scheduled_on = ?scheduled_on,
305 "relayer health check job produced"
306 );
307 Ok(())
308 }
309}
310
311#[cfg(test)]
312mod tests {
313 use super::*;
314 use crate::models::{
315 EvmTransactionResponse, TransactionResponse, TransactionStatus, WebhookNotification,
316 WebhookPayload, U256,
317 };
318 use crate::utils::calculate_scheduled_timestamp;
319 use tokio::sync::Mutex;
320
321 #[derive(Clone, Debug)]
322 struct TestRedisStorage<T> {
324 pub push_called: bool,
325 pub schedule_called: bool,
326 pub last_job: Option<T>,
327 pub last_scheduled_timestamp: Option<i64>,
328 _phantom: std::marker::PhantomData<T>,
329 }
330
331 impl<T> TestRedisStorage<T> {
332 fn new() -> Self {
333 Self {
334 push_called: false,
335 schedule_called: false,
336 last_job: None,
337 last_scheduled_timestamp: None,
338 _phantom: std::marker::PhantomData,
339 }
340 }
341 }
342
343 impl<T: Clone> TestRedisStorage<T> {
344 async fn push(&mut self, job: T) -> Result<(), JobProducerError> {
345 self.push_called = true;
346 self.last_job = Some(job);
347 Ok(())
348 }
349
350 async fn schedule(&mut self, job: T, timestamp: i64) -> Result<(), JobProducerError> {
351 self.schedule_called = true;
352 self.last_job = Some(job);
353 self.last_scheduled_timestamp = Some(timestamp);
354 Ok(())
355 }
356 }
357
358 #[derive(Clone, Debug)]
360 struct TestQueue {
361 pub transaction_request_queue: TestRedisStorage<Job<TransactionRequest>>,
362 pub transaction_submission_queue: TestRedisStorage<Job<TransactionSend>>,
363 pub transaction_status_queue: TestRedisStorage<Job<TransactionStatusCheck>>,
364 pub transaction_status_queue_evm: TestRedisStorage<Job<TransactionStatusCheck>>,
365 pub transaction_status_queue_stellar: TestRedisStorage<Job<TransactionStatusCheck>>,
366 pub notification_queue: TestRedisStorage<Job<NotificationSend>>,
367 pub token_swap_request_queue: TestRedisStorage<Job<TokenSwapRequest>>,
368 pub relayer_health_check_queue: TestRedisStorage<Job<RelayerHealthCheck>>,
369 }
370
371 impl TestQueue {
372 fn new() -> Self {
373 Self {
374 transaction_request_queue: TestRedisStorage::new(),
375 transaction_submission_queue: TestRedisStorage::new(),
376 transaction_status_queue: TestRedisStorage::new(),
377 transaction_status_queue_evm: TestRedisStorage::new(),
378 transaction_status_queue_stellar: TestRedisStorage::new(),
379 notification_queue: TestRedisStorage::new(),
380 token_swap_request_queue: TestRedisStorage::new(),
381 relayer_health_check_queue: TestRedisStorage::new(),
382 }
383 }
384 }
385
386 struct TestJobProducer {
388 queue: Mutex<TestQueue>,
389 }
390
391 impl Clone for TestJobProducer {
392 fn clone(&self) -> Self {
393 let queue = self
394 .queue
395 .try_lock()
396 .expect("Failed to lock queue for cloning")
397 .clone();
398 Self {
399 queue: Mutex::new(queue),
400 }
401 }
402 }
403
404 impl TestJobProducer {
405 fn new() -> Self {
406 Self {
407 queue: Mutex::new(TestQueue::new()),
408 }
409 }
410
411 async fn get_queue(&self) -> TestQueue {
412 self.queue.lock().await.clone()
413 }
414 }
415
416 #[async_trait]
417 impl JobProducerTrait for TestJobProducer {
418 fn get_queue_backend(&self) -> Option<Arc<QueueBackendStorage>> {
419 None
420 }
421
422 async fn produce_transaction_request_job(
423 &self,
424 transaction_process_job: TransactionRequest,
425 scheduled_on: Option<i64>,
426 ) -> Result<(), JobProducerError> {
427 let mut queue = self.queue.lock().await;
428 let job = Job::new(JobType::TransactionRequest, transaction_process_job)
429 .with_scheduled_on(scheduled_on);
430
431 match scheduled_on {
432 Some(scheduled_on) => {
433 queue
434 .transaction_request_queue
435 .schedule(job, scheduled_on)
436 .await?;
437 }
438 None => {
439 queue.transaction_request_queue.push(job).await?;
440 }
441 }
442
443 Ok(())
444 }
445
446 async fn produce_submit_transaction_job(
447 &self,
448 transaction_submit_job: TransactionSend,
449 scheduled_on: Option<i64>,
450 ) -> Result<(), JobProducerError> {
451 let mut queue = self.queue.lock().await;
452 let job = Job::new(JobType::TransactionSend, transaction_submit_job)
453 .with_scheduled_on(scheduled_on);
454
455 match scheduled_on {
456 Some(on) => {
457 queue.transaction_submission_queue.schedule(job, on).await?;
458 }
459 None => {
460 queue.transaction_submission_queue.push(job).await?;
461 }
462 }
463
464 Ok(())
465 }
466
467 async fn produce_check_transaction_status_job(
468 &self,
469 transaction_status_check_job: TransactionStatusCheck,
470 scheduled_on: Option<i64>,
471 ) -> Result<(), JobProducerError> {
472 let mut queue = self.queue.lock().await;
473 let job = Job::new(
474 JobType::TransactionStatusCheck,
475 transaction_status_check_job.clone(),
476 )
477 .with_scheduled_on(scheduled_on);
478
479 use crate::models::NetworkType;
481 let status_queue = match transaction_status_check_job.network_type {
482 Some(NetworkType::Evm) => &mut queue.transaction_status_queue_evm,
483 Some(NetworkType::Stellar) => &mut queue.transaction_status_queue_stellar,
484 Some(NetworkType::Solana) => &mut queue.transaction_status_queue, None => &mut queue.transaction_status_queue, };
487
488 match scheduled_on {
489 Some(on) => {
490 status_queue.schedule(job, on).await?;
491 }
492 None => {
493 status_queue.push(job).await?;
494 }
495 }
496
497 Ok(())
498 }
499
500 async fn produce_send_notification_job(
501 &self,
502 notification_send_job: NotificationSend,
503 scheduled_on: Option<i64>,
504 ) -> Result<(), JobProducerError> {
505 let mut queue = self.queue.lock().await;
506 let job = Job::new(JobType::NotificationSend, notification_send_job)
507 .with_scheduled_on(scheduled_on);
508
509 match scheduled_on {
510 Some(on) => {
511 queue.notification_queue.schedule(job, on).await?;
512 }
513 None => {
514 queue.notification_queue.push(job).await?;
515 }
516 }
517
518 Ok(())
519 }
520
521 async fn produce_token_swap_request_job(
522 &self,
523 swap_request_job: TokenSwapRequest,
524 scheduled_on: Option<i64>,
525 ) -> Result<(), JobProducerError> {
526 let mut queue = self.queue.lock().await;
527 let job = Job::new(JobType::TokenSwapRequest, swap_request_job)
528 .with_scheduled_on(scheduled_on);
529
530 match scheduled_on {
531 Some(on) => {
532 queue.token_swap_request_queue.schedule(job, on).await?;
533 }
534 None => {
535 queue.token_swap_request_queue.push(job).await?;
536 }
537 }
538
539 Ok(())
540 }
541
542 async fn produce_relayer_health_check_job(
543 &self,
544 relayer_health_check_job: RelayerHealthCheck,
545 scheduled_on: Option<i64>,
546 ) -> Result<(), JobProducerError> {
547 let mut queue = self.queue.lock().await;
548 let job = Job::new(JobType::RelayerHealthCheck, relayer_health_check_job)
549 .with_scheduled_on(scheduled_on);
550
551 match scheduled_on {
552 Some(scheduled_on) => {
553 queue
554 .relayer_health_check_queue
555 .schedule(job, scheduled_on)
556 .await?;
557 }
558 None => {
559 queue.relayer_health_check_queue.push(job).await?;
560 }
561 }
562
563 Ok(())
564 }
565 }
566
567 #[tokio::test]
568 async fn test_job_producer_operations() {
569 let producer = TestJobProducer::new();
570
571 let request = TransactionRequest::new("tx123", "relayer-1");
573 let result = producer
574 .produce_transaction_request_job(request, None)
575 .await;
576 assert!(result.is_ok());
577
578 let queue = producer.get_queue().await;
579 assert!(queue.transaction_request_queue.push_called);
580
581 let producer = TestJobProducer::new();
583 let request = TransactionRequest::new("tx123", "relayer-1");
584 let scheduled_timestamp = calculate_scheduled_timestamp(10); let result = producer
586 .produce_transaction_request_job(request, Some(scheduled_timestamp))
587 .await;
588 assert!(result.is_ok());
589
590 let queue = producer.get_queue().await;
591 assert!(queue.transaction_request_queue.schedule_called);
592 }
593
594 #[tokio::test]
595 async fn test_submit_transaction_job() {
596 let producer = TestJobProducer::new();
597
598 let submit_job = TransactionSend::submit("tx123", "relayer-1");
600 let result = producer
601 .produce_submit_transaction_job(submit_job, None)
602 .await;
603 assert!(result.is_ok());
604
605 let queue = producer.get_queue().await;
606 assert!(queue.transaction_submission_queue.push_called);
607 }
608
609 #[tokio::test]
610 async fn test_check_status_job() {
611 use crate::models::NetworkType;
612 let producer = TestJobProducer::new();
613
614 let status_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
616 let result = producer
617 .produce_check_transaction_status_job(status_job, None)
618 .await;
619 assert!(result.is_ok());
620
621 let queue = producer.get_queue().await;
622 assert!(queue.transaction_status_queue_evm.push_called);
623 }
624
625 #[tokio::test]
626 async fn test_notification_job() {
627 let producer = TestJobProducer::new();
628
629 let notification = WebhookNotification::new(
631 "test_event".to_string(),
632 WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
633 EvmTransactionResponse {
634 id: "tx123".to_string(),
635 hash: Some("0x123".to_string()),
636 status: TransactionStatus::Confirmed,
637 status_reason: None,
638 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
639 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
640 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
641 gas_price: Some(1000000000),
642 gas_limit: Some(21000),
643 nonce: Some(1),
644 value: U256::from(1000000000000000000_u64),
645 from: "0xabc".to_string(),
646 to: Some("0xdef".to_string()),
647 relayer_id: "relayer-1".to_string(),
648 data: None,
649 max_fee_per_gas: None,
650 max_priority_fee_per_gas: None,
651 signature: None,
652 speed: None,
653 },
654 ))),
655 );
656 let job = NotificationSend::new("notification-1".to_string(), notification);
657
658 let result = producer.produce_send_notification_job(job, None).await;
659 assert!(result.is_ok());
660
661 let queue = producer.get_queue().await;
662 assert!(queue.notification_queue.push_called);
663 }
664
665 #[tokio::test]
666 async fn test_relayer_health_check_job() {
667 let producer = TestJobProducer::new();
668
669 let health_check = RelayerHealthCheck::new("relayer-1".to_string());
671 let result = producer
672 .produce_relayer_health_check_job(health_check, None)
673 .await;
674 assert!(result.is_ok());
675
676 let queue = producer.get_queue().await;
677 assert!(queue.relayer_health_check_queue.push_called);
678
679 let producer = TestJobProducer::new();
681 let health_check = RelayerHealthCheck::new("relayer-1".to_string());
682 let scheduled_timestamp = calculate_scheduled_timestamp(60);
683 let result = producer
684 .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
685 .await;
686 assert!(result.is_ok());
687
688 let queue = producer.get_queue().await;
689 assert!(queue.relayer_health_check_queue.schedule_called);
690 }
691
692 #[test]
693 fn test_job_producer_error_conversion() {
694 let job_error = JobProducerError::QueueError("Test error".to_string());
696 let relayer_error: RelayerError = job_error.into();
697
698 match relayer_error {
699 RelayerError::QueueError(msg) => {
700 assert_eq!(msg, "Queue error: Test error");
701 }
702 _ => panic!("Unexpected error type"),
703 }
704 }
705
706 #[tokio::test]
707 async fn test_get_queue() {
708 let producer = TestJobProducer::new();
709
710 let queue = producer.get_queue().await;
712
713 assert!(!queue.transaction_request_queue.push_called);
715 assert!(!queue.transaction_request_queue.schedule_called);
716 assert!(!queue.transaction_submission_queue.push_called);
717 assert!(!queue.notification_queue.push_called);
718 assert!(!queue.token_swap_request_queue.push_called);
719 assert!(!queue.relayer_health_check_queue.push_called);
720 }
721
722 #[tokio::test]
723 async fn test_produce_relayer_health_check_job_immediate() {
724 let producer = TestJobProducer::new();
725
726 let health_check = RelayerHealthCheck::new("relayer-1".to_string());
728 let result = producer
729 .produce_relayer_health_check_job(health_check, None)
730 .await;
731
732 assert!(result.is_ok());
734
735 let queue = producer.get_queue().await;
737 assert!(queue.relayer_health_check_queue.push_called);
738 assert!(!queue.relayer_health_check_queue.schedule_called);
739
740 assert!(!queue.transaction_request_queue.push_called);
742 assert!(!queue.transaction_submission_queue.push_called);
743 assert!(!queue.transaction_status_queue.push_called);
744 assert!(!queue.notification_queue.push_called);
745 assert!(!queue.token_swap_request_queue.push_called);
746 }
747
748 #[tokio::test]
749 async fn test_produce_relayer_health_check_job_scheduled() {
750 let producer = TestJobProducer::new();
751
752 let health_check = RelayerHealthCheck::new("relayer-2".to_string());
754 let scheduled_timestamp = calculate_scheduled_timestamp(300); let result = producer
756 .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
757 .await;
758
759 assert!(result.is_ok());
761
762 let queue = producer.get_queue().await;
764 assert!(queue.relayer_health_check_queue.schedule_called);
765 assert!(!queue.relayer_health_check_queue.push_called);
766
767 assert!(!queue.transaction_request_queue.push_called);
769 assert!(!queue.transaction_submission_queue.push_called);
770 assert!(!queue.transaction_status_queue.push_called);
771 assert!(!queue.notification_queue.push_called);
772 assert!(!queue.token_swap_request_queue.push_called);
773 }
774
775 #[tokio::test]
776 async fn test_produce_relayer_health_check_job_multiple_relayers() {
777 let producer = TestJobProducer::new();
778
779 let relayer_ids = vec!["relayer-1", "relayer-2", "relayer-3"];
781
782 for relayer_id in &relayer_ids {
783 let health_check = RelayerHealthCheck::new(relayer_id.to_string());
784 let result = producer
785 .produce_relayer_health_check_job(health_check, None)
786 .await;
787 assert!(result.is_ok());
788 }
789
790 let queue = producer.get_queue().await;
792 assert!(queue.relayer_health_check_queue.push_called);
793 }
794
795 #[tokio::test]
796 async fn test_status_check_routes_to_evm_queue() {
797 use crate::models::NetworkType;
798 let producer = TestJobProducer::new();
799
800 let status_job = TransactionStatusCheck::new("tx-evm", "relayer-1", NetworkType::Evm);
801 let result = producer
802 .produce_check_transaction_status_job(status_job, None)
803 .await;
804
805 assert!(result.is_ok());
806 let queue = producer.get_queue().await;
807 assert!(queue.transaction_status_queue_evm.push_called);
808 assert!(!queue.transaction_status_queue_stellar.push_called);
809 assert!(!queue.transaction_status_queue.push_called);
810 }
811
812 #[tokio::test]
813 async fn test_status_check_routes_to_stellar_queue() {
814 use crate::models::NetworkType;
815 let producer = TestJobProducer::new();
816
817 let status_job =
818 TransactionStatusCheck::new("tx-stellar", "relayer-2", NetworkType::Stellar);
819 let result = producer
820 .produce_check_transaction_status_job(status_job, None)
821 .await;
822
823 assert!(result.is_ok());
824 let queue = producer.get_queue().await;
825 assert!(queue.transaction_status_queue_stellar.push_called);
826 assert!(!queue.transaction_status_queue_evm.push_called);
827 assert!(!queue.transaction_status_queue.push_called);
828 }
829
830 #[tokio::test]
831 async fn test_status_check_routes_to_default_queue_for_solana() {
832 use crate::models::NetworkType;
833 let producer = TestJobProducer::new();
834
835 let status_job = TransactionStatusCheck::new("tx-solana", "relayer-3", NetworkType::Solana);
836 let result = producer
837 .produce_check_transaction_status_job(status_job, None)
838 .await;
839
840 assert!(result.is_ok());
841 let queue = producer.get_queue().await;
842 assert!(queue.transaction_status_queue.push_called);
843 assert!(!queue.transaction_status_queue_evm.push_called);
844 assert!(!queue.transaction_status_queue_stellar.push_called);
845 }
846
847 #[tokio::test]
848 async fn test_status_check_scheduled_evm() {
849 use crate::models::NetworkType;
850 let producer = TestJobProducer::new();
851
852 let status_job =
853 TransactionStatusCheck::new("tx-evm-scheduled", "relayer-1", NetworkType::Evm);
854 let scheduled_timestamp = calculate_scheduled_timestamp(30);
855 let result = producer
856 .produce_check_transaction_status_job(status_job, Some(scheduled_timestamp))
857 .await;
858
859 assert!(result.is_ok());
860 let queue = producer.get_queue().await;
861 assert!(queue.transaction_status_queue_evm.schedule_called);
862 assert!(!queue.transaction_status_queue_evm.push_called);
863 }
864
865 #[tokio::test]
866 async fn test_scheduled_status_check_persists_available_at() {
867 use crate::models::NetworkType;
868 let producer = TestJobProducer::new();
869
870 let status_job =
871 TransactionStatusCheck::new("tx-evm-scheduled", "relayer-1", NetworkType::Evm);
872 let scheduled_timestamp = calculate_scheduled_timestamp(30);
873 producer
874 .produce_check_transaction_status_job(status_job, Some(scheduled_timestamp))
875 .await
876 .unwrap();
877
878 let queue = producer.get_queue().await;
879 let stored_job = queue
880 .transaction_status_queue_evm
881 .last_job
882 .expect("scheduled status check should be stored");
883 let expected_available_at = scheduled_timestamp.to_string();
884
885 assert_eq!(
886 stored_job.available_at.as_deref(),
887 Some(expected_available_at.as_str())
888 );
889 assert_eq!(
890 queue.transaction_status_queue_evm.last_scheduled_timestamp,
891 Some(scheduled_timestamp)
892 );
893 }
894
895 #[tokio::test]
896 async fn test_submit_transaction_scheduled() {
897 let producer = TestJobProducer::new();
898
899 let submit_job = TransactionSend::submit("tx-scheduled", "relayer-1");
900 let scheduled_timestamp = calculate_scheduled_timestamp(15);
901 let result = producer
902 .produce_submit_transaction_job(submit_job, Some(scheduled_timestamp))
903 .await;
904
905 assert!(result.is_ok());
906 let queue = producer.get_queue().await;
907 assert!(queue.transaction_submission_queue.schedule_called);
908 assert!(!queue.transaction_submission_queue.push_called);
909 }
910
911 #[tokio::test]
912 async fn test_scheduled_submit_job_persists_available_at() {
913 let producer = TestJobProducer::new();
914
915 let submit_job = TransactionSend::submit("tx-scheduled", "relayer-1");
916 let scheduled_timestamp = calculate_scheduled_timestamp(15);
917 producer
918 .produce_submit_transaction_job(submit_job, Some(scheduled_timestamp))
919 .await
920 .unwrap();
921
922 let queue = producer.get_queue().await;
923 let stored_job = queue
924 .transaction_submission_queue
925 .last_job
926 .expect("scheduled submission should be stored");
927 let expected_available_at = scheduled_timestamp.to_string();
928
929 assert_eq!(
930 stored_job.available_at.as_deref(),
931 Some(expected_available_at.as_str())
932 );
933 }
934
935 #[tokio::test]
936 async fn test_notification_job_scheduled() {
937 let producer = TestJobProducer::new();
938
939 let notification = WebhookNotification::new(
940 "test_scheduled_event".to_string(),
941 WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
942 EvmTransactionResponse {
943 id: "tx-notify-scheduled".to_string(),
944 hash: Some("0xabc123".to_string()),
945 status: TransactionStatus::Confirmed,
946 status_reason: None,
947 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
948 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
949 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
950 gas_price: Some(1000000000),
951 gas_limit: Some(21000),
952 nonce: Some(1),
953 value: U256::from(1000000000000000000_u64),
954 from: "0xabc".to_string(),
955 to: Some("0xdef".to_string()),
956 relayer_id: "relayer-1".to_string(),
957 data: None,
958 max_fee_per_gas: None,
959 max_priority_fee_per_gas: None,
960 signature: None,
961 speed: None,
962 },
963 ))),
964 );
965 let job = NotificationSend::new("notification-scheduled".to_string(), notification);
966
967 let scheduled_timestamp = calculate_scheduled_timestamp(5);
968 let result = producer
969 .produce_send_notification_job(job, Some(scheduled_timestamp))
970 .await;
971
972 assert!(result.is_ok());
973 let queue = producer.get_queue().await;
974 assert!(queue.notification_queue.schedule_called);
975 assert!(!queue.notification_queue.push_called);
976 }
977
978 #[tokio::test]
979 async fn test_solana_swap_job_immediate() {
980 let producer = TestJobProducer::new();
981
982 let swap_job = TokenSwapRequest::new("relayer-solana".to_string());
983 let result = producer
984 .produce_token_swap_request_job(swap_job, None)
985 .await;
986
987 assert!(result.is_ok());
988 let queue = producer.get_queue().await;
989 assert!(queue.token_swap_request_queue.push_called);
990 assert!(!queue.token_swap_request_queue.schedule_called);
991 }
992
993 #[tokio::test]
994 async fn test_token_swap_job_scheduled() {
995 let producer = TestJobProducer::new();
996
997 let swap_job = TokenSwapRequest::new("relayer-solana".to_string());
998 let scheduled_timestamp = calculate_scheduled_timestamp(20);
999 let result = producer
1000 .produce_token_swap_request_job(swap_job, Some(scheduled_timestamp))
1001 .await;
1002
1003 assert!(result.is_ok());
1004 let queue = producer.get_queue().await;
1005 assert!(queue.token_swap_request_queue.schedule_called);
1006 assert!(!queue.token_swap_request_queue.push_called);
1007 }
1008
1009 #[tokio::test]
1010 async fn test_transaction_send_cancel_job() {
1011 let producer = TestJobProducer::new();
1012
1013 let cancel_job = TransactionSend::cancel("tx-cancel", "relayer-1", "user requested");
1014 let result = producer
1015 .produce_submit_transaction_job(cancel_job, None)
1016 .await;
1017
1018 assert!(result.is_ok());
1019 let queue = producer.get_queue().await;
1020 assert!(queue.transaction_submission_queue.push_called);
1021 }
1022
1023 #[tokio::test]
1024 async fn test_transaction_send_resubmit_job() {
1025 let producer = TestJobProducer::new();
1026
1027 let resubmit_job = TransactionSend::resubmit("tx-resubmit", "relayer-1");
1028 let result = producer
1029 .produce_submit_transaction_job(resubmit_job, None)
1030 .await;
1031
1032 assert!(result.is_ok());
1033 let queue = producer.get_queue().await;
1034 assert!(queue.transaction_submission_queue.push_called);
1035 }
1036
1037 #[tokio::test]
1038 async fn test_transaction_send_resend_job() {
1039 let producer = TestJobProducer::new();
1040
1041 let resend_job = TransactionSend::resend("tx-resend", "relayer-1");
1042 let result = producer
1043 .produce_submit_transaction_job(resend_job, None)
1044 .await;
1045
1046 assert!(result.is_ok());
1047 let queue = producer.get_queue().await;
1048 assert!(queue.transaction_submission_queue.push_called);
1049 }
1050
1051 #[tokio::test]
1052 async fn test_multiple_jobs_different_queues() {
1053 let producer = TestJobProducer::new();
1054
1055 let request = TransactionRequest::new("tx1", "relayer-1");
1057 producer
1058 .produce_transaction_request_job(request, None)
1059 .await
1060 .unwrap();
1061
1062 let submit = TransactionSend::submit("tx2", "relayer-1");
1063 producer
1064 .produce_submit_transaction_job(submit, None)
1065 .await
1066 .unwrap();
1067
1068 use crate::models::NetworkType;
1069 let status = TransactionStatusCheck::new("tx3", "relayer-1", NetworkType::Evm);
1070 producer
1071 .produce_check_transaction_status_job(status, None)
1072 .await
1073 .unwrap();
1074
1075 let queue = producer.get_queue().await;
1077 assert!(queue.transaction_request_queue.push_called);
1078 assert!(queue.transaction_submission_queue.push_called);
1079 assert!(queue.transaction_status_queue_evm.push_called);
1080 }
1081
1082 #[test]
1083 fn test_job_producer_clone() {
1084 let producer = TestJobProducer::new();
1085 let cloned_producer = producer.clone();
1086
1087 assert!(std::ptr::addr_of!(producer) != std::ptr::addr_of!(cloned_producer));
1090 }
1091
1092 #[tokio::test]
1093 async fn test_transaction_request_with_metadata() {
1094 let producer = TestJobProducer::new();
1095
1096 let mut metadata = std::collections::HashMap::new();
1097 metadata.insert("retry_count".to_string(), "3".to_string());
1098
1099 let request = TransactionRequest::new("tx-meta", "relayer-1").with_metadata(metadata);
1100
1101 let result = producer
1102 .produce_transaction_request_job(request, None)
1103 .await;
1104
1105 assert!(result.is_ok());
1106 let queue = producer.get_queue().await;
1107 assert!(queue.transaction_request_queue.push_called);
1108 }
1109
1110 #[tokio::test]
1111 async fn test_status_check_with_metadata() {
1112 use crate::models::NetworkType;
1113 let producer = TestJobProducer::new();
1114
1115 let mut metadata = std::collections::HashMap::new();
1116 metadata.insert("attempt".to_string(), "2".to_string());
1117
1118 let status =
1119 TransactionStatusCheck::new("tx-status-meta", "relayer-1", NetworkType::Stellar)
1120 .with_metadata(metadata);
1121
1122 let result = producer
1123 .produce_check_transaction_status_job(status, None)
1124 .await;
1125
1126 assert!(result.is_ok());
1127 let queue = producer.get_queue().await;
1128 assert!(queue.transaction_status_queue_stellar.push_called);
1129 }
1130
1131 #[tokio::test]
1132 async fn test_scheduled_jobs_with_different_delays() {
1133 let producer = TestJobProducer::new();
1134
1135 let delays = [1, 10, 60, 300, 3600]; for (idx, delay) in delays.iter().enumerate() {
1139 let request = TransactionRequest::new(format!("tx-delay-{idx}"), "relayer-1");
1140 let timestamp = calculate_scheduled_timestamp(*delay);
1141
1142 let result = producer
1143 .produce_transaction_request_job(request, Some(timestamp))
1144 .await;
1145
1146 assert!(result.is_ok(), "Failed to schedule job with delay {delay}");
1147 }
1148 }
1149
1150 #[test]
1151 fn test_job_producer_error_display() {
1152 let error = JobProducerError::QueueError("Test queue error".to_string());
1153 let error_string = error.to_string();
1154
1155 assert!(error_string.contains("Queue error"));
1156 assert!(error_string.contains("Test queue error"));
1157 }
1158
1159 #[test]
1160 fn test_job_producer_error_to_relayer_error() {
1161 let job_error = JobProducerError::QueueError("Connection failed".to_string());
1163 let relayer_error: RelayerError = job_error.into();
1164
1165 match relayer_error {
1166 RelayerError::QueueError(msg) => {
1167 assert_eq!(msg, "Queue error: Connection failed");
1168 }
1169 _ => panic!("Expected QueueError variant"),
1170 }
1171 }
1172}