//! Plugin runtime configuration (openzeppelin_relayer/services/plugins/config.rs).
use crate::constants::{
23 CONCURRENT_TASKS_HEADROOM_MULTIPLIER, DEFAULT_POOL_CONCURRENT_TASKS_PER_WORKER,
24 DEFAULT_POOL_CONNECT_RETRIES, DEFAULT_POOL_HEALTH_CHECK_INTERVAL_SECS,
25 DEFAULT_POOL_IDLE_TIMEOUT_MS, DEFAULT_POOL_MAX_CONNECTIONS, DEFAULT_POOL_MAX_THREADS_FLOOR,
26 DEFAULT_POOL_MIN_THREADS, DEFAULT_POOL_QUEUE_SEND_TIMEOUT_MS, DEFAULT_POOL_SOCKET_BACKLOG,
27 DEFAULT_TRACE_TIMEOUT_MS, MAX_CONCURRENT_TASKS_PER_WORKER,
28};
29use std::sync::OnceLock;
30
31static CONFIG: OnceLock<PluginConfig> = OnceLock::new();
33
#[derive(Debug, Clone)]
/// Runtime configuration for the plugin subsystem (Rust-side pool plus the
/// Node.js worker pool). Built from `PLUGIN_*` environment variables via
/// [`PluginConfig::from_env`], with derived defaults; see [`Default`] for the
/// env-free variant.
pub struct PluginConfig {
    /// Target number of plugin invocations that may run concurrently.
    /// Seeds the derived defaults for most other fields.
    pub max_concurrency: usize,

    /// Maximum number of connections in the plugin connection pool.
    pub pool_max_connections: usize,
    /// Retry attempts when establishing a pool connection.
    pub pool_connect_retries: usize,

    /// Maximum number of requests queued while waiting for a pool slot.
    /// Derived default is `2 * max_concurrency`.
    pub pool_max_queue_size: usize,
    /// Timeout (ms) when sending a request into the queue; the derived
    /// default grows with per-thread workload.
    pub pool_queue_send_timeout_ms: u64,
    /// Number of pool workers. Defaults to 0 — presumably "auto-size
    /// downstream"; TODO(review): confirm how 0 is interpreted by consumers.
    pub pool_workers: usize,

    /// Maximum concurrent connections accepted at the socket layer.
    /// Derived default is `1.5 * max_concurrency` (headroom over the pool).
    pub socket_max_connections: usize,

    /// Minimum Node.js worker threads kept alive.
    pub nodejs_pool_min_threads: usize,
    /// Maximum Node.js worker threads (memory- and CPU-derived, capped at 32).
    pub nodejs_pool_max_threads: usize,
    /// Concurrent tasks each Node.js worker may run.
    pub nodejs_pool_concurrent_tasks: usize,
    /// Idle time (ms) before a Node.js worker may be reclaimed.
    pub nodejs_pool_idle_timeout_ms: u64,
    /// Per-worker Node.js heap size in MiB (derived range 1024-2048).
    pub nodejs_worker_heap_mb: usize,

    /// Listen backlog for the plugin socket; at least `max_concurrency`.
    pub pool_socket_backlog: usize,

