openzeppelin_relayer/services/plugins/
pool_executor.rs

1//! Pool-based Plugin Executor
2//!
3//! This module provides execution of pre-compiled JavaScript plugins via
4//! a persistent Piscina worker pool, replacing the per-request ts-node approach.
5//!
6//! Communication with the Node.js pool server happens via Unix socket using
7//! a JSON-line protocol.
8
9use std::collections::HashMap;
10use std::process::Stdio;
11use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::io::{AsyncBufReadExt, BufReader};
15use tokio::process::{Child, Command};
16use tokio::sync::oneshot;
17use uuid::Uuid;
18
19use crate::constants::{
20    ADMIN_REQUEST_TIMEOUT_SECS, DEFAULT_PLUGIN_TIMEOUT_SECONDS, PLUGIN_TIMEOUT_BUFFER_SECONDS,
21};
22
23use super::config::get_config;
24use super::connection::{ConnectionPool, PoolConnection};
25use super::health::{
26    CircuitBreaker, CircuitState, DeadServerIndicator, HealthStatus, ProcessStatus,
27};
28use super::protocol::{PoolError, PoolRequest, PoolResponse};
29use super::shared_socket::get_shared_socket_service;
30use super::{LogEntry, PluginError, PluginHandlerPayload, ScriptResult};
31
32/// Request queue entry for throttling
33struct QueuedRequest {
34    plugin_id: String,
35    compiled_code: Option<String>,
36    plugin_path: Option<String>,
37    params: serde_json::Value,
38    headers: Option<HashMap<String, Vec<String>>>,
39    socket_path: String,
40    http_request_id: Option<String>,
41    timeout_secs: Option<u64>,
42    route: Option<String>,
43    config: Option<serde_json::Value>,
44    method: Option<String>,
45    query: Option<serde_json::Value>,
46    response_tx: oneshot::Sender<Result<ScriptResult, PluginError>>,
47}
48
/// Fields extracted from the pool server's health-check JSON.
///
/// Produced by `PoolManager::parse_health_result`. Every optional field is
/// `None` when the corresponding JSON entry is missing or has an unexpected
/// type. Using a named struct instead of a tuple keeps Clippy's
/// `type_complexity` lint quiet and makes call sites readable.
///
/// `Clone` is derived so callers can keep a snapshot of a health result
/// without re-parsing the JSON.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ParsedHealthResult {
    /// Value of the top-level `status` string (the parser falls back to `"unknown"`).
    pub status: String,
    /// Value of the top-level `uptime` number.
    /// NOTE(review): the field name implies milliseconds - confirm against the pool server.
    pub uptime_ms: Option<u64>,
    /// Value of `memory.heapUsed`.
    pub memory: Option<u64>,
    /// Value of `pool.completed`.
    pub pool_completed: Option<u64>,
    /// Value of `pool.queued`.
    pub pool_queued: Option<u64>,
    /// Value of `execution.successRate`.
    pub success_rate: Option<f64>,
}
62
63/// Manages the pool server process and connections
64pub struct PoolManager {
65    socket_path: String,
66    process: tokio::sync::Mutex<Option<Child>>,
67    initialized: Arc<AtomicBool>,
68    /// Lock to prevent concurrent restarts (thundering herd)
69    restart_lock: tokio::sync::Mutex<()>,
70    /// Connection pool for reusing connections
71    connection_pool: Arc<ConnectionPool>,
72    /// Request queue for throttling/backpressure (multi-consumer channel)
73    request_tx: async_channel::Sender<QueuedRequest>,
74    /// Actual configured queue size (for error messages)
75    max_queue_size: usize,
76    /// Flag indicating if health check is needed (set by background task)
77    health_check_needed: Arc<AtomicBool>,
78    /// Consecutive failure count for health checks
79    consecutive_failures: Arc<AtomicU32>,
80    /// Circuit breaker for automatic degradation under GC pressure
81    circuit_breaker: Arc<CircuitBreaker>,
82    /// Last successful restart time (for backoff calculation)
83    last_restart_time_ms: Arc<AtomicU64>,
84    /// Is currently in recovery mode (gradual ramp-up)
85    recovery_mode: Arc<AtomicBool>,
86    /// Requests allowed during recovery (gradual increase)
87    recovery_allowance: Arc<AtomicU32>,
88    /// Shutdown signal for background tasks (queue workers, health check, etc.)
89    shutdown_signal: Arc<tokio::sync::Notify>,
90}
91
92impl Default for PoolManager {
93    fn default() -> Self {
94        Self::new()
95    }
96}
97
98impl PoolManager {
99    /// Base heap size in MB for the pool server process.
100    /// This provides the minimum memory needed for the Node.js runtime and core pool infrastructure.
101    const BASE_HEAP_MB: usize = 512;
102
103    /// Concurrency divisor for heap calculation.
104    /// Heap is incremented for every N concurrent requests to scale with load.
105    const CONCURRENCY_DIVISOR: usize = 10;
106
107    /// Heap increment in MB per CONCURRENCY_DIVISOR concurrent requests.
108    /// Formula: BASE_HEAP_MB + ((max_concurrency / CONCURRENCY_DIVISOR) * HEAP_INCREMENT_PER_DIVISOR_MB)
109    /// This accounts for additional memory needed per concurrent plugin execution context.
110    const HEAP_INCREMENT_PER_DIVISOR_MB: usize = 32;
111
112    /// Maximum heap size in MB (hard cap) for the pool server process.
113    /// Prevents excessive memory allocation that could cause system instability.
114    /// Set to 8GB (8192 MB) as a reasonable upper bound for Node.js processes.
115    const MAX_HEAP_MB: usize = 8192;
116
117    /// Calculate heap size based on concurrency level.
118    ///
119    /// Formula: BASE_HEAP_MB + ((max_concurrency / CONCURRENCY_DIVISOR) * HEAP_INCREMENT_PER_DIVISOR_MB)
120    /// Result is capped at MAX_HEAP_MB.
121    ///
122    /// This scales memory allocation with expected load while maintaining a reasonable minimum.
123    pub fn calculate_heap_size(max_concurrency: usize) -> usize {
124        let calculated = Self::BASE_HEAP_MB
125            + ((max_concurrency / Self::CONCURRENCY_DIVISOR) * Self::HEAP_INCREMENT_PER_DIVISOR_MB);
126        calculated.min(Self::MAX_HEAP_MB)
127    }
128
129    /// Format a result value from the pool response into a string.
130    ///
131    /// If the value is already a string, returns it directly.
132    /// Otherwise, serializes it to JSON.
133    pub fn format_return_value(value: Option<serde_json::Value>) -> String {
134        value
135            .map(|v| {
136                if v.is_string() {
137                    v.as_str().unwrap_or("").to_string()
138                } else {
139                    serde_json::to_string(&v).unwrap_or_default()
140                }
141            })
142            .unwrap_or_default()
143    }
144
145    /// Parse a successful pool response into a ScriptResult.
146    ///
147    /// Converts logs from PoolLogEntry to LogEntry and extracts the return value.
148    pub fn parse_success_response(response: PoolResponse) -> ScriptResult {
149        let logs: Vec<LogEntry> = response
150            .logs
151            .map(|logs| logs.into_iter().map(|l| l.into()).collect())
152            .unwrap_or_default();
153
154        ScriptResult {
155            logs,
156            error: String::new(),
157            return_value: Self::format_return_value(response.result),
158            trace: Vec::new(),
159        }
160    }
161
162    /// Parse a failed pool response into a PluginError.
163    ///
164    /// Extracts error details and converts logs for inclusion in the error payload.
165    pub fn parse_error_response(response: PoolResponse) -> PluginError {
166        let logs: Vec<LogEntry> = response
167            .logs
168            .map(|logs| logs.into_iter().map(|l| l.into()).collect())
169            .unwrap_or_default();
170
171        let error = response.error.unwrap_or(PoolError {
172            message: "Unknown error".to_string(),
173            code: None,
174            status: None,
175            details: None,
176        });
177
178        PluginError::HandlerError(Box::new(PluginHandlerPayload {
179            message: error.message,
180            status: error.status.unwrap_or(500),
181            code: error.code,
182            details: error.details,
183            logs: Some(logs),
184            traces: None,
185        }))
186    }
187
188    /// Parse a pool response into either a success result or an error.
189    ///
190    /// This is the main entry point for response parsing, dispatching to
191    /// either parse_success_response or parse_error_response based on the success flag.
192    pub fn parse_pool_response(response: PoolResponse) -> Result<ScriptResult, PluginError> {
193        if response.success {
194            Ok(Self::parse_success_response(response))
195        } else {
196            Err(Self::parse_error_response(response))
197        }
198    }
199
200    /// Parse health check result JSON into individual fields.
201    ///
202    /// Extracts status, uptime, memory usage, pool stats, and success rate
203    /// from the nested JSON structure returned by the pool server.
204    pub fn parse_health_result(result: &serde_json::Value) -> ParsedHealthResult {
205        ParsedHealthResult {
206            status: result
207                .get("status")
208                .and_then(|v| v.as_str())
209                .unwrap_or("unknown")
210                .to_string(),
211            uptime_ms: result.get("uptime").and_then(|v| v.as_u64()),
212            memory: result
213                .get("memory")
214                .and_then(|v| v.get("heapUsed"))
215                .and_then(|v| v.as_u64()),
216            pool_completed: result
217                .get("pool")
218                .and_then(|v| v.get("completed"))
219                .and_then(|v| v.as_u64()),
220            pool_queued: result
221                .get("pool")
222                .and_then(|v| v.get("queued"))
223                .and_then(|v| v.as_u64()),
224            success_rate: result
225                .get("execution")
226                .and_then(|v| v.get("successRate"))
227                .and_then(|v| v.as_f64()),
228        }
229    }
230
231    /// Create a new PoolManager with default socket path
232    pub fn new() -> Self {
233        Self::init(format!("/tmp/relayer-plugin-pool-{}.sock", Uuid::new_v4()))
234    }
235
236    /// Create a new PoolManager with custom socket path
237    pub fn with_socket_path(socket_path: String) -> Self {
238        Self::init(socket_path)
239    }
240
241    /// Common initialization logic
242    fn init(socket_path: String) -> Self {
243        let config = get_config();
244        let max_connections = config.pool_max_connections;
245        let max_queue_size = config.pool_max_queue_size;
246
247        let (tx, rx) = async_channel::bounded(max_queue_size);
248
249        let connection_pool = Arc::new(ConnectionPool::new(socket_path.clone(), max_connections));
250        let connection_pool_clone = connection_pool.clone();
251
252        let shutdown_signal = Arc::new(tokio::sync::Notify::new());
253
254        Self::spawn_queue_workers(
255            rx,
256            connection_pool_clone,
257            config.pool_workers,
258            shutdown_signal.clone(),
259        );
260
261        let health_check_needed = Arc::new(AtomicBool::new(false));
262        let consecutive_failures = Arc::new(AtomicU32::new(0));
263        let circuit_breaker = Arc::new(CircuitBreaker::new());
264        let last_restart_time_ms = Arc::new(AtomicU64::new(0));
265        let recovery_mode = Arc::new(AtomicBool::new(false));
266        let recovery_allowance = Arc::new(AtomicU32::new(0));
267
268        Self::spawn_health_check_task(
269            health_check_needed.clone(),
270            config.health_check_interval_secs,
271            shutdown_signal.clone(),
272        );
273
274        Self::spawn_recovery_task(
275            recovery_mode.clone(),
276            recovery_allowance.clone(),
277            shutdown_signal.clone(),
278        );
279
280        Self {
281            connection_pool,
282            socket_path,
283            process: tokio::sync::Mutex::new(None),
284            initialized: Arc::new(AtomicBool::new(false)),
285            restart_lock: tokio::sync::Mutex::new(()),
286            request_tx: tx,
287            max_queue_size,
288            health_check_needed,
289            consecutive_failures,
290            circuit_breaker,
291            last_restart_time_ms,
292            recovery_mode,
293            recovery_allowance,
294            shutdown_signal,
295        }
296    }
297
298    /// Spawn background task to gradually increase recovery allowance
299    fn spawn_recovery_task(
300        recovery_mode: Arc<AtomicBool>,
301        recovery_allowance: Arc<AtomicU32>,
302        shutdown_signal: Arc<tokio::sync::Notify>,
303    ) {
304        tokio::spawn(async move {
305            let mut interval = tokio::time::interval(Duration::from_millis(500));
306            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
307
308            loop {
309                tokio::select! {
310                    biased;
311
312                    _ = shutdown_signal.notified() => {
313                        tracing::debug!("Recovery task received shutdown signal");
314                        break;
315                    }
316
317                    _ = interval.tick() => {
318                        if recovery_mode.load(Ordering::Relaxed) {
319                            let current = recovery_allowance.load(Ordering::Relaxed);
320                            if current < 100 {
321                                let new_allowance = (current + 10).min(100);
322                                recovery_allowance.store(new_allowance, Ordering::Relaxed);
323                                tracing::debug!(
324                                    allowance = new_allowance,
325                                    "Recovery mode: increasing request allowance"
326                                );
327                            } else {
328                                recovery_mode.store(false, Ordering::Relaxed);
329                                tracing::info!("Recovery mode complete - full capacity restored");
330                            }
331                        }
332                    }
333                }
334            }
335        });
336    }
337
338    /// Spawn background task to set health check flag periodically
339    fn spawn_health_check_task(
340        health_check_needed: Arc<AtomicBool>,
341        interval_secs: u64,
342        shutdown_signal: Arc<tokio::sync::Notify>,
343    ) {
344        tokio::spawn(async move {
345            let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
346            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
347
348            loop {
349                tokio::select! {
350                    biased;
351
352                    _ = shutdown_signal.notified() => {
353                        tracing::debug!("Health check task received shutdown signal");
354                        break;
355                    }
356
357                    _ = interval.tick() => {
358                        health_check_needed.store(true, Ordering::Relaxed);
359                    }
360                }
361            }
362        });
363    }
364
365    /// Spawn multiple worker tasks to process queued requests concurrently
366    fn spawn_queue_workers(
367        rx: async_channel::Receiver<QueuedRequest>,
368        connection_pool: Arc<ConnectionPool>,
369        configured_workers: usize,
370        shutdown_signal: Arc<tokio::sync::Notify>,
371    ) {
372        let num_workers = if configured_workers > 0 {
373            configured_workers
374        } else {
375            std::thread::available_parallelism()
376                .map(|n| n.get().clamp(4, 32))
377                .unwrap_or(8)
378        };
379
380        tracing::info!(num_workers = num_workers, "Starting request queue workers");
381
382        for worker_id in 0..num_workers {
383            let rx_clone = rx.clone();
384            let pool_clone = connection_pool.clone();
385            let shutdown = shutdown_signal.clone();
386
387            tokio::spawn(async move {
388                loop {
389                    tokio::select! {
390                        biased;
391
392                        _ = shutdown.notified() => {
393                            tracing::debug!(worker_id = worker_id, "Request queue worker received shutdown signal");
394                            break;
395                        }
396
397                        request_result = rx_clone.recv() => {
398                            let request = match request_result {
399                                Ok(r) => r,
400                                Err(_) => break,
401                            };
402
403                            let start = std::time::Instant::now();
404                            let plugin_id = request.plugin_id.clone();
405
406                            let result = Self::execute_plugin_internal(
407                                &pool_clone,
408                                request.plugin_id,
409                                request.compiled_code,
410                                request.plugin_path,
411                                request.params,
412                                request.headers,
413                                request.socket_path,
414                                request.http_request_id,
415                                request.timeout_secs,
416                                request.route,
417                                request.config,
418                                request.method,
419                                request.query,
420                            )
421                            .await;
422
423                            let elapsed = start.elapsed();
424                            if let Err(ref e) = result {
425                                let error_str = format!("{e:?}");
426                                if error_str.contains("shutdown") || error_str.contains("Shutdown") {
427                                    tracing::debug!(
428                                        worker_id = worker_id,
429                                        plugin_id = %plugin_id,
430                                        "Plugin execution cancelled during shutdown"
431                                    );
432                                } else {
433                                    tracing::warn!(
434                                        worker_id = worker_id,
435                                        plugin_id = %plugin_id,
436                                        elapsed_ms = elapsed.as_millis() as u64,
437                                        error = ?e,
438                                        "Plugin execution failed"
439                                    );
440                                }
441                            } else if elapsed.as_secs() > 1 {
442                                tracing::debug!(
443                                    worker_id = worker_id,
444                                    plugin_id = %plugin_id,
445                                    elapsed_ms = elapsed.as_millis() as u64,
446                                    "Slow plugin execution"
447                                );
448                            }
449
450                            let _ = request.response_tx.send(result);
451                        }
452                    }
453                }
454
455                tracing::debug!(worker_id = worker_id, "Request queue worker exited");
456            });
457        }
458    }
459
460    /// Spawn a rate-limited stderr reader to prevent log flooding
461    fn spawn_rate_limited_stderr_reader(stderr: tokio::process::ChildStderr) {
462        tokio::spawn(async move {
463            let reader = BufReader::new(stderr);
464            let mut lines = reader.lines();
465
466            let mut last_log_time = std::time::Instant::now();
467            let mut suppressed_count = 0u64;
468            let min_interval = Duration::from_millis(100);
469
470            while let Ok(Some(line)) = lines.next_line().await {
471                let now = std::time::Instant::now();
472                let elapsed = now.duration_since(last_log_time);
473
474                if elapsed >= min_interval {
475                    if suppressed_count > 0 {
476                        tracing::warn!(
477                            target: "pool_server",
478                            suppressed = suppressed_count,
479                            "... ({} lines suppressed due to rate limiting)",
480                            suppressed_count
481                        );
482                        suppressed_count = 0;
483                    }
484                    tracing::error!(target: "pool_server", "{}", line);
485                    last_log_time = now;
486                } else {
487                    suppressed_count += 1;
488                    if suppressed_count % 100 == 0 {
489                        tracing::warn!(
490                            target: "pool_server",
491                            suppressed = suppressed_count,
492                            "Pool server producing excessive stderr output"
493                        );
494                    }
495                }
496            }
497
498            if suppressed_count > 0 {
499                tracing::warn!(
500                    target: "pool_server",
501                    suppressed = suppressed_count,
502                    "Pool server stderr closed ({} final lines suppressed)",
503                    suppressed_count
504                );
505            }
506        });
507    }
508
509    /// Execute plugin with optional pre-acquired permit (unified fast/slow path)
510    #[allow(clippy::too_many_arguments)]
511    async fn execute_with_permit(
512        connection_pool: &Arc<ConnectionPool>,
513        permit: Option<tokio::sync::OwnedSemaphorePermit>,
514        plugin_id: String,
515        compiled_code: Option<String>,
516        plugin_path: Option<String>,
517        params: serde_json::Value,
518        headers: Option<HashMap<String, Vec<String>>>,
519        socket_path: String,
520        http_request_id: Option<String>,
521        timeout_secs: Option<u64>,
522        route: Option<String>,
523        config: Option<serde_json::Value>,
524        method: Option<String>,
525        query: Option<serde_json::Value>,
526    ) -> Result<ScriptResult, PluginError> {
527        let mut conn = connection_pool.acquire_with_permit(permit).await?;
528
529        let request = PoolRequest::Execute(Box::new(super::protocol::ExecuteRequest {
530            task_id: Uuid::new_v4().to_string(),
531            plugin_id: plugin_id.clone(),
532            compiled_code,
533            plugin_path,
534            params,
535            headers,
536            socket_path,
537            http_request_id,
538            timeout: timeout_secs.map(|s| s * 1000),
539            route,
540            config,
541            method,
542            query,
543        }));
544
545        // Add buffer so the Node.js timeout fires first with a structured response;
546        // this Rust timeout is a backstop if the Node.js process hangs.
547        let configured_timeout = timeout_secs.unwrap_or(DEFAULT_PLUGIN_TIMEOUT_SECONDS);
548        let backstop_timeout = configured_timeout + PLUGIN_TIMEOUT_BUFFER_SECONDS;
549        let response = conn
550            .send_request_with_timeout(&request, backstop_timeout)
551            .await?;
552
553        // Use extracted parsing function for cleaner code and testability
554        Self::parse_pool_response(response)
555    }
556
557    /// Internal execution method (wrapper for execute_with_permit)
558    #[allow(clippy::too_many_arguments)]
559    async fn execute_plugin_internal(
560        connection_pool: &Arc<ConnectionPool>,
561        plugin_id: String,
562        compiled_code: Option<String>,
563        plugin_path: Option<String>,
564        params: serde_json::Value,
565        headers: Option<HashMap<String, Vec<String>>>,
566        socket_path: String,
567        http_request_id: Option<String>,
568        timeout_secs: Option<u64>,
569        route: Option<String>,
570        config: Option<serde_json::Value>,
571        method: Option<String>,
572        query: Option<serde_json::Value>,
573    ) -> Result<ScriptResult, PluginError> {
574        Self::execute_with_permit(
575            connection_pool,
576            None,
577            plugin_id,
578            compiled_code,
579            plugin_path,
580            params,
581            headers,
582            socket_path,
583            http_request_id,
584            timeout_secs,
585            route,
586            config,
587            method,
588            query,
589        )
590        .await
591    }
592
593    /// Check if the pool manager has been initialized.
594    ///
595    /// This is useful for health checks to determine if the plugin pool
596    /// is expected to be running.
597    pub async fn is_initialized(&self) -> bool {
598        self.initialized.load(Ordering::Acquire)
599    }
600
601    /// Start the pool server if not already running
602    pub async fn ensure_started(&self) -> Result<(), PluginError> {
603        if self.initialized.load(Ordering::Acquire) {
604            return Ok(());
605        }
606
607        let _startup_guard = self.restart_lock.lock().await;
608
609        if self.initialized.load(Ordering::Acquire) {
610            return Ok(());
611        }
612
613        self.start_pool_server().await?;
614        self.initialized.store(true, Ordering::Release);
615        Ok(())
616    }
617
618    /// Ensure pool is started and healthy, with auto-recovery on failure
619    async fn ensure_started_and_healthy(&self) -> Result<(), PluginError> {
620        self.ensure_started().await?;
621
622        if !self.health_check_needed.load(Ordering::Relaxed) {
623            return Ok(());
624        }
625
626        if self
627            .health_check_needed
628            .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
629            .is_err()
630        {
631            return Ok(());
632        }
633
634        self.check_and_restart_if_needed().await
635    }
636
637    /// Check process status and restart if needed
638    async fn check_and_restart_if_needed(&self) -> Result<(), PluginError> {
639        // Check process status without holding restart lock
640        let process_status = {
641            let mut process_guard = self.process.lock().await;
642            if let Some(child) = process_guard.as_mut() {
643                match child.try_wait() {
644                    Ok(Some(exit_status)) => {
645                        tracing::warn!(
646                            exit_status = ?exit_status,
647                            "Pool server process has exited"
648                        );
649                        *process_guard = None;
650                        ProcessStatus::Exited
651                    }
652                    Ok(None) => ProcessStatus::Running,
653                    Err(e) => {
654                        tracing::warn!(
655                            error = %e,
656                            "Failed to check pool server process status, assuming dead"
657                        );
658                        *process_guard = None;
659                        ProcessStatus::Unknown
660                    }
661                }
662            } else {
663                ProcessStatus::NoProcess
664            }
665        };
666
667        // Determine if restart is needed
668        let needs_restart = match process_status {
669            ProcessStatus::Running => {
670                let socket_exists = std::path::Path::new(&self.socket_path).exists();
671                if !socket_exists {
672                    tracing::warn!(
673                        socket_path = %self.socket_path,
674                        "Pool server socket file missing, needs restart"
675                    );
676                    true
677                } else {
678                    false
679                }
680            }
681            ProcessStatus::Exited | ProcessStatus::Unknown | ProcessStatus::NoProcess => {
682                tracing::warn!("Pool server not running, needs restart");
683                true
684            }
685        };
686
687        // Only acquire restart lock if restart is actually needed
688        if needs_restart {
689            let _restart_guard = self.restart_lock.lock().await;
690            self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
691            self.restart_internal().await?;
692            self.consecutive_failures.store(0, Ordering::Relaxed);
693        }
694
695        Ok(())
696    }
697
698    /// Clean up socket file with retry logic
699    async fn cleanup_socket_file(socket_path: &str) {
700        let max_cleanup_attempts = 5;
701        let mut attempts = 0;
702
703        while attempts < max_cleanup_attempts {
704            match std::fs::remove_file(socket_path) {
705                Ok(_) => break,
706                Err(e) if e.kind() == std::io::ErrorKind::NotFound => break,
707                Err(e) => {
708                    attempts += 1;
709                    if attempts >= max_cleanup_attempts {
710                        tracing::warn!(
711                            socket_path = %socket_path,
712                            error = %e,
713                            "Failed to remove socket file after {} attempts, proceeding anyway",
714                            max_cleanup_attempts
715                        );
716                        break;
717                    }
718                    let delay_ms = 10 * (1 << attempts.min(3));
719                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
720                }
721            }
722        }
723
724        tokio::time::sleep(Duration::from_millis(50)).await;
725    }
726
727    /// Spawn the pool server process with proper configuration
728    async fn spawn_pool_server_process(
729        socket_path: &str,
730        context: &str,
731    ) -> Result<Child, PluginError> {
732        let pool_server_path = std::env::current_dir()
733            .map(|cwd| cwd.join("plugins/lib/pool-server.ts").display().to_string())
734            .unwrap_or_else(|_| "plugins/lib/pool-server.ts".to_string());
735
736        let config = get_config();
737
738        // Use extracted function for heap calculation
739        let pool_server_heap_mb = Self::calculate_heap_size(config.max_concurrency);
740
741        // Log warning if heap was capped (for observability)
742        let uncapped_heap = Self::BASE_HEAP_MB
743            + ((config.max_concurrency / Self::CONCURRENCY_DIVISOR)
744                * Self::HEAP_INCREMENT_PER_DIVISOR_MB);
745        if uncapped_heap > Self::MAX_HEAP_MB {
746            tracing::warn!(
747                calculated_heap_mb = uncapped_heap,
748                capped_heap_mb = pool_server_heap_mb,
749                max_concurrency = config.max_concurrency,
750                "Pool server heap calculation exceeded 8GB cap"
751            );
752        }
753
754        tracing::info!(
755            socket_path = %socket_path,
756            heap_mb = pool_server_heap_mb,
757            max_concurrency = config.max_concurrency,
758            context = context,
759            "Spawning plugin pool server"
760        );
761
762        let node_options = format!("--max-old-space-size={pool_server_heap_mb} --expose-gc");
763
764        let mut child = Command::new("ts-node")
765            .arg("--transpile-only")
766            .arg(&pool_server_path)
767            .arg(socket_path)
768            .env("NODE_OPTIONS", node_options)
769            .env("PLUGIN_MAX_CONCURRENCY", config.max_concurrency.to_string())
770            .env(
771                "PLUGIN_POOL_MIN_THREADS",
772                config.nodejs_pool_min_threads.to_string(),
773            )
774            .env(
775                "PLUGIN_POOL_MAX_THREADS",
776                config.nodejs_pool_max_threads.to_string(),
777            )
778            .env(
779                "PLUGIN_POOL_CONCURRENT_TASKS",
780                config.nodejs_pool_concurrent_tasks.to_string(),
781            )
782            .env(
783                "PLUGIN_POOL_IDLE_TIMEOUT",
784                config.nodejs_pool_idle_timeout_ms.to_string(),
785            )
786            .env(
787                "PLUGIN_WORKER_HEAP_MB",
788                config.nodejs_worker_heap_mb.to_string(),
789            )
790            .env(
791                "PLUGIN_POOL_SOCKET_BACKLOG",
792                config.pool_socket_backlog.to_string(),
793            )
794            .stdin(Stdio::null())
795            .stdout(Stdio::piped())
796            .stderr(Stdio::piped())
797            .spawn()
798            .map_err(|e| {
799                PluginError::PluginExecutionError(format!("Failed to {context} pool server: {e}"))
800            })?;
801
802        if let Some(stderr) = child.stderr.take() {
803            Self::spawn_rate_limited_stderr_reader(stderr);
804        }
805
806        if let Some(stdout) = child.stdout.take() {
807            let reader = BufReader::new(stdout);
808            let mut lines = reader.lines();
809
810            let timeout_result = tokio::time::timeout(Duration::from_secs(10), async {
811                while let Ok(Some(line)) = lines.next_line().await {
812                    if line.contains("POOL_SERVER_READY") {
813                        return Ok(());
814                    }
815                }
816                Err(PluginError::PluginExecutionError(
817                    "Pool server did not send ready signal".to_string(),
818                ))
819            })
820            .await;
821
822            match timeout_result {
823                Ok(Ok(())) => {
824                    tracing::info!(context = context, "Plugin pool server ready");
825                }
826                Ok(Err(e)) => return Err(e),
827                Err(_) => {
828                    return Err(PluginError::PluginExecutionError(format!(
829                        "Timeout waiting for pool server to {context}"
830                    )))
831                }
832            }
833        }
834
835        Ok(child)
836    }
837
838    async fn start_pool_server(&self) -> Result<(), PluginError> {
839        let mut process_guard = self.process.lock().await;
840
841        if process_guard.is_some() {
842            return Ok(());
843        }
844
845        Self::cleanup_socket_file(&self.socket_path).await;
846
847        let child = Self::spawn_pool_server_process(&self.socket_path, "start").await?;
848
849        *process_guard = Some(child);
850        Ok(())
851    }
852
853    /// Execute a plugin via the pool
854    #[allow(clippy::too_many_arguments)]
855    pub async fn execute_plugin(
856        &self,
857        plugin_id: String,
858        compiled_code: Option<String>,
859        plugin_path: Option<String>,
860        params: serde_json::Value,
861        headers: Option<HashMap<String, Vec<String>>>,
862        socket_path: String,
863        http_request_id: Option<String>,
864        timeout_secs: Option<u64>,
865        route: Option<String>,
866        config: Option<serde_json::Value>,
867        method: Option<String>,
868        query: Option<serde_json::Value>,
869    ) -> Result<ScriptResult, PluginError> {
870        let rid = http_request_id.as_deref().unwrap_or("unknown");
871        let effective_timeout = timeout_secs.unwrap_or(DEFAULT_PLUGIN_TIMEOUT_SECONDS);
872        tracing::debug!(
873            plugin_id = %plugin_id,
874            http_request_id = %rid,
875            timeout_secs = effective_timeout,
876            "Pool execute request received"
877        );
878        let recovery_allowance = if self.recovery_mode.load(Ordering::Relaxed) {
879            Some(self.recovery_allowance.load(Ordering::Relaxed))
880        } else {
881            None
882        };
883
884        if !self
885            .circuit_breaker
886            .should_allow_request(recovery_allowance)
887        {
888            let state = self.circuit_breaker.state();
889            tracing::warn!(
890                plugin_id = %plugin_id,
891                circuit_state = ?state,
892                recovery_allowance = ?recovery_allowance,
893                "Request rejected by circuit breaker"
894            );
895            return Err(PluginError::PluginExecutionError(
896                "Plugin system temporarily unavailable due to high load. Please retry shortly."
897                    .to_string(),
898            ));
899        }
900
901        let start_time = Instant::now();
902
903        self.ensure_started_and_healthy().await?;
904        tracing::debug!(
905            plugin_id = %plugin_id,
906            http_request_id = %rid,
907            "Pool execute start (healthy/started)"
908        );
909
910        let circuit_breaker = self.circuit_breaker.clone();
911        match self.connection_pool.semaphore.clone().try_acquire_owned() {
912            Ok(permit) => {
913                tracing::debug!(
914                    plugin_id = %plugin_id,
915                    http_request_id = %rid,
916                    "Pool execute acquired connection permit (fast path)"
917                );
918                let result = Self::execute_with_permit(
919                    &self.connection_pool,
920                    Some(permit),
921                    plugin_id,
922                    compiled_code,
923                    plugin_path,
924                    params,
925                    headers,
926                    socket_path,
927                    http_request_id,
928                    timeout_secs,
929                    route,
930                    config,
931                    method,
932                    query,
933                )
934                .await;
935
936                let elapsed_ms = start_time.elapsed().as_millis() as u32;
937                match &result {
938                    Ok(_) => circuit_breaker.record_success(elapsed_ms),
939                    Err(e) => {
940                        // Only count infrastructure errors for circuit breaker, not business errors
941                        // Business errors (RPC failures, plugin logic errors) mean the pool is healthy
942                        if Self::is_dead_server_error(e) {
943                            circuit_breaker.record_failure();
944                            tracing::warn!(
945                                error = %e,
946                                "Detected dead pool server error, triggering health check for restart"
947                            );
948                            self.health_check_needed.store(true, Ordering::Relaxed);
949                        } else {
950                            // Plugin executed but returned error - infrastructure is healthy
951                            circuit_breaker.record_success(elapsed_ms);
952                        }
953                    }
954                }
955
956                tracing::debug!(
957                    elapsed_ms = elapsed_ms,
958                    result_ok = result.is_ok(),
959                    "Pool execute finished (fast path)"
960                );
961                result
962            }
963            Err(_) => {
964                tracing::debug!(
965                    plugin_id = %plugin_id,
966                    http_request_id = %rid,
967                    "Pool execute queueing (no permits)"
968                );
969                let (response_tx, response_rx) = oneshot::channel();
970
971                let queued_request = QueuedRequest {
972                    plugin_id,
973                    compiled_code,
974                    plugin_path,
975                    params,
976                    headers,
977                    socket_path,
978                    http_request_id,
979                    timeout_secs,
980                    route,
981                    config,
982                    method,
983                    query,
984                    response_tx,
985                };
986
987                let result = match self.request_tx.try_send(queued_request) {
988                    Ok(()) => {
989                        let queue_len = self.request_tx.len();
990                        if queue_len > self.max_queue_size / 2 {
991                            tracing::warn!(
992                                queue_len = queue_len,
993                                max_queue_size = self.max_queue_size,
994                                "Plugin queue is over 50% capacity"
995                            );
996                        }
997                        // Add timeout to response_rx to prevent hung requests if worker crashes.
998                        // Must exceed the Rust backstop (T + PLUGIN_TIMEOUT_BUFFER_SECONDS)
999                        // so the inner timeout layers fire first.
1000                        let response_timeout = timeout_secs
1001                            .map(Duration::from_secs)
1002                            .unwrap_or(Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS))
1003                            + Duration::from_secs(PLUGIN_TIMEOUT_BUFFER_SECONDS + 1);
1004
1005                        match tokio::time::timeout(response_timeout, response_rx).await {
1006                            Ok(Ok(result)) => result,
1007                            Ok(Err(_)) => Err(PluginError::PluginExecutionError(
1008                                "Request queue processor closed".to_string(),
1009                            )),
1010                            Err(_) => Err(PluginError::PluginExecutionError(format!(
1011                                "Request timed out after {}s waiting for worker response",
1012                                response_timeout.as_secs()
1013                            ))),
1014                        }
1015                    }
1016                    Err(async_channel::TrySendError::Full(req)) => {
1017                        let queue_timeout_ms = get_config().pool_queue_send_timeout_ms;
1018                        let queue_timeout = Duration::from_millis(queue_timeout_ms);
1019                        match tokio::time::timeout(queue_timeout, self.request_tx.send(req)).await {
1020                            Ok(Ok(())) => {
1021                                let queue_len = self.request_tx.len();
1022                                tracing::debug!(
1023                                    queue_len = queue_len,
1024                                    "Request queued after waiting for queue space"
1025                                );
1026                                // Must exceed the Rust backstop (T + PLUGIN_TIMEOUT_BUFFER_SECONDS)
1027                                let response_timeout = timeout_secs
1028                                    .map(Duration::from_secs)
1029                                    .unwrap_or(Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS))
1030                                    + Duration::from_secs(PLUGIN_TIMEOUT_BUFFER_SECONDS + 1);
1031
1032                                match tokio::time::timeout(response_timeout, response_rx).await {
1033                                    Ok(Ok(result)) => result,
1034                                    Ok(Err(_)) => Err(PluginError::PluginExecutionError(
1035                                        "Request queue processor closed".to_string(),
1036                                    )),
1037                                    Err(_) => Err(PluginError::PluginExecutionError(format!(
1038                                        "Request timed out after {}s waiting for worker response",
1039                                        response_timeout.as_secs()
1040                                    ))),
1041                                }
1042                            }
1043                            Ok(Err(async_channel::SendError(_))) => {
1044                                Err(PluginError::PluginExecutionError(
1045                                    "Plugin execution queue is closed".to_string(),
1046                                ))
1047                            }
1048                            Err(_) => {
1049                                let queue_len = self.request_tx.len();
1050                                tracing::error!(
1051                                    queue_len = queue_len,
1052                                    max_queue_size = self.max_queue_size,
1053                                    timeout_ms = queue_timeout.as_millis(),
1054                                    "Plugin execution queue is FULL - timeout waiting for space"
1055                                );
1056                                Err(PluginError::PluginExecutionError(format!(
1057                                    "Plugin execution queue is full (max: {}) and timeout waiting for space. \
1058                                    Consider increasing PLUGIN_POOL_MAX_QUEUE_SIZE or PLUGIN_POOL_MAX_CONNECTIONS.",
1059                                    self.max_queue_size
1060                                )))
1061                            }
1062                        }
1063                    }
1064                    Err(async_channel::TrySendError::Closed(_)) => {
1065                        Err(PluginError::PluginExecutionError(
1066                            "Plugin execution queue is closed".to_string(),
1067                        ))
1068                    }
1069                };
1070
1071                let elapsed_ms = start_time.elapsed().as_millis() as u32;
1072                match &result {
1073                    Ok(_) => circuit_breaker.record_success(elapsed_ms),
1074                    Err(e) => {
1075                        // Only count infrastructure errors for circuit breaker, not business errors
1076                        if Self::is_dead_server_error(e) {
1077                            circuit_breaker.record_failure();
1078                            tracing::warn!(
1079                                error = %e,
1080                                "Detected dead pool server error (queued path), triggering health check for restart"
1081                            );
1082                            self.health_check_needed.store(true, Ordering::Relaxed);
1083                        } else {
1084                            // Plugin executed but returned error - infrastructure is healthy
1085                            circuit_breaker.record_success(elapsed_ms);
1086                        }
1087                    }
1088                }
1089
1090                tracing::debug!(
1091                    elapsed_ms = elapsed_ms,
1092                    result_ok = result.is_ok(),
1093                    "Pool execute finished (queued path)"
1094                );
1095                result
1096            }
1097        }
1098    }
1099
1100    /// Check if an error indicates the pool server is dead and needs restart.
1101    /// Timeouts and handler/plugin errors are NOT dead-server indicators —
1102    /// they mean the server processed the request but the plugin failed.
1103    pub fn is_dead_server_error(err: &PluginError) -> bool {
1104        match err {
1105            // Timeouts mean the server is alive but the plugin took too long
1106            PluginError::ScriptTimeout(_) => false,
1107            // Handler errors are structured plugin failures, not infrastructure issues
1108            PluginError::HandlerError(_) => false,
1109            // Rust-side request timeout is the transport backstop firing because the
1110            // pool server stopped responding. This should trigger recovery.
1111            PluginError::SocketError(msg)
1112                if msg.to_lowercase().contains("request timed out after") =>
1113            {
1114                true
1115            }
1116            // For everything else, check the error message for dead-server patterns
1117            other => {
1118                let error_str = other.to_string();
1119                let lower = error_str.to_lowercase();
1120
1121                // Node.js handler timeout surfaced as a string error
1122                if lower.contains("handler timed out") {
1123                    return false;
1124                }
1125
1126                DeadServerIndicator::from_error_str(&error_str).is_some()
1127            }
1128        }
1129    }
1130
1131    /// Precompile a plugin
1132    pub async fn precompile_plugin(
1133        &self,
1134        plugin_id: String,
1135        plugin_path: Option<String>,
1136        source_code: Option<String>,
1137    ) -> Result<String, PluginError> {
1138        self.ensure_started().await?;
1139
1140        let mut conn = self.connection_pool.acquire().await?;
1141
1142        let request = PoolRequest::Precompile {
1143            task_id: Uuid::new_v4().to_string(),
1144            plugin_id: plugin_id.clone(),
1145            plugin_path,
1146            source_code,
1147        };
1148
1149        let response = conn
1150            .send_request_with_timeout(&request, ADMIN_REQUEST_TIMEOUT_SECS)
1151            .await?;
1152
1153        if response.success {
1154            response
1155                .result
1156                .and_then(|v| {
1157                    v.get("code")
1158                        .and_then(|c| c.as_str())
1159                        .map(|s| s.to_string())
1160                })
1161                .ok_or_else(|| {
1162                    PluginError::PluginExecutionError("No compiled code in response".to_string())
1163                })
1164        } else {
1165            let error = response.error.unwrap_or(PoolError {
1166                message: "Compilation failed".to_string(),
1167                code: None,
1168                status: None,
1169                details: None,
1170            });
1171            Err(PluginError::PluginExecutionError(error.message))
1172        }
1173    }
1174
1175    /// Cache compiled code in the pool
1176    pub async fn cache_compiled_code(
1177        &self,
1178        plugin_id: String,
1179        compiled_code: String,
1180    ) -> Result<(), PluginError> {
1181        self.ensure_started().await?;
1182
1183        let mut conn = self.connection_pool.acquire().await?;
1184
1185        let request = PoolRequest::Cache {
1186            task_id: Uuid::new_v4().to_string(),
1187            plugin_id: plugin_id.clone(),
1188            compiled_code,
1189        };
1190
1191        let response = conn
1192            .send_request_with_timeout(&request, ADMIN_REQUEST_TIMEOUT_SECS)
1193            .await?;
1194
1195        if response.success {
1196            Ok(())
1197        } else {
1198            let error = response.error.unwrap_or(PoolError {
1199                message: "Cache failed".to_string(),
1200                code: None,
1201                status: None,
1202                details: None,
1203            });
1204            Err(PluginError::PluginError(error.message))
1205        }
1206    }
1207
1208    /// Invalidate a cached plugin
1209    pub async fn invalidate_plugin(&self, plugin_id: String) -> Result<(), PluginError> {
1210        if !self.initialized.load(Ordering::Acquire) {
1211            return Ok(());
1212        }
1213
1214        let mut conn = self.connection_pool.acquire().await?;
1215
1216        let request = PoolRequest::Invalidate {
1217            task_id: Uuid::new_v4().to_string(),
1218            plugin_id,
1219        };
1220
1221        let _ = conn
1222            .send_request_with_timeout(&request, ADMIN_REQUEST_TIMEOUT_SECS)
1223            .await?;
1224        Ok(())
1225    }
1226
1227    /// Health check - verify the pool server is responding
1228    /// Collect socket connection statistics
1229    async fn collect_socket_stats(
1230        &self,
1231    ) -> (
1232        Option<usize>,
1233        Option<usize>,
1234        Option<usize>,
1235        Option<usize>,
1236        Option<usize>,
1237    ) {
1238        // Collect shared socket stats
1239        let (shared_available, shared_active, shared_executions) = match get_shared_socket_service()
1240        {
1241            Ok(service) => {
1242                let available = service.available_connection_slots();
1243                let active = service.active_connection_count();
1244                let executions = service.registered_executions_count().await;
1245                (Some(available), Some(active), Some(executions))
1246            }
1247            Err(_) => (None, None, None),
1248        };
1249
1250        // Collect connection pool stats (for pool server connections)
1251        let pool_available = self.connection_pool.semaphore.available_permits();
1252        let pool_max = get_config().pool_max_connections;
1253        let pool_active = pool_max.saturating_sub(pool_available);
1254
1255        (
1256            shared_available,
1257            shared_active,
1258            shared_executions,
1259            Some(pool_available),
1260            Some(pool_active),
1261        )
1262    }
1263
1264    pub async fn health_check(&self) -> Result<HealthStatus, PluginError> {
1265        let circuit_info = || {
1266            let state = match self.circuit_breaker.state() {
1267                CircuitState::Closed => "closed",
1268                CircuitState::HalfOpen => "half_open",
1269                CircuitState::Open => "open",
1270            };
1271            (
1272                Some(state.to_string()),
1273                Some(self.circuit_breaker.avg_response_time()),
1274                Some(self.recovery_mode.load(Ordering::Relaxed)),
1275                Some(self.recovery_allowance.load(Ordering::Relaxed)),
1276            )
1277        };
1278
1279        let socket_stats = self.collect_socket_stats().await;
1280
1281        if !self.initialized.load(Ordering::Acquire) {
1282            let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1283            let (shared_available, shared_active, shared_executions, pool_available, pool_active) =
1284                socket_stats;
1285            return Ok(HealthStatus {
1286                healthy: false,
1287                status: "not_initialized".to_string(),
1288                uptime_ms: None,
1289                memory: None,
1290                pool_completed: None,
1291                pool_queued: None,
1292                success_rate: None,
1293                circuit_state,
1294                avg_response_time_ms: avg_rt,
1295                recovering,
1296                recovery_percent: recovery_pct,
1297                shared_socket_available_slots: shared_available,
1298                shared_socket_active_connections: shared_active,
1299                shared_socket_registered_executions: shared_executions,
1300                connection_pool_available_slots: pool_available,
1301                connection_pool_active_connections: pool_active,
1302            });
1303        }
1304
1305        if !std::path::Path::new(&self.socket_path).exists() {
1306            let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1307            let (shared_available, shared_active, shared_executions, pool_available, pool_active) =
1308                socket_stats;
1309            return Ok(HealthStatus {
1310                healthy: false,
1311                status: "socket_missing".to_string(),
1312                uptime_ms: None,
1313                memory: None,
1314                pool_completed: None,
1315                pool_queued: None,
1316                success_rate: None,
1317                circuit_state,
1318                avg_response_time_ms: avg_rt,
1319                recovering,
1320                recovery_percent: recovery_pct,
1321                shared_socket_available_slots: shared_available,
1322                shared_socket_active_connections: shared_active,
1323                shared_socket_registered_executions: shared_executions,
1324                connection_pool_available_slots: pool_available,
1325                connection_pool_active_connections: pool_active,
1326            });
1327        }
1328
1329        let mut conn =
1330            match tokio::time::timeout(Duration::from_millis(100), self.connection_pool.acquire())
1331                .await
1332            {
1333                Ok(Ok(c)) => c,
1334                Ok(Err(e)) => {
1335                    let err_str = e.to_string();
1336                    let is_pool_exhausted =
1337                        err_str.contains("semaphore") || err_str.contains("Connection refused");
1338
1339                    // Try to check process status without blocking on lock
1340                    let process_status = match self.process.try_lock() {
1341                        Ok(guard) => {
1342                            if let Some(child) = guard.as_ref() {
1343                                format!("process_pid_{}", child.id().unwrap_or(0))
1344                            } else {
1345                                "no_process".to_string()
1346                            }
1347                        }
1348                        Err(_) => "process_lock_busy".to_string(),
1349                    };
1350
1351                    let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1352                    let (
1353                        shared_available,
1354                        shared_active,
1355                        shared_executions,
1356                        pool_available,
1357                        pool_active,
1358                    ) = socket_stats;
1359                    return Ok(HealthStatus {
1360                        healthy: is_pool_exhausted,
1361                        status: if is_pool_exhausted {
1362                            format!("pool_exhausted: {e} ({process_status})")
1363                        } else {
1364                            format!("connection_failed: {e} ({process_status})")
1365                        },
1366                        uptime_ms: None,
1367                        memory: None,
1368                        pool_completed: None,
1369                        pool_queued: None,
1370                        success_rate: None,
1371                        circuit_state,
1372                        avg_response_time_ms: avg_rt,
1373                        recovering,
1374                        recovery_percent: recovery_pct,
1375                        shared_socket_available_slots: shared_available,
1376                        shared_socket_active_connections: shared_active,
1377                        shared_socket_registered_executions: shared_executions,
1378                        connection_pool_available_slots: pool_available,
1379                        connection_pool_active_connections: pool_active,
1380                    });
1381                }
1382                Err(_) => {
1383                    let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1384                    let (
1385                        shared_available,
1386                        shared_active,
1387                        shared_executions,
1388                        pool_available,
1389                        pool_active,
1390                    ) = socket_stats;
1391                    return Ok(HealthStatus {
1392                        healthy: true,
1393                        status: "pool_busy".to_string(),
1394                        uptime_ms: None,
1395                        memory: None,
1396                        pool_completed: None,
1397                        pool_queued: None,
1398                        success_rate: None,
1399                        circuit_state,
1400                        avg_response_time_ms: avg_rt,
1401                        recovering,
1402                        recovery_percent: recovery_pct,
1403                        shared_socket_available_slots: shared_available,
1404                        shared_socket_active_connections: shared_active,
1405                        shared_socket_registered_executions: shared_executions,
1406                        connection_pool_available_slots: pool_available,
1407                        connection_pool_active_connections: pool_active,
1408                    });
1409                }
1410            };
1411
1412        let request = PoolRequest::Health {
1413            task_id: Uuid::new_v4().to_string(),
1414        };
1415
1416        let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1417
1418        match conn.send_request_with_timeout(&request, 5).await {
1419            Ok(response) => {
1420                if response.success {
1421                    let result = response.result.unwrap_or_default();
1422                    // Use extracted parsing function for testability
1423                    let parsed = Self::parse_health_result(&result);
1424
1425                    {
1426                        let (
1427                            shared_available,
1428                            shared_active,
1429                            shared_executions,
1430                            pool_available,
1431                            pool_active,
1432                        ) = socket_stats;
1433                        Ok(HealthStatus {
1434                            healthy: true,
1435                            status: parsed.status,
1436                            uptime_ms: parsed.uptime_ms,
1437                            memory: parsed.memory,
1438                            pool_completed: parsed.pool_completed,
1439                            pool_queued: parsed.pool_queued,
1440                            success_rate: parsed.success_rate,
1441                            circuit_state,
1442                            avg_response_time_ms: avg_rt,
1443                            recovering,
1444                            recovery_percent: recovery_pct,
1445                            shared_socket_available_slots: shared_available,
1446                            shared_socket_active_connections: shared_active,
1447                            shared_socket_registered_executions: shared_executions,
1448                            connection_pool_available_slots: pool_available,
1449                            connection_pool_active_connections: pool_active,
1450                        })
1451                    }
1452                } else {
1453                    let (
1454                        shared_available,
1455                        shared_active,
1456                        shared_executions,
1457                        pool_available,
1458                        pool_active,
1459                    ) = socket_stats;
1460                    Ok(HealthStatus {
1461                        healthy: false,
1462                        status: response
1463                            .error
1464                            .map(|e| e.message)
1465                            .unwrap_or_else(|| "unknown_error".to_string()),
1466                        uptime_ms: None,
1467                        memory: None,
1468                        pool_completed: None,
1469                        pool_queued: None,
1470                        success_rate: None,
1471                        circuit_state,
1472                        avg_response_time_ms: avg_rt,
1473                        recovering,
1474                        recovery_percent: recovery_pct,
1475                        shared_socket_available_slots: shared_available,
1476                        shared_socket_active_connections: shared_active,
1477                        shared_socket_registered_executions: shared_executions,
1478                        connection_pool_available_slots: pool_available,
1479                        connection_pool_active_connections: pool_active,
1480                    })
1481                }
1482            }
1483            Err(e) => {
1484                let (
1485                    shared_available,
1486                    shared_active,
1487                    shared_executions,
1488                    pool_available,
1489                    pool_active,
1490                ) = socket_stats;
1491                Ok(HealthStatus {
1492                    healthy: false,
1493                    status: format!("request_failed: {e}"),
1494                    uptime_ms: None,
1495                    memory: None,
1496                    pool_completed: None,
1497                    pool_queued: None,
1498                    success_rate: None,
1499                    circuit_state,
1500                    avg_response_time_ms: avg_rt,
1501                    recovering,
1502                    recovery_percent: recovery_pct,
1503                    shared_socket_available_slots: shared_available,
1504                    shared_socket_active_connections: shared_active,
1505                    shared_socket_registered_executions: shared_executions,
1506                    connection_pool_available_slots: pool_available,
1507                    connection_pool_active_connections: pool_active,
1508                })
1509            }
1510        }
1511    }
1512
1513    /// Check health and restart if unhealthy
1514    pub async fn ensure_healthy(&self) -> Result<bool, PluginError> {
1515        let health = self.health_check().await?;
1516
1517        if health.healthy {
1518            return Ok(true);
1519        }
1520
1521        match self.restart_lock.try_lock() {
1522            Ok(_guard) => {
1523                let health_recheck = self.health_check().await?;
1524                if health_recheck.healthy {
1525                    return Ok(true);
1526                }
1527
1528                tracing::warn!(status = %health.status, "Pool server unhealthy, attempting restart");
1529                self.restart_internal().await?;
1530            }
1531            Err(_) => {
1532                tracing::debug!("Waiting for another task to complete pool server restart");
1533                let _guard = self.restart_lock.lock().await;
1534            }
1535        }
1536
1537        let health_after = self.health_check().await?;
1538        Ok(health_after.healthy)
1539    }
1540
1541    /// Force restart the pool server (public API - acquires lock)
1542    pub async fn restart(&self) -> Result<(), PluginError> {
1543        let _guard = self.restart_lock.lock().await;
1544        self.restart_internal().await
1545    }
1546
1547    /// Internal restart without lock (must be called with restart_lock held)
1548    async fn restart_internal(&self) -> Result<(), PluginError> {
1549        tracing::info!("Restarting plugin pool server");
1550
1551        {
1552            let mut process_guard = self.process.lock().await;
1553            if let Some(mut child) = process_guard.take() {
1554                let _ = child.kill().await;
1555                tokio::time::sleep(Duration::from_millis(100)).await;
1556            }
1557        }
1558
1559        Self::cleanup_socket_file(&self.socket_path).await;
1560
1561        self.initialized.store(false, Ordering::Release);
1562
1563        let mut process_guard = self.process.lock().await;
1564        if process_guard.is_some() {
1565            return Ok(());
1566        }
1567
1568        let child = Self::spawn_pool_server_process(&self.socket_path, "restart").await?;
1569        *process_guard = Some(child);
1570
1571        self.initialized.store(true, Ordering::Release);
1572
1573        self.recovery_allowance.store(10, Ordering::Relaxed);
1574        self.recovery_mode.store(true, Ordering::Relaxed);
1575
1576        self.circuit_breaker.force_close();
1577
1578        let now = std::time::SystemTime::now()
1579            .duration_since(std::time::UNIX_EPOCH)
1580            .unwrap_or_default()
1581            .as_millis() as u64;
1582        self.last_restart_time_ms.store(now, Ordering::Relaxed);
1583
1584        tracing::info!("Recovery mode enabled - requests will gradually increase from 10%");
1585
1586        Ok(())
1587    }
1588
1589    /// Get current circuit breaker state for monitoring
1590    pub fn circuit_state(&self) -> CircuitState {
1591        self.circuit_breaker.state()
1592    }
1593
1594    /// Get average response time in ms (for monitoring)
1595    pub fn avg_response_time_ms(&self) -> u32 {
1596        self.circuit_breaker.avg_response_time()
1597    }
1598
1599    /// Check if currently in recovery mode
1600    pub fn is_recovering(&self) -> bool {
1601        self.recovery_mode.load(Ordering::Relaxed)
1602    }
1603
1604    /// Get current recovery allowance percentage (0-100)
1605    pub fn recovery_allowance_percent(&self) -> u32 {
1606        self.recovery_allowance.load(Ordering::Relaxed)
1607    }
1608
1609    /// Shutdown the pool server gracefully
1610    pub async fn shutdown(&self) -> Result<(), PluginError> {
1611        if !self.initialized.load(Ordering::Acquire) {
1612            return Ok(());
1613        }
1614
1615        tracing::info!("Initiating graceful shutdown of plugin pool server");
1616
1617        self.shutdown_signal.notify_waiters();
1618
1619        let shutdown_timeout = std::time::Duration::from_secs(35);
1620        let shutdown_result = self.send_shutdown_request(shutdown_timeout).await;
1621
1622        match &shutdown_result {
1623            Ok(response) => {
1624                tracing::info!(
1625                    response = ?response,
1626                    "Pool server acknowledged shutdown, waiting for graceful exit"
1627                );
1628            }
1629            Err(e) => {
1630                tracing::warn!(
1631                    error = %e,
1632                    "Failed to send shutdown request, will force kill"
1633                );
1634            }
1635        }
1636
1637        let mut process_guard = self.process.lock().await;
1638        if let Some(ref mut child) = *process_guard {
1639            let graceful_wait = std::time::Duration::from_secs(35);
1640            let start = std::time::Instant::now();
1641
1642            loop {
1643                match child.try_wait() {
1644                    Ok(Some(status)) => {
1645                        tracing::info!(
1646                            exit_status = ?status,
1647                            elapsed_ms = start.elapsed().as_millis(),
1648                            "Pool server exited gracefully"
1649                        );
1650                        break;
1651                    }
1652                    Ok(None) => {
1653                        if start.elapsed() >= graceful_wait {
1654                            tracing::warn!(
1655                                "Pool server did not exit within graceful timeout, force killing"
1656                            );
1657                            let _ = child.kill().await;
1658                            break;
1659                        }
1660                        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1661                    }
1662                    Err(e) => {
1663                        tracing::warn!(error = %e, "Error checking pool server status");
1664                        let _ = child.kill().await;
1665                        break;
1666                    }
1667                }
1668            }
1669        }
1670        *process_guard = None;
1671
1672        let _ = std::fs::remove_file(&self.socket_path);
1673
1674        self.initialized.store(false, Ordering::Release);
1675        tracing::info!("Plugin pool server shutdown complete");
1676        Ok(())
1677    }
1678
1679    /// Send shutdown request to the pool server
1680    async fn send_shutdown_request(
1681        &self,
1682        timeout: std::time::Duration,
1683    ) -> Result<PoolResponse, PluginError> {
1684        let request = PoolRequest::Shutdown {
1685            task_id: Uuid::new_v4().to_string(),
1686        };
1687
1688        // Use the pool's connection ID counter to ensure unique IDs
1689        // even for shutdown connections that bypass the pool
1690        let connection_id = self.connection_pool.next_connection_id();
1691        let mut conn = match PoolConnection::new(&self.socket_path, connection_id).await {
1692            Ok(c) => c,
1693            Err(e) => {
1694                return Err(PluginError::PluginExecutionError(format!(
1695                    "Failed to connect for shutdown: {e}"
1696                )));
1697            }
1698        };
1699
1700        conn.send_request_with_timeout(&request, timeout.as_secs())
1701            .await
1702    }
1703}
1704
1705impl Drop for PoolManager {
1706    fn drop(&mut self) {
1707        let _ = std::fs::remove_file(&self.socket_path);
1708    }
1709}
1710
1711/// Global pool manager instance
1712static POOL_MANAGER: std::sync::OnceLock<Arc<PoolManager>> = std::sync::OnceLock::new();
1713
1714/// Get or create the global pool manager
1715pub fn get_pool_manager() -> Arc<PoolManager> {
1716    POOL_MANAGER
1717        .get_or_init(|| Arc::new(PoolManager::new()))
1718        .clone()
1719}
1720
1721#[cfg(test)]
1722mod tests {
1723    use super::*;
1724    use crate::services::plugins::script_executor::LogLevel;
1725
1726    #[test]
1727    fn test_is_dead_server_error_detects_dead_server() {
1728        let err = PluginError::PluginExecutionError("Connection refused".to_string());
1729        assert!(PoolManager::is_dead_server_error(&err));
1730
1731        let err = PluginError::PluginExecutionError("Broken pipe".to_string());
1732        assert!(PoolManager::is_dead_server_error(&err));
1733    }
1734
1735    #[test]
1736    fn test_is_dead_server_error_excludes_plugin_timeouts() {
1737        let err = PluginError::PluginExecutionError("Plugin timed out after 30s".to_string());
1738        assert!(!PoolManager::is_dead_server_error(&err));
1739
1740        let err = PluginError::PluginExecutionError("Handler timed out".to_string());
1741        assert!(!PoolManager::is_dead_server_error(&err));
1742    }
1743
1744    #[test]
1745    fn test_is_dead_server_error_normal_errors() {
1746        let err =
1747            PluginError::PluginExecutionError("TypeError: undefined is not a function".to_string());
1748        assert!(!PoolManager::is_dead_server_error(&err));
1749
1750        let err = PluginError::PluginExecutionError("Plugin returned invalid JSON".to_string());
1751        assert!(!PoolManager::is_dead_server_error(&err));
1752    }
1753
1754    #[test]
1755    fn test_is_dead_server_error_detects_all_dead_server_indicators() {
1756        // Test common DeadServerIndicator patterns
1757        let dead_server_errors = vec![
1758            "EOF while parsing JSON response",
1759            "Broken pipe when writing to socket",
1760            "Connection refused: server not running",
1761            "Connection reset by peer",
1762            "Socket not connected",
1763            "Failed to connect to pool server",
1764            "Socket file missing: /tmp/test.sock",
1765            "No such file or directory",
1766        ];
1767
1768        for error_msg in dead_server_errors {
1769            let err = PluginError::PluginExecutionError(error_msg.to_string());
1770            assert!(
1771                PoolManager::is_dead_server_error(&err),
1772                "Expected '{error_msg}' to be detected as dead server error"
1773            );
1774        }
1775    }
1776
1777    #[test]
1778    fn test_dead_server_indicator_patterns() {
1779        // Test the DeadServerIndicator pattern matching directly
1780        use super::super::health::DeadServerIndicator;
1781
1782        // These should all match
1783        assert!(DeadServerIndicator::from_error_str("eof while parsing").is_some());
1784        assert!(DeadServerIndicator::from_error_str("broken pipe").is_some());
1785        assert!(DeadServerIndicator::from_error_str("connection refused").is_some());
1786        assert!(DeadServerIndicator::from_error_str("connection reset").is_some());
1787        assert!(DeadServerIndicator::from_error_str("not connected").is_some());
1788        assert!(DeadServerIndicator::from_error_str("failed to connect").is_some());
1789        assert!(DeadServerIndicator::from_error_str("socket file missing").is_some());
1790        assert!(DeadServerIndicator::from_error_str("no such file").is_some());
1791        assert!(DeadServerIndicator::from_error_str("connection timed out").is_some());
1792        assert!(DeadServerIndicator::from_error_str("connect timed out").is_some());
1793
1794        // These should NOT match
1795        assert!(DeadServerIndicator::from_error_str("handler timed out").is_none());
1796        assert!(DeadServerIndicator::from_error_str("validation error").is_none());
1797        assert!(DeadServerIndicator::from_error_str("TypeError: undefined").is_none());
1798    }
1799
1800    #[test]
1801    fn test_is_dead_server_error_detects_connection_timeout_in_plugin_error() {
1802        // "connection timed out" inside PluginExecutionError is an infrastructure failure
1803        // (Rust couldn't connect to the pool server), not a plugin execution timeout.
1804        let plugin_timeout =
1805            PluginError::PluginExecutionError("plugin connection timed out".to_string());
1806        assert!(PoolManager::is_dead_server_error(&plugin_timeout));
1807    }
1808
1809    #[test]
1810    fn test_is_dead_server_error_case_insensitive() {
1811        // Test case insensitivity
1812        let err = PluginError::PluginExecutionError("CONNECTION REFUSED".to_string());
1813        assert!(PoolManager::is_dead_server_error(&err));
1814
1815        let err = PluginError::PluginExecutionError("BROKEN PIPE".to_string());
1816        assert!(PoolManager::is_dead_server_error(&err));
1817
1818        let err = PluginError::PluginExecutionError("Connection Reset By Peer".to_string());
1819        assert!(PoolManager::is_dead_server_error(&err));
1820    }
1821
1822    #[test]
1823    fn test_is_dead_server_error_handler_timeout_variations() {
1824        // All variations of plugin/handler timeouts should NOT trigger restart
1825        let timeout_errors = vec![
1826            "Handler timed out",
1827            "handler timed out after 30000ms",
1828            "Plugin handler timed out",
1829            "plugin timed out",
1830            "Plugin execution timed out after 60s",
1831        ];
1832
1833        for error_msg in timeout_errors {
1834            let err = PluginError::PluginExecutionError(error_msg.to_string());
1835            assert!(
1836                !PoolManager::is_dead_server_error(&err),
1837                "Expected '{error_msg}' to NOT be detected as dead server error"
1838            );
1839        }
1840    }
1841
1842    #[test]
1843    fn test_is_dead_server_error_business_errors_not_detected() {
1844        // Business logic errors should not trigger restart
1845        let business_errors = vec![
1846            "ReferenceError: x is not defined",
1847            "SyntaxError: Unexpected token",
1848            "TypeError: Cannot read property 'foo' of undefined",
1849            "Plugin returned status 400: Bad Request",
1850            "Validation error: missing required field",
1851            "Authorization failed",
1852            "Rate limit exceeded",
1853            "Plugin threw an error: Invalid input",
1854        ];
1855
1856        for error_msg in business_errors {
1857            let err = PluginError::PluginExecutionError(error_msg.to_string());
1858            assert!(
1859                !PoolManager::is_dead_server_error(&err),
1860                "Expected '{error_msg}' to NOT be detected as dead server error"
1861            );
1862        }
1863    }
1864
1865    #[test]
1866    fn test_is_dead_server_error_with_handler_error_type() {
1867        // HandlerError means Node.js responded with a structured error —
1868        // the pool server is alive, so this is never a dead-server indicator,
1869        // even if the message contains patterns like "Connection refused"
1870        // (which would be from the plugin's own code, not infrastructure).
1871        let handler_payload = PluginHandlerPayload {
1872            message: "Connection refused".to_string(),
1873            status: 500,
1874            code: None,
1875            details: None,
1876            logs: None,
1877            traces: None,
1878        };
1879        let err = PluginError::HandlerError(Box::new(handler_payload));
1880        assert!(!PoolManager::is_dead_server_error(&err));
1881    }
1882
1883    // ============================================
1884    // Heap calculation tests
1885    // ============================================
1886
1887    #[test]
1888    fn test_heap_calculation_base_case() {
1889        // With default concurrency, should get base heap
1890        let base = PoolManager::BASE_HEAP_MB;
1891        let divisor = PoolManager::CONCURRENCY_DIVISOR;
1892        let increment = PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;
1893
1894        // For 100 concurrent requests:
1895        // 512 + (100 / 10) * 32 = 512 + 320 = 832 MB
1896        let concurrency = 100;
1897        let expected = base + ((concurrency / divisor) * increment);
1898        assert_eq!(expected, 832);
1899    }
1900
1901    #[test]
1902    fn test_heap_calculation_minimum() {
1903        // With very low concurrency, should still get base heap
1904        let base = PoolManager::BASE_HEAP_MB;
1905        let divisor = PoolManager::CONCURRENCY_DIVISOR;
1906        let increment = PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;
1907
1908        // For 5 concurrent requests:
1909        // 512 + (5 / 10) * 32 = 512 + 0 = 512 MB (integer division)
1910        let concurrency = 5;
1911        let expected = base + ((concurrency / divisor) * increment);
1912        assert_eq!(expected, 512);
1913    }
1914
1915    #[test]
1916    fn test_heap_calculation_high_concurrency() {
1917        // With high concurrency, should scale appropriately
1918        let base = PoolManager::BASE_HEAP_MB;
1919        let divisor = PoolManager::CONCURRENCY_DIVISOR;
1920        let increment = PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;
1921
1922        // For 500 concurrent requests:
1923        // 512 + (500 / 10) * 32 = 512 + 1600 = 2112 MB
1924        let concurrency = 500;
1925        let expected = base + ((concurrency / divisor) * increment);
1926        assert_eq!(expected, 2112);
1927    }
1928
1929    #[test]
1930    fn test_heap_calculation_max_cap() {
1931        // Verify max heap cap is respected
1932        let max_heap = PoolManager::MAX_HEAP_MB;
1933        assert_eq!(max_heap, 8192);
1934
1935        // For extreme concurrency that would exceed cap:
1936        // e.g., 3000 concurrent: 512 + (3000 / 10) * 32 = 512 + 9600 = 10112 MB
1937        // Should be capped to 8192 MB
1938        let base = PoolManager::BASE_HEAP_MB;
1939        let divisor = PoolManager::CONCURRENCY_DIVISOR;
1940        let increment = PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;
1941
1942        let concurrency = 3000;
1943        let calculated = base + ((concurrency / divisor) * increment);
1944        let capped = calculated.min(max_heap);
1945
1946        assert_eq!(calculated, 10112);
1947        assert_eq!(capped, 8192);
1948    }
1949
1950    // ============================================
1951    // Constants verification tests
1952    // ============================================
1953
1954    #[test]
1955    fn test_pool_manager_constants() {
1956        // Verify important constants have reasonable values
1957        assert_eq!(PoolManager::BASE_HEAP_MB, 512);
1958        assert_eq!(PoolManager::CONCURRENCY_DIVISOR, 10);
1959        assert_eq!(PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB, 32);
1960        assert_eq!(PoolManager::MAX_HEAP_MB, 8192);
1961    }
1962
1963    // ============================================
1964    // Extracted function tests: calculate_heap_size
1965    // ============================================
1966
1967    #[test]
1968    fn test_calculate_heap_size_low_concurrency() {
1969        // Low concurrency should give base heap
1970        assert_eq!(PoolManager::calculate_heap_size(5), 512);
1971        assert_eq!(PoolManager::calculate_heap_size(9), 512);
1972    }
1973
1974    #[test]
1975    fn test_calculate_heap_size_medium_concurrency() {
1976        // 10 concurrent: 512 + (10/10)*32 = 544
1977        assert_eq!(PoolManager::calculate_heap_size(10), 544);
1978        // 50 concurrent: 512 + (50/10)*32 = 672
1979        assert_eq!(PoolManager::calculate_heap_size(50), 672);
1980        // 100 concurrent: 512 + (100/10)*32 = 832
1981        assert_eq!(PoolManager::calculate_heap_size(100), 832);
1982    }
1983
1984    #[test]
1985    fn test_calculate_heap_size_high_concurrency() {
1986        // 500 concurrent: 512 + (500/10)*32 = 2112
1987        assert_eq!(PoolManager::calculate_heap_size(500), 2112);
1988        // 1000 concurrent: 512 + (1000/10)*32 = 3712
1989        assert_eq!(PoolManager::calculate_heap_size(1000), 3712);
1990    }
1991
1992    #[test]
1993    fn test_calculate_heap_size_capped_at_max() {
1994        // 3000 concurrent would be 10112, but capped at 8192
1995        assert_eq!(PoolManager::calculate_heap_size(3000), 8192);
1996        // Even higher should still be capped
1997        assert_eq!(PoolManager::calculate_heap_size(10000), 8192);
1998    }
1999
2000    #[test]
2001    fn test_calculate_heap_size_zero_concurrency() {
2002        // Zero concurrency gives base heap
2003        assert_eq!(PoolManager::calculate_heap_size(0), 512);
2004    }
2005
2006    // ============================================
2007    // Extracted function tests: format_return_value
2008    // ============================================
2009
2010    #[test]
2011    fn test_format_return_value_none() {
2012        assert_eq!(PoolManager::format_return_value(None), "");
2013    }
2014
2015    #[test]
2016    fn test_format_return_value_string() {
2017        let value = Some(serde_json::json!("hello world"));
2018        assert_eq!(PoolManager::format_return_value(value), "hello world");
2019    }
2020
2021    #[test]
2022    fn test_format_return_value_empty_string() {
2023        let value = Some(serde_json::json!(""));
2024        assert_eq!(PoolManager::format_return_value(value), "");
2025    }
2026
2027    #[test]
2028    fn test_format_return_value_object() {
2029        let value = Some(serde_json::json!({"key": "value", "num": 42}));
2030        let result = PoolManager::format_return_value(value);
2031        // JSON object gets serialized
2032        assert!(result.contains("key"));
2033        assert!(result.contains("value"));
2034        assert!(result.contains("42"));
2035    }
2036
2037    #[test]
2038    fn test_format_return_value_array() {
2039        let value = Some(serde_json::json!([1, 2, 3]));
2040        assert_eq!(PoolManager::format_return_value(value), "[1,2,3]");
2041    }
2042
2043    #[test]
2044    fn test_format_return_value_number() {
2045        let value = Some(serde_json::json!(42));
2046        assert_eq!(PoolManager::format_return_value(value), "42");
2047    }
2048
2049    #[test]
2050    fn test_format_return_value_boolean() {
2051        assert_eq!(
2052            PoolManager::format_return_value(Some(serde_json::json!(true))),
2053            "true"
2054        );
2055        assert_eq!(
2056            PoolManager::format_return_value(Some(serde_json::json!(false))),
2057            "false"
2058        );
2059    }
2060
2061    #[test]
2062    fn test_format_return_value_null() {
2063        let value = Some(serde_json::json!(null));
2064        assert_eq!(PoolManager::format_return_value(value), "null");
2065    }
2066
2067    // ============================================
2068    // Extracted function tests: parse_pool_response
2069    // ============================================
2070
2071    #[test]
2072    fn test_parse_pool_response_success_with_string_result() {
2073        use super::super::protocol::{PoolLogEntry, PoolResponse};
2074
2075        let response = PoolResponse {
2076            task_id: "test-123".to_string(),
2077            success: true,
2078            result: Some(serde_json::json!("success result")),
2079            error: None,
2080            logs: Some(vec![PoolLogEntry {
2081                level: "info".to_string(),
2082                message: "test log".to_string(),
2083            }]),
2084        };
2085
2086        let result = PoolManager::parse_pool_response(response).unwrap();
2087        assert_eq!(result.return_value, "success result");
2088        assert!(result.error.is_empty());
2089        assert_eq!(result.logs.len(), 1);
2090        assert_eq!(result.logs[0].level, LogLevel::Info);
2091        assert_eq!(result.logs[0].message, "test log");
2092    }
2093
2094    #[test]
2095    fn test_parse_pool_response_success_with_object_result() {
2096        use super::super::protocol::PoolResponse;
2097
2098        let response = PoolResponse {
2099            task_id: "test-456".to_string(),
2100            success: true,
2101            result: Some(serde_json::json!({"data": "value"})),
2102            error: None,
2103            logs: None,
2104        };
2105
2106        let result = PoolManager::parse_pool_response(response).unwrap();
2107        assert!(result.return_value.contains("data"));
2108        assert!(result.return_value.contains("value"));
2109        assert!(result.logs.is_empty());
2110    }
2111
2112    #[test]
2113    fn test_parse_pool_response_success_no_result() {
2114        use super::super::protocol::PoolResponse;
2115
2116        let response = PoolResponse {
2117            task_id: "test-789".to_string(),
2118            success: true,
2119            result: None,
2120            error: None,
2121            logs: None,
2122        };
2123
2124        let result = PoolManager::parse_pool_response(response).unwrap();
2125        assert_eq!(result.return_value, "");
2126        assert!(result.error.is_empty());
2127    }
2128
2129    #[test]
2130    fn test_parse_pool_response_failure_with_error() {
2131        use super::super::protocol::{PoolError, PoolResponse};
2132
2133        let response = PoolResponse {
2134            task_id: "test-error".to_string(),
2135            success: false,
2136            result: None,
2137            error: Some(PoolError {
2138                message: "Something went wrong".to_string(),
2139                code: Some("ERR_001".to_string()),
2140                status: Some(400),
2141                details: Some(serde_json::json!({"field": "name"})),
2142            }),
2143            logs: None,
2144        };
2145
2146        let err = PoolManager::parse_pool_response(response).unwrap_err();
2147        match err {
2148            PluginError::HandlerError(payload) => {
2149                assert_eq!(payload.message, "Something went wrong");
2150                assert_eq!(payload.status, 400);
2151                assert_eq!(payload.code, Some("ERR_001".to_string()));
2152            }
2153            _ => panic!("Expected HandlerError"),
2154        }
2155    }
2156
2157    #[test]
2158    fn test_parse_pool_response_failure_no_error_details() {
2159        use super::super::protocol::PoolResponse;
2160
2161        let response = PoolResponse {
2162            task_id: "test-unknown".to_string(),
2163            success: false,
2164            result: None,
2165            error: None,
2166            logs: None,
2167        };
2168
2169        let err = PoolManager::parse_pool_response(response).unwrap_err();
2170        match err {
2171            PluginError::HandlerError(payload) => {
2172                assert_eq!(payload.message, "Unknown error");
2173                assert_eq!(payload.status, 500);
2174            }
2175            _ => panic!("Expected HandlerError"),
2176        }
2177    }
2178
2179    #[test]
2180    fn test_parse_pool_response_failure_preserves_logs() {
2181        use super::super::protocol::{PoolError, PoolLogEntry, PoolResponse};
2182
2183        let response = PoolResponse {
2184            task_id: "test-logs".to_string(),
2185            success: false,
2186            result: None,
2187            error: Some(PoolError {
2188                message: "Error with logs".to_string(),
2189                code: None,
2190                status: None,
2191                details: None,
2192            }),
2193            logs: Some(vec![
2194                PoolLogEntry {
2195                    level: "debug".to_string(),
2196                    message: "debug message".to_string(),
2197                },
2198                PoolLogEntry {
2199                    level: "error".to_string(),
2200                    message: "error message".to_string(),
2201                },
2202            ]),
2203        };
2204
2205        let err = PoolManager::parse_pool_response(response).unwrap_err();
2206        match err {
2207            PluginError::HandlerError(payload) => {
2208                let logs = payload.logs.unwrap();
2209                assert_eq!(logs.len(), 2);
2210                assert_eq!(logs[0].level, LogLevel::Debug);
2211                assert_eq!(logs[1].level, LogLevel::Error);
2212            }
2213            _ => panic!("Expected HandlerError"),
2214        }
2215    }
2216
2217    // ============================================
2218    // Extracted function tests: parse_success_response
2219    // ============================================
2220
2221    #[test]
2222    fn test_parse_success_response_complete() {
2223        use super::super::protocol::{PoolLogEntry, PoolResponse};
2224
2225        let response = PoolResponse {
2226            task_id: "task-1".to_string(),
2227            success: true,
2228            result: Some(serde_json::json!("completed")),
2229            error: None,
2230            logs: Some(vec![
2231                PoolLogEntry {
2232                    level: "log".to_string(),
2233                    message: "starting".to_string(),
2234                },
2235                PoolLogEntry {
2236                    level: "result".to_string(),
2237                    message: "finished".to_string(),
2238                },
2239            ]),
2240        };
2241
2242        let result = PoolManager::parse_success_response(response);
2243        assert_eq!(result.return_value, "completed");
2244        assert!(result.error.is_empty());
2245        assert_eq!(result.logs.len(), 2);
2246        assert_eq!(result.logs[0].level, LogLevel::Log);
2247        assert_eq!(result.logs[1].level, LogLevel::Result);
2248    }
2249
2250    // ============================================
2251    // Extracted function tests: parse_error_response
2252    // ============================================
2253
2254    #[test]
2255    fn test_parse_error_response_with_all_fields() {
2256        use super::super::protocol::{PoolError, PoolLogEntry, PoolResponse};
2257
2258        let response = PoolResponse {
2259            task_id: "err-task".to_string(),
2260            success: false,
2261            result: None,
2262            error: Some(PoolError {
2263                message: "Validation failed".to_string(),
2264                code: Some("VALIDATION_ERROR".to_string()),
2265                status: Some(422),
2266                details: Some(serde_json::json!({"fields": ["email"]})),
2267            }),
2268            logs: Some(vec![PoolLogEntry {
2269                level: "warn".to_string(),
2270                message: "validation warning".to_string(),
2271            }]),
2272        };
2273
2274        let err = PoolManager::parse_error_response(response);
2275        match err {
2276            PluginError::HandlerError(payload) => {
2277                assert_eq!(payload.message, "Validation failed");
2278                assert_eq!(payload.status, 422);
2279                assert_eq!(payload.code, Some("VALIDATION_ERROR".to_string()));
2280                assert!(payload.details.is_some());
2281                let logs = payload.logs.unwrap();
2282                assert_eq!(logs.len(), 1);
2283                assert_eq!(logs[0].level, LogLevel::Warn);
2284            }
2285            _ => panic!("Expected HandlerError"),
2286        }
2287    }
2288
2289    // ============================================
2290    // Extracted function tests: parse_health_result
2291    // ============================================
2292
2293    #[test]
2294    fn test_parse_health_result_complete() {
2295        let json = serde_json::json!({
2296            "status": "healthy",
2297            "uptime": 123456,
2298            "memory": {
2299                "heapUsed": 50000000,
2300                "heapTotal": 100000000
2301            },
2302            "pool": {
2303                "completed": 1000,
2304                "queued": 5
2305            },
2306            "execution": {
2307                "successRate": 0.99
2308            }
2309        });
2310
2311        let result = PoolManager::parse_health_result(&json);
2312
2313        assert_eq!(result.status, "healthy");
2314        assert_eq!(result.uptime_ms, Some(123456));
2315        assert_eq!(result.memory, Some(50000000));
2316        assert_eq!(result.pool_completed, Some(1000));
2317        assert_eq!(result.pool_queued, Some(5));
2318        assert!((result.success_rate.unwrap() - 0.99).abs() < 0.001);
2319    }
2320
2321    #[test]
2322    fn test_parse_health_result_minimal() {
2323        let json = serde_json::json!({});
2324
2325        let result = PoolManager::parse_health_result(&json);
2326
2327        assert_eq!(result.status, "unknown");
2328        assert_eq!(result.uptime_ms, None);
2329        assert_eq!(result.memory, None);
2330        assert_eq!(result.pool_completed, None);
2331        assert_eq!(result.pool_queued, None);
2332        assert_eq!(result.success_rate, None);
2333    }
2334
2335    #[test]
2336    fn test_parse_health_result_partial() {
2337        let json = serde_json::json!({
2338            "status": "degraded",
2339            "uptime": 5000,
2340            "memory": {
2341                "heapTotal": 100000000
2342                // heapUsed missing
2343            }
2344        });
2345
2346        let result = PoolManager::parse_health_result(&json);
2347
2348        assert_eq!(result.status, "degraded");
2349        assert_eq!(result.uptime_ms, Some(5000));
2350        assert_eq!(result.memory, None); // heapUsed was missing
2351        assert_eq!(result.pool_completed, None);
2352        assert_eq!(result.pool_queued, None);
2353        assert_eq!(result.success_rate, None);
2354    }
2355
2356    #[test]
2357    fn test_parse_health_result_wrong_types() {
2358        let json = serde_json::json!({
2359            "status": 123,  // Should be string, will use "unknown"
2360            "uptime": "not a number",  // Should be u64, will be None
2361            "memory": "invalid"  // Should be object, will give None
2362        });
2363
2364        let result = PoolManager::parse_health_result(&json);
2365
2366        assert_eq!(result.status, "unknown"); // Falls back when not a string
2367        assert_eq!(result.uptime_ms, None);
2368        assert_eq!(result.memory, None);
2369        assert_eq!(result.pool_completed, None);
2370        assert_eq!(result.pool_queued, None);
2371        assert_eq!(result.success_rate, None);
2372    }
2373
2374    #[test]
2375    fn test_parse_health_result_nested_values() {
2376        let json = serde_json::json!({
2377            "pool": {
2378                "completed": 0,
2379                "queued": 0
2380            },
2381            "execution": {
2382                "successRate": 1.0
2383            }
2384        });
2385
2386        let result = PoolManager::parse_health_result(&json);
2387
2388        assert_eq!(result.status, "unknown");
2389        assert_eq!(result.uptime_ms, None);
2390        assert_eq!(result.memory, None);
2391        assert_eq!(result.pool_completed, Some(0));
2392        assert_eq!(result.pool_queued, Some(0));
2393        assert!((result.success_rate.unwrap() - 1.0).abs() < 0.001);
2394    }
2395
2396    // ============================================
2397    // PoolManager creation tests
2398    // ============================================
2399
2400    #[tokio::test]
2401    async fn test_pool_manager_new_creates_unique_socket_path() {
2402        // Two PoolManagers should have different socket paths
2403        let manager1 = PoolManager::new();
2404        let manager2 = PoolManager::new();
2405
2406        assert_ne!(manager1.socket_path, manager2.socket_path);
2407        assert!(manager1
2408            .socket_path
2409            .starts_with("/tmp/relayer-plugin-pool-"));
2410        assert!(manager2
2411            .socket_path
2412            .starts_with("/tmp/relayer-plugin-pool-"));
2413    }
2414
2415    #[tokio::test]
2416    async fn test_pool_manager_with_custom_socket_path() {
2417        let custom_path = "/tmp/custom-test-pool.sock".to_string();
2418        let manager = PoolManager::with_socket_path(custom_path.clone());
2419
2420        assert_eq!(manager.socket_path, custom_path);
2421    }
2422
2423    #[tokio::test]
2424    async fn test_pool_manager_default_trait() {
2425        // Verify Default trait creates a valid manager
2426        let manager = PoolManager::default();
2427        assert!(manager.socket_path.starts_with("/tmp/relayer-plugin-pool-"));
2428    }
2429
2430    // ============================================
2431    // Circuit breaker state tests
2432    // ============================================
2433
2434    #[tokio::test]
2435    async fn test_circuit_state_initial() {
2436        let manager = PoolManager::new();
2437
2438        // Initial state should be Closed
2439        assert_eq!(manager.circuit_state(), CircuitState::Closed);
2440    }
2441
2442    #[tokio::test]
2443    async fn test_avg_response_time_initial() {
2444        let manager = PoolManager::new();
2445
2446        // Initial response time should be 0
2447        assert_eq!(manager.avg_response_time_ms(), 0);
2448    }
2449
2450    // ============================================
2451    // Recovery mode tests
2452    // ============================================
2453
2454    #[tokio::test]
2455    async fn test_recovery_mode_initial() {
2456        let manager = PoolManager::new();
2457
2458        // Should not be in recovery mode initially
2459        assert!(!manager.is_recovering());
2460        assert_eq!(manager.recovery_allowance_percent(), 0);
2461    }
2462
2463    // ============================================
2464    // ScriptResult construction tests
2465    // ============================================
2466
2467    #[test]
2468    fn test_script_result_success_construction() {
2469        let result = ScriptResult {
2470            logs: vec![LogEntry {
2471                level: LogLevel::Info,
2472                message: "Test log".to_string(),
2473            }],
2474            error: String::new(),
2475            return_value: r#"{"success": true}"#.to_string(),
2476            trace: vec![],
2477        };
2478
2479        assert!(result.error.is_empty());
2480        assert_eq!(result.logs.len(), 1);
2481        assert_eq!(result.logs[0].level, LogLevel::Info);
2482    }
2483
2484    #[test]
2485    fn test_script_result_with_multiple_logs() {
2486        let result = ScriptResult {
2487            logs: vec![
2488                LogEntry {
2489                    level: LogLevel::Log,
2490                    message: "Starting execution".to_string(),
2491                },
2492                LogEntry {
2493                    level: LogLevel::Debug,
2494                    message: "Processing data".to_string(),
2495                },
2496                LogEntry {
2497                    level: LogLevel::Warn,
2498                    message: "Deprecated API used".to_string(),
2499                },
2500                LogEntry {
2501                    level: LogLevel::Error,
2502                    message: "Non-fatal error".to_string(),
2503                },
2504            ],
2505            error: String::new(),
2506            return_value: "done".to_string(),
2507            trace: vec![],
2508        };
2509
2510        assert_eq!(result.logs.len(), 4);
2511        assert_eq!(result.logs[0].level, LogLevel::Log);
2512        assert_eq!(result.logs[1].level, LogLevel::Debug);
2513        assert_eq!(result.logs[2].level, LogLevel::Warn);
2514        assert_eq!(result.logs[3].level, LogLevel::Error);
2515    }
2516
2517    // ============================================
2518    // QueuedRequest structure tests
2519    // ============================================
2520
2521    #[test]
2522    fn test_queued_request_required_fields() {
2523        let (tx, _rx) = oneshot::channel();
2524
2525        let request = QueuedRequest {
2526            plugin_id: "test-plugin".to_string(),
2527            compiled_code: Some("module.exports.handler = () => {}".to_string()),
2528            plugin_path: None,
2529            params: serde_json::json!({"key": "value"}),
2530            headers: None,
2531            socket_path: "/tmp/test.sock".to_string(),
2532            http_request_id: Some("req-123".to_string()),
2533            timeout_secs: Some(30),
2534            route: Some("/api/test".to_string()),
2535            config: Some(serde_json::json!({"setting": true})),
2536            method: Some("POST".to_string()),
2537            query: Some(serde_json::json!({"page": "1"})),
2538            response_tx: tx,
2539        };
2540
2541        assert_eq!(request.plugin_id, "test-plugin");
2542        assert!(request.compiled_code.is_some());
2543        assert!(request.plugin_path.is_none());
2544        assert_eq!(request.timeout_secs, Some(30));
2545    }
2546
2547    #[test]
2548    fn test_queued_request_minimal() {
2549        let (tx, _rx) = oneshot::channel();
2550
2551        let request = QueuedRequest {
2552            plugin_id: "minimal".to_string(),
2553            compiled_code: None,
2554            plugin_path: Some("/path/to/plugin.ts".to_string()),
2555            params: serde_json::json!(null),
2556            headers: None,
2557            socket_path: "/tmp/min.sock".to_string(),
2558            http_request_id: None,
2559            timeout_secs: None,
2560            route: None,
2561            config: None,
2562            method: None,
2563            query: None,
2564            response_tx: tx,
2565        };
2566
2567        assert_eq!(request.plugin_id, "minimal");
2568        assert!(request.compiled_code.is_none());
2569        assert!(request.plugin_path.is_some());
2570    }
2571
2572    // ============================================
2573    // Error type tests
2574    // ============================================
2575
2576    #[test]
2577    fn test_plugin_error_socket_error() {
2578        let err = PluginError::SocketError("Connection failed".to_string());
2579        let display = format!("{err}");
2580        assert!(display.contains("Socket error"));
2581        assert!(display.contains("Connection failed"));
2582    }
2583
2584    #[test]
2585    fn test_plugin_error_plugin_execution_error() {
2586        let err = PluginError::PluginExecutionError("Execution failed".to_string());
2587        let display = format!("{err}");
2588        assert!(display.contains("Execution failed"));
2589    }
2590
2591    #[test]
2592    fn test_plugin_error_handler_error() {
2593        let payload = PluginHandlerPayload {
2594            message: "Handler error".to_string(),
2595            status: 400,
2596            code: Some("BAD_REQUEST".to_string()),
2597            details: Some(serde_json::json!({"field": "name"})),
2598            logs: None,
2599            traces: None,
2600        };
2601        let err = PluginError::HandlerError(Box::new(payload));
2602
2603        // Check that it can be displayed
2604        let display = format!("{err:?}");
2605        assert!(display.contains("HandlerError"));
2606    }
2607
2608    // ============================================
2609    // Handler payload tests
2610    // ============================================
2611
2612    #[test]
2613    fn test_plugin_handler_payload_full() {
2614        let payload = PluginHandlerPayload {
2615            message: "Validation failed".to_string(),
2616            status: 422,
2617            code: Some("VALIDATION_ERROR".to_string()),
2618            details: Some(serde_json::json!({
2619                "errors": [
2620                    {"field": "email", "message": "Invalid format"}
2621                ]
2622            })),
2623            logs: Some(vec![LogEntry {
2624                level: LogLevel::Error,
2625                message: "Validation failed for email".to_string(),
2626            }]),
2627            traces: Some(vec![serde_json::json!({"stack": "Error at line 10"})]),
2628        };
2629
2630        assert_eq!(payload.status, 422);
2631        assert_eq!(payload.code, Some("VALIDATION_ERROR".to_string()));
2632        assert!(payload.logs.is_some());
2633        assert!(payload.traces.is_some());
2634    }
2635
2636    #[test]
2637    fn test_plugin_handler_payload_minimal() {
2638        let payload = PluginHandlerPayload {
2639            message: "Error".to_string(),
2640            status: 500,
2641            code: None,
2642            details: None,
2643            logs: None,
2644            traces: None,
2645        };
2646
2647        assert_eq!(payload.status, 500);
2648        assert!(payload.code.is_none());
2649        assert!(payload.details.is_none());
2650    }
2651
2652    // ============================================
2653    // Async tests (tokio runtime)
2654    // ============================================
2655
2656    #[tokio::test]
2657    async fn test_pool_manager_not_initialized_health_check() {
2658        let manager = PoolManager::with_socket_path("/tmp/test-health.sock".to_string());
2659
2660        // Health check on uninitialized manager should return not_initialized
2661        let health = manager.health_check().await.unwrap();
2662
2663        assert!(!health.healthy);
2664        assert_eq!(health.status, "not_initialized");
2665        assert!(health.uptime_ms.is_none());
2666        assert!(health.memory.is_none());
2667    }
2668
2669    #[tokio::test]
2670    async fn test_pool_manager_circuit_info_in_health_status() {
2671        let manager = PoolManager::with_socket_path("/tmp/test-circuit.sock".to_string());
2672
2673        let health = manager.health_check().await.unwrap();
2674
2675        // Circuit state info should be present even when not initialized
2676        assert!(health.circuit_state.is_some());
2677        assert_eq!(health.circuit_state, Some("closed".to_string()));
2678        assert!(health.avg_response_time_ms.is_some());
2679        assert!(health.recovering.is_some());
2680        assert!(health.recovery_percent.is_some());
2681    }
2682
2683    #[tokio::test]
2684    async fn test_invalidate_plugin_when_not_initialized() {
2685        let manager = PoolManager::with_socket_path("/tmp/test-invalidate.sock".to_string());
2686
2687        // Invalidating when not initialized should be a no-op
2688        let result = manager.invalidate_plugin("test-plugin".to_string()).await;
2689
2690        // Should succeed (no-op)
2691        assert!(result.is_ok());
2692    }
2693
2694    #[tokio::test]
2695    async fn test_shutdown_when_not_initialized() {
2696        let manager = PoolManager::with_socket_path("/tmp/test-shutdown.sock".to_string());
2697
2698        // Shutdown when not initialized should be a no-op
2699        let result = manager.shutdown().await;
2700
2701        // Should succeed (no-op)
2702        assert!(result.is_ok());
2703    }
2704
2705    // ============================================
2706    // Additional ParsedHealthResult tests
2707    // ============================================
2708
2709    #[test]
2710    fn test_parsed_health_result_default() {
2711        let result = ParsedHealthResult::default();
2712        assert_eq!(result.status, "");
2713        assert_eq!(result.uptime_ms, None);
2714        assert_eq!(result.memory, None);
2715        assert_eq!(result.pool_completed, None);
2716        assert_eq!(result.pool_queued, None);
2717        assert_eq!(result.success_rate, None);
2718    }
2719
2720    #[test]
2721    fn test_parsed_health_result_equality() {
2722        let result1 = ParsedHealthResult {
2723            status: "ok".to_string(),
2724            uptime_ms: Some(1000),
2725            memory: Some(500000),
2726            pool_completed: Some(50),
2727            pool_queued: Some(2),
2728            success_rate: Some(1.0),
2729        };
2730        let result2 = ParsedHealthResult {
2731            status: "ok".to_string(),
2732            uptime_ms: Some(1000),
2733            memory: Some(500000),
2734            pool_completed: Some(50),
2735            pool_queued: Some(2),
2736            success_rate: Some(1.0),
2737        };
2738        assert_eq!(result1, result2);
2739    }
2740
2741    #[test]
2742    fn test_format_return_value_nested_object() {
2743        let value = Some(serde_json::json!({
2744            "user": { "name": "John", "age": 30 }
2745        }));
2746        let result = PoolManager::format_return_value(value);
2747        assert!(result.contains("John"));
2748        assert!(result.contains("30"));
2749    }
2750
2751    #[test]
2752    fn test_format_return_value_empty_collections() {
2753        let value = Some(serde_json::json!({}));
2754        assert_eq!(PoolManager::format_return_value(value), "{}");
2755        let value = Some(serde_json::json!([]));
2756        assert_eq!(PoolManager::format_return_value(value), "[]");
2757    }
2758
2759    #[test]
2760    fn test_parse_health_result_zero_values() {
2761        let json = serde_json::json!({
2762            "status": "starting",
2763            "uptime": 0,
2764            "memory": { "heapUsed": 0 },
2765            "pool": { "completed": 0, "queued": 0 },
2766            "execution": { "successRate": 0.0 }
2767        });
2768        let result = PoolManager::parse_health_result(&json);
2769        assert_eq!(result.status, "starting");
2770        assert_eq!(result.uptime_ms, Some(0));
2771        assert_eq!(result.memory, Some(0));
2772        assert_eq!(result.pool_completed, Some(0));
2773        assert_eq!(result.pool_queued, Some(0));
2774        assert_eq!(result.success_rate, Some(0.0));
2775    }
2776
2777    #[test]
2778    fn test_calculate_heap_size_precise_calculations() {
2779        assert_eq!(PoolManager::calculate_heap_size(0), 512);
2780        assert_eq!(PoolManager::calculate_heap_size(1), 512);
2781        assert_eq!(PoolManager::calculate_heap_size(10), 544);
2782        assert_eq!(PoolManager::calculate_heap_size(20), 576);
2783        assert_eq!(PoolManager::calculate_heap_size(100), 832);
2784        assert_eq!(PoolManager::calculate_heap_size(200), 1152);
2785    }
2786
2787    #[tokio::test]
2788    async fn test_pool_manager_health_check_flag_initial() {
2789        let manager = PoolManager::new();
2790        assert!(!manager.health_check_needed.load(Ordering::Relaxed));
2791    }
2792
2793    #[tokio::test]
2794    async fn test_pool_manager_consecutive_failures_initial() {
2795        let manager = PoolManager::new();
2796        assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 0);
2797    }
2798
2799    #[tokio::test]
2800    async fn test_recovery_allowance_bounds() {
2801        let manager = PoolManager::new();
2802        manager.recovery_allowance.store(0, Ordering::Relaxed);
2803        assert_eq!(manager.recovery_allowance_percent(), 0);
2804        manager.recovery_allowance.store(50, Ordering::Relaxed);
2805        assert_eq!(manager.recovery_allowance_percent(), 50);
2806        manager.recovery_allowance.store(100, Ordering::Relaxed);
2807        assert_eq!(manager.recovery_allowance_percent(), 100);
2808    }
2809
2810    #[tokio::test]
2811    async fn test_is_initialized_changes_with_state() {
2812        let manager = PoolManager::with_socket_path("/tmp/init-test-123.sock".to_string());
2813        assert!(!manager.is_initialized().await);
2814        manager.initialized.store(true, Ordering::Release);
2815        assert!(manager.is_initialized().await);
2816        manager.initialized.store(false, Ordering::Release);
2817        assert!(!manager.is_initialized().await);
2818    }
2819
2820    // ============================================
2821    // Additional edge case tests for coverage
2822    // ============================================
2823
2824    #[test]
2825    fn test_is_dead_server_error_with_script_timeout() {
2826        // ScriptTimeout should NOT be a dead server error
2827        let err = PluginError::ScriptTimeout(30);
2828        assert!(!PoolManager::is_dead_server_error(&err));
2829    }
2830
2831    #[test]
2832    fn test_is_dead_server_error_with_plugin_error() {
2833        let err = PluginError::PluginError("some plugin error".to_string());
2834        assert!(!PoolManager::is_dead_server_error(&err));
2835    }
2836
2837    #[test]
2838    fn test_is_dead_server_error_with_connection_timeout_in_plugin_error() {
2839        // "connection timed out" is an infrastructure failure regardless of which
2840        // PluginError variant wraps it — the pool server is unreachable.
2841        let err = PluginError::PluginExecutionError("connection timed out".to_string());
2842        assert!(PoolManager::is_dead_server_error(&err));
2843
2844        let err = PluginError::SocketError("connect timed out".to_string());
2845        assert!(PoolManager::is_dead_server_error(&err));
2846    }
2847
2848    #[test]
2849    fn test_is_dead_server_error_with_request_timeout_socket_error() {
2850        let err = PluginError::SocketError("Request timed out after 304 seconds".to_string());
2851        assert!(PoolManager::is_dead_server_error(&err));
2852    }
2853
2854    #[test]
2855    fn test_parse_pool_response_success_with_logs_various_levels() {
2856        use super::super::protocol::{PoolLogEntry, PoolResponse};
2857
2858        let response = PoolResponse {
2859            task_id: "test-levels".to_string(),
2860            success: true,
2861            result: Some(serde_json::json!("ok")),
2862            error: None,
2863            logs: Some(vec![
2864                PoolLogEntry {
2865                    level: "log".to_string(),
2866                    message: "log level".to_string(),
2867                },
2868                PoolLogEntry {
2869                    level: "debug".to_string(),
2870                    message: "debug level".to_string(),
2871                },
2872                PoolLogEntry {
2873                    level: "info".to_string(),
2874                    message: "info level".to_string(),
2875                },
2876                PoolLogEntry {
2877                    level: "warn".to_string(),
2878                    message: "warn level".to_string(),
2879                },
2880                PoolLogEntry {
2881                    level: "error".to_string(),
2882                    message: "error level".to_string(),
2883                },
2884                PoolLogEntry {
2885                    level: "result".to_string(),
2886                    message: "result level".to_string(),
2887                },
2888            ]),
2889        };
2890
2891        let result = PoolManager::parse_pool_response(response).unwrap();
2892        assert_eq!(result.logs.len(), 6);
2893        assert_eq!(result.logs[0].level, LogLevel::Log);
2894        assert_eq!(result.logs[1].level, LogLevel::Debug);
2895        assert_eq!(result.logs[2].level, LogLevel::Info);
2896        assert_eq!(result.logs[3].level, LogLevel::Warn);
2897        assert_eq!(result.logs[4].level, LogLevel::Error);
2898        assert_eq!(result.logs[5].level, LogLevel::Result);
2899    }
2900
2901    #[test]
2902    fn test_parse_error_response_defaults() {
2903        use super::super::protocol::PoolResponse;
2904
2905        // Response with no error field at all
2906        let response = PoolResponse {
2907            task_id: "no-error".to_string(),
2908            success: false,
2909            result: None,
2910            error: None,
2911            logs: None,
2912        };
2913
2914        let err = PoolManager::parse_error_response(response);
2915        match err {
2916            PluginError::HandlerError(payload) => {
2917                assert_eq!(payload.message, "Unknown error");
2918                assert_eq!(payload.status, 500);
2919                assert!(payload.code.is_none());
2920                assert!(payload.details.is_none());
2921            }
2922            _ => panic!("Expected HandlerError"),
2923        }
2924    }
2925
2926    #[test]
2927    fn test_format_return_value_float() {
2928        let value = Some(serde_json::json!(3.14159));
2929        let result = PoolManager::format_return_value(value);
2930        assert!(result.contains("3.14159"));
2931    }
2932
2933    #[test]
2934    fn test_format_return_value_large_array() {
2935        let value = Some(serde_json::json!([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
2936        let result = PoolManager::format_return_value(value);
2937        assert_eq!(result, "[1,2,3,4,5,6,7,8,9,10]");
2938    }
2939
2940    #[test]
2941    fn test_format_return_value_string_with_special_chars() {
2942        let value = Some(serde_json::json!("hello\nworld\ttab"));
2943        assert_eq!(PoolManager::format_return_value(value), "hello\nworld\ttab");
2944    }
2945
2946    #[test]
2947    fn test_format_return_value_unicode() {
2948        let value = Some(serde_json::json!("こんにちは世界 🌍"));
2949        assert_eq!(PoolManager::format_return_value(value), "こんにちは世界 🌍");
2950    }
2951
2952    #[test]
2953    fn test_parse_health_result_large_values() {
2954        let json = serde_json::json!({
2955            "status": "healthy",
2956            "uptime": 999999999999_u64,
2957            "memory": { "heapUsed": 9999999999_u64 },
2958            "pool": { "completed": 999999999_u64, "queued": 999999_u64 },
2959            "execution": { "successRate": 0.999999 }
2960        });
2961
2962        let result = PoolManager::parse_health_result(&json);
2963        assert_eq!(result.status, "healthy");
2964        assert_eq!(result.uptime_ms, Some(999999999999));
2965        assert_eq!(result.memory, Some(9999999999));
2966        assert_eq!(result.pool_completed, Some(999999999));
2967        assert_eq!(result.pool_queued, Some(999999));
2968        assert!((result.success_rate.unwrap() - 0.999999).abs() < 0.0000001);
2969    }
2970
2971    #[test]
2972    fn test_parse_health_result_negative_values_treated_as_none() {
2973        // JSON doesn't have unsigned, so negative values won't parse as u64
2974        let json = serde_json::json!({
2975            "status": "error",
2976            "uptime": -1,
2977            "memory": { "heapUsed": -100 }
2978        });
2979
2980        let result = PoolManager::parse_health_result(&json);
2981        assert_eq!(result.status, "error");
2982        assert_eq!(result.uptime_ms, None); // -1 can't be u64
2983        assert_eq!(result.memory, None);
2984    }
2985
2986    #[test]
2987    fn test_parsed_health_result_debug() {
2988        let result = ParsedHealthResult {
2989            status: "test".to_string(),
2990            uptime_ms: Some(100),
2991            memory: Some(200),
2992            pool_completed: Some(50),
2993            pool_queued: Some(5),
2994            success_rate: Some(0.95),
2995        };
2996
2997        let debug_str = format!("{result:?}");
2998        assert!(debug_str.contains("test"));
2999        assert!(debug_str.contains("100"));
3000        assert!(debug_str.contains("200"));
3001    }
3002
3003    #[test]
3004    fn test_calculate_heap_size_boundary_values() {
3005        // Test at exact boundaries
3006        // 9 should give base (9/10 = 0)
3007        assert_eq!(PoolManager::calculate_heap_size(9), 512);
3008        // 10 should give base + 32 (10/10 = 1)
3009        assert_eq!(PoolManager::calculate_heap_size(10), 544);
3010
3011        // Test boundary where cap kicks in
3012        // 2400 would be: 512 + (240 * 32) = 512 + 7680 = 8192 (at cap)
3013        assert_eq!(PoolManager::calculate_heap_size(2400), 8192);
3014        // 2399 would be: 512 + (239 * 32) = 512 + 7648 = 8160 (under cap)
3015        assert_eq!(PoolManager::calculate_heap_size(2399), 8160);
3016    }
3017
3018    #[tokio::test]
3019    async fn test_pool_manager_socket_path_format() {
3020        let manager = PoolManager::new();
3021        // Should contain UUID format
3022        assert!(manager.socket_path.starts_with("/tmp/relayer-plugin-pool-"));
3023        assert!(manager.socket_path.ends_with(".sock"));
3024        // UUID is 36 chars (32 hex + 4 dashes)
3025        let uuid_part = manager
3026            .socket_path
3027            .strip_prefix("/tmp/relayer-plugin-pool-")
3028            .unwrap()
3029            .strip_suffix(".sock")
3030            .unwrap();
3031        assert_eq!(uuid_part.len(), 36);
3032    }
3033
3034    #[tokio::test]
3035    async fn test_health_check_socket_missing() {
3036        let manager =
3037            PoolManager::with_socket_path("/tmp/nonexistent-socket-12345.sock".to_string());
3038        // Mark as initialized but socket doesn't exist
3039        manager.initialized.store(true, Ordering::Release);
3040
3041        let health = manager.health_check().await.unwrap();
3042        assert!(!health.healthy);
3043        assert_eq!(health.status, "socket_missing");
3044    }
3045
3046    #[test]
3047    fn test_is_dead_server_error_embedded_patterns() {
3048        // Patterns embedded in longer messages
3049        let err = PluginError::PluginExecutionError(
3050            "Error: ECONNREFUSED connection refused at 127.0.0.1:3000".to_string(),
3051        );
3052        assert!(PoolManager::is_dead_server_error(&err));
3053
3054        let err = PluginError::PluginExecutionError(
3055            "SocketError: broken pipe while writing to /tmp/socket".to_string(),
3056        );
3057        assert!(PoolManager::is_dead_server_error(&err));
3058
3059        let err = PluginError::PluginExecutionError(
3060            "IO Error: No such file or directory (os error 2)".to_string(),
3061        );
3062        assert!(PoolManager::is_dead_server_error(&err));
3063    }
3064
3065    #[test]
3066    fn test_is_dead_server_error_mixed_case_timeout_patterns() {
3067        // Handler timeout variants - none should be dead server errors
3068        let variants = vec![
3069            "HANDLER TIMED OUT",
3070            "Handler Timed Out after 30s",
3071            "the handler timed out waiting for response",
3072        ];
3073
3074        for msg in variants {
3075            let err = PluginError::PluginExecutionError(msg.to_string());
3076            assert!(
3077                !PoolManager::is_dead_server_error(&err),
3078                "Expected '{msg}' to NOT be dead server error"
3079            );
3080        }
3081    }
3082
3083    #[tokio::test]
3084    async fn test_ensure_started_idempotent() {
3085        let manager = PoolManager::with_socket_path("/tmp/idempotent-test-999.sock".to_string());
3086
3087        // First call when not initialized
3088        assert!(!manager.is_initialized().await);
3089
3090        // Manually set initialized without actually starting (for test)
3091        manager.initialized.store(true, Ordering::Release);
3092
3093        // ensure_started should return immediately
3094        let result = manager.ensure_started().await;
3095        assert!(result.is_ok());
3096        assert!(manager.is_initialized().await);
3097    }
3098
3099    #[test]
3100    fn test_queued_request_with_headers() {
3101        let (tx, _rx) = oneshot::channel();
3102
3103        let mut headers = HashMap::new();
3104        headers.insert(
3105            "Authorization".to_string(),
3106            vec!["Bearer token".to_string()],
3107        );
3108        headers.insert(
3109            "Content-Type".to_string(),
3110            vec!["application/json".to_string()],
3111        );
3112
3113        let request = QueuedRequest {
3114            plugin_id: "headers-test".to_string(),
3115            compiled_code: None,
3116            plugin_path: Some("/path/to/plugin.ts".to_string()),
3117            params: serde_json::json!({}),
3118            headers: Some(headers),
3119            socket_path: "/tmp/test.sock".to_string(),
3120            http_request_id: None,
3121            timeout_secs: None,
3122            route: None,
3123            config: None,
3124            method: None,
3125            query: None,
3126            response_tx: tx,
3127        };
3128
3129        assert!(request.headers.is_some());
3130        let headers = request.headers.unwrap();
3131        assert!(headers.contains_key("Authorization"));
3132        assert!(headers.contains_key("Content-Type"));
3133    }
3134
3135    #[test]
3136    fn test_plugin_error_display_formats() {
3137        // Test all PluginError variants have proper Display implementations
3138        let err = PluginError::SocketError("test socket error".to_string());
3139        assert!(format!("{err}").contains("Socket error"));
3140
3141        let err = PluginError::PluginExecutionError("test execution error".to_string());
3142        assert!(format!("{err}").contains("test execution error"));
3143
3144        let err = PluginError::ScriptTimeout(60);
3145        assert!(format!("{err}").contains("60"));
3146
3147        let err = PluginError::PluginError("test plugin error".to_string());
3148        assert!(format!("{err}").contains("test plugin error"));
3149    }
3150
3151    #[test]
3152    fn test_pool_log_entry_to_log_entry_conversion() {
3153        use super::super::protocol::PoolLogEntry;
3154
3155        // Test the From<PoolLogEntry> for LogEntry conversion
3156        let pool_log = PoolLogEntry {
3157            level: "info".to_string(),
3158            message: "test message".to_string(),
3159        };
3160
3161        let log_entry: LogEntry = pool_log.into();
3162        assert_eq!(log_entry.level, LogLevel::Info);
3163        assert_eq!(log_entry.message, "test message");
3164
3165        // Test unknown level defaults
3166        let pool_log = PoolLogEntry {
3167            level: "unknown_level".to_string(),
3168            message: "unknown level message".to_string(),
3169        };
3170
3171        let log_entry: LogEntry = pool_log.into();
3172        assert_eq!(log_entry.level, LogLevel::Log); // Should default to Log
3173    }
3174
3175    #[tokio::test]
3176    async fn test_circuit_breaker_records_success() {
3177        let manager = PoolManager::new();
3178
3179        // Record some successes
3180        manager.circuit_breaker.record_success(100);
3181        manager.circuit_breaker.record_success(150);
3182        manager.circuit_breaker.record_success(200);
3183
3184        // Average should be calculated
3185        let avg = manager.avg_response_time_ms();
3186        assert!(avg > 0);
3187    }
3188
3189    #[tokio::test]
3190    async fn test_circuit_breaker_state_transitions() {
3191        let manager = PoolManager::new();
3192
3193        // Initial state is Closed
3194        assert_eq!(manager.circuit_state(), CircuitState::Closed);
3195
3196        // Record many failures to potentially trip the breaker
3197        for _ in 0..20 {
3198            manager.circuit_breaker.record_failure();
3199        }
3200
3201        // State might have changed (depends on thresholds)
3202        let state = manager.circuit_state();
3203        assert!(matches!(
3204            state,
3205            CircuitState::Closed | CircuitState::HalfOpen | CircuitState::Open
3206        ));
3207    }
3208
3209    #[tokio::test]
3210    async fn test_recovery_mode_activation() {
3211        let manager = PoolManager::new();
3212
3213        // Manually activate recovery mode
3214        manager.recovery_allowance.store(10, Ordering::Relaxed);
3215        manager.recovery_mode.store(true, Ordering::Relaxed);
3216
3217        assert!(manager.is_recovering());
3218        assert_eq!(manager.recovery_allowance_percent(), 10);
3219
3220        // Increase allowance
3221        manager.recovery_allowance.store(50, Ordering::Relaxed);
3222        assert_eq!(manager.recovery_allowance_percent(), 50);
3223
3224        // Exit recovery mode
3225        manager.recovery_mode.store(false, Ordering::Relaxed);
3226        assert!(!manager.is_recovering());
3227    }
3228
3229    #[test]
3230    fn test_parse_pool_response_with_empty_logs() {
3231        use super::super::protocol::PoolResponse;
3232
3233        let response = PoolResponse {
3234            task_id: "empty-logs".to_string(),
3235            success: true,
3236            result: Some(serde_json::json!("done")),
3237            error: None,
3238            logs: Some(vec![]), // Empty logs array
3239        };
3240
3241        let result = PoolManager::parse_pool_response(response).unwrap();
3242        assert!(result.logs.is_empty());
3243        assert_eq!(result.return_value, "done");
3244    }
3245
3246    #[test]
3247    fn test_handler_payload_with_complex_details() {
3248        let payload = PluginHandlerPayload {
3249            message: "Complex error".to_string(),
3250            status: 400,
3251            code: Some("VALIDATION_ERROR".to_string()),
3252            details: Some(serde_json::json!({
3253                "errors": [
3254                    {"field": "email", "code": "invalid", "message": "Invalid email format"},
3255                    {"field": "password", "code": "weak", "message": "Password too weak"}
3256                ],
3257                "metadata": {
3258                    "requestId": "req-123",
3259                    "timestamp": "2024-01-01T00:00:00Z"
3260                }
3261            })),
3262            logs: None,
3263            traces: None,
3264        };
3265
3266        assert_eq!(payload.status, 400);
3267        let details = payload.details.unwrap();
3268        assert!(details.get("errors").is_some());
3269        assert!(details.get("metadata").is_some());
3270    }
3271
3272    #[test]
3273    fn test_health_status_construction_healthy() {
3274        use super::super::health::HealthStatus;
3275
3276        let status = HealthStatus {
3277            healthy: true,
3278            status: "ok".to_string(),
3279            uptime_ms: Some(1000000),
3280            memory: Some(500000000),
3281            pool_completed: Some(1000),
3282            pool_queued: Some(5),
3283            success_rate: Some(0.99),
3284            circuit_state: Some("closed".to_string()),
3285            avg_response_time_ms: Some(50),
3286            recovering: Some(false),
3287            recovery_percent: Some(100),
3288            shared_socket_available_slots: Some(100),
3289            shared_socket_active_connections: Some(10),
3290            shared_socket_registered_executions: Some(5),
3291            connection_pool_available_slots: Some(50),
3292            connection_pool_active_connections: Some(5),
3293        };
3294
3295        assert!(status.healthy);
3296        assert_eq!(status.status, "ok");
3297        assert_eq!(status.uptime_ms, Some(1000000));
3298        assert_eq!(status.circuit_state, Some("closed".to_string()));
3299    }
3300
3301    #[test]
3302    fn test_health_status_construction_unhealthy() {
3303        use super::super::health::HealthStatus;
3304
3305        let status = HealthStatus {
3306            healthy: false,
3307            status: "connection_failed".to_string(),
3308            uptime_ms: None,
3309            memory: None,
3310            pool_completed: None,
3311            pool_queued: None,
3312            success_rate: None,
3313            circuit_state: Some("open".to_string()),
3314            avg_response_time_ms: Some(0),
3315            recovering: Some(true),
3316            recovery_percent: Some(10),
3317            shared_socket_available_slots: None,
3318            shared_socket_active_connections: None,
3319            shared_socket_registered_executions: None,
3320            connection_pool_available_slots: None,
3321            connection_pool_active_connections: None,
3322        };
3323
3324        assert!(!status.healthy);
3325        assert_eq!(status.status, "connection_failed");
3326        assert!(status.uptime_ms.is_none());
3327    }
3328
3329    #[test]
3330    fn test_health_status_debug_format() {
3331        use super::super::health::HealthStatus;
3332
3333        let status = HealthStatus {
3334            healthy: true,
3335            status: "test".to_string(),
3336            uptime_ms: Some(100),
3337            memory: None,
3338            pool_completed: None,
3339            pool_queued: None,
3340            success_rate: None,
3341            circuit_state: None,
3342            avg_response_time_ms: None,
3343            recovering: None,
3344            recovery_percent: None,
3345            shared_socket_available_slots: None,
3346            shared_socket_active_connections: None,
3347            shared_socket_registered_executions: None,
3348            connection_pool_available_slots: None,
3349            connection_pool_active_connections: None,
3350        };
3351
3352        let debug_str = format!("{status:?}");
3353        assert!(debug_str.contains("healthy: true"));
3354        assert!(debug_str.contains("test"));
3355    }
3356
3357    #[test]
3358    fn test_health_status_clone() {
3359        use super::super::health::HealthStatus;
3360
3361        let status = HealthStatus {
3362            healthy: true,
3363            status: "original".to_string(),
3364            uptime_ms: Some(500),
3365            memory: Some(100),
3366            pool_completed: Some(10),
3367            pool_queued: Some(1),
3368            success_rate: Some(0.95),
3369            circuit_state: Some("closed".to_string()),
3370            avg_response_time_ms: Some(25),
3371            recovering: Some(false),
3372            recovery_percent: Some(100),
3373            shared_socket_available_slots: Some(50),
3374            shared_socket_active_connections: Some(2),
3375            shared_socket_registered_executions: Some(1),
3376            connection_pool_available_slots: Some(25),
3377            connection_pool_active_connections: Some(1),
3378        };
3379
3380        let cloned = status.clone();
3381        assert_eq!(cloned.healthy, status.healthy);
3382        assert_eq!(cloned.status, status.status);
3383        assert_eq!(cloned.uptime_ms, status.uptime_ms);
3384    }
3385
3386    #[test]
3387    fn test_execute_request_debug() {
3388        use super::super::protocol::ExecuteRequest;
3389
3390        let request = ExecuteRequest {
3391            task_id: "debug-test".to_string(),
3392            plugin_id: "test-plugin".to_string(),
3393            compiled_code: None,
3394            plugin_path: Some("/path/to/plugin.ts".to_string()),
3395            params: serde_json::json!({"test": true}),
3396            headers: None,
3397            socket_path: "/tmp/test.sock".to_string(),
3398            http_request_id: None,
3399            timeout: None,
3400            route: None,
3401            config: None,
3402            method: None,
3403            query: None,
3404        };
3405
3406        let debug_str = format!("{request:?}");
3407        assert!(debug_str.contains("debug-test"));
3408        assert!(debug_str.contains("test-plugin"));
3409    }
3410
3411    #[test]
3412    fn test_pool_error_debug() {
3413        use super::super::protocol::PoolError;
3414
3415        let error = PoolError {
3416            message: "Test error".to_string(),
3417            code: Some("TEST_ERR".to_string()),
3418            status: Some(400),
3419            details: Some(serde_json::json!({"info": "test"})),
3420        };
3421
3422        let debug_str = format!("{error:?}");
3423        assert!(debug_str.contains("Test error"));
3424        assert!(debug_str.contains("TEST_ERR"));
3425    }
3426
3427    #[test]
3428    fn test_pool_response_debug() {
3429        use super::super::protocol::PoolResponse;
3430
3431        let response = PoolResponse {
3432            task_id: "resp-123".to_string(),
3433            success: true,
3434            result: Some(serde_json::json!("result")),
3435            error: None,
3436            logs: None,
3437        };
3438
3439        let debug_str = format!("{response:?}");
3440        assert!(debug_str.contains("resp-123"));
3441        assert!(debug_str.contains("true"));
3442    }
3443
3444    #[test]
3445    fn test_pool_log_entry_debug() {
3446        use super::super::protocol::PoolLogEntry;
3447
3448        let entry = PoolLogEntry {
3449            level: "info".to_string(),
3450            message: "Test message".to_string(),
3451        };
3452
3453        let debug_str = format!("{entry:?}");
3454        assert!(debug_str.contains("info"));
3455        assert!(debug_str.contains("Test message"));
3456    }
3457
3458    #[test]
3459    fn test_circuit_breaker_default_trait() {
3460        use super::super::health::CircuitBreaker;
3461
3462        let cb = CircuitBreaker::default();
3463        assert_eq!(cb.state(), CircuitState::Closed);
3464    }
3465
3466    #[test]
3467    fn test_circuit_breaker_set_state_all_variants() {
3468        use super::super::health::CircuitBreaker;
3469
3470        let cb = CircuitBreaker::new();
3471
3472        // Test setting all states
3473        cb.set_state(CircuitState::HalfOpen);
3474        assert_eq!(cb.state(), CircuitState::HalfOpen);
3475
3476        cb.set_state(CircuitState::Open);
3477        assert_eq!(cb.state(), CircuitState::Open);
3478
3479        cb.set_state(CircuitState::Closed);
3480        assert_eq!(cb.state(), CircuitState::Closed);
3481    }
3482
3483    #[test]
3484    fn test_circuit_breaker_failure_rate_triggers_open() {
3485        use super::super::health::CircuitBreaker;
3486
3487        let cb = CircuitBreaker::new();
3488
3489        // Record enough failures to trigger circuit opening
3490        for _ in 0..100 {
3491            cb.record_failure();
3492        }
3493
3494        assert_eq!(cb.state(), CircuitState::Open);
3495    }
3496
3497    #[test]
3498    fn test_circuit_breaker_low_failure_rate_stays_closed() {
3499        use super::super::health::CircuitBreaker;
3500
3501        let cb = CircuitBreaker::new();
3502
3503        // Record mostly successes with few failures
3504        for _ in 0..90 {
3505            cb.record_success(50);
3506        }
3507        for _ in 0..10 {
3508            cb.record_failure();
3509        }
3510
3511        // Should still be closed (10% failure rate)
3512        assert_eq!(cb.state(), CircuitState::Closed);
3513    }
3514
3515    #[test]
3516    fn test_circuit_breaker_ema_response_time() {
3517        use super::super::health::CircuitBreaker;
3518
3519        let cb = CircuitBreaker::new();
3520
3521        // Record several response times
3522        cb.record_success(100);
3523        let avg1 = cb.avg_response_time();
3524
3525        cb.record_success(100);
3526        cb.record_success(100);
3527        cb.record_success(100);
3528        let avg2 = cb.avg_response_time();
3529
3530        // Average should stabilize around 100
3531        assert!(avg1 > 0);
3532        assert!(avg2 > 0);
3533        assert!(avg2 <= 100);
3534    }
3535
3536    #[test]
3537    fn test_circuit_breaker_force_close_resets_counters() {
3538        use super::super::health::CircuitBreaker;
3539
3540        let cb = CircuitBreaker::new();
3541        cb.set_state(CircuitState::Open);
3542
3543        cb.force_close();
3544
3545        assert_eq!(cb.state(), CircuitState::Closed);
3546    }
3547
3548    #[test]
3549    fn test_process_status_debug() {
3550        use super::super::health::ProcessStatus;
3551
3552        assert_eq!(format!("{:?}", ProcessStatus::Running), "Running");
3553        assert_eq!(format!("{:?}", ProcessStatus::Exited), "Exited");
3554        assert_eq!(format!("{:?}", ProcessStatus::Unknown), "Unknown");
3555        assert_eq!(format!("{:?}", ProcessStatus::NoProcess), "NoProcess");
3556    }
3557
3558    #[test]
3559    fn test_process_status_clone() {
3560        use super::super::health::ProcessStatus;
3561
3562        let status = ProcessStatus::Running;
3563        let cloned = status;
3564        assert_eq!(status, cloned);
3565    }
3566
3567    // ============================================
3568    // Additional coverage tests - DeadServerIndicator
3569    // ============================================
3570
3571    #[test]
3572    fn test_dead_server_indicator_all_variants() {
3573        use super::super::health::DeadServerIndicator;
3574
3575        // Test all enum variants exist and are properly matched
3576        let variants = [
3577            ("eof while parsing", DeadServerIndicator::EofWhileParsing),
3578            ("broken pipe", DeadServerIndicator::BrokenPipe),
3579            ("connection refused", DeadServerIndicator::ConnectionRefused),
3580            ("connection reset", DeadServerIndicator::ConnectionReset),
3581            ("not connected", DeadServerIndicator::NotConnected),
3582            ("failed to connect", DeadServerIndicator::FailedToConnect),
3583            (
3584                "socket file missing",
3585                DeadServerIndicator::SocketFileMissing,
3586            ),
3587            ("no such file", DeadServerIndicator::NoSuchFile),
3588            (
3589                "connection timed out",
3590                DeadServerIndicator::ConnectionTimedOut,
3591            ),
3592            ("connect timed out", DeadServerIndicator::ConnectionTimedOut),
3593        ];
3594
3595        for (pattern, expected) in variants {
3596            let result = DeadServerIndicator::from_error_str(pattern);
3597            assert_eq!(result, Some(expected), "Pattern '{pattern}' should match");
3598        }
3599    }
3600
3601    #[test]
3602    fn test_dead_server_indicator_debug_format() {
3603        use super::super::health::DeadServerIndicator;
3604
3605        let indicator = DeadServerIndicator::BrokenPipe;
3606        let debug_str = format!("{indicator:?}");
3607        assert_eq!(debug_str, "BrokenPipe");
3608    }
3609
3610    #[test]
3611    fn test_dead_server_indicator_clone_copy() {
3612        use super::super::health::DeadServerIndicator;
3613
3614        let indicator = DeadServerIndicator::ConnectionRefused;
3615        let cloned = indicator;
3616        assert_eq!(indicator, cloned);
3617    }
3618
3619    #[test]
3620    fn test_result_ring_buffer_not_enough_data() {
3621        use super::super::health::ResultRingBuffer;
3622
3623        let buffer = ResultRingBuffer::new(100);
3624
3625        // Record less than 10 results
3626        for _ in 0..9 {
3627            buffer.record(false);
3628        }
3629
3630        // Should return 0.0 because not enough data
3631        assert_eq!(buffer.failure_rate(), 0.0);
3632    }
3633
3634    #[test]
3635    fn test_result_ring_buffer_exactly_10_samples() {
3636        use super::super::health::ResultRingBuffer;
3637
3638        let buffer = ResultRingBuffer::new(100);
3639
3640        // Record exactly 10 failures
3641        for _ in 0..10 {
3642            buffer.record(false);
3643        }
3644
3645        // Should return 1.0 (100% failure)
3646        assert_eq!(buffer.failure_rate(), 1.0);
3647    }
3648
3649    #[test]
3650    fn test_result_ring_buffer_wraps_correctly() {
3651        use super::super::health::ResultRingBuffer;
3652
3653        let buffer = ResultRingBuffer::new(10);
3654
3655        // Fill buffer with successes
3656        for _ in 0..10 {
3657            buffer.record(true);
3658        }
3659        assert_eq!(buffer.failure_rate(), 0.0);
3660
3661        // Overwrite with failures
3662        for _ in 0..10 {
3663            buffer.record(false);
3664        }
3665        assert_eq!(buffer.failure_rate(), 1.0);
3666    }
3667
3668    #[test]
3669    fn test_circuit_state_equality_all_pairs() {
3670        assert_eq!(CircuitState::Closed, CircuitState::Closed);
3671        assert_eq!(CircuitState::HalfOpen, CircuitState::HalfOpen);
3672        assert_eq!(CircuitState::Open, CircuitState::Open);
3673
3674        assert_ne!(CircuitState::Closed, CircuitState::HalfOpen);
3675        assert_ne!(CircuitState::Closed, CircuitState::Open);
3676        assert_ne!(CircuitState::HalfOpen, CircuitState::Open);
3677    }
3678
3679    #[test]
3680    fn test_circuit_state_clone_copy() {
3681        let state = CircuitState::HalfOpen;
3682        let copied = state;
3683        assert_eq!(state, copied);
3684    }
3685
3686    #[test]
3687    fn test_parse_pool_response_with_null_values() {
3688        use super::super::protocol::PoolResponse;
3689
3690        let response = PoolResponse {
3691            task_id: "null-test".to_string(),
3692            success: true,
3693            result: Some(serde_json::json!(null)),
3694            error: None,
3695            logs: None,
3696        };
3697
3698        let result = PoolManager::parse_pool_response(response).unwrap();
3699        assert_eq!(result.return_value, "null");
3700    }
3701
3702    #[test]
3703    fn test_parse_pool_response_with_nested_result() {
3704        use super::super::protocol::PoolResponse;
3705
3706        let response = PoolResponse {
3707            task_id: "nested-test".to_string(),
3708            success: true,
3709            result: Some(serde_json::json!({
3710                "level1": {
3711                    "level2": {
3712                        "level3": "deep value"
3713                    }
3714                }
3715            })),
3716            error: None,
3717            logs: None,
3718        };
3719
3720        let result = PoolManager::parse_pool_response(response).unwrap();
3721        assert!(result.return_value.contains("level1"));
3722        assert!(result.return_value.contains("level2"));
3723        assert!(result.return_value.contains("level3"));
3724        assert!(result.return_value.contains("deep value"));
3725    }
3726
3727    #[test]
3728    fn test_parse_pool_response_error_with_details() {
3729        use super::super::protocol::{PoolError, PoolResponse};
3730
3731        let response = PoolResponse {
3732            task_id: "error-details".to_string(),
3733            success: false,
3734            result: None,
3735            error: Some(PoolError {
3736                message: "Error with details".to_string(),
3737                code: Some("DETAILED_ERROR".to_string()),
3738                status: Some(422),
3739                details: Some(serde_json::json!({
3740                    "field": "email",
3741                    "expected": "string",
3742                    "received": "number"
3743                })),
3744            }),
3745            logs: None,
3746        };
3747
3748        let err = PoolManager::parse_pool_response(response).unwrap_err();
3749        match err {
3750            PluginError::HandlerError(payload) => {
3751                assert_eq!(payload.message, "Error with details");
3752                assert_eq!(payload.code, Some("DETAILED_ERROR".to_string()));
3753                assert!(payload.details.is_some());
3754                let details = payload.details.unwrap();
3755                assert_eq!(details.get("field").unwrap(), "email");
3756            }
3757            _ => panic!("Expected HandlerError"),
3758        }
3759    }
3760
3761    #[test]
3762    fn test_parse_health_result_with_all_optional_fields() {
3763        let json = serde_json::json!({
3764            "status": "healthy",
3765            "uptime": 999999,
3766            "memory": {
3767                "heapUsed": 123456789,
3768                "heapTotal": 987654321,
3769                "external": 111111,
3770                "arrayBuffers": 222222
3771            },
3772            "pool": {
3773                "completed": 50000,
3774                "queued": 100,
3775                "active": 50,
3776                "waiting": 25
3777            },
3778            "execution": {
3779                "successRate": 0.9999,
3780                "avgDuration": 45.5,
3781                "totalExecutions": 100000
3782            }
3783        });
3784
3785        let result = PoolManager::parse_health_result(&json);
3786        assert_eq!(result.status, "healthy");
3787        assert_eq!(result.uptime_ms, Some(999999));
3788        assert_eq!(result.memory, Some(123456789));
3789        assert_eq!(result.pool_completed, Some(50000));
3790        assert_eq!(result.pool_queued, Some(100));
3791        assert!((result.success_rate.unwrap() - 0.9999).abs() < 0.0001);
3792    }
3793
3794    #[tokio::test]
3795    async fn test_pool_manager_max_queue_size() {
3796        let manager = PoolManager::new();
3797        // max_queue_size should be set from config
3798        assert!(manager.max_queue_size > 0);
3799    }
3800
3801    #[tokio::test]
3802    async fn test_pool_manager_last_restart_time_initial() {
3803        let manager = PoolManager::new();
3804        assert_eq!(manager.last_restart_time_ms.load(Ordering::Relaxed), 0);
3805    }
3806
3807    #[tokio::test]
3808    async fn test_pool_manager_connection_pool_exists() {
3809        let manager = PoolManager::new();
3810        // Connection pool should be initialized
3811        let available = manager.connection_pool.semaphore.available_permits();
3812        assert!(available > 0);
3813    }
3814
3815    #[test]
3816    fn test_is_dead_server_error_with_whitespace() {
3817        // Patterns with extra whitespace
3818        let err = PluginError::SocketError("  connection refused  ".to_string());
3819        assert!(PoolManager::is_dead_server_error(&err));
3820
3821        let err = PluginError::SocketError("error: broken pipe occurred".to_string());
3822        assert!(PoolManager::is_dead_server_error(&err));
3823    }
3824
3825    #[test]
3826    fn test_is_dead_server_error_multiline() {
3827        // Multiline error messages
3828        let err = PluginError::SocketError(
3829            "Error occurred\nConnection refused\nPlease retry".to_string(),
3830        );
3831        assert!(PoolManager::is_dead_server_error(&err));
3832    }
3833
3834    #[test]
3835    fn test_is_dead_server_error_json_in_message() {
3836        // Error with JSON content
3837        let err = PluginError::PluginExecutionError(
3838            r#"{"error": "connection refused", "code": 61}"#.to_string(),
3839        );
3840        assert!(PoolManager::is_dead_server_error(&err));
3841    }
3842
3843    #[test]
3844    fn test_format_return_value_special_json() {
3845        // Test with special JSON values
3846        let value = Some(serde_json::json!(f64::MAX));
3847        let result = PoolManager::format_return_value(value);
3848        assert!(!result.is_empty());
3849
3850        let value = Some(serde_json::json!(i64::MIN));
3851        let result = PoolManager::format_return_value(value);
3852        assert!(result.contains("-"));
3853    }
3854
3855    #[test]
3856    fn test_format_return_value_with_escaped_chars() {
3857        let value = Some(serde_json::json!("line1\nline2\ttab\"quote"));
3858        let result = PoolManager::format_return_value(value);
3859        assert!(result.contains("line1"));
3860        assert!(result.contains("line2"));
3861    }
3862
3863    #[test]
3864    fn test_format_return_value_array_of_objects() {
3865        let value = Some(serde_json::json!([
3866            {"id": 1, "name": "first"},
3867            {"id": 2, "name": "second"}
3868        ]));
3869        let result = PoolManager::format_return_value(value);
3870        assert!(result.contains("first"));
3871        assert!(result.contains("second"));
3872    }
3873
3874    #[test]
3875    fn test_all_log_levels_conversion() {
3876        use super::super::protocol::PoolLogEntry;
3877
3878        let levels = [
3879            ("log", LogLevel::Log),
3880            ("debug", LogLevel::Debug),
3881            ("info", LogLevel::Info),
3882            ("warn", LogLevel::Warn),
3883            ("error", LogLevel::Error),
3884            ("result", LogLevel::Result),
3885            ("unknown_level", LogLevel::Log), // Unknown defaults to Log
3886            ("LOG", LogLevel::Log),           // Case matters - uppercase goes to default
3887            ("", LogLevel::Log),              // Empty string goes to default
3888        ];
3889
3890        for (input, expected) in levels {
3891            let entry = PoolLogEntry {
3892                level: input.to_string(),
3893                message: "test".to_string(),
3894            };
3895            let log_entry: LogEntry = entry.into();
3896            assert_eq!(
3897                log_entry.level, expected,
3898                "Level '{input}' should convert to {expected:?}"
3899            );
3900        }
3901    }
3902
3903    #[tokio::test]
3904    async fn test_pool_manager_health_check_flag_manipulation() {
3905        let manager = PoolManager::new();
3906
3907        manager.health_check_needed.store(true, Ordering::Relaxed);
3908        assert!(manager.health_check_needed.load(Ordering::Relaxed));
3909
3910        manager.health_check_needed.store(false, Ordering::Relaxed);
3911        assert!(!manager.health_check_needed.load(Ordering::Relaxed));
3912    }
3913
3914    #[tokio::test]
3915    async fn test_pool_manager_consecutive_failures_manipulation() {
3916        let manager = PoolManager::new();
3917
3918        manager.consecutive_failures.fetch_add(1, Ordering::Relaxed);
3919        assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 1);
3920
3921        manager.consecutive_failures.fetch_add(5, Ordering::Relaxed);
3922        assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 6);
3923
3924        manager.consecutive_failures.store(0, Ordering::Relaxed);
3925        assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 0);
3926    }
3927
3928    #[test]
3929    fn test_parsed_health_result_with_all_none() {
3930        let result = ParsedHealthResult {
3931            status: "minimal".to_string(),
3932            uptime_ms: None,
3933            memory: None,
3934            pool_completed: None,
3935            pool_queued: None,
3936            success_rate: None,
3937        };
3938
3939        assert_eq!(result.status, "minimal");
3940        assert!(result.uptime_ms.is_none());
3941        assert!(result.memory.is_none());
3942    }
3943
3944    #[test]
3945    fn test_parsed_health_result_with_all_some() {
3946        let result = ParsedHealthResult {
3947            status: "complete".to_string(),
3948            uptime_ms: Some(u64::MAX),
3949            memory: Some(u64::MAX),
3950            pool_completed: Some(u64::MAX),
3951            pool_queued: Some(u64::MAX),
3952            success_rate: Some(1.0),
3953        };
3954
3955        assert_eq!(result.status, "complete");
3956        assert_eq!(result.uptime_ms, Some(u64::MAX));
3957        assert_eq!(result.success_rate, Some(1.0));
3958    }
3959
3960    #[test]
3961    fn test_calculate_heap_size_extensive_values() {
3962        // Test many different concurrency values
3963        let test_cases = [
3964            (0, 512),
3965            (1, 512),
3966            (5, 512),
3967            (9, 512),
3968            (10, 544),
3969            (11, 544),
3970            (19, 544),
3971            (20, 576),
3972            (50, 672),
3973            (100, 832),
3974            (150, 992),
3975            (200, 1152),
3976            (250, 1312),
3977            (300, 1472),
3978            (400, 1792),
3979            (500, 2112),
3980            (1000, 3712),
3981            (2000, 6912),
3982            (2400, 8192),  // At cap
3983            (3000, 8192),  // Capped
3984            (5000, 8192),  // Capped
3985            (10000, 8192), // Capped
3986        ];
3987
3988        for (concurrency, expected_heap) in test_cases {
3989            let heap = PoolManager::calculate_heap_size(concurrency);
3990            assert_eq!(
3991                heap, expected_heap,
3992                "Concurrency {concurrency} should give heap {expected_heap}"
3993            );
3994        }
3995    }
3996
3997    #[tokio::test]
3998    async fn test_pool_manager_drop_cleans_socket() {
3999        let socket_path = format!("/tmp/test-drop-{}.sock", uuid::Uuid::new_v4());
4000
4001        // Create a file at the socket path
4002        std::fs::write(&socket_path, "test").unwrap();
4003        assert!(std::path::Path::new(&socket_path).exists());
4004
4005        // Create manager with this socket path
4006        {
4007            let _manager = PoolManager::with_socket_path(socket_path.clone());
4008            // Manager exists here
4009        }
4010        // Manager dropped here - should clean up socket
4011
4012        // Socket should be removed
4013        assert!(!std::path::Path::new(&socket_path).exists());
4014    }
4015
4016    #[test]
4017    fn test_script_result_with_traces() {
4018        let result = ScriptResult {
4019            logs: vec![],
4020            error: String::new(),
4021            return_value: "with traces".to_string(),
4022            trace: vec![
4023                serde_json::json!({"action": "GET", "url": "/api/test"}),
4024                serde_json::json!({"action": "POST", "url": "/api/submit"}),
4025            ],
4026        };
4027
4028        assert_eq!(result.trace.len(), 2);
4029        assert!(result.trace[0].get("action").is_some());
4030    }
4031
4032    #[test]
4033    fn test_script_result_with_error() {
4034        let result = ScriptResult {
4035            logs: vec![LogEntry {
4036                level: LogLevel::Error,
4037                message: "Something went wrong".to_string(),
4038            }],
4039            error: "RuntimeError: undefined is not a function".to_string(),
4040            return_value: String::new(),
4041            trace: vec![],
4042        };
4043
4044        assert!(!result.error.is_empty());
4045        assert!(result.error.contains("RuntimeError"));
4046        assert_eq!(result.logs.len(), 1);
4047    }
4048
4049    #[test]
4050    fn test_plugin_handler_payload_with_traces() {
4051        let payload = PluginHandlerPayload {
4052            message: "Error with traces".to_string(),
4053            status: 500,
4054            code: None,
4055            details: None,
4056            logs: None,
4057            traces: Some(vec![
4058                serde_json::json!({"method": "GET", "path": "/health"}),
4059                serde_json::json!({"method": "POST", "path": "/execute"}),
4060            ]),
4061        };
4062
4063        assert!(payload.traces.is_some());
4064        assert_eq!(payload.traces.as_ref().unwrap().len(), 2);
4065    }
4066
4067    #[test]
4068    fn test_queued_request_all_optional_fields() {
4069        let (tx, _rx) = oneshot::channel();
4070
4071        let mut headers = HashMap::new();
4072        headers.insert(
4073            "X-Custom".to_string(),
4074            vec!["value1".to_string(), "value2".to_string()],
4075        );
4076
4077        let request = QueuedRequest {
4078            plugin_id: "full-request".to_string(),
4079            compiled_code: Some("compiled code here".to_string()),
4080            plugin_path: Some("/path/to/plugin.ts".to_string()),
4081            params: serde_json::json!({"key": "value", "number": 42}),
4082            headers: Some(headers),
4083            socket_path: "/tmp/full.sock".to_string(),
4084            http_request_id: Some("http-123".to_string()),
4085            timeout_secs: Some(60),
4086            route: Some("/api/v1/execute".to_string()),
4087            config: Some(serde_json::json!({"setting": true})),
4088            method: Some("PUT".to_string()),
4089            query: Some(serde_json::json!({"page": 1, "limit": 10})),
4090            response_tx: tx,
4091        };
4092
4093        assert_eq!(request.plugin_id, "full-request");
4094        assert!(request.compiled_code.is_some());
4095        assert!(request.plugin_path.is_some());
4096        assert!(request.headers.is_some());
4097        assert_eq!(request.timeout_secs, Some(60));
4098        assert_eq!(request.method, Some("PUT".to_string()));
4099    }
4100}