openzeppelin_relayer/jobs/handlers/
transaction_request_handler.rs1use actix_web::web::ThinData;
6use chrono::Utc;
7use tracing::instrument;
8
9use crate::{
10 constants::WORKER_TRANSACTION_REQUEST_RETRIES,
11 domain::{get_relayer_transaction, get_transaction_by_id, Transaction},
12 jobs::{handle_result, Job, TransactionRequest},
13 metrics::{observe_processing_time, STAGE_PREPARE_DURATION, STAGE_REQUEST_QUEUE_DWELL},
14 models::DefaultAppState,
15 observability::request_id::set_request_id,
16 queues::{HandlerError, WorkerContext},
17};
18
19#[instrument(
20 level = "debug",
21 skip(job, state, ctx),
22 fields(
23 request_id = ?job.request_id,
24 job_id = %job.message_id,
25 job_type = %job.job_type.to_string(),
26 attempt = %ctx.attempt,
27 tx_id = %job.data.transaction_id,
28 relayer_id = %job.data.relayer_id,
29 task_id = %ctx.task_id,
30 )
31)]
32pub async fn transaction_request_handler(
33 job: Job<TransactionRequest>,
34 state: ThinData<DefaultAppState>,
35 ctx: WorkerContext,
36) -> Result<(), HandlerError> {
37 if let Some(request_id) = job.request_id.clone() {
38 set_request_id(request_id);
39 }
40
41 tracing::debug!(
42 tx_id = %job.data.transaction_id,
43 relayer_id = %job.data.relayer_id,
44 "handling transaction request"
45 );
46
47 let result = handle_request(job.data, &state).await;
48
49 handle_result(
50 result,
51 &ctx,
52 "Transaction Request",
53 WORKER_TRANSACTION_REQUEST_RETRIES,
54 )
55}
56
57async fn handle_request(
58 request: TransactionRequest,
59 state: &ThinData<DefaultAppState>,
60) -> eyre::Result<()> {
61 let relayer_transaction = get_relayer_transaction(request.relayer_id, state).await?;
62
63 let transaction = get_transaction_by_id(request.transaction_id.clone(), state).await?;
64
65 let relayer_id = transaction.relayer_id.clone();
69 let network_type = transaction.network_type.to_string();
70 if let Ok(created_time) = chrono::DateTime::parse_from_rfc3339(&transaction.created_at) {
71 let dwell_secs = (Utc::now() - created_time.with_timezone(&Utc))
72 .num_milliseconds()
73 .max(0) as f64
74 / 1000.0;
75 observe_processing_time(
76 &relayer_id,
77 &network_type,
78 STAGE_REQUEST_QUEUE_DWELL,
79 dwell_secs,
80 );
81 }
82
83 tracing::debug!(
84 tx_id = %transaction.id,
85 relayer_id = %transaction.relayer_id,
86 status = ?transaction.status,
87 "preparing transaction"
88 );
89
90 let prepare_start = std::time::Instant::now();
91 let prepared = relayer_transaction.prepare_transaction(transaction).await?;
92 let prepare_duration = prepare_start.elapsed().as_secs_f64();
93
94 observe_processing_time(
95 &relayer_id,
96 &network_type,
97 STAGE_PREPARE_DURATION,
98 prepare_duration,
99 );
100
101 tracing::debug!(
102 tx_id = %prepared.id,
103 relayer_id = %prepared.relayer_id,
104 status = ?prepared.status,
105 prepare_duration_ms = prepare_start.elapsed().as_millis(),
106 "transaction prepared"
107 );
108
109 Ok(())
110}
111
112#[cfg(test)]
113mod tests {
114 use super::*;
115 use crate::queues::WorkerContext;
116
117 #[tokio::test]
118 async fn test_handler_result_processing() {
119 let request = TransactionRequest::new("tx123", "relayer-1");
120 let job = Job::new(crate::jobs::JobType::TransactionRequest, request);
121 let ctx = WorkerContext::new(0, "test-task".into());
122
123 assert_eq!(job.data.transaction_id, "tx123");
124 assert_eq!(job.data.relayer_id, "relayer-1");
125 assert_eq!(ctx.attempt, 0);
126 }
127}