    /// Interval (seconds) between pool health checks.
    pub health_check_interval_secs: u64,
    /// Timeout (ms) for plugin trace operations.
    pub trace_timeout_ms: u64,
}
81
82impl PluginConfig {
83 pub fn from_env() -> Self {
85 let max_concurrency = env_parse("PLUGIN_MAX_CONCURRENCY", DEFAULT_POOL_MAX_CONNECTIONS);
88
89 let pool_max_connections = env_parse("PLUGIN_POOL_MAX_CONNECTIONS", max_concurrency);
93
94 let socket_max_connections = env_parse(
96 "PLUGIN_SOCKET_MAX_CONCURRENT_CONNECTIONS",
97 (max_concurrency as f64 * 1.5) as usize,
98 );
99
100 let pool_max_queue_size = env_parse("PLUGIN_POOL_MAX_QUEUE_SIZE", max_concurrency * 2);
102
103 let cpu_count = std::thread::available_parallelism()
106 .map(|n| n.get())
107 .unwrap_or(4);
108
109 let estimated_memory_budget = 16384_u64 / 2; let estimated_memory_threads = (estimated_memory_budget / 1024).max(4) as usize;
113 let estimated_concurrency_threads = (max_concurrency / 200).max(cpu_count);
114 let estimated_max_threads = estimated_memory_threads
115 .min(estimated_concurrency_threads)
116 .clamp(DEFAULT_POOL_MAX_THREADS_FLOOR, 32); let base_queue_timeout = DEFAULT_POOL_QUEUE_SEND_TIMEOUT_MS;
122 let workload_per_thread = max_concurrency / estimated_max_threads.max(1);
123 let derived_queue_timeout = if workload_per_thread > 100 {
124 base_queue_timeout * 2 } else if workload_per_thread > 50 {
127 base_queue_timeout + 250 } else {
130 base_queue_timeout };
133 let pool_queue_send_timeout_ms =
134 env_parse("PLUGIN_POOL_QUEUE_SEND_TIMEOUT_MS", derived_queue_timeout);
135
136 let pool_connect_retries =
138 env_parse("PLUGIN_POOL_CONNECT_RETRIES", DEFAULT_POOL_CONNECT_RETRIES);
139 let pool_workers = env_parse("PLUGIN_POOL_WORKERS", 0); let health_check_interval_secs = env_parse(
142 "PLUGIN_POOL_HEALTH_CHECK_INTERVAL_SECS",
143 DEFAULT_POOL_HEALTH_CHECK_INTERVAL_SECS,
144 );
145 let trace_timeout_ms = env_parse("PLUGIN_TRACE_TIMEOUT_MS", DEFAULT_TRACE_TIMEOUT_MS);
146
147 let derived_min_threads = DEFAULT_POOL_MIN_THREADS.max(cpu_count / 2);
153 let nodejs_pool_min_threads = env_parse("PLUGIN_POOL_MIN_THREADS", derived_min_threads);
154
155 let total_memory_mb = {
175 #[cfg(target_os = "macos")]
176 {
177 use std::process::Command;
179 Command::new("sysctl")
180 .args(["-n", "hw.memsize"])
181 .output()
182 .ok()
183 .and_then(|o| String::from_utf8(o.stdout).ok())
184 .and_then(|s| s.trim().parse::<u64>().ok())
185 .map(|bytes| bytes / 1024 / 1024)
186 .unwrap_or(16384) }
188 #[cfg(target_os = "linux")]
189 {
190 std::fs::read_to_string("/proc/meminfo")
192 .ok()
193 .and_then(|contents| {
194 contents
195 .lines()
196 .find(|l| l.starts_with("MemTotal:"))
197 .and_then(|l| {
198 l.split_whitespace()
199 .nth(1)
200 .and_then(|s| s.parse::<u64>().ok())
201 })
202 })
203 .map(|kb| kb / 1024)
204 .unwrap_or(16384) }
206 #[cfg(not(any(target_os = "macos", target_os = "linux")))]
207 {
208 16384_u64 }
210 };
211
212 let memory_budget_mb = total_memory_mb / 2;
216 let heap_per_worker_mb = 1024_u64; let memory_based_max_threads = (memory_budget_mb / heap_per_worker_mb).max(4) as usize;
218
219 let concurrency_based_threads = (max_concurrency / 200).max(cpu_count);
223
224 let derived_max_threads = memory_based_max_threads
227 .min(concurrency_based_threads)
228 .clamp(DEFAULT_POOL_MAX_THREADS_FLOOR, 32); tracing::debug!(
231 total_memory_mb = total_memory_mb,
232 memory_based_max = memory_based_max_threads,
233 concurrency_based = concurrency_based_threads,
234 derived_max_threads = derived_max_threads,
235 "Thread scaling calculation"
236 );
237
238 let nodejs_pool_max_threads = env_parse("PLUGIN_POOL_MAX_THREADS", derived_max_threads);
239
240 let base_tasks = max_concurrency / nodejs_pool_max_threads.max(1);
250 let derived_concurrent_tasks =
251 ((base_tasks as f64 * CONCURRENT_TASKS_HEADROOM_MULTIPLIER) as usize).clamp(
252 DEFAULT_POOL_CONCURRENT_TASKS_PER_WORKER,
253 MAX_CONCURRENT_TASKS_PER_WORKER,
254 );
255 let nodejs_pool_concurrent_tasks =
256 env_parse("PLUGIN_POOL_CONCURRENT_TASKS", derived_concurrent_tasks);
257
258 let nodejs_pool_idle_timeout_ms =
259 env_parse("PLUGIN_POOL_IDLE_TIMEOUT", DEFAULT_POOL_IDLE_TIMEOUT_MS);
260
261 let base_worker_heap = 512_usize;
270 let heap_per_task = 5_usize;
271 let derived_worker_heap_mb =
272 (base_worker_heap + (nodejs_pool_concurrent_tasks * heap_per_task)).clamp(1024, 2048); let nodejs_worker_heap_mb = env_parse("PLUGIN_WORKER_HEAP_MB", derived_worker_heap_mb);
274
275 let default_backlog = DEFAULT_POOL_SOCKET_BACKLOG as usize;
283 let pool_socket_backlog = env_parse(
284 "PLUGIN_POOL_SOCKET_BACKLOG",
285 max_concurrency.max(default_backlog),
286 );
287
288 let config = Self {
289 max_concurrency,
290 pool_max_connections,
291 pool_connect_retries,
292 pool_max_queue_size,
293 pool_queue_send_timeout_ms,
294 pool_workers,
295 socket_max_connections,
296 nodejs_pool_min_threads,
297 nodejs_pool_max_threads,
298 nodejs_pool_concurrent_tasks,
299 nodejs_pool_idle_timeout_ms,
300 nodejs_worker_heap_mb,
301 pool_socket_backlog,
302 health_check_interval_secs,
303 trace_timeout_ms,
304 };
305
306 config.validate();
308
309 config
310 }
311
312 fn validate(&self) {
314 assert!(
316 self.pool_max_connections <= self.socket_max_connections,
317 "pool_max_connections ({}) must be <= socket_max_connections ({})",
318 self.pool_max_connections,
319 self.socket_max_connections
320 );
321 assert!(
322 self.nodejs_pool_min_threads <= self.nodejs_pool_max_threads,
323 "nodejs_pool_min_threads ({}) must be <= nodejs_pool_max_threads ({})",
324 self.nodejs_pool_min_threads,
325 self.nodejs_pool_max_threads
326 );
327 assert!(
328 self.max_concurrency > 0,
329 "max_concurrency must be > 0, got {}",
330 self.max_concurrency
331 );
332 assert!(
333 self.nodejs_pool_max_threads > 0,
334 "nodejs_pool_max_threads must be > 0, got {}",
335 self.nodejs_pool_max_threads
336 );
337
338 if self.pool_max_queue_size < self.max_concurrency {
340 tracing::warn!(
341 "pool_max_queue_size ({}) is less than max_concurrency ({}). \
342 This may cause request rejections under load.",
343 self.pool_max_queue_size,
344 self.max_concurrency
345 );
346 }
347 if self.nodejs_pool_concurrent_tasks > 500 {
348 tracing::warn!(
349 "nodejs_pool_concurrent_tasks ({}) is very high. \
350 This may cause excessive memory usage per worker.",
351 self.nodejs_pool_concurrent_tasks
352 );
353 }
354 }
355
356 pub fn log_config(&self) {
358 let tasks_per_thread = self.max_concurrency / self.nodejs_pool_max_threads.max(1);
359 let socket_ratio = self.socket_max_connections as f64 / self.max_concurrency as f64;
360 let queue_ratio = self.pool_max_queue_size as f64 / self.max_concurrency as f64;
361 let total_worker_heap_mb = self.nodejs_pool_max_threads * self.nodejs_worker_heap_mb;
362
363 tracing::info!(
364 max_concurrency = self.max_concurrency,
365 pool_max_connections = self.pool_max_connections,
366 pool_max_queue_size = self.pool_max_queue_size,
367 queue_timeout_ms = self.pool_queue_send_timeout_ms,
368 socket_max_connections = self.socket_max_connections,
369 socket_backlog = self.pool_socket_backlog,
370 nodejs_min_threads = self.nodejs_pool_min_threads,
371 nodejs_max_threads = self.nodejs_pool_max_threads,
372 nodejs_concurrent_tasks = self.nodejs_pool_concurrent_tasks,
373 nodejs_worker_heap_mb = self.nodejs_worker_heap_mb,
374 total_worker_heap_mb = total_worker_heap_mb,
375 tasks_per_thread = tasks_per_thread,
376 socket_multiplier = %format!("{:.2}x", socket_ratio),
377 queue_multiplier = %format!("{:.2}x", queue_ratio),
378 "Plugin configuration loaded (Rust + Node.js)"
379 );
380 }
381}
382
383impl Default for PluginConfig {
384 fn default() -> Self {
388 let max_concurrency = DEFAULT_POOL_MAX_CONNECTIONS;
391 let cpu_count = std::thread::available_parallelism()
392 .map(|n| n.get())
393 .unwrap_or(4);
394
395 let pool_max_connections = max_concurrency;
397 let socket_max_connections = (max_concurrency as f64 * 1.5) as usize;
398 let pool_max_queue_size = max_concurrency * 2;
399
400 let assumed_memory_mb = 16384_u64;
403 let memory_budget_mb = assumed_memory_mb / 2;
404 let heap_per_worker_mb = 1024_u64; let memory_based_max_threads = (memory_budget_mb / heap_per_worker_mb).max(4) as usize;
406 let concurrency_based_threads = (max_concurrency / 200).max(cpu_count);
407
408 let nodejs_pool_max_threads = memory_based_max_threads
409 .min(concurrency_based_threads)
410 .clamp(DEFAULT_POOL_MAX_THREADS_FLOOR, 32);
411 let nodejs_pool_min_threads = DEFAULT_POOL_MIN_THREADS.max(cpu_count / 2);
412
413 let base_tasks = max_concurrency / nodejs_pool_max_threads.max(1);
414 let nodejs_pool_concurrent_tasks =
415 ((base_tasks as f64 * CONCURRENT_TASKS_HEADROOM_MULTIPLIER) as usize).clamp(
416 DEFAULT_POOL_CONCURRENT_TASKS_PER_WORKER,
417 MAX_CONCURRENT_TASKS_PER_WORKER,
418 );
419
420 let base_worker_heap = 512_usize;
422 let heap_per_task = 5_usize;
423 let nodejs_worker_heap_mb =
424 (base_worker_heap + (nodejs_pool_concurrent_tasks * heap_per_task)).clamp(1024, 2048);
425
426 let default_backlog = DEFAULT_POOL_SOCKET_BACKLOG as usize;
427 let pool_socket_backlog = max_concurrency.max(default_backlog);
428
429 Self {
430 max_concurrency,
431 pool_max_connections,
432 pool_connect_retries: DEFAULT_POOL_CONNECT_RETRIES,
433 pool_max_queue_size,
434 pool_queue_send_timeout_ms: DEFAULT_POOL_QUEUE_SEND_TIMEOUT_MS,
435 pool_workers: 0,
436 socket_max_connections,
437 nodejs_pool_min_threads,
438 nodejs_pool_max_threads,
439 nodejs_pool_concurrent_tasks,
440 nodejs_pool_idle_timeout_ms: DEFAULT_POOL_IDLE_TIMEOUT_MS,
441 nodejs_worker_heap_mb,
442 pool_socket_backlog,
443 health_check_interval_secs: DEFAULT_POOL_HEALTH_CHECK_INTERVAL_SECS,
444 trace_timeout_ms: DEFAULT_TRACE_TIMEOUT_MS,
445 }
446 }
447}
448
449pub fn get_config() -> &'static PluginConfig {
451 CONFIG.get_or_init(|| {
452 let config = PluginConfig::from_env();
453 config.log_config();
454 config
455 })
456}
457
/// Read `name` from the environment and parse it as `T`.
///
/// Falls back to `default` when the variable is unset, not valid UTF-8, or
/// fails to parse.
fn env_parse<T: std::str::FromStr>(name: &str, default: T) -> T {
    match std::env::var(name) {
        Ok(raw) => raw.parse().unwrap_or(default),
        Err(_) => default,
    }
}
465
#[cfg(test)]
mod tests {
    use super::*;

