openzeppelin_relayer/jobs/handlers/
transaction_submission_handler.rs1use actix_web::web::ThinData;
9use chrono::Utc;
10use eyre::Result;
11use tracing::{debug, info, instrument};
12
13use crate::{
14 constants::{
15 WORKER_TRANSACTION_CANCEL_RETRIES, WORKER_TRANSACTION_RESEND_RETRIES,
16 WORKER_TRANSACTION_RESUBMIT_RETRIES, WORKER_TRANSACTION_SUBMIT_RETRIES,
17 },
18 domain::{get_relayer_transaction, get_transaction_by_id, Transaction},
19 jobs::{handle_result, Job, TransactionCommand, TransactionSend},
20 metrics::{observe_processing_time, STAGE_SUBMISSION_QUEUE_DWELL, STAGE_SUBMIT_DURATION},
21 models::DefaultAppState,
22 observability::request_id::set_request_id,
23 queues::{HandlerError, WorkerContext},
24};
25
26#[instrument(
27 level = "info",
28 skip(job, state, ctx),
29 fields(
30 request_id = ?job.request_id,
31 job_id = %job.message_id,
32 job_type = %job.job_type.to_string(),
33 attempt = %ctx.attempt,
34 tx_id = %job.data.transaction_id,
35 relayer_id = %job.data.relayer_id,
36 task_id = %ctx.task_id,
37 command = ?job.data.command,
38 )
39)]
40pub async fn transaction_submission_handler(
41 job: Job<TransactionSend>,
42 state: ThinData<DefaultAppState>,
43 ctx: WorkerContext,
44) -> Result<(), HandlerError> {
45 if let Some(request_id) = job.request_id.clone() {
46 set_request_id(request_id);
47 }
48
49 debug!(
50 tx_id = %job.data.transaction_id,
51 relayer_id = %job.data.relayer_id,
52 "handling transaction submission"
53 );
54
55 let command = job.data.command.clone();
56 let job_timestamp = job.timestamp.clone();
57 let result = handle_request(job.data, &state, &job_timestamp).await;
58
59 handle_result(
61 result,
62 &ctx,
63 "Transaction Submission",
64 get_max_retries(&command),
65 )
66}
67
68fn get_max_retries(command: &TransactionCommand) -> usize {
70 match command {
71 TransactionCommand::Submit => WORKER_TRANSACTION_SUBMIT_RETRIES,
72 TransactionCommand::Resubmit => WORKER_TRANSACTION_RESUBMIT_RETRIES,
73 TransactionCommand::Cancel { .. } => WORKER_TRANSACTION_CANCEL_RETRIES,
74 TransactionCommand::Resend => WORKER_TRANSACTION_RESEND_RETRIES,
75 }
76}
77
78async fn handle_request(
79 status_request: TransactionSend,
80 state: &ThinData<DefaultAppState>,
81 job_timestamp: &str,
82) -> Result<()> {
83 let relayer_transaction =
84 get_relayer_transaction(status_request.relayer_id.clone(), state).await?;
85
86 let transaction = get_transaction_by_id(status_request.transaction_id, state).await?;
87
88 let tx_id = transaction.id.clone();
90 let relayer_id = transaction.relayer_id.clone();
91 let network_type = transaction.network_type.to_string();
92 let command = status_request.command.clone();
93
94 if let Ok(enqueued_epoch) = job_timestamp.parse::<i64>() {
98 let now_ms = Utc::now().timestamp_millis();
99 let enqueued_ms = enqueued_epoch * 1000; let dwell_secs = (now_ms - enqueued_ms).max(0) as f64 / 1000.0;
101 observe_processing_time(
102 &relayer_id,
103 &network_type,
104 STAGE_SUBMISSION_QUEUE_DWELL,
105 dwell_secs,
106 );
107 }
108
109 debug!(
110 tx_id = %transaction.id,
111 relayer_id = %transaction.relayer_id,
112 status = ?transaction.status,
113 "loaded transaction for submission"
114 );
115
116 let submit_start = std::time::Instant::now();
117
118 match status_request.command {
119 TransactionCommand::Submit => {
120 relayer_transaction.submit_transaction(transaction).await?;
121 }
122 TransactionCommand::Cancel { reason } => {
123 info!(
124 tx_id = %tx_id,
125 relayer_id = %relayer_id,
126 status_reason = %reason,
127 "cancelling transaction"
128 );
129 relayer_transaction.submit_transaction(transaction).await?;
130 }
131 TransactionCommand::Resubmit => {
132 debug!(
133 tx_id = %tx_id,
134 relayer_id = %relayer_id,
135 "resubmitting transaction with updated parameters"
136 );
137 relayer_transaction
138 .resubmit_transaction(transaction)
139 .await?;
140 }
141 TransactionCommand::Resend => {
142 debug!(
143 tx_id = %tx_id,
144 relayer_id = %relayer_id,
145 "resending transaction"
146 );
147 relayer_transaction.submit_transaction(transaction).await?;
148 }
149 };
150
151 let submit_duration = submit_start.elapsed().as_secs_f64();
152 observe_processing_time(
153 &relayer_id,
154 &network_type,
155 STAGE_SUBMIT_DURATION,
156 submit_duration,
157 );
158
159 debug!(
160 tx_id = %tx_id,
161 relayer_id = %relayer_id,
162 command = ?command,
163 submit_duration_ms = submit_start.elapsed().as_millis(),
164 "transaction submission completed"
165 );
166
167 Ok(())
168}
169
170#[cfg(test)]
171mod tests {
172 use super::*;
173 use std::collections::HashMap;
174
175 #[tokio::test]
176 async fn test_submission_handler_job_validation() {
177 let submit_job = TransactionSend::submit("tx123", "relayer-1");
179 let job = Job::new(crate::jobs::JobType::TransactionSend, submit_job);
180
181 match job.data.command {
183 TransactionCommand::Submit => {}
184 _ => panic!("Expected Submit command"),
185 }
186 assert_eq!(job.data.transaction_id, "tx123");
187 assert_eq!(job.data.relayer_id, "relayer-1");
188 assert!(job.data.metadata.is_none());
189
190 let cancel_job = TransactionSend::cancel("tx123", "relayer-1", "user requested");
192 let job = Job::new(crate::jobs::JobType::TransactionSend, cancel_job);
193
194 match job.data.command {
196 TransactionCommand::Cancel { reason } => {
197 assert_eq!(reason, "user requested");
198 }
199 _ => panic!("Expected Cancel command"),
200 }
201 }
202
203 #[tokio::test]
204 async fn test_submission_job_with_metadata() {
205 let mut metadata = HashMap::new();
207 metadata.insert("gas_price".to_string(), "20000000000".to_string());
208
209 let submit_job =
210 TransactionSend::submit("tx123", "relayer-1").with_metadata(metadata.clone());
211
212 assert!(submit_job.metadata.is_some());
214 let job_metadata = submit_job.metadata.unwrap();
215 assert_eq!(job_metadata.get("gas_price").unwrap(), "20000000000");
216 }
217
218 mod get_max_retries_tests {
219 use super::*;
220
221 #[test]
222 fn test_submit_command_retries() {
223 let command = TransactionCommand::Submit;
224 let retries = get_max_retries(&command);
225
226 assert_eq!(
227 retries, WORKER_TRANSACTION_SUBMIT_RETRIES,
228 "Submit command should use WORKER_TRANSACTION_SUBMIT_RETRIES"
229 );
230 }
231
232 #[test]
233 fn test_resubmit_command_retries() {
234 let command = TransactionCommand::Resubmit;
235 let retries = get_max_retries(&command);
236
237 assert_eq!(
238 retries, WORKER_TRANSACTION_RESUBMIT_RETRIES,
239 "Resubmit command should use WORKER_TRANSACTION_RESUBMIT_RETRIES"
240 );
241 }
242
243 #[test]
244 fn test_cancel_command_retries() {
245 let command = TransactionCommand::Cancel {
246 reason: "test cancel".to_string(),
247 };
248 let retries = get_max_retries(&command);
249
250 assert_eq!(
251 retries, WORKER_TRANSACTION_CANCEL_RETRIES,
252 "Cancel command should use WORKER_TRANSACTION_CANCEL_RETRIES"
253 );
254 }
255
256 #[test]
257 fn test_resend_command_retries() {
258 let command = TransactionCommand::Resend;
259 let retries = get_max_retries(&command);
260
261 assert_eq!(
262 retries, WORKER_TRANSACTION_RESEND_RETRIES,
263 "Resend command should use WORKER_TRANSACTION_RESEND_RETRIES"
264 );
265 }
266 }
267}