openzeppelin_relayer/jobs/
job_producer.rs

1//! Job producer module for enqueueing jobs to Redis queues.
2//!
3//! Provides functionality for producing various types of jobs:
4//! - Transaction processing jobs
5//! - Transaction submission jobs
6//! - Status monitoring jobs
7//! - Notification jobs
8
9use crate::{
10    jobs::{
11        Job, NotificationSend, RelayerHealthCheck, TransactionRequest, TransactionSend,
12        TransactionStatusCheck,
13    },
14    models::RelayerError,
15    observability::request_id::get_request_id,
16    queues::{QueueBackend, QueueBackendStorage, QueueBackendType},
17};
18use async_trait::async_trait;
19use serde::Serialize;
20use std::sync::Arc;
21use thiserror::Error;
22use tracing::{debug, error};
23
24use super::{JobType, TokenSwapRequest};
25
26#[cfg(test)]
27use mockall::automock;
28
29#[derive(Debug, Error, Serialize, Clone)]
30pub enum JobProducerError {
31    #[error("Queue error: {0}")]
32    QueueError(String),
33}
34
35impl From<JobProducerError> for RelayerError {
36    fn from(err: JobProducerError) -> Self {
37        RelayerError::QueueError(err.to_string())
38    }
39}
40
41/// Job producer that enqueues jobs via the configured queue backend.
42#[derive(Debug, Clone)]
43pub struct JobProducer {
44    queue_backend: Arc<QueueBackendStorage>,
45}
46
47#[async_trait]
48#[cfg_attr(test, automock)]
49pub trait JobProducerTrait: Send + Sync {
50    async fn produce_transaction_request_job(
51        &self,
52        transaction_process_job: TransactionRequest,
53        scheduled_on: Option<i64>,
54    ) -> Result<(), JobProducerError>;
55
56    async fn produce_submit_transaction_job(
57        &self,
58        transaction_submit_job: TransactionSend,
59        scheduled_on: Option<i64>,
60    ) -> Result<(), JobProducerError>;
61
62    async fn produce_check_transaction_status_job(
63        &self,
64        transaction_status_check_job: TransactionStatusCheck,
65        scheduled_on: Option<i64>,
66    ) -> Result<(), JobProducerError>;
67
68    async fn produce_send_notification_job(
69        &self,
70        notification_send_job: NotificationSend,
71        scheduled_on: Option<i64>,
72    ) -> Result<(), JobProducerError>;
73
74    async fn produce_token_swap_request_job(
75        &self,
76        swap_request_job: TokenSwapRequest,
77        scheduled_on: Option<i64>,
78    ) -> Result<(), JobProducerError>;
79
80    async fn produce_relayer_health_check_job(
81        &self,
82        relayer_health_check_job: RelayerHealthCheck,
83        scheduled_on: Option<i64>,
84    ) -> Result<(), JobProducerError>;
85
86    /// Returns active queue backend storage when available.
87    fn get_queue_backend(&self) -> Option<Arc<QueueBackendStorage>> {
88        None
89    }
90
91    /// Returns active queue backend type.
92    fn backend_type(&self) -> QueueBackendType {
93        QueueBackendType::Redis
94    }
95}
96
97impl JobProducer {
98    pub fn new(queue_backend: Arc<QueueBackendStorage>) -> Self {
99        Self { queue_backend }
100    }
101
102    pub fn queue_backend(&self) -> Arc<QueueBackendStorage> {
103        self.queue_backend.clone()
104    }
105}
106
107#[async_trait]
108impl JobProducerTrait for JobProducer {
109    fn get_queue_backend(&self) -> Option<Arc<QueueBackendStorage>> {
110        Some(self.queue_backend())
111    }
112
113    fn backend_type(&self) -> QueueBackendType {
114        self.queue_backend.backend_type()
115    }
116
117    async fn produce_transaction_request_job(
118        &self,
119        transaction_process_job: TransactionRequest,
120        scheduled_on: Option<i64>,
121    ) -> Result<(), JobProducerError> {
122        debug!(
123            "Producing transaction request job: {:?}",
124            transaction_process_job
125        );
126        let job = Job::new(JobType::TransactionRequest, transaction_process_job)
127            .with_request_id(get_request_id())
128            .with_scheduled_on(scheduled_on);
129        let request_id = job.request_id.clone();
130        let tx_id = job.data.transaction_id.clone();
131        let relayer_id = job.data.relayer_id.clone();
132
133        let backend = self.queue_backend();
134        let job_id = backend
135            .produce_transaction_request(job, scheduled_on)
136            .await
137            .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
138
139        debug!(
140            job_type = %JobType::TransactionRequest,
141            backend = %backend.backend_type(),
142            job_id = %job_id,
143            request_id = ?request_id,
144            tx_id = %tx_id,
145            relayer_id = %relayer_id,
146            scheduled_on = ?scheduled_on,
147            "transaction request job produced"
148        );
149
150        Ok(())
151    }
152
153    async fn produce_submit_transaction_job(
154        &self,
155        transaction_submit_job: TransactionSend,
156        scheduled_on: Option<i64>,
157    ) -> Result<(), JobProducerError> {
158        let job = Job::new(JobType::TransactionSend, transaction_submit_job)
159            .with_request_id(get_request_id())
160            .with_scheduled_on(scheduled_on);
161        let request_id = job.request_id.clone();
162        let tx_id = job.data.transaction_id.clone();
163        let relayer_id = job.data.relayer_id.clone();
164        let command = job.data.command.clone();
165
166        let backend = self.queue_backend();
167        let job_id = backend
168            .produce_transaction_submission(job, scheduled_on)
169            .await
170            .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
171
172        debug!(
173            job_type = %JobType::TransactionSend,
174            backend = %backend.backend_type(),
175            job_id = %job_id,
176            request_id = ?request_id,
177            tx_id = %tx_id,
178            relayer_id = %relayer_id,
179            command = ?command,
180            scheduled_on = ?scheduled_on,
181            "transaction submission job produced"
182        );
183
184        Ok(())
185    }
186
187    async fn produce_check_transaction_status_job(
188        &self,
189        transaction_status_check_job: TransactionStatusCheck,
190        scheduled_on: Option<i64>,
191    ) -> Result<(), JobProducerError> {
192        let job = Job::new(
193            JobType::TransactionStatusCheck,
194            transaction_status_check_job.clone(),
195        )
196        .with_request_id(get_request_id())
197        .with_scheduled_on(scheduled_on);
198        let request_id = job.request_id.clone();
199        let tx_id = job.data.transaction_id.clone();
200        let relayer_id = job.data.relayer_id.clone();
201
202        let backend = self.queue_backend();
203        let job_id = backend
204            .produce_transaction_status_check(job, scheduled_on)
205            .await
206            .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
207
208        debug!(
209            job_type = %JobType::TransactionStatusCheck,
210            backend = %backend.backend_type(),
211            job_id = %job_id,
212            request_id = ?request_id,
213            tx_id = %tx_id,
214            relayer_id = %relayer_id,
215            network_type = ?transaction_status_check_job.network_type,
216            scheduled_on = ?scheduled_on,
217            "Transaction Status Check job produced successfully"
218        );
219        Ok(())
220    }
221
222    async fn produce_send_notification_job(
223        &self,
224        notification_send_job: NotificationSend,
225        scheduled_on: Option<i64>,
226    ) -> Result<(), JobProducerError> {
227        let job = Job::new(JobType::NotificationSend, notification_send_job)
228            .with_request_id(get_request_id())
229            .with_scheduled_on(scheduled_on);
230        let request_id = job.request_id.clone();
231        let notification_id = job.data.notification_id.clone();
232
233        let backend = self.queue_backend();
234        let job_id = backend
235            .produce_notification(job, scheduled_on)
236            .await
237            .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
238
239        debug!(
240            job_type = %JobType::NotificationSend,
241            backend = %backend.backend_type(),
242            job_id = %job_id,
243            request_id = ?request_id,
244            notification_id = %notification_id,
245            scheduled_on = ?scheduled_on,
246            "notification send job produced"
247        );
248        Ok(())
249    }
250
251    async fn produce_token_swap_request_job(
252        &self,
253        swap_request_job: TokenSwapRequest,
254        scheduled_on: Option<i64>,
255    ) -> Result<(), JobProducerError> {
256        let job = Job::new(JobType::TokenSwapRequest, swap_request_job)
257            .with_request_id(get_request_id())
258            .with_scheduled_on(scheduled_on);
259        let request_id = job.request_id.clone();
260        let relayer_id = job.data.relayer_id.clone();
261        let backend = self.queue_backend();
262        let job_id = backend
263            .produce_token_swap_request(job, scheduled_on)
264            .await
265            .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
266
267        debug!(
268            job_type = %JobType::TokenSwapRequest,
269            backend = %backend.backend_type(),
270            job_id = %job_id,
271            request_id = ?request_id,
272            relayer_id = %relayer_id,
273            scheduled_on = ?scheduled_on,
274            "token swap job produced"
275        );
276        Ok(())
277    }
278
279    async fn produce_relayer_health_check_job(
280        &self,
281        relayer_health_check_job: RelayerHealthCheck,
282        scheduled_on: Option<i64>,
283    ) -> Result<(), JobProducerError> {
284        let job = Job::new(
285            JobType::RelayerHealthCheck,
286            relayer_health_check_job.clone(),
287        )
288        .with_request_id(get_request_id())
289        .with_scheduled_on(scheduled_on);
290        let request_id = job.request_id.clone();
291        let relayer_id = job.data.relayer_id.clone();
292        let backend = self.queue_backend();
293        let job_id = backend
294            .produce_relayer_health_check(job, scheduled_on)
295            .await
296            .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
297
298        debug!(
299            job_type = %JobType::RelayerHealthCheck,
300            backend = %backend.backend_type(),
301            job_id = %job_id,
302            request_id = ?request_id,
303            relayer_id = %relayer_id,
304            scheduled_on = ?scheduled_on,
305            "relayer health check job produced"
306        );
307        Ok(())
308    }
309}
310
311#[cfg(test)]
312mod tests {
313    use super::*;
314    use crate::models::{
315        EvmTransactionResponse, TransactionResponse, TransactionStatus, WebhookNotification,
316        WebhookPayload, U256,
317    };
318    use crate::utils::calculate_scheduled_timestamp;
319    use tokio::sync::Mutex;
320
321    #[derive(Clone, Debug)]
322    // Define a simplified queue for testing without using complex mocks
323    struct TestRedisStorage<T> {
324        pub push_called: bool,
325        pub schedule_called: bool,
326        pub last_job: Option<T>,
327        pub last_scheduled_timestamp: Option<i64>,
328        _phantom: std::marker::PhantomData<T>,
329    }
330
331    impl<T> TestRedisStorage<T> {
332        fn new() -> Self {
333            Self {
334                push_called: false,
335                schedule_called: false,
336                last_job: None,
337                last_scheduled_timestamp: None,
338                _phantom: std::marker::PhantomData,
339            }
340        }
341    }
342
343    impl<T: Clone> TestRedisStorage<T> {
344        async fn push(&mut self, job: T) -> Result<(), JobProducerError> {
345            self.push_called = true;
346            self.last_job = Some(job);
347            Ok(())
348        }
349
350        async fn schedule(&mut self, job: T, timestamp: i64) -> Result<(), JobProducerError> {
351            self.schedule_called = true;
352            self.last_job = Some(job);
353            self.last_scheduled_timestamp = Some(timestamp);
354            Ok(())
355        }
356    }
357
358    // A test version of the Queue
359    #[derive(Clone, Debug)]
360    struct TestQueue {
361        pub transaction_request_queue: TestRedisStorage<Job<TransactionRequest>>,
362        pub transaction_submission_queue: TestRedisStorage<Job<TransactionSend>>,
363        pub transaction_status_queue: TestRedisStorage<Job<TransactionStatusCheck>>,
364        pub transaction_status_queue_evm: TestRedisStorage<Job<TransactionStatusCheck>>,
365        pub transaction_status_queue_stellar: TestRedisStorage<Job<TransactionStatusCheck>>,
366        pub notification_queue: TestRedisStorage<Job<NotificationSend>>,
367        pub token_swap_request_queue: TestRedisStorage<Job<TokenSwapRequest>>,
368        pub relayer_health_check_queue: TestRedisStorage<Job<RelayerHealthCheck>>,
369    }
370
371    impl TestQueue {
372        fn new() -> Self {
373            Self {
374                transaction_request_queue: TestRedisStorage::new(),
375                transaction_submission_queue: TestRedisStorage::new(),
376                transaction_status_queue: TestRedisStorage::new(),
377                transaction_status_queue_evm: TestRedisStorage::new(),
378                transaction_status_queue_stellar: TestRedisStorage::new(),
379                notification_queue: TestRedisStorage::new(),
380                token_swap_request_queue: TestRedisStorage::new(),
381                relayer_health_check_queue: TestRedisStorage::new(),
382            }
383        }
384    }
385
386    // A test version of JobProducer
387    struct TestJobProducer {
388        queue: Mutex<TestQueue>,
389    }
390
391    impl Clone for TestJobProducer {
392        fn clone(&self) -> Self {
393            let queue = self
394                .queue
395                .try_lock()
396                .expect("Failed to lock queue for cloning")
397                .clone();
398            Self {
399                queue: Mutex::new(queue),
400            }
401        }
402    }
403
404    impl TestJobProducer {
405        fn new() -> Self {
406            Self {
407                queue: Mutex::new(TestQueue::new()),
408            }
409        }
410
411        async fn get_queue(&self) -> TestQueue {
412            self.queue.lock().await.clone()
413        }
414    }
415
416    #[async_trait]
417    impl JobProducerTrait for TestJobProducer {
418        fn get_queue_backend(&self) -> Option<Arc<QueueBackendStorage>> {
419            None
420        }
421
422        async fn produce_transaction_request_job(
423            &self,
424            transaction_process_job: TransactionRequest,
425            scheduled_on: Option<i64>,
426        ) -> Result<(), JobProducerError> {
427            let mut queue = self.queue.lock().await;
428            let job = Job::new(JobType::TransactionRequest, transaction_process_job)
429                .with_scheduled_on(scheduled_on);
430
431            match scheduled_on {
432                Some(scheduled_on) => {
433                    queue
434                        .transaction_request_queue
435                        .schedule(job, scheduled_on)
436                        .await?;
437                }
438                None => {
439                    queue.transaction_request_queue.push(job).await?;
440                }
441            }
442
443            Ok(())
444        }
445
446        async fn produce_submit_transaction_job(
447            &self,
448            transaction_submit_job: TransactionSend,
449            scheduled_on: Option<i64>,
450        ) -> Result<(), JobProducerError> {
451            let mut queue = self.queue.lock().await;
452            let job = Job::new(JobType::TransactionSend, transaction_submit_job)
453                .with_scheduled_on(scheduled_on);
454
455            match scheduled_on {
456                Some(on) => {
457                    queue.transaction_submission_queue.schedule(job, on).await?;
458                }
459                None => {
460                    queue.transaction_submission_queue.push(job).await?;
461                }
462            }
463
464            Ok(())
465        }
466
467        async fn produce_check_transaction_status_job(
468            &self,
469            transaction_status_check_job: TransactionStatusCheck,
470            scheduled_on: Option<i64>,
471        ) -> Result<(), JobProducerError> {
472            let mut queue = self.queue.lock().await;
473            let job = Job::new(
474                JobType::TransactionStatusCheck,
475                transaction_status_check_job.clone(),
476            )
477            .with_scheduled_on(scheduled_on);
478
479            // Route to the appropriate queue based on network type
480            use crate::models::NetworkType;
481            let status_queue = match transaction_status_check_job.network_type {
482                Some(NetworkType::Evm) => &mut queue.transaction_status_queue_evm,
483                Some(NetworkType::Stellar) => &mut queue.transaction_status_queue_stellar,
484                Some(NetworkType::Solana) => &mut queue.transaction_status_queue, // Use default queue
485                None => &mut queue.transaction_status_queue, // Legacy messages without network_type
486            };
487
488            match scheduled_on {
489                Some(on) => {
490                    status_queue.schedule(job, on).await?;
491                }
492                None => {
493                    status_queue.push(job).await?;
494                }
495            }
496
497            Ok(())
498        }
499
500        async fn produce_send_notification_job(
501            &self,
502            notification_send_job: NotificationSend,
503            scheduled_on: Option<i64>,
504        ) -> Result<(), JobProducerError> {
505            let mut queue = self.queue.lock().await;
506            let job = Job::new(JobType::NotificationSend, notification_send_job)
507                .with_scheduled_on(scheduled_on);
508
509            match scheduled_on {
510                Some(on) => {
511                    queue.notification_queue.schedule(job, on).await?;
512                }
513                None => {
514                    queue.notification_queue.push(job).await?;
515                }
516            }
517
518            Ok(())
519        }
520
521        async fn produce_token_swap_request_job(
522            &self,
523            swap_request_job: TokenSwapRequest,
524            scheduled_on: Option<i64>,
525        ) -> Result<(), JobProducerError> {
526            let mut queue = self.queue.lock().await;
527            let job = Job::new(JobType::TokenSwapRequest, swap_request_job)
528                .with_scheduled_on(scheduled_on);
529
530            match scheduled_on {
531                Some(on) => {
532                    queue.token_swap_request_queue.schedule(job, on).await?;
533                }
534                None => {
535                    queue.token_swap_request_queue.push(job).await?;
536                }
537            }
538
539            Ok(())
540        }
541
542        async fn produce_relayer_health_check_job(
543            &self,
544            relayer_health_check_job: RelayerHealthCheck,
545            scheduled_on: Option<i64>,
546        ) -> Result<(), JobProducerError> {
547            let mut queue = self.queue.lock().await;
548            let job = Job::new(JobType::RelayerHealthCheck, relayer_health_check_job)
549                .with_scheduled_on(scheduled_on);
550
551            match scheduled_on {
552                Some(scheduled_on) => {
553                    queue
554                        .relayer_health_check_queue
555                        .schedule(job, scheduled_on)
556                        .await?;
557                }
558                None => {
559                    queue.relayer_health_check_queue.push(job).await?;
560                }
561            }
562
563            Ok(())
564        }
565    }
566
567    #[tokio::test]
568    async fn test_job_producer_operations() {
569        let producer = TestJobProducer::new();
570
571        // Test transaction request job
572        let request = TransactionRequest::new("tx123", "relayer-1");
573        let result = producer
574            .produce_transaction_request_job(request, None)
575            .await;
576        assert!(result.is_ok());
577
578        let queue = producer.get_queue().await;
579        assert!(queue.transaction_request_queue.push_called);
580
581        // Test scheduled job
582        let producer = TestJobProducer::new();
583        let request = TransactionRequest::new("tx123", "relayer-1");
584        let scheduled_timestamp = calculate_scheduled_timestamp(10); // Schedule for 10 seconds from now
585        let result = producer
586            .produce_transaction_request_job(request, Some(scheduled_timestamp))
587            .await;
588        assert!(result.is_ok());
589
590        let queue = producer.get_queue().await;
591        assert!(queue.transaction_request_queue.schedule_called);
592    }
593
594    #[tokio::test]
595    async fn test_submit_transaction_job() {
596        let producer = TestJobProducer::new();
597
598        // Test submit transaction job
599        let submit_job = TransactionSend::submit("tx123", "relayer-1");
600        let result = producer
601            .produce_submit_transaction_job(submit_job, None)
602            .await;
603        assert!(result.is_ok());
604
605        let queue = producer.get_queue().await;
606        assert!(queue.transaction_submission_queue.push_called);
607    }
608
609    #[tokio::test]
610    async fn test_check_status_job() {
611        use crate::models::NetworkType;
612        let producer = TestJobProducer::new();
613
614        // Test status check job for EVM
615        let status_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
616        let result = producer
617            .produce_check_transaction_status_job(status_job, None)
618            .await;
619        assert!(result.is_ok());
620
621        let queue = producer.get_queue().await;
622        assert!(queue.transaction_status_queue_evm.push_called);
623    }
624
625    #[tokio::test]
626    async fn test_notification_job() {
627        let producer = TestJobProducer::new();
628
629        // Create a simple notification for testing
630        let notification = WebhookNotification::new(
631            "test_event".to_string(),
632            WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
633                EvmTransactionResponse {
634                    id: "tx123".to_string(),
635                    hash: Some("0x123".to_string()),
636                    status: TransactionStatus::Confirmed,
637                    status_reason: None,
638                    created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
639                    sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
640                    confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
641                    gas_price: Some(1000000000),
642                    gas_limit: Some(21000),
643                    nonce: Some(1),
644                    value: U256::from(1000000000000000000_u64),
645                    from: "0xabc".to_string(),
646                    to: Some("0xdef".to_string()),
647                    relayer_id: "relayer-1".to_string(),
648                    data: None,
649                    max_fee_per_gas: None,
650                    max_priority_fee_per_gas: None,
651                    signature: None,
652                    speed: None,
653                },
654            ))),
655        );
656        let job = NotificationSend::new("notification-1".to_string(), notification);
657
658        let result = producer.produce_send_notification_job(job, None).await;
659        assert!(result.is_ok());
660
661        let queue = producer.get_queue().await;
662        assert!(queue.notification_queue.push_called);
663    }
664
665    #[tokio::test]
666    async fn test_relayer_health_check_job() {
667        let producer = TestJobProducer::new();
668
669        // Test immediate health check job
670        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
671        let result = producer
672            .produce_relayer_health_check_job(health_check, None)
673            .await;
674        assert!(result.is_ok());
675
676        let queue = producer.get_queue().await;
677        assert!(queue.relayer_health_check_queue.push_called);
678
679        // Test scheduled health check job
680        let producer = TestJobProducer::new();
681        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
682        let scheduled_timestamp = calculate_scheduled_timestamp(60);
683        let result = producer
684            .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
685            .await;
686        assert!(result.is_ok());
687
688        let queue = producer.get_queue().await;
689        assert!(queue.relayer_health_check_queue.schedule_called);
690    }
691
692    #[test]
693    fn test_job_producer_error_conversion() {
694        // Test error conversion preserves original error message
695        let job_error = JobProducerError::QueueError("Test error".to_string());
696        let relayer_error: RelayerError = job_error.into();
697
698        match relayer_error {
699            RelayerError::QueueError(msg) => {
700                assert_eq!(msg, "Queue error: Test error");
701            }
702            _ => panic!("Unexpected error type"),
703        }
704    }
705
706    #[tokio::test]
707    async fn test_get_queue() {
708        let producer = TestJobProducer::new();
709
710        // Get the queue
711        let queue = producer.get_queue().await;
712
713        // Verify the queue is valid and has the expected structure
714        assert!(!queue.transaction_request_queue.push_called);
715        assert!(!queue.transaction_request_queue.schedule_called);
716        assert!(!queue.transaction_submission_queue.push_called);
717        assert!(!queue.notification_queue.push_called);
718        assert!(!queue.token_swap_request_queue.push_called);
719        assert!(!queue.relayer_health_check_queue.push_called);
720    }
721
722    #[tokio::test]
723    async fn test_produce_relayer_health_check_job_immediate() {
724        let producer = TestJobProducer::new();
725
726        // Test immediate health check job (no scheduling)
727        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
728        let result = producer
729            .produce_relayer_health_check_job(health_check, None)
730            .await;
731
732        // Should succeed
733        assert!(result.is_ok());
734
735        // Verify the job was pushed (not scheduled)
736        let queue = producer.get_queue().await;
737        assert!(queue.relayer_health_check_queue.push_called);
738        assert!(!queue.relayer_health_check_queue.schedule_called);
739
740        // Other queues should not be affected
741        assert!(!queue.transaction_request_queue.push_called);
742        assert!(!queue.transaction_submission_queue.push_called);
743        assert!(!queue.transaction_status_queue.push_called);
744        assert!(!queue.notification_queue.push_called);
745        assert!(!queue.token_swap_request_queue.push_called);
746    }
747
748    #[tokio::test]
749    async fn test_produce_relayer_health_check_job_scheduled() {
750        let producer = TestJobProducer::new();
751
752        // Test scheduled health check job
753        let health_check = RelayerHealthCheck::new("relayer-2".to_string());
754        let scheduled_timestamp = calculate_scheduled_timestamp(300); // 5 minutes from now
755        let result = producer
756            .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
757            .await;
758
759        // Should succeed
760        assert!(result.is_ok());
761
762        // Verify the job was scheduled (not pushed)
763        let queue = producer.get_queue().await;
764        assert!(queue.relayer_health_check_queue.schedule_called);
765        assert!(!queue.relayer_health_check_queue.push_called);
766
767        // Other queues should not be affected
768        assert!(!queue.transaction_request_queue.push_called);
769        assert!(!queue.transaction_submission_queue.push_called);
770        assert!(!queue.transaction_status_queue.push_called);
771        assert!(!queue.notification_queue.push_called);
772        assert!(!queue.token_swap_request_queue.push_called);
773    }
774
775    #[tokio::test]
776    async fn test_produce_relayer_health_check_job_multiple_relayers() {
777        let producer = TestJobProducer::new();
778
779        // Produce health check jobs for multiple relayers
780        let relayer_ids = vec!["relayer-1", "relayer-2", "relayer-3"];
781
782        for relayer_id in &relayer_ids {
783            let health_check = RelayerHealthCheck::new(relayer_id.to_string());
784            let result = producer
785                .produce_relayer_health_check_job(health_check, None)
786                .await;
787            assert!(result.is_ok());
788        }
789
790        // Verify jobs were produced
791        let queue = producer.get_queue().await;
792        assert!(queue.relayer_health_check_queue.push_called);
793    }
794
795    #[tokio::test]
796    async fn test_status_check_routes_to_evm_queue() {
797        use crate::models::NetworkType;
798        let producer = TestJobProducer::new();
799
800        let status_job = TransactionStatusCheck::new("tx-evm", "relayer-1", NetworkType::Evm);
801        let result = producer
802            .produce_check_transaction_status_job(status_job, None)
803            .await;
804
805        assert!(result.is_ok());
806        let queue = producer.get_queue().await;
807        assert!(queue.transaction_status_queue_evm.push_called);
808        assert!(!queue.transaction_status_queue_stellar.push_called);
809        assert!(!queue.transaction_status_queue.push_called);
810    }
811
812    #[tokio::test]
813    async fn test_status_check_routes_to_stellar_queue() {
814        use crate::models::NetworkType;
815        let producer = TestJobProducer::new();
816
817        let status_job =
818            TransactionStatusCheck::new("tx-stellar", "relayer-2", NetworkType::Stellar);
819        let result = producer
820            .produce_check_transaction_status_job(status_job, None)
821            .await;
822
823        assert!(result.is_ok());
824        let queue = producer.get_queue().await;
825        assert!(queue.transaction_status_queue_stellar.push_called);
826        assert!(!queue.transaction_status_queue_evm.push_called);
827        assert!(!queue.transaction_status_queue.push_called);
828    }
829
830    #[tokio::test]
831    async fn test_status_check_routes_to_default_queue_for_solana() {
832        use crate::models::NetworkType;
833        let producer = TestJobProducer::new();
834
835        let status_job = TransactionStatusCheck::new("tx-solana", "relayer-3", NetworkType::Solana);
836        let result = producer
837            .produce_check_transaction_status_job(status_job, None)
838            .await;
839
840        assert!(result.is_ok());
841        let queue = producer.get_queue().await;
842        assert!(queue.transaction_status_queue.push_called);
843        assert!(!queue.transaction_status_queue_evm.push_called);
844        assert!(!queue.transaction_status_queue_stellar.push_called);
845    }
846
847    #[tokio::test]
848    async fn test_status_check_scheduled_evm() {
849        use crate::models::NetworkType;
850        let producer = TestJobProducer::new();
851
852        let status_job =
853            TransactionStatusCheck::new("tx-evm-scheduled", "relayer-1", NetworkType::Evm);
854        let scheduled_timestamp = calculate_scheduled_timestamp(30);
855        let result = producer
856            .produce_check_transaction_status_job(status_job, Some(scheduled_timestamp))
857            .await;
858
859        assert!(result.is_ok());
860        let queue = producer.get_queue().await;
861        assert!(queue.transaction_status_queue_evm.schedule_called);
862        assert!(!queue.transaction_status_queue_evm.push_called);
863    }
864
865    #[tokio::test]
866    async fn test_scheduled_status_check_persists_available_at() {
867        use crate::models::NetworkType;
868        let producer = TestJobProducer::new();
869
870        let status_job =
871            TransactionStatusCheck::new("tx-evm-scheduled", "relayer-1", NetworkType::Evm);
872        let scheduled_timestamp = calculate_scheduled_timestamp(30);
873        producer
874            .produce_check_transaction_status_job(status_job, Some(scheduled_timestamp))
875            .await
876            .unwrap();
877
878        let queue = producer.get_queue().await;
879        let stored_job = queue
880            .transaction_status_queue_evm
881            .last_job
882            .expect("scheduled status check should be stored");
883        let expected_available_at = scheduled_timestamp.to_string();
884
885        assert_eq!(
886            stored_job.available_at.as_deref(),
887            Some(expected_available_at.as_str())
888        );
889        assert_eq!(
890            queue.transaction_status_queue_evm.last_scheduled_timestamp,
891            Some(scheduled_timestamp)
892        );
893    }
894
895    #[tokio::test]
896    async fn test_submit_transaction_scheduled() {
897        let producer = TestJobProducer::new();
898
899        let submit_job = TransactionSend::submit("tx-scheduled", "relayer-1");
900        let scheduled_timestamp = calculate_scheduled_timestamp(15);
901        let result = producer
902            .produce_submit_transaction_job(submit_job, Some(scheduled_timestamp))
903            .await;
904
905        assert!(result.is_ok());
906        let queue = producer.get_queue().await;
907        assert!(queue.transaction_submission_queue.schedule_called);
908        assert!(!queue.transaction_submission_queue.push_called);
909    }
910
911    #[tokio::test]
912    async fn test_scheduled_submit_job_persists_available_at() {
913        let producer = TestJobProducer::new();
914
915        let submit_job = TransactionSend::submit("tx-scheduled", "relayer-1");
916        let scheduled_timestamp = calculate_scheduled_timestamp(15);
917        producer
918            .produce_submit_transaction_job(submit_job, Some(scheduled_timestamp))
919            .await
920            .unwrap();
921
922        let queue = producer.get_queue().await;
923        let stored_job = queue
924            .transaction_submission_queue
925            .last_job
926            .expect("scheduled submission should be stored");
927        let expected_available_at = scheduled_timestamp.to_string();
928
929        assert_eq!(
930            stored_job.available_at.as_deref(),
931            Some(expected_available_at.as_str())
932        );
933    }
934
935    #[tokio::test]
936    async fn test_notification_job_scheduled() {
937        let producer = TestJobProducer::new();
938
939        let notification = WebhookNotification::new(
940            "test_scheduled_event".to_string(),
941            WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
942                EvmTransactionResponse {
943                    id: "tx-notify-scheduled".to_string(),
944                    hash: Some("0xabc123".to_string()),
945                    status: TransactionStatus::Confirmed,
946                    status_reason: None,
947                    created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
948                    sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
949                    confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
950                    gas_price: Some(1000000000),
951                    gas_limit: Some(21000),
952                    nonce: Some(1),
953                    value: U256::from(1000000000000000000_u64),
954                    from: "0xabc".to_string(),
955                    to: Some("0xdef".to_string()),
956                    relayer_id: "relayer-1".to_string(),
957                    data: None,
958                    max_fee_per_gas: None,
959                    max_priority_fee_per_gas: None,
960                    signature: None,
961                    speed: None,
962                },
963            ))),
964        );
965        let job = NotificationSend::new("notification-scheduled".to_string(), notification);
966
967        let scheduled_timestamp = calculate_scheduled_timestamp(5);
968        let result = producer
969            .produce_send_notification_job(job, Some(scheduled_timestamp))
970            .await;
971
972        assert!(result.is_ok());
973        let queue = producer.get_queue().await;
974        assert!(queue.notification_queue.schedule_called);
975        assert!(!queue.notification_queue.push_called);
976    }
977
978    #[tokio::test]
979    async fn test_solana_swap_job_immediate() {
980        let producer = TestJobProducer::new();
981
982        let swap_job = TokenSwapRequest::new("relayer-solana".to_string());
983        let result = producer
984            .produce_token_swap_request_job(swap_job, None)
985            .await;
986
987        assert!(result.is_ok());
988        let queue = producer.get_queue().await;
989        assert!(queue.token_swap_request_queue.push_called);
990        assert!(!queue.token_swap_request_queue.schedule_called);
991    }
992
993    #[tokio::test]
994    async fn test_token_swap_job_scheduled() {
995        let producer = TestJobProducer::new();
996
997        let swap_job = TokenSwapRequest::new("relayer-solana".to_string());
998        let scheduled_timestamp = calculate_scheduled_timestamp(20);
999        let result = producer
1000            .produce_token_swap_request_job(swap_job, Some(scheduled_timestamp))
1001            .await;
1002
1003        assert!(result.is_ok());
1004        let queue = producer.get_queue().await;
1005        assert!(queue.token_swap_request_queue.schedule_called);
1006        assert!(!queue.token_swap_request_queue.push_called);
1007    }
1008
1009    #[tokio::test]
1010    async fn test_transaction_send_cancel_job() {
1011        let producer = TestJobProducer::new();
1012
1013        let cancel_job = TransactionSend::cancel("tx-cancel", "relayer-1", "user requested");
1014        let result = producer
1015            .produce_submit_transaction_job(cancel_job, None)
1016            .await;
1017
1018        assert!(result.is_ok());
1019        let queue = producer.get_queue().await;
1020        assert!(queue.transaction_submission_queue.push_called);
1021    }
1022
1023    #[tokio::test]
1024    async fn test_transaction_send_resubmit_job() {
1025        let producer = TestJobProducer::new();
1026
1027        let resubmit_job = TransactionSend::resubmit("tx-resubmit", "relayer-1");
1028        let result = producer
1029            .produce_submit_transaction_job(resubmit_job, None)
1030            .await;
1031
1032        assert!(result.is_ok());
1033        let queue = producer.get_queue().await;
1034        assert!(queue.transaction_submission_queue.push_called);
1035    }
1036
1037    #[tokio::test]
1038    async fn test_transaction_send_resend_job() {
1039        let producer = TestJobProducer::new();
1040
1041        let resend_job = TransactionSend::resend("tx-resend", "relayer-1");
1042        let result = producer
1043            .produce_submit_transaction_job(resend_job, None)
1044            .await;
1045
1046        assert!(result.is_ok());
1047        let queue = producer.get_queue().await;
1048        assert!(queue.transaction_submission_queue.push_called);
1049    }
1050
1051    #[tokio::test]
1052    async fn test_multiple_jobs_different_queues() {
1053        let producer = TestJobProducer::new();
1054
1055        // Produce different types of jobs
1056        let request = TransactionRequest::new("tx1", "relayer-1");
1057        producer
1058            .produce_transaction_request_job(request, None)
1059            .await
1060            .unwrap();
1061
1062        let submit = TransactionSend::submit("tx2", "relayer-1");
1063        producer
1064            .produce_submit_transaction_job(submit, None)
1065            .await
1066            .unwrap();
1067
1068        use crate::models::NetworkType;
1069        let status = TransactionStatusCheck::new("tx3", "relayer-1", NetworkType::Evm);
1070        producer
1071            .produce_check_transaction_status_job(status, None)
1072            .await
1073            .unwrap();
1074
1075        // Verify all queues were used
1076        let queue = producer.get_queue().await;
1077        assert!(queue.transaction_request_queue.push_called);
1078        assert!(queue.transaction_submission_queue.push_called);
1079        assert!(queue.transaction_status_queue_evm.push_called);
1080    }
1081
1082    #[test]
1083    fn test_job_producer_clone() {
1084        let producer = TestJobProducer::new();
1085        let cloned_producer = producer.clone();
1086
1087        // Both should be valid instances
1088        // The clone creates a new Mutex with a cloned Queue
1089        assert!(std::ptr::addr_of!(producer) != std::ptr::addr_of!(cloned_producer));
1090    }
1091
1092    #[tokio::test]
1093    async fn test_transaction_request_with_metadata() {
1094        let producer = TestJobProducer::new();
1095
1096        let mut metadata = std::collections::HashMap::new();
1097        metadata.insert("retry_count".to_string(), "3".to_string());
1098
1099        let request = TransactionRequest::new("tx-meta", "relayer-1").with_metadata(metadata);
1100
1101        let result = producer
1102            .produce_transaction_request_job(request, None)
1103            .await;
1104
1105        assert!(result.is_ok());
1106        let queue = producer.get_queue().await;
1107        assert!(queue.transaction_request_queue.push_called);
1108    }
1109
1110    #[tokio::test]
1111    async fn test_status_check_with_metadata() {
1112        use crate::models::NetworkType;
1113        let producer = TestJobProducer::new();
1114
1115        let mut metadata = std::collections::HashMap::new();
1116        metadata.insert("attempt".to_string(), "2".to_string());
1117
1118        let status =
1119            TransactionStatusCheck::new("tx-status-meta", "relayer-1", NetworkType::Stellar)
1120                .with_metadata(metadata);
1121
1122        let result = producer
1123            .produce_check_transaction_status_job(status, None)
1124            .await;
1125
1126        assert!(result.is_ok());
1127        let queue = producer.get_queue().await;
1128        assert!(queue.transaction_status_queue_stellar.push_called);
1129    }
1130
1131    #[tokio::test]
1132    async fn test_scheduled_jobs_with_different_delays() {
1133        let producer = TestJobProducer::new();
1134
1135        // Test with various scheduling delays
1136        let delays = [1, 10, 60, 300, 3600]; // 1s, 10s, 1m, 5m, 1h
1137
1138        for (idx, delay) in delays.iter().enumerate() {
1139            let request = TransactionRequest::new(format!("tx-delay-{idx}"), "relayer-1");
1140            let timestamp = calculate_scheduled_timestamp(*delay);
1141
1142            let result = producer
1143                .produce_transaction_request_job(request, Some(timestamp))
1144                .await;
1145
1146            assert!(result.is_ok(), "Failed to schedule job with delay {delay}");
1147        }
1148    }
1149
1150    #[test]
1151    fn test_job_producer_error_display() {
1152        let error = JobProducerError::QueueError("Test queue error".to_string());
1153        let error_string = error.to_string();
1154
1155        assert!(error_string.contains("Queue error"));
1156        assert!(error_string.contains("Test queue error"));
1157    }
1158
1159    #[test]
1160    fn test_job_producer_error_to_relayer_error() {
1161        // Test error conversion preserves original error message
1162        let job_error = JobProducerError::QueueError("Connection failed".to_string());
1163        let relayer_error: RelayerError = job_error.into();
1164
1165        match relayer_error {
1166            RelayerError::QueueError(msg) => {
1167                assert_eq!(msg, "Queue error: Connection failed");
1168            }
1169            _ => panic!("Expected QueueError variant"),
1170        }
1171    }
1172}