    /// Default config should use the documented seed value and ratios.
    #[test]
    fn test_default_config() {
        let cfg = PluginConfig::default();
        assert_eq!(cfg.max_concurrency, DEFAULT_POOL_MAX_CONNECTIONS);
        assert_eq!(cfg.pool_max_connections, DEFAULT_POOL_MAX_CONNECTIONS);
        assert_eq!(cfg.pool_max_queue_size, cfg.max_concurrency * 2);
        assert!(
            cfg.socket_max_connections >= cfg.pool_max_connections,
            "socket connections should be >= pool connections"
        );
    }

    /// The 1.5x socket and 2x queue ratios hold for a hand-built config.
    #[test]
    fn test_auto_derivation_ratios() {
        let cfg = PluginConfig {
            max_concurrency: 1000,
            pool_max_connections: 1000,
            socket_max_connections: 1500,
            pool_max_queue_size: 2000,
            ..Default::default()
        };

        assert_eq!(cfg.socket_max_connections, cfg.max_concurrency * 3 / 2);
        assert_eq!(cfg.pool_max_queue_size, cfg.max_concurrency * 2);
    }

    /// Replays the derivation arithmetic at concurrency = 10.
    #[test]
    fn test_very_low_concurrency() {
        let concurrency = 10;
        let cpus = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4);

