openzeppelin_relayer/queues/
queue_type.rs

1use std::fmt;
2
3use serde::{Deserialize, Serialize};
4
5use crate::constants::{
6    DEFAULT_CONCURRENCY_STATUS_CHECKER, DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM,
7    WORKER_NOTIFICATION_SENDER_RETRIES, WORKER_RELAYER_HEALTH_CHECK_RETRIES,
8    WORKER_TOKEN_SWAP_REQUEST_RETRIES, WORKER_TRANSACTION_REQUEST_RETRIES,
9    WORKER_TRANSACTION_STATUS_CHECKER_RETRIES, WORKER_TRANSACTION_SUBMIT_RETRIES,
10};
11
12/// Queue types for relayer operations.
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
14pub enum QueueType {
15    TransactionRequest,
16    TransactionSubmission,
17    StatusCheck,
18    StatusCheckEvm,
19    StatusCheckStellar,
20    Notification,
21    TokenSwapRequest,
22    RelayerHealthCheck,
23}
24
25impl QueueType {
26    /// Returns the queue name for logging and identification purposes.
27    pub fn queue_name(&self) -> &'static str {
28        match self {
29            Self::TransactionRequest => "transaction-request",
30            Self::TransactionSubmission => "transaction-submission",
31            Self::StatusCheck => "status-check",
32            Self::StatusCheckEvm => "status-check-evm",
33            Self::StatusCheckStellar => "status-check-stellar",
34            Self::Notification => "notification",
35            Self::TokenSwapRequest => "token-swap-request",
36            Self::RelayerHealthCheck => "relayer-health-check",
37        }
38    }
39
40    /// Returns the Redis namespace for this queue type (Apalis format).
41    pub fn redis_namespace(&self) -> &'static str {
42        match self {
43            Self::TransactionRequest => "relayer:transaction_request",
44            Self::TransactionSubmission => "relayer:transaction_submission",
45            Self::StatusCheck => "relayer:transaction_status",
46            Self::StatusCheckEvm => "relayer:transaction_status_evm",
47            Self::StatusCheckStellar => "relayer:transaction_status_stellar",
48            Self::Notification => "relayer:notification",
49            Self::TokenSwapRequest => "relayer:token_swap_request",
50            Self::RelayerHealthCheck => "relayer:relayer_health_check",
51        }
52    }
53
54    /// Returns the maximum number of retries for this queue type.
55    pub fn max_retries(&self) -> usize {
56        match self {
57            Self::TransactionRequest => WORKER_TRANSACTION_REQUEST_RETRIES,
58            Self::TransactionSubmission => WORKER_TRANSACTION_SUBMIT_RETRIES,
59            Self::StatusCheck | Self::StatusCheckEvm | Self::StatusCheckStellar => {
60                WORKER_TRANSACTION_STATUS_CHECKER_RETRIES
61            }
62            Self::Notification => WORKER_NOTIFICATION_SENDER_RETRIES,
63            Self::TokenSwapRequest => WORKER_TOKEN_SWAP_REQUEST_RETRIES,
64            Self::RelayerHealthCheck => WORKER_RELAYER_HEALTH_CHECK_RETRIES,
65        }
66    }
67
68    /// Returns the visibility timeout in seconds for SQS (how long a worker has to finish
69    /// processing before the message becomes visible again).
70    pub fn visibility_timeout_secs(&self) -> u32 {
71        match self {
72            Self::TransactionRequest => 30,
73            Self::TransactionSubmission => 30,
74            Self::StatusCheck | Self::StatusCheckEvm => 30,
75            Self::StatusCheckStellar => 20,
76            Self::Notification => 60,
77            Self::TokenSwapRequest => 60,
78            Self::RelayerHealthCheck => 60,
79        }
80    }
81
82    /// Returns the worker name used for the concurrency environment variable.
83    pub fn concurrency_env_key(&self) -> &'static str {
84        match self {
85            Self::TransactionRequest => "transaction_request",
86            Self::TransactionSubmission => "transaction_sender",
87            Self::StatusCheck => "transaction_status_checker",
88            Self::StatusCheckEvm => "transaction_status_checker_evm",
89            Self::StatusCheckStellar => "transaction_status_checker_stellar",
90            Self::Notification => "notification_sender",
91            Self::TokenSwapRequest => "token_swap_request",
92            Self::RelayerHealthCheck => "relayer_health_check",
93        }
94    }
95
96    /// Returns the default concurrency for this queue type.
97    pub fn default_concurrency(&self) -> usize {
98        match self {
99            Self::TransactionRequest => 50,
100            Self::TransactionSubmission => 75,
101            Self::StatusCheck => DEFAULT_CONCURRENCY_STATUS_CHECKER,
102            Self::StatusCheckEvm => DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM,
103            Self::StatusCheckStellar => DEFAULT_CONCURRENCY_STATUS_CHECKER,
104            Self::Notification => 30,
105            Self::TokenSwapRequest => 10,
106            Self::RelayerHealthCheck => 10,
107        }
108    }
109
110    /// Returns the default SQS long-poll wait time in seconds.
111    /// Note: SQS `WaitTimeSeconds` maximum is 20 seconds.
112    /// These defaults can be overridden via environment variables
113    /// (see `sqs_env_key()`).
114    pub fn default_wait_time_secs(&self) -> u64 {
115        match self {
116            Self::TransactionRequest => 15,
117            Self::TransactionSubmission => 15,
118            Self::StatusCheck | Self::StatusCheckEvm => 5,
119            Self::StatusCheckStellar => 3,
120            Self::Notification => 20,
121            Self::TokenSwapRequest => 20,
122            Self::RelayerHealthCheck => 20,
123        }
124    }
125
126    /// Returns the SQS environment variable key infix for this queue type.
127    ///
128    /// Used to build env vars like `SQS_{key}_WAIT_TIME_SECONDS` and
129    /// `SQS_{key}_POLLER_COUNT`.
130    pub fn sqs_env_key(&self) -> &'static str {
131        match self {
132            Self::TransactionRequest => "TRANSACTION_REQUEST",
133            Self::TransactionSubmission => "TRANSACTION_SUBMISSION",
134            Self::StatusCheck => "STATUS_CHECK",
135            Self::StatusCheckEvm => "STATUS_CHECK_EVM",
136            Self::StatusCheckStellar => "STATUS_CHECK_STELLAR",
137            Self::Notification => "NOTIFICATION",
138            Self::TokenSwapRequest => "TOKEN_SWAP_REQUEST",
139            Self::RelayerHealthCheck => "RELAYER_HEALTH_CHECK",
140        }
141    }
142
143    /// Returns the default number of SQS poll loops per worker for this queue type.
144    /// Can be overridden via `SQS_{key}_POLLER_COUNT` env vars.
145    pub fn default_poller_count(&self) -> usize {
146        1
147    }
148
149    /// Returns true if this is any variant of status check queue.
150    pub fn is_status_check(&self) -> bool {
151        matches!(
152            self,
153            Self::StatusCheck | Self::StatusCheckEvm | Self::StatusCheckStellar
154        )
155    }
156}
157
158impl fmt::Display for QueueType {
159    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160        write!(f, "{}", self.queue_name())
161    }
162}
163
164#[cfg(test)]
165mod tests {
166    use super::*;
167
168    #[test]
169    fn test_polling_interval_defaults() {
170        assert_eq!(QueueType::TransactionRequest.default_wait_time_secs(), 15);
171        assert_eq!(
172            QueueType::TransactionSubmission.default_wait_time_secs(),
173            15
174        );
175        assert_eq!(QueueType::StatusCheck.default_wait_time_secs(), 5);
176        assert_eq!(QueueType::StatusCheckStellar.default_wait_time_secs(), 3);
177    }
178
179    #[test]
180    fn test_is_status_check() {
181        assert!(QueueType::StatusCheck.is_status_check());
182        assert!(QueueType::StatusCheckEvm.is_status_check());
183        assert!(QueueType::StatusCheckStellar.is_status_check());
184        assert!(!QueueType::TransactionRequest.is_status_check());
185        assert!(!QueueType::Notification.is_status_check());
186    }
187
188    #[test]
189    fn test_status_check_evm_concurrency() {
190        assert_eq!(
191            QueueType::StatusCheckEvm.default_concurrency(),
192            DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM
193        );
194    }
195
196    #[test]
197    fn test_all_variants_have_nonempty_queue_name() {
198        let all = [
199            QueueType::TransactionRequest,
200            QueueType::TransactionSubmission,
201            QueueType::StatusCheck,
202            QueueType::StatusCheckEvm,
203            QueueType::StatusCheckStellar,
204            QueueType::Notification,
205            QueueType::TokenSwapRequest,
206            QueueType::RelayerHealthCheck,
207        ];
208        for qt in &all {
209            assert!(!qt.queue_name().is_empty(), "{qt:?} has empty queue_name");
210            assert!(
211                !qt.redis_namespace().is_empty(),
212                "{qt:?} has empty redis_namespace"
213            );
214            assert!(
215                !qt.concurrency_env_key().is_empty(),
216                "{qt:?} has empty concurrency_env_key"
217            );
218        }
219    }
220
221    #[test]
222    fn test_display_matches_queue_name() {
223        let all = [
224            QueueType::TransactionRequest,
225            QueueType::TransactionSubmission,
226            QueueType::StatusCheck,
227            QueueType::StatusCheckEvm,
228            QueueType::StatusCheckStellar,
229            QueueType::Notification,
230            QueueType::TokenSwapRequest,
231            QueueType::RelayerHealthCheck,
232        ];
233        for qt in &all {
234            assert_eq!(qt.to_string(), qt.queue_name());
235        }
236    }
237
238    #[test]
239    fn test_max_retries_status_checkers_use_infinite() {
240        // Status checkers retry until tx reaches final state
241        assert_eq!(QueueType::StatusCheck.max_retries(), usize::MAX);
242        assert_eq!(QueueType::StatusCheckEvm.max_retries(), usize::MAX);
243        assert_eq!(QueueType::StatusCheckStellar.max_retries(), usize::MAX);
244    }
245
246    #[test]
247    fn test_max_retries_bounded_queues() {
248        // Non-status queues should have finite retries
249        assert!(QueueType::TransactionRequest.max_retries() < usize::MAX);
250        assert!(QueueType::TransactionSubmission.max_retries() < usize::MAX);
251        assert!(QueueType::Notification.max_retries() < usize::MAX);
252        assert!(QueueType::TokenSwapRequest.max_retries() < usize::MAX);
253        assert!(QueueType::RelayerHealthCheck.max_retries() < usize::MAX);
254    }
255
256    #[test]
257    fn test_polling_intervals_within_sqs_limit() {
258        let all = [
259            QueueType::TransactionRequest,
260            QueueType::TransactionSubmission,
261            QueueType::StatusCheck,
262            QueueType::StatusCheckEvm,
263            QueueType::StatusCheckStellar,
264            QueueType::Notification,
265            QueueType::TokenSwapRequest,
266            QueueType::RelayerHealthCheck,
267        ];
268        for qt in &all {
269            assert!(
270                qt.default_wait_time_secs() <= 20,
271                "{qt:?} polling interval {} exceeds SQS max of 20s",
272                qt.default_wait_time_secs()
273            );
274        }
275    }
276
277    #[test]
278    fn test_visibility_timeout_within_sqs_range() {
279        // SQS allows 0..=43200 (12 hours)
280        let all = [
281            QueueType::TransactionRequest,
282            QueueType::TransactionSubmission,
283            QueueType::StatusCheck,
284            QueueType::StatusCheckEvm,
285            QueueType::StatusCheckStellar,
286            QueueType::Notification,
287            QueueType::TokenSwapRequest,
288            QueueType::RelayerHealthCheck,
289        ];
290        for qt in &all {
291            let vt = qt.visibility_timeout_secs();
292            assert!(
293                vt <= 43200,
294                "{qt:?} visibility timeout {vt} exceeds SQS max"
295            );
296            assert!(vt > 0, "{qt:?} visibility timeout should be positive");
297        }
298    }
299
300    #[test]
301    fn test_default_concurrency_positive() {
302        let all = [
303            QueueType::TransactionRequest,
304            QueueType::TransactionSubmission,
305            QueueType::StatusCheck,
306            QueueType::StatusCheckEvm,
307            QueueType::StatusCheckStellar,
308            QueueType::Notification,
309            QueueType::TokenSwapRequest,
310            QueueType::RelayerHealthCheck,
311        ];
312        for qt in &all {
313            assert!(qt.default_concurrency() > 0, "{qt:?} has zero concurrency");
314        }
315    }
316
317    #[test]
318    fn test_sqs_env_key_nonempty_and_uppercase() {
319        let all = [
320            QueueType::TransactionRequest,
321            QueueType::TransactionSubmission,
322            QueueType::StatusCheck,
323            QueueType::StatusCheckEvm,
324            QueueType::StatusCheckStellar,
325            QueueType::Notification,
326            QueueType::TokenSwapRequest,
327            QueueType::RelayerHealthCheck,
328        ];
329        for qt in &all {
330            let key = qt.sqs_env_key();
331            assert!(!key.is_empty(), "{qt:?} has empty sqs_env_key");
332            assert_eq!(
333                key,
334                key.to_uppercase(),
335                "{qt:?} sqs_env_key should be uppercase"
336            );
337        }
338    }
339
340    #[test]
341    fn test_sqs_env_key_unique_per_variant() {
342        let all = [
343            QueueType::TransactionRequest,
344            QueueType::TransactionSubmission,
345            QueueType::StatusCheck,
346            QueueType::StatusCheckEvm,
347            QueueType::StatusCheckStellar,
348            QueueType::Notification,
349            QueueType::TokenSwapRequest,
350            QueueType::RelayerHealthCheck,
351        ];
352        let keys: Vec<&str> = all.iter().map(|qt| qt.sqs_env_key()).collect();
353        let unique: std::collections::HashSet<&str> = keys.iter().copied().collect();
354        assert_eq!(
355            keys.len(),
356            unique.len(),
357            "sqs_env_key must be unique per QueueType variant"
358        );
359    }
360
361    #[test]
362    fn test_default_poller_count_at_least_one() {
363        let all = [
364            QueueType::TransactionRequest,
365            QueueType::TransactionSubmission,
366            QueueType::StatusCheck,
367            QueueType::StatusCheckEvm,
368            QueueType::StatusCheckStellar,
369            QueueType::Notification,
370            QueueType::TokenSwapRequest,
371            QueueType::RelayerHealthCheck,
372        ];
373        for qt in &all {
374            assert!(
375                qt.default_poller_count() >= 1,
376                "{qt:?} default_poller_count should be >= 1"
377            );
378        }
379    }
380
381    #[test]
382    fn test_default_wait_time_all_variants_covered() {
383        // Ensure every variant returns a value within SQS bounds [0, 20]
384        let all = [
385            QueueType::TransactionRequest,
386            QueueType::TransactionSubmission,
387            QueueType::StatusCheck,
388            QueueType::StatusCheckEvm,
389            QueueType::StatusCheckStellar,
390            QueueType::Notification,
391            QueueType::TokenSwapRequest,
392            QueueType::RelayerHealthCheck,
393        ];
394        for qt in &all {
395            let wt = qt.default_wait_time_secs();
396            assert!(
397                wt <= 20,
398                "{qt:?} default_wait_time_secs {wt} exceeds SQS max 20"
399            );
400        }
401    }
402}