openzeppelin_relayer/jobs/
job.rs

1//! Job processing module for handling asynchronous tasks.
2//!
3//! Provides generic job structure for different types of operations:
4//! - Transaction processing
5//! - Status monitoring
6//! - Notifications
7use crate::models::{NetworkType, WebhookNotification};
8use chrono::Utc;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use strum::Display;
12use uuid::Uuid;
13
14// Common message structure
15#[derive(Debug, Serialize, Deserialize, Clone)]
16pub struct Job<T> {
17    pub message_id: String,
18    pub version: String,
19    pub timestamp: String,
20    pub job_type: JobType,
21    pub data: T,
22    #[serde(skip_serializing_if = "Option::is_none")]
23    pub request_id: Option<String>,
24    /// Unix epoch (seconds) when this job is intended to become available for
25    /// processing. Set by the producer when `scheduled_on` is provided; absent
26    /// for immediate jobs. Consumers use this to compute queue pickup latency
27    /// that excludes intentional scheduling delay.
28    #[serde(default, skip_serializing_if = "Option::is_none")]
29    pub available_at: Option<String>,
30}
31
32impl<T> Job<T> {
33    pub fn new(job_type: JobType, data: T) -> Self {
34        Self {
35            message_id: Uuid::new_v4().to_string(),
36            version: "1.0".to_string(),
37            timestamp: Utc::now().timestamp().to_string(),
38            job_type,
39            data,
40            request_id: None,
41            available_at: None,
42        }
43    }
44    pub fn with_request_id(mut self, id: Option<String>) -> Self {
45        self.request_id = id;
46        self
47    }
48    pub fn with_scheduled_on(mut self, scheduled_on: Option<i64>) -> Self {
49        self.available_at = scheduled_on.map(|ts| ts.to_string());
50        self
51    }
52}
53
54// Enum to represent different message types
55#[derive(Debug, Serialize, Deserialize, Display, Clone)]
56#[serde(tag = "type", rename_all = "snake_case")]
57pub enum JobType {
58    TransactionRequest,
59    TransactionSend,
60    TransactionStatusCheck,
61    NotificationSend,
62    TokenSwapRequest,
63    RelayerHealthCheck,
64}
65
66// Example message data for transaction request
67#[derive(Debug, Serialize, Deserialize, Clone)]
68pub struct TransactionRequest {
69    pub transaction_id: String,
70    pub relayer_id: String,
71    /// Network type for this transaction request.
72    /// Used by SQS backend to choose the FIFO message group strategy:
73    /// EVM uses relayer_id (nonce ordering), others use transaction_id (parallelism).
74    /// Optional for backward compatibility with older queued messages.
75    #[serde(default)]
76    pub network_type: Option<NetworkType>,
77    pub metadata: Option<HashMap<String, String>>,
78}
79
80impl TransactionRequest {
81    pub fn new(transaction_id: impl Into<String>, relayer_id: impl Into<String>) -> Self {
82        Self {
83            transaction_id: transaction_id.into(),
84            relayer_id: relayer_id.into(),
85            network_type: None,
86            metadata: None,
87        }
88    }
89
90    pub fn with_network_type(mut self, network_type: NetworkType) -> Self {
91        self.network_type = Some(network_type);
92        self
93    }
94
95    pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
96        self.metadata = Some(metadata);
97        self
98    }
99}
100
101#[derive(Debug, Serialize, Deserialize, Clone)]
102pub enum TransactionCommand {
103    Submit,
104    Cancel { reason: String },
105    Resubmit,
106    Resend,
107}
108
109// Example message data for order creation
110#[derive(Debug, Serialize, Deserialize, Clone)]
111pub struct TransactionSend {
112    pub transaction_id: String,
113    pub relayer_id: String,
114    pub command: TransactionCommand,
115    /// Network type for this transaction submission.
116    /// Used by SQS backend to choose the FIFO message group strategy:
117    /// EVM uses relayer_id (nonce ordering), others use transaction_id (parallelism).
118    /// Optional for backward compatibility with older queued messages.
119    #[serde(default)]
120    pub network_type: Option<NetworkType>,
121    pub metadata: Option<HashMap<String, String>>,
122}
123
124impl TransactionSend {
125    // Submit a transaction to the relayer
126    pub fn submit(transaction_id: impl Into<String>, relayer_id: impl Into<String>) -> Self {
127        Self {
128            transaction_id: transaction_id.into(),
129            relayer_id: relayer_id.into(),
130            command: TransactionCommand::Submit,
131            network_type: None,
132            metadata: None,
133        }
134    }
135
136    // Cancel a transaction
137    pub fn cancel(
138        transaction_id: impl Into<String>,
139        relayer_id: impl Into<String>,
140        reason: impl Into<String>,
141    ) -> Self {
142        Self {
143            transaction_id: transaction_id.into(),
144            relayer_id: relayer_id.into(),
145            command: TransactionCommand::Cancel {
146                reason: reason.into(),
147            },
148            network_type: None,
149            metadata: None,
150        }
151    }
152
153    // Resubmit a transaction
154    pub fn resubmit(transaction_id: impl Into<String>, relayer_id: impl Into<String>) -> Self {
155        Self {
156            transaction_id: transaction_id.into(),
157            relayer_id: relayer_id.into(),
158            command: TransactionCommand::Resubmit,
159            network_type: None,
160            metadata: None,
161        }
162    }
163
164    // Resend a transaction
165    pub fn resend(transaction_id: impl Into<String>, relayer_id: impl Into<String>) -> Self {
166        Self {
167            transaction_id: transaction_id.into(),
168            relayer_id: relayer_id.into(),
169            command: TransactionCommand::Resend,
170            network_type: None,
171            metadata: None,
172        }
173    }
174
175    // Set the network type for this transaction submission
176    pub fn with_network_type(mut self, network_type: NetworkType) -> Self {
177        self.network_type = Some(network_type);
178        self
179    }
180
181    // Set the metadata for this transaction submission
182    pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
183        self.metadata = Some(metadata);
184        self
185    }
186}
187
188// Struct for individual order item
189#[derive(Debug, Serialize, Deserialize, Clone)]
190pub struct TransactionStatusCheck {
191    pub transaction_id: String,
192    pub relayer_id: String,
193    /// Network type for this transaction status check.
194    /// Optional for backward compatibility with older queued messages.
195    #[serde(default)]
196    pub network_type: Option<NetworkType>,
197    pub metadata: Option<HashMap<String, String>>,
198}
199
200impl TransactionStatusCheck {
201    // Create a new transaction status check
202    pub fn new(
203        transaction_id: impl Into<String>,
204        relayer_id: impl Into<String>,
205        network_type: NetworkType,
206    ) -> Self {
207        Self {
208            transaction_id: transaction_id.into(),
209            relayer_id: relayer_id.into(),
210            network_type: Some(network_type),
211            metadata: None,
212        }
213    }
214
215    // Set the metadata for this transaction status check
216    pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
217        self.metadata = Some(metadata);
218        self
219    }
220}
221
222#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
223pub struct NotificationSend {
224    pub notification_id: String,
225    pub notification: WebhookNotification,
226}
227
228impl NotificationSend {
229    pub fn new(notification_id: String, notification: WebhookNotification) -> Self {
230        Self {
231            notification_id,
232            notification,
233        }
234    }
235}
236
237#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
238pub struct TokenSwapRequest {
239    pub relayer_id: String,
240}
241
242impl TokenSwapRequest {
243    pub fn new(relayer_id: String) -> Self {
244        Self { relayer_id }
245    }
246}
247
248#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
249pub struct RelayerHealthCheck {
250    pub relayer_id: String,
251    pub retry_count: u32,
252}
253
254impl RelayerHealthCheck {
255    pub fn new(relayer_id: String) -> Self {
256        Self {
257            relayer_id,
258            retry_count: 0,
259        }
260    }
261
262    pub fn with_retry_count(relayer_id: String, retry_count: u32) -> Self {
263        Self {
264            relayer_id,
265            retry_count,
266        }
267    }
268}
269
270#[cfg(test)]
271mod tests {
272    use std::collections::HashMap;
273    use std::str::FromStr;
274
275    use crate::models::{
276        evm::Speed, EvmTransactionDataSignature, EvmTransactionResponse, TransactionResponse,
277        TransactionStatus, WebhookNotification, WebhookPayload, U256,
278    };
279
280    use super::*;
281
282    #[test]
283    fn test_job_creation() {
284        let job_data = TransactionRequest::new("tx123", "relayer-1");
285        let job = Job::new(JobType::TransactionRequest, job_data.clone());
286
287        assert_eq!(job.job_type.to_string(), "TransactionRequest");
288        assert_eq!(job.version, "1.0");
289        assert_eq!(job.data.transaction_id, "tx123");
290        assert_eq!(job.data.relayer_id, "relayer-1");
291        assert!(job.data.metadata.is_none());
292    }
293
294    #[test]
295    fn test_transaction_request_with_metadata() {
296        let mut metadata = HashMap::new();
297        metadata.insert("chain_id".to_string(), "1".to_string());
298        metadata.insert("gas_price".to_string(), "20000000000".to_string());
299
300        let tx_request =
301            TransactionRequest::new("tx123", "relayer-1").with_metadata(metadata.clone());
302
303        assert_eq!(tx_request.transaction_id, "tx123");
304        assert_eq!(tx_request.relayer_id, "relayer-1");
305        assert!(tx_request.metadata.is_some());
306        assert_eq!(tx_request.metadata.unwrap(), metadata);
307    }
308
309    #[test]
310    fn test_transaction_send_methods() {
311        // Test submit
312        let tx_submit = TransactionSend::submit("tx123", "relayer-1");
313        assert_eq!(tx_submit.transaction_id, "tx123");
314        assert_eq!(tx_submit.relayer_id, "relayer-1");
315        matches!(tx_submit.command, TransactionCommand::Submit);
316
317        // Test cancel
318        let tx_cancel = TransactionSend::cancel("tx123", "relayer-1", "user requested");
319        matches!(tx_cancel.command, TransactionCommand::Cancel { reason } if reason == "user requested");
320
321        // Test resubmit
322        let tx_resubmit = TransactionSend::resubmit("tx123", "relayer-1");
323        matches!(tx_resubmit.command, TransactionCommand::Resubmit);
324
325        // Test resend
326        let tx_resend = TransactionSend::resend("tx123", "relayer-1");
327        matches!(tx_resend.command, TransactionCommand::Resend);
328
329        // Test with_metadata
330        let mut metadata = HashMap::new();
331        metadata.insert("nonce".to_string(), "5".to_string());
332
333        let tx_with_metadata =
334            TransactionSend::submit("tx123", "relayer-1").with_metadata(metadata.clone());
335
336        assert!(tx_with_metadata.metadata.is_some());
337        assert_eq!(tx_with_metadata.metadata.unwrap(), metadata);
338    }
339
340    #[test]
341    fn test_transaction_status_check() {
342        let tx_status = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
343        assert_eq!(tx_status.transaction_id, "tx123");
344        assert_eq!(tx_status.relayer_id, "relayer-1");
345        assert_eq!(tx_status.network_type, Some(NetworkType::Evm));
346        assert!(tx_status.metadata.is_none());
347
348        let mut metadata = HashMap::new();
349        metadata.insert("retries".to_string(), "3".to_string());
350
351        let tx_status_with_metadata =
352            TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Stellar)
353                .with_metadata(metadata.clone());
354
355        assert!(tx_status_with_metadata.metadata.is_some());
356        assert_eq!(tx_status_with_metadata.metadata.unwrap(), metadata);
357    }
358
359    #[test]
360    fn test_transaction_status_check_backward_compatibility() {
361        // Simulate an old message without network_type field
362        let old_json = r#"{
363            "transaction_id": "tx456",
364            "relayer_id": "relayer-2",
365            "metadata": null
366        }"#;
367
368        // Should deserialize successfully with network_type defaulting to None
369        let deserialized: TransactionStatusCheck = serde_json::from_str(old_json).unwrap();
370        assert_eq!(deserialized.transaction_id, "tx456");
371        assert_eq!(deserialized.relayer_id, "relayer-2");
372        assert_eq!(deserialized.network_type, None);
373        assert!(deserialized.metadata.is_none());
374
375        // New messages should include network_type
376        let new_status = TransactionStatusCheck::new("tx789", "relayer-3", NetworkType::Solana);
377        assert_eq!(new_status.network_type, Some(NetworkType::Solana));
378    }
379
380    #[test]
381    fn test_job_serialization() {
382        let tx_request = TransactionRequest::new("tx123", "relayer-1");
383        let job = Job::new(JobType::TransactionRequest, tx_request);
384
385        let serialized = serde_json::to_string(&job).unwrap();
386        let deserialized: Job<TransactionRequest> = serde_json::from_str(&serialized).unwrap();
387
388        assert_eq!(deserialized.job_type.to_string(), "TransactionRequest");
389        assert_eq!(deserialized.data.transaction_id, "tx123");
390        assert_eq!(deserialized.data.relayer_id, "relayer-1");
391    }
392
393    #[test]
394    fn test_job_with_scheduled_on_sets_available_at() {
395        let tx_request = TransactionRequest::new("tx123", "relayer-1");
396        let job = Job::new(JobType::TransactionRequest, tx_request).with_scheduled_on(Some(12345));
397
398        assert_eq!(job.available_at.as_deref(), Some("12345"));
399    }
400
401    #[test]
402    fn test_job_serialization_preserves_available_at() {
403        let tx_request = TransactionRequest::new("tx123", "relayer-1");
404        let job = Job::new(JobType::TransactionRequest, tx_request).with_scheduled_on(Some(12345));
405
406        let serialized = serde_json::to_string(&job).unwrap();
407        let deserialized: Job<TransactionRequest> = serde_json::from_str(&serialized).unwrap();
408
409        assert_eq!(deserialized.available_at.as_deref(), Some("12345"));
410    }
411
412    #[test]
413    fn test_job_serialization_omits_available_at_when_not_scheduled() {
414        let tx_request = TransactionRequest::new("tx123", "relayer-1");
415        let job = Job::new(JobType::TransactionRequest, tx_request);
416
417        let serialized = serde_json::to_string(&job).unwrap();
418
419        assert!(!serialized.contains("available_at"));
420    }
421
422    #[test]
423    fn test_notification_send_serialization() {
424        let payload = WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
425            EvmTransactionResponse {
426                id: "tx123".to_string(),
427                hash: Some("0x123".to_string()),
428                status: TransactionStatus::Confirmed,
429                status_reason: None,
430                created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
431                sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
432                confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
433                gas_price: Some(1000000000),
434                gas_limit: Some(21000),
435                nonce: Some(1),
436                value: U256::from_str("1000000000000000000").unwrap(),
437                from: "0xabc".to_string(),
438                to: Some("0xdef".to_string()),
439                relayer_id: "relayer-1".to_string(),
440                data: Some("0x123".to_string()),
441                max_fee_per_gas: Some(1000000000),
442                max_priority_fee_per_gas: Some(1000000000),
443                signature: Some(EvmTransactionDataSignature {
444                    r: "0x123".to_string(),
445                    s: "0x123".to_string(),
446                    v: 1,
447                    sig: "0x123".to_string(),
448                }),
449                speed: Some(Speed::Fast),
450            },
451        )));
452
453        let notification = WebhookNotification::new("transaction".to_string(), payload);
454        let notification_send =
455            NotificationSend::new("notification-test".to_string(), notification);
456
457        let serialized = serde_json::to_string(&notification_send).unwrap();
458
459        match serde_json::from_str::<NotificationSend>(&serialized) {
460            Ok(deserialized) => {
461                assert_eq!(notification_send, deserialized);
462            }
463            Err(e) => {
464                panic!("Deserialization error: {e}");
465            }
466        }
467    }
468
469    #[test]
470    fn test_notification_send_serialization_none_values() {
471        let payload = WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
472            EvmTransactionResponse {
473                id: "tx123".to_string(),
474                hash: None,
475                status: TransactionStatus::Confirmed,
476                status_reason: None,
477                created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
478                sent_at: None,
479                confirmed_at: None,
480                gas_price: None,
481                gas_limit: Some(21000),
482                nonce: None,
483                value: U256::from_str("1000000000000000000").unwrap(),
484                from: "0xabc".to_string(),
485                to: None,
486                relayer_id: "relayer-1".to_string(),
487                data: None,
488                max_fee_per_gas: None,
489                max_priority_fee_per_gas: None,
490                signature: None,
491                speed: None,
492            },
493        )));
494
495        let notification = WebhookNotification::new("transaction".to_string(), payload);
496        let notification_send =
497            NotificationSend::new("notification-test".to_string(), notification);
498
499        let serialized = serde_json::to_string(&notification_send).unwrap();
500
501        match serde_json::from_str::<NotificationSend>(&serialized) {
502            Ok(deserialized) => {
503                assert_eq!(notification_send, deserialized);
504            }
505            Err(e) => {
506                panic!("Deserialization error: {e}");
507            }
508        }
509    }
510
511    #[test]
512    fn test_relayer_health_check_new() {
513        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
514
515        assert_eq!(health_check.relayer_id, "relayer-1");
516        assert_eq!(health_check.retry_count, 0);
517    }
518
519    #[test]
520    fn test_relayer_health_check_with_retry_count() {
521        let health_check = RelayerHealthCheck::with_retry_count("relayer-1".to_string(), 5);
522
523        assert_eq!(health_check.relayer_id, "relayer-1");
524        assert_eq!(health_check.retry_count, 5);
525    }
526
527    #[test]
528    fn test_relayer_health_check_correct_field_values() {
529        // Test with zero retry count
530        let health_check_zero = RelayerHealthCheck::new("relayer-test-123".to_string());
531        assert_eq!(health_check_zero.relayer_id, "relayer-test-123");
532        assert_eq!(health_check_zero.retry_count, 0);
533
534        // Test with specific retry count
535        let health_check_custom =
536            RelayerHealthCheck::with_retry_count("relayer-abc".to_string(), 10);
537        assert_eq!(health_check_custom.relayer_id, "relayer-abc");
538        assert_eq!(health_check_custom.retry_count, 10);
539
540        // Test with large retry count
541        let health_check_large =
542            RelayerHealthCheck::with_retry_count("relayer-xyz".to_string(), 999);
543        assert_eq!(health_check_large.relayer_id, "relayer-xyz");
544        assert_eq!(health_check_large.retry_count, 999);
545    }
546
547    #[test]
548    fn test_relayer_health_check_job_serialization() {
549        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
550        let job = Job::new(JobType::RelayerHealthCheck, health_check);
551
552        let serialized = serde_json::to_string(&job).unwrap();
553        let deserialized: Job<RelayerHealthCheck> = serde_json::from_str(&serialized).unwrap();
554
555        assert_eq!(deserialized.job_type.to_string(), "RelayerHealthCheck");
556        assert_eq!(deserialized.data.relayer_id, "relayer-1");
557        assert_eq!(deserialized.data.retry_count, 0);
558    }
559
560    #[test]
561    fn test_relayer_health_check_job_serialization_with_retry_count() {
562        let health_check = RelayerHealthCheck::with_retry_count("relayer-2".to_string(), 3);
563        let job = Job::new(JobType::RelayerHealthCheck, health_check.clone());
564
565        let serialized = serde_json::to_string(&job).unwrap();
566        let deserialized: Job<RelayerHealthCheck> = serde_json::from_str(&serialized).unwrap();
567
568        assert_eq!(deserialized.job_type.to_string(), "RelayerHealthCheck");
569        assert_eq!(deserialized.data.relayer_id, health_check.relayer_id);
570        assert_eq!(deserialized.data.retry_count, health_check.retry_count);
571        assert_eq!(deserialized.data, health_check);
572    }
573
574    #[test]
575    fn test_relayer_health_check_equality_after_deserialization() {
576        let original_health_check =
577            RelayerHealthCheck::with_retry_count("relayer-test".to_string(), 7);
578        let job = Job::new(JobType::RelayerHealthCheck, original_health_check.clone());
579
580        let serialized = serde_json::to_string(&job).unwrap();
581        let deserialized: Job<RelayerHealthCheck> = serde_json::from_str(&serialized).unwrap();
582
583        // Assert job type string
584        assert_eq!(deserialized.job_type.to_string(), "RelayerHealthCheck");
585
586        // Assert data equality
587        assert_eq!(deserialized.data, original_health_check);
588        assert_eq!(
589            deserialized.data.relayer_id,
590            original_health_check.relayer_id
591        );
592        assert_eq!(
593            deserialized.data.retry_count,
594            original_health_check.retry_count
595        );
596    }
597}