Compare commits

4 Commits

3af95235e6 ... integratio

| Author | SHA1 | Date |
|---|---|---|
| | 27ad93d749 | |
| | f880795b44 | |
| | a158ee385f | |
| | 2da7cc4450 | |

Cargo.lock (generated, 13 changed lines)
@@ -1401,6 +1401,18 @@ dependencies = [
 "tempfile",
]

[[package]]
name = "nix"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6"
dependencies = [
 "bitflags",
 "cfg-if",
 "cfg_aliases",
 "libc",
]

[[package]]
name = "nom"
version = "7.1.3"

@@ -1973,6 +1985,7 @@ dependencies = [
 "genai",
 "human-panic",
 "irc",
 "nix",
 "rstest",
 "serde",
 "serde_json",
Cargo.toml

@@ -15,6 +15,10 @@ serde_json = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"

[dependencies.nix]
version = "0.30.1"
features = [ "fs" ]

[dependencies.clap]
version = "4.5"
features = [ "derive" ]
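The nix crate is pulled in with only the `fs` feature, which is what the commented-out FIFO experiment further down in src/event_manager.rs would rely on. A minimal sketch of that usage, assuming nix 0.30's `mkfifo` is covered by the `fs` feature; this is illustration only, not code added by these commits:

```rust
// Sketch only: mirrors the commented-out start_fifo experiment in
// src/event_manager.rs below; not part of this change set.
use nix::{sys::stat, unistd::mkfifo};

fn create_fifo(path: &str) -> nix::Result<()> {
    // Remove a stale FIFO if one exists, then create a fresh one
    // readable/writable only by the owner.
    let _ = std::fs::remove_file(path);
    mkfifo(path, stat::Mode::S_IRWXU)
}
```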
Deleted file (name not captured in this view; it held the commands::Root stub referenced from main.rs below)

@@ -1,19 +0,0 @@
use color_eyre::Result;
use std::path::{Path, PathBuf};

#[derive(Clone, Debug)]
pub struct Root {
    path: PathBuf,
}

impl Root {
    pub fn new(path: impl AsRef<Path>) -> Self {
        Root {
            path: path.as_ref().to_owned(),
        }
    }

    pub fn run_command(_cmd_string: impl AsRef<str>) -> Result<()> {
        todo!();
    }
}
src/event_manager.rs (file header not captured; inferred from the `impl EventManager` hunk below)

@@ -1,9 +1,15 @@
use std::{collections::VecDeque, path::Path, sync::Arc};

use color_eyre::Result;
//use nix::{NixPath, sys::stat, unistd::mkfifo};
use tokio::{
    // fs::File,
    io::AsyncWriteExt,
    net::{UnixListener, UnixStream},
    net::{
        UnixListener,
        UnixStream,
        // unix::pipe::{self, Receiver},
    },
    sync::{RwLock, broadcast},
};
use tracing::{error, info};

@@ -46,8 +52,20 @@ impl EventManager {
        Ok(())
    }

    pub async fn start_listening(self: Arc<Self>, path: impl AsRef<Path>) {
        let listener = UnixListener::bind(path).unwrap();
    // NB: This assumes it has exclusive control of the FIFO.
    // async fn start_fifo<P>(path: &P) -> Result<()>
    // where
    //     P: AsRef<Path> + NixPath + ?Sized,
    // {
    //     // Just delete the old FIFO if it exists.
    //     let _ = std::fs::remove_file(path);
    //     mkfifo(path, stat::Mode::S_IRWXU)?;

    //     Ok(())
    // }

    pub async fn start_listening(self: Arc<Self>, broadcast_path: impl AsRef<Path>) {
        let listener = UnixListener::bind(broadcast_path).unwrap();

        loop {
            match listener.accept().await {
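For context on the switch from a FIFO to a UnixListener: any local client can now attach to the broadcast socket and read newline-delimited JSON events, which is exactly what the integration tests added below do. A minimal reader sketch (assuming the /tmp/robo.sock path hard-coded in lib.rs; not part of the commits):

```rust
use tokio::{
    io::{AsyncBufReadExt, BufReader},
    net::UnixStream,
};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Connect to the broadcast socket the bot listens on.
    let stream = UnixStream::connect("/tmp/robo.sock").await?;
    let mut reader = BufReader::new(stream);

    // Each broadcast event arrives as a single JSON line.
    let mut line = String::new();
    while reader.read_line(&mut line).await? > 0 {
        println!("event: {}", line.trim_end());
        line.clear();
    }
    Ok(())
}
```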
src/lib.rs (new file, 85 lines)

@@ -0,0 +1,85 @@
// Robotnik libraries

use std::{os::unix::fs, sync::Arc};

use color_eyre::{Result, eyre::WrapErr};
use human_panic::setup_panic;
use tracing::{Level, info};
use tracing_subscriber::FmtSubscriber;

pub mod chat;
pub mod event;
pub mod event_manager;
pub mod ipc;
pub mod qna;
pub mod setup;

pub use event::Event;
pub use event_manager::EventManager;
pub use qna::LLMHandle;

const DEFAULT_INSTRUCT: &str =
    "You are a shady, yet helpful IRC bot. You try to give responses that can
be sent in a single IRC response according to the specification. Keep answers to
500 characters or less.";

// NB: Everything should fail if logging doesn't start properly.
async fn init_logging() {
    better_panic::install();
    setup_panic!();

    let subscriber = FmtSubscriber::builder()
        .with_max_level(Level::TRACE)
        .finish();

    tracing::subscriber::set_global_default(subscriber).unwrap();
}

pub async fn run() -> Result<()> {
    init_logging().await;
    info!("Starting up.");

    let settings = setup::init().await.wrap_err("Failed to initialize.")?;
    let config = settings.config;

    // NOTE: Doing chroot this way might be impractical.
    if let Ok(chroot_path) = config.get_string("chroot-dir") {
        info!("Attempting to chroot to {}", chroot_path);
        fs::chroot(&chroot_path)
            .wrap_err_with(|| format!("Failed setting chroot '{}'", chroot_path))?;
        std::env::set_current_dir("/").wrap_err("Couldn't change directory after chroot.")?;
    }

    let handle = qna::LLMHandle::new(
        config.get_string("api-key").wrap_err("API missing.")?,
        config
            .get_string("base-url")
            .wrap_err("base-url missing.")?,
        config
            .get_string("model")
            .wrap_err("model string missing.")?,
        config
            .get_string("instruct")
            .unwrap_or_else(|_| DEFAULT_INSTRUCT.to_string()),
    )
    .wrap_err("Couldn't initialize LLM handle.")?;

    let ev_manager = Arc::new(EventManager::new()?);
    let ev_manager_clone = Arc::clone(&ev_manager);
    ev_manager_clone
        .broadcast(&Event::new("Starting..."))
        .await?;

    let mut c = chat::new(&config, &handle).await?;

    tokio::select! {
        _ = ev_manager_clone.start_listening("/tmp/robo.sock") => {
            // Event listener ended
        }
        result = c.run() => {
            result.unwrap();
        }
    }

    Ok(())
}
src/main.rs (83 changed lines)

@@ -1,85 +1,6 @@
use color_eyre::{Result, eyre::WrapErr};
use human_panic::setup_panic;
use std::{os::unix::fs, sync::Arc};
use tracing::{Level, info};
use tracing_subscriber::FmtSubscriber;

use crate::event_manager::EventManager;

mod chat;
mod event;
mod event_manager;
mod qna;
mod setup;

const DEFAULT_INSTRUCT: &str =
    "You are a shady, yet helpful IRC bot. You try to give responses that can
be sent in a single IRC response according to the specification. Keep answers to
500 characters or less.";
use color_eyre::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // Some error sprucing.
    better_panic::install();
    setup_panic!();

    let subscriber = FmtSubscriber::builder()
        .with_max_level(Level::TRACE)
        .finish();

    tracing::subscriber::set_global_default(subscriber)
        .wrap_err("Failed to setup trace logging.")?;

    info!("Starting");

    let settings = setup::init().await.wrap_err("Failed to initialize.")?;
    let config = settings.config;

    // chroot if applicable.
    if let Ok(chroot_path) = config.get_string("chroot-dir") {
        info!("Attempting to chroot to {}", chroot_path);
        fs::chroot(&chroot_path)
            .wrap_err_with(|| format!("Failed setting chroot '{}'", chroot_path))?;
        std::env::set_current_dir("/").wrap_err("Couldn't change directory after chroot.")?;
    }

    // Setup root path for commands.
    // let cmd_root = if let Ok(command_path) = config.get_string("command-path") {
    //     Some(commands::Root::new(command_path))
    // } else {
    //     None
    // };

    let handle = qna::LLMHandle::new(
        config.get_string("api-key").wrap_err("API missing.")?,
        config
            .get_string("base-url")
            .wrap_err("base-url missing.")?,
        config
            .get_string("model")
            .wrap_err("model string missing.")?,
        config
            .get_string("instruct")
            .unwrap_or_else(|_| DEFAULT_INSTRUCT.to_string()),
    )
    .wrap_err("Couldn't initialize LLM handle.")?;

    let ev_manager = Arc::new(EventManager::new()?);
    let ev_manager_clone = Arc::clone(&ev_manager);
    ev_manager_clone
        .broadcast(&event::Event::new("Starting..."))
        .await?;

    let mut c = chat::new(&config, &handle).await?;

    tokio::select! {
        _ = ev_manager_clone.start_listening("/tmp/robo.sock") => {
            // Event listener ended
        }
        result = c.run() => {
            result.unwrap();
        }
    }

    Ok(())
    robotnik::run().await
}
src/setup.rs (34 changed lines)

@@ -8,65 +8,65 @@ use tracing::{info, instrument};
// TODO: use [clap(long, short, help_heading = Some(section))]
#[derive(Clone, Debug, Parser)]
#[command(about, version)]
pub(crate) struct Args {
pub struct Args {
    #[arg(short, long)]
    /// API Key for the LLM in use.
    pub(crate) api_key: Option<String>,
    pub api_key: Option<String>,

    #[arg(short, long, default_value = "https://api.openai.com")]
    /// Base URL for the LLM API to use.
    pub(crate) base_url: Option<String>,
    pub base_url: Option<String>,

    /// Directory to use for chroot (recommended).
    #[arg(long)]
    pub(crate) chroot_dir: Option<String>,
    pub chroot_dir: Option<String>,

    /// Root directory for file based command structure.
    #[arg(long)]
    pub(crate) command_dir: Option<String>,
    pub command_dir: Option<String>,

    #[arg(long)]
    /// Instructions to the model on how to behave.
    pub(crate) instruct: Option<String>,
    pub instruct: Option<String>,

    #[arg(long)]
    pub(crate) model: Option<String>,
    pub model: Option<String>,

    #[arg(long)]
    /// List of IRC channels to join.
    pub(crate) channels: Option<Vec<String>>,
    pub channels: Option<Vec<String>>,

    #[arg(short, long)]
    /// Custom configuration file location if need be.
    pub(crate) config_file: Option<PathBuf>,
    pub config_file: Option<PathBuf>,

    #[arg(short, long, default_value = "irc.libera.chat")]
    /// IRC server.
    pub(crate) server: Option<String>,
    pub server: Option<String>,

    #[arg(short, long, default_value = "6697")]
    /// Port of the IRC server.
    pub(crate) port: Option<String>,
    pub port: Option<String>,

    #[arg(long)]
    /// IRC Nickname.
    pub(crate) nickname: Option<String>,
    pub nickname: Option<String>,

    #[arg(long)]
    /// IRC Nick Password
    pub(crate) nick_password: Option<String>,
    pub nick_password: Option<String>,

    #[arg(long)]
    /// IRC Username
    pub(crate) username: Option<String>,
    pub username: Option<String>,

    #[arg(long)]
    /// Whether or not to use TLS when connecting to the IRC server.
    pub(crate) use_tls: Option<bool>,
    pub use_tls: Option<bool>,
}

pub(crate) struct Setup {
    pub(crate) config: Config,
pub struct Setup {
    pub config: Config,
}

#[instrument]
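Dropping pub(crate) in favour of pub is what lets the new library crate (src/lib.rs) expose `setup`, so Args becomes reachable from integration tests and other binaries. A small sketch of driving it through clap's Parser::parse_from; the flag values here are made up:

```rust
// Sketch: exercising the now-public Args from outside the crate.
// Field names follow the declarations above; the values are illustrative.
use clap::Parser;
use robotnik::setup::Args;

fn main() {
    let args = Args::parse_from(["robotnik", "--model", "some-model", "--nickname", "robo"]);
    assert_eq!(args.model.as_deref(), Some("some-model"));
    assert_eq!(args.nickname.as_deref(), Some("robo"));
}
```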
tests/event_test.rs (new file, 492 lines)

@@ -0,0 +1,492 @@
use std::{sync::Arc, time::Duration};

use robotnik::{event::Event, event_manager::EventManager};
use rstest::rstest;
use tokio::{
    io::{AsyncBufReadExt, BufReader},
    net::UnixStream,
    time::timeout,
};

const TEST_SOCKET_BASE: &str = "/tmp/robotnik_test";

/// Helper to create unique socket paths for parallel tests
fn test_socket_path(name: &str) -> String {
    format!("{}_{}_{}", TEST_SOCKET_BASE, name, std::process::id())
}

/// Helper to read one JSON event from a stream
async fn read_event(
    reader: &mut BufReader<UnixStream>,
) -> Result<Event, Box<dyn std::error::Error>> {
    let mut line = String::new();
    reader.read_line(&mut line).await?;
    let event: Event = serde_json::from_str(&line)?;
    Ok(event)
}

/// Helper to read all available events with a timeout
async fn read_events_with_timeout(
    reader: &mut BufReader<UnixStream>,
    max_count: usize,
    timeout_ms: u64,
) -> Vec<String> {
    let mut events = Vec::new();
    for _ in 0..max_count {
        let mut line = String::new();
        match timeout(
            Duration::from_millis(timeout_ms),
            reader.read_line(&mut line),
        )
        .await
        {
            Ok(Ok(0)) => break, // EOF
            Ok(Ok(_)) => events.push(line),
            Ok(Err(_)) => break, // Read error
            Err(_) => break, // Timeout
        }
    }
    events
}

#[tokio::test]
async fn test_client_connects_and_receives_event() {
    let socket_path = test_socket_path("basic_connect");
    let manager = Arc::new(EventManager::new().unwrap());

    // Start the listener
    let listener_manager = Arc::clone(&manager);
    let socket_path_clone = socket_path.clone();
    tokio::spawn(async move {
        listener_manager.start_listening(socket_path_clone).await;
    });

    // Give the listener time to start
    tokio::time::sleep(Duration::from_millis(50)).await;

    // Broadcast an event
    let event = Event::new("test message");
    manager.broadcast(&event).await.unwrap();

    // Connect as a client
    let stream = UnixStream::connect(&socket_path).await.unwrap();
    let mut reader = BufReader::new(stream);

    // Read the event
    let mut line = String::new();
    reader.read_line(&mut line).await.unwrap();

    assert!(line.contains("test message"));
    assert!(line.ends_with('\n'));

    // Cleanup
    let _ = std::fs::remove_file(&socket_path);
}

#[tokio::test]
async fn test_client_receives_event_history() {
    let socket_path = test_socket_path("event_history");
    let manager = Arc::new(EventManager::new().unwrap());

    // Broadcast events BEFORE starting the listener
    for i in 0..5 {
        let event = Event::new(format!("historical event {}", i));
        manager.broadcast(&event).await.unwrap();
    }

    // Start the listener
    let listener_manager = Arc::clone(&manager);
    let socket_path_clone = socket_path.clone();
    tokio::spawn(async move {
        listener_manager.start_listening(socket_path_clone).await;
    });

    tokio::time::sleep(Duration::from_millis(50)).await;

    // Connect as a client
    let stream = UnixStream::connect(&socket_path).await.unwrap();
    let mut reader = BufReader::new(stream);

    // Should receive all 5 historical events
    let events = read_events_with_timeout(&mut reader, 5, 100).await;

    assert_eq!(events.len(), 5);
    assert!(events[0].contains("historical event 0"));
    assert!(events[4].contains("historical event 4"));

    // Cleanup
    let _ = std::fs::remove_file(&socket_path);
}

#[tokio::test]
async fn test_multiple_clients_receive_same_events() {
    let socket_path = test_socket_path("multiple_clients");
    let manager = Arc::new(EventManager::new().unwrap());

    // Start the listener
    let listener_manager = Arc::clone(&manager);
    let socket_path_clone = socket_path.clone();
    tokio::spawn(async move {
        listener_manager.start_listening(socket_path_clone).await;
    });

    tokio::time::sleep(Duration::from_millis(50)).await;

    // Connect 3 clients
    let stream1 = UnixStream::connect(&socket_path).await.unwrap();
    let stream2 = UnixStream::connect(&socket_path).await.unwrap();
    let stream3 = UnixStream::connect(&socket_path).await.unwrap();

    let mut reader1 = BufReader::new(stream1);
    let mut reader2 = BufReader::new(stream2);
    let mut reader3 = BufReader::new(stream3);

    // Broadcast a new event
    let event = Event::new("broadcast to all");
    manager.broadcast(&event).await.unwrap();

    // All clients should receive the event
    let mut line1 = String::new();
    let mut line2 = String::new();
    let mut line3 = String::new();

    timeout(Duration::from_millis(100), reader1.read_line(&mut line1))
        .await
        .unwrap()
        .unwrap();
    timeout(Duration::from_millis(100), reader2.read_line(&mut line2))
        .await
        .unwrap()
        .unwrap();
    timeout(Duration::from_millis(100), reader3.read_line(&mut line3))
        .await
        .unwrap()
        .unwrap();

    assert!(line1.contains("broadcast to all"));
    assert!(line2.contains("broadcast to all"));
    assert!(line3.contains("broadcast to all"));

    // Cleanup
    let _ = std::fs::remove_file(&socket_path);
}

#[tokio::test]
async fn test_late_joiner_receives_full_history() {
    let socket_path = test_socket_path("late_joiner");
    let manager = Arc::new(EventManager::new().unwrap());

    // Start the listener
    let listener_manager = Arc::clone(&manager);
    let socket_path_clone = socket_path.clone();
    tokio::spawn(async move {
        listener_manager.start_listening(socket_path_clone).await;
    });

    tokio::time::sleep(Duration::from_millis(50)).await;

    // First client connects
    let stream1 = UnixStream::connect(&socket_path).await.unwrap();
    let mut reader1 = BufReader::new(stream1);

    // Broadcast several events
    for i in 0..10 {
        let event = Event::new(format!("event {}", i));
        manager.broadcast(&event).await.unwrap();
    }

    // Consume events from first client
    let _ = read_events_with_timeout(&mut reader1, 10, 100).await;

    // Late joiner connects
    let stream2 = UnixStream::connect(&socket_path).await.unwrap();
    let mut reader2 = BufReader::new(stream2);

    // Late joiner should receive all 10 events from history
    let events = read_events_with_timeout(&mut reader2, 10, 100).await;

    assert_eq!(events.len(), 10);
    assert!(events[0].contains("event 0"));
    assert!(events[9].contains("event 9"));

    // Cleanup
    let _ = std::fs::remove_file(&socket_path);
}

#[tokio::test]
async fn test_client_receives_events_in_order() {
    let socket_path = test_socket_path("event_order");
    let manager = Arc::new(EventManager::new().unwrap());

    // Start the listener
    let listener_manager = Arc::clone(&manager);
    let socket_path_clone = socket_path.clone();
    tokio::spawn(async move {
        listener_manager.start_listening(socket_path_clone).await;
    });

    tokio::time::sleep(Duration::from_millis(50)).await;

    // Connect client
    let stream = UnixStream::connect(&socket_path).await.unwrap();
    let mut reader = BufReader::new(stream);

    // Broadcast events rapidly
    let count = 50;
    for i in 0..count {
        let event = Event::new(format!("sequence {}", i));
        manager.broadcast(&event).await.unwrap();
    }

    // Read all events
    let events = read_events_with_timeout(&mut reader, count, 500).await;

    assert_eq!(events.len(), count);

    // Verify order
    for (i, event) in events.iter().enumerate() {
        assert!(
            event.contains(&format!("sequence {}", i)),
            "Event {} out of order: {}",
            i,
            event
        );
    }

    // Cleanup
    let _ = std::fs::remove_file(&socket_path);
}

#[tokio::test]
async fn test_concurrent_broadcasts_during_client_connections() {
    let socket_path = test_socket_path("concurrent_ops");
    let manager = Arc::new(EventManager::new().unwrap());

    // Start the listener
    let listener_manager = Arc::clone(&manager);
    let socket_path_clone = socket_path.clone();
    tokio::spawn(async move {
        listener_manager.start_listening(socket_path_clone).await;
    });

    tokio::time::sleep(Duration::from_millis(50)).await;

    // Connect client 1 BEFORE any broadcasts
    let stream1 = UnixStream::connect(&socket_path).await.unwrap();
    let mut reader1 = BufReader::new(stream1);

    // Spawn a task that continuously broadcasts
    let broadcast_manager = Arc::clone(&manager);
    let broadcast_handle = tokio::spawn(async move {
        for i in 0..100 {
            let event = Event::new(format!("concurrent event {}", i));
            broadcast_manager.broadcast(&event).await.unwrap();
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    });

    // While broadcasting, connect more clients at different times
    tokio::time::sleep(Duration::from_millis(100)).await;
    let stream2 = UnixStream::connect(&socket_path).await.unwrap();
    let mut reader2 = BufReader::new(stream2);

    tokio::time::sleep(Duration::from_millis(150)).await;
    let stream3 = UnixStream::connect(&socket_path).await.unwrap();
    let mut reader3 = BufReader::new(stream3);

    // Wait for broadcasts to complete
    broadcast_handle.await.unwrap();

    // All clients should have received events
    let events1 = read_events_with_timeout(&mut reader1, 100, 200).await;
    let events2 = read_events_with_timeout(&mut reader2, 100, 200).await;
    let events3 = read_events_with_timeout(&mut reader3, 100, 200).await;

    // Client 1 connected first (before any broadcasts), should get all 100
    assert_eq!(events1.len(), 100);

    // Client 2 connected after ~20 events were broadcast
    // Gets ~20 from history + ~80 live = 100
    assert_eq!(events2.len(), 100);

    // Client 3 connected after ~50 events were broadcast
    // Gets ~50 from history + ~50 live = 100
    assert_eq!(events3.len(), 100);

    // Verify they all received events in order
    assert!(events1[0].contains("concurrent event 0"));
    assert!(events2[0].contains("concurrent event 0"));
    assert!(events3[0].contains("concurrent event 0"));

    // Cleanup
    let _ = std::fs::remove_file(&socket_path);
}

#[tokio::test]
async fn test_buffer_overflow_affects_new_clients() {
    let socket_path = test_socket_path("buffer_overflow");
    let manager = Arc::new(EventManager::new().unwrap());

    // Broadcast more than buffer max (1000)
    for i in 0..1100 {
        let event = Event::new(format!("overflow event {}", i));
        manager.broadcast(&event).await.unwrap();
    }

    // Start the listener
    let listener_manager = Arc::clone(&manager);
    let socket_path_clone = socket_path.clone();
    tokio::spawn(async move {
        listener_manager.start_listening(socket_path_clone).await;
    });

    tokio::time::sleep(Duration::from_millis(50)).await;

    // New client connects
    let stream = UnixStream::connect(&socket_path).await.unwrap();
    let mut reader = BufReader::new(stream);

    // Should receive exactly 1000 events (buffer max)
    let events = read_events_with_timeout(&mut reader, 1100, 500).await;

    assert_eq!(events.len(), 1000);

    // First event should be 100 (oldest 100 were evicted)
    assert!(events[0].contains("overflow event 100"));

    // Last event should be 1099
    assert!(events[999].contains("overflow event 1099"));

    // Cleanup
    let _ = std::fs::remove_file(&socket_path);
}

#[rstest]
#[case(10, 1)]
#[case(50, 5)]
#[tokio::test]
async fn test_client_count_scaling(#[case] num_clients: usize, #[case] events_per_client: usize) {
    let socket_path = test_socket_path(&format!("scaling_{}_{}", num_clients, events_per_client));
    let manager = Arc::new(EventManager::new().unwrap());

    // Start the listener
    let listener_manager = Arc::clone(&manager);
    let socket_path_clone = socket_path.clone();
    tokio::spawn(async move {
        listener_manager.start_listening(socket_path_clone).await;
    });

    tokio::time::sleep(Duration::from_millis(50)).await;

    // Connect many clients
    let mut readers = Vec::new();
    for _ in 0..num_clients {
        let stream = UnixStream::connect(&socket_path).await.unwrap();
        readers.push(BufReader::new(stream));
    }

    // Broadcast events
    for i in 0..events_per_client {
        let event = Event::new(format!("scale event {}", i));
        manager.broadcast(&event).await.unwrap();
    }

    // Verify all clients received all events
    for reader in &mut readers {
        let events = read_events_with_timeout(reader, events_per_client, 200).await;
        assert_eq!(events.len(), events_per_client);
    }

    // Cleanup
    let _ = std::fs::remove_file(&socket_path);
}

#[tokio::test]
async fn test_client_disconnect_doesnt_affect_others() {
    let socket_path = test_socket_path("disconnect");
    let manager = Arc::new(EventManager::new().unwrap());

    // Start the listener
    let listener_manager = Arc::clone(&manager);
    let socket_path_clone = socket_path.clone();
    tokio::spawn(async move {
        listener_manager.start_listening(socket_path_clone).await;
    });

    tokio::time::sleep(Duration::from_millis(50)).await;

    // Connect 3 clients
    let stream1 = UnixStream::connect(&socket_path).await.unwrap();
    let stream2 = UnixStream::connect(&socket_path).await.unwrap();
    let stream3 = UnixStream::connect(&socket_path).await.unwrap();

    let mut reader1 = BufReader::new(stream1);
    let mut reader2 = BufReader::new(stream2);
    let mut reader3 = BufReader::new(stream3);

    // Broadcast initial event
    manager
        .broadcast(&Event::new("before disconnect"))
        .await
        .unwrap();

    // All receive it
    let _ = read_events_with_timeout(&mut reader1, 1, 100).await;
    let _ = read_events_with_timeout(&mut reader2, 1, 100).await;
    let _ = read_events_with_timeout(&mut reader3, 1, 100).await;

    // Drop client 2 (simulates disconnect)
    drop(reader2);

    // Broadcast another event
    manager
        .broadcast(&Event::new("after disconnect"))
        .await
        .unwrap();

    // Clients 1 and 3 should still receive it
    let events1 = read_events_with_timeout(&mut reader1, 1, 100).await;
    let events3 = read_events_with_timeout(&mut reader3, 1, 100).await;

    assert_eq!(events1.len(), 1);
    assert_eq!(events3.len(), 1);
    assert!(events1[0].contains("after disconnect"));
    assert!(events3[0].contains("after disconnect"));

    // Cleanup
    let _ = std::fs::remove_file(&socket_path);
}

#[tokio::test]
async fn test_json_deserialization_of_received_events() {
    let socket_path = test_socket_path("json_deser");
    let manager = Arc::new(EventManager::new().unwrap());

    // Start the listener
    let listener_manager = Arc::clone(&manager);
    let socket_path_clone = socket_path.clone();
    tokio::spawn(async move {
        listener_manager.start_listening(socket_path_clone).await;
    });

    tokio::time::sleep(Duration::from_millis(50)).await;

    // Broadcast an event with special characters
    let test_message = "special chars: @#$% newline\\n tab\\t quotes \"test\"";
    manager.broadcast(&Event::new(test_message)).await.unwrap();

    // Connect and deserialize
    let stream = UnixStream::connect(&socket_path).await.unwrap();
    let mut reader = BufReader::new(stream);

    let mut line = String::new();
    reader.read_line(&mut line).await.unwrap();

    // Should be valid JSON
    let parsed: serde_json::Value = serde_json::from_str(&line.trim()).unwrap();

    assert_eq!(parsed["message"], test_message);

    // Cleanup
    let _ = std::fs::remove_file(&socket_path);
}
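Since these tests live in tests/event_test.rs, they can be run on their own with `cargo test --test event_test`; the helper builds socket paths from the test name and process id under /tmp, so parallel runs don't collide.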