use std::{collections::VecDeque, path::Path, sync::Arc}; use color_eyre::Result; use tokio::{ io::AsyncWriteExt, net::{UnixListener, UnixStream}, sync::{RwLock, broadcast}, }; use tracing::{error, info}; use crate::event::Event; // Hard coding for now. Maybe make this a parameter to new. const EVENT_BUF_MAX: usize = 1000; // Manager for communication with plugins. pub struct EventManager { announce: broadcast::Sender, // Everything broadcasts here. events: Arc>>, // Ring buffer. } impl EventManager { pub fn new() -> Result { let (announce, _) = broadcast::channel(100); Ok(Self { announce, events: Arc::new(RwLock::new(VecDeque::::new())), }) } pub async fn broadcast(&self, event: &Event) -> Result<()> { let msg = serde_json::to_string(event)? + "\n"; let mut events = self.events.write().await; if events.len() >= EVENT_BUF_MAX { events.pop_front(); } events.push_back(msg.clone()); drop(events); let _ = self.announce.send(msg); Ok(()) } pub async fn start_listening(self: Arc, path: impl AsRef) { let listener = UnixListener::bind(path).unwrap(); loop { match listener.accept().await { Ok((stream, _addr)) => { info!("New broadcast subscriber"); // Spawn a new stream for the plugin. The loop // runs recursively from there. let broadcaster = Arc::clone(&self); tokio::spawn(async move { // send events. let _ = broadcaster.send_events(stream).await; }); } Err(e) => error!("Accept error: {e}"), } } } async fn send_events(&self, stream: UnixStream) -> Result<()> { let mut writer = stream; // Take care of history. let events = self.events.read().await; for event in events.iter() { writer.write_all(event.as_bytes()).await?; } drop(events); // Now just broadcast the new events. let mut rx = self.announce.subscribe(); while let Ok(event) = rx.recv().await { if writer.write_all(event.as_bytes()).await.is_err() { // *click* break; } } Ok(()) } } #[cfg(test)] mod tests { use super::*; use rstest::rstest; #[tokio::test] async fn test_new_event_manager_has_empty_buffer() { let manager = EventManager::new().unwrap(); let events = manager.events.read().await; assert_eq!(events.len(), 0); } #[tokio::test] async fn test_broadcast_adds_event_to_buffer() { let manager = EventManager::new().unwrap(); let event = Event::new("test message"); manager.broadcast(&event).await.unwrap(); let events = manager.events.read().await; assert_eq!(events.len(), 1); assert!(events[0].contains("test message")); assert!(events[0].ends_with('\n')); } #[tokio::test] async fn test_broadcast_serializes_event_as_json() { let manager = EventManager::new().unwrap(); let event = Event::new("hello world"); manager.broadcast(&event).await.unwrap(); let events = manager.events.read().await; let stored = &events[0]; // Should be valid JSON let parsed: serde_json::Value = serde_json::from_str(stored.trim()).unwrap(); assert_eq!(parsed["message"], "hello world"); } #[rstest] #[case(1)] #[case(10)] #[case(100)] #[case(999)] #[tokio::test] async fn test_buffer_holds_events_below_max(#[case] count: usize) { let manager = EventManager::new().unwrap(); for i in 0..count { let event = Event::new(format!("event {}", i)); manager.broadcast(&event).await.unwrap(); } let events = manager.events.read().await; assert_eq!(events.len(), count); } #[tokio::test] async fn test_buffer_at_exactly_max_capacity() { let manager = EventManager::new().unwrap(); // Fill to exactly EVENT_BUF_MAX (1000) for i in 0..EVENT_BUF_MAX { let event = Event::new(format!("event {}", i)); manager.broadcast(&event).await.unwrap(); } let events = manager.events.read().await; assert_eq!(events.len(), EVENT_BUF_MAX); assert!(events[0].contains("event 0")); assert!(events[EVENT_BUF_MAX - 1].contains("event 999")); } #[rstest] #[case(1)] #[case(10)] #[case(100)] #[case(500)] #[tokio::test] async fn test_buffer_overflow_evicts_oldest_fifo(#[case] overflow: usize) { let manager = EventManager::new().unwrap(); let total = EVENT_BUF_MAX + overflow; // Broadcast more events than buffer can hold for i in 0..total { let event = Event::new(format!("event {}", i)); manager.broadcast(&event).await.unwrap(); } let events = manager.events.read().await; // Buffer should still be at max capacity assert_eq!(events.len(), EVENT_BUF_MAX); // Oldest events (0 through overflow-1) should be evicted // Buffer should contain events [overflow..total) let first_event = &events[0]; let last_event = &events[EVENT_BUF_MAX - 1]; assert!(first_event.contains(&format!("event {}", overflow))); assert!(last_event.contains(&format!("event {}", total - 1))); // Verify the evicted events are NOT in the buffer let buffer_string = events.iter().cloned().collect::(); assert!(!buffer_string.contains(r#""message":"event 0""#)); } #[tokio::test] async fn test_multiple_broadcasts_maintain_order() { let manager = EventManager::new().unwrap(); let messages = vec!["first", "second", "third", "fourth", "fifth"]; for msg in &messages { let event = Event::new(*msg); manager.broadcast(&event).await.unwrap(); } let events = manager.events.read().await; assert_eq!(events.len(), messages.len()); for (i, expected) in messages.iter().enumerate() { assert!(events[i].contains(expected)); } } #[tokio::test] async fn test_buffer_wraparound_maintains_newest_events() { let manager = EventManager::new().unwrap(); // Fill buffer completely for i in 0..EVENT_BUF_MAX { let event = Event::new(format!("old {}", i)); manager.broadcast(&event).await.unwrap(); } // Add 5 more events for i in 0..5 { let event = Event::new(format!("new {}", i)); manager.broadcast(&event).await.unwrap(); } let events = manager.events.read().await; assert_eq!(events.len(), EVENT_BUF_MAX); // First 5 old events should be gone let buffer_string = events.iter().cloned().collect::(); assert!(!buffer_string.contains(r#""message":"old 0""#)); assert!(!buffer_string.contains(r#""message":"old 4""#)); // But old 5 should still be there (now at the front) assert!(events[0].contains("old 5")); // New events should be at the end assert!(events[EVENT_BUF_MAX - 5].contains("new 0")); assert!(events[EVENT_BUF_MAX - 1].contains("new 4")); } #[tokio::test] async fn test_concurrent_broadcasts_all_stored() { let manager = Arc::new(EventManager::new().unwrap()); let mut handles = vec![]; // Spawn 10 concurrent tasks, each broadcasting 10 events for task_id in 0..10 { let manager_clone = Arc::clone(&manager); let handle = tokio::spawn(async move { for i in 0..10 { let event = Event::new(format!("task {} event {}", task_id, i)); manager_clone.broadcast(&event).await.unwrap(); } }); handles.push(handle); } // Wait for all tasks to complete for handle in handles { handle.await.unwrap(); } let events = manager.events.read().await; assert_eq!(events.len(), 100); } }