use std::{collections::VecDeque, path::Path, sync::Arc}; use color_eyre::Result; use tokio::{ io::AsyncWriteExt, net::{UnixListener, UnixStream}, sync::{RwLock, broadcast}, }; use tracing::error; use crate::event::Event; // Hard coding for now. Maybe make this a parameter to new. const EVENT_BUF_MAX: usize = 1000; // Manager for communication with plugins. pub struct EventManager { announce: broadcast::Sender, // Everything broadcasts here. events: Arc>>, // Ring buffer. } impl EventManager { pub fn new() -> Result { let (announce, _) = broadcast::channel(100); Ok(Self { announce, events: Arc::new(RwLock::new(VecDeque::::new())), }) } pub async fn broadcast(&self, event: &Event) -> Result<()> { let msg = serde_json::to_string(event)? + "\n"; let mut events = self.events.write().await; if events.len() >= EVENT_BUF_MAX { events.pop_front(); } events.push_back(msg.clone()); drop(events); let _ = self.announce.send(msg); Ok(()) } pub async fn start_listening(self: Arc, path: impl AsRef) { let listener = UnixListener::bind(path).unwrap(); loop { match listener.accept().await { Ok((stream, _)) => { // Spawn a new stream for the plugin. The loop // runs recursively from there. let broadcaster = Arc::clone(&self); tokio::spawn(async move { // send events. let _ = broadcaster.send_events(stream).await; }); } Err(e) => error!("Accept error: {e}"), } } } async fn send_events(&self, stream: UnixStream) -> Result<()> { let mut writer = stream; // Take care of history. let events = self.events.read().await; for event in events.iter() { writer.write_all(event.as_bytes()).await?; } drop(events); // Now just broadcast the new events. let mut rx = self.announce.subscribe(); while let Ok(event) = rx.recv().await { if writer.write_all(event.as_bytes()).await.is_err() { // *click* break; } } Ok(()) } }