diff --git a/Cargo.lock b/Cargo.lock index a074606..1eee981 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1401,6 +1401,18 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nix" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" +dependencies = [ + "bitflags", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nom" version = "7.1.3" @@ -1973,6 +1985,7 @@ dependencies = [ "genai", "human-panic", "irc", + "nix", "rstest", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index ff168eb..6d1b3e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,10 @@ serde_json = "1.0" tracing = "0.1" tracing-subscriber = "0.3" + [dependencies.nix] + version = "0.30.1" + features = [ "fs" ] + [dependencies.clap] version = "4.5" features = [ "derive" ] diff --git a/src/event_manager.rs b/src/event_manager.rs index 9b7fcf8..6a6dfcd 100644 --- a/src/event_manager.rs +++ b/src/event_manager.rs @@ -1,10 +1,13 @@ use std::{collections::VecDeque, path::Path, sync::Arc}; use color_eyre::Result; +use nix::{NixPath, sys::stat, unistd::mkfifo}; use tokio::{ - io::AsyncWriteExt, - net::{UnixListener, UnixStream}, - sync::{RwLock, broadcast}, + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + net::{ + unix::pipe, UnixListener, UnixStream + }, + sync::{broadcast, RwLock}, }; use tracing::{error, info}; @@ -46,8 +49,30 @@ impl EventManager { Ok(()) } - pub async fn start_listening(self: Arc, path: impl AsRef) { - let listener = UnixListener::bind(path).unwrap(); + // NB: This assumes it has exclusive control of the FIFO. + pub async fn start_fifo

(path: &P) -> Result<()> + where + P: AsRef + NixPath + ?Sized, + { + // Overwrite, or create the FIFO. + let _ = std::fs::remove_file(path); + mkfifo(path, stat::Mode::S_IRWXU)?; + + loop { + let rx = pipe::OpenOptions::new().open_receiver(path)?; + + let mut reader = BufReader::new(rx); + let mut line = String::new(); + + while reader.read_line(&mut line).await? > 0 { + // Now handle the command. + line.clear(); + } + } + } + + pub async fn start_listening(self: Arc, broadcast_path: impl AsRef) { + let listener = UnixListener::bind(broadcast_path).unwrap(); loop { match listener.accept().await {