openzeppelin_relayer/jobs/handlers/
transaction_submission_handler.rs

1//! Transaction submission handler for processing submission jobs.
2//!
3//! Handles the submission of prepared transactions to networks:
4//! - Submits transactions to appropriate networks
5//! - Handles different submission commands (Submit, Cancel, Resubmit)
6//! - Updates transaction status after submission
7//! - Enqueues status monitoring jobs
8use actix_web::web::ThinData;
9use chrono::Utc;
10use eyre::Result;
11use tracing::{debug, info, instrument};
12
13use crate::{
14    constants::{
15        WORKER_TRANSACTION_CANCEL_RETRIES, WORKER_TRANSACTION_RESEND_RETRIES,
16        WORKER_TRANSACTION_RESUBMIT_RETRIES, WORKER_TRANSACTION_SUBMIT_RETRIES,
17    },
18    domain::{get_relayer_transaction, get_transaction_by_id, Transaction},
19    jobs::{handle_result, Job, TransactionCommand, TransactionSend},
20    metrics::{observe_processing_time, STAGE_SUBMISSION_QUEUE_DWELL, STAGE_SUBMIT_DURATION},
21    models::DefaultAppState,
22    observability::request_id::set_request_id,
23    queues::{HandlerError, WorkerContext},
24};
25
26#[instrument(
27    level = "info",
28    skip(job, state, ctx),
29    fields(
30        request_id = ?job.request_id,
31        job_id = %job.message_id,
32        job_type = %job.job_type.to_string(),
33        attempt = %ctx.attempt,
34        tx_id = %job.data.transaction_id,
35        relayer_id = %job.data.relayer_id,
36        task_id = %ctx.task_id,
37        command = ?job.data.command,
38    )
39)]
40pub async fn transaction_submission_handler(
41    job: Job<TransactionSend>,
42    state: ThinData<DefaultAppState>,
43    ctx: WorkerContext,
44) -> Result<(), HandlerError> {
45    if let Some(request_id) = job.request_id.clone() {
46        set_request_id(request_id);
47    }
48
49    debug!(
50        tx_id = %job.data.transaction_id,
51        relayer_id = %job.data.relayer_id,
52        "handling transaction submission"
53    );
54
55    let command = job.data.command.clone();
56    let job_timestamp = job.timestamp.clone();
57    let result = handle_request(job.data, &state, &job_timestamp).await;
58
59    // Handle result with command-specific retry logic
60    handle_result(
61        result,
62        &ctx,
63        "Transaction Submission",
64        get_max_retries(&command),
65    )
66}
67
68/// Get max retry count based on command type
69fn get_max_retries(command: &TransactionCommand) -> usize {
70    match command {
71        TransactionCommand::Submit => WORKER_TRANSACTION_SUBMIT_RETRIES,
72        TransactionCommand::Resubmit => WORKER_TRANSACTION_RESUBMIT_RETRIES,
73        TransactionCommand::Cancel { .. } => WORKER_TRANSACTION_CANCEL_RETRIES,
74        TransactionCommand::Resend => WORKER_TRANSACTION_RESEND_RETRIES,
75    }
76}
77
78async fn handle_request(
79    status_request: TransactionSend,
80    state: &ThinData<DefaultAppState>,
81    job_timestamp: &str,
82) -> Result<()> {
83    let relayer_transaction =
84        get_relayer_transaction(status_request.relayer_id.clone(), state).await?;
85
86    let transaction = get_transaction_by_id(status_request.transaction_id, state).await?;
87
88    // Capture transaction info for completion log
89    let tx_id = transaction.id.clone();
90    let relayer_id = transaction.relayer_id.clone();
91    let network_type = transaction.network_type.to_string();
92    let command = status_request.command.clone();
93
94    // Measure submission queue dwell time using the Job's creation timestamp.
95    // This is the time between when the submit job was enqueued (after prepare)
96    // and when this handler starts processing it.
97    if let Ok(enqueued_epoch) = job_timestamp.parse::<i64>() {
98        let now_ms = Utc::now().timestamp_millis();
99        let enqueued_ms = enqueued_epoch * 1000; // Job.timestamp is whole seconds
100        let dwell_secs = (now_ms - enqueued_ms).max(0) as f64 / 1000.0;
101        observe_processing_time(
102            &relayer_id,
103            &network_type,
104            STAGE_SUBMISSION_QUEUE_DWELL,
105            dwell_secs,
106        );
107    }
108
109    debug!(
110        tx_id = %transaction.id,
111        relayer_id = %transaction.relayer_id,
112        status = ?transaction.status,
113        "loaded transaction for submission"
114    );
115
116    let submit_start = std::time::Instant::now();
117
118    match status_request.command {
119        TransactionCommand::Submit => {
120            relayer_transaction.submit_transaction(transaction).await?;
121        }
122        TransactionCommand::Cancel { reason } => {
123            info!(
124                tx_id = %tx_id,
125                relayer_id = %relayer_id,
126                status_reason = %reason,
127                "cancelling transaction"
128            );
129            relayer_transaction.submit_transaction(transaction).await?;
130        }
131        TransactionCommand::Resubmit => {
132            debug!(
133                tx_id = %tx_id,
134                relayer_id = %relayer_id,
135                "resubmitting transaction with updated parameters"
136            );
137            relayer_transaction
138                .resubmit_transaction(transaction)
139                .await?;
140        }
141        TransactionCommand::Resend => {
142            debug!(
143                tx_id = %tx_id,
144                relayer_id = %relayer_id,
145                "resending transaction"
146            );
147            relayer_transaction.submit_transaction(transaction).await?;
148        }
149    };
150
151    let submit_duration = submit_start.elapsed().as_secs_f64();
152    observe_processing_time(
153        &relayer_id,
154        &network_type,
155        STAGE_SUBMIT_DURATION,
156        submit_duration,
157    );
158
159    debug!(
160        tx_id = %tx_id,
161        relayer_id = %relayer_id,
162        command = ?command,
163        submit_duration_ms = submit_start.elapsed().as_millis(),
164        "transaction submission completed"
165    );
166
167    Ok(())
168}
169
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Submit and Cancel payloads wrapped in a `Job` keep the ids and command
    /// they were constructed with.
    #[tokio::test]
    async fn test_submission_handler_job_validation() {
        // Submit command
        let submit = Job::new(
            crate::jobs::JobType::TransactionSend,
            TransactionSend::submit("tx123", "relayer-1"),
        );

        assert!(
            matches!(submit.data.command, TransactionCommand::Submit),
            "Expected Submit command"
        );
        assert_eq!(submit.data.transaction_id, "tx123");
        assert_eq!(submit.data.relayer_id, "relayer-1");
        assert!(submit.data.metadata.is_none());

        // Cancel command
        let cancel = Job::new(
            crate::jobs::JobType::TransactionSend,
            TransactionSend::cancel("tx123", "relayer-1", "user requested"),
        );

        let TransactionCommand::Cancel { reason } = cancel.data.command else {
            panic!("Expected Cancel command");
        };
        assert_eq!(reason, "user requested");
    }

    /// Metadata attached through `with_metadata` is readable from the payload.
    #[tokio::test]
    async fn test_submission_job_with_metadata() {
        let metadata = HashMap::from([("gas_price".to_string(), "20000000000".to_string())]);

        let submit_job = TransactionSend::submit("tx123", "relayer-1").with_metadata(metadata);

        assert!(submit_job.metadata.is_some());
        let attached = submit_job.metadata.unwrap();
        assert_eq!(attached.get("gas_price").unwrap(), "20000000000");
    }

    /// Each command variant must resolve to its dedicated retry constant.
    mod get_max_retries_tests {
        use super::*;

        #[test]
        fn test_submit_command_retries() {
            assert_eq!(
                get_max_retries(&TransactionCommand::Submit),
                WORKER_TRANSACTION_SUBMIT_RETRIES,
                "Submit command should use WORKER_TRANSACTION_SUBMIT_RETRIES"
            );
        }

        #[test]
        fn test_resubmit_command_retries() {
            assert_eq!(
                get_max_retries(&TransactionCommand::Resubmit),
                WORKER_TRANSACTION_RESUBMIT_RETRIES,
                "Resubmit command should use WORKER_TRANSACTION_RESUBMIT_RETRIES"
            );
        }

        #[test]
        fn test_cancel_command_retries() {
            let cancel = TransactionCommand::Cancel {
                reason: "test cancel".to_string(),
            };

            assert_eq!(
                get_max_retries(&cancel),
                WORKER_TRANSACTION_CANCEL_RETRIES,
                "Cancel command should use WORKER_TRANSACTION_CANCEL_RETRIES"
            );
        }

        #[test]
        fn test_resend_command_retries() {
            assert_eq!(
                get_max_retries(&TransactionCommand::Resend),
                WORKER_TRANSACTION_RESEND_RETRIES,
                "Resend command should use WORKER_TRANSACTION_RESEND_RETRIES"
            );
        }
    }
}