openzeppelin_relayer/queues/
mod.rs1use async_trait::async_trait;
24use std::sync::Arc;
25
26use crate::{
27 config::ServerConfig,
28 jobs::{
29 Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
30 TransactionSend, TransactionStatusCheck,
31 },
32 models::DefaultAppState,
33 utils::RedisConnections,
34};
35use actix_web::web::ThinData;
36
37pub mod errors;
38pub mod queue_type;
39pub mod redis;
40pub mod retry_config;
41pub mod sqs;
42pub mod swap_filter;
43pub mod worker_types;
44
45pub use errors::QueueBackendError;
46pub use queue_type::QueueType;
47pub use redis::queue::Queue;
48pub use retry_config::{backoff_config_for_queue, retry_delay_secs, status_check_retry_delay_secs};
49pub use swap_filter::filter_relayers_for_swap;
50pub use worker_types::{HandlerError, QueueHealth, WorkerContext, WorkerHandle};
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
54pub enum QueueBackendType {
55 Redis,
56 Sqs,
57}
58
59impl QueueBackendType {
60 pub const fn as_str(self) -> &'static str {
61 match self {
62 Self::Redis => "redis",
63 Self::Sqs => "sqs",
64 }
65 }
66}
67
68impl std::fmt::Display for QueueBackendType {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 f.write_str(self.as_str())
71 }
72}
73
74#[async_trait]
82pub trait QueueBackend: Send + Sync {
83 async fn produce_transaction_request(
92 &self,
93 job: Job<TransactionRequest>,
94 scheduled_on: Option<i64>,
95 ) -> Result<String, QueueBackendError>;
96
97 async fn produce_transaction_submission(
99 &self,
100 job: Job<TransactionSend>,
101 scheduled_on: Option<i64>,
102 ) -> Result<String, QueueBackendError>;
103
104 async fn produce_transaction_status_check(
106 &self,
107 job: Job<TransactionStatusCheck>,
108 scheduled_on: Option<i64>,
109 ) -> Result<String, QueueBackendError>;
110
111 async fn produce_notification(
113 &self,
114 job: Job<NotificationSend>,
115 scheduled_on: Option<i64>,
116 ) -> Result<String, QueueBackendError>;
117
118 async fn produce_token_swap_request(
120 &self,
121 job: Job<TokenSwapRequest>,
122 scheduled_on: Option<i64>,
123 ) -> Result<String, QueueBackendError>;
124
125 async fn produce_relayer_health_check(
127 &self,
128 job: Job<RelayerHealthCheck>,
129 scheduled_on: Option<i64>,
130 ) -> Result<String, QueueBackendError>;
131
132 async fn initialize_workers(
143 &self,
144 app_state: Arc<ThinData<DefaultAppState>>,
145 ) -> Result<Vec<WorkerHandle>, QueueBackendError>;
146
147 async fn health_check(&self) -> Result<Vec<QueueHealth>, QueueBackendError>;
152
153 fn backend_type(&self) -> QueueBackendType;
155
156 fn shutdown(&self) {}
162}
163
164#[derive(Clone)]
170pub enum QueueBackendStorage {
171 Redis(Box<redis::backend::RedisBackend>),
172 Sqs(sqs::backend::SqsBackend),
173}
174
175impl std::fmt::Debug for QueueBackendStorage {
176 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177 match self {
178 Self::Redis(b) => std::fmt::Debug::fmt(b, f),
179 Self::Sqs(b) => std::fmt::Debug::fmt(b, f),
180 }
181 }
182}
183
184impl QueueBackendStorage {
185 pub fn queue(&self) -> Option<&Queue> {
189 match self {
190 Self::Redis(b) => Some(b.queue()),
191 Self::Sqs(_) => None,
192 }
193 }
194
195 pub fn redis_connections(&self) -> Option<Arc<RedisConnections>> {
199 self.queue().map(|q| q.redis_connections())
200 }
201}
202
203#[async_trait]
204impl QueueBackend for QueueBackendStorage {
205 async fn produce_transaction_request(
206 &self,
207 job: Job<TransactionRequest>,
208 scheduled_on: Option<i64>,
209 ) -> Result<String, QueueBackendError> {
210 match self {
211 Self::Redis(b) => b.produce_transaction_request(job, scheduled_on).await,
212 Self::Sqs(b) => b.produce_transaction_request(job, scheduled_on).await,
213 }
214 }
215
216 async fn produce_transaction_submission(
217 &self,
218 job: Job<TransactionSend>,
219 scheduled_on: Option<i64>,
220 ) -> Result<String, QueueBackendError> {
221 match self {
222 Self::Redis(b) => b.produce_transaction_submission(job, scheduled_on).await,
223 Self::Sqs(b) => b.produce_transaction_submission(job, scheduled_on).await,
224 }
225 }
226
227 async fn produce_transaction_status_check(
228 &self,
229 job: Job<TransactionStatusCheck>,
230 scheduled_on: Option<i64>,
231 ) -> Result<String, QueueBackendError> {
232 match self {
233 Self::Redis(b) => b.produce_transaction_status_check(job, scheduled_on).await,
234 Self::Sqs(b) => b.produce_transaction_status_check(job, scheduled_on).await,
235 }
236 }
237
238 async fn produce_notification(
239 &self,
240 job: Job<NotificationSend>,
241 scheduled_on: Option<i64>,
242 ) -> Result<String, QueueBackendError> {
243 match self {
244 Self::Redis(b) => b.produce_notification(job, scheduled_on).await,
245 Self::Sqs(b) => b.produce_notification(job, scheduled_on).await,
246 }
247 }
248
249 async fn produce_token_swap_request(
250 &self,
251 job: Job<TokenSwapRequest>,
252 scheduled_on: Option<i64>,
253 ) -> Result<String, QueueBackendError> {
254 match self {
255 Self::Redis(b) => b.produce_token_swap_request(job, scheduled_on).await,
256 Self::Sqs(b) => b.produce_token_swap_request(job, scheduled_on).await,
257 }
258 }
259
260 async fn produce_relayer_health_check(
261 &self,
262 job: Job<RelayerHealthCheck>,
263 scheduled_on: Option<i64>,
264 ) -> Result<String, QueueBackendError> {
265 match self {
266 Self::Redis(b) => b.produce_relayer_health_check(job, scheduled_on).await,
267 Self::Sqs(b) => b.produce_relayer_health_check(job, scheduled_on).await,
268 }
269 }
270
271 async fn initialize_workers(
272 &self,
273 app_state: Arc<ThinData<DefaultAppState>>,
274 ) -> Result<Vec<WorkerHandle>, QueueBackendError> {
275 match self {
276 Self::Redis(b) => b.initialize_workers(app_state).await,
277 Self::Sqs(b) => b.initialize_workers(app_state).await,
278 }
279 }
280
281 async fn health_check(&self) -> Result<Vec<QueueHealth>, QueueBackendError> {
282 match self {
283 Self::Redis(b) => b.health_check().await,
284 Self::Sqs(b) => b.health_check().await,
285 }
286 }
287
288 fn backend_type(&self) -> QueueBackendType {
289 match self {
290 Self::Redis(b) => b.backend_type(),
291 Self::Sqs(b) => b.backend_type(),
292 }
293 }
294
295 fn shutdown(&self) {
296 match self {
297 Self::Redis(b) => b.shutdown(),
298 Self::Sqs(b) => b.shutdown(),
299 }
300 }
301}
302
303pub async fn create_queue_backend(
319 redis_connections: Arc<RedisConnections>,
320) -> Result<Arc<QueueBackendStorage>, QueueBackendError> {
321 let backend_type = ServerConfig::get_queue_backend();
322
323 let storage = match backend_type.to_lowercase().as_str() {
324 "redis" => {
325 let backend = redis::backend::RedisBackend::new(redis_connections).await?;
326 QueueBackendStorage::Redis(Box::new(backend))
327 }
328 "sqs" => {
329 let backend = sqs::backend::SqsBackend::new().await?;
330 QueueBackendStorage::Sqs(backend)
331 }
332 other => {
333 return Err(QueueBackendError::ConfigError(format!(
334 "Unsupported QUEUE_BACKEND value: {other}. Must be 'redis' or 'sqs'"
335 )));
336 }
337 };
338
339 Ok(Arc::new(storage))
340}
341
342#[cfg(test)]
343mod tests {
344 use super::*;
345
346 #[test]
347 fn test_queue_type_enum_values() {
348 let types = vec![
350 QueueType::TransactionRequest,
351 QueueType::TransactionSubmission,
352 QueueType::StatusCheck,
353 QueueType::StatusCheckEvm,
354 QueueType::StatusCheckStellar,
355 QueueType::Notification,
356 QueueType::TokenSwapRequest,
357 QueueType::RelayerHealthCheck,
358 ];
359
360 for queue_type in types {
361 assert!(!queue_type.queue_name().is_empty());
362 assert!(!queue_type.redis_namespace().is_empty());
363 }
364 }
365
366 #[test]
367 fn test_queue_type_visibility_timeouts_in_range() {
368 let all_types = [
370 QueueType::TransactionRequest,
371 QueueType::TransactionSubmission,
372 QueueType::StatusCheck,
373 QueueType::StatusCheckEvm,
374 QueueType::StatusCheckStellar,
375 QueueType::Notification,
376 QueueType::TokenSwapRequest,
377 QueueType::RelayerHealthCheck,
378 ];
379 for qt in all_types {
380 let vt = qt.visibility_timeout_secs();
381 assert!(vt > 0, "{qt}: visibility timeout must be > 0");
382 assert!(
383 vt <= 43200,
384 "{qt}: visibility timeout {vt}s exceeds SQS max (43200s)"
385 );
386 }
387 }
388
389 #[test]
390 fn test_queue_type_polling_intervals_appropriate() {
391 assert_eq!(QueueType::StatusCheck.default_wait_time_secs(), 5);
393
394 assert!(QueueType::TransactionRequest.default_wait_time_secs() >= 5);
396 assert!(QueueType::TransactionSubmission.default_wait_time_secs() >= 5);
397 assert!(QueueType::Notification.default_wait_time_secs() >= 10);
398 }
399
400 #[test]
401 fn test_queue_backend_error_variants() {
402 let errors = vec![
403 QueueBackendError::RedisError("test".to_string()),
404 QueueBackendError::SqsError("test".to_string()),
405 QueueBackendError::SerializationError("test".to_string()),
406 QueueBackendError::ConfigError("test".to_string()),
407 QueueBackendError::QueueNotFound("test".to_string()),
408 QueueBackendError::WorkerInitError("test".to_string()),
409 QueueBackendError::QueueError("test".to_string()),
410 ];
411
412 for error in errors {
413 let error_str = error.to_string();
414 assert!(!error_str.is_empty());
415 }
416 }
417
418 #[test]
419 fn test_queue_backend_type_string_representations() {
420 assert_eq!(QueueBackendType::Redis.as_str(), "redis");
421 assert_eq!(QueueBackendType::Sqs.as_str(), "sqs");
422 assert_eq!(QueueBackendType::Redis.to_string(), "redis");
423 assert_eq!(QueueBackendType::Sqs.to_string(), "sqs");
424 }
425}