1use chrono::Utc;
6use tracing::{debug, info, warn};
7
8use super::{
9 is_final_state,
10 utils::{decode_transaction_result_code, is_bad_sequence_error, is_insufficient_fee_error},
11 StellarRelayerTransaction,
12};
13use crate::{
14 constants::STELLAR_INSUFFICIENT_FEE_MAX_RETRIES,
15 jobs::JobProducerTrait,
16 metrics::{STELLAR_SUBMISSION_FAILURES, TRANSACTIONS_INSUFFICIENT_FEE},
17 models::{
18 NetworkTransactionData, RelayerRepoModel, TransactionError, TransactionRepoModel,
19 TransactionStatus, TransactionUpdateRequest,
20 },
21 repositories::{Repository, TransactionCounterTrait, TransactionRepository},
22 services::{
23 provider::StellarProviderTrait,
24 signer::{Signer, StellarSignTrait},
25 },
26};
27
28impl<R, T, J, S, P, C, D> StellarRelayerTransaction<R, T, J, S, P, C, D>
29where
30 R: Repository<RelayerRepoModel, String> + Send + Sync,
31 T: TransactionRepository + Send + Sync,
32 J: JobProducerTrait + Send + Sync,
33 S: Signer + StellarSignTrait + Send + Sync,
34 P: StellarProviderTrait + Send + Sync,
35 C: TransactionCounterTrait + Send + Sync,
36 D: crate::services::stellar_dex::StellarDexServiceTrait + Send + Sync + 'static,
37{
38 pub async fn submit_transaction_impl(
41 &self,
42 tx: TransactionRepoModel,
43 ) -> Result<TransactionRepoModel, TransactionError> {
44 info!(
45 tx_id = %tx.id,
46 relayer_id = %tx.relayer_id,
47 status = ?tx.status,
48 "submitting stellar transaction"
49 );
50
51 if is_final_state(&tx.status) {
53 warn!(
54 tx_id = %tx.id,
55 relayer_id = %tx.relayer_id,
56 status = ?tx.status,
57 "transaction already in final state, skipping submission"
58 );
59 return Ok(tx);
60 }
61
62 if self.is_transaction_expired(&tx)? {
64 info!(
65 tx_id = %tx.id,
66 relayer_id = %tx.relayer_id,
67 valid_until = ?tx.valid_until,
68 "transaction has expired, marking as Expired"
69 );
70 return self
71 .mark_as_expired(tx, "Transaction time_bounds expired".to_string())
72 .await;
73 }
74
75 match self.submit_core(tx.clone()).await {
77 Ok(submitted_tx) => Ok(submitted_tx),
78 Err(error) => {
79 self.handle_submit_failure(tx, error).await
81 }
82 }
83 }
84
85 async fn submit_core(
96 &self,
97 tx: TransactionRepoModel,
98 ) -> Result<TransactionRepoModel, TransactionError> {
99 let stellar_data = tx.network_data.get_stellar_transaction_data()?;
100 let tx_envelope = stellar_data
101 .get_envelope_for_submission()
102 .map_err(TransactionError::from)?;
103
104 let response = self
106 .provider()
107 .send_transaction_with_status(&tx_envelope)
108 .await
109 .map_err(|e| {
110 STELLAR_SUBMISSION_FAILURES
111 .with_label_values(&["provider_error", "n/a"])
112 .inc();
113 TransactionError::from(e)
114 })?;
115
116 match response.status.as_str() {
118 "PENDING" | "DUPLICATE" => {
119 if response.status == "DUPLICATE" {
121 info!(
122 tx_id = %tx.id,
123 relayer_id = %tx.relayer_id,
124 hash = %response.hash,
125 "transaction already submitted (DUPLICATE status)"
126 );
127 }
128 let tx_hash_hex = response.hash.clone();
129 let updated_stellar_data = stellar_data.with_hash(tx_hash_hex.clone());
130
131 let mut hashes = tx.hashes.clone();
132 if !hashes.contains(&tx_hash_hex) {
133 hashes.push(tx_hash_hex);
134 }
135
136 let update_req = TransactionUpdateRequest {
137 status: Some(TransactionStatus::Submitted),
138 sent_at: Some(Utc::now().to_rfc3339()),
139 network_data: Some(NetworkTransactionData::Stellar(updated_stellar_data)),
140 hashes: Some(hashes),
141 ..Default::default()
142 };
143
144 let updated_tx = self
145 .transaction_repository()
146 .partial_update(tx.id.clone(), update_req)
147 .await?;
148
149 if response.status == "PENDING" {
151 info!(
152 tx_id = %tx.id,
153 relayer_id = %tx.relayer_id,
154 "sending transaction update notification for pending transaction"
155 );
156 self.send_transaction_update_notification(&updated_tx).await;
157 }
158
159 Ok(updated_tx)
160 }
161 "TRY_AGAIN_LATER" => {
162 let updated_tx = self
170 .transaction_repository()
171 .record_stellar_try_again_later_retry(tx.id.clone(), Utc::now().to_rfc3339())
172 .await?;
173
174 let retries = updated_tx
175 .metadata
176 .as_ref()
177 .map_or(0, |m| m.try_again_later_retries);
178
179 if retries == 1 {
181 crate::metrics::STELLAR_TRY_AGAIN_LATER
182 .with_label_values(&[&tx.relayer_id, &tx.status.to_string()])
183 .inc();
184 }
185
186 debug!(
187 tx_id = %tx.id,
188 relayer_id = %tx.relayer_id,
189 status = ?tx.status,
190 try_again_later_retries = retries,
191 "TRY_AGAIN_LATER — status checker will retry"
192 );
193 Ok(updated_tx)
194 }
195 "ERROR" => {
196 let error_detail = response
198 .error_result_xdr
199 .unwrap_or_else(|| "No error details provided".to_string());
200 let decoded_result_code = decode_transaction_result_code(&error_detail);
201
202 if decoded_result_code
206 .as_deref()
207 .is_some_and(is_insufficient_fee_error)
208 {
209 let mut meta = tx.metadata.clone().unwrap_or_default();
210 meta.insufficient_fee_retries = meta.insufficient_fee_retries.saturating_add(1);
211
212 if meta.insufficient_fee_retries == 1 {
214 TRANSACTIONS_INSUFFICIENT_FEE
215 .with_label_values(&[tx.relayer_id.as_str(), "stellar"])
216 .inc();
217 }
218
219 if meta.insufficient_fee_retries > STELLAR_INSUFFICIENT_FEE_MAX_RETRIES {
220 STELLAR_SUBMISSION_FAILURES
221 .with_label_values(&["error", "tx_insufficient_fee"])
222 .inc();
223 return Err(TransactionError::UnexpectedError(format!(
224 "Transaction submission error: insufficient fee retry limit exceeded ({STELLAR_INSUFFICIENT_FEE_MAX_RETRIES})"
225 )));
226 }
227
228 debug!(
229 tx_id = %tx.id,
230 relayer_id = %tx.relayer_id,
231 status = ?tx.status,
232 insufficient_fee_retries = meta.insufficient_fee_retries,
233 result_code = decoded_result_code.as_deref().unwrap_or("Unknown"),
234 "ERROR with insufficient fee — status checker will retry"
235 );
236 let updated_tx = self
238 .transaction_repository()
239 .record_stellar_insufficient_fee_retry(
240 tx.id.clone(),
241 Utc::now().to_rfc3339(),
242 )
243 .await?;
244 return Ok(updated_tx);
245 }
246 STELLAR_SUBMISSION_FAILURES
247 .with_label_values(&[
248 "error",
249 decoded_result_code.as_deref().unwrap_or("unknown"),
250 ])
251 .inc();
252 Err(TransactionError::UnexpectedError(format!(
253 "Transaction submission error: {}",
254 decoded_result_code.unwrap_or(error_detail)
255 )))
256 }
257 unknown => {
258 STELLAR_SUBMISSION_FAILURES
260 .with_label_values(&["unknown_status", "n/a"])
261 .inc();
262 warn!(
263 tx_id = %tx.id,
264 relayer_id = %tx.relayer_id,
265 status = %unknown,
266 "received unknown transaction status from RPC"
267 );
268 Err(TransactionError::UnexpectedError(format!(
269 "Unknown transaction status: {unknown}"
270 )))
271 }
272 }
273 }
274
275 async fn handle_submit_failure(
278 &self,
279 tx: TransactionRepoModel,
280 error: TransactionError,
281 ) -> Result<TransactionRepoModel, TransactionError> {
282 let error_reason = format!("Submission failed: {error}");
283 let tx_id = tx.id.clone();
284 let relayer_id = tx.relayer_id.clone();
285 warn!(
286 tx_id = %tx_id,
287 relayer_id = %relayer_id,
288 reason = %error_reason,
289 "transaction submission failed"
290 );
291
292 if error.is_concurrent_update_conflict() {
297 info!(
298 tx_id = %tx_id,
299 relayer_id = %relayer_id,
300 "concurrent transaction update detected during submission, reloading latest state"
301 );
302 return self
303 .transaction_repository()
304 .get_by_id(tx_id)
305 .await
306 .map_err(TransactionError::from);
307 }
308
309 if is_bad_sequence_error(&error_reason) {
310 if let Ok(stellar_data) = tx.network_data.get_stellar_transaction_data() {
312 info!(
313 tx_id = %tx_id,
314 relayer_id = %relayer_id,
315 "syncing sequence from chain after bad sequence error"
316 );
317 match self
318 .sync_sequence_from_chain(&stellar_data.source_account)
319 .await
320 {
321 Ok(()) => {
322 info!(
323 tx_id = %tx_id,
324 relayer_id = %relayer_id,
325 "successfully synced sequence from chain"
326 );
327 }
328 Err(sync_error) => {
329 warn!(
330 tx_id = %tx_id,
331 relayer_id = %relayer_id,
332 error = %sync_error,
333 "failed to sync sequence from chain"
334 );
335 }
336 }
337 }
338
339 info!(
342 tx_id = %tx_id,
343 relayer_id = %relayer_id,
344 "bad sequence error detected, resetting transaction to pending state"
345 );
346 match self.reset_transaction_for_retry(tx.clone()).await {
347 Ok(reset_tx) => {
348 info!(
349 tx_id = %tx_id,
350 relayer_id = %relayer_id,
351 "transaction reset to pending, status check will handle resubmission"
352 );
353 return Ok(reset_tx);
357 }
358 Err(reset_error) => {
359 warn!(
360 tx_id = %tx_id,
361 relayer_id = %relayer_id,
362 error = %reset_error,
363 "failed to reset transaction for retry"
364 );
365 }
367 }
368 }
369
370 let update_request = TransactionUpdateRequest {
373 status: Some(TransactionStatus::Failed),
374 status_reason: Some(error_reason.clone()),
375 ..Default::default()
376 };
377 let failed_tx = match self
378 .finalize_transaction_state(tx_id.clone(), update_request)
379 .await
380 {
381 Ok(updated_tx) => updated_tx,
382 Err(finalize_error) => {
383 warn!(
384 tx_id = %tx_id,
385 relayer_id = %relayer_id,
386 error = %finalize_error,
387 "failed to mark transaction as failed, continuing with lane cleanup"
388 );
389 return Err(error);
392 }
393 };
394
395 if let Err(enqueue_error) = self.enqueue_next_pending_transaction(&tx_id).await {
397 warn!(
398 tx_id = %tx_id,
399 relayer_id = %relayer_id,
400 error = %enqueue_error,
401 "failed to enqueue next pending transaction after submission failure"
402 );
403 }
404
405 info!(
406 tx_id = %tx_id,
407 relayer_id = %relayer_id,
408 error = %error_reason,
409 "transaction submission failure handled, marked as failed"
410 );
411
412 Ok(failed_tx)
416 }
417
418 pub async fn resubmit_transaction_impl(
420 &self,
421 tx: TransactionRepoModel,
422 ) -> Result<TransactionRepoModel, TransactionError> {
423 self.submit_transaction_impl(tx).await
424 }
425}
426
427#[cfg(test)]
428mod tests {
429 use super::*;
430 use soroban_rs::stellar_rpc_client::SendTransactionResponse;
431 use soroban_rs::xdr::WriteXdr;
432
433 use crate::domain::transaction::stellar::test_helpers::*;
434 use crate::models::TransactionMetadata;
435
436 fn create_send_tx_response(status: &str, hash: &str) -> SendTransactionResponse {
438 SendTransactionResponse {
439 status: status.to_string(),
440 hash: hash.to_string(),
441 error_result_xdr: None,
442 latest_ledger: 100,
443 latest_ledger_close_time: 1700000000,
444 }
445 }
446
447 mod submit_transaction_tests {
448 use crate::{
449 models::RepositoryError, repositories::PaginatedResult,
450 services::provider::ProviderError,
451 };
452
453 use super::*;
454
455 #[tokio::test]
456 async fn submit_transaction_happy_path() {
457 let relayer = create_test_relayer();
458 let mut mocks = default_test_mocks();
459
460 let response = create_send_tx_response(
462 "PENDING",
463 "0101010101010101010101010101010101010101010101010101010101010101",
464 );
465 mocks
466 .provider
467 .expect_send_transaction_with_status()
468 .returning(move |_| {
469 let r = response.clone();
470 Box::pin(async move { Ok(r) })
471 });
472
473 mocks
475 .tx_repo
476 .expect_partial_update()
477 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
478 .returning(|id, upd| {
479 let mut tx = create_test_transaction("relayer-1");
480 tx.id = id;
481 tx.status = upd.status.unwrap();
482 Ok::<_, RepositoryError>(tx)
483 });
484
485 mocks
487 .job_producer
488 .expect_produce_send_notification_job()
489 .times(1)
490 .returning(|_, _| Box::pin(async { Ok(()) }));
491
492 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
493
494 let mut tx = create_test_transaction(&relayer.id);
495 tx.status = TransactionStatus::Sent; if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
497 d.signatures.push(dummy_signature());
498 d.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
499 }
501
502 let res = handler.submit_transaction_impl(tx).await.unwrap();
503 assert_eq!(res.status, TransactionStatus::Submitted);
504 }
505
506 #[tokio::test]
507 async fn submit_transaction_provider_error_marks_failed() {
508 let relayer = create_test_relayer();
509 let mut mocks = default_test_mocks();
510
511 mocks
513 .provider
514 .expect_send_transaction_with_status()
515 .returning(|_| {
516 Box::pin(async { Err(ProviderError::Other("Network error".to_string())) })
517 });
518
519 mocks
521 .tx_repo
522 .expect_partial_update()
523 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
524 .returning(|id, upd| {
525 let mut tx = create_test_transaction("relayer-1");
526 tx.id = id;
527 tx.status = upd.status.unwrap();
528 Ok::<_, RepositoryError>(tx)
529 });
530
531 mocks
533 .job_producer
534 .expect_produce_send_notification_job()
535 .times(1)
536 .returning(|_, _| Box::pin(async { Ok(()) }));
537
538 mocks
540 .tx_repo
541 .expect_find_by_status_paginated()
542 .returning(move |_, _, _, _| {
543 Ok(PaginatedResult {
544 items: vec![],
545 total: 0,
546 page: 1,
547 per_page: 1,
548 })
549 }); let handler = make_stellar_tx_handler(relayer.clone(), mocks);
552 let mut tx = create_test_transaction(&relayer.id);
553 tx.status = TransactionStatus::Sent; if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
555 data.signatures.push(dummy_signature());
556 data.sequence_number = Some(42); data.signed_envelope_xdr = Some("test-xdr".to_string()); }
559
560 let res = handler.submit_transaction_impl(tx).await;
561
562 let failed_tx = res.unwrap();
564 assert_eq!(failed_tx.status, TransactionStatus::Failed);
565 }
566
567 #[tokio::test]
568 async fn submit_transaction_repository_error_marks_failed() {
569 let relayer = create_test_relayer();
570 let mut mocks = default_test_mocks();
571
572 let response = create_send_tx_response(
574 "PENDING",
575 "0101010101010101010101010101010101010101010101010101010101010101",
576 );
577 mocks
578 .provider
579 .expect_send_transaction_with_status()
580 .returning(move |_| {
581 let r = response.clone();
582 Box::pin(async move { Ok(r) })
583 });
584
585 mocks
587 .tx_repo
588 .expect_partial_update()
589 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
590 .returning(|_, _| Err(RepositoryError::Unknown("Database error".to_string())));
591
592 mocks
594 .tx_repo
595 .expect_partial_update()
596 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
597 .returning(|id, upd| {
598 let mut tx = create_test_transaction("relayer-1");
599 tx.id = id;
600 tx.status = upd.status.unwrap();
601 Ok::<_, RepositoryError>(tx)
602 });
603
604 mocks
606 .job_producer
607 .expect_produce_send_notification_job()
608 .times(1)
609 .returning(|_, _| Box::pin(async { Ok(()) }));
610
611 mocks
613 .tx_repo
614 .expect_find_by_status_paginated()
615 .returning(move |_, _, _, _| {
616 Ok(PaginatedResult {
617 items: vec![],
618 total: 0,
619 page: 1,
620 per_page: 1,
621 })
622 }); let handler = make_stellar_tx_handler(relayer.clone(), mocks);
625 let mut tx = create_test_transaction(&relayer.id);
626 tx.status = TransactionStatus::Sent; if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
628 data.signatures.push(dummy_signature());
629 data.sequence_number = Some(42); data.signed_envelope_xdr = Some("test-xdr".to_string()); }
632
633 let res = handler.submit_transaction_impl(tx).await;
634
635 let failed_tx = res.unwrap();
638 assert_eq!(failed_tx.status, TransactionStatus::Failed);
639 }
640
641 #[tokio::test]
642 async fn submit_transaction_uses_signed_envelope_xdr() {
643 let relayer = create_test_relayer();
644 let mut mocks = default_test_mocks();
645
646 let mut tx = create_test_transaction(&relayer.id);
648 tx.status = TransactionStatus::Sent; if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
650 data.signatures.push(dummy_signature());
651 let envelope = data.get_envelope_for_submission().unwrap();
653 let xdr = envelope
654 .to_xdr_base64(soroban_rs::xdr::Limits::none())
655 .unwrap();
656 data.signed_envelope_xdr = Some(xdr);
657 }
658
659 let response = create_send_tx_response(
661 "PENDING",
662 "0202020202020202020202020202020202020202020202020202020202020202",
663 );
664 mocks
665 .provider
666 .expect_send_transaction_with_status()
667 .returning(move |_| {
668 let r = response.clone();
669 Box::pin(async move { Ok(r) })
670 });
671
672 mocks
674 .tx_repo
675 .expect_partial_update()
676 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
677 .returning(|id, upd| {
678 let mut tx = create_test_transaction("relayer-1");
679 tx.id = id;
680 tx.status = upd.status.unwrap();
681 Ok::<_, RepositoryError>(tx)
682 });
683
684 mocks
686 .job_producer
687 .expect_produce_send_notification_job()
688 .times(1)
689 .returning(|_, _| Box::pin(async { Ok(()) }));
690
691 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
692 let res = handler.submit_transaction_impl(tx).await.unwrap();
693
694 assert_eq!(res.status, TransactionStatus::Submitted);
695 }
696
697 #[tokio::test]
698 async fn resubmit_transaction_delegates_to_submit() {
699 let relayer = create_test_relayer();
700 let mut mocks = default_test_mocks();
701
702 let response = create_send_tx_response(
704 "PENDING",
705 "0101010101010101010101010101010101010101010101010101010101010101",
706 );
707 mocks
708 .provider
709 .expect_send_transaction_with_status()
710 .returning(move |_| {
711 let r = response.clone();
712 Box::pin(async move { Ok(r) })
713 });
714
715 mocks
717 .tx_repo
718 .expect_partial_update()
719 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
720 .returning(|id, upd| {
721 let mut tx = create_test_transaction("relayer-1");
722 tx.id = id;
723 tx.status = upd.status.unwrap();
724 Ok::<_, RepositoryError>(tx)
725 });
726
727 mocks
729 .job_producer
730 .expect_produce_send_notification_job()
731 .times(1)
732 .returning(|_, _| Box::pin(async { Ok(()) }));
733
734 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
735
736 let mut tx = create_test_transaction(&relayer.id);
737 tx.status = TransactionStatus::Sent; if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
739 d.signatures.push(dummy_signature());
740 d.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
741 }
743
744 let res = handler.resubmit_transaction_impl(tx).await.unwrap();
745 assert_eq!(res.status, TransactionStatus::Submitted);
746 }
747
748 #[tokio::test]
749 async fn submit_transaction_failure_enqueues_next_transaction() {
750 let relayer = create_test_relayer();
751 let mut mocks = default_test_mocks();
752
753 mocks
755 .provider
756 .expect_send_transaction_with_status()
757 .returning(|_| {
758 Box::pin(async { Err(ProviderError::Other("Network error".to_string())) })
759 });
760
761 mocks
765 .tx_repo
766 .expect_partial_update()
767 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
768 .returning(|id, upd| {
769 let mut tx = create_test_transaction("relayer-1");
770 tx.id = id;
771 tx.status = upd.status.unwrap();
772 Ok::<_, RepositoryError>(tx)
773 });
774
775 mocks
777 .job_producer
778 .expect_produce_send_notification_job()
779 .times(1)
780 .returning(|_, _| Box::pin(async { Ok(()) }));
781
782 let mut pending_tx = create_test_transaction(&relayer.id);
784 pending_tx.id = "next-pending-tx".to_string();
785 pending_tx.status = TransactionStatus::Pending;
786 let captured_pending_tx = pending_tx.clone();
787 let relayer_id_clone = relayer.id.clone();
788 mocks
789 .tx_repo
790 .expect_find_by_status_paginated()
791 .withf(move |relayer_id, statuses, query, oldest_first| {
792 *relayer_id == relayer_id_clone
793 && statuses == [TransactionStatus::Pending]
794 && query.page == 1
795 && query.per_page == 1
796 && *oldest_first
797 })
798 .times(1)
799 .returning(move |_, _, _, _| {
800 Ok(PaginatedResult {
801 items: vec![captured_pending_tx.clone()],
802 total: 1,
803 page: 1,
804 per_page: 1,
805 })
806 });
807
808 mocks
810 .job_producer
811 .expect_produce_transaction_request_job()
812 .withf(move |job, _delay| job.transaction_id == "next-pending-tx")
813 .times(1)
814 .returning(|_, _| Box::pin(async { Ok(()) }));
815
816 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
817 let mut tx = create_test_transaction(&relayer.id);
818 tx.status = TransactionStatus::Sent; if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
820 data.signatures.push(dummy_signature());
821 data.sequence_number = Some(42); data.signed_envelope_xdr = Some("test-xdr".to_string()); }
824
825 let res = handler.submit_transaction_impl(tx).await;
826
827 let failed_tx = res.unwrap();
829 assert_eq!(failed_tx.status, TransactionStatus::Failed);
830 }
831
832 #[tokio::test]
833 async fn test_submit_bad_sequence_resets_and_retries() {
834 let relayer = create_test_relayer();
835 let mut mocks = default_test_mocks();
836
837 mocks
839 .provider
840 .expect_send_transaction_with_status()
841 .returning(|_| {
842 Box::pin(async {
843 Err(ProviderError::Other(
844 "transaction submission failed: TxBadSeq".to_string(),
845 ))
846 })
847 });
848
849 mocks.provider.expect_get_account().times(1).returning(|_| {
851 Box::pin(async {
852 use soroban_rs::xdr::{
853 AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber,
854 String32, Thresholds, Uint256,
855 };
856 use stellar_strkey::ed25519;
857
858 let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
859 let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));
860
861 Ok(AccountEntry {
862 account_id,
863 balance: 1000000,
864 seq_num: SequenceNumber(100),
865 num_sub_entries: 0,
866 inflation_dest: None,
867 flags: 0,
868 home_domain: String32::default(),
869 thresholds: Thresholds([1, 1, 1, 1]),
870 signers: Default::default(),
871 ext: AccountEntryExt::V0,
872 })
873 })
874 });
875
876 mocks
878 .counter
879 .expect_set()
880 .times(1)
881 .returning(|_, _, _| Box::pin(async { Ok(()) }));
882
883 mocks
885 .tx_repo
886 .expect_partial_update()
887 .withf(|_, upd| upd.status == Some(TransactionStatus::Pending))
888 .times(1)
889 .returning(|id, upd| {
890 let mut tx = create_test_transaction("relayer-1");
891 tx.id = id;
892 tx.status = upd.status.unwrap();
893 if let Some(network_data) = upd.network_data {
894 tx.network_data = network_data;
895 }
896 Ok::<_, RepositoryError>(tx)
897 });
898
899 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
903 let mut tx = create_test_transaction(&relayer.id);
904 tx.status = TransactionStatus::Sent; if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
906 data.signatures.push(dummy_signature());
907 data.sequence_number = Some(42);
908 data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
909 }
911
912 let result = handler.submit_transaction_impl(tx).await;
913
914 assert!(result.is_ok());
916 let reset_tx = result.unwrap();
917 assert_eq!(reset_tx.status, TransactionStatus::Pending);
918
919 if let NetworkTransactionData::Stellar(data) = &reset_tx.network_data {
921 assert!(data.sequence_number.is_none());
922 assert!(data.signatures.is_empty());
923 assert!(data.hash.is_none());
924 assert!(data.signed_envelope_xdr.is_none());
925 } else {
926 panic!("Expected Stellar transaction data");
927 }
928 }
929
930 #[tokio::test]
931 async fn submit_transaction_duplicate_status_succeeds() {
932 let relayer = create_test_relayer();
933 let mut mocks = default_test_mocks();
934
935 let response = create_send_tx_response(
937 "DUPLICATE",
938 "0101010101010101010101010101010101010101010101010101010101010101",
939 );
940 mocks
941 .provider
942 .expect_send_transaction_with_status()
943 .returning(move |_| {
944 let r = response.clone();
945 Box::pin(async move { Ok(r) })
946 });
947
948 mocks
950 .tx_repo
951 .expect_partial_update()
952 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
953 .returning(|id, upd| {
954 let mut tx = create_test_transaction("relayer-1");
955 tx.id = id;
956 tx.status = upd.status.unwrap();
957 Ok::<_, RepositoryError>(tx)
958 });
959
960 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
961
962 let mut tx = create_test_transaction(&relayer.id);
963 tx.status = TransactionStatus::Sent;
964 if let NetworkTransactionData::Stellar(ref mut d) = tx.network_data {
965 d.signatures.push(dummy_signature());
966 d.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
967 }
968
969 let res = handler.submit_transaction_impl(tx).await.unwrap();
970 assert_eq!(res.status, TransactionStatus::Submitted);
971 }
972
973 #[tokio::test]
974 async fn submit_transaction_try_again_later_keeps_tx_alive() {
975 let relayer = create_test_relayer();
976 let mut mocks = default_test_mocks();
977
978 let response = create_send_tx_response(
980 "TRY_AGAIN_LATER",
981 "0101010101010101010101010101010101010101010101010101010101010101",
982 );
983 mocks
984 .provider
985 .expect_send_transaction_with_status()
986 .returning(move |_| {
987 let r = response.clone();
988 Box::pin(async move { Ok(r) })
989 });
990
991 mocks
992 .tx_repo
993 .expect_record_stellar_try_again_later_retry()
994 .withf(|id, sent_at| id == "tx-1" && !sent_at.is_empty())
995 .returning(|id, _| {
996 let mut tx = create_test_transaction("relayer-1");
997 tx.id = id;
998 tx.status = TransactionStatus::Sent;
999 tx.metadata = Some(TransactionMetadata {
1000 consecutive_failures: 0,
1001 total_failures: 0,
1002 insufficient_fee_retries: 0,
1003 try_again_later_retries: 1,
1004 });
1005 Ok::<_, RepositoryError>(tx)
1006 });
1007
1008 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1009 let mut tx = create_test_transaction(&relayer.id);
1010 tx.status = TransactionStatus::Sent;
1011 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
1012 data.signatures.push(dummy_signature());
1013 data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
1014 }
1015
1016 let res = handler.submit_transaction_impl(tx).await;
1017
1018 let returned_tx = res.unwrap();
1020 assert_eq!(returned_tx.status, TransactionStatus::Sent);
1021 }
1022
1023 #[tokio::test]
1024 async fn submit_try_again_later_then_status_checker_reenqueues_submit() {
1025 let relayer = create_test_relayer();
1026
1027 let mut submit_mocks = default_test_mocks();
1029 let response = create_send_tx_response(
1030 "TRY_AGAIN_LATER",
1031 "0101010101010101010101010101010101010101010101010101010101010101",
1032 );
1033 submit_mocks
1034 .provider
1035 .expect_send_transaction_with_status()
1036 .times(1)
1037 .returning(move |_| {
1038 let r = response.clone();
1039 Box::pin(async move { Ok(r) })
1040 });
1041 submit_mocks
1042 .tx_repo
1043 .expect_record_stellar_try_again_later_retry()
1044 .withf(|id, sent_at| id == "tx-1" && !sent_at.is_empty())
1045 .times(1)
1046 .returning(|id, sent_at| {
1047 let mut tx = create_test_transaction("relayer-1");
1048 tx.id = id;
1049 tx.status = TransactionStatus::Sent;
1050 tx.sent_at = Some(sent_at);
1051 tx.metadata = Some(TransactionMetadata {
1052 consecutive_failures: 0,
1053 total_failures: 0,
1054 insufficient_fee_retries: 0,
1055 try_again_later_retries: 1,
1056 });
1057 Ok::<_, RepositoryError>(tx)
1058 });
1059
1060 let submit_handler = make_stellar_tx_handler(relayer.clone(), submit_mocks);
1061 let mut sent_tx = create_test_transaction(&relayer.id);
1062 sent_tx.status = TransactionStatus::Sent;
1063 if let NetworkTransactionData::Stellar(ref mut data) = sent_tx.network_data {
1064 data.signatures.push(dummy_signature());
1065 data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
1066 }
1067
1068 let mut returned_tx = submit_handler
1069 .submit_transaction_impl(sent_tx)
1070 .await
1071 .unwrap();
1072 assert_eq!(returned_tx.status, TransactionStatus::Sent);
1073 assert!(returned_tx.sent_at.is_some());
1074
1075 use crate::constants::STELLAR_RESUBMIT_BASE_INTERVAL_SECONDS;
1080 let buffer = 2;
1081 let created_at = (Utc::now()
1082 - chrono::Duration::seconds(STELLAR_RESUBMIT_BASE_INTERVAL_SECONDS + buffer))
1083 .to_rfc3339();
1084 let sent_at = (Utc::now()
1085 - chrono::Duration::seconds(STELLAR_RESUBMIT_BASE_INTERVAL_SECONDS + 1))
1086 .to_rfc3339();
1087 returned_tx.created_at = created_at;
1088 returned_tx.sent_at = Some(sent_at);
1089
1090 let mut status_mocks = default_test_mocks();
1091 status_mocks
1092 .job_producer
1093 .expect_produce_submit_transaction_job()
1094 .times(1)
1095 .returning(|_, _| Box::pin(async { Ok(()) }));
1096
1097 let status_handler = make_stellar_tx_handler(relayer.clone(), status_mocks);
1098 let status_result = status_handler
1099 .handle_transaction_status_impl(returned_tx, None)
1100 .await
1101 .unwrap();
1102 assert_eq!(status_result.status, TransactionStatus::Sent);
1103 }
1104
1105 #[tokio::test]
1106 async fn resubmit_try_again_later_returns_ok_for_submitted_tx() {
1107 let relayer = create_test_relayer();
1108 let mut mocks = default_test_mocks();
1109
1110 let response = create_send_tx_response(
1112 "TRY_AGAIN_LATER",
1113 "0101010101010101010101010101010101010101010101010101010101010101",
1114 );
1115 mocks
1116 .provider
1117 .expect_send_transaction_with_status()
1118 .returning(move |_| {
1119 let r = response.clone();
1120 Box::pin(async move { Ok(r) })
1121 });
1122
1123 mocks
1124 .tx_repo
1125 .expect_record_stellar_try_again_later_retry()
1126 .withf(|id, sent_at| id == "tx-1" && !sent_at.is_empty())
1127 .returning(|id, _| {
1128 let mut tx = create_test_transaction("relayer-1");
1129 tx.id = id;
1130 tx.status = TransactionStatus::Submitted;
1131 tx.metadata = Some(TransactionMetadata {
1132 consecutive_failures: 0,
1133 total_failures: 0,
1134 insufficient_fee_retries: 0,
1135 try_again_later_retries: 1,
1136 });
1137 Ok::<_, RepositoryError>(tx)
1138 });
1139
1140 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1141 let mut tx = create_test_transaction(&relayer.id);
1142 tx.status = TransactionStatus::Submitted; if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
1144 data.signatures.push(dummy_signature());
1145 data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
1146 }
1147
1148 let res = handler.submit_transaction_impl(tx).await;
1149
1150 let returned_tx = res.unwrap();
1152 assert_eq!(returned_tx.status, TransactionStatus::Submitted);
1153 }
1154
1155 #[tokio::test]
1156 async fn submit_transaction_error_status_fails() {
1157 let relayer = create_test_relayer();
1158 let mut mocks = default_test_mocks();
1159
1160 let mut response = create_send_tx_response(
1162 "ERROR",
1163 "0101010101010101010101010101010101010101010101010101010101010101",
1164 );
1165 response.error_result_xdr = Some("not-base64".to_string());
1166 mocks
1167 .provider
1168 .expect_send_transaction_with_status()
1169 .returning(move |_| {
1170 let r = response.clone();
1171 Box::pin(async move { Ok(r) })
1172 });
1173
1174 mocks
1176 .tx_repo
1177 .expect_partial_update()
1178 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
1179 .returning(|id, upd| {
1180 let mut tx = create_test_transaction("relayer-1");
1181 tx.id = id;
1182 tx.status = upd.status.unwrap();
1183 Ok::<_, RepositoryError>(tx)
1184 });
1185
1186 mocks
1188 .job_producer
1189 .expect_produce_send_notification_job()
1190 .times(1)
1191 .returning(|_, _| Box::pin(async { Ok(()) }));
1192
1193 mocks
1195 .tx_repo
1196 .expect_find_by_status_paginated()
1197 .returning(move |_, _, _, _| {
1198 Ok(PaginatedResult {
1199 items: vec![],
1200 total: 0,
1201 page: 1,
1202 per_page: 1,
1203 })
1204 });
1205
1206 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1207 let mut tx = create_test_transaction(&relayer.id);
1208 tx.status = TransactionStatus::Sent;
1209 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
1210 data.signatures.push(dummy_signature());
1211 data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
1212 }
1213
1214 let res = handler.submit_transaction_impl(tx).await;
1215
1216 let failed_tx = res.unwrap();
1218 assert_eq!(failed_tx.status, TransactionStatus::Failed);
1219 }
1220
1221 #[tokio::test]
1222 async fn submit_transaction_insufficient_fee_keeps_tx_alive() {
1223 let relayer = create_test_relayer();
1224 let mut mocks = default_test_mocks();
1225
1226 let mut response = create_send_tx_response(
1228 "ERROR",
1229 "0101010101010101010101010101010101010101010101010101010101010101",
1230 );
1231 response.error_result_xdr = Some("AAAAAAAAY/n////3AAAAAA==".to_string());
1232 mocks
1233 .provider
1234 .expect_send_transaction_with_status()
1235 .returning(move |_| {
1236 let r = response.clone();
1237 Box::pin(async move { Ok(r) })
1238 });
1239
1240 mocks
1242 .tx_repo
1243 .expect_record_stellar_insufficient_fee_retry()
1244 .withf(|id, sent_at| id == "tx-1" && !sent_at.is_empty())
1245 .returning(|id, _| {
1246 let mut tx = create_test_transaction("relayer-1");
1247 tx.id = id;
1248 tx.status = TransactionStatus::Sent;
1249 tx.metadata = Some(TransactionMetadata {
1250 consecutive_failures: 0,
1251 total_failures: 0,
1252 insufficient_fee_retries: 1,
1253 try_again_later_retries: 0,
1254 });
1255 Ok::<_, RepositoryError>(tx)
1256 })
1257 .times(1);
1258
1259 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1260 let mut tx = create_test_transaction(&relayer.id);
1261 tx.status = TransactionStatus::Sent;
1262 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
1263 data.signatures.push(dummy_signature());
1264 data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
1265 }
1266
1267 let res = handler.submit_transaction_impl(tx).await;
1268
1269 let returned_tx = res.unwrap();
1271 assert_eq!(returned_tx.status, TransactionStatus::Sent);
1272 assert_eq!(
1273 returned_tx
1274 .metadata
1275 .as_ref()
1276 .map(|metadata| metadata.insufficient_fee_retries),
1277 Some(1)
1278 );
1279 }
1280
1281 #[tokio::test]
1282 async fn submit_transaction_insufficient_fee_exceeding_retry_limit_fails() {
1283 let relayer = create_test_relayer();
1284 let mut mocks = default_test_mocks();
1285
1286 let mut response = create_send_tx_response(
1287 "ERROR",
1288 "0101010101010101010101010101010101010101010101010101010101010101",
1289 );
1290 response.error_result_xdr = Some("AAAAAAAAY/n////3AAAAAA==".to_string());
1291 mocks
1292 .provider
1293 .expect_send_transaction_with_status()
1294 .returning(move |_| {
1295 let r = response.clone();
1296 Box::pin(async move { Ok(r) })
1297 });
1298
1299 mocks
1300 .tx_repo
1301 .expect_partial_update()
1302 .withf(|_, upd| {
1303 upd.status == Some(TransactionStatus::Failed)
1304 && upd.status_reason.as_ref().is_some_and(|reason| {
1305 reason.contains("insufficient fee retry limit exceeded (2)")
1306 })
1307 })
1308 .returning(|id, upd| {
1309 let mut tx = create_test_transaction("relayer-1");
1310 tx.id = id;
1311 tx.status = upd.status.unwrap();
1312 tx.status_reason = upd.status_reason;
1313 Ok::<_, RepositoryError>(tx)
1314 });
1315
1316 mocks
1317 .job_producer
1318 .expect_produce_send_notification_job()
1319 .times(1)
1320 .returning(|_, _| Box::pin(async { Ok(()) }));
1321
1322 mocks
1323 .tx_repo
1324 .expect_find_by_status_paginated()
1325 .returning(move |_, _, _, _| {
1326 Ok(PaginatedResult {
1327 items: vec![],
1328 total: 0,
1329 page: 1,
1330 per_page: 1,
1331 })
1332 });
1333
1334 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1335 let mut tx = create_test_transaction(&relayer.id);
1336 tx.status = TransactionStatus::Sent;
1337 tx.metadata = Some(TransactionMetadata {
1338 insufficient_fee_retries: STELLAR_INSUFFICIENT_FEE_MAX_RETRIES,
1339 ..Default::default()
1340 });
1341 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
1342 data.signatures.push(dummy_signature());
1343 data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
1344 }
1345
1346 let res = handler.submit_transaction_impl(tx).await;
1347
1348 let failed_tx = res.unwrap();
1349 assert_eq!(failed_tx.status, TransactionStatus::Failed);
1350 assert!(
1351 failed_tx.status_reason.as_ref().is_some_and(
1352 |reason| reason.contains("insufficient fee retry limit exceeded (2)")
1353 )
1354 );
1355 }
1356
1357 #[tokio::test]
1358 async fn submit_transaction_error_non_fee_still_fails() {
1359 let relayer = create_test_relayer();
1360 let mut mocks = default_test_mocks();
1361
1362 let mut response = create_send_tx_response(
1364 "ERROR",
1365 "0101010101010101010101010101010101010101010101010101010101010101",
1366 );
1367 response.error_result_xdr = Some("AAAAAAAAA/v////6AAAAAA==".to_string());
1368 mocks
1369 .provider
1370 .expect_send_transaction_with_status()
1371 .returning(move |_| {
1372 let r = response.clone();
1373 Box::pin(async move { Ok(r) })
1374 });
1375
1376 mocks
1378 .tx_repo
1379 .expect_partial_update()
1380 .withf(|_, upd| upd.status == Some(TransactionStatus::Failed))
1381 .returning(|id, upd| {
1382 let mut tx = create_test_transaction("relayer-1");
1383 tx.id = id;
1384 tx.status = upd.status.unwrap();
1385 Ok::<_, RepositoryError>(tx)
1386 });
1387
1388 mocks
1390 .job_producer
1391 .expect_produce_send_notification_job()
1392 .times(1)
1393 .returning(|_, _| Box::pin(async { Ok(()) }));
1394
1395 mocks
1397 .tx_repo
1398 .expect_find_by_status_paginated()
1399 .returning(move |_, _, _, _| {
1400 Ok(PaginatedResult {
1401 items: vec![],
1402 total: 0,
1403 page: 1,
1404 per_page: 1,
1405 })
1406 });
1407
1408 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1409 let mut tx = create_test_transaction(&relayer.id);
1410 tx.status = TransactionStatus::Sent;
1411 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
1412 data.signatures.push(dummy_signature());
1413 data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
1414 }
1415
1416 let res = handler.submit_transaction_impl(tx).await;
1417
1418 let failed_tx = res.unwrap();
1420 assert_eq!(failed_tx.status, TransactionStatus::Failed);
1421 }
1422
1423 #[tokio::test]
1424 async fn submit_transaction_concurrent_update_conflict_reloads_latest_state() {
1425 let relayer = create_test_relayer();
1428 let mut mocks = default_test_mocks();
1429
1430 let response = create_send_tx_response(
1432 "PENDING",
1433 "0101010101010101010101010101010101010101010101010101010101010101",
1434 );
1435 mocks
1436 .provider
1437 .expect_send_transaction_with_status()
1438 .returning(move |_| {
1439 let r = response.clone();
1440 Box::pin(async move { Ok(r) })
1441 });
1442
1443 mocks
1445 .tx_repo
1446 .expect_partial_update()
1447 .withf(|_, upd| upd.status == Some(TransactionStatus::Submitted))
1448 .times(1)
1449 .returning(|_, _| {
1450 Err(RepositoryError::ConcurrentUpdateConflict(
1451 "CAS mismatch".to_string(),
1452 ))
1453 });
1454
1455 let reloaded_tx = {
1457 let mut t = create_test_transaction(&relayer.id);
1458 t.status = TransactionStatus::Submitted;
1459 t
1460 };
1461 let reloaded_clone = reloaded_tx.clone();
1462 mocks
1463 .tx_repo
1464 .expect_get_by_id()
1465 .times(1)
1466 .returning(move |_| Ok(reloaded_clone.clone()));
1467
1468 mocks
1470 .job_producer
1471 .expect_produce_send_notification_job()
1472 .never();
1473 mocks
1474 .job_producer
1475 .expect_produce_transaction_request_job()
1476 .never();
1477
1478 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1479 let mut tx = create_test_transaction(&relayer.id);
1480 tx.status = TransactionStatus::Sent;
1481 if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
1482 data.signatures.push(dummy_signature());
1483 data.signed_envelope_xdr = Some(create_signed_xdr(TEST_PK, TEST_PK_2));
1484 }
1485
1486 let res = handler.submit_transaction_impl(tx).await;
1487
1488 assert!(res.is_ok(), "CAS conflict should return Ok after reload");
1489 let returned_tx = res.unwrap();
1490 assert_eq!(returned_tx.status, TransactionStatus::Submitted);
1492 }
1493 }
1494}