1use std::{fmt, sync::Arc};
4
5use crate::observability::request_id::get_request_id;
6use crate::{
7 jobs::JobProducerTrait,
8 models::{
9 AppState, NetworkRepoModel, NotificationRepoModel, PluginCallRequest, PluginMetadata,
10 PluginModel, RelayerRepoModel, SignerRepoModel, ThinDataAppState, TransactionRepoModel,
11 },
12 repositories::{
13 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
14 Repository, TransactionCounterTrait, TransactionRepository,
15 },
16};
17use actix_web::web;
18use async_trait::async_trait;
19use serde::{Deserialize, Serialize};
20use thiserror::Error;
21use uuid::Uuid;
22
23pub mod config;
24pub use config::*;
25
26pub mod health;
27pub use health::*;
28
29pub mod protocol;
30pub use protocol::*;
31
32pub mod connection;
33pub use connection::*;
34
35pub mod runner;
36pub use runner::*;
37
38pub mod relayer_api;
39pub use relayer_api::*;
40
41pub mod script_executor;
42pub use script_executor::*;
43
44pub mod pool_executor;
45pub use pool_executor::*;
46
47pub mod shared_socket;
48pub use shared_socket::*;
49
50#[cfg(test)]
51use mockall::automock;
52
53#[derive(Error, Debug, Serialize)]
54pub enum PluginError {
55 #[error("Socket error: {0}")]
56 SocketError(String),
57 #[error("Plugin error: {0}")]
58 PluginError(String),
59 #[error("Relayer error: {0}")]
60 RelayerError(String),
61 #[error("Plugin execution error: {0}")]
62 PluginExecutionError(String),
63 #[error("Script execution timed out after {0} seconds")]
64 ScriptTimeout(u64),
65 #[error("Invalid method: {0}")]
66 InvalidMethod(String),
67 #[error("Invalid payload: {0}")]
68 InvalidPayload(String),
69 #[error("{0}")]
70 HandlerError(Box<PluginHandlerPayload>),
71}
72
73impl PluginError {
74 pub fn with_traces(self, traces: Vec<serde_json::Value>) -> Self {
77 match self {
78 PluginError::HandlerError(mut payload) => {
79 payload.append_traces(traces);
80 PluginError::HandlerError(payload)
81 }
82 other => other,
83 }
84 }
85}
86
87impl From<PluginError> for String {
88 fn from(error: PluginError) -> Self {
89 error.to_string()
90 }
91}
92
93#[derive(Debug, Serialize, Deserialize, utoipa::ToSchema)]
94pub struct PluginCallResponse {
95 pub result: serde_json::Value,
97 #[serde(skip_serializing_if = "Option::is_none")]
99 pub metadata: Option<PluginMetadata>,
100}
101
102#[derive(Debug, Serialize, Deserialize, utoipa::ToSchema)]
103pub struct PluginHandlerError {
104 #[serde(skip_serializing_if = "Option::is_none")]
105 pub code: Option<String>,
106 #[serde(skip_serializing_if = "Option::is_none")]
107 pub details: Option<serde_json::Value>,
108}
109
110#[derive(Debug)]
111pub struct PluginHandlerResponse {
112 pub status: u16,
113 pub message: String,
114 pub error: PluginHandlerError,
115 pub metadata: Option<PluginMetadata>,
116}
117
118#[derive(Debug, Serialize)]
119pub struct PluginHandlerPayload {
120 pub status: u16,
121 pub message: String,
122 #[serde(skip_serializing_if = "Option::is_none")]
123 pub code: Option<String>,
124 #[serde(skip_serializing_if = "Option::is_none")]
125 pub details: Option<serde_json::Value>,
126 #[serde(skip_serializing_if = "Option::is_none")]
127 pub logs: Option<Vec<LogEntry>>,
128 #[serde(skip_serializing_if = "Option::is_none")]
129 pub traces: Option<Vec<serde_json::Value>>,
130}
131
132impl PluginHandlerPayload {
133 fn append_traces(&mut self, traces: Vec<serde_json::Value>) {
134 match &mut self.traces {
135 Some(existing) => existing.extend(traces),
136 None => self.traces = Some(traces),
137 }
138 }
139
140 fn into_response(self, emit_logs: bool, emit_traces: bool) -> PluginHandlerResponse {
141 let logs = if emit_logs { self.logs } else { None };
142 let traces = if emit_traces { self.traces } else { None };
143 let message = derive_handler_message(&self.message, logs.as_deref());
144 let metadata = build_metadata(logs, traces);
145
146 PluginHandlerResponse {
147 status: self.status,
148 message,
149 error: PluginHandlerError {
150 code: self.code,
151 details: self.details,
152 },
153 metadata,
154 }
155 }
156}
157
158impl fmt::Display for PluginHandlerPayload {
159 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160 f.write_str(&self.message)
161 }
162}
163
164fn derive_handler_message(message: &str, logs: Option<&[LogEntry]>) -> String {
165 if !message.trim().is_empty() {
166 return message.to_string();
167 }
168
169 if let Some(logs) = logs {
170 if let Some(entry) = logs
171 .iter()
172 .rev()
173 .find(|entry| matches!(entry.level, LogLevel::Error | LogLevel::Warn))
174 {
175 return entry.message.clone();
176 }
177
178 if let Some(entry) = logs.last() {
179 return entry.message.clone();
180 }
181 }
182
183 "Plugin execution failed".to_string()
184}
185
186fn build_metadata(
187 logs: Option<Vec<LogEntry>>,
188 traces: Option<Vec<serde_json::Value>>,
189) -> Option<PluginMetadata> {
190 if logs.is_some() || traces.is_some() {
191 Some(PluginMetadata { logs, traces })
192 } else {
193 None
194 }
195}
196
197fn forward_logs_to_tracing(plugin_id: &str, logs: &[LogEntry], request_id: &str) {
198 for entry in logs {
199 match entry.level {
200 LogLevel::Error => {
201 tracing::error!(target: "plugin", plugin_id = %plugin_id, request_id = %request_id, "{}", entry.message)
202 }
203 LogLevel::Warn => {
204 tracing::warn!(target: "plugin", plugin_id = %plugin_id, request_id = %request_id, "{}", entry.message)
205 }
206 LogLevel::Info | LogLevel::Log => {
207 tracing::info!(target: "plugin", plugin_id = %plugin_id, request_id = %request_id, "{}", entry.message)
208 }
209 LogLevel::Debug => {
210 tracing::debug!(target: "plugin", plugin_id = %plugin_id, request_id = %request_id, "{}", entry.message)
211 }
212 LogLevel::Result => {}
213 }
214 }
215}
216
217#[derive(Debug)]
218pub enum PluginCallResult {
219 Success(PluginCallResponse),
220 Handler(PluginHandlerResponse),
221 Fatal(PluginError),
222}
223
224#[derive(Default)]
225pub struct PluginService<R: PluginRunnerTrait> {
226 runner: R,
227}
228
229impl<R: PluginRunnerTrait> PluginService<R> {
230 pub fn new(runner: R) -> Self {
231 Self { runner }
232 }
233
234 pub fn resolve_plugin_path(plugin_path: &str) -> String {
235 if plugin_path.starts_with("plugins/") {
236 plugin_path.to_string()
237 } else {
238 format!("plugins/{plugin_path}")
239 }
240 }
241
242 #[allow(clippy::type_complexity)]
243 async fn call_plugin<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
244 &self,
245 plugin: PluginModel,
246 plugin_call_request: PluginCallRequest,
247 state: Arc<ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>,
248 ) -> PluginCallResult
249 where
250 J: JobProducerTrait + Send + Sync + 'static,
251 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
252 TR: TransactionRepository
253 + Repository<TransactionRepoModel, String>
254 + Send
255 + Sync
256 + 'static,
257 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
258 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
259 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
260 TCR: TransactionCounterTrait + Send + Sync + 'static,
261 PR: PluginRepositoryTrait + Send + Sync + 'static,
262 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
263 {
264 let socket_path = format!("/tmp/{}.sock", Uuid::new_v4());
265 let script_path = Self::resolve_plugin_path(&plugin.path);
266 let script_params = plugin_call_request.params.to_string();
267 let headers_json = plugin_call_request
268 .headers
269 .map(|h| serde_json::to_string(&h).unwrap_or_default());
270 let route = plugin_call_request.route;
271 let config_json = plugin
272 .config
273 .map(|c| serde_json::to_string(&c).unwrap_or_default());
274 let method = plugin_call_request.method;
275 let query_json = plugin_call_request
276 .query
277 .map(|q| serde_json::to_string(&q).unwrap_or_default());
278
279 let request_id = get_request_id();
280 let request_id_for_logs: String = request_id
281 .as_deref()
282 .filter(|id| !id.trim().is_empty())
283 .map(str::to_owned)
284 .unwrap_or_else(|| Uuid::new_v4().to_string());
285 let result = self
286 .runner
287 .run(
288 plugin.id.clone(),
289 &socket_path,
290 script_path,
291 plugin.timeout,
292 script_params,
293 request_id,
294 headers_json,
295 route,
296 config_json,
297 method,
298 query_json,
299 plugin.emit_traces,
300 state,
301 )
302 .await;
303
304 match result {
305 Ok(script_result) => {
306 if plugin.forward_logs {
307 forward_logs_to_tracing(&plugin.id, &script_result.logs, &request_id_for_logs);
308 }
309 let logs = if plugin.emit_logs {
311 Some(script_result.logs)
312 } else {
313 None
314 };
315 let traces = if plugin.emit_traces {
316 Some(script_result.trace)
317 } else {
318 None
319 };
320 let metadata = build_metadata(logs, traces);
321
322 let result = if script_result.return_value.trim() == "undefined" {
324 serde_json::Value::Null
325 } else {
326 serde_json::from_str::<serde_json::Value>(&script_result.return_value)
327 .unwrap_or(serde_json::Value::String(script_result.return_value))
328 };
329
330 PluginCallResult::Success(PluginCallResponse { result, metadata })
331 }
332 Err(e) => match e {
333 PluginError::HandlerError(payload) => {
334 if plugin.forward_logs {
335 if let Some(logs) = payload.logs.as_deref() {
336 forward_logs_to_tracing(&plugin.id, logs, &request_id_for_logs);
337 }
338 }
339 let failure = payload.into_response(plugin.emit_logs, plugin.emit_traces);
340 let has_logs = failure
341 .metadata
342 .as_ref()
343 .and_then(|meta| meta.logs.as_ref())
344 .is_some();
345 let has_traces = failure
346 .metadata
347 .as_ref()
348 .and_then(|meta| meta.traces.as_ref())
349 .is_some();
350
351 tracing::debug!(
352 status = failure.status,
353 message = %failure.message,
354 code = ?failure.error.code.as_ref(),
355 details = ?failure.error.details.as_ref(),
356 has_logs,
357 has_traces,
358 "Plugin handler returned error"
359 );
360
361 PluginCallResult::Handler(failure)
362 }
363 PluginError::ScriptTimeout(secs) => {
364 let message = format!("Plugin execution timed out after {secs} seconds");
365 tracing::warn!(
366 timeout_secs = secs,
367 plugin_id = %plugin.id,
368 "Plugin execution timed out"
369 );
370 PluginCallResult::Handler(PluginHandlerResponse {
371 status: 504,
372 message,
373 error: PluginHandlerError {
374 code: Some("TIMEOUT".to_string()),
375 details: None,
376 },
377 metadata: None,
378 })
379 }
380 other => {
381 tracing::error!("Plugin execution failed: {:?}", other);
383 PluginCallResult::Fatal(other)
384 }
385 },
386 }
387 }
388}
389
390#[async_trait]
391#[cfg_attr(test, automock)]
392pub trait PluginServiceTrait<J, TR, RR, NR, NFR, SR, TCR, PR, AKR>: Send + Sync
393where
394 J: JobProducerTrait + 'static,
395 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
396 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
397 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
398 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
399 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
400 TCR: TransactionCounterTrait + Send + Sync + 'static,
401 PR: PluginRepositoryTrait + Send + Sync + 'static,
402 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
403{
404 fn new(runner: PluginRunner) -> Self;
405 async fn call_plugin(
406 &self,
407 plugin: PluginModel,
408 plugin_call_request: PluginCallRequest,
409 state: Arc<web::ThinData<AppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>>,
410 ) -> PluginCallResult;
411}
412
413#[async_trait]
414impl<J, TR, RR, NR, NFR, SR, TCR, PR, AKR> PluginServiceTrait<J, TR, RR, NR, NFR, SR, TCR, PR, AKR>
415 for PluginService<PluginRunner>
416where
417 J: JobProducerTrait + 'static,
418 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
419 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
420 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
421 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
422 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
423 TCR: TransactionCounterTrait + Send + Sync + 'static,
424 PR: PluginRepositoryTrait + Send + Sync + 'static,
425 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
426{
427 fn new(runner: PluginRunner) -> Self {
428 Self::new(runner)
429 }
430
431 async fn call_plugin(
432 &self,
433 plugin: PluginModel,
434 plugin_call_request: PluginCallRequest,
435 state: Arc<web::ThinData<AppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>>,
436 ) -> PluginCallResult {
437 self.call_plugin(plugin, plugin_call_request, state).await
438 }
439}
440
441#[cfg(test)]
442mod tests {
443 use std::{io::Write, time::Duration};
444
445 use crate::{
446 constants::DEFAULT_PLUGIN_TIMEOUT_SECONDS,
447 jobs::MockJobProducerTrait,
448 models::PluginModel,
449 repositories::{
450 ApiKeyRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage,
451 PluginRepositoryStorage, RelayerRepositoryStorage, SignerRepositoryStorage,
452 TransactionCounterRepositoryStorage, TransactionRepositoryStorage,
453 },
454 utils::mocks::mockutils::create_mock_app_state,
455 };
456
457 use super::*;
458
459 #[test]
460 fn test_resolve_plugin_path() {
461 assert_eq!(
462 PluginService::<MockPluginRunnerTrait>::resolve_plugin_path("plugins/examples/test.ts"),
463 "plugins/examples/test.ts"
464 );
465
466 assert_eq!(
467 PluginService::<MockPluginRunnerTrait>::resolve_plugin_path("examples/test.ts"),
468 "plugins/examples/test.ts"
469 );
470
471 assert_eq!(
472 PluginService::<MockPluginRunnerTrait>::resolve_plugin_path("test.ts"),
473 "plugins/test.ts"
474 );
475 }
476
477 #[tokio::test]
478 async fn test_call_plugin() {
479 let plugin = PluginModel {
480 id: "test-plugin".to_string(),
481 path: "test-path".to_string(),
482 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
483 emit_logs: true,
484 emit_traces: false,
485 raw_response: false,
486 allow_get_invocation: false,
487 config: None,
488 forward_logs: false,
489 };
490 let app_state =
491 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
492
493 let mut plugin_runner = MockPluginRunnerTrait::default();
494
495 plugin_runner
496 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
497 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
498 Ok(ScriptResult {
499 logs: vec![LogEntry {
500 level: LogLevel::Log,
501 message: "test-log".to_string(),
502 }],
503 error: "test-error".to_string(),
504 return_value: "test-result".to_string(),
505 trace: Vec::new(),
506 })
507 });
508
509 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
510 let outcome = plugin_service
511 .call_plugin(
512 plugin,
513 PluginCallRequest {
514 params: serde_json::Value::Null,
515 headers: None,
516 route: None,
517 method: Some("POST".to_string()),
518 query: None,
519 },
520 Arc::new(web::ThinData(app_state)),
521 )
522 .await;
523 match outcome {
524 PluginCallResult::Success(result) => {
525 assert_eq!(
527 result.result,
528 serde_json::Value::String("test-result".to_string())
529 );
530 assert!(result.metadata.and_then(|meta| meta.logs).is_some());
532 }
533 PluginCallResult::Handler(_) | PluginCallResult::Fatal(_) => {
534 panic!("expected success outcome")
535 }
536 }
537 }
538
539 #[tokio::test]
540 async fn test_from_plugin_error_to_string() {
541 let error = PluginError::PluginExecutionError("test-error".to_string());
542 let result: String = error.into();
543 assert_eq!(result, "Plugin execution error: test-error");
544 }
545
546 #[test]
547 fn test_plugin_error_with_traces_handler_error() {
548 let payload = PluginHandlerPayload {
549 status: 400,
550 message: "test message".to_string(),
551 code: Some("TEST_CODE".to_string()),
552 details: None,
553 logs: None,
554 traces: Some(vec![serde_json::json!({"trace": "1"})]),
555 };
556 let error = PluginError::HandlerError(Box::new(payload));
557 let new_traces = vec![
558 serde_json::json!({"trace": "2"}),
559 serde_json::json!({"trace": "3"}),
560 ];
561
562 let enriched_error = error.with_traces(new_traces);
563
564 match enriched_error {
565 PluginError::HandlerError(payload) => {
566 let traces = payload.traces.unwrap();
567 assert_eq!(traces.len(), 3);
568 assert_eq!(traces[0], serde_json::json!({"trace": "1"}));
569 assert_eq!(traces[1], serde_json::json!({"trace": "2"}));
570 assert_eq!(traces[2], serde_json::json!({"trace": "3"}));
571 }
572 _ => panic!("Expected HandlerError variant"),
573 }
574 }
575
576 #[test]
577 fn test_plugin_error_with_traces_other_variants() {
578 let error = PluginError::PluginExecutionError("test".to_string());
579 let new_traces = vec![serde_json::json!({"trace": "1"})];
580
581 let result = error.with_traces(new_traces);
582
583 match result {
584 PluginError::PluginExecutionError(msg) => assert_eq!(msg, "test"),
585 _ => panic!("Expected PluginExecutionError variant"),
586 }
587 }
588
589 #[test]
590 fn test_derive_handler_message_with_message() {
591 let result = derive_handler_message("Custom error message", None);
592 assert_eq!(result, "Custom error message");
593 }
594
595 #[test]
596 fn test_derive_handler_message_with_error_log() {
597 let logs = vec![
598 LogEntry {
599 level: LogLevel::Log,
600 message: "info log".to_string(),
601 },
602 LogEntry {
603 level: LogLevel::Error,
604 message: "error log".to_string(),
605 },
606 ];
607 let result = derive_handler_message("", Some(&logs));
608 assert_eq!(result, "error log");
609 }
610
611 #[test]
612 fn test_derive_handler_message_with_warn_log() {
613 let logs = vec![
614 LogEntry {
615 level: LogLevel::Log,
616 message: "info log".to_string(),
617 },
618 LogEntry {
619 level: LogLevel::Warn,
620 message: "warn log".to_string(),
621 },
622 ];
623 let result = derive_handler_message("", Some(&logs));
624 assert_eq!(result, "warn log");
625 }
626
627 #[test]
628 fn test_derive_handler_message_with_only_info_logs() {
629 let logs = vec![
630 LogEntry {
631 level: LogLevel::Log,
632 message: "first log".to_string(),
633 },
634 LogEntry {
635 level: LogLevel::Info,
636 message: "last log".to_string(),
637 },
638 ];
639 let result = derive_handler_message("", Some(&logs));
640 assert_eq!(result, "last log");
641 }
642
643 #[test]
644 fn test_derive_handler_message_no_logs() {
645 let result = derive_handler_message("", None);
646 assert_eq!(result, "Plugin execution failed");
647 }
648
649 #[test]
650 fn test_build_metadata_with_logs_and_traces() {
651 let logs = vec![LogEntry {
652 level: LogLevel::Log,
653 message: "test".to_string(),
654 }];
655 let traces = vec![serde_json::json!({"trace": "1"})];
656
657 let result = build_metadata(Some(logs.clone()), Some(traces.clone()));
658
659 assert!(result.is_some());
660 let metadata = result.unwrap();
661 assert_eq!(metadata.logs.unwrap(), logs);
662 assert_eq!(metadata.traces.unwrap(), traces);
663 }
664
665 #[test]
666 fn test_build_metadata_with_only_logs() {
667 let logs = vec![LogEntry {
668 level: LogLevel::Log,
669 message: "test".to_string(),
670 }];
671
672 let result = build_metadata(Some(logs.clone()), None);
673
674 assert!(result.is_some());
675 let metadata = result.unwrap();
676 assert_eq!(metadata.logs.unwrap(), logs);
677 assert!(metadata.traces.is_none());
678 }
679
680 #[test]
681 fn test_build_metadata_with_only_traces() {
682 let traces = vec![serde_json::json!({"trace": "1"})];
683
684 let result = build_metadata(None, Some(traces.clone()));
685
686 assert!(result.is_some());
687 let metadata = result.unwrap();
688 assert!(metadata.logs.is_none());
689 assert_eq!(metadata.traces.unwrap(), traces);
690 }
691
692 #[test]
693 fn test_build_metadata_with_neither() {
694 let result = build_metadata(None, None);
695 assert!(result.is_none());
696 }
697
698 #[test]
699 fn test_plugin_handler_payload_append_traces_to_existing() {
700 let mut payload = PluginHandlerPayload {
701 status: 400,
702 message: "test".to_string(),
703 code: None,
704 details: None,
705 logs: None,
706 traces: Some(vec![serde_json::json!({"trace": "1"})]),
707 };
708
709 payload.append_traces(vec![serde_json::json!({"trace": "2"})]);
710
711 let traces = payload.traces.unwrap();
712 assert_eq!(traces.len(), 2);
713 assert_eq!(traces[0], serde_json::json!({"trace": "1"}));
714 assert_eq!(traces[1], serde_json::json!({"trace": "2"}));
715 }
716
717 #[test]
718 fn test_plugin_handler_payload_append_traces_to_none() {
719 let mut payload = PluginHandlerPayload {
720 status: 400,
721 message: "test".to_string(),
722 code: None,
723 details: None,
724 logs: None,
725 traces: None,
726 };
727
728 payload.append_traces(vec![serde_json::json!({"trace": "1"})]);
729
730 let traces = payload.traces.unwrap();
731 assert_eq!(traces.len(), 1);
732 assert_eq!(traces[0], serde_json::json!({"trace": "1"}));
733 }
734
735 #[test]
736 fn test_plugin_handler_payload_into_response_with_logs_and_traces() {
737 let logs = vec![LogEntry {
738 level: LogLevel::Error,
739 message: "error message".to_string(),
740 }];
741 let payload = PluginHandlerPayload {
742 status: 400,
743 message: "".to_string(),
744 code: Some("ERR_CODE".to_string()),
745 details: Some(serde_json::json!({"key": "value"})),
746 logs: Some(logs.clone()),
747 traces: Some(vec![serde_json::json!({"trace": "1"})]),
748 };
749
750 let response = payload.into_response(true, true);
751
752 assert_eq!(response.status, 400);
753 assert_eq!(response.message, "error message"); assert_eq!(response.error.code, Some("ERR_CODE".to_string()));
755 assert!(response.metadata.is_some());
756 let metadata = response.metadata.unwrap();
757 assert_eq!(metadata.logs.unwrap(), logs);
758 assert_eq!(metadata.traces.unwrap().len(), 1);
759 }
760
761 #[test]
762 fn test_plugin_handler_payload_into_response_without_logs() {
763 let logs = vec![LogEntry {
764 level: LogLevel::Log,
765 message: "test log".to_string(),
766 }];
767 let payload = PluginHandlerPayload {
768 status: 500,
769 message: "explicit message".to_string(),
770 code: None,
771 details: None,
772 logs: Some(logs),
773 traces: None,
774 };
775
776 let response = payload.into_response(false, false);
777
778 assert_eq!(response.status, 500);
779 assert_eq!(response.message, "explicit message");
780 assert!(response.metadata.is_none()); }
782
783 #[tokio::test]
784 async fn test_call_plugin_handler_error() {
785 let plugin = PluginModel {
786 id: "test-plugin".to_string(),
787 path: "test-path".to_string(),
788 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
789 emit_logs: true,
790 emit_traces: true,
791 raw_response: false,
792 allow_get_invocation: false,
793 config: None,
794 forward_logs: false,
795 };
796 let app_state =
797 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
798
799 let mut plugin_runner = MockPluginRunnerTrait::default();
800
801 plugin_runner
802 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
803 .returning(move |_, _, _, _, _, _, _, _, _, _, _, _, _| {
804 Err(PluginError::HandlerError(Box::new(PluginHandlerPayload {
805 status: 400,
806 message: "Plugin handler error".to_string(),
807 code: Some("VALIDATION_ERROR".to_string()),
808 details: Some(serde_json::json!({"field": "email"})),
809 logs: Some(vec![LogEntry {
810 level: LogLevel::Error,
811 message: "Invalid email".to_string(),
812 }]),
813 traces: Some(vec![serde_json::json!({"step": "validation"})]),
814 })))
815 });
816
817 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
818 let outcome = plugin_service
819 .call_plugin(
820 plugin,
821 PluginCallRequest {
822 params: serde_json::Value::Null,
823 headers: None,
824 route: None,
825 method: Some("POST".to_string()),
826 query: None,
827 },
828 Arc::new(web::ThinData(app_state)),
829 )
830 .await;
831
832 match outcome {
833 PluginCallResult::Handler(response) => {
834 assert_eq!(response.status, 400);
835 assert_eq!(response.error.code, Some("VALIDATION_ERROR".to_string()));
836 assert!(response.metadata.is_some());
837 let metadata = response.metadata.unwrap();
838 assert!(metadata.logs.is_some());
839 assert!(metadata.traces.is_some());
840 }
841 _ => panic!("Expected Handler result"),
842 }
843 }
844
845 #[tokio::test]
846 async fn test_call_plugin_fatal_error() {
847 let plugin = PluginModel {
848 id: "test-plugin".to_string(),
849 path: "test-path".to_string(),
850 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
851 emit_logs: false,
852 emit_traces: false,
853 raw_response: false,
854 allow_get_invocation: false,
855 config: None,
856 forward_logs: false,
857 };
858 let app_state =
859 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
860
861 let mut plugin_runner = MockPluginRunnerTrait::default();
862
863 plugin_runner
864 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
865 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
866 Err(PluginError::PluginExecutionError("Fatal error".to_string()))
867 });
868
869 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
870 let outcome = plugin_service
871 .call_plugin(
872 plugin,
873 PluginCallRequest {
874 params: serde_json::Value::Null,
875 headers: None,
876 route: None,
877 method: Some("POST".to_string()),
878 query: None,
879 },
880 Arc::new(web::ThinData(app_state)),
881 )
882 .await;
883
884 match outcome {
885 PluginCallResult::Fatal(error) => {
886 assert!(matches!(error, PluginError::PluginExecutionError(_)));
887 }
888 _ => panic!("Expected Fatal result"),
889 }
890 }
891
892 #[tokio::test]
893 async fn test_call_plugin_success_with_json_result() {
894 let plugin = PluginModel {
895 id: "test-plugin".to_string(),
896 path: "test-path".to_string(),
897 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
898 emit_logs: true,
899 emit_traces: true,
900 raw_response: false,
901 allow_get_invocation: false,
902 config: None,
903 forward_logs: false,
904 };
905 let app_state =
906 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
907
908 let mut plugin_runner = MockPluginRunnerTrait::default();
909
910 plugin_runner
911 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
912 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
913 Ok(ScriptResult {
914 logs: vec![LogEntry {
915 level: LogLevel::Log,
916 message: "test-log".to_string(),
917 }],
918 error: "".to_string(),
919 return_value: r#"{"result": "success"}"#.to_string(),
920 trace: vec![serde_json::json!({"step": "1"})],
921 })
922 });
923
924 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
925 let outcome = plugin_service
926 .call_plugin(
927 plugin,
928 PluginCallRequest {
929 params: serde_json::Value::Null,
930 headers: None,
931 route: None,
932 method: Some("POST".to_string()),
933 query: None,
934 },
935 Arc::new(web::ThinData(app_state)),
936 )
937 .await;
938
939 match outcome {
940 PluginCallResult::Success(result) => {
941 assert_eq!(result.result, serde_json::json!({"result": "success"}));
943 assert!(result.metadata.is_some());
944 let metadata = result.metadata.unwrap();
945 assert!(metadata.logs.is_some());
946 assert!(metadata.traces.is_some());
947 }
948 _ => panic!("Expected Success result"),
949 }
950 }
951
952 #[derive(Clone)]
953 struct VecWriter {
954 buffer: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
955 }
956
957 impl Write for VecWriter {
958 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
959 let mut buffer = self.buffer.lock().unwrap();
960 buffer.push(String::from_utf8_lossy(buf).to_string());
961 Ok(buf.len())
962 }
963
964 fn flush(&mut self) -> std::io::Result<()> {
965 Ok(())
966 }
967 }
968
969 fn init_capturing_subscriber(
970 buffer: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
971 ) -> tracing::subscriber::DefaultGuard {
972 use tracing_subscriber::filter::LevelFilter;
973 let writer = VecWriter { buffer };
974 let subscriber = tracing_subscriber::fmt()
975 .with_writer(move || writer.clone())
976 .with_ansi(false)
977 .with_target(true)
978 .without_time()
979 .with_max_level(LevelFilter::DEBUG)
980 .finish();
981 tracing::subscriber::set_default(subscriber)
982 }
983
984 #[tokio::test]
985 async fn test_forward_logs_to_tracing_when_enabled_all_levels() {
986 use std::sync::{Arc as StdArc, Mutex};
987
988 let logs_buffer: StdArc<Mutex<Vec<String>>> = StdArc::new(Mutex::new(Vec::new()));
989 let _guard = init_capturing_subscriber(logs_buffer.clone());
990
991 let plugin = PluginModel {
992 id: "test-plugin-levels".to_string(),
993 path: "test-path".to_string(),
994 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
995 emit_logs: false,
996 emit_traces: false,
997 forward_logs: true,
998 raw_response: false,
999 allow_get_invocation: false,
1000 config: None,
1001 };
1002 let app_state =
1003 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1004
1005 let mut plugin_runner = MockPluginRunnerTrait::default();
1006
1007 plugin_runner
1008 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1009 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
1010 Ok(ScriptResult {
1011 logs: vec![
1012 LogEntry {
1013 level: LogLevel::Error,
1014 message: "err-log".to_string(),
1015 },
1016 LogEntry {
1017 level: LogLevel::Warn,
1018 message: "warn-log".to_string(),
1019 },
1020 LogEntry {
1021 level: LogLevel::Info,
1022 message: "info-log".to_string(),
1023 },
1024 LogEntry {
1025 level: LogLevel::Log,
1026 message: "log-log".to_string(),
1027 },
1028 LogEntry {
1029 level: LogLevel::Debug,
1030 message: "debug-log".to_string(),
1031 },
1032 LogEntry {
1033 level: LogLevel::Result,
1034 message: "result-log".to_string(),
1035 },
1036 ],
1037 error: "".to_string(),
1038 return_value: "{}".to_string(),
1039 trace: vec![],
1040 })
1041 });
1042
1043 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1044 let _ = plugin_service
1045 .call_plugin(
1046 plugin,
1047 PluginCallRequest {
1048 params: serde_json::json!({}),
1049 headers: None,
1050 route: None,
1051 method: Some("POST".to_string()),
1052 query: None,
1053 },
1054 Arc::new(web::ThinData(app_state)),
1055 )
1056 .await;
1057
1058 let captured = logs_buffer.lock().unwrap().join("\n");
1059
1060 assert!(captured.contains("err-log"));
1061 assert!(captured.contains("warn-log"));
1062 assert!(captured.contains("info-log"));
1063 assert!(captured.contains("log-log"));
1064 assert!(captured.contains("debug-log"));
1065 assert!(!captured.contains("result-log"));
1066 assert!(captured.contains("plugin_id=test-plugin-levels"));
1067 assert!(captured.contains("ERROR"));
1068 assert!(captured.contains("WARN"));
1069 let request_id_values: Vec<&str> = captured
1070 .match_indices("request_id=")
1071 .filter_map(|(idx, _)| {
1072 let tail = &captured[idx + "request_id=".len()..];
1073 tail.split_whitespace()
1074 .next()
1075 .map(|value| value.trim_matches(|c: char| c == ',' || c == '"' || c == '}'))
1076 })
1077 .collect();
1078 assert!(
1079 !request_id_values.is_empty(),
1080 "expected forwarded plugin logs to include request_id field, captured: {}",
1081 captured
1082 );
1083 assert!(
1084 request_id_values.iter().all(|value| !value.is_empty()),
1085 "expected non-empty request_id values, captured: {}",
1086 captured
1087 );
1088 assert!(
1089 request_id_values
1090 .iter()
1091 .all(|value| uuid::Uuid::parse_str(value).is_ok()),
1092 "expected request_id fallback to be UUID when no span request id is set, captured: {}",
1093 captured
1094 );
1095 }
1096
1097 #[tokio::test]
1098 async fn test_forward_logs_not_emitted_when_disabled() {
1099 use std::sync::{Arc as StdArc, Mutex};
1100
1101 let logs_buffer: StdArc<Mutex<Vec<String>>> = StdArc::new(Mutex::new(Vec::new()));
1102 let _guard = init_capturing_subscriber(logs_buffer.clone());
1103
1104 let plugin = PluginModel {
1105 id: "test-plugin-disabled".to_string(),
1106 path: "test-path".to_string(),
1107 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1108 emit_logs: false,
1109 emit_traces: false,
1110 forward_logs: false,
1111 raw_response: false,
1112 allow_get_invocation: false,
1113 config: None,
1114 };
1115 let app_state =
1116 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1117
1118 let mut plugin_runner = MockPluginRunnerTrait::default();
1119 plugin_runner
1120 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1121 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
1122 Ok(ScriptResult {
1123 logs: vec![LogEntry {
1124 level: LogLevel::Warn,
1125 message: "should-not-emit".to_string(),
1126 }],
1127 error: "".to_string(),
1128 return_value: "{}".to_string(),
1129 trace: vec![],
1130 })
1131 });
1132
1133 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1134 let _ = plugin_service
1135 .call_plugin(
1136 plugin,
1137 PluginCallRequest {
1138 params: serde_json::json!({}),
1139 headers: None,
1140 route: None,
1141 method: Some("POST".to_string()),
1142 query: None,
1143 },
1144 Arc::new(web::ThinData(app_state)),
1145 )
1146 .await;
1147
1148 let captured = logs_buffer.lock().unwrap().join("\n");
1149 assert!(
1152 !captured.contains("should-not-emit"),
1153 "plugin logs should not be forwarded when disabled, but found: {captured}"
1154 );
1155 }
1156
1157 #[tokio::test]
1158 async fn test_forward_logs_on_handler_error() {
1159 use std::sync::{Arc as StdArc, Mutex};
1160
1161 let logs_buffer: StdArc<Mutex<Vec<String>>> = StdArc::new(Mutex::new(Vec::new()));
1162 let _guard = init_capturing_subscriber(logs_buffer.clone());
1163
1164 let plugin = PluginModel {
1165 id: "test-plugin-error".to_string(),
1166 path: "test-path".to_string(),
1167 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1168 emit_logs: true,
1169 emit_traces: false,
1170 forward_logs: true,
1171 raw_response: false,
1172 allow_get_invocation: false,
1173 config: None,
1174 };
1175 let app_state =
1176 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1177
1178 let mut plugin_runner = MockPluginRunnerTrait::default();
1179 plugin_runner
1180 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1181 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
1182 Err(PluginError::HandlerError(Box::new(PluginHandlerPayload {
1183 status: 400,
1184 message: "handler failed".to_string(),
1185 code: None,
1186 details: None,
1187 logs: Some(vec![LogEntry {
1188 level: LogLevel::Error,
1189 message: "handler-log".to_string(),
1190 }]),
1191 traces: None,
1192 })))
1193 });
1194
1195 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1196 let _ = plugin_service
1197 .call_plugin(
1198 plugin,
1199 PluginCallRequest {
1200 params: serde_json::json!({}),
1201 headers: None,
1202 route: None,
1203 method: Some("POST".to_string()),
1204 query: None,
1205 },
1206 Arc::new(web::ThinData(app_state)),
1207 )
1208 .await;
1209
1210 let captured = logs_buffer.lock().unwrap().join("\n");
1211 assert!(captured.contains("handler-log"));
1212 assert!(captured.contains("plugin_id=test-plugin-error"));
1213 assert!(captured.contains("ERROR"));
1214 }
1215
1216 #[tokio::test]
1217 async fn test_call_plugin_success_with_undefined_result() {
1218 let plugin = PluginModel {
1219 id: "test-plugin".to_string(),
1220 path: "test-path".to_string(),
1221 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1222 emit_logs: false,
1223 emit_traces: false,
1224 raw_response: false,
1225 allow_get_invocation: false,
1226 config: None,
1227 forward_logs: false,
1228 };
1229 let app_state =
1230 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1231
1232 let mut plugin_runner = MockPluginRunnerTrait::default();
1233
1234 plugin_runner
1235 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1236 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
1237 Ok(ScriptResult {
1238 logs: vec![],
1239 error: "".to_string(),
1240 return_value: "undefined".to_string(),
1241 trace: vec![],
1242 })
1243 });
1244
1245 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1246 let outcome = plugin_service
1247 .call_plugin(
1248 plugin,
1249 PluginCallRequest {
1250 params: serde_json::Value::Null,
1251 headers: None,
1252 route: None,
1253 method: Some("POST".to_string()),
1254 query: None,
1255 },
1256 Arc::new(web::ThinData(app_state)),
1257 )
1258 .await;
1259
1260 match outcome {
1261 PluginCallResult::Success(result) => {
1262 assert_eq!(result.result, serde_json::Value::Null);
1264 assert!(result.metadata.is_none());
1266 }
1267 _ => panic!("Expected Success result"),
1268 }
1269 }
1270
1271 #[tokio::test]
1272 async fn test_call_plugin_with_headers() {
1273 use std::sync::{Arc as StdArc, Mutex};
1274
1275 let plugin = PluginModel {
1276 id: "test-plugin".to_string(),
1277 path: "test-path".to_string(),
1278 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1279 emit_logs: false,
1280 emit_traces: false,
1281 raw_response: false,
1282 allow_get_invocation: false,
1283 config: None,
1284 forward_logs: false,
1285 };
1286 let app_state =
1287 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1288
1289 let captured_headers: StdArc<Mutex<Option<String>>> = StdArc::new(Mutex::new(None));
1291 let captured_headers_clone = captured_headers.clone();
1292
1293 let mut plugin_runner = MockPluginRunnerTrait::default();
1294
1295 plugin_runner
1296 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1297 .returning(move |_, _, _, _, _, _, headers_json, _, _, _, _, _, _| {
1298 *captured_headers_clone.lock().unwrap() = headers_json;
1300 Ok(ScriptResult {
1301 logs: vec![],
1302 error: "".to_string(),
1303 return_value: "{}".to_string(),
1304 trace: vec![],
1305 })
1306 });
1307
1308 let mut headers_map = std::collections::HashMap::new();
1310 headers_map.insert(
1311 "x-custom-header".to_string(),
1312 vec!["custom-value".to_string()],
1313 );
1314 headers_map.insert(
1315 "authorization".to_string(),
1316 vec!["Bearer token123".to_string()],
1317 );
1318
1319 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1320 let _outcome = plugin_service
1321 .call_plugin(
1322 plugin,
1323 PluginCallRequest {
1324 params: serde_json::json!({"test": "data"}),
1325 headers: Some(headers_map.clone()),
1326 route: None,
1327 method: Some("POST".to_string()),
1328 query: None,
1329 },
1330 Arc::new(web::ThinData(app_state)),
1331 )
1332 .await;
1333
1334 let captured = captured_headers.lock().unwrap();
1336 assert!(
1337 captured.is_some(),
1338 "headers_json should be passed to runner"
1339 );
1340
1341 let headers_json = captured.as_ref().unwrap();
1342 let parsed: std::collections::HashMap<String, Vec<String>> =
1343 serde_json::from_str(headers_json).expect("headers_json should be valid JSON");
1344
1345 assert_eq!(
1346 parsed.get("x-custom-header"),
1347 Some(&vec!["custom-value".to_string()])
1348 );
1349 assert_eq!(
1350 parsed.get("authorization"),
1351 Some(&vec!["Bearer token123".to_string()])
1352 );
1353 }
1354
1355 #[tokio::test]
1356 async fn test_call_plugin_without_headers() {
1357 use std::sync::{Arc as StdArc, Mutex};
1358
1359 let plugin = PluginModel {
1360 id: "test-plugin".to_string(),
1361 path: "test-path".to_string(),
1362 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1363 emit_logs: false,
1364 emit_traces: false,
1365 raw_response: false,
1366 allow_get_invocation: false,
1367 config: None,
1368 forward_logs: false,
1369 };
1370 let app_state =
1371 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1372
1373 let captured_headers: StdArc<Mutex<Option<String>>> = StdArc::new(Mutex::new(None));
1374 let captured_headers_clone = captured_headers.clone();
1375
1376 let mut plugin_runner = MockPluginRunnerTrait::default();
1377
1378 plugin_runner
1379 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1380 .returning(move |_, _, _, _, _, _, headers_json, _, _, _, _, _, _| {
1381 *captured_headers_clone.lock().unwrap() = headers_json;
1382 Ok(ScriptResult {
1383 logs: vec![],
1384 error: "".to_string(),
1385 return_value: "{}".to_string(),
1386 trace: vec![],
1387 })
1388 });
1389
1390 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1391 let _outcome = plugin_service
1392 .call_plugin(
1393 plugin,
1394 PluginCallRequest {
1395 params: serde_json::json!({}),
1396 headers: None, route: None,
1398 method: Some("POST".to_string()),
1399 query: None,
1400 },
1401 Arc::new(web::ThinData(app_state)),
1402 )
1403 .await;
1404
1405 let captured = captured_headers.lock().unwrap();
1407 assert!(
1408 captured.is_none(),
1409 "headers_json should be None when no headers provided"
1410 );
1411 }
1412
1413 #[tokio::test]
1414 async fn test_call_plugin_script_timeout_returns_504() {
1415 let plugin = PluginModel {
1416 id: "test-plugin".to_string(),
1417 path: "test-path".to_string(),
1418 timeout: Duration::from_secs(3),
1419 emit_logs: false,
1420 emit_traces: false,
1421 raw_response: false,
1422 allow_get_invocation: false,
1423 config: None,
1424 forward_logs: false,
1425 };
1426 let app_state =
1427 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1428
1429 let mut plugin_runner = MockPluginRunnerTrait::default();
1430
1431 plugin_runner
1432 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1433 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
1434 Err(PluginError::ScriptTimeout(3))
1435 });
1436
1437 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1438 let outcome = plugin_service
1439 .call_plugin(
1440 plugin,
1441 PluginCallRequest {
1442 params: serde_json::Value::Null,
1443 headers: None,
1444 route: None,
1445 method: Some("POST".to_string()),
1446 query: None,
1447 },
1448 Arc::new(web::ThinData(app_state)),
1449 )
1450 .await;
1451
1452 match outcome {
1453 PluginCallResult::Handler(response) => {
1454 assert_eq!(response.status, 504);
1455 assert_eq!(
1456 response.message,
1457 "Plugin execution timed out after 3 seconds"
1458 );
1459 assert_eq!(response.error.code, Some("TIMEOUT".to_string()));
1460 assert!(response.error.details.is_none());
1461 assert!(response.metadata.is_none());
1462 }
1463 _ => panic!("Expected Handler result with 504 status for ScriptTimeout"),
1464 }
1465 }
1466}