        let pool_conns = concurrency;
        let socket_conns = (concurrency as f64 * 1.5) as usize;
        let queue_size = concurrency * 2;

        let budget_mb = 16384 / 2;
        let by_memory = (budget_mb / 1024).max(4);
        let by_concurrency = (concurrency / 200).max(cpus);
        let max_threads = by_memory
            .min(by_concurrency)
            .max(DEFAULT_POOL_MAX_THREADS_FLOOR)
            .min(32);

        assert_eq!(pool_conns, 10);
        assert_eq!(socket_conns, 15);
        assert_eq!(queue_size, 20);
        assert!(max_threads >= DEFAULT_POOL_MAX_THREADS_FLOOR);
    }

    /// Replays the derivation arithmetic at concurrency = 1000.
    #[test]
    fn test_medium_concurrency() {
        let concurrency = 1000;
        let cpus = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4);

        let socket_conns = (concurrency as f64 * 1.5) as usize;
        let queue_size = concurrency * 2;

        let budget_mb = 16384 / 2;
        let by_memory = (budget_mb / 1024).max(4);
        let by_concurrency = (concurrency / 200).max(cpus);
        let max_threads = by_memory
            .min(by_concurrency)
            .max(DEFAULT_POOL_MAX_THREADS_FLOOR)
            .min(32);

        assert_eq!(socket_conns, 1500);
        assert_eq!(queue_size, 2000);
        assert!(max_threads <= 16);
    }

    /// Replays the derivation arithmetic at concurrency = 10000, including
    /// the per-worker concurrent-task clamp.
    #[test]
    fn test_high_concurrency() {
        let concurrency = 10000;

        let socket_conns = (concurrency as f64 * 1.5) as usize;
        let queue_size = concurrency * 2;

        let cpus = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4);

        let budget_mb = 16384 / 2;
        let by_memory = (budget_mb / 1024).max(4);
        let by_concurrency = (concurrency / 200).max(cpus);
        let max_threads = by_memory
            .min(by_concurrency)
            .max(DEFAULT_POOL_MAX_THREADS_FLOOR)
            .min(32);

        assert_eq!(socket_conns, 15000);
        assert_eq!(queue_size, 20000);
        assert!(max_threads <= 32);

        let share = concurrency / max_threads;
        let tasks = ((share as f64 * CONCURRENT_TASKS_HEADROOM_MULTIPLIER) as usize)
            .max(DEFAULT_POOL_CONCURRENT_TASKS_PER_WORKER)
            .min(MAX_CONCURRENT_TASKS_PER_WORKER);
        assert!(tasks <= MAX_CONCURRENT_TASKS_PER_WORKER);
    }

    /// validate() must panic when pool connections exceed socket connections.
    #[test]
    fn test_validation_catches_invalid_config() {
        let cfg = PluginConfig {
            pool_max_connections: 1000,
            socket_max_connections: 500,
            ..Default::default()
        };

        let outcome = std::panic::catch_unwind(|| cfg.validate());
        assert!(
            outcome.is_err(),
            "Should panic on invalid pool > socket connections"
        );
    }

    /// validate() must panic when min threads exceed max threads.
    #[test]
    fn test_validation_catches_invalid_threads() {
        let cfg = PluginConfig {
            nodejs_pool_min_threads: 64,
            nodejs_pool_max_threads: 8,
            ..Default::default()
        };

        let outcome = std::panic::catch_unwind(|| cfg.validate());
        assert!(outcome.is_err(), "Should panic on invalid min > max threads");
    }

    /// An explicit override should win over the auto-derived value.
    #[test]
    fn test_overridden_values_respected() {
        let concurrency = 1000;
        let queue_override = 5000;
        let pool_conns = 1000;

        assert_eq!(pool_conns, concurrency);
        assert_eq!(queue_override, 5000);

        let auto_queue = concurrency * 2;
        assert_eq!(auto_queue, 2000);
        assert_ne!(queue_override, auto_queue);
    }
}