1use std::collections::HashMap;
10use std::process::Stdio;
11use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::io::{AsyncBufReadExt, BufReader};
15use tokio::process::{Child, Command};
16use tokio::sync::oneshot;
17use uuid::Uuid;
18
19use crate::constants::{
20 ADMIN_REQUEST_TIMEOUT_SECS, DEFAULT_PLUGIN_TIMEOUT_SECONDS, PLUGIN_TIMEOUT_BUFFER_SECONDS,
21};
22
23use super::config::get_config;
24use super::connection::{ConnectionPool, PoolConnection};
25use super::health::{
26 CircuitBreaker, CircuitState, DeadServerIndicator, HealthStatus, ProcessStatus,
27};
28use super::protocol::{PoolError, PoolRequest, PoolResponse};
29use super::shared_socket::get_shared_socket_service;
30use super::{LogEntry, PluginError, PluginHandlerPayload, ScriptResult};
31
/// A plugin execution request waiting on the bounded queue for a worker.
///
/// Queue workers forward every field except `response_tx` unchanged to
/// `PoolManager::execute_plugin_internal`, then send the outcome on `response_tx`.
struct QueuedRequest {
    /// Identifier of the plugin to execute.
    plugin_id: String,
    /// Compiled plugin code, when the caller supplies it.
    compiled_code: Option<String>,
    /// Plugin path, when the caller supplies it.
    plugin_path: Option<String>,
    /// JSON parameters forwarded in the execute request.
    params: serde_json::Value,
    /// Optional headers forwarded in the execute request (name -> values).
    headers: Option<HashMap<String, Vec<String>>>,
    /// Socket path forwarded to the pool server in the execute request.
    socket_path: String,
    /// Optional HTTP request id forwarded in the execute request.
    http_request_id: Option<String>,
    /// Per-request timeout in seconds; `DEFAULT_PLUGIN_TIMEOUT_SECONDS` applies when `None`.
    timeout_secs: Option<u64>,
    /// Optional route forwarded in the execute request.
    route: Option<String>,
    /// Optional JSON configuration forwarded in the execute request.
    config: Option<serde_json::Value>,
    /// Optional method forwarded in the execute request.
    method: Option<String>,
    /// Optional JSON query forwarded in the execute request.
    query: Option<serde_json::Value>,
    /// Channel on which the worker reports the execution result to the waiting caller.
    response_tx: oneshot::Sender<Result<ScriptResult, PluginError>>,
}
48
/// Health fields extracted from a pool server health payload by
/// `PoolManager::parse_health_result`.
#[derive(Debug, Default, PartialEq)]
pub struct ParsedHealthResult {
    /// Value of the `status` key, or `"unknown"` when it is missing or not a string.
    pub status: String,
    /// Value of the `uptime` key when it is an unsigned integer.
    pub uptime_ms: Option<u64>,
    /// Value of `memory.heapUsed` when it is an unsigned integer.
    pub memory: Option<u64>,
    /// Value of `pool.completed` when it is an unsigned integer.
    pub pool_completed: Option<u64>,
    /// Value of `pool.queued` when it is an unsigned integer.
    pub pool_queued: Option<u64>,
    /// Value of `execution.successRate` when it is a number.
    pub success_rate: Option<f64>,
}
62
/// Owns the plugin pool server child process and routes plugin executions to it.
pub struct PoolManager {
    /// Socket path handed to the pool server process and to the connection pool.
    socket_path: String,
    /// Handle of the spawned pool server process; `None` when no process is tracked.
    process: tokio::sync::Mutex<Option<Child>>,
    /// Set once `ensure_started` has successfully started the pool server.
    initialized: Arc<AtomicBool>,
    /// Held while starting the pool server and while restarting it.
    restart_lock: tokio::sync::Mutex<()>,
    /// Connections to the pool server, created in `init` with `pool_max_connections`.
    connection_pool: Arc<ConnectionPool>,
    /// Sending side of the bounded request queue drained by the queue workers.
    request_tx: async_channel::Sender<QueuedRequest>,
    /// Capacity of the request queue (`pool_max_queue_size` from the config).
    max_queue_size: usize,
    /// Raised by the periodic health-check task and when a dead-server error is detected;
    /// consumed by `ensure_started_and_healthy`.
    health_check_needed: Arc<AtomicBool>,
    /// Incremented before each restart attempt and reset to zero after a successful restart.
    consecutive_failures: Arc<AtomicU32>,
    /// Consulted before every execution and updated with each execution's outcome.
    circuit_breaker: Arc<CircuitBreaker>,
    /// NOTE(review): presumably the time (ms) of the most recent restart; it is only
    /// initialised to 0 in the code visible here — confirm against `restart_internal`.
    last_restart_time_ms: Arc<AtomicU64>,
    /// While `true`, the recovery task keeps raising `recovery_allowance`.
    recovery_mode: Arc<AtomicBool>,
    /// Allowance (raised in steps of 10 up to 100) passed to the circuit breaker during recovery.
    recovery_allowance: Arc<AtomicU32>,
    /// Notification the background tasks spawned by `init` listen on to stop.
    shutdown_signal: Arc<tokio::sync::Notify>,
}
91
92impl Default for PoolManager {
93 fn default() -> Self {
94 Self::new()
95 }
96}
97
98impl PoolManager {
99 const BASE_HEAP_MB: usize = 512;
102
103 const CONCURRENCY_DIVISOR: usize = 10;
106
107 const HEAP_INCREMENT_PER_DIVISOR_MB: usize = 32;
111
112 const MAX_HEAP_MB: usize = 8192;
116
117 pub fn calculate_heap_size(max_concurrency: usize) -> usize {
124 let calculated = Self::BASE_HEAP_MB
125 + ((max_concurrency / Self::CONCURRENCY_DIVISOR) * Self::HEAP_INCREMENT_PER_DIVISOR_MB);
126 calculated.min(Self::MAX_HEAP_MB)
127 }
128
129 pub fn format_return_value(value: Option<serde_json::Value>) -> String {
134 value
135 .map(|v| {
136 if v.is_string() {
137 v.as_str().unwrap_or("").to_string()
138 } else {
139 serde_json::to_string(&v).unwrap_or_default()
140 }
141 })
142 .unwrap_or_default()
143 }
144
145 pub fn parse_success_response(response: PoolResponse) -> ScriptResult {
149 let logs: Vec<LogEntry> = response
150 .logs
151 .map(|logs| logs.into_iter().map(|l| l.into()).collect())
152 .unwrap_or_default();
153
154 ScriptResult {
155 logs,
156 error: String::new(),
157 return_value: Self::format_return_value(response.result),
158 trace: Vec::new(),
159 }
160 }
161
162 pub fn parse_error_response(response: PoolResponse) -> PluginError {
166 let logs: Vec<LogEntry> = response
167 .logs
168 .map(|logs| logs.into_iter().map(|l| l.into()).collect())
169 .unwrap_or_default();
170
171 let error = response.error.unwrap_or(PoolError {
172 message: "Unknown error".to_string(),
173 code: None,
174 status: None,
175 details: None,
176 });
177
178 PluginError::HandlerError(Box::new(PluginHandlerPayload {
179 message: error.message,
180 status: error.status.unwrap_or(500),
181 code: error.code,
182 details: error.details,
183 logs: Some(logs),
184 traces: None,
185 }))
186 }
187
188 pub fn parse_pool_response(response: PoolResponse) -> Result<ScriptResult, PluginError> {
193 if response.success {
194 Ok(Self::parse_success_response(response))
195 } else {
196 Err(Self::parse_error_response(response))
197 }
198 }
199
200 pub fn parse_health_result(result: &serde_json::Value) -> ParsedHealthResult {
205 ParsedHealthResult {
206 status: result
207 .get("status")
208 .and_then(|v| v.as_str())
209 .unwrap_or("unknown")
210 .to_string(),
211 uptime_ms: result.get("uptime").and_then(|v| v.as_u64()),
212 memory: result
213 .get("memory")
214 .and_then(|v| v.get("heapUsed"))
215 .and_then(|v| v.as_u64()),
216 pool_completed: result
217 .get("pool")
218 .and_then(|v| v.get("completed"))
219 .and_then(|v| v.as_u64()),
220 pool_queued: result
221 .get("pool")
222 .and_then(|v| v.get("queued"))
223 .and_then(|v| v.as_u64()),
224 success_rate: result
225 .get("execution")
226 .and_then(|v| v.get("successRate"))
227 .and_then(|v| v.as_f64()),
228 }
229 }
230
231 pub fn new() -> Self {
233 Self::init(format!("/tmp/relayer-plugin-pool-{}.sock", Uuid::new_v4()))
234 }
235
236 pub fn with_socket_path(socket_path: String) -> Self {
238 Self::init(socket_path)
239 }
240
241 fn init(socket_path: String) -> Self {
243 let config = get_config();
244 let max_connections = config.pool_max_connections;
245 let max_queue_size = config.pool_max_queue_size;
246
247 let (tx, rx) = async_channel::bounded(max_queue_size);
248
249 let connection_pool = Arc::new(ConnectionPool::new(socket_path.clone(), max_connections));
250 let connection_pool_clone = connection_pool.clone();
251
252 let shutdown_signal = Arc::new(tokio::sync::Notify::new());
253
254 Self::spawn_queue_workers(
255 rx,
256 connection_pool_clone,
257 config.pool_workers,
258 shutdown_signal.clone(),
259 );
260
261 let health_check_needed = Arc::new(AtomicBool::new(false));
262 let consecutive_failures = Arc::new(AtomicU32::new(0));
263 let circuit_breaker = Arc::new(CircuitBreaker::new());
264 let last_restart_time_ms = Arc::new(AtomicU64::new(0));
265 let recovery_mode = Arc::new(AtomicBool::new(false));
266 let recovery_allowance = Arc::new(AtomicU32::new(0));
267
268 Self::spawn_health_check_task(
269 health_check_needed.clone(),
270 config.health_check_interval_secs,
271 shutdown_signal.clone(),
272 );
273
274 Self::spawn_recovery_task(
275 recovery_mode.clone(),
276 recovery_allowance.clone(),
277 shutdown_signal.clone(),
278 );
279
280 Self {
281 connection_pool,
282 socket_path,
283 process: tokio::sync::Mutex::new(None),
284 initialized: Arc::new(AtomicBool::new(false)),
285 restart_lock: tokio::sync::Mutex::new(()),
286 request_tx: tx,
287 max_queue_size,
288 health_check_needed,
289 consecutive_failures,
290 circuit_breaker,
291 last_restart_time_ms,
292 recovery_mode,
293 recovery_allowance,
294 shutdown_signal,
295 }
296 }
297
298 fn spawn_recovery_task(
300 recovery_mode: Arc<AtomicBool>,
301 recovery_allowance: Arc<AtomicU32>,
302 shutdown_signal: Arc<tokio::sync::Notify>,
303 ) {
304 tokio::spawn(async move {
305 let mut interval = tokio::time::interval(Duration::from_millis(500));
306 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
307
308 loop {
309 tokio::select! {
310 biased;
311
312 _ = shutdown_signal.notified() => {
313 tracing::debug!("Recovery task received shutdown signal");
314 break;
315 }
316
317 _ = interval.tick() => {
318 if recovery_mode.load(Ordering::Relaxed) {
319 let current = recovery_allowance.load(Ordering::Relaxed);
320 if current < 100 {
321 let new_allowance = (current + 10).min(100);
322 recovery_allowance.store(new_allowance, Ordering::Relaxed);
323 tracing::debug!(
324 allowance = new_allowance,
325 "Recovery mode: increasing request allowance"
326 );
327 } else {
328 recovery_mode.store(false, Ordering::Relaxed);
329 tracing::info!("Recovery mode complete - full capacity restored");
330 }
331 }
332 }
333 }
334 }
335 });
336 }
337
338 fn spawn_health_check_task(
340 health_check_needed: Arc<AtomicBool>,
341 interval_secs: u64,
342 shutdown_signal: Arc<tokio::sync::Notify>,
343 ) {
344 tokio::spawn(async move {
345 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
346 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
347
348 loop {
349 tokio::select! {
350 biased;
351
352 _ = shutdown_signal.notified() => {
353 tracing::debug!("Health check task received shutdown signal");
354 break;
355 }
356
357 _ = interval.tick() => {
358 health_check_needed.store(true, Ordering::Relaxed);
359 }
360 }
361 }
362 });
363 }
364
365 fn spawn_queue_workers(
367 rx: async_channel::Receiver<QueuedRequest>,
368 connection_pool: Arc<ConnectionPool>,
369 configured_workers: usize,
370 shutdown_signal: Arc<tokio::sync::Notify>,
371 ) {
372 let num_workers = if configured_workers > 0 {
373 configured_workers
374 } else {
375 std::thread::available_parallelism()
376 .map(|n| n.get().clamp(4, 32))
377 .unwrap_or(8)
378 };
379
380 tracing::info!(num_workers = num_workers, "Starting request queue workers");
381
382 for worker_id in 0..num_workers {
383 let rx_clone = rx.clone();
384 let pool_clone = connection_pool.clone();
385 let shutdown = shutdown_signal.clone();
386
387 tokio::spawn(async move {
388 loop {
389 tokio::select! {
390 biased;
391
392 _ = shutdown.notified() => {
393 tracing::debug!(worker_id = worker_id, "Request queue worker received shutdown signal");
394 break;
395 }
396
397 request_result = rx_clone.recv() => {
398 let request = match request_result {
399 Ok(r) => r,
400 Err(_) => break,
401 };
402
403 let start = std::time::Instant::now();
404 let plugin_id = request.plugin_id.clone();
405
406 let result = Self::execute_plugin_internal(
407 &pool_clone,
408 request.plugin_id,
409 request.compiled_code,
410 request.plugin_path,
411 request.params,
412 request.headers,
413 request.socket_path,
414 request.http_request_id,
415 request.timeout_secs,
416 request.route,
417 request.config,
418 request.method,
419 request.query,
420 )
421 .await;
422
423 let elapsed = start.elapsed();
424 if let Err(ref e) = result {
425 let error_str = format!("{e:?}");
426 if error_str.contains("shutdown") || error_str.contains("Shutdown") {
427 tracing::debug!(
428 worker_id = worker_id,
429 plugin_id = %plugin_id,
430 "Plugin execution cancelled during shutdown"
431 );
432 } else {
433 tracing::warn!(
434 worker_id = worker_id,
435 plugin_id = %plugin_id,
436 elapsed_ms = elapsed.as_millis() as u64,
437 error = ?e,
438 "Plugin execution failed"
439 );
440 }
441 } else if elapsed.as_secs() > 1 {
442 tracing::debug!(
443 worker_id = worker_id,
444 plugin_id = %plugin_id,
445 elapsed_ms = elapsed.as_millis() as u64,
446 "Slow plugin execution"
447 );
448 }
449
450 let _ = request.response_tx.send(result);
451 }
452 }
453 }
454
455 tracing::debug!(worker_id = worker_id, "Request queue worker exited");
456 });
457 }
458 }
459
460 fn spawn_rate_limited_stderr_reader(stderr: tokio::process::ChildStderr) {
462 tokio::spawn(async move {
463 let reader = BufReader::new(stderr);
464 let mut lines = reader.lines();
465
466 let mut last_log_time = std::time::Instant::now();
467 let mut suppressed_count = 0u64;
468 let min_interval = Duration::from_millis(100);
469
470 while let Ok(Some(line)) = lines.next_line().await {
471 let now = std::time::Instant::now();
472 let elapsed = now.duration_since(last_log_time);
473
474 if elapsed >= min_interval {
475 if suppressed_count > 0 {
476 tracing::warn!(
477 target: "pool_server",
478 suppressed = suppressed_count,
479 "... ({} lines suppressed due to rate limiting)",
480 suppressed_count
481 );
482 suppressed_count = 0;
483 }
484 tracing::error!(target: "pool_server", "{}", line);
485 last_log_time = now;
486 } else {
487 suppressed_count += 1;
488 if suppressed_count % 100 == 0 {
489 tracing::warn!(
490 target: "pool_server",
491 suppressed = suppressed_count,
492 "Pool server producing excessive stderr output"
493 );
494 }
495 }
496 }
497
498 if suppressed_count > 0 {
499 tracing::warn!(
500 target: "pool_server",
501 suppressed = suppressed_count,
502 "Pool server stderr closed ({} final lines suppressed)",
503 suppressed_count
504 );
505 }
506 });
507 }
508
509 #[allow(clippy::too_many_arguments)]
511 async fn execute_with_permit(
512 connection_pool: &Arc<ConnectionPool>,
513 permit: Option<tokio::sync::OwnedSemaphorePermit>,
514 plugin_id: String,
515 compiled_code: Option<String>,
516 plugin_path: Option<String>,
517 params: serde_json::Value,
518 headers: Option<HashMap<String, Vec<String>>>,
519 socket_path: String,
520 http_request_id: Option<String>,
521 timeout_secs: Option<u64>,
522 route: Option<String>,
523 config: Option<serde_json::Value>,
524 method: Option<String>,
525 query: Option<serde_json::Value>,
526 ) -> Result<ScriptResult, PluginError> {
527 let mut conn = connection_pool.acquire_with_permit(permit).await?;
528
529 let request = PoolRequest::Execute(Box::new(super::protocol::ExecuteRequest {
530 task_id: Uuid::new_v4().to_string(),
531 plugin_id: plugin_id.clone(),
532 compiled_code,
533 plugin_path,
534 params,
535 headers,
536 socket_path,
537 http_request_id,
538 timeout: timeout_secs.map(|s| s * 1000),
539 route,
540 config,
541 method,
542 query,
543 }));
544
545 let configured_timeout = timeout_secs.unwrap_or(DEFAULT_PLUGIN_TIMEOUT_SECONDS);
548 let backstop_timeout = configured_timeout + PLUGIN_TIMEOUT_BUFFER_SECONDS;
549 let response = conn
550 .send_request_with_timeout(&request, backstop_timeout)
551 .await?;
552
553 Self::parse_pool_response(response)
555 }
556
557 #[allow(clippy::too_many_arguments)]
559 async fn execute_plugin_internal(
560 connection_pool: &Arc<ConnectionPool>,
561 plugin_id: String,
562 compiled_code: Option<String>,
563 plugin_path: Option<String>,
564 params: serde_json::Value,
565 headers: Option<HashMap<String, Vec<String>>>,
566 socket_path: String,
567 http_request_id: Option<String>,
568 timeout_secs: Option<u64>,
569 route: Option<String>,
570 config: Option<serde_json::Value>,
571 method: Option<String>,
572 query: Option<serde_json::Value>,
573 ) -> Result<ScriptResult, PluginError> {
574 Self::execute_with_permit(
575 connection_pool,
576 None,
577 plugin_id,
578 compiled_code,
579 plugin_path,
580 params,
581 headers,
582 socket_path,
583 http_request_id,
584 timeout_secs,
585 route,
586 config,
587 method,
588 query,
589 )
590 .await
591 }
592
593 pub async fn is_initialized(&self) -> bool {
598 self.initialized.load(Ordering::Acquire)
599 }
600
601 pub async fn ensure_started(&self) -> Result<(), PluginError> {
603 if self.initialized.load(Ordering::Acquire) {
604 return Ok(());
605 }
606
607 let _startup_guard = self.restart_lock.lock().await;
608
609 if self.initialized.load(Ordering::Acquire) {
610 return Ok(());
611 }
612
613 self.start_pool_server().await?;
614 self.initialized.store(true, Ordering::Release);
615 Ok(())
616 }
617
618 async fn ensure_started_and_healthy(&self) -> Result<(), PluginError> {
620 self.ensure_started().await?;
621
622 if !self.health_check_needed.load(Ordering::Relaxed) {
623 return Ok(());
624 }
625
626 if self
627 .health_check_needed
628 .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
629 .is_err()
630 {
631 return Ok(());
632 }
633
634 self.check_and_restart_if_needed().await
635 }
636
637 async fn check_and_restart_if_needed(&self) -> Result<(), PluginError> {
639 let process_status = {
641 let mut process_guard = self.process.lock().await;
642 if let Some(child) = process_guard.as_mut() {
643 match child.try_wait() {
644 Ok(Some(exit_status)) => {
645 tracing::warn!(
646 exit_status = ?exit_status,
647 "Pool server process has exited"
648 );
649 *process_guard = None;
650 ProcessStatus::Exited
651 }
652 Ok(None) => ProcessStatus::Running,
653 Err(e) => {
654 tracing::warn!(
655 error = %e,
656 "Failed to check pool server process status, assuming dead"
657 );
658 *process_guard = None;
659 ProcessStatus::Unknown
660 }
661 }
662 } else {
663 ProcessStatus::NoProcess
664 }
665 };
666
667 let needs_restart = match process_status {
669 ProcessStatus::Running => {
670 let socket_exists = std::path::Path::new(&self.socket_path).exists();
671 if !socket_exists {
672 tracing::warn!(
673 socket_path = %self.socket_path,
674 "Pool server socket file missing, needs restart"
675 );
676 true
677 } else {
678 false
679 }
680 }
681 ProcessStatus::Exited | ProcessStatus::Unknown | ProcessStatus::NoProcess => {
682 tracing::warn!("Pool server not running, needs restart");
683 true
684 }
685 };
686
687 if needs_restart {
689 let _restart_guard = self.restart_lock.lock().await;
690 self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
691 self.restart_internal().await?;
692 self.consecutive_failures.store(0, Ordering::Relaxed);
693 }
694
695 Ok(())
696 }
697
698 async fn cleanup_socket_file(socket_path: &str) {
700 let max_cleanup_attempts = 5;
701 let mut attempts = 0;
702
703 while attempts < max_cleanup_attempts {
704 match std::fs::remove_file(socket_path) {
705 Ok(_) => break,
706 Err(e) if e.kind() == std::io::ErrorKind::NotFound => break,
707 Err(e) => {
708 attempts += 1;
709 if attempts >= max_cleanup_attempts {
710 tracing::warn!(
711 socket_path = %socket_path,
712 error = %e,
713 "Failed to remove socket file after {} attempts, proceeding anyway",
714 max_cleanup_attempts
715 );
716 break;
717 }
718 let delay_ms = 10 * (1 << attempts.min(3));
719 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
720 }
721 }
722 }
723
724 tokio::time::sleep(Duration::from_millis(50)).await;
725 }
726
727 async fn spawn_pool_server_process(
729 socket_path: &str,
730 context: &str,
731 ) -> Result<Child, PluginError> {
732 let pool_server_path = std::env::current_dir()
733 .map(|cwd| cwd.join("plugins/lib/pool-server.ts").display().to_string())
734 .unwrap_or_else(|_| "plugins/lib/pool-server.ts".to_string());
735
736 let config = get_config();
737
738 let pool_server_heap_mb = Self::calculate_heap_size(config.max_concurrency);
740
741 let uncapped_heap = Self::BASE_HEAP_MB
743 + ((config.max_concurrency / Self::CONCURRENCY_DIVISOR)
744 * Self::HEAP_INCREMENT_PER_DIVISOR_MB);
745 if uncapped_heap > Self::MAX_HEAP_MB {
746 tracing::warn!(
747 calculated_heap_mb = uncapped_heap,
748 capped_heap_mb = pool_server_heap_mb,
749 max_concurrency = config.max_concurrency,
750 "Pool server heap calculation exceeded 8GB cap"
751 );
752 }
753
754 tracing::info!(
755 socket_path = %socket_path,
756 heap_mb = pool_server_heap_mb,
757 max_concurrency = config.max_concurrency,
758 context = context,
759 "Spawning plugin pool server"
760 );
761
762 let node_options = format!("--max-old-space-size={pool_server_heap_mb} --expose-gc");
763
764 let mut child = Command::new("ts-node")
765 .arg("--transpile-only")
766 .arg(&pool_server_path)
767 .arg(socket_path)
768 .env("NODE_OPTIONS", node_options)
769 .env("PLUGIN_MAX_CONCURRENCY", config.max_concurrency.to_string())
770 .env(
771 "PLUGIN_POOL_MIN_THREADS",
772 config.nodejs_pool_min_threads.to_string(),
773 )
774 .env(
775 "PLUGIN_POOL_MAX_THREADS",
776 config.nodejs_pool_max_threads.to_string(),
777 )
778 .env(
779 "PLUGIN_POOL_CONCURRENT_TASKS",
780 config.nodejs_pool_concurrent_tasks.to_string(),
781 )
782 .env(
783 "PLUGIN_POOL_IDLE_TIMEOUT",
784 config.nodejs_pool_idle_timeout_ms.to_string(),
785 )
786 .env(
787 "PLUGIN_WORKER_HEAP_MB",
788 config.nodejs_worker_heap_mb.to_string(),
789 )
790 .env(
791 "PLUGIN_POOL_SOCKET_BACKLOG",
792 config.pool_socket_backlog.to_string(),
793 )
794 .stdin(Stdio::null())
795 .stdout(Stdio::piped())
796 .stderr(Stdio::piped())
797 .spawn()
798 .map_err(|e| {
799 PluginError::PluginExecutionError(format!("Failed to {context} pool server: {e}"))
800 })?;
801
802 if let Some(stderr) = child.stderr.take() {
803 Self::spawn_rate_limited_stderr_reader(stderr);
804 }
805
806 if let Some(stdout) = child.stdout.take() {
807 let reader = BufReader::new(stdout);
808 let mut lines = reader.lines();
809
810 let timeout_result = tokio::time::timeout(Duration::from_secs(10), async {
811 while let Ok(Some(line)) = lines.next_line().await {
812 if line.contains("POOL_SERVER_READY") {
813 return Ok(());
814 }
815 }
816 Err(PluginError::PluginExecutionError(
817 "Pool server did not send ready signal".to_string(),
818 ))
819 })
820 .await;
821
822 match timeout_result {
823 Ok(Ok(())) => {
824 tracing::info!(context = context, "Plugin pool server ready");
825 }
826 Ok(Err(e)) => return Err(e),
827 Err(_) => {
828 return Err(PluginError::PluginExecutionError(format!(
829 "Timeout waiting for pool server to {context}"
830 )))
831 }
832 }
833 }
834
835 Ok(child)
836 }
837
838 async fn start_pool_server(&self) -> Result<(), PluginError> {
839 let mut process_guard = self.process.lock().await;
840
841 if process_guard.is_some() {
842 return Ok(());
843 }
844
845 Self::cleanup_socket_file(&self.socket_path).await;
846
847 let child = Self::spawn_pool_server_process(&self.socket_path, "start").await?;
848
849 *process_guard = Some(child);
850 Ok(())
851 }
852
853 #[allow(clippy::too_many_arguments)]
855 pub async fn execute_plugin(
856 &self,
857 plugin_id: String,
858 compiled_code: Option<String>,
859 plugin_path: Option<String>,
860 params: serde_json::Value,
861 headers: Option<HashMap<String, Vec<String>>>,
862 socket_path: String,
863 http_request_id: Option<String>,
864 timeout_secs: Option<u64>,
865 route: Option<String>,
866 config: Option<serde_json::Value>,
867 method: Option<String>,
868 query: Option<serde_json::Value>,
869 ) -> Result<ScriptResult, PluginError> {
870 let rid = http_request_id.as_deref().unwrap_or("unknown");
871 let effective_timeout = timeout_secs.unwrap_or(DEFAULT_PLUGIN_TIMEOUT_SECONDS);
872 tracing::debug!(
873 plugin_id = %plugin_id,
874 http_request_id = %rid,
875 timeout_secs = effective_timeout,
876 "Pool execute request received"
877 );
878 let recovery_allowance = if self.recovery_mode.load(Ordering::Relaxed) {
879 Some(self.recovery_allowance.load(Ordering::Relaxed))
880 } else {
881 None
882 };
883
884 if !self
885 .circuit_breaker
886 .should_allow_request(recovery_allowance)
887 {
888 let state = self.circuit_breaker.state();
889 tracing::warn!(
890 plugin_id = %plugin_id,
891 circuit_state = ?state,
892 recovery_allowance = ?recovery_allowance,
893 "Request rejected by circuit breaker"
894 );
895 return Err(PluginError::PluginExecutionError(
896 "Plugin system temporarily unavailable due to high load. Please retry shortly."
897 .to_string(),
898 ));
899 }
900
901 let start_time = Instant::now();
902
903 self.ensure_started_and_healthy().await?;
904 tracing::debug!(
905 plugin_id = %plugin_id,
906 http_request_id = %rid,
907 "Pool execute start (healthy/started)"
908 );
909
910 let circuit_breaker = self.circuit_breaker.clone();
911 match self.connection_pool.semaphore.clone().try_acquire_owned() {
912 Ok(permit) => {
913 tracing::debug!(
914 plugin_id = %plugin_id,
915 http_request_id = %rid,
916 "Pool execute acquired connection permit (fast path)"
917 );
918 let result = Self::execute_with_permit(
919 &self.connection_pool,
920 Some(permit),
921 plugin_id,
922 compiled_code,
923 plugin_path,
924 params,
925 headers,
926 socket_path,
927 http_request_id,
928 timeout_secs,
929 route,
930 config,
931 method,
932 query,
933 )
934 .await;
935
936 let elapsed_ms = start_time.elapsed().as_millis() as u32;
937 match &result {
938 Ok(_) => circuit_breaker.record_success(elapsed_ms),
939 Err(e) => {
940 if Self::is_dead_server_error(e) {
943 circuit_breaker.record_failure();
944 tracing::warn!(
945 error = %e,
946 "Detected dead pool server error, triggering health check for restart"
947 );
948 self.health_check_needed.store(true, Ordering::Relaxed);
949 } else {
950 circuit_breaker.record_success(elapsed_ms);
952 }
953 }
954 }
955
956 tracing::debug!(
957 elapsed_ms = elapsed_ms,
958 result_ok = result.is_ok(),
959 "Pool execute finished (fast path)"
960 );
961 result
962 }
963 Err(_) => {
964 tracing::debug!(
965 plugin_id = %plugin_id,
966 http_request_id = %rid,
967 "Pool execute queueing (no permits)"
968 );
969 let (response_tx, response_rx) = oneshot::channel();
970
971 let queued_request = QueuedRequest {
972 plugin_id,
973 compiled_code,
974 plugin_path,
975 params,
976 headers,
977 socket_path,
978 http_request_id,
979 timeout_secs,
980 route,
981 config,
982 method,
983 query,
984 response_tx,
985 };
986
987 let result = match self.request_tx.try_send(queued_request) {
988 Ok(()) => {
989 let queue_len = self.request_tx.len();
990 if queue_len > self.max_queue_size / 2 {
991 tracing::warn!(
992 queue_len = queue_len,
993 max_queue_size = self.max_queue_size,
994 "Plugin queue is over 50% capacity"
995 );
996 }
997 let response_timeout = timeout_secs
1001 .map(Duration::from_secs)
1002 .unwrap_or(Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS))
1003 + Duration::from_secs(PLUGIN_TIMEOUT_BUFFER_SECONDS + 1);
1004
1005 match tokio::time::timeout(response_timeout, response_rx).await {
1006 Ok(Ok(result)) => result,
1007 Ok(Err(_)) => Err(PluginError::PluginExecutionError(
1008 "Request queue processor closed".to_string(),
1009 )),
1010 Err(_) => Err(PluginError::PluginExecutionError(format!(
1011 "Request timed out after {}s waiting for worker response",
1012 response_timeout.as_secs()
1013 ))),
1014 }
1015 }
1016 Err(async_channel::TrySendError::Full(req)) => {
1017 let queue_timeout_ms = get_config().pool_queue_send_timeout_ms;
1018 let queue_timeout = Duration::from_millis(queue_timeout_ms);
1019 match tokio::time::timeout(queue_timeout, self.request_tx.send(req)).await {
1020 Ok(Ok(())) => {
1021 let queue_len = self.request_tx.len();
1022 tracing::debug!(
1023 queue_len = queue_len,
1024 "Request queued after waiting for queue space"
1025 );
1026 let response_timeout = timeout_secs
1028 .map(Duration::from_secs)
1029 .unwrap_or(Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS))
1030 + Duration::from_secs(PLUGIN_TIMEOUT_BUFFER_SECONDS + 1);
1031
1032 match tokio::time::timeout(response_timeout, response_rx).await {
1033 Ok(Ok(result)) => result,
1034 Ok(Err(_)) => Err(PluginError::PluginExecutionError(
1035 "Request queue processor closed".to_string(),
1036 )),
1037 Err(_) => Err(PluginError::PluginExecutionError(format!(
1038 "Request timed out after {}s waiting for worker response",
1039 response_timeout.as_secs()
1040 ))),
1041 }
1042 }
1043 Ok(Err(async_channel::SendError(_))) => {
1044 Err(PluginError::PluginExecutionError(
1045 "Plugin execution queue is closed".to_string(),
1046 ))
1047 }
1048 Err(_) => {
1049 let queue_len = self.request_tx.len();
1050 tracing::error!(
1051 queue_len = queue_len,
1052 max_queue_size = self.max_queue_size,
1053 timeout_ms = queue_timeout.as_millis(),
1054 "Plugin execution queue is FULL - timeout waiting for space"
1055 );
1056 Err(PluginError::PluginExecutionError(format!(
1057 "Plugin execution queue is full (max: {}) and timeout waiting for space. \
1058 Consider increasing PLUGIN_POOL_MAX_QUEUE_SIZE or PLUGIN_POOL_MAX_CONNECTIONS.",
1059 self.max_queue_size
1060 )))
1061 }
1062 }
1063 }
1064 Err(async_channel::TrySendError::Closed(_)) => {
1065 Err(PluginError::PluginExecutionError(
1066 "Plugin execution queue is closed".to_string(),
1067 ))
1068 }
1069 };
1070
1071 let elapsed_ms = start_time.elapsed().as_millis() as u32;
1072 match &result {
1073 Ok(_) => circuit_breaker.record_success(elapsed_ms),
1074 Err(e) => {
1075 if Self::is_dead_server_error(e) {
1077 circuit_breaker.record_failure();
1078 tracing::warn!(
1079 error = %e,
1080 "Detected dead pool server error (queued path), triggering health check for restart"
1081 );
1082 self.health_check_needed.store(true, Ordering::Relaxed);
1083 } else {
1084 circuit_breaker.record_success(elapsed_ms);
1086 }
1087 }
1088 }
1089
1090 tracing::debug!(
1091 elapsed_ms = elapsed_ms,
1092 result_ok = result.is_ok(),
1093 "Pool execute finished (queued path)"
1094 );
1095 result
1096 }
1097 }
1098 }
1099
1100 pub fn is_dead_server_error(err: &PluginError) -> bool {
1104 match err {
1105 PluginError::ScriptTimeout(_) => false,
1107 PluginError::HandlerError(_) => false,
1109 PluginError::SocketError(msg)
1112 if msg.to_lowercase().contains("request timed out after") =>
1113 {
1114 true
1115 }
1116 other => {
1118 let error_str = other.to_string();
1119 let lower = error_str.to_lowercase();
1120
1121 if lower.contains("handler timed out") {
1123 return false;
1124 }
1125
1126 DeadServerIndicator::from_error_str(&error_str).is_some()
1127 }
1128 }
1129 }
1130
1131 pub async fn precompile_plugin(
1133 &self,
1134 plugin_id: String,
1135 plugin_path: Option<String>,
1136 source_code: Option<String>,
1137 ) -> Result<String, PluginError> {
1138 self.ensure_started().await?;
1139
1140 let mut conn = self.connection_pool.acquire().await?;
1141
1142 let request = PoolRequest::Precompile {
1143 task_id: Uuid::new_v4().to_string(),
1144 plugin_id: plugin_id.clone(),
1145 plugin_path,
1146 source_code,
1147 };
1148
1149 let response = conn
1150 .send_request_with_timeout(&request, ADMIN_REQUEST_TIMEOUT_SECS)
1151 .await?;
1152
1153 if response.success {
1154 response
1155 .result
1156 .and_then(|v| {
1157 v.get("code")
1158 .and_then(|c| c.as_str())
1159 .map(|s| s.to_string())
1160 })
1161 .ok_or_else(|| {
1162 PluginError::PluginExecutionError("No compiled code in response".to_string())
1163 })
1164 } else {
1165 let error = response.error.unwrap_or(PoolError {
1166 message: "Compilation failed".to_string(),
1167 code: None,
1168 status: None,
1169 details: None,
1170 });
1171 Err(PluginError::PluginExecutionError(error.message))
1172 }
1173 }
1174
1175 pub async fn cache_compiled_code(
1177 &self,
1178 plugin_id: String,
1179 compiled_code: String,
1180 ) -> Result<(), PluginError> {
1181 self.ensure_started().await?;
1182
1183 let mut conn = self.connection_pool.acquire().await?;
1184
1185 let request = PoolRequest::Cache {
1186 task_id: Uuid::new_v4().to_string(),
1187 plugin_id: plugin_id.clone(),
1188 compiled_code,
1189 };
1190
1191 let response = conn
1192 .send_request_with_timeout(&request, ADMIN_REQUEST_TIMEOUT_SECS)
1193 .await?;
1194
1195 if response.success {
1196 Ok(())
1197 } else {
1198 let error = response.error.unwrap_or(PoolError {
1199 message: "Cache failed".to_string(),
1200 code: None,
1201 status: None,
1202 details: None,
1203 });
1204 Err(PluginError::PluginError(error.message))
1205 }
1206 }
1207
1208 pub async fn invalidate_plugin(&self, plugin_id: String) -> Result<(), PluginError> {
1210 if !self.initialized.load(Ordering::Acquire) {
1211 return Ok(());
1212 }
1213
1214 let mut conn = self.connection_pool.acquire().await?;
1215
1216 let request = PoolRequest::Invalidate {
1217 task_id: Uuid::new_v4().to_string(),
1218 plugin_id,
1219 };
1220
1221 let _ = conn
1222 .send_request_with_timeout(&request, ADMIN_REQUEST_TIMEOUT_SECS)
1223 .await?;
1224 Ok(())
1225 }
1226
1227 async fn collect_socket_stats(
1230 &self,
1231 ) -> (
1232 Option<usize>,
1233 Option<usize>,
1234 Option<usize>,
1235 Option<usize>,
1236 Option<usize>,
1237 ) {
1238 let (shared_available, shared_active, shared_executions) = match get_shared_socket_service()
1240 {
1241 Ok(service) => {
1242 let available = service.available_connection_slots();
1243 let active = service.active_connection_count();
1244 let executions = service.registered_executions_count().await;
1245 (Some(available), Some(active), Some(executions))
1246 }
1247 Err(_) => (None, None, None),
1248 };
1249
1250 let pool_available = self.connection_pool.semaphore.available_permits();
1252 let pool_max = get_config().pool_max_connections;
1253 let pool_active = pool_max.saturating_sub(pool_available);
1254
1255 (
1256 shared_available,
1257 shared_active,
1258 shared_executions,
1259 Some(pool_available),
1260 Some(pool_active),
1261 )
1262 }
1263
1264 pub async fn health_check(&self) -> Result<HealthStatus, PluginError> {
1265 let circuit_info = || {
1266 let state = match self.circuit_breaker.state() {
1267 CircuitState::Closed => "closed",
1268 CircuitState::HalfOpen => "half_open",
1269 CircuitState::Open => "open",
1270 };
1271 (
1272 Some(state.to_string()),
1273 Some(self.circuit_breaker.avg_response_time()),
1274 Some(self.recovery_mode.load(Ordering::Relaxed)),
1275 Some(self.recovery_allowance.load(Ordering::Relaxed)),
1276 )
1277 };
1278
1279 let socket_stats = self.collect_socket_stats().await;
1280
1281 if !self.initialized.load(Ordering::Acquire) {
1282 let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1283 let (shared_available, shared_active, shared_executions, pool_available, pool_active) =
1284 socket_stats;
1285 return Ok(HealthStatus {
1286 healthy: false,
1287 status: "not_initialized".to_string(),
1288 uptime_ms: None,
1289 memory: None,
1290 pool_completed: None,
1291 pool_queued: None,
1292 success_rate: None,
1293 circuit_state,
1294 avg_response_time_ms: avg_rt,
1295 recovering,
1296 recovery_percent: recovery_pct,
1297 shared_socket_available_slots: shared_available,
1298 shared_socket_active_connections: shared_active,
1299 shared_socket_registered_executions: shared_executions,
1300 connection_pool_available_slots: pool_available,
1301 connection_pool_active_connections: pool_active,
1302 });
1303 }
1304
1305 if !std::path::Path::new(&self.socket_path).exists() {
1306 let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1307 let (shared_available, shared_active, shared_executions, pool_available, pool_active) =
1308 socket_stats;
1309 return Ok(HealthStatus {
1310 healthy: false,
1311 status: "socket_missing".to_string(),
1312 uptime_ms: None,
1313 memory: None,
1314 pool_completed: None,
1315 pool_queued: None,
1316 success_rate: None,
1317 circuit_state,
1318 avg_response_time_ms: avg_rt,
1319 recovering,
1320 recovery_percent: recovery_pct,
1321 shared_socket_available_slots: shared_available,
1322 shared_socket_active_connections: shared_active,
1323 shared_socket_registered_executions: shared_executions,
1324 connection_pool_available_slots: pool_available,
1325 connection_pool_active_connections: pool_active,
1326 });
1327 }
1328
1329 let mut conn =
1330 match tokio::time::timeout(Duration::from_millis(100), self.connection_pool.acquire())
1331 .await
1332 {
1333 Ok(Ok(c)) => c,
1334 Ok(Err(e)) => {
1335 let err_str = e.to_string();
1336 let is_pool_exhausted =
1337 err_str.contains("semaphore") || err_str.contains("Connection refused");
1338
1339 let process_status = match self.process.try_lock() {
1341 Ok(guard) => {
1342 if let Some(child) = guard.as_ref() {
1343 format!("process_pid_{}", child.id().unwrap_or(0))
1344 } else {
1345 "no_process".to_string()
1346 }
1347 }
1348 Err(_) => "process_lock_busy".to_string(),
1349 };
1350
1351 let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1352 let (
1353 shared_available,
1354 shared_active,
1355 shared_executions,
1356 pool_available,
1357 pool_active,
1358 ) = socket_stats;
1359 return Ok(HealthStatus {
1360 healthy: is_pool_exhausted,
1361 status: if is_pool_exhausted {
1362 format!("pool_exhausted: {e} ({process_status})")
1363 } else {
1364 format!("connection_failed: {e} ({process_status})")
1365 },
1366 uptime_ms: None,
1367 memory: None,
1368 pool_completed: None,
1369 pool_queued: None,
1370 success_rate: None,
1371 circuit_state,
1372 avg_response_time_ms: avg_rt,
1373 recovering,
1374 recovery_percent: recovery_pct,
1375 shared_socket_available_slots: shared_available,
1376 shared_socket_active_connections: shared_active,
1377 shared_socket_registered_executions: shared_executions,
1378 connection_pool_available_slots: pool_available,
1379 connection_pool_active_connections: pool_active,
1380 });
1381 }
1382 Err(_) => {
1383 let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1384 let (
1385 shared_available,
1386 shared_active,
1387 shared_executions,
1388 pool_available,
1389 pool_active,
1390 ) = socket_stats;
1391 return Ok(HealthStatus {
1392 healthy: true,
1393 status: "pool_busy".to_string(),
1394 uptime_ms: None,
1395 memory: None,
1396 pool_completed: None,
1397 pool_queued: None,
1398 success_rate: None,
1399 circuit_state,
1400 avg_response_time_ms: avg_rt,
1401 recovering,
1402 recovery_percent: recovery_pct,
1403 shared_socket_available_slots: shared_available,
1404 shared_socket_active_connections: shared_active,
1405 shared_socket_registered_executions: shared_executions,
1406 connection_pool_available_slots: pool_available,
1407 connection_pool_active_connections: pool_active,
1408 });
1409 }
1410 };
1411
1412 let request = PoolRequest::Health {
1413 task_id: Uuid::new_v4().to_string(),
1414 };
1415
1416 let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1417
1418 match conn.send_request_with_timeout(&request, 5).await {
1419 Ok(response) => {
1420 if response.success {
1421 let result = response.result.unwrap_or_default();
1422 let parsed = Self::parse_health_result(&result);
1424
1425 {
1426 let (
1427 shared_available,
1428 shared_active,
1429 shared_executions,
1430 pool_available,
1431 pool_active,
1432 ) = socket_stats;
1433 Ok(HealthStatus {
1434 healthy: true,
1435 status: parsed.status,
1436 uptime_ms: parsed.uptime_ms,
1437 memory: parsed.memory,
1438 pool_completed: parsed.pool_completed,
1439 pool_queued: parsed.pool_queued,
1440 success_rate: parsed.success_rate,
1441 circuit_state,
1442 avg_response_time_ms: avg_rt,
1443 recovering,
1444 recovery_percent: recovery_pct,
1445 shared_socket_available_slots: shared_available,
1446 shared_socket_active_connections: shared_active,
1447 shared_socket_registered_executions: shared_executions,
1448 connection_pool_available_slots: pool_available,
1449 connection_pool_active_connections: pool_active,
1450 })
1451 }
1452 } else {
1453 let (
1454 shared_available,
1455 shared_active,
1456 shared_executions,
1457 pool_available,
1458 pool_active,
1459 ) = socket_stats;
1460 Ok(HealthStatus {
1461 healthy: false,
1462 status: response
1463 .error
1464 .map(|e| e.message)
1465 .unwrap_or_else(|| "unknown_error".to_string()),
1466 uptime_ms: None,
1467 memory: None,
1468 pool_completed: None,
1469 pool_queued: None,
1470 success_rate: None,
1471 circuit_state,
1472 avg_response_time_ms: avg_rt,
1473 recovering,
1474 recovery_percent: recovery_pct,
1475 shared_socket_available_slots: shared_available,
1476 shared_socket_active_connections: shared_active,
1477 shared_socket_registered_executions: shared_executions,
1478 connection_pool_available_slots: pool_available,
1479 connection_pool_active_connections: pool_active,
1480 })
1481 }
1482 }
1483 Err(e) => {
1484 let (
1485 shared_available,
1486 shared_active,
1487 shared_executions,
1488 pool_available,
1489 pool_active,
1490 ) = socket_stats;
1491 Ok(HealthStatus {
1492 healthy: false,
1493 status: format!("request_failed: {e}"),
1494 uptime_ms: None,
1495 memory: None,
1496 pool_completed: None,
1497 pool_queued: None,
1498 success_rate: None,
1499 circuit_state,
1500 avg_response_time_ms: avg_rt,
1501 recovering,
1502 recovery_percent: recovery_pct,
1503 shared_socket_available_slots: shared_available,
1504 shared_socket_active_connections: shared_active,
1505 shared_socket_registered_executions: shared_executions,
1506 connection_pool_available_slots: pool_available,
1507 connection_pool_active_connections: pool_active,
1508 })
1509 }
1510 }
1511 }
1512
1513 pub async fn ensure_healthy(&self) -> Result<bool, PluginError> {
1515 let health = self.health_check().await?;
1516
1517 if health.healthy {
1518 return Ok(true);
1519 }
1520
1521 match self.restart_lock.try_lock() {
1522 Ok(_guard) => {
1523 let health_recheck = self.health_check().await?;
1524 if health_recheck.healthy {
1525 return Ok(true);
1526 }
1527
1528 tracing::warn!(status = %health.status, "Pool server unhealthy, attempting restart");
1529 self.restart_internal().await?;
1530 }
1531 Err(_) => {
1532 tracing::debug!("Waiting for another task to complete pool server restart");
1533 let _guard = self.restart_lock.lock().await;
1534 }
1535 }
1536
1537 let health_after = self.health_check().await?;
1538 Ok(health_after.healthy)
1539 }
1540
1541 pub async fn restart(&self) -> Result<(), PluginError> {
1543 let _guard = self.restart_lock.lock().await;
1544 self.restart_internal().await
1545 }
1546
1547 async fn restart_internal(&self) -> Result<(), PluginError> {
1549 tracing::info!("Restarting plugin pool server");
1550
1551 {
1552 let mut process_guard = self.process.lock().await;
1553 if let Some(mut child) = process_guard.take() {
1554 let _ = child.kill().await;
1555 tokio::time::sleep(Duration::from_millis(100)).await;
1556 }
1557 }
1558
1559 Self::cleanup_socket_file(&self.socket_path).await;
1560
1561 self.initialized.store(false, Ordering::Release);
1562
1563 let mut process_guard = self.process.lock().await;
1564 if process_guard.is_some() {
1565 return Ok(());
1566 }
1567
1568 let child = Self::spawn_pool_server_process(&self.socket_path, "restart").await?;
1569 *process_guard = Some(child);
1570
1571 self.initialized.store(true, Ordering::Release);
1572
1573 self.recovery_allowance.store(10, Ordering::Relaxed);
1574 self.recovery_mode.store(true, Ordering::Relaxed);
1575
1576 self.circuit_breaker.force_close();
1577
1578 let now = std::time::SystemTime::now()
1579 .duration_since(std::time::UNIX_EPOCH)
1580 .unwrap_or_default()
1581 .as_millis() as u64;
1582 self.last_restart_time_ms.store(now, Ordering::Relaxed);
1583
1584 tracing::info!("Recovery mode enabled - requests will gradually increase from 10%");
1585
1586 Ok(())
1587 }
1588
1589 pub fn circuit_state(&self) -> CircuitState {
1591 self.circuit_breaker.state()
1592 }
1593
1594 pub fn avg_response_time_ms(&self) -> u32 {
1596 self.circuit_breaker.avg_response_time()
1597 }
1598
1599 pub fn is_recovering(&self) -> bool {
1601 self.recovery_mode.load(Ordering::Relaxed)
1602 }
1603
1604 pub fn recovery_allowance_percent(&self) -> u32 {
1606 self.recovery_allowance.load(Ordering::Relaxed)
1607 }
1608
1609 pub async fn shutdown(&self) -> Result<(), PluginError> {
1611 if !self.initialized.load(Ordering::Acquire) {
1612 return Ok(());
1613 }
1614
1615 tracing::info!("Initiating graceful shutdown of plugin pool server");
1616
1617 self.shutdown_signal.notify_waiters();
1618
1619 let shutdown_timeout = std::time::Duration::from_secs(35);
1620 let shutdown_result = self.send_shutdown_request(shutdown_timeout).await;
1621
1622 match &shutdown_result {
1623 Ok(response) => {
1624 tracing::info!(
1625 response = ?response,
1626 "Pool server acknowledged shutdown, waiting for graceful exit"
1627 );
1628 }
1629 Err(e) => {
1630 tracing::warn!(
1631 error = %e,
1632 "Failed to send shutdown request, will force kill"
1633 );
1634 }
1635 }
1636
1637 let mut process_guard = self.process.lock().await;
1638 if let Some(ref mut child) = *process_guard {
1639 let graceful_wait = std::time::Duration::from_secs(35);
1640 let start = std::time::Instant::now();
1641
1642 loop {
1643 match child.try_wait() {
1644 Ok(Some(status)) => {
1645 tracing::info!(
1646 exit_status = ?status,
1647 elapsed_ms = start.elapsed().as_millis(),
1648 "Pool server exited gracefully"
1649 );
1650 break;
1651 }
1652 Ok(None) => {
1653 if start.elapsed() >= graceful_wait {
1654 tracing::warn!(
1655 "Pool server did not exit within graceful timeout, force killing"
1656 );
1657 let _ = child.kill().await;
1658 break;
1659 }
1660 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1661 }
1662 Err(e) => {
1663 tracing::warn!(error = %e, "Error checking pool server status");
1664 let _ = child.kill().await;
1665 break;
1666 }
1667 }
1668 }
1669 }
1670 *process_guard = None;
1671
1672 let _ = std::fs::remove_file(&self.socket_path);
1673
1674 self.initialized.store(false, Ordering::Release);
1675 tracing::info!("Plugin pool server shutdown complete");
1676 Ok(())
1677 }
1678
1679 async fn send_shutdown_request(
1681 &self,
1682 timeout: std::time::Duration,
1683 ) -> Result<PoolResponse, PluginError> {
1684 let request = PoolRequest::Shutdown {
1685 task_id: Uuid::new_v4().to_string(),
1686 };
1687
1688 let connection_id = self.connection_pool.next_connection_id();
1691 let mut conn = match PoolConnection::new(&self.socket_path, connection_id).await {
1692 Ok(c) => c,
1693 Err(e) => {
1694 return Err(PluginError::PluginExecutionError(format!(
1695 "Failed to connect for shutdown: {e}"
1696 )));
1697 }
1698 };
1699
1700 conn.send_request_with_timeout(&request, timeout.as_secs())
1701 .await
1702 }
1703}
1704
1705impl Drop for PoolManager {
1706 fn drop(&mut self) {
1707 let _ = std::fs::remove_file(&self.socket_path);
1708 }
1709}
1710
1711static POOL_MANAGER: std::sync::OnceLock<Arc<PoolManager>> = std::sync::OnceLock::new();
1713
1714pub fn get_pool_manager() -> Arc<PoolManager> {
1716 POOL_MANAGER
1717 .get_or_init(|| Arc::new(PoolManager::new()))
1718 .clone()
1719}
1720
1721#[cfg(test)]
1722mod tests {
1723 use super::*;
1724 use crate::services::plugins::script_executor::LogLevel;
1725
#[test]
fn test_is_dead_server_error_detects_dead_server() {
    // Both messages must be classified as dead-server errors.
    for message in ["Connection refused", "Broken pipe"] {
        let err = PluginError::PluginExecutionError(message.to_string());
        assert!(PoolManager::is_dead_server_error(&err));
    }
}
1734
#[test]
fn test_is_dead_server_error_excludes_plugin_timeouts() {
    // Plugin/handler timeouts must not be classified as dead-server errors.
    for message in ["Plugin timed out after 30s", "Handler timed out"] {
        let err = PluginError::PluginExecutionError(message.to_string());
        assert!(!PoolManager::is_dead_server_error(&err));
    }
}
1743
#[test]
fn test_is_dead_server_error_normal_errors() {
    // Ordinary plugin failures must not be classified as dead-server errors.
    let ordinary_failures = [
        "TypeError: undefined is not a function",
        "Plugin returned invalid JSON",
    ];
    for message in ordinary_failures {
        let err = PluginError::PluginExecutionError(message.to_string());
        assert!(!PoolManager::is_dead_server_error(&err));
    }
}
1753
#[test]
fn test_is_dead_server_error_detects_all_dead_server_indicators() {
    // Each message below is expected to match a dead-server indicator.
    let dead_server_errors = [
        "EOF while parsing JSON response",
        "Broken pipe when writing to socket",
        "Connection refused: server not running",
        "Connection reset by peer",
        "Socket not connected",
        "Failed to connect to pool server",
        "Socket file missing: /tmp/test.sock",
        "No such file or directory",
    ];

    for error_msg in dead_server_errors {
        let err = PluginError::PluginExecutionError(error_msg.to_string());
        let detected = PoolManager::is_dead_server_error(&err);
        assert!(
            detected,
            "Expected '{error_msg}' to be detected as dead server error"
        );
    }
}
1776
#[test]
fn test_dead_server_indicator_patterns() {
    use super::super::health::DeadServerIndicator;

    // Strings that must map to some indicator.
    let matching = [
        "eof while parsing",
        "broken pipe",
        "connection refused",
        "connection reset",
        "not connected",
        "failed to connect",
        "socket file missing",
        "no such file",
        "connection timed out",
        "connect timed out",
    ];
    for pattern in matching {
        assert!(DeadServerIndicator::from_error_str(pattern).is_some());
    }

    // Strings that must not map to any indicator.
    let non_matching = ["handler timed out", "validation error", "TypeError: undefined"];
    for pattern in non_matching {
        assert!(DeadServerIndicator::from_error_str(pattern).is_none());
    }
}
1799
#[test]
fn test_is_dead_server_error_detects_connection_timeout_in_plugin_error() {
    // A connection timeout counts as a dead-server error even when the
    // message also mentions "plugin".
    let message = "plugin connection timed out".to_string();
    let plugin_timeout = PluginError::PluginExecutionError(message);
    assert!(PoolManager::is_dead_server_error(&plugin_timeout));
}
1808
#[test]
fn test_is_dead_server_error_case_insensitive() {
    // Detection must not depend on the casing of the message.
    let mixed_case = ["CONNECTION REFUSED", "BROKEN PIPE", "Connection Reset By Peer"];
    for message in mixed_case {
        let err = PluginError::PluginExecutionError(message.to_string());
        assert!(PoolManager::is_dead_server_error(&err));
    }
}
1821
#[test]
fn test_is_dead_server_error_handler_timeout_variations() {
    // Handler/plugin timeout wordings that must not trigger detection.
    let timeout_errors = [
        "Handler timed out",
        "handler timed out after 30000ms",
        "Plugin handler timed out",
        "plugin timed out",
        "Plugin execution timed out after 60s",
    ];

    for error_msg in timeout_errors {
        let err = PluginError::PluginExecutionError(error_msg.to_string());
        let detected = PoolManager::is_dead_server_error(&err);
        assert!(
            !detected,
            "Expected '{error_msg}' to NOT be detected as dead server error"
        );
    }
}
1841
#[test]
fn test_is_dead_server_error_business_errors_not_detected() {
    // Application-level failures that must not trigger detection.
    let business_errors = [
        "ReferenceError: x is not defined",
        "SyntaxError: Unexpected token",
        "TypeError: Cannot read property 'foo' of undefined",
        "Plugin returned status 400: Bad Request",
        "Validation error: missing required field",
        "Authorization failed",
        "Rate limit exceeded",
        "Plugin threw an error: Invalid input",
    ];

    for error_msg in business_errors {
        let err = PluginError::PluginExecutionError(error_msg.to_string());
        let detected = PoolManager::is_dead_server_error(&err);
        assert!(
            !detected,
            "Expected '{error_msg}' to NOT be detected as dead server error"
        );
    }
}
1864
#[test]
fn test_is_dead_server_error_with_handler_error_type() {
    // A `HandlerError` is not a dead-server error even when its message
    // contains a dead-server phrase.
    let err = PluginError::HandlerError(Box::new(PluginHandlerPayload {
        message: "Connection refused".to_string(),
        status: 500,
        code: None,
        details: None,
        logs: None,
        traces: None,
    }));
    assert!(!PoolManager::is_dead_server_error(&err));
}
1882
#[test]
fn test_heap_calculation_base_case() {
    // Recomputes the heap formula from the constants for concurrency 100.
    let (base, divisor, increment) = (
        PoolManager::BASE_HEAP_MB,
        PoolManager::CONCURRENCY_DIVISOR,
        PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB,
    );

    let steps = 100 / divisor;
    assert_eq!(base + (steps * increment), 832);
}
1900
#[test]
fn test_heap_calculation_minimum() {
    // Concurrency below the divisor adds no increment.
    let (base, divisor, increment) = (
        PoolManager::BASE_HEAP_MB,
        PoolManager::CONCURRENCY_DIVISOR,
        PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB,
    );

    let steps = 5 / divisor;
    assert_eq!(base + (steps * increment), 512);
}
1914
#[test]
fn test_heap_calculation_high_concurrency() {
    // Recomputes the heap formula from the constants for concurrency 500.
    let (base, divisor, increment) = (
        PoolManager::BASE_HEAP_MB,
        PoolManager::CONCURRENCY_DIVISOR,
        PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB,
    );

    let steps = 500 / divisor;
    assert_eq!(base + (steps * increment), 2112);
}
1928
#[test]
fn test_heap_calculation_max_cap() {
    // The uncapped formula exceeds the maximum at concurrency 3000, so the
    // cap must apply.
    let max_heap = PoolManager::MAX_HEAP_MB;
    assert_eq!(max_heap, 8192);

    let (base, divisor, increment) = (
        PoolManager::BASE_HEAP_MB,
        PoolManager::CONCURRENCY_DIVISOR,
        PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB,
    );

    let steps = 3000 / divisor;
    let calculated = base + (steps * increment);
    let capped = std::cmp::min(calculated, max_heap);

    assert_eq!(calculated, 10112);
    assert_eq!(capped, 8192);
}
1949
#[test]
fn test_pool_manager_constants() {
    // Pins the heap-sizing constants to their expected values.
    let expectations = [
        (PoolManager::BASE_HEAP_MB, 512),
        (PoolManager::CONCURRENCY_DIVISOR, 10),
        (PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB, 32),
        (PoolManager::MAX_HEAP_MB, 8192),
    ];
    for (actual, expected) in expectations {
        assert_eq!(actual, expected);
    }
}
1962
#[test]
fn test_calculate_heap_size_low_concurrency() {
    // Concurrency below 10 yields the base heap size.
    for concurrency in [5, 9] {
        assert_eq!(PoolManager::calculate_heap_size(concurrency), 512);
    }
}
1973
#[test]
fn test_calculate_heap_size_medium_concurrency() {
    // (concurrency, expected heap in MB)
    for (concurrency, expected) in [(10, 544), (50, 672), (100, 832)] {
        assert_eq!(PoolManager::calculate_heap_size(concurrency), expected);
    }
}
1983
#[test]
fn test_calculate_heap_size_high_concurrency() {
    // (concurrency, expected heap in MB)
    for (concurrency, expected) in [(500, 2112), (1000, 3712)] {
        assert_eq!(PoolManager::calculate_heap_size(concurrency), expected);
    }
}
1991
#[test]
fn test_calculate_heap_size_capped_at_max() {
    // Very high concurrency is clamped to the maximum heap size.
    for concurrency in [3000, 10000] {
        assert_eq!(PoolManager::calculate_heap_size(concurrency), 8192);
    }
}
1999
#[test]
fn test_calculate_heap_size_zero_concurrency() {
    // Zero concurrency yields the base heap size.
    let heap_mb = PoolManager::calculate_heap_size(0);
    assert_eq!(heap_mb, 512);
}
2005
#[test]
fn test_format_return_value_none() {
    // `None` formats as the empty string.
    let formatted = PoolManager::format_return_value(None);
    assert_eq!(formatted, "");
}
2014
#[test]
fn test_format_return_value_string() {
    // A JSON string is returned without surrounding quotes.
    let value = serde_json::Value::String("hello world".to_string());
    let formatted = PoolManager::format_return_value(Some(value));
    assert_eq!(formatted, "hello world");
}
2020
#[test]
fn test_format_return_value_empty_string() {
    // An empty JSON string formats as the empty string.
    let value = serde_json::Value::String(String::new());
    let formatted = PoolManager::format_return_value(Some(value));
    assert_eq!(formatted, "");
}
2026
#[test]
fn test_format_return_value_object() {
    // The formatted object must mention every key and value.
    let object = serde_json::json!({"key": "value", "num": 42});
    let rendered = PoolManager::format_return_value(Some(object));
    for needle in ["key", "value", "42"] {
        assert!(rendered.contains(needle));
    }
}
2036
#[test]
fn test_format_return_value_array() {
    // Arrays are rendered as compact JSON.
    let formatted = PoolManager::format_return_value(Some(serde_json::json!([1, 2, 3])));
    assert_eq!(formatted, "[1,2,3]");
}
2042
#[test]
fn test_format_return_value_number() {
    // Numbers are rendered as their JSON text.
    let formatted = PoolManager::format_return_value(Some(serde_json::json!(42)));
    assert_eq!(formatted, "42");
}
2048
#[test]
fn test_format_return_value_boolean() {
    // Booleans are rendered as `true` / `false`.
    for (flag, expected) in [(true, "true"), (false, "false")] {
        let formatted = PoolManager::format_return_value(Some(serde_json::Value::Bool(flag)));
        assert_eq!(formatted, expected);
    }
}
2060
#[test]
fn test_format_return_value_null() {
    // JSON null is rendered as the text `null`, unlike `None`.
    let formatted = PoolManager::format_return_value(Some(serde_json::Value::Null));
    assert_eq!(formatted, "null");
}
2066
#[test]
fn test_parse_pool_response_success_with_string_result() {
    use super::super::protocol::{PoolLogEntry, PoolResponse};

    // A successful response with a string result and one log entry.
    let log_entry = PoolLogEntry {
        level: "info".to_string(),
        message: "test log".to_string(),
    };
    let response = PoolResponse {
        task_id: "test-123".to_string(),
        success: true,
        result: Some(serde_json::json!("success result")),
        error: None,
        logs: Some(vec![log_entry]),
    };

    let parsed = PoolManager::parse_pool_response(response).unwrap();
    assert_eq!(parsed.return_value, "success result");
    assert!(parsed.error.is_empty());
    assert_eq!(parsed.logs.len(), 1);

    let first_log = &parsed.logs[0];
    assert_eq!(first_log.level, LogLevel::Info);
    assert_eq!(first_log.message, "test log");
}
2093
#[test]
fn test_parse_pool_response_success_with_object_result() {
    use super::super::protocol::PoolResponse;

    // A successful response with an object result and no logs.
    let response = PoolResponse {
        task_id: "test-456".to_string(),
        success: true,
        result: Some(serde_json::json!({"data": "value"})),
        error: None,
        logs: None,
    };

    let parsed = PoolManager::parse_pool_response(response).unwrap();
    for needle in ["data", "value"] {
        assert!(parsed.return_value.contains(needle));
    }
    assert!(parsed.logs.is_empty());
}
2111
#[test]
fn test_parse_pool_response_success_no_result() {
    use super::super::protocol::PoolResponse;

    // A successful response without a result yields an empty return value.
    let response = PoolResponse {
        task_id: "test-789".to_string(),
        success: true,
        result: None,
        error: None,
        logs: None,
    };

    let parsed = PoolManager::parse_pool_response(response).unwrap();
    assert_eq!(parsed.return_value, "");
    assert!(parsed.error.is_empty());
}
2128
#[test]
fn test_parse_pool_response_failure_with_error() {
    use super::super::protocol::{PoolError, PoolResponse};

    // A failed response whose error carries message, code, status and details.
    let pool_error = PoolError {
        message: "Something went wrong".to_string(),
        code: Some("ERR_001".to_string()),
        status: Some(400),
        details: Some(serde_json::json!({"field": "name"})),
    };
    let response = PoolResponse {
        task_id: "test-error".to_string(),
        success: false,
        result: None,
        error: Some(pool_error),
        logs: None,
    };

    let err = PoolManager::parse_pool_response(response).unwrap_err();
    let PluginError::HandlerError(payload) = err else {
        panic!("Expected HandlerError")
    };
    assert_eq!(payload.message, "Something went wrong");
    assert_eq!(payload.status, 400);
    assert_eq!(payload.code, Some("ERR_001".to_string()));
}
2156
#[test]
fn test_parse_pool_response_failure_no_error_details() {
    use super::super::protocol::PoolResponse;

    // A failed response without an error falls back to a generic 500.
    let response = PoolResponse {
        task_id: "test-unknown".to_string(),
        success: false,
        result: None,
        error: None,
        logs: None,
    };

    let err = PoolManager::parse_pool_response(response).unwrap_err();
    let PluginError::HandlerError(payload) = err else {
        panic!("Expected HandlerError")
    };
    assert_eq!(payload.message, "Unknown error");
    assert_eq!(payload.status, 500);
}
2178
#[test]
fn test_parse_pool_response_failure_preserves_logs() {
    use super::super::protocol::{PoolError, PoolLogEntry, PoolResponse};

    // Logs attached to a failed response must survive into the error payload.
    let make_log = |level: &str, message: &str| PoolLogEntry {
        level: level.to_string(),
        message: message.to_string(),
    };
    let response = PoolResponse {
        task_id: "test-logs".to_string(),
        success: false,
        result: None,
        error: Some(PoolError {
            message: "Error with logs".to_string(),
            code: None,
            status: None,
            details: None,
        }),
        logs: Some(vec![
            make_log("debug", "debug message"),
            make_log("error", "error message"),
        ]),
    };

    let err = PoolManager::parse_pool_response(response).unwrap_err();
    let PluginError::HandlerError(payload) = err else {
        panic!("Expected HandlerError")
    };
    let logs = payload.logs.unwrap();
    assert_eq!(logs.len(), 2);
    assert_eq!(logs[0].level, LogLevel::Debug);
    assert_eq!(logs[1].level, LogLevel::Error);
}
2216
#[test]
fn test_parse_success_response_complete() {
    use super::super::protocol::{PoolLogEntry, PoolResponse};

    // A successful response with a result and two log entries.
    let make_log = |level: &str, message: &str| PoolLogEntry {
        level: level.to_string(),
        message: message.to_string(),
    };
    let response = PoolResponse {
        task_id: "task-1".to_string(),
        success: true,
        result: Some(serde_json::json!("completed")),
        error: None,
        logs: Some(vec![make_log("log", "starting"), make_log("result", "finished")]),
    };

    let parsed = PoolManager::parse_success_response(response);
    assert_eq!(parsed.return_value, "completed");
    assert!(parsed.error.is_empty());
    assert_eq!(parsed.logs.len(), 2);
    assert_eq!(parsed.logs[0].level, LogLevel::Log);
    assert_eq!(parsed.logs[1].level, LogLevel::Result);
}
2249
#[test]
fn test_parse_error_response_with_all_fields() {
    use super::super::protocol::{PoolError, PoolLogEntry, PoolResponse};

    // A failed response with every error field set plus one warning log.
    let pool_error = PoolError {
        message: "Validation failed".to_string(),
        code: Some("VALIDATION_ERROR".to_string()),
        status: Some(422),
        details: Some(serde_json::json!({"fields": ["email"]})),
    };
    let warning = PoolLogEntry {
        level: "warn".to_string(),
        message: "validation warning".to_string(),
    };
    let response = PoolResponse {
        task_id: "err-task".to_string(),
        success: false,
        result: None,
        error: Some(pool_error),
        logs: Some(vec![warning]),
    };

    let err = PoolManager::parse_error_response(response);
    let PluginError::HandlerError(payload) = err else {
        panic!("Expected HandlerError")
    };
    assert_eq!(payload.message, "Validation failed");
    assert_eq!(payload.status, 422);
    assert_eq!(payload.code, Some("VALIDATION_ERROR".to_string()));
    assert!(payload.details.is_some());

    let logs = payload.logs.unwrap();
    assert_eq!(logs.len(), 1);
    assert_eq!(logs[0].level, LogLevel::Warn);
}
2288
#[test]
fn test_parse_health_result_complete() {
    // A payload with every field the parser reads.
    let payload = serde_json::json!({
        "status": "healthy",
        "uptime": 123456,
        "memory": {
            "heapUsed": 50000000,
            "heapTotal": 100000000
        },
        "pool": {
            "completed": 1000,
            "queued": 5
        },
        "execution": {
            "successRate": 0.99
        }
    });

    let parsed = PoolManager::parse_health_result(&payload);

    assert_eq!(parsed.status, "healthy");
    assert_eq!(parsed.uptime_ms, Some(123456));
    assert_eq!(parsed.memory, Some(50000000));
    assert_eq!(parsed.pool_completed, Some(1000));
    assert_eq!(parsed.pool_queued, Some(5));

    let rate = parsed.success_rate.unwrap();
    assert!((rate - 0.99).abs() < 0.001);
}
2320
#[test]
fn test_parse_health_result_minimal() {
    // An empty object yields "unknown" status and no metrics.
    let payload = serde_json::json!({});

    let parsed = PoolManager::parse_health_result(&payload);

    assert_eq!(parsed.status, "unknown");
    assert!(parsed.uptime_ms.is_none());
    assert!(parsed.memory.is_none());
    assert!(parsed.pool_completed.is_none());
    assert!(parsed.pool_queued.is_none());
    assert!(parsed.success_rate.is_none());
}
2334
#[test]
fn test_parse_health_result_partial() {
    // `memory` is present but lacks `heapUsed`, so no memory value is parsed.
    let payload = serde_json::json!({
        "status": "degraded",
        "uptime": 5000,
        "memory": {
            "heapTotal": 100000000
        }
    });

    let parsed = PoolManager::parse_health_result(&payload);

    assert_eq!(parsed.status, "degraded");
    assert_eq!(parsed.uptime_ms, Some(5000));
    assert!(parsed.memory.is_none());
    assert!(parsed.pool_completed.is_none());
    assert!(parsed.pool_queued.is_none());
    assert!(parsed.success_rate.is_none());
}
2355
#[test]
fn test_parse_health_result_wrong_types() {
    // Fields with unexpected JSON types are treated as absent.
    let payload = serde_json::json!({
        "status": 123,
        "uptime": "not a number",
        "memory": "invalid"
    });

    let parsed = PoolManager::parse_health_result(&payload);

    assert_eq!(parsed.status, "unknown");
    assert!(parsed.uptime_ms.is_none());
    assert!(parsed.memory.is_none());
    assert!(parsed.pool_completed.is_none());
    assert!(parsed.pool_queued.is_none());
    assert!(parsed.success_rate.is_none());
}
2373
#[test]
fn test_parse_health_result_nested_values() {
    // Zero counters and a 1.0 success rate must be kept, not dropped.
    let payload = serde_json::json!({
        "pool": {
            "completed": 0,
            "queued": 0
        },
        "execution": {
            "successRate": 1.0
        }
    });

    let parsed = PoolManager::parse_health_result(&payload);

    assert_eq!(parsed.status, "unknown");
    assert!(parsed.uptime_ms.is_none());
    assert!(parsed.memory.is_none());
    assert_eq!(parsed.pool_completed, Some(0));
    assert_eq!(parsed.pool_queued, Some(0));

    let rate = parsed.success_rate.unwrap();
    assert!((rate - 1.0).abs() < 0.001);
}
2395
2396 #[tokio::test]
2401 async fn test_pool_manager_new_creates_unique_socket_path() {
2402 let manager1 = PoolManager::new();
2404 let manager2 = PoolManager::new();
2405
2406 assert_ne!(manager1.socket_path, manager2.socket_path);
2407 assert!(manager1
2408 .socket_path
2409 .starts_with("/tmp/relayer-plugin-pool-"));
2410 assert!(manager2
2411 .socket_path
2412 .starts_with("/tmp/relayer-plugin-pool-"));
2413 }
2414
2415 #[tokio::test]
2416 async fn test_pool_manager_with_custom_socket_path() {
2417 let custom_path = "/tmp/custom-test-pool.sock".to_string();
2418 let manager = PoolManager::with_socket_path(custom_path.clone());
2419
2420 assert_eq!(manager.socket_path, custom_path);
2421 }
2422
2423 #[tokio::test]
2424 async fn test_pool_manager_default_trait() {
2425 let manager = PoolManager::default();
2427 assert!(manager.socket_path.starts_with("/tmp/relayer-plugin-pool-"));
2428 }
2429
2430 #[tokio::test]
2435 async fn test_circuit_state_initial() {
2436 let manager = PoolManager::new();
2437
2438 assert_eq!(manager.circuit_state(), CircuitState::Closed);
2440 }
2441
2442 #[tokio::test]
2443 async fn test_avg_response_time_initial() {
2444 let manager = PoolManager::new();
2445
2446 assert_eq!(manager.avg_response_time_ms(), 0);
2448 }
2449
2450 #[tokio::test]
2455 async fn test_recovery_mode_initial() {
2456 let manager = PoolManager::new();
2457
2458 assert!(!manager.is_recovering());
2460 assert_eq!(manager.recovery_allowance_percent(), 0);
2461 }
2462
/// A successful `ScriptResult` can be built with one log entry and an empty error string.
#[test]
fn test_script_result_success_construction() {
    let entry = LogEntry {
        level: LogLevel::Info,
        message: "Test log".to_string(),
    };
    let outcome = ScriptResult {
        logs: vec![entry],
        error: String::new(),
        return_value: r#"{"success": true}"#.to_string(),
        trace: vec![],
    };

    assert!(outcome.error.is_empty());
    assert_eq!(outcome.logs.len(), 1);
    assert_eq!(outcome.logs[0].level, LogLevel::Info);
}
2483
/// A `ScriptResult` keeps its log entries in insertion order across several log levels.
#[test]
fn test_script_result_with_multiple_logs() {
    let entries = vec![
        (LogLevel::Log, "Starting execution"),
        (LogLevel::Debug, "Processing data"),
        (LogLevel::Warn, "Deprecated API used"),
        (LogLevel::Error, "Non-fatal error"),
    ];
    let outcome = ScriptResult {
        logs: entries
            .into_iter()
            .map(|(level, message)| LogEntry {
                level,
                message: message.to_string(),
            })
            .collect(),
        error: String::new(),
        return_value: "done".to_string(),
        trace: vec![],
    };

    assert_eq!(outcome.logs.len(), 4);

    let expected_levels = [
        LogLevel::Log,
        LogLevel::Debug,
        LogLevel::Warn,
        LogLevel::Error,
    ];
    for (index, expected) in expected_levels.iter().enumerate() {
        assert_eq!(&outcome.logs[index].level, expected);
    }
}
2516
/// A `QueuedRequest` with every optional field populated keeps the values it was given.
#[test]
fn test_queued_request_required_fields() {
    let (response_tx, _response_rx) = oneshot::channel();

    let queued = QueuedRequest {
        plugin_id: "test-plugin".to_string(),
        compiled_code: Some("module.exports.handler = () => {}".to_string()),
        plugin_path: None,
        params: serde_json::json!({"key": "value"}),
        headers: None,
        socket_path: "/tmp/test.sock".to_string(),
        http_request_id: Some("req-123".to_string()),
        timeout_secs: Some(30),
        route: Some("/api/test".to_string()),
        config: Some(serde_json::json!({"setting": true})),
        method: Some("POST".to_string()),
        query: Some(serde_json::json!({"page": "1"})),
        response_tx,
    };

    assert_eq!(queued.plugin_id, "test-plugin");
    assert!(queued.compiled_code.is_some());
    assert!(queued.plugin_path.is_none());
    assert_eq!(queued.timeout_secs, Some(30));
}
2546
/// A `QueuedRequest` can be built with only a plugin path and all other options left empty.
#[test]
fn test_queued_request_minimal() {
    let (response_tx, _response_rx) = oneshot::channel();

    let queued = QueuedRequest {
        plugin_id: "minimal".to_string(),
        compiled_code: None,
        plugin_path: Some("/path/to/plugin.ts".to_string()),
        params: serde_json::json!(null),
        headers: None,
        socket_path: "/tmp/min.sock".to_string(),
        http_request_id: None,
        timeout_secs: None,
        route: None,
        config: None,
        method: None,
        query: None,
        response_tx,
    };

    assert_eq!(queued.plugin_id, "minimal");
    assert!(queued.compiled_code.is_none());
    assert!(queued.plugin_path.is_some());
}
2571
/// The `Display` output of a `SocketError` mentions both the error kind and its message.
#[test]
fn test_plugin_error_socket_error() {
    let rendered = PluginError::SocketError("Connection failed".to_string()).to_string();

    for needle in ["Socket error", "Connection failed"] {
        assert!(rendered.contains(needle));
    }
}
2583
/// The `Display` output of a `PluginExecutionError` includes the wrapped message.
#[test]
fn test_plugin_error_plugin_execution_error() {
    let rendered = PluginError::PluginExecutionError("Execution failed".to_string()).to_string();
    assert!(rendered.contains("Execution failed"));
}
2590
/// The `Debug` output of a `HandlerError` names the variant.
#[test]
fn test_plugin_error_handler_error() {
    let boxed_payload = Box::new(PluginHandlerPayload {
        message: "Handler error".to_string(),
        status: 400,
        code: Some("BAD_REQUEST".to_string()),
        details: Some(serde_json::json!({"field": "name"})),
        logs: None,
        traces: None,
    });
    let err = PluginError::HandlerError(boxed_payload);

    let rendered = format!("{err:?}");
    assert!(rendered.contains("HandlerError"));
}
2607
/// A `PluginHandlerPayload` with every optional field populated exposes those values.
#[test]
fn test_plugin_handler_payload_full() {
    let details = serde_json::json!({
        "errors": [
            {"field": "email", "message": "Invalid format"}
        ]
    });
    let log = LogEntry {
        level: LogLevel::Error,
        message: "Validation failed for email".to_string(),
    };
    let trace = serde_json::json!({"stack": "Error at line 10"});

    let full = PluginHandlerPayload {
        message: "Validation failed".to_string(),
        status: 422,
        code: Some("VALIDATION_ERROR".to_string()),
        details: Some(details),
        logs: Some(vec![log]),
        traces: Some(vec![trace]),
    };

    assert_eq!(full.status, 422);
    assert_eq!(full.code, Some("VALIDATION_ERROR".to_string()));
    assert!(full.logs.is_some());
    assert!(full.traces.is_some());
}
2635
/// A `PluginHandlerPayload` can be built with only a message and a status.
#[test]
fn test_plugin_handler_payload_minimal() {
    let bare = PluginHandlerPayload {
        message: "Error".to_string(),
        status: 500,
        code: None,
        details: None,
        logs: None,
        traces: None,
    };

    assert_eq!(bare.status, 500);
    assert!(bare.code.is_none());
    assert!(bare.details.is_none());
}
2651
2652 #[tokio::test]
2657 async fn test_pool_manager_not_initialized_health_check() {
2658 let manager = PoolManager::with_socket_path("/tmp/test-health.sock".to_string());
2659
2660 let health = manager.health_check().await.unwrap();
2662
2663 assert!(!health.healthy);
2664 assert_eq!(health.status, "not_initialized");
2665 assert!(health.uptime_ms.is_none());
2666 assert!(health.memory.is_none());
2667 }
2668
2669 #[tokio::test]
2670 async fn test_pool_manager_circuit_info_in_health_status() {
2671 let manager = PoolManager::with_socket_path("/tmp/test-circuit.sock".to_string());
2672
2673 let health = manager.health_check().await.unwrap();
2674
2675 assert!(health.circuit_state.is_some());
2677 assert_eq!(health.circuit_state, Some("closed".to_string()));
2678 assert!(health.avg_response_time_ms.is_some());
2679 assert!(health.recovering.is_some());
2680 assert!(health.recovery_percent.is_some());
2681 }
2682
2683 #[tokio::test]
2684 async fn test_invalidate_plugin_when_not_initialized() {
2685 let manager = PoolManager::with_socket_path("/tmp/test-invalidate.sock".to_string());
2686
2687 let result = manager.invalidate_plugin("test-plugin".to_string()).await;
2689
2690 assert!(result.is_ok());
2692 }
2693
2694 #[tokio::test]
2695 async fn test_shutdown_when_not_initialized() {
2696 let manager = PoolManager::with_socket_path("/tmp/test-shutdown.sock".to_string());
2697
2698 let result = manager.shutdown().await;
2700
2701 assert!(result.is_ok());
2703 }
2704
/// The default `ParsedHealthResult` has an empty status and no metrics.
#[test]
fn test_parsed_health_result_default() {
    let blank = ParsedHealthResult::default();

    assert_eq!(blank.status, "");
    assert!(blank.uptime_ms.is_none());
    assert!(blank.memory.is_none());
    assert!(blank.pool_completed.is_none());
    assert!(blank.pool_queued.is_none());
    assert!(blank.success_rate.is_none());
}
2719
/// Two `ParsedHealthResult` values built from identical fields compare equal.
#[test]
fn test_parsed_health_result_equality() {
    // Each call builds an independent value, so the comparison is between two instances.
    let build = || ParsedHealthResult {
        status: "ok".to_string(),
        uptime_ms: Some(1000),
        memory: Some(500000),
        pool_completed: Some(50),
        pool_queued: Some(2),
        success_rate: Some(1.0),
    };

    let left = build();
    let right = build();

    assert_eq!(left, right);
}
2740
/// Formatting a nested object keeps the inner values visible in the output.
#[test]
fn test_format_return_value_nested_object() {
    let nested = serde_json::json!({
        "user": { "name": "John", "age": 30 }
    });

    let formatted = PoolManager::format_return_value(Some(nested));

    for needle in ["John", "30"] {
        assert!(formatted.contains(needle));
    }
}
2750
/// Empty objects and arrays format to their compact JSON spelling.
#[test]
fn test_format_return_value_empty_collections() {
    let cases = [
        (serde_json::json!({}), "{}"),
        (serde_json::json!([]), "[]"),
    ];

    for (value, expected) in cases {
        assert_eq!(PoolManager::format_return_value(Some(value)), expected);
    }
}
2758
/// Zero is a valid reading: every zero-valued metric is reported as `Some(0)`, not `None`.
#[test]
fn test_parse_health_result_zero_values() {
    let payload = serde_json::json!({
        "status": "starting",
        "uptime": 0,
        "memory": { "heapUsed": 0 },
        "pool": { "completed": 0, "queued": 0 },
        "execution": { "successRate": 0.0 }
    });

    let parsed = PoolManager::parse_health_result(&payload);

    assert_eq!(parsed.status, "starting");
    assert_eq!(parsed.uptime_ms, Some(0));
    assert_eq!(parsed.memory, Some(0));
    assert_eq!(parsed.pool_completed, Some(0));
    assert_eq!(parsed.pool_queued, Some(0));
    assert_eq!(parsed.success_rate, Some(0.0));
}
2776
/// Spot-checks `calculate_heap_size` against known input/output pairs.
#[test]
fn test_calculate_heap_size_precise_calculations() {
    let cases = [
        (0, 512),
        (1, 512),
        (10, 544),
        (20, 576),
        (100, 832),
        (200, 1152),
    ];

    for (input, expected) in cases {
        assert_eq!(PoolManager::calculate_heap_size(input), expected);
    }
}
2786
2787 #[tokio::test]
2788 async fn test_pool_manager_health_check_flag_initial() {
2789 let manager = PoolManager::new();
2790 assert!(!manager.health_check_needed.load(Ordering::Relaxed));
2791 }
2792
2793 #[tokio::test]
2794 async fn test_pool_manager_consecutive_failures_initial() {
2795 let manager = PoolManager::new();
2796 assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 0);
2797 }
2798
2799 #[tokio::test]
2800 async fn test_recovery_allowance_bounds() {
2801 let manager = PoolManager::new();
2802 manager.recovery_allowance.store(0, Ordering::Relaxed);
2803 assert_eq!(manager.recovery_allowance_percent(), 0);
2804 manager.recovery_allowance.store(50, Ordering::Relaxed);
2805 assert_eq!(manager.recovery_allowance_percent(), 50);
2806 manager.recovery_allowance.store(100, Ordering::Relaxed);
2807 assert_eq!(manager.recovery_allowance_percent(), 100);
2808 }
2809
2810 #[tokio::test]
2811 async fn test_is_initialized_changes_with_state() {
2812 let manager = PoolManager::with_socket_path("/tmp/init-test-123.sock".to_string());
2813 assert!(!manager.is_initialized().await);
2814 manager.initialized.store(true, Ordering::Release);
2815 assert!(manager.is_initialized().await);
2816 manager.initialized.store(false, Ordering::Release);
2817 assert!(!manager.is_initialized().await);
2818 }
2819
/// A `ScriptTimeout` is not classified as a dead-server error.
#[test]
fn test_is_dead_server_error_with_script_timeout() {
    let timeout = PluginError::ScriptTimeout(30);
    let is_dead = PoolManager::is_dead_server_error(&timeout);
    assert!(!is_dead);
}
2830
/// A generic `PluginError::PluginError` is not classified as a dead-server error.
#[test]
fn test_is_dead_server_error_with_plugin_error() {
    let generic = PluginError::PluginError("some plugin error".to_string());
    let is_dead = PoolManager::is_dead_server_error(&generic);
    assert!(!is_dead);
}
2836
/// Connection-timeout messages count as dead-server errors, whether carried by a
/// `PluginExecutionError` or a `SocketError`.
#[test]
fn test_is_dead_server_error_with_connection_timeout_in_plugin_error() {
    let execution = PluginError::PluginExecutionError("connection timed out".to_string());
    let socket = PluginError::SocketError("connect timed out".to_string());

    for err in [&execution, &socket] {
        assert!(PoolManager::is_dead_server_error(err));
    }
}
2847
/// A `SocketError` reporting a request timeout counts as a dead-server error.
#[test]
fn test_is_dead_server_error_with_request_timeout_socket_error() {
    let timed_out = PluginError::SocketError("Request timed out after 304 seconds".to_string());
    let is_dead = PoolManager::is_dead_server_error(&timed_out);
    assert!(is_dead);
}
2853
/// Every log level string in a successful pool response maps to its `LogLevel` variant,
/// with the original order preserved.
#[test]
fn test_parse_pool_response_success_with_logs_various_levels() {
    use super::super::protocol::{PoolLogEntry, PoolResponse};

    // Each entry's message is "<level> level", e.g. "warn level".
    let level_names = ["log", "debug", "info", "warn", "error", "result"];
    let pool_logs: Vec<PoolLogEntry> = level_names
        .iter()
        .map(|name| PoolLogEntry {
            level: name.to_string(),
            message: format!("{name} level"),
        })
        .collect();

    let response = PoolResponse {
        task_id: "test-levels".to_string(),
        success: true,
        result: Some(serde_json::json!("ok")),
        error: None,
        logs: Some(pool_logs),
    };

    let parsed = PoolManager::parse_pool_response(response).unwrap();
    assert_eq!(parsed.logs.len(), 6);

    let expected_levels = [
        LogLevel::Log,
        LogLevel::Debug,
        LogLevel::Info,
        LogLevel::Warn,
        LogLevel::Error,
        LogLevel::Result,
    ];
    for (index, expected) in expected_levels.iter().enumerate() {
        assert_eq!(&parsed.logs[index].level, expected);
    }
}
2900
/// A failed response without an `error` body becomes a `HandlerError` carrying the
/// fallback message `"Unknown error"`, status 500, and no code or details.
#[test]
fn test_parse_error_response_defaults() {
    use super::super::protocol::PoolResponse;

    let failed = PoolResponse {
        task_id: "no-error".to_string(),
        success: false,
        result: None,
        error: None,
        logs: None,
    };

    let err = PoolManager::parse_error_response(failed);

    if let PluginError::HandlerError(payload) = err {
        assert_eq!(payload.message, "Unknown error");
        assert_eq!(payload.status, 500);
        assert!(payload.code.is_none());
        assert!(payload.details.is_none());
    } else {
        panic!("Expected HandlerError");
    }
}
2925
/// A floating-point value keeps its digits when formatted.
#[test]
fn test_format_return_value_float() {
    let number = serde_json::json!(3.14159);
    let formatted = PoolManager::format_return_value(Some(number));
    assert!(formatted.contains("3.14159"));
}
2932
/// A ten-element array formats as compact JSON without whitespace.
#[test]
fn test_format_return_value_large_array() {
    let numbers = serde_json::json!([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let formatted = PoolManager::format_return_value(Some(numbers));
    assert_eq!(formatted, "[1,2,3,4,5,6,7,8,9,10]");
}
2939
/// A JSON string containing a newline and a tab is returned verbatim, without escaping.
#[test]
fn test_format_return_value_string_with_special_chars() {
    let text = serde_json::json!("hello\nworld\ttab");
    let formatted = PoolManager::format_return_value(Some(text));
    assert_eq!(formatted, "hello\nworld\ttab");
}
2945
/// Non-ASCII text (Japanese characters and an emoji) passes through formatting unchanged.
#[test]
fn test_format_return_value_unicode() {
    let text = serde_json::json!("こんにちは世界 🌍");
    let formatted = PoolManager::format_return_value(Some(text));
    assert_eq!(formatted, "こんにちは世界 🌍");
}
2951
/// Large metric values are parsed exactly, without truncation.
#[test]
fn test_parse_health_result_large_values() {
    let payload = serde_json::json!({
        "status": "healthy",
        "uptime": 999999999999_u64,
        "memory": { "heapUsed": 9999999999_u64 },
        "pool": { "completed": 999999999_u64, "queued": 999999_u64 },
        "execution": { "successRate": 0.999999 }
    });

    let parsed = PoolManager::parse_health_result(&payload);

    assert_eq!(parsed.status, "healthy");
    assert_eq!(parsed.uptime_ms, Some(999999999999));
    assert_eq!(parsed.memory, Some(9999999999));
    assert_eq!(parsed.pool_completed, Some(999999999));
    assert_eq!(parsed.pool_queued, Some(999999));

    let rate = parsed.success_rate.unwrap();
    assert!((rate - 0.999999).abs() < 0.0000001);
}
2970
/// Negative numbers for `uptime` and `heapUsed` are dropped and reported as `None`.
#[test]
fn test_parse_health_result_negative_values_treated_as_none() {
    let payload = serde_json::json!({
        "status": "error",
        "uptime": -1,
        "memory": { "heapUsed": -100 }
    });

    let parsed = PoolManager::parse_health_result(&payload);

    assert_eq!(parsed.status, "error");
    assert!(parsed.uptime_ms.is_none());
    assert!(parsed.memory.is_none());
}
2985
/// The `Debug` output of `ParsedHealthResult` includes its field values.
#[test]
fn test_parsed_health_result_debug() {
    let sample = ParsedHealthResult {
        status: "test".to_string(),
        uptime_ms: Some(100),
        memory: Some(200),
        pool_completed: Some(50),
        pool_queued: Some(5),
        success_rate: Some(0.95),
    };

    let rendered = format!("{sample:?}");

    for needle in ["test", "100", "200"] {
        assert!(rendered.contains(needle));
    }
}
3002
/// Checks `calculate_heap_size` on either side of two boundaries: 9 vs 10 at the low end
/// and 2399 vs 2400 at the high end.
#[test]
fn test_calculate_heap_size_boundary_values() {
    let cases = [(9, 512), (10, 544), (2400, 8192), (2399, 8160)];

    for (input, expected) in cases {
        assert_eq!(PoolManager::calculate_heap_size(input), expected);
    }
}
3017
3018 #[tokio::test]
3019 async fn test_pool_manager_socket_path_format() {
3020 let manager = PoolManager::new();
3021 assert!(manager.socket_path.starts_with("/tmp/relayer-plugin-pool-"));
3023 assert!(manager.socket_path.ends_with(".sock"));
3024 let uuid_part = manager
3026 .socket_path
3027 .strip_prefix("/tmp/relayer-plugin-pool-")
3028 .unwrap()
3029 .strip_suffix(".sock")
3030 .unwrap();
3031 assert_eq!(uuid_part.len(), 36);
3032 }
3033
3034 #[tokio::test]
3035 async fn test_health_check_socket_missing() {
3036 let manager =
3037 PoolManager::with_socket_path("/tmp/nonexistent-socket-12345.sock".to_string());
3038 manager.initialized.store(true, Ordering::Release);
3040
3041 let health = manager.health_check().await.unwrap();
3042 assert!(!health.healthy);
3043 assert_eq!(health.status, "socket_missing");
3044 }
3045
/// Dead-server indicators are recognised even when embedded in a longer error message.
#[test]
fn test_is_dead_server_error_embedded_patterns() {
    let messages = [
        "Error: ECONNREFUSED connection refused at 127.0.0.1:3000",
        "SocketError: broken pipe while writing to /tmp/socket",
        "IO Error: No such file or directory (os error 2)",
    ];

    for text in messages {
        let err = PluginError::PluginExecutionError(text.to_string());
        assert!(PoolManager::is_dead_server_error(&err));
    }
}
3064
/// "Handler timed out" messages, in any letter case, are not treated as dead-server errors.
#[test]
fn test_is_dead_server_error_mixed_case_timeout_patterns() {
    let handler_timeouts = [
        "HANDLER TIMED OUT",
        "Handler Timed Out after 30s",
        "the handler timed out waiting for response",
    ];

    for msg in handler_timeouts {
        let err = PluginError::PluginExecutionError(msg.to_string());
        let is_dead = PoolManager::is_dead_server_error(&err);
        assert!(!is_dead, "Expected '{msg}' to NOT be dead server error");
    }
}
3082
3083 #[tokio::test]
3084 async fn test_ensure_started_idempotent() {
3085 let manager = PoolManager::with_socket_path("/tmp/idempotent-test-999.sock".to_string());
3086
3087 assert!(!manager.is_initialized().await);
3089
3090 manager.initialized.store(true, Ordering::Release);
3092
3093 let result = manager.ensure_started().await;
3095 assert!(result.is_ok());
3096 assert!(manager.is_initialized().await);
3097 }
3098
/// A `QueuedRequest` carries the HTTP header map it was constructed with.
#[test]
fn test_queued_request_with_headers() {
    let (response_tx, _response_rx) = oneshot::channel();

    let header_map: HashMap<String, Vec<String>> = [
        ("Authorization", "Bearer token"),
        ("Content-Type", "application/json"),
    ]
    .iter()
    .map(|(name, value)| (name.to_string(), vec![value.to_string()]))
    .collect();

    let queued = QueuedRequest {
        plugin_id: "headers-test".to_string(),
        compiled_code: None,
        plugin_path: Some("/path/to/plugin.ts".to_string()),
        params: serde_json::json!({}),
        headers: Some(header_map),
        socket_path: "/tmp/test.sock".to_string(),
        http_request_id: None,
        timeout_secs: None,
        route: None,
        config: None,
        method: None,
        query: None,
        response_tx,
    };

    assert!(queued.headers.is_some());
    let stored = queued.headers.unwrap();
    for name in ["Authorization", "Content-Type"] {
        assert!(stored.contains_key(name));
    }
}
3134
/// Each `PluginError` variant's `Display` output contains the expected fragment.
#[test]
fn test_plugin_error_display_formats() {
    let cases = [
        (
            PluginError::SocketError("test socket error".to_string()),
            "Socket error",
        ),
        (
            PluginError::PluginExecutionError("test execution error".to_string()),
            "test execution error",
        ),
        (PluginError::ScriptTimeout(60), "60"),
        (
            PluginError::PluginError("test plugin error".to_string()),
            "test plugin error",
        ),
    ];

    for (err, fragment) in cases {
        assert!(err.to_string().contains(fragment));
    }
}
3150
/// Converting a `PoolLogEntry` into a `LogEntry` maps a known level name to its variant
/// and falls back to `LogLevel::Log` for an unrecognised name.
#[test]
fn test_pool_log_entry_to_log_entry_conversion() {
    use super::super::protocol::PoolLogEntry;

    let known = PoolLogEntry {
        level: "info".to_string(),
        message: "test message".to_string(),
    };
    let converted: LogEntry = known.into();
    assert_eq!(converted.level, LogLevel::Info);
    assert_eq!(converted.message, "test message");

    let unrecognised = PoolLogEntry {
        level: "unknown_level".to_string(),
        message: "unknown level message".to_string(),
    };
    let converted: LogEntry = unrecognised.into();
    assert_eq!(converted.level, LogLevel::Log);
}
3174
3175 #[tokio::test]
3176 async fn test_circuit_breaker_records_success() {
3177 let manager = PoolManager::new();
3178
3179 manager.circuit_breaker.record_success(100);
3181 manager.circuit_breaker.record_success(150);
3182 manager.circuit_breaker.record_success(200);
3183
3184 let avg = manager.avg_response_time_ms();
3186 assert!(avg > 0);
3187 }
3188
3189 #[tokio::test]
3190 async fn test_circuit_breaker_state_transitions() {
3191 let manager = PoolManager::new();
3192
3193 assert_eq!(manager.circuit_state(), CircuitState::Closed);
3195
3196 for _ in 0..20 {
3198 manager.circuit_breaker.record_failure();
3199 }
3200
3201 let state = manager.circuit_state();
3203 assert!(matches!(
3204 state,
3205 CircuitState::Closed | CircuitState::HalfOpen | CircuitState::Open
3206 ));
3207 }
3208
3209 #[tokio::test]
3210 async fn test_recovery_mode_activation() {
3211 let manager = PoolManager::new();
3212
3213 manager.recovery_allowance.store(10, Ordering::Relaxed);
3215 manager.recovery_mode.store(true, Ordering::Relaxed);
3216
3217 assert!(manager.is_recovering());
3218 assert_eq!(manager.recovery_allowance_percent(), 10);
3219
3220 manager.recovery_allowance.store(50, Ordering::Relaxed);
3222 assert_eq!(manager.recovery_allowance_percent(), 50);
3223
3224 manager.recovery_mode.store(false, Ordering::Relaxed);
3226 assert!(!manager.is_recovering());
3227 }
3228
/// A successful response with an empty log list parses to no logs and the plain return value.
#[test]
fn test_parse_pool_response_with_empty_logs() {
    use super::super::protocol::PoolResponse;

    let quiet = PoolResponse {
        task_id: "empty-logs".to_string(),
        success: true,
        result: Some(serde_json::json!("done")),
        error: None,
        logs: Some(Vec::new()),
    };

    let parsed = PoolManager::parse_pool_response(quiet).unwrap();

    assert!(parsed.logs.is_empty());
    assert_eq!(parsed.return_value, "done");
}
3245
/// A payload whose `details` is a nested JSON document keeps both top-level keys.
#[test]
fn test_handler_payload_with_complex_details() {
    let nested_details = serde_json::json!({
        "errors": [
            {"field": "email", "code": "invalid", "message": "Invalid email format"},
            {"field": "password", "code": "weak", "message": "Password too weak"}
        ],
        "metadata": {
            "requestId": "req-123",
            "timestamp": "2024-01-01T00:00:00Z"
        }
    });

    let payload = PluginHandlerPayload {
        message: "Complex error".to_string(),
        status: 400,
        code: Some("VALIDATION_ERROR".to_string()),
        details: Some(nested_details),
        logs: None,
        traces: None,
    };

    assert_eq!(payload.status, 400);
    let stored = payload.details.unwrap();
    for key in ["errors", "metadata"] {
        assert!(stored.get(key).is_some());
    }
}
3271
/// A fully populated, healthy `HealthStatus` exposes the values it was built with.
#[test]
fn test_health_status_construction_healthy() {
    use super::super::health::HealthStatus;

    let healthy_report = HealthStatus {
        healthy: true,
        status: "ok".to_string(),
        uptime_ms: Some(1000000),
        memory: Some(500000000),
        pool_completed: Some(1000),
        pool_queued: Some(5),
        success_rate: Some(0.99),
        circuit_state: Some("closed".to_string()),
        avg_response_time_ms: Some(50),
        recovering: Some(false),
        recovery_percent: Some(100),
        shared_socket_available_slots: Some(100),
        shared_socket_active_connections: Some(10),
        shared_socket_registered_executions: Some(5),
        connection_pool_available_slots: Some(50),
        connection_pool_active_connections: Some(5),
    };

    assert!(healthy_report.healthy);
    assert_eq!(healthy_report.status, "ok");
    assert_eq!(healthy_report.uptime_ms, Some(1000000));
    assert_eq!(healthy_report.circuit_state, Some("closed".to_string()));
}
3300
/// An unhealthy `HealthStatus` can be built with most metrics absent.
#[test]
fn test_health_status_construction_unhealthy() {
    use super::super::health::HealthStatus;

    let failed_report = HealthStatus {
        healthy: false,
        status: "connection_failed".to_string(),
        uptime_ms: None,
        memory: None,
        pool_completed: None,
        pool_queued: None,
        success_rate: None,
        circuit_state: Some("open".to_string()),
        avg_response_time_ms: Some(0),
        recovering: Some(true),
        recovery_percent: Some(10),
        shared_socket_available_slots: None,
        shared_socket_active_connections: None,
        shared_socket_registered_executions: None,
        connection_pool_available_slots: None,
        connection_pool_active_connections: None,
    };

    assert!(!failed_report.healthy);
    assert_eq!(failed_report.status, "connection_failed");
    assert!(failed_report.uptime_ms.is_none());
}
3328
/// The `Debug` output of `HealthStatus` shows the `healthy` flag and the status text.
#[test]
fn test_health_status_debug_format() {
    use super::super::health::HealthStatus;

    let sparse_report = HealthStatus {
        healthy: true,
        status: "test".to_string(),
        uptime_ms: Some(100),
        memory: None,
        pool_completed: None,
        pool_queued: None,
        success_rate: None,
        circuit_state: None,
        avg_response_time_ms: None,
        recovering: None,
        recovery_percent: None,
        shared_socket_available_slots: None,
        shared_socket_active_connections: None,
        shared_socket_registered_executions: None,
        connection_pool_available_slots: None,
        connection_pool_active_connections: None,
    };

    let rendered = format!("{sparse_report:?}");

    for needle in ["healthy: true", "test"] {
        assert!(rendered.contains(needle));
    }
}
3356
/// Cloning a `HealthStatus` yields a copy that agrees with the original on the checked fields.
#[test]
fn test_health_status_clone() {
    use super::super::health::HealthStatus;

    let original = HealthStatus {
        healthy: true,
        status: "original".to_string(),
        uptime_ms: Some(500),
        memory: Some(100),
        pool_completed: Some(10),
        pool_queued: Some(1),
        success_rate: Some(0.95),
        circuit_state: Some("closed".to_string()),
        avg_response_time_ms: Some(25),
        recovering: Some(false),
        recovery_percent: Some(100),
        shared_socket_available_slots: Some(50),
        shared_socket_active_connections: Some(2),
        shared_socket_registered_executions: Some(1),
        connection_pool_available_slots: Some(25),
        connection_pool_active_connections: Some(1),
    };

    let duplicate = original.clone();

    assert_eq!(duplicate.healthy, original.healthy);
    assert_eq!(duplicate.status, original.status);
    assert_eq!(duplicate.uptime_ms, original.uptime_ms);
}
3385
/// The `Debug` output of `ExecuteRequest` contains the task id and the plugin id.
#[test]
fn test_execute_request_debug() {
    use super::super::protocol::ExecuteRequest;

    let execute = ExecuteRequest {
        task_id: "debug-test".to_string(),
        plugin_id: "test-plugin".to_string(),
        compiled_code: None,
        plugin_path: Some("/path/to/plugin.ts".to_string()),
        params: serde_json::json!({"test": true}),
        headers: None,
        socket_path: "/tmp/test.sock".to_string(),
        http_request_id: None,
        timeout: None,
        route: None,
        config: None,
        method: None,
        query: None,
    };

    let rendered = format!("{execute:?}");

    for needle in ["debug-test", "test-plugin"] {
        assert!(rendered.contains(needle));
    }
}
3410
/// The `Debug` output of `PoolError` contains its message and code.
#[test]
fn test_pool_error_debug() {
    use super::super::protocol::PoolError;

    let pool_error = PoolError {
        message: "Test error".to_string(),
        code: Some("TEST_ERR".to_string()),
        status: Some(400),
        details: Some(serde_json::json!({"info": "test"})),
    };

    let rendered = format!("{pool_error:?}");

    for needle in ["Test error", "TEST_ERR"] {
        assert!(rendered.contains(needle));
    }
}
3426
/// The `Debug` output of `PoolResponse` contains the task id and the success flag.
#[test]
fn test_pool_response_debug() {
    use super::super::protocol::PoolResponse;

    let pool_response = PoolResponse {
        task_id: "resp-123".to_string(),
        success: true,
        result: Some(serde_json::json!("result")),
        error: None,
        logs: None,
    };

    let rendered = format!("{pool_response:?}");

    for needle in ["resp-123", "true"] {
        assert!(rendered.contains(needle));
    }
}
3443
/// The `Debug` output of `PoolLogEntry` contains its level and message.
#[test]
fn test_pool_log_entry_debug() {
    use super::super::protocol::PoolLogEntry;

    let log_line = PoolLogEntry {
        level: "info".to_string(),
        message: "Test message".to_string(),
    };

    let rendered = format!("{log_line:?}");

    for needle in ["info", "Test message"] {
        assert!(rendered.contains(needle));
    }
}
3457
/// A circuit breaker obtained from `default()` starts in the closed state.
#[test]
fn test_circuit_breaker_default_trait() {
    use super::super::health::CircuitBreaker;

    let breaker = CircuitBreaker::default();
    let initial = breaker.state();

    assert_eq!(initial, CircuitState::Closed);
}
3465
/// `set_state` followed by `state` round-trips through half-open, open, and closed.
#[test]
fn test_circuit_breaker_set_state_all_variants() {
    use super::super::health::CircuitBreaker;

    let breaker = CircuitBreaker::new();

    breaker.set_state(CircuitState::HalfOpen);
    let observed = breaker.state();
    assert_eq!(observed, CircuitState::HalfOpen);

    breaker.set_state(CircuitState::Open);
    let observed = breaker.state();
    assert_eq!(observed, CircuitState::Open);

    breaker.set_state(CircuitState::Closed);
    let observed = breaker.state();
    assert_eq!(observed, CircuitState::Closed);
}
3482
/// One hundred recorded failures with no successes leave the breaker open.
#[test]
fn test_circuit_breaker_failure_rate_triggers_open() {
    use super::super::health::CircuitBreaker;

    let breaker = CircuitBreaker::new();

    let mut recorded = 0;
    while recorded < 100 {
        breaker.record_failure();
        recorded += 1;
    }

    assert_eq!(breaker.state(), CircuitState::Open);
}
3496
/// Ninety successes followed by ten failures leave the breaker closed.
#[test]
fn test_circuit_breaker_low_failure_rate_stays_closed() {
    use super::super::health::CircuitBreaker;

    let breaker = CircuitBreaker::new();

    (0..90).for_each(|_| {
        breaker.record_success(50);
    });
    (0..10).for_each(|_| {
        breaker.record_failure();
    });

    assert_eq!(breaker.state(), CircuitState::Closed);
}
3514
/// With every sample at 100, the average is positive after the first sample and stays
/// within `(0, 100]` after three more.
#[test]
fn test_circuit_breaker_ema_response_time() {
    use super::super::health::CircuitBreaker;

    let breaker = CircuitBreaker::new();

    breaker.record_success(100);
    let after_first = breaker.avg_response_time();

    for _ in 0..3 {
        breaker.record_success(100);
    }
    let after_fourth = breaker.avg_response_time();

    assert!(after_first > 0);
    assert!(after_fourth > 0);
    assert!(after_fourth <= 100);
}
3535
/// Forces an `Open` breaker closed and checks the resulting state.
///
/// NOTE(review): despite the name, only the state is asserted here; no counter
/// is inspected — confirm whether counter checks belong in this test.
#[test]
fn test_circuit_breaker_force_close_resets_counters() {
    use super::super::health::CircuitBreaker;

    let breaker = CircuitBreaker::new();

    // Start from the open state so that `force_close` has something to undo.
    breaker.set_state(CircuitState::Open);
    breaker.force_close();

    assert_eq!(breaker.state(), CircuitState::Closed);
}
3547
/// Each `ProcessStatus` variant must `Debug`-format to exactly its own name.
#[test]
fn test_process_status_debug() {
    use super::super::health::ProcessStatus;

    let expectations = [
        (ProcessStatus::Running, "Running"),
        (ProcessStatus::Exited, "Exited"),
        (ProcessStatus::Unknown, "Unknown"),
        (ProcessStatus::NoProcess, "NoProcess"),
    ];

    for (status, label) in expectations {
        assert_eq!(format!("{status:?}"), label);
    }
}
3557
/// Duplicates a `ProcessStatus` by plain assignment and compares the two
/// values; the original must stay usable afterwards.
#[test]
fn test_process_status_clone() {
    use super::super::health::ProcessStatus;

    let original = ProcessStatus::Running;
    let duplicate = original;

    assert_eq!(original, duplicate);
}
3566
/// Feeds one representative error string per pattern into
/// `DeadServerIndicator::from_error_str` and checks the variant it yields.
#[test]
fn test_dead_server_indicator_all_variants() {
    use super::super::health::DeadServerIndicator;

    let expect_match = |pattern: &str, expected: DeadServerIndicator| {
        let result = DeadServerIndicator::from_error_str(pattern);
        assert_eq!(result, Some(expected), "Pattern '{pattern}' should match");
    };

    expect_match("eof while parsing", DeadServerIndicator::EofWhileParsing);
    expect_match("broken pipe", DeadServerIndicator::BrokenPipe);
    expect_match("connection refused", DeadServerIndicator::ConnectionRefused);
    expect_match("connection reset", DeadServerIndicator::ConnectionReset);
    expect_match("not connected", DeadServerIndicator::NotConnected);
    expect_match("failed to connect", DeadServerIndicator::FailedToConnect);
    expect_match("socket file missing", DeadServerIndicator::SocketFileMissing);
    expect_match("no such file", DeadServerIndicator::NoSuchFile);
    // Two different spellings are expected to yield the same timeout variant.
    expect_match("connection timed out", DeadServerIndicator::ConnectionTimedOut);
    expect_match("connect timed out", DeadServerIndicator::ConnectionTimedOut);
}
3600
/// `DeadServerIndicator::BrokenPipe` must `Debug`-format as `"BrokenPipe"`.
#[test]
fn test_dead_server_indicator_debug_format() {
    use super::super::health::DeadServerIndicator;

    let rendered = format!("{:?}", DeadServerIndicator::BrokenPipe);

    assert_eq!(rendered, "BrokenPipe");
}
3609
/// Duplicates a `DeadServerIndicator` by plain assignment and compares the two
/// values; the original must stay usable afterwards.
#[test]
fn test_dead_server_indicator_clone_copy() {
    use super::super::health::DeadServerIndicator;

    let original = DeadServerIndicator::ConnectionRefused;
    let duplicate = original;

    assert_eq!(original, duplicate);
}
3618
/// Nine recorded failures in a buffer built with `new(100)` are expected to
/// still report a failure rate of 0.0.
///
/// NOTE(review): presumably a minimum-sample threshold in `ResultRingBuffer`
/// — confirm against its implementation.
#[test]
fn test_result_ring_buffer_not_enough_data() {
    use super::super::health::ResultRingBuffer;

    let ring = ResultRingBuffer::new(100);

    (0..9).for_each(|_| {
        ring.record(false);
    });

    assert_eq!(ring.failure_rate(), 0.0);
}
3633
/// Ten recorded failures in a buffer built with `new(100)` are expected to
/// report a failure rate of 1.0.
#[test]
fn test_result_ring_buffer_exactly_10_samples() {
    use super::super::health::ResultRingBuffer;

    let ring = ResultRingBuffer::new(100);

    (0..10).for_each(|_| {
        ring.record(false);
    });

    assert_eq!(ring.failure_rate(), 1.0);
}
3648
/// Fills a buffer built with `new(10)` with ten `true` records, then ten
/// `false` records, expecting the failure rate to go from 0.0 to 1.0.
#[test]
fn test_result_ring_buffer_wraps_correctly() {
    use super::super::health::ResultRingBuffer;

    let ring = ResultRingBuffer::new(10);

    // (value recorded ten times, failure rate expected afterwards)
    let phases = [(true, 0.0), (false, 1.0)];
    for (outcome, expected_rate) in phases {
        for _ in 0..10 {
            ring.record(outcome);
        }
        assert_eq!(ring.failure_rate(), expected_rate);
    }
}
3667
/// Every `CircuitState` variant equals itself and differs from each of the
/// other two variants.
#[test]
fn test_circuit_state_equality_all_pairs() {
    let every_state = [
        CircuitState::Closed,
        CircuitState::HalfOpen,
        CircuitState::Open,
    ];
    for state in every_state {
        let same = state;
        assert_eq!(state, same);
    }

    let distinct_pairs = [
        (CircuitState::Closed, CircuitState::HalfOpen),
        (CircuitState::Closed, CircuitState::Open),
        (CircuitState::HalfOpen, CircuitState::Open),
    ];
    for (lhs, rhs) in distinct_pairs {
        assert_ne!(lhs, rhs);
    }
}
3678
/// Duplicates a `CircuitState` by plain assignment and compares the two
/// values; the original must stay usable afterwards.
#[test]
fn test_circuit_state_clone_copy() {
    let original = CircuitState::HalfOpen;
    let duplicate = original;

    assert_eq!(original, duplicate);
}
3685
/// A successful response whose result is JSON `null` is expected to produce
/// the return value `"null"`.
#[test]
fn test_parse_pool_response_with_null_values() {
    use super::super::protocol::PoolResponse;

    let parsed = PoolManager::parse_pool_response(PoolResponse {
        task_id: "null-test".to_string(),
        success: true,
        result: Some(serde_json::Value::Null),
        error: None,
        logs: None,
    })
    .unwrap();

    assert_eq!(parsed.return_value, "null");
}
3701
/// A successful response carrying a three-level nested object must yield a
/// return value that mentions every key and the leaf value.
#[test]
fn test_parse_pool_response_with_nested_result() {
    use super::super::protocol::PoolResponse;

    let nested = serde_json::json!({
        "level1": {
            "level2": {
                "level3": "deep value"
            }
        }
    });
    let response = PoolResponse {
        task_id: "nested-test".to_string(),
        success: true,
        result: Some(nested),
        error: None,
        logs: None,
    };

    let parsed = PoolManager::parse_pool_response(response).unwrap();

    for fragment in ["level1", "level2", "level3", "deep value"] {
        assert!(parsed.return_value.contains(fragment));
    }
}
3726
/// A failed response carrying a `PoolError` with code, status and details must
/// parse into `PluginError::HandlerError` whose payload keeps the message, the
/// code and the details object.
///
/// On an unexpected variant the panic message includes the value actually
/// received, so a failing run is diagnosable from the test output alone.
#[test]
fn test_parse_pool_response_error_with_details() {
    use super::super::protocol::{PoolError, PoolResponse};

    let response = PoolResponse {
        task_id: "error-details".to_string(),
        success: false,
        result: None,
        error: Some(PoolError {
            message: "Error with details".to_string(),
            code: Some("DETAILED_ERROR".to_string()),
            status: Some(422),
            details: Some(serde_json::json!({
                "field": "email",
                "expected": "string",
                "received": "number"
            })),
        }),
        logs: None,
    };

    match PoolManager::parse_pool_response(response).unwrap_err() {
        PluginError::HandlerError(payload) => {
            assert_eq!(payload.message, "Error with details");
            assert_eq!(payload.code.as_deref(), Some("DETAILED_ERROR"));
            // `expect` replaces the former `is_some()` + `unwrap()` pair and
            // names the violated expectation if the details are dropped.
            let details = payload
                .details
                .expect("handler payload should carry the error details");
            assert_eq!(details.get("field").unwrap(), "email");
        }
        other => panic!("Expected HandlerError, got {other:?}"),
    }
}
3760
/// Parses a health document containing status, uptime, memory, pool and
/// execution sections (including keys the assertions below do not look at)
/// and checks the six fields exposed by the parsed result.
#[test]
fn test_parse_health_result_with_all_optional_fields() {
    let health_doc = serde_json::json!({
        "status": "healthy",
        "uptime": 999999,
        "memory": {
            "heapUsed": 123456789,
            "heapTotal": 987654321,
            "external": 111111,
            "arrayBuffers": 222222
        },
        "pool": {
            "completed": 50000,
            "queued": 100,
            "active": 50,
            "waiting": 25
        },
        "execution": {
            "successRate": 0.9999,
            "avgDuration": 45.5,
            "totalExecutions": 100000
        }
    });

    let parsed = PoolManager::parse_health_result(&health_doc);

    assert_eq!(parsed.status, "healthy");
    assert_eq!(parsed.uptime_ms, Some(999999));
    assert_eq!(parsed.memory, Some(123456789));
    assert_eq!(parsed.pool_completed, Some(50000));
    assert_eq!(parsed.pool_queued, Some(100));

    // Floating-point field: compare within a small tolerance.
    let rate = parsed.success_rate.unwrap();
    assert!((rate - 0.9999).abs() < 0.0001);
}
3793
3794 #[tokio::test]
3795 async fn test_pool_manager_max_queue_size() {
3796 let manager = PoolManager::new();
3797 assert!(manager.max_queue_size > 0);
3799 }
3800
3801 #[tokio::test]
3802 async fn test_pool_manager_last_restart_time_initial() {
3803 let manager = PoolManager::new();
3804 assert_eq!(manager.last_restart_time_ms.load(Ordering::Relaxed), 0);
3805 }
3806
3807 #[tokio::test]
3808 async fn test_pool_manager_connection_pool_exists() {
3809 let manager = PoolManager::new();
3810 let available = manager.connection_pool.semaphore.available_permits();
3812 assert!(available > 0);
3813 }
3814
/// Socket errors must be classified as dead-server errors even when the
/// telltale phrase is padded with spaces or embedded in a longer message.
#[test]
fn test_is_dead_server_error_with_whitespace() {
    let messages = [" connection refused ", "error: broken pipe occurred"];

    for message in messages {
        let err = PluginError::SocketError(message.to_string());
        assert!(PoolManager::is_dead_server_error(&err));
    }
}
3824
/// A socket error whose message has "Connection refused" (capitalised) on its
/// middle line must still be classified as a dead-server error.
#[test]
fn test_is_dead_server_error_multiline() {
    let message = "Error occurred\nConnection refused\nPlease retry".to_string();
    let err = PluginError::SocketError(message);

    assert!(PoolManager::is_dead_server_error(&err));
}
3833
/// A plugin execution error whose message is a JSON document containing
/// "connection refused" must be classified as a dead-server error.
#[test]
fn test_is_dead_server_error_json_in_message() {
    let message = r#"{"error": "connection refused", "code": 61}"#.to_string();
    let err = PluginError::PluginExecutionError(message);

    assert!(PoolManager::is_dead_server_error(&err));
}
3842
/// Extreme numeric values: `f64::MAX` must format to a non-empty string and
/// `i64::MIN` must format to something containing a minus sign.
#[test]
fn test_format_return_value_special_json() {
    let largest_float = PoolManager::format_return_value(Some(serde_json::json!(f64::MAX)));
    assert!(!largest_float.is_empty());

    let smallest_int = PoolManager::format_return_value(Some(serde_json::json!(i64::MIN)));
    assert!(smallest_int.contains("-"));
}
3854
/// A string containing a newline, a tab and a double quote must still format
/// to text in which both `line1` and `line2` are present.
#[test]
fn test_format_return_value_with_escaped_chars() {
    let tricky = serde_json::json!("line1\nline2\ttab\"quote");

    let formatted = PoolManager::format_return_value(Some(tricky));

    for fragment in ["line1", "line2"] {
        assert!(formatted.contains(fragment));
    }
}
3862
/// An array of two objects must format to text that mentions the `name` of
/// each element.
#[test]
fn test_format_return_value_array_of_objects() {
    let items = serde_json::json!([
        {"id": 1, "name": "first"},
        {"id": 2, "name": "second"}
    ]);

    let formatted = PoolManager::format_return_value(Some(items));

    for name in ["first", "second"] {
        assert!(formatted.contains(name));
    }
}
3873
/// Converts a `PoolLogEntry` into a `LogEntry` for each known level string and
/// for three inputs (unrecognised, upper-case, empty) that are expected to
/// fall back to `LogLevel::Log`.
#[test]
fn test_all_log_levels_conversion() {
    use super::super::protocol::PoolLogEntry;

    let cases = [
        // Recognised level names.
        ("log", LogLevel::Log),
        ("debug", LogLevel::Debug),
        ("info", LogLevel::Info),
        ("warn", LogLevel::Warn),
        ("error", LogLevel::Error),
        ("result", LogLevel::Result),
        // Inputs expected to fall back to `Log`.
        ("unknown_level", LogLevel::Log),
        ("LOG", LogLevel::Log),
        ("", LogLevel::Log),
    ];

    cases.into_iter().for_each(|(input, expected)| {
        let log_entry: LogEntry = PoolLogEntry {
            level: input.to_string(),
            message: "test".to_string(),
        }
        .into();

        assert_eq!(
            log_entry.level, expected,
            "Level '{input}' should convert to {expected:?}"
        );
    });
}
3902
3903 #[tokio::test]
3904 async fn test_pool_manager_health_check_flag_manipulation() {
3905 let manager = PoolManager::new();
3906
3907 manager.health_check_needed.store(true, Ordering::Relaxed);
3908 assert!(manager.health_check_needed.load(Ordering::Relaxed));
3909
3910 manager.health_check_needed.store(false, Ordering::Relaxed);
3911 assert!(!manager.health_check_needed.load(Ordering::Relaxed));
3912 }
3913
3914 #[tokio::test]
3915 async fn test_pool_manager_consecutive_failures_manipulation() {
3916 let manager = PoolManager::new();
3917
3918 manager.consecutive_failures.fetch_add(1, Ordering::Relaxed);
3919 assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 1);
3920
3921 manager.consecutive_failures.fetch_add(5, Ordering::Relaxed);
3922 assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 6);
3923
3924 manager.consecutive_failures.store(0, Ordering::Relaxed);
3925 assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 0);
3926 }
3927
/// Builds a `ParsedHealthResult` with only `status` populated and verifies
/// that every optional field is `None`, and that the value equals one built
/// from `Default` with the same status.
#[test]
fn test_parsed_health_result_with_all_none() {
    let result = ParsedHealthResult {
        status: "minimal".to_string(),
        uptime_ms: None,
        memory: None,
        pool_completed: None,
        pool_queued: None,
        success_rate: None,
    };

    assert_eq!(result.status, "minimal");
    assert!(result.uptime_ms.is_none());
    assert!(result.memory.is_none());
    assert!(result.pool_completed.is_none());
    assert!(result.pool_queued.is_none());
    assert!(result.success_rate.is_none());

    // Whole-struct comparison: fails if a field is added without updating
    // either this test or the `Default` derive.
    assert_eq!(
        result,
        ParsedHealthResult {
            status: "minimal".to_string(),
            ..Default::default()
        }
    );
}
3943
/// Builds a `ParsedHealthResult` with every field populated (the integer
/// fields at `u64::MAX`) and verifies that each field reads back unchanged.
#[test]
fn test_parsed_health_result_with_all_some() {
    let result = ParsedHealthResult {
        status: "complete".to_string(),
        uptime_ms: Some(u64::MAX),
        memory: Some(u64::MAX),
        pool_completed: Some(u64::MAX),
        pool_queued: Some(u64::MAX),
        success_rate: Some(1.0),
    };

    assert_eq!(result.status, "complete");
    assert_eq!(result.uptime_ms, Some(u64::MAX));
    assert_eq!(result.memory, Some(u64::MAX));
    assert_eq!(result.pool_completed, Some(u64::MAX));
    assert_eq!(result.pool_queued, Some(u64::MAX));
    assert_eq!(result.success_rate, Some(1.0));
}
3959
/// Table-driven check of `PoolManager::calculate_heap_size` across a wide
/// range of concurrency values.
///
/// NOTE(review): the table is consistent with `512 + 32 * (concurrency / 10)`
/// capped at 8192 — confirm against the implementation.
#[test]
fn test_calculate_heap_size_extensive_values() {
    // (concurrency, expected heap size)
    let expectations = [
        (0, 512),
        (1, 512),
        (5, 512),
        (9, 512),
        (10, 544),
        (11, 544),
        (19, 544),
        (20, 576),
        (50, 672),
        (100, 832),
        (150, 992),
        (200, 1152),
        (250, 1312),
        (300, 1472),
        (400, 1792),
        (500, 2112),
        (1000, 3712),
        (2000, 6912),
        // From here on every entry expects the same value, 8192.
        (2400, 8192),
        (3000, 8192),
        (5000, 8192),
        (10000, 8192),
    ];

    expectations
        .into_iter()
        .for_each(|(concurrency, expected_heap)| {
            let heap = PoolManager::calculate_heap_size(concurrency);
            assert_eq!(
                heap, expected_heap,
                "Concurrency {concurrency} should give heap {expected_heap}"
            );
        });
}
3996
3997 #[tokio::test]
3998 async fn test_pool_manager_drop_cleans_socket() {
3999 let socket_path = format!("/tmp/test-drop-{}.sock", uuid::Uuid::new_v4());
4000
4001 std::fs::write(&socket_path, "test").unwrap();
4003 assert!(std::path::Path::new(&socket_path).exists());
4004
4005 {
4007 let _manager = PoolManager::with_socket_path(socket_path.clone());
4008 }
4010 assert!(!std::path::Path::new(&socket_path).exists());
4014 }
4015
/// A `ScriptResult` built with two trace entries keeps both, and the first
/// entry exposes its `action` key.
#[test]
fn test_script_result_with_traces() {
    let trace = vec![
        serde_json::json!({"action": "GET", "url": "/api/test"}),
        serde_json::json!({"action": "POST", "url": "/api/submit"}),
    ];

    let result = ScriptResult {
        logs: Vec::new(),
        error: String::new(),
        return_value: "with traces".to_string(),
        trace,
    };

    assert_eq!(result.trace.len(), 2);
    assert!(result.trace[0].get("action").is_some());
}
4031
/// A `ScriptResult` built with an error string and one error-level log entry
/// exposes both as given.
#[test]
fn test_script_result_with_error() {
    let error_log = LogEntry {
        level: LogLevel::Error,
        message: "Something went wrong".to_string(),
    };

    let result = ScriptResult {
        logs: vec![error_log],
        error: "RuntimeError: undefined is not a function".to_string(),
        return_value: String::new(),
        trace: Vec::new(),
    };

    assert!(!result.error.is_empty());
    assert!(result.error.contains("RuntimeError"));
    assert_eq!(result.logs.len(), 1);
}
4048
/// A `PluginHandlerPayload` built with two traces reports them as present and
/// keeps both entries.
#[test]
fn test_plugin_handler_payload_with_traces() {
    let recorded = vec![
        serde_json::json!({"method": "GET", "path": "/health"}),
        serde_json::json!({"method": "POST", "path": "/execute"}),
    ];

    let payload = PluginHandlerPayload {
        message: "Error with traces".to_string(),
        status: 500,
        code: None,
        details: None,
        logs: None,
        traces: Some(recorded),
    };

    let traces = payload.traces.as_ref();
    assert!(traces.is_some());
    assert_eq!(traces.unwrap().len(), 2);
}
4066
/// Builds a `QueuedRequest` with every optional field populated and spot-checks
/// that the values are readable afterwards.
#[test]
fn test_queued_request_all_optional_fields() {
    // The receiving half is kept alive but never read.
    let (tx, _rx) = oneshot::channel();

    let headers = HashMap::from([(
        "X-Custom".to_string(),
        vec!["value1".to_string(), "value2".to_string()],
    )]);

    let request = QueuedRequest {
        plugin_id: "full-request".to_string(),
        compiled_code: Some("compiled code here".to_string()),
        plugin_path: Some("/path/to/plugin.ts".to_string()),
        params: serde_json::json!({"key": "value", "number": 42}),
        headers: Some(headers),
        socket_path: "/tmp/full.sock".to_string(),
        http_request_id: Some("http-123".to_string()),
        timeout_secs: Some(60),
        route: Some("/api/v1/execute".to_string()),
        config: Some(serde_json::json!({"setting": true})),
        method: Some("PUT".to_string()),
        query: Some(serde_json::json!({"page": 1, "limit": 10})),
        response_tx: tx,
    };

    assert_eq!(request.plugin_id, "full-request");

    assert!(request.compiled_code.is_some());
    assert!(request.plugin_path.is_some());
    assert!(request.headers.is_some());

    assert_eq!(request.timeout_secs, Some(60));
    assert_eq!(request.method.as_deref(), Some("PUT"));
}
4100}