openzeppelin_relayer/jobs/handlers/
transaction_request_handler.rs

1//! Transaction request handler for processing incoming transaction jobs.
2//!
3//! Handles the validation and preparation of transactions before they are
4//! submitted to the network
5use actix_web::web::ThinData;
6use chrono::Utc;
7use tracing::instrument;
8
9use crate::{
10    constants::WORKER_TRANSACTION_REQUEST_RETRIES,
11    domain::{get_relayer_transaction, get_transaction_by_id, Transaction},
12    jobs::{handle_result, Job, TransactionRequest},
13    metrics::{observe_processing_time, STAGE_PREPARE_DURATION, STAGE_REQUEST_QUEUE_DWELL},
14    models::DefaultAppState,
15    observability::request_id::set_request_id,
16    queues::{HandlerError, WorkerContext},
17};
18
19#[instrument(
20    level = "debug",
21    skip(job, state, ctx),
22    fields(
23        request_id = ?job.request_id,
24        job_id = %job.message_id,
25        job_type = %job.job_type.to_string(),
26        attempt = %ctx.attempt,
27        tx_id = %job.data.transaction_id,
28        relayer_id = %job.data.relayer_id,
29        task_id = %ctx.task_id,
30    )
31)]
32pub async fn transaction_request_handler(
33    job: Job<TransactionRequest>,
34    state: ThinData<DefaultAppState>,
35    ctx: WorkerContext,
36) -> Result<(), HandlerError> {
37    if let Some(request_id) = job.request_id.clone() {
38        set_request_id(request_id);
39    }
40
41    tracing::debug!(
42        tx_id = %job.data.transaction_id,
43        relayer_id = %job.data.relayer_id,
44        "handling transaction request"
45    );
46
47    let result = handle_request(job.data, &state).await;
48
49    handle_result(
50        result,
51        &ctx,
52        "Transaction Request",
53        WORKER_TRANSACTION_REQUEST_RETRIES,
54    )
55}
56
57async fn handle_request(
58    request: TransactionRequest,
59    state: &ThinData<DefaultAppState>,
60) -> eyre::Result<()> {
61    let relayer_transaction = get_relayer_transaction(request.relayer_id, state).await?;
62
63    let transaction = get_transaction_by_id(request.transaction_id.clone(), state).await?;
64
65    // Measure time from transaction creation to request handler start.
66    // On first attempt this approximates queue dwell time. On retries it
67    // includes cumulative retry backoff since created_at is unchanged.
68    let relayer_id = transaction.relayer_id.clone();
69    let network_type = transaction.network_type.to_string();
70    if let Ok(created_time) = chrono::DateTime::parse_from_rfc3339(&transaction.created_at) {
71        let dwell_secs = (Utc::now() - created_time.with_timezone(&Utc))
72            .num_milliseconds()
73            .max(0) as f64
74            / 1000.0;
75        observe_processing_time(
76            &relayer_id,
77            &network_type,
78            STAGE_REQUEST_QUEUE_DWELL,
79            dwell_secs,
80        );
81    }
82
83    tracing::debug!(
84        tx_id = %transaction.id,
85        relayer_id = %transaction.relayer_id,
86        status = ?transaction.status,
87        "preparing transaction"
88    );
89
90    let prepare_start = std::time::Instant::now();
91    let prepared = relayer_transaction.prepare_transaction(transaction).await?;
92    let prepare_duration = prepare_start.elapsed().as_secs_f64();
93
94    observe_processing_time(
95        &relayer_id,
96        &network_type,
97        STAGE_PREPARE_DURATION,
98        prepare_duration,
99    );
100
101    tracing::debug!(
102        tx_id = %prepared.id,
103        relayer_id = %prepared.relayer_id,
104        status = ?prepared.status,
105        prepare_duration_ms = prepare_start.elapsed().as_millis(),
106        "transaction prepared"
107    );
108
109    Ok(())
110}
111
112#[cfg(test)]
113mod tests {
114    use super::*;
115    use crate::queues::WorkerContext;
116
117    #[tokio::test]
118    async fn test_handler_result_processing() {
119        let request = TransactionRequest::new("tx123", "relayer-1");
120        let job = Job::new(crate::jobs::JobType::TransactionRequest, request);
121        let ctx = WorkerContext::new(0, "test-task".into());
122
123        assert_eq!(job.data.transaction_id, "tx123");
124        assert_eq!(job.data.relayer_id, "relayer-1");
125        assert_eq!(ctx.attempt, 0);
126    }
127}