1use super::config::get_config;
126use crate::jobs::JobProducerTrait;
127use crate::models::{
128 NetworkRepoModel, NotificationRepoModel, RelayerRepoModel, SignerRepoModel, ThinDataAppState,
129 TransactionRepoModel,
130};
131use crate::repositories::{
132 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository, Repository,
133 TransactionCounterTrait, TransactionRepository,
134};
135use crate::services::plugins::relayer_api::{RelayerApi, Request};
136use scc::HashMap as SccHashMap;
137use serde::{Deserialize, Serialize};
138use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
139use std::sync::Arc;
140use std::time::{Duration, Instant};
141use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
142use tokio::net::{UnixListener, UnixStream};
143use tokio::sync::{mpsc, watch, Semaphore};
144use tracing::{debug, info, warn};
145
146use super::PluginError;
147
148fn log_socket_write_error(context: &str, error: &std::io::Error) {
152 match error.kind() {
153 std::io::ErrorKind::BrokenPipe | std::io::ErrorKind::ConnectionReset => {
154 debug!(
155 "Failed to write {}: {} (plugin likely timed out)",
156 context, error
157 );
158 }
159 _ => {
160 warn!("Failed to write {}: {}", context, error);
161 }
162 }
163}
164
165#[derive(Debug, Serialize, Deserialize, Clone)]
167#[serde(tag = "type", rename_all = "snake_case")]
168pub enum PluginMessage {
169 Register { execution_id: String },
171 ApiRequest {
173 request_id: String,
174 relayer_id: String,
175 method: crate::services::plugins::relayer_api::PluginMethod,
176 payload: serde_json::Value,
177 },
178 ApiResponse {
180 request_id: String,
181 result: Option<serde_json::Value>,
182 error: Option<String>,
183 },
184 Trace { trace: serde_json::Value },
186 Shutdown,
188}
189
190struct ExecutionContext {
192 traces_tx: Option<mpsc::Sender<Vec<serde_json::Value>>>,
195 created_at: Instant,
197 #[allow(dead_code)] bound_execution_id: String,
201}
202
203pub struct ExecutionGuard {
205 execution_id: String,
206 executions: Arc<SccHashMap<String, ExecutionContext>>,
207 rx: Option<mpsc::Receiver<Vec<serde_json::Value>>>,
208 active_count: Arc<AtomicUsize>,
210 registered: bool,
213}
214
215impl ExecutionGuard {
216 pub fn into_receiver(mut self) -> Option<mpsc::Receiver<Vec<serde_json::Value>>> {
219 self.rx.take()
220 }
221}
222
223impl Drop for ExecutionGuard {
224 fn drop(&mut self) {
225 if self.registered && self.executions.remove(&self.execution_id).is_some() {
236 self.active_count.fetch_sub(1, Ordering::AcqRel);
237 }
238 }
239}
240
241pub struct SharedSocketService {
243 socket_path: String,
245 executions: Arc<SccHashMap<String, ExecutionContext>>,
248 active_count: Arc<AtomicUsize>,
250 started: AtomicBool,
252 shutdown_tx: watch::Sender<bool>,
254 connection_semaphore: Arc<Semaphore>,
256}
257
258impl SharedSocketService {
259 pub fn new(socket_path: &str) -> Result<Self, PluginError> {
261 let _ = std::fs::remove_file(socket_path);
263
264 let (shutdown_tx, _) = watch::channel(false);
265
266 let config = get_config();
268 let max_connections = config.socket_max_connections;
269
270 let executions: Arc<SccHashMap<String, ExecutionContext>> = Arc::new(SccHashMap::new());
271 let active_count = Arc::new(AtomicUsize::new(0));
272
273 let executions_clone = executions.clone();
275 let active_count_clone = active_count.clone();
276 let mut cleanup_shutdown_rx = shutdown_tx.subscribe();
277 tokio::spawn(async move {
278 let mut interval = tokio::time::interval(Duration::from_secs(60));
279 loop {
280 tokio::select! {
281 _ = interval.tick() => {}
282 _ = cleanup_shutdown_rx.changed() => {
283 if *cleanup_shutdown_rx.borrow() {
284 break;
285 }
286 }
287 }
288 let now = Instant::now();
289 let mut removed = 0usize;
291 executions_clone.retain(|_, ctx| {
292 let keep = now.duration_since(ctx.created_at) < Duration::from_secs(300);
293 if !keep {
294 removed += 1;
295 }
296 keep
297 });
298 if removed > 0 {
299 active_count_clone.fetch_sub(removed, Ordering::AcqRel);
300 }
301 }
302 });
303
304 Ok(Self {
305 socket_path: socket_path.to_string(),
306 executions,
307 active_count,
308 started: AtomicBool::new(false),
309 shutdown_tx,
310 connection_semaphore: Arc::new(Semaphore::new(max_connections)),
311 })
312 }
313
314 pub fn socket_path(&self) -> &str {
315 &self.socket_path
316 }
317
318 pub async fn register_execution(
325 &self,
326 execution_id: String,
327 emit_traces: bool,
328 ) -> ExecutionGuard {
329 let (tx, rx) = if emit_traces {
331 let (tx, rx) = mpsc::channel(1);
332 (Some(tx), Some(rx))
333 } else {
334 (None, None)
335 };
336
337 let ctx = ExecutionContext {
338 traces_tx: tx,
339 created_at: Instant::now(),
340 bound_execution_id: execution_id.clone(),
341 };
342
343 let registered = match self.executions.insert(execution_id.clone(), ctx) {
345 Ok(_) => {
346 self.active_count.fetch_add(1, Ordering::AcqRel);
347 true
348 }
349 Err((existing_key, _)) => {
350 tracing::warn!(
351 execution_id = %existing_key,
352 "Duplicate execution_id detected during registration, guard will not decrement counter"
353 );
354 false
355 }
356 };
357
358 ExecutionGuard {
359 execution_id,
360 executions: self.executions.clone(),
361 rx,
362 registered,
363 active_count: self.active_count.clone(),
364 }
365 }
366
367 pub fn available_connection_slots(&self) -> usize {
369 self.connection_semaphore.available_permits()
370 }
371
372 pub fn active_connection_count(&self) -> usize {
374 get_config().socket_max_connections - self.connection_semaphore.available_permits()
375 }
376
377 pub async fn registered_executions_count(&self) -> usize {
379 self.active_count.load(Ordering::Relaxed)
380 }
381
382 pub async fn shutdown(&self) {
384 let _ = self.shutdown_tx.send(true);
385 info!("Shared socket service: shutdown signal sent");
386
387 let max_wait = Duration::from_secs(30);
389 let start = Instant::now();
390
391 while start.elapsed() < max_wait {
392 let available = self.connection_semaphore.available_permits();
393 if available == get_config().socket_max_connections {
394 break;
396 }
397 tokio::time::sleep(Duration::from_millis(100)).await;
398 }
399
400 let _ = std::fs::remove_file(&self.socket_path);
402 info!("Shared socket service: shutdown complete");
403 }
404
405 #[allow(clippy::type_complexity)]
409 pub async fn start<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
410 self: Arc<Self>,
411 state: Arc<ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>,
412 ) -> Result<(), PluginError>
413 where
414 J: JobProducerTrait + Send + Sync + 'static,
415 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
416 TR: TransactionRepository
417 + Repository<TransactionRepoModel, String>
418 + Send
419 + Sync
420 + 'static,
421 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
422 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
423 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
424 TCR: TransactionCounterTrait + Send + Sync + 'static,
425 PR: PluginRepositoryTrait + Send + Sync + 'static,
426 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
427 {
428 if self.started.swap(true, Ordering::Acquire) {
430 return Ok(());
431 }
432
433 let listener = UnixListener::bind(&self.socket_path)
435 .map_err(|e| PluginError::SocketError(format!("Failed to bind listener: {e}")))?;
436 let executions = self.executions.clone();
437 let relayer_api = Arc::new(RelayerApi);
438 let socket_path = self.socket_path.clone();
439 let mut shutdown_rx = self.shutdown_tx.subscribe();
440 let connection_semaphore = self.connection_semaphore.clone();
441
442 debug!(
443 "Shared socket service: starting listener on {}",
444 socket_path
445 );
446
447 tokio::spawn(async move {
449 debug!("Shared socket service: listener task started");
450 loop {
451 tokio::select! {
452 _ = shutdown_rx.changed() => {
454 if *shutdown_rx.borrow() {
455 info!("Shared socket service: shutting down listener");
456 break;
457 }
458 }
459 accept_result = listener.accept() => {
461 match accept_result {
462 Ok((stream, _)) => {
463 match connection_semaphore.clone().try_acquire_owned() {
465 Ok(permit) => {
466 debug!("Shared socket service: accepted new connection");
467
468 let relayer_api_clone = relayer_api.clone();
469 let state_clone = Arc::clone(&state);
470 let executions_clone = executions.clone();
471
472 tokio::spawn(async move {
473 let _permit = permit;
475
476 let result = Self::handle_connection(
477 stream,
478 relayer_api_clone,
479 state_clone,
480 executions_clone,
481 )
482 .await;
483
484 if let Err(e) = result {
485 debug!("Connection handler finished with error: {}", e);
486 }
487 });
488 }
489 Err(_) => {
490 warn!(
491 "Connection limit reached, rejecting new connection. \
492 Consider increasing PLUGIN_MAX_CONCURRENCY or PLUGIN_SOCKET_MAX_CONCURRENT_CONNECTIONS."
493 );
494 drop(stream);
495 }
496 }
497 }
498 Err(e) => {
499 warn!("Error accepting connection: {}", e);
500 }
501 }
502 }
503 }
504 }
505
506 let _ = std::fs::remove_file(&socket_path);
508 info!("Shared socket service: listener stopped");
509 });
510
511 Ok(())
512 }
513
514 #[allow(clippy::type_complexity)]
524 async fn handle_connection<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
525 stream: UnixStream,
526 relayer_api: Arc<RelayerApi>,
527 state: Arc<ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>,
528 executions: Arc<SccHashMap<String, ExecutionContext>>,
529 ) -> Result<(), PluginError>
530 where
531 J: JobProducerTrait + Send + Sync + 'static,
532 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
533 TR: TransactionRepository
534 + Repository<TransactionRepoModel, String>
535 + Send
536 + Sync
537 + 'static,
538 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
539 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
540 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
541 TCR: TransactionCounterTrait + Send + Sync + 'static,
542 PR: PluginRepositoryTrait + Send + Sync + 'static,
543 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
544 {
545 let (r, mut w) = stream.into_split();
546 let mut reader = BufReader::new(r).lines();
547
548 let mut traces: Option<Vec<serde_json::Value>> = None;
550 let mut traces_enabled = false;
552
553 let mut bound_execution_id: Option<String> = None;
556
557 let safety_timeout =
562 Duration::from_secs(crate::constants::DEFAULT_PLUGIN_TIMEOUT_SECONDS + 60);
563
564 loop {
565 let line = match tokio::time::timeout(safety_timeout, reader.next_line()).await {
569 Ok(Ok(Some(line))) => line,
570 Ok(Ok(None)) => break, Ok(Err(e)) => {
572 warn!("Error reading from connection: {}", e);
573 break;
574 }
575 Err(_) => {
576 debug!(
577 "Connection safety timeout reached ({}s with no message)",
578 safety_timeout.as_secs()
579 );
580 break;
581 }
582 };
583
584 debug!("Shared socket service: received message");
585
586 let json_value: serde_json::Value = match serde_json::from_str(&line) {
588 Ok(v) => v,
589 Err(e) => {
590 warn!("Failed to parse JSON: {}", e);
591 continue;
592 }
593 };
594
595 let has_type_field = json_value.get("type").is_some();
596
597 if has_type_field {
598 let message: PluginMessage = match serde_json::from_value(json_value) {
600 Ok(msg) => msg,
601 Err(e) => {
602 warn!("Failed to parse PluginMessage: {}", e);
603 continue;
604 }
605 };
606
607 match message {
609 PluginMessage::Register { execution_id } => {
610 if bound_execution_id.is_some() {
612 warn!("Attempted to re-register connection (security violation)");
613 break;
614 }
615
616 if let Some(has_traces) =
619 executions.read(&execution_id, |_, ctx| ctx.traces_tx.is_some())
620 {
621 traces_enabled = has_traces;
622 } else {
623 warn!("Unknown execution_id: {}", execution_id);
624 break;
625 }
626
627 debug!(
628 execution_id = %execution_id,
629 traces_enabled = traces_enabled,
630 "Connection registered"
631 );
632 bound_execution_id = Some(execution_id);
633 }
634
635 PluginMessage::ApiRequest {
636 request_id,
637 relayer_id,
638 method,
639 payload,
640 } => {
641 let exec_id = match &bound_execution_id {
643 Some(id) => id,
644 None => {
645 warn!("ApiRequest before Register (security violation)");
646 break;
647 }
648 };
649
650 let request = Request {
652 request_id: request_id.clone(),
653 relayer_id,
654 method,
655 payload,
656 http_request_id: Some(exec_id.clone()),
657 };
658
659 let response = relayer_api.handle_request(request, &state).await;
661
662 let api_response = PluginMessage::ApiResponse {
664 request_id: response.request_id,
665 result: response.result,
666 error: response.error,
667 };
668
669 let response_str = serde_json::to_string(&api_response)
670 .map_err(|e| PluginError::PluginError(e.to_string()))?
671 + "\n";
672
673 if let Err(e) = w.write_all(response_str.as_bytes()).await {
674 log_socket_write_error("API response", &e);
675 break;
676 }
677
678 if let Err(e) = w.flush().await {
679 log_socket_write_error("API response flush", &e);
680 break;
681 }
682 }
683
684 PluginMessage::Trace { trace } => {
685 if traces_enabled {
687 if traces.is_none() {
688 traces = Some(Vec::new());
689 }
690 if let Some(ref mut t) = traces {
691 t.push(trace);
692 }
693 }
694 }
696
697 PluginMessage::Shutdown => {
698 debug!("Plugin requested shutdown");
699 break;
700 }
701
702 PluginMessage::ApiResponse { .. } => {
703 warn!("Received ApiResponse from plugin (invalid direction)");
704 continue;
705 }
706 }
707 } else {
708 if let Ok(request) = serde_json::from_value::<Request>(json_value.clone()) {
710 if bound_execution_id.is_none() {
714 let candidate_id = request
715 .http_request_id
716 .clone()
717 .or_else(|| Some(request.request_id.clone()));
718
719 if let Some(ref id) = candidate_id {
722 if let Some(has_traces) =
723 executions.read(id, |_, ctx| ctx.traces_tx.is_some())
724 {
725 traces_enabled = has_traces;
726 bound_execution_id = candidate_id;
727 } else {
728 debug!("Legacy request with unknown execution_id: {}", id);
729 }
730 }
731 }
732
733 let response = relayer_api.handle_request(request, &state).await;
735 let response_str = serde_json::to_string(&response)
736 .map_err(|e| PluginError::PluginError(e.to_string()))?
737 + "\n";
738
739 if let Err(e) = w.write_all(response_str.as_bytes()).await {
740 log_socket_write_error("response", &e);
741 break;
742 }
743
744 if let Err(e) = w.flush().await {
745 log_socket_write_error("response flush", &e);
746 break;
747 }
748 } else {
749 warn!("Failed to parse message as either PluginMessage or legacy Request");
750 }
751 }
752 }
753
754 if traces_enabled {
756 if let Some(exec_id) = bound_execution_id {
757 let traces_tx = executions
759 .read(&exec_id, |_, ctx| ctx.traces_tx.clone())
760 .flatten();
761
762 if let Some(tx) = traces_tx {
763 let collected_traces = traces.unwrap_or_default();
764 let trace_count = collected_traces.len();
765 match tokio::time::timeout(
768 Duration::from_millis(100),
769 tx.send(collected_traces),
770 )
771 .await
772 {
773 Ok(Ok(())) => {}
774 Ok(Err(_)) => {
775 if trace_count > 0 {
776 warn!(
777 "Trace channel closed for execution_id: {} ({} traces lost)",
778 exec_id, trace_count
779 );
780 }
781 }
782 Err(_) => warn!("Timeout sending traces for execution_id: {}", exec_id),
783 }
784 }
785 }
786 }
787 debug!("Shared socket service: connection closed");
790 Ok(())
791 }
792}
793
794impl Drop for SharedSocketService {
795 fn drop(&mut self) {
796 let _ = self.shutdown_tx.send(true);
798 }
801}
802
803static SHARED_SOCKET: std::sync::OnceLock<Result<Arc<SharedSocketService>, String>> =
805 std::sync::OnceLock::new();
806
807pub fn get_shared_socket_service() -> Result<Arc<SharedSocketService>, PluginError> {
810 let socket_path = "/tmp/relayer-plugin-shared.sock";
811
812 let result = SHARED_SOCKET.get_or_init(|| {
813 let _ = std::fs::remove_file(socket_path);
815
816 match SharedSocketService::new(socket_path) {
817 Ok(service) => Ok(Arc::new(service)),
818 Err(e) => Err(e.to_string()),
819 }
820 });
821
822 match result {
823 Ok(service) => Ok(service.clone()),
824 Err(e) => Err(PluginError::SocketError(format!(
825 "Failed to create shared socket service: {e}"
826 ))),
827 }
828}
829
830#[allow(clippy::type_complexity)]
832pub async fn ensure_shared_socket_started<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
833 state: Arc<ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>,
834) -> Result<(), PluginError>
835where
836 J: JobProducerTrait + Send + Sync + 'static,
837 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
838 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
839 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
840 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
841 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
842 TCR: TransactionCounterTrait + Send + Sync + 'static,
843 PR: PluginRepositoryTrait + Send + Sync + 'static,
844 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
845{
846 let service = get_shared_socket_service()?;
847 service.start(state).await
848}
849
850#[cfg(test)]
851mod tests {
852 use super::*;
853 use crate::utils::mocks::mockutils::create_mock_app_state;
854 use actix_web::web;
855 use tempfile::tempdir;
856 use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
857 use tokio::net::UnixStream;
858
859 #[tokio::test]
860 async fn test_unified_protocol_register_and_api_request() {
861 let temp_dir = tempdir().unwrap();
862 let socket_path = temp_dir.path().join("shared.sock");
863
864 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
865 let state = create_mock_app_state(None, None, None, None, None, None).await;
866
867 service
869 .clone()
870 .start(Arc::new(web::ThinData(state)))
871 .await
872 .unwrap();
873
874 let execution_id = "test-exec-123".to_string();
876 let _guard = service.register_execution(execution_id.clone(), true).await;
877
878 tokio::time::sleep(Duration::from_millis(50)).await;
880
881 let mut client = UnixStream::connect(socket_path.to_str().unwrap())
883 .await
884 .unwrap();
885
886 let register_msg = PluginMessage::Register {
888 execution_id: execution_id.clone(),
889 };
890 let msg_json = serde_json::to_string(®ister_msg).unwrap() + "\n";
891 client.write_all(msg_json.as_bytes()).await.unwrap();
892
893 let api_request = PluginMessage::ApiRequest {
895 request_id: "req-1".to_string(),
896 relayer_id: "relayer-1".to_string(),
897 method: crate::services::plugins::relayer_api::PluginMethod::GetRelayerStatus,
898 payload: serde_json::json!({}),
899 };
900 let req_json = serde_json::to_string(&api_request).unwrap() + "\n";
901 client.write_all(req_json.as_bytes()).await.unwrap();
902 client.flush().await.unwrap();
903
904 let (r, _w) = client.into_split();
906 let mut reader = BufReader::new(r);
907 let mut response_line = String::new();
908 reader.read_line(&mut response_line).await.unwrap();
909
910 let response: PluginMessage = serde_json::from_str(&response_line).unwrap();
911 match response {
912 PluginMessage::ApiResponse { request_id, .. } => {
913 assert_eq!(request_id, "req-1");
914 }
915 _ => panic!("Expected ApiResponse, got {response:?}"),
916 }
917
918 drop(reader);
919 drop(_w);
920 service.shutdown().await;
921 }
922
923 #[tokio::test]
924 async fn test_connection_tagging_prevents_spoofing() {
925 let temp_dir = tempdir().unwrap();
926 let socket_path = temp_dir.path().join("shared2.sock");
927
928 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
929 let state = create_mock_app_state(None, None, None, None, None, None).await;
930
931 service
932 .clone()
933 .start(Arc::new(web::ThinData(state)))
934 .await
935 .unwrap();
936
937 let execution_id = "test-exec-456".to_string();
938 let _guard = service.register_execution(execution_id.clone(), true).await;
939
940 tokio::time::sleep(Duration::from_millis(50)).await;
941
942 let mut client = UnixStream::connect(socket_path.to_str().unwrap())
943 .await
944 .unwrap();
945
946 let register_msg = PluginMessage::Register {
948 execution_id: execution_id.clone(),
949 };
950 let msg_json = serde_json::to_string(®ister_msg).unwrap() + "\n";
951 client.write_all(msg_json.as_bytes()).await.unwrap();
952
953 let spoofed_register = PluginMessage::Register {
955 execution_id: "different-exec-id".to_string(),
956 };
957 let spoofed_json = serde_json::to_string(&spoofed_register).unwrap() + "\n";
958 client.write_all(spoofed_json.as_bytes()).await.unwrap();
959 client.flush().await.unwrap();
960
961 tokio::time::sleep(Duration::from_millis(100)).await;
963
964 let (r, _w) = client.into_split();
966 let mut reader = BufReader::new(r);
967 let mut line = String::new();
968 let result = reader.read_line(&mut line).await;
969
970 assert!(result.is_err() || result.unwrap() == 0);
972
973 drop(reader);
974 drop(_w);
975 service.shutdown().await;
976 }
977
978 #[tokio::test]
979 async fn test_backward_compatibility_with_legacy_format() {
980 let temp_dir = tempdir().unwrap();
981 let socket_path = temp_dir.path().join("shared3.sock");
982
983 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
984 let state = create_mock_app_state(None, None, None, None, None, None).await;
985
986 service
987 .clone()
988 .start(Arc::new(web::ThinData(state)))
989 .await
990 .unwrap();
991
992 let execution_id = "test-exec-789".to_string();
993 let _guard = service.register_execution(execution_id.clone(), true).await;
994
995 tokio::time::sleep(Duration::from_millis(50)).await;
996
997 let mut client = UnixStream::connect(socket_path.to_str().unwrap())
998 .await
999 .unwrap();
1000
1001 let legacy_request = crate::services::plugins::relayer_api::Request {
1003 request_id: "legacy-1".to_string(),
1004 relayer_id: "relayer-1".to_string(),
1005 method: crate::services::plugins::relayer_api::PluginMethod::GetRelayerStatus,
1006 payload: serde_json::json!({}),
1007 http_request_id: Some(execution_id.clone()),
1008 };
1009 let legacy_json = serde_json::to_string(&legacy_request).unwrap() + "\n";
1010 client.write_all(legacy_json.as_bytes()).await.unwrap();
1011 client.flush().await.unwrap();
1012
1013 let (r, _w) = client.into_split();
1015 let mut reader = BufReader::new(r);
1016 let mut response_line = String::new();
1017 reader.read_line(&mut response_line).await.unwrap();
1018
1019 let response: crate::services::plugins::relayer_api::Response =
1020 serde_json::from_str(&response_line).unwrap();
1021
1022 assert_eq!(response.request_id, "legacy-1");
1023 assert!(response.result.is_some() || response.error.is_some());
1026
1027 drop(reader);
1028 drop(_w);
1029 service.shutdown().await;
1030 }
1031
1032 #[tokio::test]
1033 async fn test_trace_collection() {
1034 let temp_dir = tempdir().unwrap();
1035 let socket_path = temp_dir.path().join("shared4.sock");
1036
1037 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1038 let state = create_mock_app_state(None, None, None, None, None, None).await;
1039
1040 service
1041 .clone()
1042 .start(Arc::new(web::ThinData(state)))
1043 .await
1044 .unwrap();
1045
1046 let execution_id = "test-exec-trace".to_string();
1047 let guard = service.register_execution(execution_id.clone(), true).await;
1048
1049 tokio::time::sleep(Duration::from_millis(50)).await;
1050
1051 let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1052 .await
1053 .unwrap();
1054
1055 let register_msg = PluginMessage::Register {
1057 execution_id: execution_id.clone(),
1058 };
1059 client
1060 .write_all((serde_json::to_string(®ister_msg).unwrap() + "\n").as_bytes())
1061 .await
1062 .unwrap();
1063
1064 let trace1 = PluginMessage::Trace {
1066 trace: serde_json::json!({"event": "start", "timestamp": 1000}),
1067 };
1068 client
1069 .write_all((serde_json::to_string(&trace1).unwrap() + "\n").as_bytes())
1070 .await
1071 .unwrap();
1072
1073 let trace2 = PluginMessage::Trace {
1074 trace: serde_json::json!({"event": "processing", "timestamp": 2000}),
1075 };
1076 client
1077 .write_all((serde_json::to_string(&trace2).unwrap() + "\n").as_bytes())
1078 .await
1079 .unwrap();
1080
1081 let shutdown_msg = PluginMessage::Shutdown;
1083 client
1084 .write_all((serde_json::to_string(&shutdown_msg).unwrap() + "\n").as_bytes())
1085 .await
1086 .unwrap();
1087 client.flush().await.unwrap();
1088
1089 drop(client);
1090
1091 tokio::time::sleep(Duration::from_millis(100)).await;
1093
1094 let mut traces_rx = guard.into_receiver().expect("Traces should be enabled");
1096 let traces = traces_rx.recv().await.unwrap();
1097
1098 assert_eq!(traces.len(), 2);
1100 assert_eq!(traces[0]["event"], "start");
1101 assert_eq!(traces[1]["event"], "processing");
1102
1103 service.shutdown().await;
1104 }
1105
1106 #[tokio::test]
1107 async fn test_execution_guard_auto_unregister() {
1108 let temp_dir = tempdir().unwrap();
1109 let socket_path = temp_dir.path().join("shared_guard.sock");
1110
1111 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1112 let execution_id = "test-exec-guard".to_string();
1113
1114 {
1115 let _guard = service.register_execution(execution_id.clone(), true).await;
1116
1117 assert_eq!(service.registered_executions_count().await, 1);
1119 }
1120 assert_eq!(service.registered_executions_count().await, 0);
1124 }
1125
1126 #[tokio::test]
1127 async fn test_api_request_without_register_rejected() {
1128 let temp_dir = tempdir().unwrap();
1129 let socket_path = temp_dir.path().join("shared_no_register.sock");
1130
1131 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1132 let state = create_mock_app_state(None, None, None, None, None, None).await;
1133
1134 service
1135 .clone()
1136 .start(Arc::new(web::ThinData(state)))
1137 .await
1138 .unwrap();
1139
1140 tokio::time::sleep(Duration::from_millis(50)).await;
1141
1142 let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1143 .await
1144 .unwrap();
1145
1146 let api_request = PluginMessage::ApiRequest {
1148 request_id: "req-1".to_string(),
1149 relayer_id: "relayer-1".to_string(),
1150 method: crate::services::plugins::relayer_api::PluginMethod::GetRelayerStatus,
1151 payload: serde_json::json!({}),
1152 };
1153 let req_json = serde_json::to_string(&api_request).unwrap() + "\n";
1154 client.write_all(req_json.as_bytes()).await.unwrap();
1155 client.flush().await.unwrap();
1156
1157 tokio::time::sleep(Duration::from_millis(100)).await;
1159
1160 let (r, _w) = client.into_split();
1161 let mut reader = BufReader::new(r);
1162 let mut line = String::new();
1163 let result = reader.read_line(&mut line).await;
1164
1165 assert!(result.is_err() || result.unwrap() == 0);
1167
1168 drop(reader);
1169 drop(_w);
1170 service.shutdown().await;
1171 }
1172
1173 #[tokio::test]
1174 async fn test_register_with_unknown_execution_id_rejected() {
1175 let temp_dir = tempdir().unwrap();
1176 let socket_path = temp_dir.path().join("shared_unknown_exec.sock");
1177
1178 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1179 let state = create_mock_app_state(None, None, None, None, None, None).await;
1180
1181 service
1182 .clone()
1183 .start(Arc::new(web::ThinData(state)))
1184 .await
1185 .unwrap();
1186
1187 tokio::time::sleep(Duration::from_millis(50)).await;
1188
1189 let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1190 .await
1191 .unwrap();
1192
1193 let register_msg = PluginMessage::Register {
1195 execution_id: "unknown-exec-id".to_string(),
1196 };
1197 let msg_json = serde_json::to_string(®ister_msg).unwrap() + "\n";
1198 client.write_all(msg_json.as_bytes()).await.unwrap();
1199 client.flush().await.unwrap();
1200
1201 tokio::time::sleep(Duration::from_millis(100)).await;
1203
1204 let (r, _w) = client.into_split();
1205 let mut reader = BufReader::new(r);
1206 let mut line = String::new();
1207 let result = reader.read_line(&mut line).await;
1208
1209 assert!(result.is_err() || result.unwrap() == 0);
1210
1211 drop(reader);
1212 drop(_w);
1213 service.shutdown().await;
1214 }
1215
1216 #[tokio::test]
1217 async fn test_connection_limit_enforcement() {
1218 let temp_dir = tempdir().unwrap();
1219 let socket_path = temp_dir.path().join("shared_connection_limit.sock");
1220
1221 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1222 let state = create_mock_app_state(None, None, None, None, None, None).await;
1223
1224 service
1225 .clone()
1226 .start(Arc::new(web::ThinData(state)))
1227 .await
1228 .unwrap();
1229
1230 tokio::time::sleep(Duration::from_millis(50)).await;
1231
1232 let initial_permits = service.connection_semaphore.available_permits();
1234 let max_connections = get_config().socket_max_connections;
1235 assert_eq!(initial_permits, max_connections);
1236
1237 let _client = UnixStream::connect(socket_path.to_str().unwrap())
1239 .await
1240 .unwrap();
1241
1242 tokio::time::sleep(Duration::from_millis(50)).await;
1243
1244 let after_connect = service.connection_semaphore.available_permits();
1246 assert!(after_connect < initial_permits);
1247
1248 drop(_client);
1249 service.shutdown().await;
1250 }
1251
1252 #[tokio::test]
1253 async fn test_connection_stays_alive_within_safety_timeout() {
1254 let temp_dir = tempdir().unwrap();
1255 let socket_path = temp_dir.path().join("shared_safety.sock");
1256
1257 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1258 let state = create_mock_app_state(None, None, None, None, None, None).await;
1259
1260 service
1261 .clone()
1262 .start(Arc::new(web::ThinData(state)))
1263 .await
1264 .unwrap();
1265
1266 let execution_id = "test-exec-safety".to_string();
1267 let _guard = service.register_execution(execution_id.clone(), true).await;
1268
1269 tokio::time::sleep(Duration::from_millis(50)).await;
1270
1271 let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1272 .await
1273 .unwrap();
1274
1275 let register_msg = PluginMessage::Register { execution_id };
1277 client
1278 .write_all((serde_json::to_string(®ister_msg).unwrap() + "\n").as_bytes())
1279 .await
1280 .unwrap();
1281 client.flush().await.unwrap();
1282
1283 tokio::time::sleep(Duration::from_millis(200)).await;
1286
1287 let shutdown_msg = PluginMessage::Shutdown;
1289 let write_result = client
1290 .write_all((serde_json::to_string(&shutdown_msg).unwrap() + "\n").as_bytes())
1291 .await;
1292
1293 assert!(
1294 write_result.is_ok(),
1295 "Connection should still be alive within safety timeout"
1296 );
1297
1298 drop(client);
1299 service.shutdown().await;
1300 }
1301
1302 #[tokio::test]
1303 async fn test_idle_connection_cleaned_up_by_safety_timeout() {
1304 let temp_dir = tempdir().unwrap();
1305 let socket_path = temp_dir.path().join("shared_safety_timeout.sock");
1306
1307 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1308 let state = create_mock_app_state(None, None, None, None, None, None).await;
1309
1310 service
1311 .clone()
1312 .start(Arc::new(web::ThinData(state)))
1313 .await
1314 .unwrap();
1315
1316 let execution_id = "test-exec-safety-timeout".to_string();
1317 let _guard = service.register_execution(execution_id.clone(), true).await;
1318
1319 tokio::time::sleep(Duration::from_millis(50)).await;
1320
1321 let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1322 .await
1323 .unwrap();
1324
1325 let register_msg = PluginMessage::Register { execution_id };
1327 client
1328 .write_all((serde_json::to_string(®ister_msg).unwrap() + "\n").as_bytes())
1329 .await
1330 .unwrap();
1331 client.flush().await.unwrap();
1332
1333 tokio::time::sleep(Duration::from_millis(200)).await;
1338
1339 drop(client);
1341
1342 service.shutdown().await;
1343 }
1344
1345 #[tokio::test]
1346 async fn test_multiple_api_requests_same_connection() {
1347 let temp_dir = tempdir().unwrap();
1348 let socket_path = temp_dir.path().join("shared_multiple_requests.sock");
1349
1350 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1351 let state = create_mock_app_state(None, None, None, None, None, None).await;
1352
1353 service
1354 .clone()
1355 .start(Arc::new(web::ThinData(state)))
1356 .await
1357 .unwrap();
1358
1359 let execution_id = "test-exec-multi".to_string();
1360 let _guard = service.register_execution(execution_id.clone(), true).await;
1361
1362 tokio::time::sleep(Duration::from_millis(50)).await;
1363
1364 let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1365 .await
1366 .unwrap();
1367
1368 let register_msg = PluginMessage::Register {
1370 execution_id: execution_id.clone(),
1371 };
1372 client
1373 .write_all((serde_json::to_string(®ister_msg).unwrap() + "\n").as_bytes())
1374 .await
1375 .unwrap();
1376
1377 let (r, mut w) = client.into_split();
1378 let mut reader = BufReader::new(r);
1379
1380 for i in 1..=3 {
1382 let api_request = PluginMessage::ApiRequest {
1383 request_id: format!("req-{i}"),
1384 relayer_id: "relayer-1".to_string(),
1385 method: crate::services::plugins::relayer_api::PluginMethod::GetRelayerStatus,
1386 payload: serde_json::json!({}),
1387 };
1388 w.write_all((serde_json::to_string(&api_request).unwrap() + "\n").as_bytes())
1389 .await
1390 .unwrap();
1391 w.flush().await.unwrap();
1392
1393 let mut response_line = String::new();
1395 reader.read_line(&mut response_line).await.unwrap();
1396
1397 let response: PluginMessage = serde_json::from_str(&response_line).unwrap();
1398 match response {
1399 PluginMessage::ApiResponse { request_id, .. } => {
1400 assert_eq!(request_id, format!("req-{i}"));
1401 }
1402 _ => panic!("Expected ApiResponse"),
1403 }
1404 }
1405
1406 drop(reader);
1407 drop(w);
1408 service.shutdown().await;
1409 }
1410
1411 #[tokio::test]
1412 async fn test_shutdown_signal() {
1413 let temp_dir = tempdir().unwrap();
1414 let socket_path = temp_dir.path().join("shared_shutdown_signal.sock");
1415
1416 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1417 let state = create_mock_app_state(None, None, None, None, None, None).await;
1418
1419 service
1420 .clone()
1421 .start(Arc::new(web::ThinData(state)))
1422 .await
1423 .unwrap();
1424
1425 tokio::time::sleep(Duration::from_millis(50)).await;
1426
1427 assert!(std::path::Path::new(socket_path.to_str().unwrap()).exists());
1429
1430 service.shutdown().await;
1432
1433 tokio::time::sleep(Duration::from_millis(100)).await;
1435
1436 assert!(!std::path::Path::new(socket_path.to_str().unwrap()).exists());
1438 }
1439
1440 #[tokio::test]
1441 async fn test_malformed_json_handling() {
1442 let temp_dir = tempdir().unwrap();
1443 let socket_path = temp_dir.path().join("shared_malformed.sock");
1444
1445 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1446 let state = create_mock_app_state(None, None, None, None, None, None).await;
1447
1448 service
1449 .clone()
1450 .start(Arc::new(web::ThinData(state)))
1451 .await
1452 .unwrap();
1453
1454 let execution_id = "test-exec-malformed".to_string();
1455 let _guard = service.register_execution(execution_id.clone(), true).await;
1456
1457 tokio::time::sleep(Duration::from_millis(50)).await;
1458
1459 let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1460 .await
1461 .unwrap();
1462
1463 let register_msg = PluginMessage::Register {
1465 execution_id: execution_id.clone(),
1466 };
1467 client
1468 .write_all((serde_json::to_string(®ister_msg).unwrap() + "\n").as_bytes())
1469 .await
1470 .unwrap();
1471
1472 client
1474 .write_all(b"{ this is not valid json }\n")
1475 .await
1476 .unwrap();
1477 client.flush().await.unwrap();
1478
1479 tokio::time::sleep(Duration::from_millis(100)).await;
1481
1482 let shutdown_msg = PluginMessage::Shutdown;
1484 let write_result = client
1485 .write_all((serde_json::to_string(&shutdown_msg).unwrap() + "\n").as_bytes())
1486 .await;
1487
1488 assert!(
1489 write_result.is_ok(),
1490 "Connection should still be alive after malformed JSON"
1491 );
1492
1493 drop(client);
1494 service.shutdown().await;
1495 }
1496
1497 #[tokio::test]
1498 async fn test_invalid_message_direction() {
1499 let temp_dir = tempdir().unwrap();
1500 let socket_path = temp_dir.path().join("shared_invalid_direction.sock");
1501
1502 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1503 let state = create_mock_app_state(None, None, None, None, None, None).await;
1504
1505 service
1506 .clone()
1507 .start(Arc::new(web::ThinData(state)))
1508 .await
1509 .unwrap();
1510
1511 let execution_id = "test-exec-invalid-dir".to_string();
1512 let _guard = service.register_execution(execution_id.clone(), true).await;
1513
1514 tokio::time::sleep(Duration::from_millis(50)).await;
1515
1516 let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1517 .await
1518 .unwrap();
1519
1520 let register_msg = PluginMessage::Register {
1522 execution_id: execution_id.clone(),
1523 };
1524 client
1525 .write_all((serde_json::to_string(®ister_msg).unwrap() + "\n").as_bytes())
1526 .await
1527 .unwrap();
1528
1529 let invalid_msg = PluginMessage::ApiResponse {
1531 request_id: "invalid".to_string(),
1532 result: Some(serde_json::json!({})),
1533 error: None,
1534 };
1535 client
1536 .write_all((serde_json::to_string(&invalid_msg).unwrap() + "\n").as_bytes())
1537 .await
1538 .unwrap();
1539 client.flush().await.unwrap();
1540
1541 tokio::time::sleep(Duration::from_millis(100)).await;
1543
1544 let shutdown_msg = PluginMessage::Shutdown;
1546 let write_result = client
1547 .write_all((serde_json::to_string(&shutdown_msg).unwrap() + "\n").as_bytes())
1548 .await;
1549
1550 assert!(write_result.is_ok());
1551
1552 drop(client);
1553 service.shutdown().await;
1554 }
1555
1556 #[tokio::test]
1557 async fn test_stale_execution_cleanup() {
1558 let temp_dir = tempdir().unwrap();
1559 let socket_path = temp_dir.path().join("shared_stale_cleanup.sock");
1560
1561 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1562
1563 let execution_id = "stale-exec".to_string();
1565 let (tx, _rx) = mpsc::channel(1);
1566 let _ = service.executions.insert(
1568 execution_id.clone(),
1569 ExecutionContext {
1570 traces_tx: Some(tx),
1571 created_at: Instant::now() - Duration::from_secs(400), bound_execution_id: execution_id.clone(),
1573 },
1574 );
1575
1576 assert!(service.executions.contains(&execution_id));
1578
1579 drop(service);
1585 }
1586
1587 #[tokio::test]
1588 async fn test_socket_path_getter() {
1589 let temp_dir = tempdir().unwrap();
1590 let socket_path = temp_dir.path().join("shared_path.sock");
1591
1592 let service = SharedSocketService::new(socket_path.to_str().unwrap()).unwrap();
1593
1594 assert_eq!(service.socket_path(), socket_path.to_str().unwrap());
1595 }
1596
1597 #[tokio::test]
1598 async fn test_trace_send_timeout() {
1599 let temp_dir = tempdir().unwrap();
1600 let socket_path = temp_dir.path().join("shared_trace_timeout.sock");
1601
1602 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1603 let state = create_mock_app_state(None, None, None, None, None, None).await;
1604
1605 service
1606 .clone()
1607 .start(Arc::new(web::ThinData(state)))
1608 .await
1609 .unwrap();
1610
1611 let execution_id = "test-exec-trace-timeout".to_string();
1612 let guard = service.register_execution(execution_id.clone(), true).await;
1613
1614 drop(guard);
1616
1617 tokio::time::sleep(Duration::from_millis(50)).await;
1618
1619 let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1620 .await
1621 .unwrap();
1622
1623 let register_msg = PluginMessage::Register {
1625 execution_id: execution_id.clone(),
1626 };
1627 client
1628 .write_all((serde_json::to_string(®ister_msg).unwrap() + "\n").as_bytes())
1629 .await
1630 .unwrap();
1631
1632 let trace = PluginMessage::Trace {
1634 trace: serde_json::json!({"event": "test"}),
1635 };
1636 client
1637 .write_all((serde_json::to_string(&trace).unwrap() + "\n").as_bytes())
1638 .await
1639 .unwrap();
1640
1641 let shutdown_msg = PluginMessage::Shutdown;
1643 client
1644 .write_all((serde_json::to_string(&shutdown_msg).unwrap() + "\n").as_bytes())
1645 .await
1646 .unwrap();
1647 client.flush().await.unwrap();
1648
1649 drop(client);
1650
1651 tokio::time::sleep(Duration::from_millis(200)).await;
1653
1654 service.shutdown().await;
1655 }
1656
1657 #[tokio::test]
1658 async fn test_get_shared_socket_service() {
1659 let service1 = get_shared_socket_service();
1661 assert!(service1.is_ok());
1662
1663 let service2 = get_shared_socket_service();
1664 assert!(service2.is_ok());
1665
1666 let svc1 = service1.unwrap();
1668 let svc2 = service2.unwrap();
1669 let path1 = svc1.socket_path();
1670 let path2 = svc2.socket_path();
1671 assert_eq!(path1, path2);
1672 }
1673
1674 #[tokio::test]
1675 async fn test_available_connection_slots_and_active_count() {
1676 let temp_dir = tempdir().unwrap();
1677 let socket_path = temp_dir.path().join("shared_slots.sock");
1678
1679 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1680 let state = create_mock_app_state(None, None, None, None, None, None).await;
1681 let max_connections = get_config().socket_max_connections;
1682
1683 assert_eq!(service.available_connection_slots(), max_connections);
1685 assert_eq!(service.active_connection_count(), 0);
1686
1687 service
1688 .clone()
1689 .start(Arc::new(web::ThinData(state)))
1690 .await
1691 .unwrap();
1692
1693 tokio::time::sleep(Duration::from_millis(50)).await;
1694
1695 let _client = UnixStream::connect(socket_path.to_str().unwrap())
1697 .await
1698 .unwrap();
1699
1700 tokio::time::sleep(Duration::from_millis(50)).await;
1701
1702 assert!(service.available_connection_slots() < max_connections);
1703 assert!(service.active_connection_count() > 0);
1704
1705 drop(_client);
1706 service.shutdown().await;
1707 }
1708
1709 #[tokio::test]
1710 async fn test_start_called_twice_is_idempotent() {
1711 let temp_dir = tempdir().unwrap();
1712 let socket_path = temp_dir.path().join("shared_double_start.sock");
1713
1714 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1715 let state = create_mock_app_state(None, None, None, None, None, None).await;
1716 let thin = Arc::new(web::ThinData(state));
1717
1718 service.clone().start(thin.clone()).await.unwrap();
1720
1721 service.clone().start(thin).await.unwrap();
1723
1724 service.shutdown().await;
1725 }
1726
1727 #[tokio::test]
1728 async fn test_traces_discarded_when_emit_traces_false() {
1729 let temp_dir = tempdir().unwrap();
1730 let socket_path = temp_dir.path().join("shared_no_traces.sock");
1731
1732 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1733 let state = create_mock_app_state(None, None, None, None, None, None).await;
1734
1735 service
1736 .clone()
1737 .start(Arc::new(web::ThinData(state)))
1738 .await
1739 .unwrap();
1740
1741 let execution_id = "test-exec-no-trace".to_string();
1742 let guard = service
1744 .register_execution(execution_id.clone(), false)
1745 .await;
1746 assert_eq!(service.registered_executions_count().await, 1);
1747
1748 tokio::time::sleep(Duration::from_millis(50)).await;
1749
1750 let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1751 .await
1752 .unwrap();
1753
1754 let register_msg = PluginMessage::Register {
1756 execution_id: execution_id.clone(),
1757 };
1758 client
1759 .write_all((serde_json::to_string(®ister_msg).unwrap() + "\n").as_bytes())
1760 .await
1761 .unwrap();
1762
1763 let trace = PluginMessage::Trace {
1765 trace: serde_json::json!({"event": "should_be_discarded"}),
1766 };
1767 client
1768 .write_all((serde_json::to_string(&trace).unwrap() + "\n").as_bytes())
1769 .await
1770 .unwrap();
1771
1772 let shutdown_msg = PluginMessage::Shutdown;
1774 client
1775 .write_all((serde_json::to_string(&shutdown_msg).unwrap() + "\n").as_bytes())
1776 .await
1777 .unwrap();
1778 client.flush().await.unwrap();
1779 drop(client);
1780
1781 tokio::time::timeout(Duration::from_secs(1), async {
1783 loop {
1784 if service.active_connection_count() == 0 {
1785 break;
1786 }
1787 tokio::time::sleep(Duration::from_millis(20)).await;
1788 }
1789 })
1790 .await
1791 .expect("Connection should close promptly when traces are disabled");
1792
1793 assert!(guard.into_receiver().is_none());
1795 assert_eq!(service.registered_executions_count().await, 0);
1796
1797 service.shutdown().await;
1798 }
1799
1800 #[tokio::test]
1801 async fn test_legacy_protocol_without_http_request_id() {
1802 let temp_dir = tempdir().unwrap();
1803 let socket_path = temp_dir.path().join("shared_legacy_no_http_id.sock");
1804
1805 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1806 let state = create_mock_app_state(None, None, None, None, None, None).await;
1807
1808 service
1809 .clone()
1810 .start(Arc::new(web::ThinData(state)))
1811 .await
1812 .unwrap();
1813
1814 let request_id = "legacy-fallback-id".to_string();
1816 let _guard = service.register_execution(request_id.clone(), true).await;
1817
1818 tokio::time::sleep(Duration::from_millis(50)).await;
1819
1820 let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1821 .await
1822 .unwrap();
1823
1824 let legacy_request = crate::services::plugins::relayer_api::Request {
1827 request_id: request_id.clone(),
1828 relayer_id: "relayer-1".to_string(),
1829 method: crate::services::plugins::relayer_api::PluginMethod::GetRelayerStatus,
1830 payload: serde_json::json!({}),
1831 http_request_id: None,
1832 };
1833 let legacy_json = serde_json::to_string(&legacy_request).unwrap() + "\n";
1834 client.write_all(legacy_json.as_bytes()).await.unwrap();
1835 client.flush().await.unwrap();
1836
1837 let (r, _w) = client.into_split();
1839 let mut reader = BufReader::new(r);
1840 let mut response_line = String::new();
1841 reader.read_line(&mut response_line).await.unwrap();
1842
1843 let response: crate::services::plugins::relayer_api::Response =
1844 serde_json::from_str(&response_line).unwrap();
1845 assert_eq!(response.request_id, request_id);
1846 assert!(response.result.is_some() || response.error.is_some());
1847
1848 drop(reader);
1849 drop(_w);
1850 service.shutdown().await;
1851 }
1852
1853 #[tokio::test]
1854 async fn test_legacy_protocol_with_unknown_execution_id() {
1855 let temp_dir = tempdir().unwrap();
1856 let socket_path = temp_dir.path().join("shared_legacy_unknown.sock");
1857
1858 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1859 let state = create_mock_app_state(None, None, None, None, None, None).await;
1860
1861 service
1862 .clone()
1863 .start(Arc::new(web::ThinData(state)))
1864 .await
1865 .unwrap();
1866
1867 tokio::time::sleep(Duration::from_millis(50)).await;
1871
1872 let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1873 .await
1874 .unwrap();
1875
1876 let legacy_request = crate::services::plugins::relayer_api::Request {
1878 request_id: "unknown-req".to_string(),
1879 relayer_id: "relayer-1".to_string(),
1880 method: crate::services::plugins::relayer_api::PluginMethod::GetRelayerStatus,
1881 payload: serde_json::json!({}),
1882 http_request_id: Some("nonexistent-exec-id".to_string()),
1883 };
1884 let legacy_json = serde_json::to_string(&legacy_request).unwrap() + "\n";
1885 client.write_all(legacy_json.as_bytes()).await.unwrap();
1886 client.flush().await.unwrap();
1887
1888 let (r, _w) = client.into_split();
1890 let mut reader = BufReader::new(r);
1891 let mut response_line = String::new();
1892 reader.read_line(&mut response_line).await.unwrap();
1893
1894 let response: crate::services::plugins::relayer_api::Response =
1895 serde_json::from_str(&response_line).unwrap();
1896 assert_eq!(response.request_id, "unknown-req");
1897
1898 drop(reader);
1899 drop(_w);
1900 service.shutdown().await;
1901 }
1902
1903 #[tokio::test]
1904 async fn test_legacy_unparsable_message() {
1905 let temp_dir = tempdir().unwrap();
1906 let socket_path = temp_dir.path().join("shared_legacy_unparse.sock");
1907
1908 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1909 let state = create_mock_app_state(None, None, None, None, None, None).await;
1910
1911 service
1912 .clone()
1913 .start(Arc::new(web::ThinData(state)))
1914 .await
1915 .unwrap();
1916
1917 let execution_id = "test-exec-legacy-unparse".to_string();
1918 let _guard = service.register_execution(execution_id.clone(), true).await;
1919
1920 tokio::time::sleep(Duration::from_millis(50)).await;
1921
1922 let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1923 .await
1924 .unwrap();
1925
1926 let register_msg = PluginMessage::Register {
1928 execution_id: execution_id.clone(),
1929 };
1930 client
1931 .write_all((serde_json::to_string(®ister_msg).unwrap() + "\n").as_bytes())
1932 .await
1933 .unwrap();
1934
1935 client.write_all(b"{\"foo\": \"bar\"}\n").await.unwrap();
1938 client.flush().await.unwrap();
1939
1940 tokio::time::sleep(Duration::from_millis(100)).await;
1941
1942 let shutdown_msg = PluginMessage::Shutdown;
1944 let write_result = client
1945 .write_all((serde_json::to_string(&shutdown_msg).unwrap() + "\n").as_bytes())
1946 .await;
1947
1948 assert!(
1949 write_result.is_ok(),
1950 "Connection should still be alive after unparsable legacy message"
1951 );
1952
1953 drop(client);
1954 service.shutdown().await;
1955 }
1956
1957 #[tokio::test]
1958 async fn test_invalid_plugin_message_type() {
1959 let temp_dir = tempdir().unwrap();
1960 let socket_path = temp_dir.path().join("shared_invalid_type.sock");
1961
1962 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
1963 let state = create_mock_app_state(None, None, None, None, None, None).await;
1964
1965 service
1966 .clone()
1967 .start(Arc::new(web::ThinData(state)))
1968 .await
1969 .unwrap();
1970
1971 let execution_id = "test-exec-invalid-type".to_string();
1972 let _guard = service.register_execution(execution_id.clone(), true).await;
1973
1974 tokio::time::sleep(Duration::from_millis(50)).await;
1975
1976 let mut client = UnixStream::connect(socket_path.to_str().unwrap())
1977 .await
1978 .unwrap();
1979
1980 let register_msg = PluginMessage::Register {
1982 execution_id: execution_id.clone(),
1983 };
1984 client
1985 .write_all((serde_json::to_string(®ister_msg).unwrap() + "\n").as_bytes())
1986 .await
1987 .unwrap();
1988
1989 client
1992 .write_all(b"{\"type\": \"nonexistent_type\", \"data\": \"foo\"}\n")
1993 .await
1994 .unwrap();
1995 client.flush().await.unwrap();
1996
1997 let api_request = PluginMessage::ApiRequest {
2000 request_id: "req-after-invalid".to_string(),
2001 relayer_id: "relayer-1".to_string(),
2002 method: crate::services::plugins::relayer_api::PluginMethod::GetRelayerStatus,
2003 payload: serde_json::json!({}),
2004 };
2005 client
2006 .write_all((serde_json::to_string(&api_request).unwrap() + "\n").as_bytes())
2007 .await
2008 .unwrap();
2009 client.flush().await.unwrap();
2010
2011 let (r, mut w) = client.into_split();
2012 let mut reader = BufReader::new(r);
2013 let mut response_line = String::new();
2014 tokio::time::timeout(Duration::from_secs(1), reader.read_line(&mut response_line))
2015 .await
2016 .expect("Timed out waiting for ApiResponse")
2017 .unwrap();
2018
2019 let response: PluginMessage = serde_json::from_str(&response_line).unwrap();
2020 match response {
2021 PluginMessage::ApiResponse { request_id, .. } => {
2022 assert_eq!(request_id, "req-after-invalid");
2023 }
2024 _ => panic!("Expected ApiResponse after invalid plugin message"),
2025 }
2026
2027 let shutdown_msg = PluginMessage::Shutdown;
2028 w.write_all((serde_json::to_string(&shutdown_msg).unwrap() + "\n").as_bytes())
2029 .await
2030 .unwrap();
2031 w.flush().await.unwrap();
2032
2033 drop(reader);
2034 drop(w);
2035 service.shutdown().await;
2036 }
2037
2038 #[tokio::test]
2039 async fn test_service_drop_sends_shutdown() {
2040 let temp_dir = tempdir().unwrap();
2041 let socket_path = temp_dir.path().join("shared_drop.sock");
2042
2043 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
2044 let state = create_mock_app_state(None, None, None, None, None, None).await;
2045
2046 service
2047 .clone()
2048 .start(Arc::new(web::ThinData(state)))
2049 .await
2050 .unwrap();
2051
2052 let mut shutdown_rx = service.shutdown_tx.subscribe();
2054
2055 drop(service);
2057
2058 tokio::time::timeout(Duration::from_secs(1), shutdown_rx.changed())
2059 .await
2060 .expect("Timed out waiting for shutdown signal from Drop")
2061 .unwrap();
2062 assert!(
2063 *shutdown_rx.borrow(),
2064 "Drop should broadcast shutdown=true to listeners"
2065 );
2066 }
2067
2068 #[tokio::test]
2069 async fn test_trace_channel_closed_before_send() {
2070 let temp_dir = tempdir().unwrap();
2071 let socket_path = temp_dir.path().join("shared_trace_closed.sock");
2072
2073 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
2074 let state = create_mock_app_state(None, None, None, None, None, None).await;
2075
2076 service
2077 .clone()
2078 .start(Arc::new(web::ThinData(state)))
2079 .await
2080 .unwrap();
2081
2082 let execution_id = "test-exec-trace-closed".to_string();
2083 let guard = service.register_execution(execution_id.clone(), true).await;
2084
2085 let rx = guard.into_receiver();
2087 assert!(rx.is_some());
2088 drop(rx);
2089
2090 tokio::time::sleep(Duration::from_millis(50)).await;
2091
2092 let mut client = UnixStream::connect(socket_path.to_str().unwrap())
2093 .await
2094 .unwrap();
2095
2096 let register_msg = PluginMessage::Register {
2098 execution_id: execution_id.clone(),
2099 };
2100 client
2101 .write_all((serde_json::to_string(®ister_msg).unwrap() + "\n").as_bytes())
2102 .await
2103 .unwrap();
2104
2105 let trace = PluginMessage::Trace {
2107 trace: serde_json::json!({"event": "will_be_lost"}),
2108 };
2109 client
2110 .write_all((serde_json::to_string(&trace).unwrap() + "\n").as_bytes())
2111 .await
2112 .unwrap();
2113
2114 let shutdown_msg = PluginMessage::Shutdown;
2116 client
2117 .write_all((serde_json::to_string(&shutdown_msg).unwrap() + "\n").as_bytes())
2118 .await
2119 .unwrap();
2120 client.flush().await.unwrap();
2121 drop(client);
2122
2123 tokio::time::timeout(Duration::from_secs(1), async {
2125 loop {
2126 if service.active_connection_count() == 0 {
2127 break;
2128 }
2129 tokio::time::sleep(Duration::from_millis(20)).await;
2130 }
2131 })
2132 .await
2133 .expect("Connection should close even when trace receiver is dropped");
2134
2135 service.shutdown().await;
2136 }
2137
2138 #[tokio::test]
2139 async fn test_duplicate_execution_id_does_not_corrupt_counter() {
2140 let temp_dir = tempdir().unwrap();
2141 let socket_path = temp_dir.path().join("shared_duplicate_exec.sock");
2142
2143 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
2144 let execution_id = "duplicate-exec-id".to_string();
2145
2146 assert_eq!(service.registered_executions_count().await, 0);
2148
2149 let guard1 = service.register_execution(execution_id.clone(), true).await;
2151 assert_eq!(service.registered_executions_count().await, 1);
2152
2153 let guard2 = service.register_execution(execution_id.clone(), true).await;
2156 assert_eq!(service.registered_executions_count().await, 1);
2158
2159 drop(guard2);
2162 assert_eq!(service.registered_executions_count().await, 1);
2163
2164 drop(guard1);
2166 assert_eq!(service.registered_executions_count().await, 0);
2167 }
2168
2169 #[tokio::test]
2170 async fn test_execution_guard_registered_field() {
2171 let temp_dir = tempdir().unwrap();
2172 let socket_path = temp_dir.path().join("shared_registered_field.sock");
2173
2174 let service = Arc::new(SharedSocketService::new(socket_path.to_str().unwrap()).unwrap());
2175
2176 let execution_id_1 = "unique-exec-1".to_string();
2178 let guard1 = service
2179 .register_execution(execution_id_1.clone(), true)
2180 .await;
2181 assert_eq!(service.registered_executions_count().await, 1);
2182
2183 let execution_id_2 = "unique-exec-2".to_string();
2185 let guard2 = service
2186 .register_execution(execution_id_2.clone(), false)
2187 .await;
2188 assert_eq!(service.registered_executions_count().await, 2);
2189
2190 let rx = guard1.into_receiver();
2192 assert!(rx.is_some()); let rx2 = guard2.into_receiver();
2196 assert!(rx2.is_none()); assert_eq!(service.registered_executions_count().await, 0);
2200 }
2201
2202 #[test]
2207 fn test_log_socket_write_error_broken_pipe_does_not_panic() {
2208 let err = std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Broken pipe");
2210 log_socket_write_error("API response", &err);
2211 }
2212
2213 #[test]
2214 fn test_log_socket_write_error_connection_reset_does_not_panic() {
2215 let err = std::io::Error::new(std::io::ErrorKind::ConnectionReset, "Connection reset");
2217 log_socket_write_error("API response flush", &err);
2218 }
2219
2220 #[test]
2221 fn test_log_socket_write_error_other_errors_do_not_panic() {
2222 let err = std::io::Error::new(std::io::ErrorKind::PermissionDenied, "Permission denied");
2224 log_socket_write_error("response", &err);
2225 }
2226
2227 #[test]
2228 fn test_log_socket_write_error_unexpected_eof() {
2229 let err = std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "unexpected eof");
2230 log_socket_write_error("response flush", &err);
2231 }
2232
2233 #[test]
2234 fn test_log_socket_write_error_context_strings() {
2235 let err = std::io::Error::new(std::io::ErrorKind::BrokenPipe, "os error 32");
2237 log_socket_write_error("API response", &err);
2238 log_socket_write_error("API response flush", &err);
2239 log_socket_write_error("response", &err);
2240 log_socket_write_error("response flush", &err);
2241 }
2242}