openzeppelin_relayer/services/plugins/
shared_socket.rs

1//! Shared Socket Service
2//!
3//! This module provides a unified bidirectional Unix socket service for plugin communication.
4//! Instead of creating separate sockets for registration and API calls, all communication
5//! happens over a single shared socket, dramatically reducing overhead and complexity.
6//!
7//! ## Architecture
8//!
9//! **Single Shared Socket**: All plugins connect to `/tmp/relayer-plugin-shared.sock`
10//!
11//! **Bidirectional Communication**:
12//! - Plugins → Host: Register, ApiRequest, Trace, Shutdown
13//! - Host → Plugins: ApiResponse
14//!
15//! **Connection Tagging (Security)**: Each connection is "tagged" with an execution_id
16//! after the first Register message. All subsequent messages are validated against this
17//! tagged ID to prevent spoofing attacks (Plugin A cannot impersonate Plugin B).
18//!
19//! ## Message Protocol
20//!
21//! All messages are JSON objects with a `type` field that discriminates the message type:
22//!
23//! ### Plugin → Host Messages
24//!
25//! **Register** (first message, required):
26//! ```json
27//! {
28//!   "type": "register",
29//!   "execution_id": "abc-123"
30//! }
31//! ```
32//!
33//! **ApiRequest** (call Relayer API):
34//! ```json
35//! {
36//!   "type": "api_request",
37//!   "request_id": "req-1",
38//!   "relayer_id": "relayer-1",
39//!   "method": "sendTransaction",
40//!   "payload": { "to": "0x...", "value": "100" }
41//! }
42//! ```
43//!
44//! **Trace** (observability event):
45//! ```json
46//! {
47//!   "type": "trace",
48//!   "trace": { "event": "processing", "timestamp": 1234567890 }
49//! }
50//! ```
51//!
52//! **Shutdown** (graceful close):
53//! ```json
54//! {
55//!   "type": "shutdown"
56//! }
57//! ```
58//!
59//! ### Host → Plugin Messages
60//!
61//! **ApiResponse** (Relayer API result):
62//! ```json
63//! {
64//!   "type": "api_response",
65//!   "request_id": "req-1",
66//!   "result": { "id": "tx-123", "status": "success" },
67//!   "error": null
68//! }
69//! ```
70//!
71//! ## Security Model
72//!
73//! The connection tagging mechanism prevents execution_id spoofing:
74//!
75//! 1. Plugin connects to shared socket
76//! 2. Plugin sends Register message with execution_id
77//! 3. Host "tags" the connection (file descriptor) with that execution_id
78//! 4. All subsequent messages are validated against the tagged ID
79//! 5. Attempts to change execution_id are rejected and connection is closed
80//!
81//! This ensures Plugin A cannot send requests pretending to be Plugin B, even though
82//! they share the same socket file.
83//!
84//! ## Backward Compatibility
85//!
//! The handle_connection method maintains backward compatibility with the legacy
//! Request/Response format from socket.rs. A message without a `type` field is parsed
//! as the legacy Request format and handled accordingly. A message that does carry a
//! `type` field but fails to parse as PluginMessage is logged and skipped; it is not
//! retried as a legacy Request.
89//!
90//! ## Performance Benefits vs Per-Execution Sockets
91//!
92//! | Metric | Shared Socket | Per-Execution Socket |
93//! |--------|---------------|----------------------|
94//! | File descriptors | 1 per plugin | 2 per plugin |
95//! | Syscalls | ~50% fewer | Baseline |
96//! | Connection setup | Reuse existing | Create new each time |
97//! | Memory overhead | O(active executions) | O(active executions × 2) |
98//! | Debugging | Single stream | Two separate streams |
99//!
100//! ## Example Usage
101//!
102//! ```rust,no_run
103//! use openzeppelin_relayer::services::plugins::shared_socket::{
104//!     get_shared_socket_service, ensure_shared_socket_started
105//! };
106//!
107//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
108//! // Get the global shared socket instance
109//! let service = get_shared_socket_service()?;
110//!
111//! // Register an execution (returns RAII guard)
112//! let guard = service.register_execution("exec-123".to_string(), true).await;
113//!
114//! // Plugin connects and sends messages over the shared socket...
115//! // (handled automatically by the background listener)
116//!
117//! // Collect traces when done (returns Some when emit_traces=true)
118//! if let Some(mut traces_rx) = guard.into_receiver() {
119//!     let traces = traces_rx.recv().await;
120//! }
121//! # Ok(())
122//! # }
123//! ```
124
125use super::config::get_config;
126use crate::jobs::JobProducerTrait;
127use crate::models::{
128    NetworkRepoModel, NotificationRepoModel, RelayerRepoModel, SignerRepoModel, ThinDataAppState,
129    TransactionRepoModel,
130};
131use crate::repositories::{
132    ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository, Repository,
133    TransactionCounterTrait, TransactionRepository,
134};
135use crate::services::plugins::relayer_api::{RelayerApi, Request};
136use scc::HashMap as SccHashMap;
137use serde::{Deserialize, Serialize};
138use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
139use std::sync::Arc;
140use std::time::{Duration, Instant};
141use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
142use tokio::net::{UnixListener, UnixStream};
143use tokio::sync::{mpsc, watch, Semaphore};
144use tracing::{debug, info, warn};
145
146use super::PluginError;
147
148/// Log socket write errors at the appropriate level.
149/// Broken pipe and connection reset are expected when a plugin times out
150/// while an RPC call is still in-flight, so they're logged at DEBUG.
151fn log_socket_write_error(context: &str, error: &std::io::Error) {
152    match error.kind() {
153        std::io::ErrorKind::BrokenPipe | std::io::ErrorKind::ConnectionReset => {
154            debug!(
155                "Failed to write {}: {} (plugin likely timed out)",
156                context, error
157            );
158        }
159        _ => {
160            warn!("Failed to write {}: {}", context, error);
161        }
162    }
163}
164
165/// Unified message protocol for bidirectional communication
166#[derive(Debug, Serialize, Deserialize, Clone)]
167#[serde(tag = "type", rename_all = "snake_case")]
168pub enum PluginMessage {
169    /// Plugin registers its execution_id (first message from plugin)
170    Register { execution_id: String },
171    /// Plugin requests a Relayer API call
172    ApiRequest {
173        request_id: String,
174        relayer_id: String,
175        method: crate::services::plugins::relayer_api::PluginMethod,
176        payload: serde_json::Value,
177    },
178    /// Host responds to an API request
179    ApiResponse {
180        request_id: String,
181        result: Option<serde_json::Value>,
182        error: Option<String>,
183    },
184    /// Plugin sends a trace event (for observability)
185    Trace { trace: serde_json::Value },
186    /// Plugin signals completion
187    Shutdown,
188}
189
190/// Execution context for trace collection
191struct ExecutionContext {
192    /// Channel to send traces back to the execution (None when emit_traces=false)
193    /// When None, connection handler skips trace collection entirely for better performance
194    traces_tx: Option<mpsc::Sender<Vec<serde_json::Value>>>,
195    /// Creation timestamp for TTL cleanup
196    created_at: Instant,
197    /// The execution_id bound to this connection (for security)
198    /// Once set, all messages must match this ID to prevent spoofing
199    #[allow(dead_code)] // Used for security validation, not directly read
200    bound_execution_id: String,
201}
202
203/// RAII guard for execution registration that auto-unregisters on drop
204pub struct ExecutionGuard {
205    execution_id: String,
206    executions: Arc<SccHashMap<String, ExecutionContext>>,
207    rx: Option<mpsc::Receiver<Vec<serde_json::Value>>>,
208    /// Shared counter for tracking active executions (lock-free)
209    active_count: Arc<AtomicUsize>,
210    /// Whether this guard was successfully registered (insertion succeeded)
211    /// Only registered guards should decrement active_count on drop
212    registered: bool,
213}
214
215impl ExecutionGuard {
216    /// Get the trace receiver if tracing was enabled
217    /// Returns None if emit_traces=false was passed to register_execution
218    pub fn into_receiver(mut self) -> Option<mpsc::Receiver<Vec<serde_json::Value>>> {
219        self.rx.take()
220    }
221}
222
223impl Drop for ExecutionGuard {
224    fn drop(&mut self) {
225        // Auto-unregister on drop - synchronous with scc::HashMap (no spawn needed!)
226        // This eliminates the overhead of spawning a task for every request
227        //
228        // Only registered guards should remove entries and decrement counters.
229        // Non-registered guards (from duplicate execution_id) don't own the entry.
230        //
231        // For registered guards, only decrement if we actually removed the entry.
232        // This prevents double-decrement: if a long-running execution is GC'd by the
233        // stale entry cleanup task (which decrements the counter), and then the guard
234        // drops later, we must NOT decrement again.
235        if self.registered && self.executions.remove(&self.execution_id).is_some() {
236            self.active_count.fetch_sub(1, Ordering::AcqRel);
237        }
238    }
239}
240
241/// Shared socket service that handles multiple concurrent plugin executions
242pub struct SharedSocketService {
243    /// Socket path
244    socket_path: String,
245    /// Active execution contexts (execution_id -> ExecutionContext)
246    /// scc::HashMap provides lock-free reads and optimistic locking for writes
247    executions: Arc<SccHashMap<String, ExecutionContext>>,
248    /// Lock-free counter for active executions
249    active_count: Arc<AtomicUsize>,
250    /// Whether the listener has been started (instance-level flag)
251    started: AtomicBool,
252    /// Shutdown signal sender
253    shutdown_tx: watch::Sender<bool>,
254    /// Semaphore for connection limiting (prevents race conditions)
255    connection_semaphore: Arc<Semaphore>,
256}
257
258impl SharedSocketService {
259    /// Create a new shared socket service
260    pub fn new(socket_path: &str) -> Result<Self, PluginError> {
261        // Remove existing socket file if it exists (from previous runs or crashed processes)
262        let _ = std::fs::remove_file(socket_path);
263
264        let (shutdown_tx, _) = watch::channel(false);
265
266        // Use centralized config
267        let config = get_config();
268        let max_connections = config.socket_max_connections;
269
270        let executions: Arc<SccHashMap<String, ExecutionContext>> = Arc::new(SccHashMap::new());
271        let active_count = Arc::new(AtomicUsize::new(0));
272
273        // Spawn background cleanup task for stale executions (prevents memory leaks)
274        let executions_clone = executions.clone();
275        let active_count_clone = active_count.clone();
276        let mut cleanup_shutdown_rx = shutdown_tx.subscribe();
277        tokio::spawn(async move {
278            let mut interval = tokio::time::interval(Duration::from_secs(60));
279            loop {
280                tokio::select! {
281                    _ = interval.tick() => {}
282                    _ = cleanup_shutdown_rx.changed() => {
283                        if *cleanup_shutdown_rx.borrow() {
284                            break;
285                        }
286                    }
287                }
288                let now = Instant::now();
289                // scc::HashMap retain is lock-free per entry
290                let mut removed = 0usize;
291                executions_clone.retain(|_, ctx| {
292                    let keep = now.duration_since(ctx.created_at) < Duration::from_secs(300);
293                    if !keep {
294                        removed += 1;
295                    }
296                    keep
297                });
298                if removed > 0 {
299                    active_count_clone.fetch_sub(removed, Ordering::AcqRel);
300                }
301            }
302        });
303
304        Ok(Self {
305            socket_path: socket_path.to_string(),
306            executions,
307            active_count,
308            started: AtomicBool::new(false),
309            shutdown_tx,
310            connection_semaphore: Arc::new(Semaphore::new(max_connections)),
311        })
312    }
313
314    pub fn socket_path(&self) -> &str {
315        &self.socket_path
316    }
317
318    /// Register an execution and return a guard that auto-unregisters on drop
319    /// This prevents memory leaks from forgotten unregister calls
320    ///
321    /// # Arguments
322    /// * `execution_id` - Unique identifier for this execution
323    /// * `emit_traces` - If false, skips channel creation and trace collection for better performance
324    pub async fn register_execution(
325        &self,
326        execution_id: String,
327        emit_traces: bool,
328    ) -> ExecutionGuard {
329        // Only create channel when traces are needed - saves allocation and channel overhead
330        let (tx, rx) = if emit_traces {
331            let (tx, rx) = mpsc::channel(1);
332            (Some(tx), Some(rx))
333        } else {
334            (None, None)
335        };
336
337        let ctx = ExecutionContext {
338            traces_tx: tx,
339            created_at: Instant::now(),
340            bound_execution_id: execution_id.clone(),
341        };
342
343        // scc::HashMap insert - returns Ok if new, Err if key existed (duplicate)
344        let registered = match self.executions.insert(execution_id.clone(), ctx) {
345            Ok(_) => {
346                self.active_count.fetch_add(1, Ordering::AcqRel);
347                true
348            }
349            Err((existing_key, _)) => {
350                tracing::warn!(
351                    execution_id = %existing_key,
352                    "Duplicate execution_id detected during registration, guard will not decrement counter"
353                );
354                false
355            }
356        };
357
358        ExecutionGuard {
359            execution_id,
360            executions: self.executions.clone(),
361            rx,
362            registered,
363            active_count: self.active_count.clone(),
364        }
365    }
366
367    /// Get current number of available connection slots
368    pub fn available_connection_slots(&self) -> usize {
369        self.connection_semaphore.available_permits()
370    }
371
372    /// Get current active connection count
373    pub fn active_connection_count(&self) -> usize {
374        get_config().socket_max_connections - self.connection_semaphore.available_permits()
375    }
376
377    /// Get current number of registered executions (lock-free via atomic counter)
378    pub async fn registered_executions_count(&self) -> usize {
379        self.active_count.load(Ordering::Relaxed)
380    }
381
382    /// Signal shutdown to the listener and wait for active connections to drain
383    pub async fn shutdown(&self) {
384        let _ = self.shutdown_tx.send(true);
385        info!("Shared socket service: shutdown signal sent");
386
387        // Wait for active connections to drain (max 30 seconds)
388        let max_wait = Duration::from_secs(30);
389        let start = Instant::now();
390
391        while start.elapsed() < max_wait {
392            let available = self.connection_semaphore.available_permits();
393            if available == get_config().socket_max_connections {
394                // All permits returned - no active connections
395                break;
396            }
397            tokio::time::sleep(Duration::from_millis(100)).await;
398        }
399
400        // Remove socket file after connections drained
401        let _ = std::fs::remove_file(&self.socket_path);
402        info!("Shared socket service: shutdown complete");
403    }
404
405    /// Start the shared socket service
406    /// This spawns a background task that listens for connections
407    /// Safe to call multiple times - will only start once per instance
408    #[allow(clippy::type_complexity)]
409    pub async fn start<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
410        self: Arc<Self>,
411        state: Arc<ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>,
412    ) -> Result<(), PluginError>
413    where
414        J: JobProducerTrait + Send + Sync + 'static,
415        RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
416        TR: TransactionRepository
417            + Repository<TransactionRepoModel, String>
418            + Send
419            + Sync
420            + 'static,
421        NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
422        NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
423        SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
424        TCR: TransactionCounterTrait + Send + Sync + 'static,
425        PR: PluginRepositoryTrait + Send + Sync + 'static,
426        AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
427    {
428        // Check if already started (instance-level flag)
429        if self.started.swap(true, Ordering::Acquire) {
430            return Ok(());
431        }
432
433        // Create the listener and move it into the task
434        let listener = UnixListener::bind(&self.socket_path)
435            .map_err(|e| PluginError::SocketError(format!("Failed to bind listener: {e}")))?;
436        let executions = self.executions.clone();
437        let relayer_api = Arc::new(RelayerApi);
438        let socket_path = self.socket_path.clone();
439        let mut shutdown_rx = self.shutdown_tx.subscribe();
440        let connection_semaphore = self.connection_semaphore.clone();
441
442        debug!(
443            "Shared socket service: starting listener on {}",
444            socket_path
445        );
446
447        // Spawn the listener task
448        tokio::spawn(async move {
449            debug!("Shared socket service: listener task started");
450            loop {
451                tokio::select! {
452                    // Check for shutdown signal
453                    _ = shutdown_rx.changed() => {
454                        if *shutdown_rx.borrow() {
455                            info!("Shared socket service: shutting down listener");
456                            break;
457                        }
458                    }
459                    // Accept new connections
460                    accept_result = listener.accept() => {
461                        match accept_result {
462                            Ok((stream, _)) => {
463                                // Try to acquire semaphore permit (no race condition!)
464                                match connection_semaphore.clone().try_acquire_owned() {
465                                    Ok(permit) => {
466                                        debug!("Shared socket service: accepted new connection");
467
468                                        let relayer_api_clone = relayer_api.clone();
469                                        let state_clone = Arc::clone(&state);
470                                        let executions_clone = executions.clone();
471
472                                        tokio::spawn(async move {
473                                            // Permit held until task completes (auto-released on drop)
474                                            let _permit = permit;
475
476                                            let result = Self::handle_connection(
477                                                stream,
478                                                relayer_api_clone,
479                                                state_clone,
480                                                executions_clone,
481                                            )
482                                            .await;
483
484                                            if let Err(e) = result {
485                                                debug!("Connection handler finished with error: {}", e);
486                                            }
487                                        });
488                                    }
489                                    Err(_) => {
490                                        warn!(
491                                            "Connection limit reached, rejecting new connection. \
492                                            Consider increasing PLUGIN_MAX_CONCURRENCY or PLUGIN_SOCKET_MAX_CONCURRENT_CONNECTIONS."
493                                        );
494                                        drop(stream);
495                                    }
496                                }
497                            }
498                            Err(e) => {
499                                warn!("Error accepting connection: {}", e);
500                            }
501                        }
502                    }
503                }
504            }
505
506            // Cleanup on shutdown
507            let _ = std::fs::remove_file(&socket_path);
508            info!("Shared socket service: listener stopped");
509        });
510
511        Ok(())
512    }
513
514    /// Handle a connection from a plugin.
515    ///
516    /// The inactivity timeout resets on every message — no hard wall-clock cap.
517    /// Connections stay alive as long as they're active, which is essential for
518    /// reused/pooled sockets and plugins making external calls.
519    ///
520    /// Security: The first message must be a Register message. Once registered,
521    /// the connection is "tagged" with that execution_id and cannot be changed.
522    /// This prevents Plugin A from spoofing Plugin B's execution_id.
523    #[allow(clippy::type_complexity)]
524    async fn handle_connection<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
525        stream: UnixStream,
526        relayer_api: Arc<RelayerApi>,
527        state: Arc<ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>,
528        executions: Arc<SccHashMap<String, ExecutionContext>>,
529    ) -> Result<(), PluginError>
530    where
531        J: JobProducerTrait + Send + Sync + 'static,
532        RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
533        TR: TransactionRepository
534            + Repository<TransactionRepoModel, String>
535            + Send
536            + Sync
537            + 'static,
538        NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
539        NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
540        SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
541        TCR: TransactionCounterTrait + Send + Sync + 'static,
542        PR: PluginRepositoryTrait + Send + Sync + 'static,
543        AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
544    {
545        let (r, mut w) = stream.into_split();
546        let mut reader = BufReader::new(r).lines();
547
548        // Only allocate traces Vec when tracing is enabled (determined on Register)
549        let mut traces: Option<Vec<serde_json::Value>> = None;
550        // Track whether traces are enabled for this connection (set on Register)
551        let mut traces_enabled = false;
552
553        // Connection-bound execution_id (prevents spoofing)
554        // Once set, this cannot be changed for the lifetime of the connection
555        let mut bound_execution_id: Option<String> = None;
556
557        // Safety timeout: reap connections that are silent for longer than the maximum
558        // plugin execution timeout + margin. This prevents permit exhaustion from stuck
559        // or rogue clients. Not configurable — it's derived from DEFAULT_PLUGIN_TIMEOUT_SECONDS
560        // so it can never desync with execution timeouts.
561        let safety_timeout =
562            Duration::from_secs(crate::constants::DEFAULT_PLUGIN_TIMEOUT_SECONDS + 60);
563
564        loop {
565            // Read next line with safety timeout. In normal operation the client sends
566            // messages or closes the socket (EOF) well within this window. The timeout
567            // only fires for truly orphaned connections (stuck event loop, rogue client).
568            let line = match tokio::time::timeout(safety_timeout, reader.next_line()).await {
569                Ok(Ok(Some(line))) => line,
570                Ok(Ok(None)) => break, // EOF — client closed connection
571                Ok(Err(e)) => {
572                    warn!("Error reading from connection: {}", e);
573                    break;
574                }
575                Err(_) => {
576                    debug!(
577                        "Connection safety timeout reached ({}s with no message)",
578                        safety_timeout.as_secs()
579                    );
580                    break;
581                }
582            };
583
584            debug!("Shared socket service: received message");
585
586            // Parse once, discriminate on "type" field for efficiency
587            let json_value: serde_json::Value = match serde_json::from_str(&line) {
588                Ok(v) => v,
589                Err(e) => {
590                    warn!("Failed to parse JSON: {}", e);
591                    continue;
592                }
593            };
594
595            let has_type_field = json_value.get("type").is_some();
596
597            if has_type_field {
598                // New unified protocol
599                let message: PluginMessage = match serde_json::from_value(json_value) {
600                    Ok(msg) => msg,
601                    Err(e) => {
602                        warn!("Failed to parse PluginMessage: {}", e);
603                        continue;
604                    }
605                };
606
607                // Handle message based on type
608                match message {
609                    PluginMessage::Register { execution_id } => {
610                        // First message must be Register
611                        if bound_execution_id.is_some() {
612                            warn!("Attempted to re-register connection (security violation)");
613                            break;
614                        }
615
616                        // Validate execution_id exists in registry and check if tracing is enabled
617                        // scc::HashMap read() is lock-free
618                        if let Some(has_traces) =
619                            executions.read(&execution_id, |_, ctx| ctx.traces_tx.is_some())
620                        {
621                            traces_enabled = has_traces;
622                        } else {
623                            warn!("Unknown execution_id: {}", execution_id);
624                            break;
625                        }
626
627                        debug!(
628                            execution_id = %execution_id,
629                            traces_enabled = traces_enabled,
630                            "Connection registered"
631                        );
632                        bound_execution_id = Some(execution_id);
633                    }
634
635                    PluginMessage::ApiRequest {
636                        request_id,
637                        relayer_id,
638                        method,
639                        payload,
640                    } => {
641                        // Must be registered first
642                        let exec_id = match &bound_execution_id {
643                            Some(id) => id,
644                            None => {
645                                warn!("ApiRequest before Register (security violation)");
646                                break;
647                            }
648                        };
649
650                        // Create Request for RelayerApi (method is already PluginMethod)
651                        let request = Request {
652                            request_id: request_id.clone(),
653                            relayer_id,
654                            method,
655                            payload,
656                            http_request_id: Some(exec_id.clone()),
657                        };
658
659                        // Handle the request
660                        let response = relayer_api.handle_request(request, &state).await;
661
662                        // Send ApiResponse back
663                        let api_response = PluginMessage::ApiResponse {
664                            request_id: response.request_id,
665                            result: response.result,
666                            error: response.error,
667                        };
668
669                        let response_str = serde_json::to_string(&api_response)
670                            .map_err(|e| PluginError::PluginError(e.to_string()))?
671                            + "\n";
672
673                        if let Err(e) = w.write_all(response_str.as_bytes()).await {
674                            log_socket_write_error("API response", &e);
675                            break;
676                        }
677
678                        if let Err(e) = w.flush().await {
679                            log_socket_write_error("API response flush", &e);
680                            break;
681                        }
682                    }
683
684                    PluginMessage::Trace { trace } => {
685                        // Only collect traces if tracing is enabled for this execution
686                        if traces_enabled {
687                            if traces.is_none() {
688                                traces = Some(Vec::new());
689                            }
690                            if let Some(ref mut t) = traces {
691                                t.push(trace);
692                            }
693                        }
694                        // When traces_enabled=false, silently discard trace messages
695                    }
696
697                    PluginMessage::Shutdown => {
698                        debug!("Plugin requested shutdown");
699                        break;
700                    }
701
702                    PluginMessage::ApiResponse { .. } => {
703                        warn!("Received ApiResponse from plugin (invalid direction)");
704                        continue;
705                    }
706                }
707            } else {
708                // Legacy protocol (no "type" field)
709                if let Ok(request) = serde_json::from_value::<Request>(json_value.clone()) {
710                    // Legacy format - API requests are not trace events
711
712                    // Set execution_id from http_request_id or request_id if not bound
713                    if bound_execution_id.is_none() {
714                        let candidate_id = request
715                            .http_request_id
716                            .clone()
717                            .or_else(|| Some(request.request_id.clone()));
718
719                        // Validate execution_id exists (same as new protocol)
720                        // scc::HashMap read() is lock-free
721                        if let Some(ref id) = candidate_id {
722                            if let Some(has_traces) =
723                                executions.read(id, |_, ctx| ctx.traces_tx.is_some())
724                            {
725                                traces_enabled = has_traces;
726                                bound_execution_id = candidate_id;
727                            } else {
728                                debug!("Legacy request with unknown execution_id: {}", id);
729                            }
730                        }
731                    }
732
733                    // Handle legacy request
734                    let response = relayer_api.handle_request(request, &state).await;
735                    let response_str = serde_json::to_string(&response)
736                        .map_err(|e| PluginError::PluginError(e.to_string()))?
737                        + "\n";
738
739                    if let Err(e) = w.write_all(response_str.as_bytes()).await {
740                        log_socket_write_error("response", &e);
741                        break;
742                    }
743
744                    if let Err(e) = w.flush().await {
745                        log_socket_write_error("response flush", &e);
746                        break;
747                    }
748                } else {
749                    warn!("Failed to parse message as either PluginMessage or legacy Request");
750                }
751            }
752        }
753
754        // Send traces back to caller if tracing was enabled
755        if traces_enabled {
756            if let Some(exec_id) = bound_execution_id {
757                // Get the sender from execution context (lock-free read)
758                let traces_tx = executions
759                    .read(&exec_id, |_, ctx| ctx.traces_tx.clone())
760                    .flatten();
761
762                if let Some(tx) = traces_tx {
763                    let collected_traces = traces.unwrap_or_default();
764                    let trace_count = collected_traces.len();
765                    // Short timeout: in-process channel send should be nearly instant
766                    // If receiver isn't ready in 100ms, drop traces rather than blocking
767                    match tokio::time::timeout(
768                        Duration::from_millis(100),
769                        tx.send(collected_traces),
770                    )
771                    .await
772                    {
773                        Ok(Ok(())) => {}
774                        Ok(Err(_)) => {
775                            if trace_count > 0 {
776                                warn!(
777                                    "Trace channel closed for execution_id: {} ({} traces lost)",
778                                    exec_id, trace_count
779                                );
780                            }
781                        }
782                        Err(_) => warn!("Timeout sending traces for execution_id: {}", exec_id),
783                    }
784                }
785            }
786        }
787        // When traces_enabled=false, no channel exists and we skip all trace-related work
788
789        debug!("Shared socket service: connection closed");
790        Ok(())
791    }
792}
793
794impl Drop for SharedSocketService {
795    fn drop(&mut self) {
796        // Signal shutdown (cleanup happens in shutdown() method)
797        let _ = self.shutdown_tx.send(true);
798        // Note: Socket file cleanup happens in shutdown() after connections drain
799        // Drop can't be async, so proper cleanup should use shutdown() method
800    }
801}
802
803/// Global shared socket service instance with proper error handling
804static SHARED_SOCKET: std::sync::OnceLock<Result<Arc<SharedSocketService>, String>> =
805    std::sync::OnceLock::new();
806
807/// Get or create the global shared socket service
808/// Returns error if initialization fails instead of panicking
809pub fn get_shared_socket_service() -> Result<Arc<SharedSocketService>, PluginError> {
810    let socket_path = "/tmp/relayer-plugin-shared.sock";
811
812    let result = SHARED_SOCKET.get_or_init(|| {
813        // Remove existing socket file if it exists (from previous runs)
814        let _ = std::fs::remove_file(socket_path);
815
816        match SharedSocketService::new(socket_path) {
817            Ok(service) => Ok(Arc::new(service)),
818            Err(e) => Err(e.to_string()),
819        }
820    });
821
822    match result {
823        Ok(service) => Ok(service.clone()),
824        Err(e) => Err(PluginError::SocketError(format!(
825            "Failed to create shared socket service: {e}"
826        ))),
827    }
828}
829
830/// Ensure the shared socket service is started
831#[allow(clippy::type_complexity)]
832pub async fn ensure_shared_socket_started<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
833    state: Arc<ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>,
834) -> Result<(), PluginError>
835where
836    J: JobProducerTrait + Send + Sync + 'static,
837    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
838    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
839    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
840    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
841    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
842    TCR: TransactionCounterTrait + Send + Sync + 'static,
843    PR: PluginRepositoryTrait + Send + Sync + 'static,
844    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
845{
846    let service = get_shared_socket_service()?;
847    service.start(state).await
848}
849
850#[cfg(test)]
851mod tests {
852    use super::*;
853    use crate::utils::mocks::mockutils::create_mock_app_state;
854    use actix_web::web;
855    use tempfile::tempdir;
856    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
857    use tokio::net::UnixStream;
858
859    #[tokio::test]
860    async fn test_unified_protocol_register_and_api_request() {
861        let temp_dir = tempdir().unwrap();
862        let socket_path = temp_dir.path().join("shared.sock");
863
864        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
865        let state = create_mock_app_state(None, None, None, None, None, None).await;
866
867        // Start the service
868        service
869            .clone()
870            .start(Arc::new(web::ThinData(state)))
871            .await
872            .unwrap();
873
874        // Register execution
875        let execution_id = "test-exec-123".to_string();
876        let _guard = service.register_execution(execution_id.clone(), true).await;
877
878        // Give the listener time to start
879        tokio::time::sleep(Duration::from_millis(50)).await;
880
881        // Connect as plugin
882        let mut client = UnixStream::connect(socket_path.to_str().unwrap())
883            .await
884            .unwrap();
885
886        // Send Register message
887        let register_msg = PluginMessage::Register {
888            execution_id: execution_id.clone(),
889        };
890        let msg_json = serde_json::to_string(&register_msg).unwrap() + "\n";
891        client.write_all(msg_json.as_bytes()).await.unwrap();
892
893        // Send ApiRequest
894        let api_request = PluginMessage::ApiRequest {
895            request_id: "req-1".to_string(),
896            relayer_id: "relayer-1".to_string(),
897            method: crate::services::plugins::relayer_api::PluginMethod::GetRelayerStatus,
898            payload: serde_json::json!({}),
899        };
900        let req_json = serde_json::to_string(&api_request).unwrap() + "\n";
901        client.write_all(req_json.as_bytes()).await.unwrap();
902        client.flush().await.unwrap();
903
904        // Read ApiResponse
905        let (r, _w) = client.into_split();
906        let mut reader = BufReader::new(r);
907        let mut response_line = String::new();
908        reader.read_line(&mut response_line).await.unwrap();
909
910        let response: PluginMessage = serde_json::from_str(&response_line).unwrap();
911        match response {
912            PluginMessage::ApiResponse { request_id, .. } => {
913                assert_eq!(request_id, "req-1");
914            }
915            _ => panic!("Expected ApiResponse, got {response:?}"),
916        }
917
918        drop(reader);
919        drop(_w);
920        service.shutdown().await;
921    }
922
923    #[tokio::test]
924    async fn test_connection_tagging_prevents_spoofing() {
925        let temp_dir = tempdir().unwrap();
926        let socket_path = temp_dir.path().join("shared2.sock");
927
928        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
929        let state = create_mock_app_state(None, None, None, None, None, None).await;
930
931        service
932            .clone()
933            .start(Arc::new(web::ThinData(state)))
934            .await
935            .unwrap();
936
937        let execution_id = "test-exec-456".to_string();
938        let _guard = service.register_execution(execution_id.clone(), true).await;
939
940        tokio::time::sleep(Duration::from_millis(50)).await;
941
942        let mut client = UnixStream::connect(socket_path.to_str().unwrap())
943            .await
944            .unwrap();
945
946        // Register with execution_id
947        let register_msg = PluginMessage::Register {
948            execution_id: execution_id.clone(),
949        };
950        let msg_json = serde_json::to_string(&register_msg).unwrap() + "\n";
951        client.write_all(msg_json.as_bytes()).await.unwrap();
952
953        // Try to re-register with different execution_id (security violation)
954        let spoofed_register = PluginMessage::Register {
955            execution_id: "different-exec-id".to_string(),
956        };
957        let spoofed_json = serde_json::to_string(&spoofed_register).unwrap() + "\n";
958        client.write_all(spoofed_json.as_bytes()).await.unwrap();
959        client.flush().await.unwrap();
960
961        // Connection should be closed by server
962        tokio::time::sleep(Duration::from_millis(100)).await;
963
964        // Try to read - should get EOF since connection was closed
965        let (r, _w) = client.into_split();
966        let mut reader = BufReader::new(r);
967        let mut line = String::new();
968        let result = reader.read_line(&mut line).await;
969
970        // Should either get an error or EOF (0 bytes)
971        assert!(result.is_err() || result.unwrap() == 0);
972
973        drop(reader);
974        drop(_w);
975        service.shutdown().await;
976    }
977
978    #[tokio::test]
979    async fn test_backward_compatibility_with_legacy_format() {
980        let temp_dir = tempdir().unwrap();
981        let socket_path = temp_dir.path().join("shared3.sock");
982
983        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
984        let state = create_mock_app_state(None, None, None, None, None, None).await;
985
986        service
987            .clone()
988            .start(Arc::new(web::ThinData(state)))
989            .await
990            .unwrap();
991
992        let execution_id = "test-exec-789".to_string();
993        let _guard = service.register_execution(execution_id.clone(), true).await;
994
995        tokio::time::sleep(Duration::from_millis(50)).await;
996
997        let mut client = UnixStream::connect(socket_path.to_str().unwrap())
998            .await
999            .unwrap();
1000
1001        // Send legacy Request format (without PluginMessage wrapper)
1002        let legacy_request = crate::services::plugins::relayer_api::Request {
1003            request_id: "legacy-1".to_string(),
1004            relayer_id: "relayer-1".to_string(),
1005            method: crate::services::plugins::relayer_api::PluginMethod::GetRelayerStatus,
1006            payload: serde_json::json!({}),
1007            http_request_id: Some(execution_id.clone()),
1008        };
1009        let legacy_json = serde_json::to_string(&legacy_request).unwrap() + "\n";
1010        client.write_all(legacy_json.as_bytes()).await.unwrap();
1011        client.flush().await.unwrap();
1012
1013        // Read legacy Response format
1014        let (r, _w) = client.into_split();
1015        let mut reader = BufReader::new(r);
1016        let mut response_line = String::new();
1017        reader.read_line(&mut response_line).await.unwrap();
1018
1019        let response: crate::services::plugins::relayer_api::Response =
1020            serde_json::from_str(&response_line).unwrap();
1021
1022        assert_eq!(response.request_id, "legacy-1");
1023        // Note: GetRelayerStatus might return an error if relayer doesn't exist
1024        // The important thing is we got a response in the correct format
1025        assert!(response.result.is_some() || response.error.is_some());
1026
1027        drop(reader);
1028        drop(_w);
1029        service.shutdown().await;
1030    }
1031
1032    #[tokio::test]
1033    async fn test_trace_collection() {
1034        let temp_dir = tempdir().unwrap();
1035        let socket_path = temp_dir.path().join("shared4.sock");
1036
1037        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1038        let state = create_mock_app_state(None, None, None, None, None, None).await;
1039
1040        service
1041            .clone()
1042            .start(Arc::new(web::ThinData(state)))
1043            .await
1044            .unwrap();
1045
1046        let execution_id = "test-exec-trace".to_string();
1047        let guard = service.register_execution(execution_id.clone(), true).await;
1048
1049        tokio::time::sleep(Duration::from_millis(50)).await;
1050
1051        let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1052            .await
1053            .unwrap();
1054
1055        // Register
1056        let register_msg = PluginMessage::Register {
1057            execution_id: execution_id.clone(),
1058        };
1059        client
1060            .write_all((serde_json::to_string(&register_msg).unwrap() + "\n").as_bytes())
1061            .await
1062            .unwrap();
1063
1064        // Send trace events
1065        let trace1 = PluginMessage::Trace {
1066            trace: serde_json::json!({"event": "start", "timestamp": 1000}),
1067        };
1068        client
1069            .write_all((serde_json::to_string(&trace1).unwrap() + "\n").as_bytes())
1070            .await
1071            .unwrap();
1072
1073        let trace2 = PluginMessage::Trace {
1074            trace: serde_json::json!({"event": "processing", "timestamp": 2000}),
1075        };
1076        client
1077            .write_all((serde_json::to_string(&trace2).unwrap() + "\n").as_bytes())
1078            .await
1079            .unwrap();
1080
1081        // Shutdown
1082        let shutdown_msg = PluginMessage::Shutdown;
1083        client
1084            .write_all((serde_json::to_string(&shutdown_msg).unwrap() + "\n").as_bytes())
1085            .await
1086            .unwrap();
1087        client.flush().await.unwrap();
1088
1089        drop(client);
1090
1091        // Wait for connection to close and traces to be sent
1092        tokio::time::sleep(Duration::from_millis(100)).await;
1093
1094        // Collect traces
1095        let mut traces_rx = guard.into_receiver().expect("Traces should be enabled");
1096        let traces = traces_rx.recv().await.unwrap();
1097
1098        // Should have collected 2 trace events
1099        assert_eq!(traces.len(), 2);
1100        assert_eq!(traces[0]["event"], "start");
1101        assert_eq!(traces[1]["event"], "processing");
1102
1103        service.shutdown().await;
1104    }
1105
1106    #[tokio::test]
1107    async fn test_execution_guard_auto_unregister() {
1108        let temp_dir = tempdir().unwrap();
1109        let socket_path = temp_dir.path().join("shared_guard.sock");
1110
1111        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1112        let execution_id = "test-exec-guard".to_string();
1113
1114        {
1115            let _guard = service.register_execution(execution_id.clone(), true).await;
1116
1117            // Verify execution is registered (use atomic counter)
1118            assert_eq!(service.registered_executions_count().await, 1);
1119        }
1120        // Guard dropped here - synchronous removal with scc (no sleep needed!)
1121
1122        // Verify execution was auto-unregistered immediately
1123        assert_eq!(service.registered_executions_count().await, 0);
1124    }
1125
1126    #[tokio::test]
1127    async fn test_api_request_without_register_rejected() {
1128        let temp_dir = tempdir().unwrap();
1129        let socket_path = temp_dir.path().join("shared_no_register.sock");
1130
1131        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1132        let state = create_mock_app_state(None, None, None, None, None, None).await;
1133
1134        service
1135            .clone()
1136            .start(Arc::new(web::ThinData(state)))
1137            .await
1138            .unwrap();
1139
1140        tokio::time::sleep(Duration::from_millis(50)).await;
1141
1142        let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1143            .await
1144            .unwrap();
1145
1146        // Send ApiRequest WITHOUT registering first (security violation)
1147        let api_request = PluginMessage::ApiRequest {
1148            request_id: "req-1".to_string(),
1149            relayer_id: "relayer-1".to_string(),
1150            method: crate::services::plugins::relayer_api::PluginMethod::GetRelayerStatus,
1151            payload: serde_json::json!({}),
1152        };
1153        let req_json = serde_json::to_string(&api_request).unwrap() + "\n";
1154        client.write_all(req_json.as_bytes()).await.unwrap();
1155        client.flush().await.unwrap();
1156
1157        // Connection should be closed by server
1158        tokio::time::sleep(Duration::from_millis(100)).await;
1159
1160        let (r, _w) = client.into_split();
1161        let mut reader = BufReader::new(r);
1162        let mut line = String::new();
1163        let result = reader.read_line(&mut line).await;
1164
1165        // Should get EOF (connection closed)
1166        assert!(result.is_err() || result.unwrap() == 0);
1167
1168        drop(reader);
1169        drop(_w);
1170        service.shutdown().await;
1171    }
1172
1173    #[tokio::test]
1174    async fn test_register_with_unknown_execution_id_rejected() {
1175        let temp_dir = tempdir().unwrap();
1176        let socket_path = temp_dir.path().join("shared_unknown_exec.sock");
1177
1178        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1179        let state = create_mock_app_state(None, None, None, None, None, None).await;
1180
1181        service
1182            .clone()
1183            .start(Arc::new(web::ThinData(state)))
1184            .await
1185            .unwrap();
1186
1187        tokio::time::sleep(Duration::from_millis(50)).await;
1188
1189        let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1190            .await
1191            .unwrap();
1192
1193        // Try to register with an execution_id that doesn't exist in registry
1194        let register_msg = PluginMessage::Register {
1195            execution_id: "unknown-exec-id".to_string(),
1196        };
1197        let msg_json = serde_json::to_string(&register_msg).unwrap() + "\n";
1198        client.write_all(msg_json.as_bytes()).await.unwrap();
1199        client.flush().await.unwrap();
1200
1201        // Connection should be closed
1202        tokio::time::sleep(Duration::from_millis(100)).await;
1203
1204        let (r, _w) = client.into_split();
1205        let mut reader = BufReader::new(r);
1206        let mut line = String::new();
1207        let result = reader.read_line(&mut line).await;
1208
1209        assert!(result.is_err() || result.unwrap() == 0);
1210
1211        drop(reader);
1212        drop(_w);
1213        service.shutdown().await;
1214    }
1215
1216    #[tokio::test]
1217    async fn test_connection_limit_enforcement() {
1218        let temp_dir = tempdir().unwrap();
1219        let socket_path = temp_dir.path().join("shared_connection_limit.sock");
1220
1221        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1222        let state = create_mock_app_state(None, None, None, None, None, None).await;
1223
1224        service
1225            .clone()
1226            .start(Arc::new(web::ThinData(state)))
1227            .await
1228            .unwrap();
1229
1230        tokio::time::sleep(Duration::from_millis(50)).await;
1231
1232        // Check initial connection count
1233        let initial_permits = service.connection_semaphore.available_permits();
1234        let max_connections = get_config().socket_max_connections;
1235        assert_eq!(initial_permits, max_connections);
1236
1237        // Create a connection (should reduce available permits)
1238        let _client = UnixStream::connect(socket_path.to_str().unwrap())
1239            .await
1240            .unwrap();
1241
1242        tokio::time::sleep(Duration::from_millis(50)).await;
1243
1244        // Available permits should be reduced
1245        let after_connect = service.connection_semaphore.available_permits();
1246        assert!(after_connect < initial_permits);
1247
1248        drop(_client);
1249        service.shutdown().await;
1250    }
1251
1252    #[tokio::test]
1253    async fn test_connection_stays_alive_within_safety_timeout() {
1254        let temp_dir = tempdir().unwrap();
1255        let socket_path = temp_dir.path().join("shared_safety.sock");
1256
1257        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1258        let state = create_mock_app_state(None, None, None, None, None, None).await;
1259
1260        service
1261            .clone()
1262            .start(Arc::new(web::ThinData(state)))
1263            .await
1264            .unwrap();
1265
1266        let execution_id = "test-exec-safety".to_string();
1267        let _guard = service.register_execution(execution_id.clone(), true).await;
1268
1269        tokio::time::sleep(Duration::from_millis(50)).await;
1270
1271        let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1272            .await
1273            .unwrap();
1274
1275        // Register
1276        let register_msg = PluginMessage::Register { execution_id };
1277        client
1278            .write_all((serde_json::to_string(&register_msg).unwrap() + "\n").as_bytes())
1279            .await
1280            .unwrap();
1281        client.flush().await.unwrap();
1282
1283        // Wait well below the safety timeout (DEFAULT_PLUGIN_TIMEOUT_SECONDS + 60s).
1284        // Connection must stay alive — the safety timeout only fires for truly stuck clients.
1285        tokio::time::sleep(Duration::from_millis(200)).await;
1286
1287        // Connection should still be alive — send a Shutdown message to verify
1288        let shutdown_msg = PluginMessage::Shutdown;
1289        let write_result = client
1290            .write_all((serde_json::to_string(&shutdown_msg).unwrap() + "\n").as_bytes())
1291            .await;
1292
1293        assert!(
1294            write_result.is_ok(),
1295            "Connection should still be alive within safety timeout"
1296        );
1297
1298        drop(client);
1299        service.shutdown().await;
1300    }
1301
1302    #[tokio::test]
1303    async fn test_idle_connection_cleaned_up_by_safety_timeout() {
1304        let temp_dir = tempdir().unwrap();
1305        let socket_path = temp_dir.path().join("shared_safety_timeout.sock");
1306
1307        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1308        let state = create_mock_app_state(None, None, None, None, None, None).await;
1309
1310        service
1311            .clone()
1312            .start(Arc::new(web::ThinData(state)))
1313            .await
1314            .unwrap();
1315
1316        let execution_id = "test-exec-safety-timeout".to_string();
1317        let _guard = service.register_execution(execution_id.clone(), true).await;
1318
1319        tokio::time::sleep(Duration::from_millis(50)).await;
1320
1321        let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1322            .await
1323            .unwrap();
1324
1325        // Register
1326        let register_msg = PluginMessage::Register { execution_id };
1327        client
1328            .write_all((serde_json::to_string(&register_msg).unwrap() + "\n").as_bytes())
1329            .await
1330            .unwrap();
1331        client.flush().await.unwrap();
1332
1333        // Don't send anything else - connection will eventually be reaped
1334        // by the safety timeout (DEFAULT_PLUGIN_TIMEOUT_SECONDS + 60s)
1335
1336        // Wait a bit - connection should still be alive well within safety timeout
1337        tokio::time::sleep(Duration::from_millis(200)).await;
1338
1339        // Connection should still be valid (safety timeout is ~360s)
1340        drop(client);
1341
1342        service.shutdown().await;
1343    }
1344
1345    #[tokio::test]
1346    async fn test_multiple_api_requests_same_connection() {
1347        let temp_dir = tempdir().unwrap();
1348        let socket_path = temp_dir.path().join("shared_multiple_requests.sock");
1349
1350        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1351        let state = create_mock_app_state(None, None, None, None, None, None).await;
1352
1353        service
1354            .clone()
1355            .start(Arc::new(web::ThinData(state)))
1356            .await
1357            .unwrap();
1358
1359        let execution_id = "test-exec-multi".to_string();
1360        let _guard = service.register_execution(execution_id.clone(), true).await;
1361
1362        tokio::time::sleep(Duration::from_millis(50)).await;
1363
1364        let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1365            .await
1366            .unwrap();
1367
1368        // Register
1369        let register_msg = PluginMessage::Register {
1370            execution_id: execution_id.clone(),
1371        };
1372        client
1373            .write_all((serde_json::to_string(&register_msg).unwrap() + "\n").as_bytes())
1374            .await
1375            .unwrap();
1376
1377        let (r, mut w) = client.into_split();
1378        let mut reader = BufReader::new(r);
1379
1380        // Send multiple API requests
1381        for i in 1..=3 {
1382            let api_request = PluginMessage::ApiRequest {
1383                request_id: format!("req-{i}"),
1384                relayer_id: "relayer-1".to_string(),
1385                method: crate::services::plugins::relayer_api::PluginMethod::GetRelayerStatus,
1386                payload: serde_json::json!({}),
1387            };
1388            w.write_all((serde_json::to_string(&api_request).unwrap() + "\n").as_bytes())
1389                .await
1390                .unwrap();
1391            w.flush().await.unwrap();
1392
1393            // Read response
1394            let mut response_line = String::new();
1395            reader.read_line(&mut response_line).await.unwrap();
1396
1397            let response: PluginMessage = serde_json::from_str(&response_line).unwrap();
1398            match response {
1399                PluginMessage::ApiResponse { request_id, .. } => {
1400                    assert_eq!(request_id, format!("req-{i}"));
1401                }
1402                _ => panic!("Expected ApiResponse"),
1403            }
1404        }
1405
1406        drop(reader);
1407        drop(w);
1408        service.shutdown().await;
1409    }
1410
1411    #[tokio::test]
1412    async fn test_shutdown_signal() {
1413        let temp_dir = tempdir().unwrap();
1414        let socket_path = temp_dir.path().join("shared_shutdown_signal.sock");
1415
1416        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1417        let state = create_mock_app_state(None, None, None, None, None, None).await;
1418
1419        service
1420            .clone()
1421            .start(Arc::new(web::ThinData(state)))
1422            .await
1423            .unwrap();
1424
1425        tokio::time::sleep(Duration::from_millis(50)).await;
1426
1427        // Verify socket file exists
1428        assert!(std::path::Path::new(socket_path.to_str().unwrap()).exists());
1429
1430        // Shutdown the service
1431        service.shutdown().await;
1432
1433        // Give time for cleanup
1434        tokio::time::sleep(Duration::from_millis(100)).await;
1435
1436        // Socket file should be removed
1437        assert!(!std::path::Path::new(socket_path.to_str().unwrap()).exists());
1438    }
1439
1440    #[tokio::test]
1441    async fn test_malformed_json_handling() {
1442        let temp_dir = tempdir().unwrap();
1443        let socket_path = temp_dir.path().join("shared_malformed.sock");
1444
1445        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1446        let state = create_mock_app_state(None, None, None, None, None, None).await;
1447
1448        service
1449            .clone()
1450            .start(Arc::new(web::ThinData(state)))
1451            .await
1452            .unwrap();
1453
1454        let execution_id = "test-exec-malformed".to_string();
1455        let _guard = service.register_execution(execution_id.clone(), true).await;
1456
1457        tokio::time::sleep(Duration::from_millis(50)).await;
1458
1459        let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1460            .await
1461            .unwrap();
1462
1463        // Register first
1464        let register_msg = PluginMessage::Register {
1465            execution_id: execution_id.clone(),
1466        };
1467        client
1468            .write_all((serde_json::to_string(&register_msg).unwrap() + "\n").as_bytes())
1469            .await
1470            .unwrap();
1471
1472        // Send malformed JSON
1473        client
1474            .write_all(b"{ this is not valid json }\n")
1475            .await
1476            .unwrap();
1477        client.flush().await.unwrap();
1478
1479        // Connection should remain open (malformed messages are logged and skipped)
1480        tokio::time::sleep(Duration::from_millis(100)).await;
1481
1482        // Send valid shutdown message to verify connection is still up
1483        let shutdown_msg = PluginMessage::Shutdown;
1484        let write_result = client
1485            .write_all((serde_json::to_string(&shutdown_msg).unwrap() + "\n").as_bytes())
1486            .await;
1487
1488        assert!(
1489            write_result.is_ok(),
1490            "Connection should still be alive after malformed JSON"
1491        );
1492
1493        drop(client);
1494        service.shutdown().await;
1495    }
1496
1497    #[tokio::test]
1498    async fn test_invalid_message_direction() {
1499        let temp_dir = tempdir().unwrap();
1500        let socket_path = temp_dir.path().join("shared_invalid_direction.sock");
1501
1502        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1503        let state = create_mock_app_state(None, None, None, None, None, None).await;
1504
1505        service
1506            .clone()
1507            .start(Arc::new(web::ThinData(state)))
1508            .await
1509            .unwrap();
1510
1511        let execution_id = "test-exec-invalid-dir".to_string();
1512        let _guard = service.register_execution(execution_id.clone(), true).await;
1513
1514        tokio::time::sleep(Duration::from_millis(50)).await;
1515
1516        let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1517            .await
1518            .unwrap();
1519
1520        // Register
1521        let register_msg = PluginMessage::Register {
1522            execution_id: execution_id.clone(),
1523        };
1524        client
1525            .write_all((serde_json::to_string(&register_msg).unwrap() + "\n").as_bytes())
1526            .await
1527            .unwrap();
1528
1529        // Plugin tries to send ApiResponse (invalid direction - only Host sends ApiResponse)
1530        let invalid_msg = PluginMessage::ApiResponse {
1531            request_id: "invalid".to_string(),
1532            result: Some(serde_json::json!({})),
1533            error: None,
1534        };
1535        client
1536            .write_all((serde_json::to_string(&invalid_msg).unwrap() + "\n").as_bytes())
1537            .await
1538            .unwrap();
1539        client.flush().await.unwrap();
1540
1541        // Connection should remain open (invalid messages are logged and skipped)
1542        tokio::time::sleep(Duration::from_millis(100)).await;
1543
1544        // Verify connection is still alive
1545        let shutdown_msg = PluginMessage::Shutdown;
1546        let write_result = client
1547            .write_all((serde_json::to_string(&shutdown_msg).unwrap() + "\n").as_bytes())
1548            .await;
1549
1550        assert!(write_result.is_ok());
1551
1552        drop(client);
1553        service.shutdown().await;
1554    }
1555
1556    #[tokio::test]
1557    async fn test_stale_execution_cleanup() {
1558        let temp_dir = tempdir().unwrap();
1559        let socket_path = temp_dir.path().join("shared_stale_cleanup.sock");
1560
1561        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1562
1563        // Register an execution manually with old timestamp
1564        let execution_id = "stale-exec".to_string();
1565        let (tx, _rx) = mpsc::channel(1);
1566        // scc::HashMap insert
1567        let _ = service.executions.insert(
1568            execution_id.clone(),
1569            ExecutionContext {
1570                traces_tx: Some(tx),
1571                created_at: Instant::now() - Duration::from_secs(400), // 6+ minutes old
1572                bound_execution_id: execution_id.clone(),
1573            },
1574        );
1575
1576        // Verify it's registered using scc's contains()
1577        assert!(service.executions.contains(&execution_id));
1578
1579        // Wait for cleanup task to run (it runs every 60 seconds, but we can't wait that long)
1580        // Instead, we verify the cleanup logic by checking the code in new()
1581        // The actual cleanup test would require mocking time or waiting 60+ seconds
1582
1583        // For this test, we just verify the logic exists and doesn't panic
1584        drop(service);
1585    }
1586
1587    #[tokio::test]
1588    async fn test_socket_path_getter() {
1589        let temp_dir = tempdir().unwrap();
1590        let socket_path = temp_dir.path().join("shared_path.sock");
1591
1592        let service = SharedSocketService::new(socket_path.to_str().unwrap()).unwrap();
1593
1594        assert_eq!(service.socket_path(), socket_path.to_str().unwrap());
1595    }
1596
1597    #[tokio::test]
1598    async fn test_trace_send_timeout() {
1599        let temp_dir = tempdir().unwrap();
1600        let socket_path = temp_dir.path().join("shared_trace_timeout.sock");
1601
1602        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1603        let state = create_mock_app_state(None, None, None, None, None, None).await;
1604
1605        service
1606            .clone()
1607            .start(Arc::new(web::ThinData(state)))
1608            .await
1609            .unwrap();
1610
1611        let execution_id = "test-exec-trace-timeout".to_string();
1612        let guard = service.register_execution(execution_id.clone(), true).await;
1613
1614        // Don't consume the receiver - this will cause the channel to fill up
1615        drop(guard);
1616
1617        tokio::time::sleep(Duration::from_millis(50)).await;
1618
1619        let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1620            .await
1621            .unwrap();
1622
1623        // Register
1624        let register_msg = PluginMessage::Register {
1625            execution_id: execution_id.clone(),
1626        };
1627        client
1628            .write_all((serde_json::to_string(&register_msg).unwrap() + "\n").as_bytes())
1629            .await
1630            .unwrap();
1631
1632        // Send trace
1633        let trace = PluginMessage::Trace {
1634            trace: serde_json::json!({"event": "test"}),
1635        };
1636        client
1637            .write_all((serde_json::to_string(&trace).unwrap() + "\n").as_bytes())
1638            .await
1639            .unwrap();
1640
1641        // Shutdown
1642        let shutdown_msg = PluginMessage::Shutdown;
1643        client
1644            .write_all((serde_json::to_string(&shutdown_msg).unwrap() + "\n").as_bytes())
1645            .await
1646            .unwrap();
1647        client.flush().await.unwrap();
1648
1649        drop(client);
1650
1651        // Wait for connection to close - should handle timeout gracefully
1652        tokio::time::sleep(Duration::from_millis(200)).await;
1653
1654        service.shutdown().await;
1655    }
1656
1657    #[tokio::test]
1658    async fn test_get_shared_socket_service() {
1659        // Test the global singleton
1660        let service1 = get_shared_socket_service();
1661        assert!(service1.is_ok());
1662
1663        let service2 = get_shared_socket_service();
1664        assert!(service2.is_ok());
1665
1666        // Should return the same instance
1667        let svc1 = service1.unwrap();
1668        let svc2 = service2.unwrap();
1669        let path1 = svc1.socket_path();
1670        let path2 = svc2.socket_path();
1671        assert_eq!(path1, path2);
1672    }
1673
1674    #[tokio::test]
1675    async fn test_available_connection_slots_and_active_count() {
1676        let temp_dir = tempdir().unwrap();
1677        let socket_path = temp_dir.path().join("shared_slots.sock");
1678
1679        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1680        let state = create_mock_app_state(None, None, None, None, None, None).await;
1681        let max_connections = get_config().socket_max_connections;
1682
1683        // Before any connections
1684        assert_eq!(service.available_connection_slots(), max_connections);
1685        assert_eq!(service.active_connection_count(), 0);
1686
1687        service
1688            .clone()
1689            .start(Arc::new(web::ThinData(state)))
1690            .await
1691            .unwrap();
1692
1693        tokio::time::sleep(Duration::from_millis(50)).await;
1694
1695        // Connect a client
1696        let _client = UnixStream::connect(socket_path.to_str().unwrap())
1697            .await
1698            .unwrap();
1699
1700        tokio::time::sleep(Duration::from_millis(50)).await;
1701
1702        assert!(service.available_connection_slots() < max_connections);
1703        assert!(service.active_connection_count() > 0);
1704
1705        drop(_client);
1706        service.shutdown().await;
1707    }
1708
1709    #[tokio::test]
1710    async fn test_start_called_twice_is_idempotent() {
1711        let temp_dir = tempdir().unwrap();
1712        let socket_path = temp_dir.path().join("shared_double_start.sock");
1713
1714        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1715        let state = create_mock_app_state(None, None, None, None, None, None).await;
1716        let thin = Arc::new(web::ThinData(state));
1717
1718        // First start should succeed
1719        service.clone().start(thin.clone()).await.unwrap();
1720
1721        // Second start should also succeed (early return)
1722        service.clone().start(thin).await.unwrap();
1723
1724        service.shutdown().await;
1725    }
1726
1727    #[tokio::test]
1728    async fn test_traces_discarded_when_emit_traces_false() {
1729        let temp_dir = tempdir().unwrap();
1730        let socket_path = temp_dir.path().join("shared_no_traces.sock");
1731
1732        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1733        let state = create_mock_app_state(None, None, None, None, None, None).await;
1734
1735        service
1736            .clone()
1737            .start(Arc::new(web::ThinData(state)))
1738            .await
1739            .unwrap();
1740
1741        let execution_id = "test-exec-no-trace".to_string();
1742        // Register with emit_traces=false
1743        let guard = service
1744            .register_execution(execution_id.clone(), false)
1745            .await;
1746        assert_eq!(service.registered_executions_count().await, 1);
1747
1748        tokio::time::sleep(Duration::from_millis(50)).await;
1749
1750        let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1751            .await
1752            .unwrap();
1753
1754        // Register
1755        let register_msg = PluginMessage::Register {
1756            execution_id: execution_id.clone(),
1757        };
1758        client
1759            .write_all((serde_json::to_string(&register_msg).unwrap() + "\n").as_bytes())
1760            .await
1761            .unwrap();
1762
1763        // Send trace (should be silently discarded)
1764        let trace = PluginMessage::Trace {
1765            trace: serde_json::json!({"event": "should_be_discarded"}),
1766        };
1767        client
1768            .write_all((serde_json::to_string(&trace).unwrap() + "\n").as_bytes())
1769            .await
1770            .unwrap();
1771
1772        // Shutdown
1773        let shutdown_msg = PluginMessage::Shutdown;
1774        client
1775            .write_all((serde_json::to_string(&shutdown_msg).unwrap() + "\n").as_bytes())
1776            .await
1777            .unwrap();
1778        client.flush().await.unwrap();
1779        drop(client);
1780
1781        // Connection should be fully drained (permit returned).
1782        tokio::time::timeout(Duration::from_secs(1), async {
1783            loop {
1784                if service.active_connection_count() == 0 {
1785                    break;
1786                }
1787                tokio::time::sleep(Duration::from_millis(20)).await;
1788            }
1789        })
1790        .await
1791        .expect("Connection should close promptly when traces are disabled");
1792
1793        // Guard with emit_traces=false should return None
1794        assert!(guard.into_receiver().is_none());
1795        assert_eq!(service.registered_executions_count().await, 0);
1796
1797        service.shutdown().await;
1798    }
1799
1800    #[tokio::test]
1801    async fn test_legacy_protocol_without_http_request_id() {
1802        let temp_dir = tempdir().unwrap();
1803        let socket_path = temp_dir.path().join("shared_legacy_no_http_id.sock");
1804
1805        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1806        let state = create_mock_app_state(None, None, None, None, None, None).await;
1807
1808        service
1809            .clone()
1810            .start(Arc::new(web::ThinData(state)))
1811            .await
1812            .unwrap();
1813
1814        // Use request_id as the execution_id (fallback path)
1815        let request_id = "legacy-fallback-id".to_string();
1816        let _guard = service.register_execution(request_id.clone(), true).await;
1817
1818        tokio::time::sleep(Duration::from_millis(50)).await;
1819
1820        let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1821            .await
1822            .unwrap();
1823
1824        // Send legacy Request with http_request_id = None
1825        // The handler falls back to request_id for execution binding
1826        let legacy_request = crate::services::plugins::relayer_api::Request {
1827            request_id: request_id.clone(),
1828            relayer_id: "relayer-1".to_string(),
1829            method: crate::services::plugins::relayer_api::PluginMethod::GetRelayerStatus,
1830            payload: serde_json::json!({}),
1831            http_request_id: None,
1832        };
1833        let legacy_json = serde_json::to_string(&legacy_request).unwrap() + "\n";
1834        client.write_all(legacy_json.as_bytes()).await.unwrap();
1835        client.flush().await.unwrap();
1836
1837        // Read legacy Response
1838        let (r, _w) = client.into_split();
1839        let mut reader = BufReader::new(r);
1840        let mut response_line = String::new();
1841        reader.read_line(&mut response_line).await.unwrap();
1842
1843        let response: crate::services::plugins::relayer_api::Response =
1844            serde_json::from_str(&response_line).unwrap();
1845        assert_eq!(response.request_id, request_id);
1846        assert!(response.result.is_some() || response.error.is_some());
1847
1848        drop(reader);
1849        drop(_w);
1850        service.shutdown().await;
1851    }
1852
1853    #[tokio::test]
1854    async fn test_legacy_protocol_with_unknown_execution_id() {
1855        let temp_dir = tempdir().unwrap();
1856        let socket_path = temp_dir.path().join("shared_legacy_unknown.sock");
1857
1858        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1859        let state = create_mock_app_state(None, None, None, None, None, None).await;
1860
1861        service
1862            .clone()
1863            .start(Arc::new(web::ThinData(state)))
1864            .await
1865            .unwrap();
1866
1867        // Do NOT register any execution — the legacy handler should still process
1868        // the request but log a debug warning about unknown execution_id
1869
1870        tokio::time::sleep(Duration::from_millis(50)).await;
1871
1872        let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1873            .await
1874            .unwrap();
1875
1876        // Send legacy Request with unknown execution_id
1877        let legacy_request = crate::services::plugins::relayer_api::Request {
1878            request_id: "unknown-req".to_string(),
1879            relayer_id: "relayer-1".to_string(),
1880            method: crate::services::plugins::relayer_api::PluginMethod::GetRelayerStatus,
1881            payload: serde_json::json!({}),
1882            http_request_id: Some("nonexistent-exec-id".to_string()),
1883        };
1884        let legacy_json = serde_json::to_string(&legacy_request).unwrap() + "\n";
1885        client.write_all(legacy_json.as_bytes()).await.unwrap();
1886        client.flush().await.unwrap();
1887
1888        // Should still get a response (legacy path processes even without binding)
1889        let (r, _w) = client.into_split();
1890        let mut reader = BufReader::new(r);
1891        let mut response_line = String::new();
1892        reader.read_line(&mut response_line).await.unwrap();
1893
1894        let response: crate::services::plugins::relayer_api::Response =
1895            serde_json::from_str(&response_line).unwrap();
1896        assert_eq!(response.request_id, "unknown-req");
1897
1898        drop(reader);
1899        drop(_w);
1900        service.shutdown().await;
1901    }
1902
1903    #[tokio::test]
1904    async fn test_legacy_unparsable_message() {
1905        let temp_dir = tempdir().unwrap();
1906        let socket_path = temp_dir.path().join("shared_legacy_unparse.sock");
1907
1908        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1909        let state = create_mock_app_state(None, None, None, None, None, None).await;
1910
1911        service
1912            .clone()
1913            .start(Arc::new(web::ThinData(state)))
1914            .await
1915            .unwrap();
1916
1917        let execution_id = "test-exec-legacy-unparse".to_string();
1918        let _guard = service.register_execution(execution_id.clone(), true).await;
1919
1920        tokio::time::sleep(Duration::from_millis(50)).await;
1921
1922        let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1923            .await
1924            .unwrap();
1925
1926        // Register first
1927        let register_msg = PluginMessage::Register {
1928            execution_id: execution_id.clone(),
1929        };
1930        client
1931            .write_all((serde_json::to_string(&register_msg).unwrap() + "\n").as_bytes())
1932            .await
1933            .unwrap();
1934
1935        // Send JSON without "type" field AND not a valid legacy Request
1936        // This hits the fallback warn! at line 732
1937        client.write_all(b"{\"foo\": \"bar\"}\n").await.unwrap();
1938        client.flush().await.unwrap();
1939
1940        tokio::time::sleep(Duration::from_millis(100)).await;
1941
1942        // Connection should still be alive
1943        let shutdown_msg = PluginMessage::Shutdown;
1944        let write_result = client
1945            .write_all((serde_json::to_string(&shutdown_msg).unwrap() + "\n").as_bytes())
1946            .await;
1947
1948        assert!(
1949            write_result.is_ok(),
1950            "Connection should still be alive after unparsable legacy message"
1951        );
1952
1953        drop(client);
1954        service.shutdown().await;
1955    }
1956
1957    #[tokio::test]
1958    async fn test_invalid_plugin_message_type() {
1959        let temp_dir = tempdir().unwrap();
1960        let socket_path = temp_dir.path().join("shared_invalid_type.sock");
1961
1962        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1963        let state = create_mock_app_state(None, None, None, None, None, None).await;
1964
1965        service
1966            .clone()
1967            .start(Arc::new(web::ThinData(state)))
1968            .await
1969            .unwrap();
1970
1971        let execution_id = "test-exec-invalid-type".to_string();
1972        let _guard = service.register_execution(execution_id.clone(), true).await;
1973
1974        tokio::time::sleep(Duration::from_millis(50)).await;
1975
1976        let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1977            .await
1978            .unwrap();
1979
1980        // Register first
1981        let register_msg = PluginMessage::Register {
1982            execution_id: execution_id.clone(),
1983        };
1984        client
1985            .write_all((serde_json::to_string(&register_msg).unwrap() + "\n").as_bytes())
1986            .await
1987            .unwrap();
1988
1989        // Send JSON with "type" field but not a valid PluginMessage variant
1990        // This hits the `Err(e)` branch at line 584-586
1991        client
1992            .write_all(b"{\"type\": \"nonexistent_type\", \"data\": \"foo\"}\n")
1993            .await
1994            .unwrap();
1995        client.flush().await.unwrap();
1996
1997        // Send a valid request after invalid message.
1998        // If we get ApiResponse, the connection stayed alive and parsing recovered.
1999        let api_request = PluginMessage::ApiRequest {
2000            request_id: "req-after-invalid".to_string(),
2001            relayer_id: "relayer-1".to_string(),
2002            method: crate::services::plugins::relayer_api::PluginMethod::GetRelayerStatus,
2003            payload: serde_json::json!({}),
2004        };
2005        client
2006            .write_all((serde_json::to_string(&api_request).unwrap() + "\n").as_bytes())
2007            .await
2008            .unwrap();
2009        client.flush().await.unwrap();
2010
2011        let (r, mut w) = client.into_split();
2012        let mut reader = BufReader::new(r);
2013        let mut response_line = String::new();
2014        tokio::time::timeout(Duration::from_secs(1), reader.read_line(&mut response_line))
2015            .await
2016            .expect("Timed out waiting for ApiResponse")
2017            .unwrap();
2018
2019        let response: PluginMessage = serde_json::from_str(&response_line).unwrap();
2020        match response {
2021            PluginMessage::ApiResponse { request_id, .. } => {
2022                assert_eq!(request_id, "req-after-invalid");
2023            }
2024            _ => panic!("Expected ApiResponse after invalid plugin message"),
2025        }
2026
2027        let shutdown_msg = PluginMessage::Shutdown;
2028        w.write_all((serde_json::to_string(&shutdown_msg).unwrap() + "\n").as_bytes())
2029            .await
2030            .unwrap();
2031        w.flush().await.unwrap();
2032
2033        drop(reader);
2034        drop(w);
2035        service.shutdown().await;
2036    }
2037
2038    #[tokio::test]
2039    async fn test_service_drop_sends_shutdown() {
2040        let temp_dir = tempdir().unwrap();
2041        let socket_path = temp_dir.path().join("shared_drop.sock");
2042
2043        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
2044        let state = create_mock_app_state(None, None, None, None, None, None).await;
2045
2046        service
2047            .clone()
2048            .start(Arc::new(web::ThinData(state)))
2049            .await
2050            .unwrap();
2051
2052        // Subscribe before dropping so we can observe Drop-triggered shutdown signal.
2053        let mut shutdown_rx = service.shutdown_tx.subscribe();
2054
2055        // Drop should send shutdown signal.
2056        drop(service);
2057
2058        tokio::time::timeout(Duration::from_secs(1), shutdown_rx.changed())
2059            .await
2060            .expect("Timed out waiting for shutdown signal from Drop")
2061            .unwrap();
2062        assert!(
2063            *shutdown_rx.borrow(),
2064            "Drop should broadcast shutdown=true to listeners"
2065        );
2066    }
2067
2068    #[tokio::test]
2069    async fn test_trace_channel_closed_before_send() {
2070        let temp_dir = tempdir().unwrap();
2071        let socket_path = temp_dir.path().join("shared_trace_closed.sock");
2072
2073        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
2074        let state = create_mock_app_state(None, None, None, None, None, None).await;
2075
2076        service
2077            .clone()
2078            .start(Arc::new(web::ThinData(state)))
2079            .await
2080            .unwrap();
2081
2082        let execution_id = "test-exec-trace-closed".to_string();
2083        let guard = service.register_execution(execution_id.clone(), true).await;
2084
2085        // Consume and drop the receiver immediately — channel is now closed
2086        let rx = guard.into_receiver();
2087        assert!(rx.is_some());
2088        drop(rx);
2089
2090        tokio::time::sleep(Duration::from_millis(50)).await;
2091
2092        let mut client = UnixStream::connect(socket_path.to_str().unwrap())
2093            .await
2094            .unwrap();
2095
2096        // Register
2097        let register_msg = PluginMessage::Register {
2098            execution_id: execution_id.clone(),
2099        };
2100        client
2101            .write_all((serde_json::to_string(&register_msg).unwrap() + "\n").as_bytes())
2102            .await
2103            .unwrap();
2104
2105        // Send trace
2106        let trace = PluginMessage::Trace {
2107            trace: serde_json::json!({"event": "will_be_lost"}),
2108        };
2109        client
2110            .write_all((serde_json::to_string(&trace).unwrap() + "\n").as_bytes())
2111            .await
2112            .unwrap();
2113
2114        // Shutdown
2115        let shutdown_msg = PluginMessage::Shutdown;
2116        client
2117            .write_all((serde_json::to_string(&shutdown_msg).unwrap() + "\n").as_bytes())
2118            .await
2119            .unwrap();
2120        client.flush().await.unwrap();
2121        drop(client);
2122
2123        // Handler should not get stuck trying to send traces to a closed channel.
2124        tokio::time::timeout(Duration::from_secs(1), async {
2125            loop {
2126                if service.active_connection_count() == 0 {
2127                    break;
2128                }
2129                tokio::time::sleep(Duration::from_millis(20)).await;
2130            }
2131        })
2132        .await
2133        .expect("Connection should close even when trace receiver is dropped");
2134
2135        service.shutdown().await;
2136    }
2137
2138    #[tokio::test]
2139    async fn test_duplicate_execution_id_does_not_corrupt_counter() {
2140        let temp_dir = tempdir().unwrap();
2141        let socket_path = temp_dir.path().join("shared_duplicate_exec.sock");
2142
2143        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
2144        let execution_id = "duplicate-exec-id".to_string();
2145
2146        // Initial count should be 0
2147        assert_eq!(service.registered_executions_count().await, 0);
2148
2149        // Register first execution
2150        let guard1 = service.register_execution(execution_id.clone(), true).await;
2151        assert_eq!(service.registered_executions_count().await, 1);
2152
2153        // Try to register with same execution_id (duplicate)
2154        // This should NOT increment the counter (insertion will fail)
2155        let guard2 = service.register_execution(execution_id.clone(), true).await;
2156        // Counter should still be 1 (not 2)
2157        assert_eq!(service.registered_executions_count().await, 1);
2158
2159        // Drop the duplicate guard first - should NOT decrement counter
2160        // (because it was never successfully registered)
2161        drop(guard2);
2162        assert_eq!(service.registered_executions_count().await, 1);
2163
2164        // Drop the original guard - should decrement counter
2165        drop(guard1);
2166        assert_eq!(service.registered_executions_count().await, 0);
2167    }
2168
2169    #[tokio::test]
2170    async fn test_execution_guard_registered_field() {
2171        let temp_dir = tempdir().unwrap();
2172        let socket_path = temp_dir.path().join("shared_registered_field.sock");
2173
2174        let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
2175
2176        // Register a unique execution_id
2177        let execution_id_1 = "unique-exec-1".to_string();
2178        let guard1 = service
2179            .register_execution(execution_id_1.clone(), true)
2180            .await;
2181        assert_eq!(service.registered_executions_count().await, 1);
2182
2183        // Register another unique execution_id
2184        let execution_id_2 = "unique-exec-2".to_string();
2185        let guard2 = service
2186            .register_execution(execution_id_2.clone(), false)
2187            .await;
2188        assert_eq!(service.registered_executions_count().await, 2);
2189
2190        // into_receiver should work regardless of registered status
2191        let rx = guard1.into_receiver();
2192        assert!(rx.is_some()); // emit_traces=true
2193
2194        // guard2 had emit_traces=false
2195        let rx2 = guard2.into_receiver();
2196        assert!(rx2.is_none()); // emit_traces=false
2197
2198        // After guards are consumed via into_receiver, counter should be decremented
2199        assert_eq!(service.registered_executions_count().await, 0);
2200    }
2201
2202    // =========================================================================
2203    // log_socket_write_error tests
2204    // =========================================================================
2205
2206    #[test]
2207    fn test_log_socket_write_error_broken_pipe_does_not_panic() {
2208        // BrokenPipe is expected during timeout teardown → should log at DEBUG, not WARN
2209        let err = std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Broken pipe");
2210        log_socket_write_error("API response", &err);
2211    }
2212
2213    #[test]
2214    fn test_log_socket_write_error_connection_reset_does_not_panic() {
2215        // ConnectionReset is expected during timeout teardown → should log at DEBUG, not WARN
2216        let err = std::io::Error::new(std::io::ErrorKind::ConnectionReset, "Connection reset");
2217        log_socket_write_error("API response flush", &err);
2218    }
2219
2220    #[test]
2221    fn test_log_socket_write_error_other_errors_do_not_panic() {
2222        // Other IO errors (e.g., PermissionDenied) → should log at WARN
2223        let err = std::io::Error::new(std::io::ErrorKind::PermissionDenied, "Permission denied");
2224        log_socket_write_error("response", &err);
2225    }
2226
2227    #[test]
2228    fn test_log_socket_write_error_unexpected_eof() {
2229        let err = std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "unexpected eof");
2230        log_socket_write_error("response flush", &err);
2231    }
2232
2233    #[test]
2234    fn test_log_socket_write_error_context_strings() {
2235        // Verify all 4 context strings used in production don't cause issues
2236        let err = std::io::Error::new(std::io::ErrorKind::BrokenPipe, "os error 32");
2237        log_socket_write_error("API response", &err);
2238        log_socket_write_error("API response flush", &err);
2239        log_socket_write_error("response", &err);
2240        log_socket_write_error("response flush", &err);
2241    }
2242}