openzeppelin_relayer/services/plugins/
mod.rs

1//! Plugins service module for handling plugins execution and interaction with relayer
2
3use std::{fmt, sync::Arc};
4
5use crate::observability::request_id::get_request_id;
6use crate::{
7    jobs::JobProducerTrait,
8    models::{
9        AppState, NetworkRepoModel, NotificationRepoModel, PluginCallRequest, PluginMetadata,
10        PluginModel, RelayerRepoModel, SignerRepoModel, ThinDataAppState, TransactionRepoModel,
11    },
12    repositories::{
13        ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
14        Repository, TransactionCounterTrait, TransactionRepository,
15    },
16};
17use actix_web::web;
18use async_trait::async_trait;
19use serde::{Deserialize, Serialize};
20use thiserror::Error;
21use uuid::Uuid;
22
23pub mod config;
24pub use config::*;
25
26pub mod health;
27pub use health::*;
28
29pub mod protocol;
30pub use protocol::*;
31
32pub mod connection;
33pub use connection::*;
34
35pub mod runner;
36pub use runner::*;
37
38pub mod relayer_api;
39pub use relayer_api::*;
40
41pub mod script_executor;
42pub use script_executor::*;
43
44pub mod pool_executor;
45pub use pool_executor::*;
46
47pub mod shared_socket;
48pub use shared_socket::*;
49
50#[cfg(test)]
51use mockall::automock;
52
53#[derive(Error, Debug, Serialize)]
54pub enum PluginError {
55    #[error("Socket error: {0}")]
56    SocketError(String),
57    #[error("Plugin error: {0}")]
58    PluginError(String),
59    #[error("Relayer error: {0}")]
60    RelayerError(String),
61    #[error("Plugin execution error: {0}")]
62    PluginExecutionError(String),
63    #[error("Script execution timed out after {0} seconds")]
64    ScriptTimeout(u64),
65    #[error("Invalid method: {0}")]
66    InvalidMethod(String),
67    #[error("Invalid payload: {0}")]
68    InvalidPayload(String),
69    #[error("{0}")]
70    HandlerError(Box<PluginHandlerPayload>),
71}
72
73impl PluginError {
74    /// Enriches the error with traces if it's a HandlerError variant.
75    /// For other variants, returns the error unchanged.
76    pub fn with_traces(self, traces: Vec<serde_json::Value>) -> Self {
77        match self {
78            PluginError::HandlerError(mut payload) => {
79                payload.append_traces(traces);
80                PluginError::HandlerError(payload)
81            }
82            other => other,
83        }
84    }
85}
86
87impl From<PluginError> for String {
88    fn from(error: PluginError) -> Self {
89        error.to_string()
90    }
91}
92
93#[derive(Debug, Serialize, Deserialize, utoipa::ToSchema)]
94pub struct PluginCallResponse {
95    /// The plugin result, parsed as JSON when possible; otherwise a string
96    pub result: serde_json::Value,
97    /// Optional metadata captured during plugin execution (logs/traces)
98    #[serde(skip_serializing_if = "Option::is_none")]
99    pub metadata: Option<PluginMetadata>,
100}
101
102#[derive(Debug, Serialize, Deserialize, utoipa::ToSchema)]
103pub struct PluginHandlerError {
104    #[serde(skip_serializing_if = "Option::is_none")]
105    pub code: Option<String>,
106    #[serde(skip_serializing_if = "Option::is_none")]
107    pub details: Option<serde_json::Value>,
108}
109
110#[derive(Debug)]
111pub struct PluginHandlerResponse {
112    pub status: u16,
113    pub message: String,
114    pub error: PluginHandlerError,
115    pub metadata: Option<PluginMetadata>,
116}
117
118#[derive(Debug, Serialize)]
119pub struct PluginHandlerPayload {
120    pub status: u16,
121    pub message: String,
122    #[serde(skip_serializing_if = "Option::is_none")]
123    pub code: Option<String>,
124    #[serde(skip_serializing_if = "Option::is_none")]
125    pub details: Option<serde_json::Value>,
126    #[serde(skip_serializing_if = "Option::is_none")]
127    pub logs: Option<Vec<LogEntry>>,
128    #[serde(skip_serializing_if = "Option::is_none")]
129    pub traces: Option<Vec<serde_json::Value>>,
130}
131
132impl PluginHandlerPayload {
133    fn append_traces(&mut self, traces: Vec<serde_json::Value>) {
134        match &mut self.traces {
135            Some(existing) => existing.extend(traces),
136            None => self.traces = Some(traces),
137        }
138    }
139
140    fn into_response(self, emit_logs: bool, emit_traces: bool) -> PluginHandlerResponse {
141        let logs = if emit_logs { self.logs } else { None };
142        let traces = if emit_traces { self.traces } else { None };
143        let message = derive_handler_message(&self.message, logs.as_deref());
144        let metadata = build_metadata(logs, traces);
145
146        PluginHandlerResponse {
147            status: self.status,
148            message,
149            error: PluginHandlerError {
150                code: self.code,
151                details: self.details,
152            },
153            metadata,
154        }
155    }
156}
157
158impl fmt::Display for PluginHandlerPayload {
159    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160        f.write_str(&self.message)
161    }
162}
163
164fn derive_handler_message(message: &str, logs: Option<&[LogEntry]>) -> String {
165    if !message.trim().is_empty() {
166        return message.to_string();
167    }
168
169    if let Some(logs) = logs {
170        if let Some(entry) = logs
171            .iter()
172            .rev()
173            .find(|entry| matches!(entry.level, LogLevel::Error | LogLevel::Warn))
174        {
175            return entry.message.clone();
176        }
177
178        if let Some(entry) = logs.last() {
179            return entry.message.clone();
180        }
181    }
182
183    "Plugin execution failed".to_string()
184}
185
186fn build_metadata(
187    logs: Option<Vec<LogEntry>>,
188    traces: Option<Vec<serde_json::Value>>,
189) -> Option<PluginMetadata> {
190    if logs.is_some() || traces.is_some() {
191        Some(PluginMetadata { logs, traces })
192    } else {
193        None
194    }
195}
196
197fn forward_logs_to_tracing(plugin_id: &str, logs: &[LogEntry], request_id: &str) {
198    for entry in logs {
199        match entry.level {
200            LogLevel::Error => {
201                tracing::error!(target: "plugin", plugin_id = %plugin_id, request_id = %request_id, "{}", entry.message)
202            }
203            LogLevel::Warn => {
204                tracing::warn!(target: "plugin", plugin_id = %plugin_id, request_id = %request_id, "{}", entry.message)
205            }
206            LogLevel::Info | LogLevel::Log => {
207                tracing::info!(target: "plugin", plugin_id = %plugin_id, request_id = %request_id, "{}", entry.message)
208            }
209            LogLevel::Debug => {
210                tracing::debug!(target: "plugin", plugin_id = %plugin_id, request_id = %request_id, "{}", entry.message)
211            }
212            LogLevel::Result => {}
213        }
214    }
215}
216
217#[derive(Debug)]
218pub enum PluginCallResult {
219    Success(PluginCallResponse),
220    Handler(PluginHandlerResponse),
221    Fatal(PluginError),
222}
223
224#[derive(Default)]
225pub struct PluginService<R: PluginRunnerTrait> {
226    runner: R,
227}
228
229impl<R: PluginRunnerTrait> PluginService<R> {
230    pub fn new(runner: R) -> Self {
231        Self { runner }
232    }
233
234    pub fn resolve_plugin_path(plugin_path: &str) -> String {
235        if plugin_path.starts_with("plugins/") {
236            plugin_path.to_string()
237        } else {
238            format!("plugins/{plugin_path}")
239        }
240    }
241
242    #[allow(clippy::type_complexity)]
243    async fn call_plugin<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
244        &self,
245        plugin: PluginModel,
246        plugin_call_request: PluginCallRequest,
247        state: Arc<ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>,
248    ) -> PluginCallResult
249    where
250        J: JobProducerTrait + Send + Sync + 'static,
251        RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
252        TR: TransactionRepository
253            + Repository<TransactionRepoModel, String>
254            + Send
255            + Sync
256            + 'static,
257        NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
258        NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
259        SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
260        TCR: TransactionCounterTrait + Send + Sync + 'static,
261        PR: PluginRepositoryTrait + Send + Sync + 'static,
262        AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
263    {
264        let socket_path = format!("/tmp/{}.sock", Uuid::new_v4());
265        let script_path = Self::resolve_plugin_path(&plugin.path);
266        let script_params = plugin_call_request.params.to_string();
267        let headers_json = plugin_call_request
268            .headers
269            .map(|h| serde_json::to_string(&h).unwrap_or_default());
270        let route = plugin_call_request.route;
271        let config_json = plugin
272            .config
273            .map(|c| serde_json::to_string(&c).unwrap_or_default());
274        let method = plugin_call_request.method;
275        let query_json = plugin_call_request
276            .query
277            .map(|q| serde_json::to_string(&q).unwrap_or_default());
278
279        let request_id = get_request_id();
280        let request_id_for_logs: String = request_id
281            .as_deref()
282            .filter(|id| !id.trim().is_empty())
283            .map(str::to_owned)
284            .unwrap_or_else(|| Uuid::new_v4().to_string());
285        let result = self
286            .runner
287            .run(
288                plugin.id.clone(),
289                &socket_path,
290                script_path,
291                plugin.timeout,
292                script_params,
293                request_id,
294                headers_json,
295                route,
296                config_json,
297                method,
298                query_json,
299                plugin.emit_traces,
300                state,
301            )
302            .await;
303
304        match result {
305            Ok(script_result) => {
306                if plugin.forward_logs {
307                    forward_logs_to_tracing(&plugin.id, &script_result.logs, &request_id_for_logs);
308                }
309                // Include logs/traces only if enabled via plugin config
310                let logs = if plugin.emit_logs {
311                    Some(script_result.logs)
312                } else {
313                    None
314                };
315                let traces = if plugin.emit_traces {
316                    Some(script_result.trace)
317                } else {
318                    None
319                };
320                let metadata = build_metadata(logs, traces);
321
322                // Parse return_value string into JSON when possible; otherwise string
323                let result = if script_result.return_value.trim() == "undefined" {
324                    serde_json::Value::Null
325                } else {
326                    serde_json::from_str::<serde_json::Value>(&script_result.return_value)
327                        .unwrap_or(serde_json::Value::String(script_result.return_value))
328                };
329
330                PluginCallResult::Success(PluginCallResponse { result, metadata })
331            }
332            Err(e) => match e {
333                PluginError::HandlerError(payload) => {
334                    if plugin.forward_logs {
335                        if let Some(logs) = payload.logs.as_deref() {
336                            forward_logs_to_tracing(&plugin.id, logs, &request_id_for_logs);
337                        }
338                    }
339                    let failure = payload.into_response(plugin.emit_logs, plugin.emit_traces);
340                    let has_logs = failure
341                        .metadata
342                        .as_ref()
343                        .and_then(|meta| meta.logs.as_ref())
344                        .is_some();
345                    let has_traces = failure
346                        .metadata
347                        .as_ref()
348                        .and_then(|meta| meta.traces.as_ref())
349                        .is_some();
350
351                    tracing::debug!(
352                        status = failure.status,
353                        message = %failure.message,
354                        code = ?failure.error.code.as_ref(),
355                        details = ?failure.error.details.as_ref(),
356                        has_logs,
357                        has_traces,
358                        "Plugin handler returned error"
359                    );
360
361                    PluginCallResult::Handler(failure)
362                }
363                PluginError::ScriptTimeout(secs) => {
364                    let message = format!("Plugin execution timed out after {secs} seconds");
365                    tracing::warn!(
366                        timeout_secs = secs,
367                        plugin_id = %plugin.id,
368                        "Plugin execution timed out"
369                    );
370                    PluginCallResult::Handler(PluginHandlerResponse {
371                        status: 504,
372                        message,
373                        error: PluginHandlerError {
374                            code: Some("TIMEOUT".to_string()),
375                            details: None,
376                        },
377                        metadata: None,
378                    })
379                }
380                other => {
381                    // This is an actual execution/infrastructure failure
382                    tracing::error!("Plugin execution failed: {:?}", other);
383                    PluginCallResult::Fatal(other)
384                }
385            },
386        }
387    }
388}
389
390#[async_trait]
391#[cfg_attr(test, automock)]
392pub trait PluginServiceTrait<J, TR, RR, NR, NFR, SR, TCR, PR, AKR>: Send + Sync
393where
394    J: JobProducerTrait + 'static,
395    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
396    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
397    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
398    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
399    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
400    TCR: TransactionCounterTrait + Send + Sync + 'static,
401    PR: PluginRepositoryTrait + Send + Sync + 'static,
402    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
403{
404    fn new(runner: PluginRunner) -> Self;
405    async fn call_plugin(
406        &self,
407        plugin: PluginModel,
408        plugin_call_request: PluginCallRequest,
409        state: Arc<web::ThinData<AppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>>,
410    ) -> PluginCallResult;
411}
412
413#[async_trait]
414impl<J, TR, RR, NR, NFR, SR, TCR, PR, AKR> PluginServiceTrait<J, TR, RR, NR, NFR, SR, TCR, PR, AKR>
415    for PluginService<PluginRunner>
416where
417    J: JobProducerTrait + 'static,
418    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
419    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
420    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
421    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
422    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
423    TCR: TransactionCounterTrait + Send + Sync + 'static,
424    PR: PluginRepositoryTrait + Send + Sync + 'static,
425    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
426{
427    fn new(runner: PluginRunner) -> Self {
428        Self::new(runner)
429    }
430
431    async fn call_plugin(
432        &self,
433        plugin: PluginModel,
434        plugin_call_request: PluginCallRequest,
435        state: Arc<web::ThinData<AppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>>,
436    ) -> PluginCallResult {
437        self.call_plugin(plugin, plugin_call_request, state).await
438    }
439}
440
441#[cfg(test)]
442mod tests {
443    use std::{io::Write, time::Duration};
444
445    use crate::{
446        constants::DEFAULT_PLUGIN_TIMEOUT_SECONDS,
447        jobs::MockJobProducerTrait,
448        models::PluginModel,
449        repositories::{
450            ApiKeyRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage,
451            PluginRepositoryStorage, RelayerRepositoryStorage, SignerRepositoryStorage,
452            TransactionCounterRepositoryStorage, TransactionRepositoryStorage,
453        },
454        utils::mocks::mockutils::create_mock_app_state,
455    };
456
457    use super::*;
458
459    #[test]
460    fn test_resolve_plugin_path() {
461        assert_eq!(
462            PluginService::<MockPluginRunnerTrait>::resolve_plugin_path("plugins/examples/test.ts"),
463            "plugins/examples/test.ts"
464        );
465
466        assert_eq!(
467            PluginService::<MockPluginRunnerTrait>::resolve_plugin_path("examples/test.ts"),
468            "plugins/examples/test.ts"
469        );
470
471        assert_eq!(
472            PluginService::<MockPluginRunnerTrait>::resolve_plugin_path("test.ts"),
473            "plugins/test.ts"
474        );
475    }
476
477    #[tokio::test]
478    async fn test_call_plugin() {
479        let plugin = PluginModel {
480            id: "test-plugin".to_string(),
481            path: "test-path".to_string(),
482            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
483            emit_logs: true,
484            emit_traces: false,
485            raw_response: false,
486            allow_get_invocation: false,
487            config: None,
488            forward_logs: false,
489        };
490        let app_state =
491            create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
492
493        let mut plugin_runner = MockPluginRunnerTrait::default();
494
495        plugin_runner
496            .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
497            .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
498                Ok(ScriptResult {
499                    logs: vec![LogEntry {
500                        level: LogLevel::Log,
501                        message: "test-log".to_string(),
502                    }],
503                    error: "test-error".to_string(),
504                    return_value: "test-result".to_string(),
505                    trace: Vec::new(),
506                })
507            });
508
509        let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
510        let outcome = plugin_service
511            .call_plugin(
512                plugin,
513                PluginCallRequest {
514                    params: serde_json::Value::Null,
515                    headers: None,
516                    route: None,
517                    method: Some("POST".to_string()),
518                    query: None,
519                },
520                Arc::new(web::ThinData(app_state)),
521            )
522            .await;
523        match outcome {
524            PluginCallResult::Success(result) => {
525                // result should be the string since it is not JSON
526                assert_eq!(
527                    result.result,
528                    serde_json::Value::String("test-result".to_string())
529                );
530                // emit_logs=true -> logs should be present in metadata
531                assert!(result.metadata.and_then(|meta| meta.logs).is_some());
532            }
533            PluginCallResult::Handler(_) | PluginCallResult::Fatal(_) => {
534                panic!("expected success outcome")
535            }
536        }
537    }
538
539    #[tokio::test]
540    async fn test_from_plugin_error_to_string() {
541        let error = PluginError::PluginExecutionError("test-error".to_string());
542        let result: String = error.into();
543        assert_eq!(result, "Plugin execution error: test-error");
544    }
545
546    #[test]
547    fn test_plugin_error_with_traces_handler_error() {
548        let payload = PluginHandlerPayload {
549            status: 400,
550            message: "test message".to_string(),
551            code: Some("TEST_CODE".to_string()),
552            details: None,
553            logs: None,
554            traces: Some(vec![serde_json::json!({"trace": "1"})]),
555        };
556        let error = PluginError::HandlerError(Box::new(payload));
557        let new_traces = vec![
558            serde_json::json!({"trace": "2"}),
559            serde_json::json!({"trace": "3"}),
560        ];
561
562        let enriched_error = error.with_traces(new_traces);
563
564        match enriched_error {
565            PluginError::HandlerError(payload) => {
566                let traces = payload.traces.unwrap();
567                assert_eq!(traces.len(), 3);
568                assert_eq!(traces[0], serde_json::json!({"trace": "1"}));
569                assert_eq!(traces[1], serde_json::json!({"trace": "2"}));
570                assert_eq!(traces[2], serde_json::json!({"trace": "3"}));
571            }
572            _ => panic!("Expected HandlerError variant"),
573        }
574    }
575
576    #[test]
577    fn test_plugin_error_with_traces_other_variants() {
578        let error = PluginError::PluginExecutionError("test".to_string());
579        let new_traces = vec![serde_json::json!({"trace": "1"})];
580
581        let result = error.with_traces(new_traces);
582
583        match result {
584            PluginError::PluginExecutionError(msg) => assert_eq!(msg, "test"),
585            _ => panic!("Expected PluginExecutionError variant"),
586        }
587    }
588
589    #[test]
590    fn test_derive_handler_message_with_message() {
591        let result = derive_handler_message("Custom error message", None);
592        assert_eq!(result, "Custom error message");
593    }
594
595    #[test]
596    fn test_derive_handler_message_with_error_log() {
597        let logs = vec![
598            LogEntry {
599                level: LogLevel::Log,
600                message: "info log".to_string(),
601            },
602            LogEntry {
603                level: LogLevel::Error,
604                message: "error log".to_string(),
605            },
606        ];
607        let result = derive_handler_message("", Some(&logs));
608        assert_eq!(result, "error log");
609    }
610
611    #[test]
612    fn test_derive_handler_message_with_warn_log() {
613        let logs = vec![
614            LogEntry {
615                level: LogLevel::Log,
616                message: "info log".to_string(),
617            },
618            LogEntry {
619                level: LogLevel::Warn,
620                message: "warn log".to_string(),
621            },
622        ];
623        let result = derive_handler_message("", Some(&logs));
624        assert_eq!(result, "warn log");
625    }
626
627    #[test]
628    fn test_derive_handler_message_with_only_info_logs() {
629        let logs = vec![
630            LogEntry {
631                level: LogLevel::Log,
632                message: "first log".to_string(),
633            },
634            LogEntry {
635                level: LogLevel::Info,
636                message: "last log".to_string(),
637            },
638        ];
639        let result = derive_handler_message("", Some(&logs));
640        assert_eq!(result, "last log");
641    }
642
643    #[test]
644    fn test_derive_handler_message_no_logs() {
645        let result = derive_handler_message("", None);
646        assert_eq!(result, "Plugin execution failed");
647    }
648
649    #[test]
650    fn test_build_metadata_with_logs_and_traces() {
651        let logs = vec![LogEntry {
652            level: LogLevel::Log,
653            message: "test".to_string(),
654        }];
655        let traces = vec![serde_json::json!({"trace": "1"})];
656
657        let result = build_metadata(Some(logs.clone()), Some(traces.clone()));
658
659        assert!(result.is_some());
660        let metadata = result.unwrap();
661        assert_eq!(metadata.logs.unwrap(), logs);
662        assert_eq!(metadata.traces.unwrap(), traces);
663    }
664
665    #[test]
666    fn test_build_metadata_with_only_logs() {
667        let logs = vec![LogEntry {
668            level: LogLevel::Log,
669            message: "test".to_string(),
670        }];
671
672        let result = build_metadata(Some(logs.clone()), None);
673
674        assert!(result.is_some());
675        let metadata = result.unwrap();
676        assert_eq!(metadata.logs.unwrap(), logs);
677        assert!(metadata.traces.is_none());
678    }
679
680    #[test]
681    fn test_build_metadata_with_only_traces() {
682        let traces = vec![serde_json::json!({"trace": "1"})];
683
684        let result = build_metadata(None, Some(traces.clone()));
685
686        assert!(result.is_some());
687        let metadata = result.unwrap();
688        assert!(metadata.logs.is_none());
689        assert_eq!(metadata.traces.unwrap(), traces);
690    }
691
692    #[test]
693    fn test_build_metadata_with_neither() {
694        let result = build_metadata(None, None);
695        assert!(result.is_none());
696    }
697
698    #[test]
699    fn test_plugin_handler_payload_append_traces_to_existing() {
700        let mut payload = PluginHandlerPayload {
701            status: 400,
702            message: "test".to_string(),
703            code: None,
704            details: None,
705            logs: None,
706            traces: Some(vec![serde_json::json!({"trace": "1"})]),
707        };
708
709        payload.append_traces(vec![serde_json::json!({"trace": "2"})]);
710
711        let traces = payload.traces.unwrap();
712        assert_eq!(traces.len(), 2);
713        assert_eq!(traces[0], serde_json::json!({"trace": "1"}));
714        assert_eq!(traces[1], serde_json::json!({"trace": "2"}));
715    }
716
717    #[test]
718    fn test_plugin_handler_payload_append_traces_to_none() {
719        let mut payload = PluginHandlerPayload {
720            status: 400,
721            message: "test".to_string(),
722            code: None,
723            details: None,
724            logs: None,
725            traces: None,
726        };
727
728        payload.append_traces(vec![serde_json::json!({"trace": "1"})]);
729
730        let traces = payload.traces.unwrap();
731        assert_eq!(traces.len(), 1);
732        assert_eq!(traces[0], serde_json::json!({"trace": "1"}));
733    }
734
735    #[test]
736    fn test_plugin_handler_payload_into_response_with_logs_and_traces() {
737        let logs = vec![LogEntry {
738            level: LogLevel::Error,
739            message: "error message".to_string(),
740        }];
741        let payload = PluginHandlerPayload {
742            status: 400,
743            message: "".to_string(),
744            code: Some("ERR_CODE".to_string()),
745            details: Some(serde_json::json!({"key": "value"})),
746            logs: Some(logs.clone()),
747            traces: Some(vec![serde_json::json!({"trace": "1"})]),
748        };
749
750        let response = payload.into_response(true, true);
751
752        assert_eq!(response.status, 400);
753        assert_eq!(response.message, "error message"); // Derived from error log
754        assert_eq!(response.error.code, Some("ERR_CODE".to_string()));
755        assert!(response.metadata.is_some());
756        let metadata = response.metadata.unwrap();
757        assert_eq!(metadata.logs.unwrap(), logs);
758        assert_eq!(metadata.traces.unwrap().len(), 1);
759    }
760
761    #[test]
762    fn test_plugin_handler_payload_into_response_without_logs() {
763        let logs = vec![LogEntry {
764            level: LogLevel::Log,
765            message: "test log".to_string(),
766        }];
767        let payload = PluginHandlerPayload {
768            status: 500,
769            message: "explicit message".to_string(),
770            code: None,
771            details: None,
772            logs: Some(logs),
773            traces: None,
774        };
775
776        let response = payload.into_response(false, false);
777
778        assert_eq!(response.status, 500);
779        assert_eq!(response.message, "explicit message");
780        assert!(response.metadata.is_none()); // emit_logs=false, emit_traces=false
781    }
782
783    #[tokio::test]
784    async fn test_call_plugin_handler_error() {
785        let plugin = PluginModel {
786            id: "test-plugin".to_string(),
787            path: "test-path".to_string(),
788            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
789            emit_logs: true,
790            emit_traces: true,
791            raw_response: false,
792            allow_get_invocation: false,
793            config: None,
794            forward_logs: false,
795        };
796        let app_state =
797            create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
798
799        let mut plugin_runner = MockPluginRunnerTrait::default();
800
801        plugin_runner
802            .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
803            .returning(move |_, _, _, _, _, _, _, _, _, _, _, _, _| {
804                Err(PluginError::HandlerError(Box::new(PluginHandlerPayload {
805                    status: 400,
806                    message: "Plugin handler error".to_string(),
807                    code: Some("VALIDATION_ERROR".to_string()),
808                    details: Some(serde_json::json!({"field": "email"})),
809                    logs: Some(vec![LogEntry {
810                        level: LogLevel::Error,
811                        message: "Invalid email".to_string(),
812                    }]),
813                    traces: Some(vec![serde_json::json!({"step": "validation"})]),
814                })))
815            });
816
817        let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
818        let outcome = plugin_service
819            .call_plugin(
820                plugin,
821                PluginCallRequest {
822                    params: serde_json::Value::Null,
823                    headers: None,
824                    route: None,
825                    method: Some("POST".to_string()),
826                    query: None,
827                },
828                Arc::new(web::ThinData(app_state)),
829            )
830            .await;
831
832        match outcome {
833            PluginCallResult::Handler(response) => {
834                assert_eq!(response.status, 400);
835                assert_eq!(response.error.code, Some("VALIDATION_ERROR".to_string()));
836                assert!(response.metadata.is_some());
837                let metadata = response.metadata.unwrap();
838                assert!(metadata.logs.is_some());
839                assert!(metadata.traces.is_some());
840            }
841            _ => panic!("Expected Handler result"),
842        }
843    }
844
845    #[tokio::test]
846    async fn test_call_plugin_fatal_error() {
847        let plugin = PluginModel {
848            id: "test-plugin".to_string(),
849            path: "test-path".to_string(),
850            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
851            emit_logs: false,
852            emit_traces: false,
853            raw_response: false,
854            allow_get_invocation: false,
855            config: None,
856            forward_logs: false,
857        };
858        let app_state =
859            create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
860
861        let mut plugin_runner = MockPluginRunnerTrait::default();
862
863        plugin_runner
864            .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
865            .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
866                Err(PluginError::PluginExecutionError("Fatal error".to_string()))
867            });
868
869        let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
870        let outcome = plugin_service
871            .call_plugin(
872                plugin,
873                PluginCallRequest {
874                    params: serde_json::Value::Null,
875                    headers: None,
876                    route: None,
877                    method: Some("POST".to_string()),
878                    query: None,
879                },
880                Arc::new(web::ThinData(app_state)),
881            )
882            .await;
883
884        match outcome {
885            PluginCallResult::Fatal(error) => {
886                assert!(matches!(error, PluginError::PluginExecutionError(_)));
887            }
888            _ => panic!("Expected Fatal result"),
889        }
890    }
891
892    #[tokio::test]
893    async fn test_call_plugin_success_with_json_result() {
894        let plugin = PluginModel {
895            id: "test-plugin".to_string(),
896            path: "test-path".to_string(),
897            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
898            emit_logs: true,
899            emit_traces: true,
900            raw_response: false,
901            allow_get_invocation: false,
902            config: None,
903            forward_logs: false,
904        };
905        let app_state =
906            create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
907
908        let mut plugin_runner = MockPluginRunnerTrait::default();
909
910        plugin_runner
911            .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
912            .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
913                Ok(ScriptResult {
914                    logs: vec![LogEntry {
915                        level: LogLevel::Log,
916                        message: "test-log".to_string(),
917                    }],
918                    error: "".to_string(),
919                    return_value: r#"{"result": "success"}"#.to_string(),
920                    trace: vec![serde_json::json!({"step": "1"})],
921                })
922            });
923
924        let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
925        let outcome = plugin_service
926            .call_plugin(
927                plugin,
928                PluginCallRequest {
929                    params: serde_json::Value::Null,
930                    headers: None,
931                    route: None,
932                    method: Some("POST".to_string()),
933                    query: None,
934                },
935                Arc::new(web::ThinData(app_state)),
936            )
937            .await;
938
939        match outcome {
940            PluginCallResult::Success(result) => {
941                // Should be parsed as JSON object
942                assert_eq!(result.result, serde_json::json!({"result": "success"}));
943                assert!(result.metadata.is_some());
944                let metadata = result.metadata.unwrap();
945                assert!(metadata.logs.is_some());
946                assert!(metadata.traces.is_some());
947            }
948            _ => panic!("Expected Success result"),
949        }
950    }
951
952    #[derive(Clone)]
953    struct VecWriter {
954        buffer: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
955    }
956
957    impl Write for VecWriter {
958        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
959            let mut buffer = self.buffer.lock().unwrap();
960            buffer.push(String::from_utf8_lossy(buf).to_string());
961            Ok(buf.len())
962        }
963
964        fn flush(&mut self) -> std::io::Result<()> {
965            Ok(())
966        }
967    }
968
969    fn init_capturing_subscriber(
970        buffer: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
971    ) -> tracing::subscriber::DefaultGuard {
972        use tracing_subscriber::filter::LevelFilter;
973        let writer = VecWriter { buffer };
974        let subscriber = tracing_subscriber::fmt()
975            .with_writer(move || writer.clone())
976            .with_ansi(false)
977            .with_target(true)
978            .without_time()
979            .with_max_level(LevelFilter::DEBUG)
980            .finish();
981        tracing::subscriber::set_default(subscriber)
982    }
983
984    #[tokio::test]
985    async fn test_forward_logs_to_tracing_when_enabled_all_levels() {
986        use std::sync::{Arc as StdArc, Mutex};
987
988        let logs_buffer: StdArc<Mutex<Vec<String>>> = StdArc::new(Mutex::new(Vec::new()));
989        let _guard = init_capturing_subscriber(logs_buffer.clone());
990
991        let plugin = PluginModel {
992            id: "test-plugin-levels".to_string(),
993            path: "test-path".to_string(),
994            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
995            emit_logs: false,
996            emit_traces: false,
997            forward_logs: true,
998            raw_response: false,
999            allow_get_invocation: false,
1000            config: None,
1001        };
1002        let app_state =
1003            create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1004
1005        let mut plugin_runner = MockPluginRunnerTrait::default();
1006
1007        plugin_runner
1008            .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1009            .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
1010                Ok(ScriptResult {
1011                    logs: vec![
1012                        LogEntry {
1013                            level: LogLevel::Error,
1014                            message: "err-log".to_string(),
1015                        },
1016                        LogEntry {
1017                            level: LogLevel::Warn,
1018                            message: "warn-log".to_string(),
1019                        },
1020                        LogEntry {
1021                            level: LogLevel::Info,
1022                            message: "info-log".to_string(),
1023                        },
1024                        LogEntry {
1025                            level: LogLevel::Log,
1026                            message: "log-log".to_string(),
1027                        },
1028                        LogEntry {
1029                            level: LogLevel::Debug,
1030                            message: "debug-log".to_string(),
1031                        },
1032                        LogEntry {
1033                            level: LogLevel::Result,
1034                            message: "result-log".to_string(),
1035                        },
1036                    ],
1037                    error: "".to_string(),
1038                    return_value: "{}".to_string(),
1039                    trace: vec![],
1040                })
1041            });
1042
1043        let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1044        let _ = plugin_service
1045            .call_plugin(
1046                plugin,
1047                PluginCallRequest {
1048                    params: serde_json::json!({}),
1049                    headers: None,
1050                    route: None,
1051                    method: Some("POST".to_string()),
1052                    query: None,
1053                },
1054                Arc::new(web::ThinData(app_state)),
1055            )
1056            .await;
1057
1058        let captured = logs_buffer.lock().unwrap().join("\n");
1059
1060        assert!(captured.contains("err-log"));
1061        assert!(captured.contains("warn-log"));
1062        assert!(captured.contains("info-log"));
1063        assert!(captured.contains("log-log"));
1064        assert!(captured.contains("debug-log"));
1065        assert!(!captured.contains("result-log"));
1066        assert!(captured.contains("plugin_id=test-plugin-levels"));
1067        assert!(captured.contains("ERROR"));
1068        assert!(captured.contains("WARN"));
1069        let request_id_values: Vec<&str> = captured
1070            .match_indices("request_id=")
1071            .filter_map(|(idx, _)| {
1072                let tail = &captured[idx + "request_id=".len()..];
1073                tail.split_whitespace()
1074                    .next()
1075                    .map(|value| value.trim_matches(|c: char| c == ',' || c == '"' || c == '}'))
1076            })
1077            .collect();
1078        assert!(
1079            !request_id_values.is_empty(),
1080            "expected forwarded plugin logs to include request_id field, captured: {}",
1081            captured
1082        );
1083        assert!(
1084            request_id_values.iter().all(|value| !value.is_empty()),
1085            "expected non-empty request_id values, captured: {}",
1086            captured
1087        );
1088        assert!(
1089            request_id_values
1090                .iter()
1091                .all(|value| uuid::Uuid::parse_str(value).is_ok()),
1092            "expected request_id fallback to be UUID when no span request id is set, captured: {}",
1093            captured
1094        );
1095    }
1096
1097    #[tokio::test]
1098    async fn test_forward_logs_not_emitted_when_disabled() {
1099        use std::sync::{Arc as StdArc, Mutex};
1100
1101        let logs_buffer: StdArc<Mutex<Vec<String>>> = StdArc::new(Mutex::new(Vec::new()));
1102        let _guard = init_capturing_subscriber(logs_buffer.clone());
1103
1104        let plugin = PluginModel {
1105            id: "test-plugin-disabled".to_string(),
1106            path: "test-path".to_string(),
1107            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1108            emit_logs: false,
1109            emit_traces: false,
1110            forward_logs: false,
1111            raw_response: false,
1112            allow_get_invocation: false,
1113            config: None,
1114        };
1115        let app_state =
1116            create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1117
1118        let mut plugin_runner = MockPluginRunnerTrait::default();
1119        plugin_runner
1120            .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1121            .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
1122                Ok(ScriptResult {
1123                    logs: vec![LogEntry {
1124                        level: LogLevel::Warn,
1125                        message: "should-not-emit".to_string(),
1126                    }],
1127                    error: "".to_string(),
1128                    return_value: "{}".to_string(),
1129                    trace: vec![],
1130                })
1131            });
1132
1133        let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1134        let _ = plugin_service
1135            .call_plugin(
1136                plugin,
1137                PluginCallRequest {
1138                    params: serde_json::json!({}),
1139                    headers: None,
1140                    route: None,
1141                    method: Some("POST".to_string()),
1142                    query: None,
1143                },
1144                Arc::new(web::ThinData(app_state)),
1145            )
1146            .await;
1147
1148        let captured = logs_buffer.lock().unwrap().join("\n");
1149        // When forward_logs is disabled, plugin log messages should not appear in tracing output
1150        // (internal framework logs like "Calling plugin" may still appear)
1151        assert!(
1152            !captured.contains("should-not-emit"),
1153            "plugin logs should not be forwarded when disabled, but found: {captured}"
1154        );
1155    }
1156
1157    #[tokio::test]
1158    async fn test_forward_logs_on_handler_error() {
1159        use std::sync::{Arc as StdArc, Mutex};
1160
1161        let logs_buffer: StdArc<Mutex<Vec<String>>> = StdArc::new(Mutex::new(Vec::new()));
1162        let _guard = init_capturing_subscriber(logs_buffer.clone());
1163
1164        let plugin = PluginModel {
1165            id: "test-plugin-error".to_string(),
1166            path: "test-path".to_string(),
1167            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1168            emit_logs: true,
1169            emit_traces: false,
1170            forward_logs: true,
1171            raw_response: false,
1172            allow_get_invocation: false,
1173            config: None,
1174        };
1175        let app_state =
1176            create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1177
1178        let mut plugin_runner = MockPluginRunnerTrait::default();
1179        plugin_runner
1180            .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1181            .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
1182                Err(PluginError::HandlerError(Box::new(PluginHandlerPayload {
1183                    status: 400,
1184                    message: "handler failed".to_string(),
1185                    code: None,
1186                    details: None,
1187                    logs: Some(vec![LogEntry {
1188                        level: LogLevel::Error,
1189                        message: "handler-log".to_string(),
1190                    }]),
1191                    traces: None,
1192                })))
1193            });
1194
1195        let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1196        let _ = plugin_service
1197            .call_plugin(
1198                plugin,
1199                PluginCallRequest {
1200                    params: serde_json::json!({}),
1201                    headers: None,
1202                    route: None,
1203                    method: Some("POST".to_string()),
1204                    query: None,
1205                },
1206                Arc::new(web::ThinData(app_state)),
1207            )
1208            .await;
1209
1210        let captured = logs_buffer.lock().unwrap().join("\n");
1211        assert!(captured.contains("handler-log"));
1212        assert!(captured.contains("plugin_id=test-plugin-error"));
1213        assert!(captured.contains("ERROR"));
1214    }
1215
1216    #[tokio::test]
1217    async fn test_call_plugin_success_with_undefined_result() {
1218        let plugin = PluginModel {
1219            id: "test-plugin".to_string(),
1220            path: "test-path".to_string(),
1221            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1222            emit_logs: false,
1223            emit_traces: false,
1224            raw_response: false,
1225            allow_get_invocation: false,
1226            config: None,
1227            forward_logs: false,
1228        };
1229        let app_state =
1230            create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1231
1232        let mut plugin_runner = MockPluginRunnerTrait::default();
1233
1234        plugin_runner
1235            .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1236            .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
1237                Ok(ScriptResult {
1238                    logs: vec![],
1239                    error: "".to_string(),
1240                    return_value: "undefined".to_string(),
1241                    trace: vec![],
1242                })
1243            });
1244
1245        let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1246        let outcome = plugin_service
1247            .call_plugin(
1248                plugin,
1249                PluginCallRequest {
1250                    params: serde_json::Value::Null,
1251                    headers: None,
1252                    route: None,
1253                    method: Some("POST".to_string()),
1254                    query: None,
1255                },
1256                Arc::new(web::ThinData(app_state)),
1257            )
1258            .await;
1259
1260        match outcome {
1261            PluginCallResult::Success(result) => {
1262                // "undefined" should be converted to null
1263                assert_eq!(result.result, serde_json::Value::Null);
1264                // emit_logs=false, emit_traces=false -> no metadata
1265                assert!(result.metadata.is_none());
1266            }
1267            _ => panic!("Expected Success result"),
1268        }
1269    }
1270
1271    #[tokio::test]
1272    async fn test_call_plugin_with_headers() {
1273        use std::sync::{Arc as StdArc, Mutex};
1274
1275        let plugin = PluginModel {
1276            id: "test-plugin".to_string(),
1277            path: "test-path".to_string(),
1278            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1279            emit_logs: false,
1280            emit_traces: false,
1281            raw_response: false,
1282            allow_get_invocation: false,
1283            config: None,
1284            forward_logs: false,
1285        };
1286        let app_state =
1287            create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1288
1289        // Capture the headers_json parameter passed to the runner
1290        let captured_headers: StdArc<Mutex<Option<String>>> = StdArc::new(Mutex::new(None));
1291        let captured_headers_clone = captured_headers.clone();
1292
1293        let mut plugin_runner = MockPluginRunnerTrait::default();
1294
1295        plugin_runner
1296            .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1297            .returning(move |_, _, _, _, _, _, headers_json, _, _, _, _, _, _| {
1298                // Capture the headers_json parameter
1299                *captured_headers_clone.lock().unwrap() = headers_json;
1300                Ok(ScriptResult {
1301                    logs: vec![],
1302                    error: "".to_string(),
1303                    return_value: "{}".to_string(),
1304                    trace: vec![],
1305                })
1306            });
1307
1308        // Create request with headers
1309        let mut headers_map = std::collections::HashMap::new();
1310        headers_map.insert(
1311            "x-custom-header".to_string(),
1312            vec!["custom-value".to_string()],
1313        );
1314        headers_map.insert(
1315            "authorization".to_string(),
1316            vec!["Bearer token123".to_string()],
1317        );
1318
1319        let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1320        let _outcome = plugin_service
1321            .call_plugin(
1322                plugin,
1323                PluginCallRequest {
1324                    params: serde_json::json!({"test": "data"}),
1325                    headers: Some(headers_map.clone()),
1326                    route: None,
1327                    method: Some("POST".to_string()),
1328                    query: None,
1329                },
1330                Arc::new(web::ThinData(app_state)),
1331            )
1332            .await;
1333
1334        // Verify headers were serialized and passed to the runner
1335        let captured = captured_headers.lock().unwrap();
1336        assert!(
1337            captured.is_some(),
1338            "headers_json should be passed to runner"
1339        );
1340
1341        let headers_json = captured.as_ref().unwrap();
1342        let parsed: std::collections::HashMap<String, Vec<String>> =
1343            serde_json::from_str(headers_json).expect("headers_json should be valid JSON");
1344
1345        assert_eq!(
1346            parsed.get("x-custom-header"),
1347            Some(&vec!["custom-value".to_string()])
1348        );
1349        assert_eq!(
1350            parsed.get("authorization"),
1351            Some(&vec!["Bearer token123".to_string()])
1352        );
1353    }
1354
1355    #[tokio::test]
1356    async fn test_call_plugin_without_headers() {
1357        use std::sync::{Arc as StdArc, Mutex};
1358
1359        let plugin = PluginModel {
1360            id: "test-plugin".to_string(),
1361            path: "test-path".to_string(),
1362            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1363            emit_logs: false,
1364            emit_traces: false,
1365            raw_response: false,
1366            allow_get_invocation: false,
1367            config: None,
1368            forward_logs: false,
1369        };
1370        let app_state =
1371            create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1372
1373        let captured_headers: StdArc<Mutex<Option<String>>> = StdArc::new(Mutex::new(None));
1374        let captured_headers_clone = captured_headers.clone();
1375
1376        let mut plugin_runner = MockPluginRunnerTrait::default();
1377
1378        plugin_runner
1379            .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1380            .returning(move |_, _, _, _, _, _, headers_json, _, _, _, _, _, _| {
1381                *captured_headers_clone.lock().unwrap() = headers_json;
1382                Ok(ScriptResult {
1383                    logs: vec![],
1384                    error: "".to_string(),
1385                    return_value: "{}".to_string(),
1386                    trace: vec![],
1387                })
1388            });
1389
1390        let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1391        let _outcome = plugin_service
1392            .call_plugin(
1393                plugin,
1394                PluginCallRequest {
1395                    params: serde_json::json!({}),
1396                    headers: None, // No headers
1397                    route: None,
1398                    method: Some("POST".to_string()),
1399                    query: None,
1400                },
1401                Arc::new(web::ThinData(app_state)),
1402            )
1403            .await;
1404
1405        // Verify headers_json is None when no headers provided
1406        let captured = captured_headers.lock().unwrap();
1407        assert!(
1408            captured.is_none(),
1409            "headers_json should be None when no headers provided"
1410        );
1411    }
1412
1413    #[tokio::test]
1414    async fn test_call_plugin_script_timeout_returns_504() {
1415        let plugin = PluginModel {
1416            id: "test-plugin".to_string(),
1417            path: "test-path".to_string(),
1418            timeout: Duration::from_secs(3),
1419            emit_logs: false,
1420            emit_traces: false,
1421            raw_response: false,
1422            allow_get_invocation: false,
1423            config: None,
1424            forward_logs: false,
1425        };
1426        let app_state =
1427            create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1428
1429        let mut plugin_runner = MockPluginRunnerTrait::default();
1430
1431        plugin_runner
1432            .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1433            .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
1434                Err(PluginError::ScriptTimeout(3))
1435            });
1436
1437        let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1438        let outcome = plugin_service
1439            .call_plugin(
1440                plugin,
1441                PluginCallRequest {
1442                    params: serde_json::Value::Null,
1443                    headers: None,
1444                    route: None,
1445                    method: Some("POST".to_string()),
1446                    query: None,
1447                },
1448                Arc::new(web::ThinData(app_state)),
1449            )
1450            .await;
1451
1452        match outcome {
1453            PluginCallResult::Handler(response) => {
1454                assert_eq!(response.status, 504);
1455                assert_eq!(
1456                    response.message,
1457                    "Plugin execution timed out after 3 seconds"
1458                );
1459                assert_eq!(response.error.code, Some("TIMEOUT".to_string()));
1460                assert!(response.error.details.is_none());
1461                assert!(response.metadata.is_none());
1462            }
1463            _ => panic!("Expected Handler result with 504 status for ScriptTimeout"),
1464        }
1465    }
1466}