1use std::fmt;
2
3use serde::{Deserialize, Serialize};
4
5use crate::constants::{
6 DEFAULT_CONCURRENCY_STATUS_CHECKER, DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM,
7 WORKER_NOTIFICATION_SENDER_RETRIES, WORKER_RELAYER_HEALTH_CHECK_RETRIES,
8 WORKER_TOKEN_SWAP_REQUEST_RETRIES, WORKER_TRANSACTION_REQUEST_RETRIES,
9 WORKER_TRANSACTION_STATUS_CHECKER_RETRIES, WORKER_TRANSACTION_SUBMIT_RETRIES,
10};
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
14pub enum QueueType {
15 TransactionRequest,
16 TransactionSubmission,
17 StatusCheck,
18 StatusCheckEvm,
19 StatusCheckStellar,
20 Notification,
21 TokenSwapRequest,
22 RelayerHealthCheck,
23}
24
25impl QueueType {
26 pub fn queue_name(&self) -> &'static str {
28 match self {
29 Self::TransactionRequest => "transaction-request",
30 Self::TransactionSubmission => "transaction-submission",
31 Self::StatusCheck => "status-check",
32 Self::StatusCheckEvm => "status-check-evm",
33 Self::StatusCheckStellar => "status-check-stellar",
34 Self::Notification => "notification",
35 Self::TokenSwapRequest => "token-swap-request",
36 Self::RelayerHealthCheck => "relayer-health-check",
37 }
38 }
39
40 pub fn redis_namespace(&self) -> &'static str {
42 match self {
43 Self::TransactionRequest => "relayer:transaction_request",
44 Self::TransactionSubmission => "relayer:transaction_submission",
45 Self::StatusCheck => "relayer:transaction_status",
46 Self::StatusCheckEvm => "relayer:transaction_status_evm",
47 Self::StatusCheckStellar => "relayer:transaction_status_stellar",
48 Self::Notification => "relayer:notification",
49 Self::TokenSwapRequest => "relayer:token_swap_request",
50 Self::RelayerHealthCheck => "relayer:relayer_health_check",
51 }
52 }
53
54 pub fn max_retries(&self) -> usize {
56 match self {
57 Self::TransactionRequest => WORKER_TRANSACTION_REQUEST_RETRIES,
58 Self::TransactionSubmission => WORKER_TRANSACTION_SUBMIT_RETRIES,
59 Self::StatusCheck | Self::StatusCheckEvm | Self::StatusCheckStellar => {
60 WORKER_TRANSACTION_STATUS_CHECKER_RETRIES
61 }
62 Self::Notification => WORKER_NOTIFICATION_SENDER_RETRIES,
63 Self::TokenSwapRequest => WORKER_TOKEN_SWAP_REQUEST_RETRIES,
64 Self::RelayerHealthCheck => WORKER_RELAYER_HEALTH_CHECK_RETRIES,
65 }
66 }
67
68 pub fn visibility_timeout_secs(&self) -> u32 {
71 match self {
72 Self::TransactionRequest => 30,
73 Self::TransactionSubmission => 30,
74 Self::StatusCheck | Self::StatusCheckEvm => 30,
75 Self::StatusCheckStellar => 20,
76 Self::Notification => 60,
77 Self::TokenSwapRequest => 60,
78 Self::RelayerHealthCheck => 60,
79 }
80 }
81
82 pub fn concurrency_env_key(&self) -> &'static str {
84 match self {
85 Self::TransactionRequest => "transaction_request",
86 Self::TransactionSubmission => "transaction_sender",
87 Self::StatusCheck => "transaction_status_checker",
88 Self::StatusCheckEvm => "transaction_status_checker_evm",
89 Self::StatusCheckStellar => "transaction_status_checker_stellar",
90 Self::Notification => "notification_sender",
91 Self::TokenSwapRequest => "token_swap_request",
92 Self::RelayerHealthCheck => "relayer_health_check",
93 }
94 }
95
96 pub fn default_concurrency(&self) -> usize {
98 match self {
99 Self::TransactionRequest => 50,
100 Self::TransactionSubmission => 75,
101 Self::StatusCheck => DEFAULT_CONCURRENCY_STATUS_CHECKER,
102 Self::StatusCheckEvm => DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM,
103 Self::StatusCheckStellar => DEFAULT_CONCURRENCY_STATUS_CHECKER,
104 Self::Notification => 30,
105 Self::TokenSwapRequest => 10,
106 Self::RelayerHealthCheck => 10,
107 }
108 }
109
110 pub fn default_wait_time_secs(&self) -> u64 {
115 match self {
116 Self::TransactionRequest => 15,
117 Self::TransactionSubmission => 15,
118 Self::StatusCheck | Self::StatusCheckEvm => 5,
119 Self::StatusCheckStellar => 3,
120 Self::Notification => 20,
121 Self::TokenSwapRequest => 20,
122 Self::RelayerHealthCheck => 20,
123 }
124 }
125
126 pub fn sqs_env_key(&self) -> &'static str {
131 match self {
132 Self::TransactionRequest => "TRANSACTION_REQUEST",
133 Self::TransactionSubmission => "TRANSACTION_SUBMISSION",
134 Self::StatusCheck => "STATUS_CHECK",
135 Self::StatusCheckEvm => "STATUS_CHECK_EVM",
136 Self::StatusCheckStellar => "STATUS_CHECK_STELLAR",
137 Self::Notification => "NOTIFICATION",
138 Self::TokenSwapRequest => "TOKEN_SWAP_REQUEST",
139 Self::RelayerHealthCheck => "RELAYER_HEALTH_CHECK",
140 }
141 }
142
143 pub fn default_poller_count(&self) -> usize {
146 1
147 }
148
149 pub fn is_status_check(&self) -> bool {
151 matches!(
152 self,
153 Self::StatusCheck | Self::StatusCheckEvm | Self::StatusCheckStellar
154 )
155 }
156}
157
158impl fmt::Display for QueueType {
159 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160 write!(f, "{}", self.queue_name())
161 }
162}
163
164#[cfg(test)]
165mod tests {
166 use super::*;
167
168 #[test]
169 fn test_polling_interval_defaults() {
170 assert_eq!(QueueType::TransactionRequest.default_wait_time_secs(), 15);
171 assert_eq!(
172 QueueType::TransactionSubmission.default_wait_time_secs(),
173 15
174 );
175 assert_eq!(QueueType::StatusCheck.default_wait_time_secs(), 5);
176 assert_eq!(QueueType::StatusCheckStellar.default_wait_time_secs(), 3);
177 }
178
179 #[test]
180 fn test_is_status_check() {
181 assert!(QueueType::StatusCheck.is_status_check());
182 assert!(QueueType::StatusCheckEvm.is_status_check());
183 assert!(QueueType::StatusCheckStellar.is_status_check());
184 assert!(!QueueType::TransactionRequest.is_status_check());
185 assert!(!QueueType::Notification.is_status_check());
186 }
187
188 #[test]
189 fn test_status_check_evm_concurrency() {
190 assert_eq!(
191 QueueType::StatusCheckEvm.default_concurrency(),
192 DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM
193 );
194 }
195
196 #[test]
197 fn test_all_variants_have_nonempty_queue_name() {
198 let all = [
199 QueueType::TransactionRequest,
200 QueueType::TransactionSubmission,
201 QueueType::StatusCheck,
202 QueueType::StatusCheckEvm,
203 QueueType::StatusCheckStellar,
204 QueueType::Notification,
205 QueueType::TokenSwapRequest,
206 QueueType::RelayerHealthCheck,
207 ];
208 for qt in &all {
209 assert!(!qt.queue_name().is_empty(), "{qt:?} has empty queue_name");
210 assert!(
211 !qt.redis_namespace().is_empty(),
212 "{qt:?} has empty redis_namespace"
213 );
214 assert!(
215 !qt.concurrency_env_key().is_empty(),
216 "{qt:?} has empty concurrency_env_key"
217 );
218 }
219 }
220
221 #[test]
222 fn test_display_matches_queue_name() {
223 let all = [
224 QueueType::TransactionRequest,
225 QueueType::TransactionSubmission,
226 QueueType::StatusCheck,
227 QueueType::StatusCheckEvm,
228 QueueType::StatusCheckStellar,
229 QueueType::Notification,
230 QueueType::TokenSwapRequest,
231 QueueType::RelayerHealthCheck,
232 ];
233 for qt in &all {
234 assert_eq!(qt.to_string(), qt.queue_name());
235 }
236 }
237
238 #[test]
239 fn test_max_retries_status_checkers_use_infinite() {
240 assert_eq!(QueueType::StatusCheck.max_retries(), usize::MAX);
242 assert_eq!(QueueType::StatusCheckEvm.max_retries(), usize::MAX);
243 assert_eq!(QueueType::StatusCheckStellar.max_retries(), usize::MAX);
244 }
245
246 #[test]
247 fn test_max_retries_bounded_queues() {
248 assert!(QueueType::TransactionRequest.max_retries() < usize::MAX);
250 assert!(QueueType::TransactionSubmission.max_retries() < usize::MAX);
251 assert!(QueueType::Notification.max_retries() < usize::MAX);
252 assert!(QueueType::TokenSwapRequest.max_retries() < usize::MAX);
253 assert!(QueueType::RelayerHealthCheck.max_retries() < usize::MAX);
254 }
255
256 #[test]
257 fn test_polling_intervals_within_sqs_limit() {
258 let all = [
259 QueueType::TransactionRequest,
260 QueueType::TransactionSubmission,
261 QueueType::StatusCheck,
262 QueueType::StatusCheckEvm,
263 QueueType::StatusCheckStellar,
264 QueueType::Notification,
265 QueueType::TokenSwapRequest,
266 QueueType::RelayerHealthCheck,
267 ];
268 for qt in &all {
269 assert!(
270 qt.default_wait_time_secs() <= 20,
271 "{qt:?} polling interval {} exceeds SQS max of 20s",
272 qt.default_wait_time_secs()
273 );
274 }
275 }
276
277 #[test]
278 fn test_visibility_timeout_within_sqs_range() {
279 let all = [
281 QueueType::TransactionRequest,
282 QueueType::TransactionSubmission,
283 QueueType::StatusCheck,
284 QueueType::StatusCheckEvm,
285 QueueType::StatusCheckStellar,
286 QueueType::Notification,
287 QueueType::TokenSwapRequest,
288 QueueType::RelayerHealthCheck,
289 ];
290 for qt in &all {
291 let vt = qt.visibility_timeout_secs();
292 assert!(
293 vt <= 43200,
294 "{qt:?} visibility timeout {vt} exceeds SQS max"
295 );
296 assert!(vt > 0, "{qt:?} visibility timeout should be positive");
297 }
298 }
299
300 #[test]
301 fn test_default_concurrency_positive() {
302 let all = [
303 QueueType::TransactionRequest,
304 QueueType::TransactionSubmission,
305 QueueType::StatusCheck,
306 QueueType::StatusCheckEvm,
307 QueueType::StatusCheckStellar,
308 QueueType::Notification,
309 QueueType::TokenSwapRequest,
310 QueueType::RelayerHealthCheck,
311 ];
312 for qt in &all {
313 assert!(qt.default_concurrency() > 0, "{qt:?} has zero concurrency");
314 }
315 }
316
317 #[test]
318 fn test_sqs_env_key_nonempty_and_uppercase() {
319 let all = [
320 QueueType::TransactionRequest,
321 QueueType::TransactionSubmission,
322 QueueType::StatusCheck,
323 QueueType::StatusCheckEvm,
324 QueueType::StatusCheckStellar,
325 QueueType::Notification,
326 QueueType::TokenSwapRequest,
327 QueueType::RelayerHealthCheck,
328 ];
329 for qt in &all {
330 let key = qt.sqs_env_key();
331 assert!(!key.is_empty(), "{qt:?} has empty sqs_env_key");
332 assert_eq!(
333 key,
334 key.to_uppercase(),
335 "{qt:?} sqs_env_key should be uppercase"
336 );
337 }
338 }
339
340 #[test]
341 fn test_sqs_env_key_unique_per_variant() {
342 let all = [
343 QueueType::TransactionRequest,
344 QueueType::TransactionSubmission,
345 QueueType::StatusCheck,
346 QueueType::StatusCheckEvm,
347 QueueType::StatusCheckStellar,
348 QueueType::Notification,
349 QueueType::TokenSwapRequest,
350 QueueType::RelayerHealthCheck,
351 ];
352 let keys: Vec<&str> = all.iter().map(|qt| qt.sqs_env_key()).collect();
353 let unique: std::collections::HashSet<&str> = keys.iter().copied().collect();
354 assert_eq!(
355 keys.len(),
356 unique.len(),
357 "sqs_env_key must be unique per QueueType variant"
358 );
359 }
360
361 #[test]
362 fn test_default_poller_count_at_least_one() {
363 let all = [
364 QueueType::TransactionRequest,
365 QueueType::TransactionSubmission,
366 QueueType::StatusCheck,
367 QueueType::StatusCheckEvm,
368 QueueType::StatusCheckStellar,
369 QueueType::Notification,
370 QueueType::TokenSwapRequest,
371 QueueType::RelayerHealthCheck,
372 ];
373 for qt in &all {
374 assert!(
375 qt.default_poller_count() >= 1,
376 "{qt:?} default_poller_count should be >= 1"
377 );
378 }
379 }
380
381 #[test]
382 fn test_default_wait_time_all_variants_covered() {
383 let all = [
385 QueueType::TransactionRequest,
386 QueueType::TransactionSubmission,
387 QueueType::StatusCheck,
388 QueueType::StatusCheckEvm,
389 QueueType::StatusCheckStellar,
390 QueueType::Notification,
391 QueueType::TokenSwapRequest,
392 QueueType::RelayerHealthCheck,
393 ];
394 for qt in &all {
395 let wt = qt.default_wait_time_secs();
396 assert!(
397 wt <= 20,
398 "{qt:?} default_wait_time_secs {wt} exceeds SQS max 20"
399 );
400 }
401 }
402}