From 1a00daf6a8644e8642dc8d00fd61065e1c7f536c Mon Sep 17 00:00:00 2001 From: Breadway Date: Mon, 11 May 2026 16:30:05 +0800 Subject: [PATCH] unsure --- Cargo.lock | 130 ++++++++- bread-cli/Cargo.toml | 1 + bread-cli/src/main.rs | 317 +++++++++++++++++++++- breadd/src/adapters/mod.rs | 28 +- breadd/src/core/config.rs | 55 ++++ breadd/src/core/normalizer.rs | 117 ++++++-- breadd/src/core/state_engine.rs | 53 +++- breadd/src/core/types.rs | 2 + breadd/src/ipc/mod.rs | 65 ++++- breadd/src/lua/mod.rs | 460 ++++++++++++++++++++++++++++++-- breadd/src/main.rs | 31 ++- 11 files changed, 1192 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0ab00f1..d987076 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -289,6 +289,7 @@ dependencies = [ "anyhow", "bread-shared", "clap", + "notify", "serde", "serde_json", "tokio", @@ -429,6 +430,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -579,6 +589,16 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" +[[package]] +name = "filetime" +version = "0.2.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d5b2eef6fafbf69f877e55509ce5b11a760690ac9700a2921be067aa6afaef6" +dependencies = [ + "cfg-if", + "libc", +] + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -591,6 +611,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures" version = "0.3.32" @@ -798,6 +827,26 @@ dependencies = [ "serde_core", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags 1.3.2", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.13" @@ -830,6 +879,26 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "kqueue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "285efcf12ef41bec907b3000d5ffaeb54191d4d9d83c0d6157e6cbc2db255e64" +dependencies = [ + "bitflags 2.11.1", + "libc", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -952,6 +1021,18 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys 0.48.0", +] + [[package]] name = "mio" version = "1.2.0" @@ -1083,6 +1164,25 @@ dependencies = [ "memoffset 0.7.1", ] +[[package]] +name = "notify" +version = "6.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" +dependencies = [ + "bitflags 2.11.1", + "crossbeam-channel", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio 0.8.11", + "walkdir", + "windows-sys 0.48.0", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -1402,6 +1502,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1639,7 +1748,7 @@ checksum = "8fc7f01b389ac15039e4dc9531aa973a135d7a4135281b12d7c1bc79fd57fffe" dependencies = [ "bytes", "libc", - "mio", + "mio 1.2.0", "parking_lot", "pin-project-lite", "signal-hook-registry", @@ -1844,6 +1953,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -1930,6 +2049,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/bread-cli/Cargo.toml b/bread-cli/Cargo.toml index 0550c57..6d2541e 100644 --- a/bread-cli/Cargo.toml +++ b/bread-cli/Cargo.toml @@ -10,3 +10,4 @@ serde_json.workspace = true tokio.workspace = true anyhow.workspace = true clap = { version = "4.5", features = ["derive"] } +notify = "6.1" diff --git a/bread-cli/src/main.rs b/bread-cli/src/main.rs index e4af194..6e1982a 100644 --- a/bread-cli/src/main.rs +++ b/bread-cli/src/main.rs @@ -1,10 +1,14 @@ use anyhow::Result; use clap::{Parser, Subcommand}; +use notify::{RecommendedWatcher, RecursiveMode, Watcher}; use serde_json::{json, Value}; use std::env; +use std::io; use std::path::{Path, PathBuf}; +use std::time::{Duration, UNIX_EPOCH}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::UnixStream; +use tokio::sync::mpsc; #[derive(Parser, Debug)] #[command(author, version, about = "Bread CLI - the reactive desktop automation fabric")] @@ -16,13 +20,32 @@ struct Cli { #[derive(Subcommand, Debug)] enum Commands { /// Hot-reload all Lua modules - Reload, + Reload { + /// Watch config directory and reload on changes + #[arg(long)] + watch: bool, + }, /// Dump current runtime state - State, + State { + /// Optional dotted path into RuntimeState + path: Option, + /// Output raw JSON + #[arg(long)] + json: bool, + }, /// Stream live normalized events Events { #[arg(long)] filter: Option, + /// Output raw JSON + #[arg(long)] + json: bool, + /// Comma-separated fields to display + #[arg(long)] + fields: Option, + /// Replay events from the last N seconds + #[arg(long)] + since: Option, }, /// List loaded modules and status Modules, @@ -40,6 +63,12 @@ enum Commands { Ping, /// Fetch daemon health details Health, + /// Diagnose daemon and module health + Doctor { + /// Output raw JSON + #[arg(long)] + json: bool, + }, } #[tokio::main] @@ -48,16 +77,38 @@ async fn main() -> Result<()> { let socket = daemon_socket_path(); match &cli.command { - Commands::Reload => { - let response = send_request(&socket, "modules.reload", json!({})).await?; - print_json(&response)?; + Commands::Reload { watch } => { + if *watch { + watch_reload(&socket).await?; + } else { + let response = send_request(&socket, "modules.reload", json!({})).await?; + print_reload(&response); + } } - Commands::State => { - let response = send_request(&socket, "state.dump", json!({})).await?; - print_json(&response)?; + Commands::State { path, json } => { + if *json { + let response = if let Some(path) = path { + send_request(&socket, "state.get", json!({ "key": path })).await? + } else { + send_request(&socket, "state.dump", json!({})).await? + }; + print_json(&response)?; + } else { + let response = if let Some(path) = path { + send_request(&socket, "state.get", json!({ "key": path })).await? + } else { + send_request(&socket, "state.dump", json!({})).await? + }; + print_state_formatted(path.as_deref(), &response); + } } - Commands::Events { filter } => { - stream_events(&socket, filter.clone()).await?; + Commands::Events { + filter, + json, + fields, + since, + } => { + stream_events(&socket, filter.clone(), *json, fields.clone(), *since).await?; } Commands::Modules => { let response = send_request(&socket, "modules.list", json!({})).await?; @@ -92,6 +143,14 @@ async fn main() -> Result<()> { let response = send_request(&socket, "health", json!({})).await?; print_json(&response)?; } + Commands::Doctor { json } => { + if *json { + let response = send_request(&socket, "health", json!({})).await?; + print_json(&response)?; + } else { + print_doctor(&socket).await?; + } + } } Ok(()) @@ -128,7 +187,26 @@ async fn send_request(socket: &Path, method: &str, params: Value) -> Result) -> Result<()> { +async fn stream_events( + socket: &Path, + filter: Option, + raw_json: bool, + fields: Option, + since: Option, +) -> Result<()> { + if let Some(seconds) = since { + let replay = send_request(socket, "events.replay", json!({ "since_ms": seconds * 1000 })).await?; + if let Some(list) = replay.as_array() { + for item in list { + if raw_json { + println!("{}", serde_json::to_string_pretty(item)?); + } else { + print_event(item, fields.as_deref()); + } + } + } + } + let stream = UnixStream::connect(socket).await?; let (read_half, mut write_half) = stream.into_split(); let request = json!({ @@ -146,7 +224,11 @@ async fn stream_events(socket: &Path, filter: Option) -> Result<()> { let mut lines = BufReader::new(read_half).lines(); while let Some(line) = lines.next_line().await? { let value: Value = serde_json::from_str(&line)?; - println!("{}", serde_json::to_string_pretty(&value)?); + if raw_json { + println!("{}", serde_json::to_string_pretty(&value)?); + } else { + print_event(&value, fields.as_deref()); + } } Ok(()) @@ -156,3 +238,214 @@ fn print_json(value: &Value) -> Result<()> { println!("{}", serde_json::to_string_pretty(value)?); Ok(()) } + +fn print_state_formatted(path: Option<&str>, value: &Value) { + if let Some(path) = path { + println!("{path}"); + } + print_value(value, 0); +} + +fn print_value(value: &Value, indent: usize) { + let pad = " ".repeat(indent); + match value { + Value::Object(map) => { + for (key, val) in map { + println!("{pad}{key}"); + print_value(val, indent + 2); + } + } + Value::Array(list) => { + for (idx, val) in list.iter().enumerate() { + println!("{pad}[{idx}]"); + print_value(val, indent + 2); + } + } + other => { + println!("{pad}{}", other); + } + } +} + +fn print_event(event: &Value, fields: Option<&str>) { + if let Some(fields) = fields { + let mut out = serde_json::Map::new(); + for field in fields.split(',') { + let field = field.trim(); + if field.is_empty() { + continue; + } + if let Some(val) = event.get(field) { + out.insert(field.to_string(), val.clone()); + } + } + println!("{}", Value::Object(out)); + return; + } + + let ts = event.get("timestamp").and_then(Value::as_u64).unwrap_or(0); + let event_name = event.get("event").and_then(Value::as_str).unwrap_or("?"); + let source = event.get("source").and_then(Value::as_str).unwrap_or("?"); + let time = format_timestamp(ts); + println!("{time} {event_name} source={source}"); + if let Some(data) = event.get("data") { + println!(" data: {}", data); + } +} + +fn format_timestamp(ms: u64) -> String { + let secs = ms / 1000; + let millis = ms % 1000; + let time = UNIX_EPOCH + Duration::from_secs(secs); + let datetime = time.duration_since(UNIX_EPOCH).unwrap_or_default(); + let seconds = datetime.as_secs() % 60; + let minutes = (datetime.as_secs() / 60) % 60; + let hours = (datetime.as_secs() / 3600) % 24; + format!("{:02}:{:02}:{:02}.{:03}", hours, minutes, seconds, millis) +} + +fn print_reload(value: &Value) { + println!("reloading lua runtime..."); + if let Some(mods) = value.get("modules").and_then(Value::as_array) { + for module in mods { + let name = module.get("name").and_then(Value::as_str).unwrap_or("?"); + let status = module.get("status").and_then(Value::as_str).unwrap_or("?"); + let error = module.get("last_error").and_then(Value::as_str); + if let Some(error) = error { + println!(" ✗ {name} {status}"); + println!(" {error}"); + } else { + println!(" ✓ {name} {status}"); + } + } + } +} + +async fn watch_reload(socket: &Path) -> Result<()> { + let config_dir = config_directory(); + println!("watching {} for changes...", config_dir.display()); + + let (tx, mut rx) = mpsc::unbounded_channel(); + let mut watcher: RecommendedWatcher = notify::recommended_watcher(move |res| { + let _ = tx.send(res); + })?; + watcher.watch(&config_dir, RecursiveMode::Recursive)?; + + while let Some(msg) = rx.recv().await { + if msg.is_ok() { + let response = send_request(socket, "modules.reload", json!({})).await?; + print_reload(&response); + } + } + + Ok(()) +} + +async fn print_doctor(socket: &Path) -> Result<()> { + let stream = match UnixStream::connect(socket).await { + Ok(stream) => stream, + Err(err) => { + if err.kind() == io::ErrorKind::NotFound { + println!("bread doctor"); + println!(" daemon ✗ not running"); + println!(" socket {} (not found)", socket.display()); + println!(); + println!(" start the daemon: systemctl --user start breadd"); + println!(" view logs: journalctl --user -u breadd -f"); + return Ok(()); + } + return Err(err.into()); + } + }; + + let response = send_request_with_stream(stream, "health", json!({})).await?; + render_doctor(&response); + Ok(()) +} + +fn render_doctor(health: &Value) { + println!("bread doctor"); + let ok = health.get("ok").and_then(Value::as_bool).unwrap_or(false); + let pid = health.get("pid").and_then(Value::as_u64).unwrap_or(0); + let version = health.get("version").and_then(Value::as_str).unwrap_or("unknown"); + let uptime_ms = health.get("uptime_ms").and_then(Value::as_u64).unwrap_or(0); + let socket = health.get("socket").and_then(Value::as_str).unwrap_or("?"); + println!(" daemon {} (pid {})", if ok { "✓ running" } else { "✗ unreachable" }, pid); + println!(" version {version}"); + println!(" uptime {}s", uptime_ms / 1000); + println!(" socket {socket}"); + + if let Some(adapters) = health.get("adapters").and_then(Value::as_object) { + println!(); + println!("adapters"); + for (name, status) in adapters { + println!(" {:20} {}", name, status); + } + } + + if let Some(modules) = health.get("modules").and_then(Value::as_array) { + println!(); + println!("modules"); + for module in modules { + let name = module.get("name").and_then(Value::as_str).unwrap_or("?"); + let status = module.get("status").and_then(Value::as_str).unwrap_or("?"); + let error = module.get("last_error").and_then(Value::as_str); + println!(" {:30} {}", name, status); + if let Some(error) = error { + println!(" └ {error}"); + } + } + } + + if let Some(count) = health.get("subscriptions").and_then(Value::as_u64) { + println!(); + println!("subscriptions {count}"); + } + + if let Some(errors) = health.get("recent_errors").and_then(Value::as_array) { + if !errors.is_empty() { + println!(); + println!("recent errors ({} total)", errors.len()); + for entry in errors.iter().take(5) { + println!(" {entry}"); + } + } + } +} + +async fn send_request_with_stream( + stream: UnixStream, + method: &str, + params: Value, +) -> Result { + let (read_half, mut write_half) = stream.into_split(); + let request = json!({ + "id": "1", + "method": method, + "params": params, + }); + + write_half + .write_all(format!("{}\n", serde_json::to_string(&request)?).as_bytes()) + .await?; + + let mut lines = BufReader::new(read_half).lines(); + let Some(line) = lines.next_line().await? else { + anyhow::bail!("daemon closed connection without response"); + }; + let response: Value = serde_json::from_str(&line)?; + if let Some(error) = response.get("error").and_then(Value::as_str) { + anyhow::bail!(error.to_string()); + } + Ok(response.get("result").cloned().unwrap_or_else(|| json!({}))) +} + +fn config_directory() -> PathBuf { + if let Ok(xdg) = env::var("XDG_CONFIG_HOME") { + return Path::new(&xdg).join("bread"); + } + if let Ok(home) = env::var("HOME") { + return Path::new(&home).join(".config/bread"); + } + PathBuf::from(".config/bread") +} diff --git a/breadd/src/adapters/mod.rs b/breadd/src/adapters/mod.rs index d3a8a3f..b33569b 100644 --- a/breadd/src/adapters/mod.rs +++ b/breadd/src/adapters/mod.rs @@ -1,8 +1,11 @@ use anyhow::Result; use async_trait::async_trait; use bread_shared::RawEvent; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{mpsc, watch, RwLock}; use tracing::info; +use serde::Serialize; +use std::collections::HashMap; +use std::sync::Arc; use crate::core::config::Config; use crate::core::supervisor::spawn_supervised; @@ -14,6 +17,13 @@ pub mod udev; pub mod network_rtnetlink; pub mod power_upower; +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum AdapterStatus { + Connected, + Disconnected, +} + #[async_trait] pub trait Adapter: Send + Sync { fn name(&self) -> &'static str; @@ -30,6 +40,7 @@ pub struct Manager { raw_tx: mpsc::Sender, config: Config, shutdown_rx: watch::Receiver, + status: Arc>>, } impl Manager { @@ -42,9 +53,14 @@ impl Manager { raw_tx, config, shutdown_rx, + status: Arc::new(RwLock::new(HashMap::new())), } } + pub fn status_handle(&self) -> Arc>> { + self.status.clone() + } + pub async fn start_all(&self) -> Result<()> { info!("starting adapters"); @@ -91,17 +107,27 @@ impl Manager { let tx = self.raw_tx.clone(); let shutdown_rx = self.shutdown_rx.clone(); let shutdown_for_task = shutdown_rx.clone(); + let status = self.status.clone(); spawn_supervised(name, shutdown_rx, move || { let adapter = adapter.clone(); let tx = tx.clone(); let mut shutdown_rx = shutdown_for_task.clone(); + let status = status.clone(); async move { adapter.on_connect().await?; + { + let mut guard = status.write().await; + guard.insert(adapter.name().to_string(), AdapterStatus::Connected); + } let result = tokio::select! { result = adapter.run(tx) => result, _ = shutdown_rx.changed() => Ok(()), }; adapter.on_disconnect().await?; + { + let mut guard = status.write().await; + guard.insert(adapter.name().to_string(), AdapterStatus::Disconnected); + } result } }); diff --git a/breadd/src/core/config.rs b/breadd/src/core/config.rs index dedd0d4..1c756a9 100644 --- a/breadd/src/core/config.rs +++ b/breadd/src/core/config.rs @@ -12,8 +12,12 @@ pub struct Config { #[serde(default)] pub lua: LuaConfig, #[serde(default)] + pub modules: ModulesConfig, + #[serde(default)] pub adapters: AdaptersConfig, #[serde(default)] + pub notifications: NotificationsConfig, + #[serde(default)] pub events: EventsConfig, } @@ -33,6 +37,14 @@ pub struct LuaConfig { pub module_path: String, } +#[derive(Debug, Clone, Deserialize)] +pub struct ModulesConfig { + #[serde(default = "default_true")] + pub builtin: bool, + #[serde(default)] + pub disable: Vec, +} + #[derive(Debug, Clone, Deserialize)] pub struct AdaptersConfig { #[serde(default)] @@ -73,12 +85,24 @@ pub struct EventsConfig { pub dedup_window_ms: u64, } +#[derive(Debug, Clone, Deserialize)] +pub struct NotificationsConfig { + #[serde(default = "default_notify_timeout")] + pub default_timeout_ms: i64, + #[serde(default = "default_notify_urgency")] + pub default_urgency: String, + #[serde(default = "default_notify_path")] + pub notify_send_path: String, +} + impl Default for Config { fn default() -> Self { Self { daemon: DaemonConfig::default(), lua: LuaConfig::default(), + modules: ModulesConfig::default(), adapters: AdaptersConfig::default(), + notifications: NotificationsConfig::default(), events: EventsConfig::default(), } } @@ -102,6 +126,15 @@ impl Default for LuaConfig { } } +impl Default for ModulesConfig { + fn default() -> Self { + Self { + builtin: default_true(), + disable: Vec::new(), + } + } +} + impl Default for AdaptersConfig { fn default() -> Self { Self { @@ -147,6 +180,16 @@ impl Default for EventsConfig { } } +impl Default for NotificationsConfig { + fn default() -> Self { + Self { + default_timeout_ms: default_notify_timeout(), + default_urgency: default_notify_urgency(), + notify_send_path: default_notify_path(), + } + } +} + impl Config { pub fn load() -> Result { let path = config_path(); @@ -218,6 +261,18 @@ fn default_dedup_window() -> u64 { 100 } +fn default_notify_timeout() -> i64 { + 3000 +} + +fn default_notify_urgency() -> String { + "normal".to_string() +} + +fn default_notify_path() -> String { + "notify-send".to_string() +} + fn default_udev_subsystems() -> Vec { vec![ "usb".to_string(), diff --git a/breadd/src/core/normalizer.rs b/breadd/src/core/normalizer.rs index d0f7b42..b424e1c 100644 --- a/breadd/src/core/normalizer.rs +++ b/breadd/src/core/normalizer.rs @@ -80,22 +80,102 @@ impl EventNormalizer { fn normalize_hyprland(&self, raw: &RawEvent) -> Vec { let kind = raw.payload.get("kind").and_then(Value::as_str).unwrap_or("unknown"); - let mapped = match kind { - "workspace" | "workspacev2" => "bread.workspace.changed", - "monitoradded" => "bread.monitor.connected", - "monitorremoved" => "bread.monitor.disconnected", - "activewindow" | "activewindowv2" => "bread.window.focus.changed", - "openwindow" => "bread.window.opened", - "closewindow" => "bread.window.closed", - _ => "bread.hyprland.event", - }; + let data = raw + .payload + .get("data") + .and_then(Value::as_str) + .unwrap_or(""); - vec![BreadEvent { - event: mapped.to_string(), - timestamp: raw.timestamp, - source: AdapterSource::Hyprland, - data: raw.payload.clone(), - }] + match kind { + "workspace" | "workspacev2" => vec![BreadEvent { + event: "bread.workspace.changed".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: raw.payload.clone(), + }], + "createworkspace" => vec![BreadEvent { + event: "bread.workspace.created".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: json!({ "workspace": data }), + }], + "destroyworkspace" => vec![BreadEvent { + event: "bread.workspace.destroyed".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: json!({ "workspace": data }), + }], + "monitoradded" => vec![BreadEvent { + event: "bread.monitor.connected".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: raw.payload.clone(), + }], + "monitorremoved" => vec![BreadEvent { + event: "bread.monitor.disconnected".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: raw.payload.clone(), + }], + "activewindow" => vec![BreadEvent { + event: "bread.window.focus.changed".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: raw.payload.clone(), + }], + "activewindowv2" => { + let fields = split_hyprland_fields(data); + vec![BreadEvent { + event: "bread.window.focused".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: json!({ + "address": fields.get(0).unwrap_or(&"") + }), + }] + } + "openwindow" => { + let fields = split_hyprland_fields(data); + vec![BreadEvent { + event: "bread.window.opened".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: json!({ + "address": fields.get(0).unwrap_or(&""), + "workspace": fields.get(1).unwrap_or(&""), + "class": fields.get(2).unwrap_or(&""), + "title": fields.get(3).unwrap_or(&""), + }), + }] + } + "closewindow" => { + let fields = split_hyprland_fields(data); + vec![BreadEvent { + event: "bread.window.closed".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: json!({ "address": fields.get(0).unwrap_or(&"") }), + }] + } + "movewindow" => { + let fields = split_hyprland_fields(data); + vec![BreadEvent { + event: "bread.window.moved".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: json!({ + "address": fields.get(0).unwrap_or(&""), + "workspace": fields.get(1).unwrap_or(&""), + }), + }] + } + _ => vec![BreadEvent { + event: "bread.hyprland.event".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: raw.payload.clone(), + }], + } } fn normalize_power(&self, raw: &RawEvent) -> Vec { @@ -201,6 +281,13 @@ impl EventNormalizer { } } +fn split_hyprland_fields(data: &str) -> Vec<&str> { + if data.is_empty() { + return Vec::new(); + } + data.split(">>").collect() +} + fn classify_device(payload: &Value) -> DeviceClass { let name = payload .get("name") diff --git a/breadd/src/core/state_engine.rs b/breadd/src/core/state_engine.rs index aecae3c..caea8dc 100644 --- a/breadd/src/core/state_engine.rs +++ b/breadd/src/core/state_engine.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use anyhow::Result; use bread_shared::{AdapterSource, BreadEvent}; @@ -15,6 +16,7 @@ use crate::lua::LuaMessage; pub struct StateHandle { state: Arc>, command_tx: mpsc::UnboundedSender, + subscription_count: Arc, } pub enum StateCommand { @@ -38,6 +40,7 @@ pub enum StateCommand { name: String, status: ModuleLoadState, last_error: Option, + builtin: bool, }, SetProfile { name: String, @@ -45,8 +48,16 @@ pub enum StateCommand { } impl StateHandle { - pub fn new(state: Arc>, command_tx: mpsc::UnboundedSender) -> Self { - Self { state, command_tx } + pub fn new( + state: Arc>, + command_tx: mpsc::UnboundedSender, + subscription_count: Arc, + ) -> Self { + Self { + state, + command_tx, + subscription_count, + } } pub fn state_arc(&self) -> Arc> { @@ -101,17 +112,28 @@ impl StateHandle { let _ = self.command_tx.send(StateCommand::ClearSubscriptions); } - pub fn set_module_status(&self, name: String, status: ModuleLoadState, last_error: Option) { + pub fn set_module_status( + &self, + name: String, + status: ModuleLoadState, + last_error: Option, + builtin: bool, + ) { let _ = self.command_tx.send(StateCommand::SetModuleStatus { name, status, last_error, + builtin, }); } pub fn set_profile(&self, name: String) { let _ = self.command_tx.send(StateCommand::SetProfile { name }); } + + pub fn subscription_count(&self) -> Arc { + self.subscription_count.clone() + } } pub async fn run_state_engine( @@ -120,6 +142,7 @@ pub async fn run_state_engine( state: Arc>, lua_tx: mpsc::UnboundedSender, event_stream_tx: broadcast::Sender, + subscription_count: Arc, mut shutdown_rx: watch::Receiver, ) { let mut subscriptions = SubscriptionTable::default(); @@ -136,7 +159,7 @@ pub async fn run_state_engine( let Some(cmd) = maybe_cmd else { break; }; - handle_command(cmd, &state, &mut subscriptions, &mut watches).await; + handle_command(cmd, &state, &mut subscriptions, &mut watches, &subscription_count).await; } maybe_event = event_rx.recv() => { let Some(event) = maybe_event else { @@ -158,7 +181,7 @@ pub async fn run_state_engine( apply_event_to_state(&mut guard, &event); } - dispatch_event(&event, &mut subscriptions, &lua_tx, &event_stream_tx); + dispatch_event(&event, &mut subscriptions, &lua_tx, &event_stream_tx, &subscription_count); if let (Some(before), Some(after)) = (before_snapshot, after_snapshot) { for (_id, path) in watches.iter() { @@ -174,7 +197,7 @@ pub async fn run_state_engine( "old": old_val, }), ); - dispatch_event(&synthetic, &mut subscriptions, &lua_tx, &event_stream_tx); + dispatch_event(&synthetic, &mut subscriptions, &lua_tx, &event_stream_tx, &subscription_count); } } } @@ -190,13 +213,17 @@ async fn handle_command( state: &Arc>, subscriptions: &mut SubscriptionTable, watches: &mut HashMap, + subscription_count: &Arc, ) { match cmd { StateCommand::RegisterSubscription { id, pattern, once } => { subscriptions.add_with_id(id, pattern, once); + subscription_count.fetch_add(1, Ordering::Relaxed); } StateCommand::RemoveSubscription { id } => { - subscriptions.remove(id); + if subscriptions.remove(id) { + subscription_count.fetch_sub(1, Ordering::Relaxed); + } } StateCommand::RegisterWatch { id, path } => { watches.insert(id, path); @@ -207,21 +234,25 @@ async fn handle_command( StateCommand::ClearSubscriptions => { subscriptions.clear(); watches.clear(); + subscription_count.store(0, Ordering::Relaxed); } StateCommand::SetModuleStatus { name, status, last_error, + builtin, } => { let mut guard = state.write().await; if let Some(existing) = guard.modules.iter_mut().find(|m| m.name == name) { existing.status = status; existing.last_error = last_error; + existing.builtin = builtin; } else { guard.modules.push(crate::core::types::ModuleStatus { name, status, last_error, + builtin, store: HashMap::new(), }); } @@ -242,6 +273,7 @@ fn dispatch_event( subscriptions: &mut SubscriptionTable, lua_tx: &mpsc::UnboundedSender, event_stream_tx: &broadcast::Sender, + subscription_count: &Arc, ) { let _ = event_stream_tx.send(event.clone()); @@ -254,7 +286,9 @@ fn dispatch_event( } for sub in matches.into_iter().filter(|s| s.once) { - subscriptions.remove(sub.id); + if subscriptions.remove(sub.id) { + subscription_count.fetch_sub(1, Ordering::Relaxed); + } let _ = lua_tx.send(LuaMessage::SubscriptionCancelled { id: sub.id }); } } @@ -302,11 +336,12 @@ fn apply_event_to_state(state: &mut RuntimeState, event: &BreadEvent) { .map(ToString::to_string); state.active_workspace = ws; } - "bread.window.focus.changed" => { + "bread.window.focus.changed" | "bread.window.focused" => { state.active_window = event .data .get("window") .or_else(|| event.data.get("class")) + .or_else(|| event.data.get("address")) .and_then(Value::as_str) .map(ToString::to_string); } diff --git a/breadd/src/core/types.rs b/breadd/src/core/types.rs index 254e075..45ccfa5 100644 --- a/breadd/src/core/types.rs +++ b/breadd/src/core/types.rs @@ -123,6 +123,8 @@ pub struct ModuleStatus { pub status: ModuleLoadState, pub last_error: Option, #[serde(default)] + pub builtin: bool, + #[serde(default)] pub store: HashMap, } diff --git a/breadd/src/ipc/mod.rs b/breadd/src/ipc/mod.rs index 1ac245b..f4aa092 100644 --- a/breadd/src/ipc/mod.rs +++ b/breadd/src/ipc/mod.rs @@ -1,18 +1,22 @@ +use std::collections::{HashMap, VecDeque}; use std::fs; use std::os::unix::fs::PermissionsExt; use std::path::PathBuf; use std::process; use std::time::Instant; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use anyhow::{anyhow, Result}; -use bread_shared::{AdapterSource, BreadEvent}; +use bread_shared::{now_unix_ms, AdapterSource, BreadEvent}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{UnixListener, UnixStream}; -use tokio::sync::{broadcast, mpsc, watch}; +use tokio::sync::{broadcast, mpsc, watch, RwLock}; use tracing::{error, info, warn}; +use crate::adapters::AdapterStatus; use crate::core::state_engine::StateHandle; use crate::lua::RuntimeHandle; @@ -23,6 +27,9 @@ pub struct Server { event_tx: broadcast::Sender, lua_runtime: RuntimeHandle, emit_tx: mpsc::UnboundedSender, + adapter_status: Arc>>, + subscription_count: Arc, + event_buffer: Arc>>, started_at: Instant, pid: u32, } @@ -51,6 +58,9 @@ impl Server { event_tx: broadcast::Sender, lua_runtime: RuntimeHandle, emit_tx: mpsc::UnboundedSender, + adapter_status: Arc>>, + subscription_count: Arc, + event_buffer: Arc>>, ) -> Self { Self { socket_path, @@ -58,6 +68,9 @@ impl Server { event_tx, lua_runtime, emit_tx, + adapter_status, + subscription_count, + event_buffer, started_at: Instant::now(), pid: process::id(), } @@ -166,12 +179,25 @@ impl Server { let full = self.state_handle.state_dump().await; Ok(full.get("modules").cloned().unwrap_or_else(|| json!([]))) } - "modules.reload" => self - .lua_runtime - .reload() - .await - .map(|_| json!({ "reloaded": true })) - .map_err(|e| e.to_string()), + "modules.reload" => { + let started = Instant::now(); + if let Err(err) = self.lua_runtime.reload().await { + return Err((id, err.to_string())); + } + let duration_ms = started.elapsed().as_millis(); + let modules = self + .state_handle + .state_dump() + .await + .get("modules") + .cloned() + .unwrap_or_else(|| json!([])); + Ok(json!({ + "ok": true, + "duration_ms": duration_ms, + "modules": modules, + })) + } "profile.list" => { let full = self.state_handle.state_dump().await; let profiles = full @@ -224,13 +250,36 @@ impl Server { } "health" => { let uptime_ms = self.started_at.elapsed().as_millis(); + let state = self.state_handle.state_dump().await; + let modules = state.get("modules").cloned().unwrap_or_else(|| json!([])); + let adapters = self.adapter_status.read().await.clone(); + let subscription_count = self.subscription_count.load(std::sync::atomic::Ordering::Relaxed); + let recent_errors = self.lua_runtime.recent_errors(); Ok(json!({ "ok": true, "pid": self.pid, "version": env!("CARGO_PKG_VERSION"), "uptime_ms": uptime_ms, + "socket": self.socket_path.to_string_lossy(), + "adapters": adapters, + "modules": modules, + "subscriptions": subscription_count, + "recent_errors": recent_errors, })) } + "events.replay" => { + let since_ms = req.params.get("since_ms").and_then(Value::as_u64).unwrap_or(0); + let cutoff = now_unix_ms().saturating_sub(since_ms); + let mut replay = Vec::new(); + if let Ok(buf) = self.event_buffer.lock() { + for event in buf.iter() { + if event.timestamp >= cutoff { + replay.push(event); + } + } + } + Ok(serde_json::to_value(replay).unwrap_or_else(|_| json!([]))) + } _ => Err("unknown method".to_string()), }; diff --git a/breadd/src/lua/mod.rs b/breadd/src/lua/mod.rs index a73de1a..cf94a57 100644 --- a/breadd/src/lua/mod.rs +++ b/breadd/src/lua/mod.rs @@ -1,5 +1,5 @@ use std::cell::RefCell; -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::fs; use std::path::{Path, PathBuf}; use std::rc::Rc; @@ -10,16 +10,18 @@ use std::time::Duration; use anyhow::{anyhow, Result}; use bread_shared::{AdapterSource, BreadEvent}; use mlua::{Error as LuaError, Function, Lua, LuaSerdeExt, RegistryKey, Table, Value}; +use serde::Serialize; use serde_json::Value as JsonValue; use tokio::sync::{mpsc, oneshot, watch, RwLock}; use tokio::task; -use tokio::time::{interval, sleep}; +use tokio::time::{interval_at, sleep, Instant}; use tracing::{error, info, warn}; -use crate::core::config::Config; +use crate::core::config::{Config, ModulesConfig, NotificationsConfig}; use crate::core::state_engine::StateHandle; use crate::core::subscriptions::SubscriptionId; use crate::core::types::{ModuleLoadState, RuntimeState}; +use bread_shared::now_unix_ms; pub enum LuaMessage { Event { @@ -38,9 +40,17 @@ pub enum LuaMessage { Shutdown, } +#[derive(Debug, Clone, Serialize)] +pub struct ErrorEntry { + pub timestamp: u64, + pub module: Option, + pub message: String, +} + #[derive(Clone)] pub struct RuntimeHandle { tx: mpsc::UnboundedSender, + recent_errors: Arc>>, } impl RuntimeHandle { @@ -63,6 +73,13 @@ impl RuntimeHandle { pub fn shutdown(&self) { let _ = self.tx.send(LuaMessage::Shutdown); } + + pub fn recent_errors(&self) -> Vec { + self.recent_errors + .lock() + .map(|buf| buf.iter().cloned().collect()) + .unwrap_or_default() + } } pub fn spawn_runtime( @@ -71,7 +88,11 @@ pub fn spawn_runtime( emit_tx: mpsc::UnboundedSender, ) -> Result { let (tx, mut rx) = mpsc::unbounded_channel(); - let handle = RuntimeHandle { tx }; + let recent_errors = Arc::new(Mutex::new(VecDeque::with_capacity(50))); + let handle = RuntimeHandle { + tx, + recent_errors: recent_errors.clone(), + }; let thread_tx = handle.tx.clone(); std::thread::Builder::new() @@ -83,7 +104,13 @@ pub fn spawn_runtime( .expect("failed to create lua runtime thread"); rt.block_on(async move { - let mut engine = match LuaEngine::new(config, state_handle, emit_tx, thread_tx.clone()) { + let mut engine = match LuaEngine::new( + config, + state_handle, + emit_tx, + thread_tx.clone(), + recent_errors, + ) { Ok(engine) => engine, Err(err) => { error!(error = %err, "failed to initialize lua engine"); @@ -160,6 +187,8 @@ struct ModuleDecl { version: Option, after: Vec, path: PathBuf, + source: Option<&'static str>, + builtin: bool, } struct ModuleInfo { @@ -182,6 +211,9 @@ struct LuaEngine { lua_tx: mpsc::UnboundedSender, entry_point: PathBuf, module_path: PathBuf, + modules_config: ModulesConfig, + notifications_config: NotificationsConfig, + recent_errors: Arc>>, } impl LuaEngine { @@ -190,6 +222,7 @@ impl LuaEngine { state_handle: StateHandle, emit_tx: mpsc::UnboundedSender, lua_tx: mpsc::UnboundedSender, + recent_errors: Arc>>, ) -> Result { Ok(Self { lua: Lua::new(), @@ -207,6 +240,9 @@ impl LuaEngine { lua_tx, entry_point: config.lua_entry_point(), module_path: config.lua_module_path(), + modules_config: config.modules.clone(), + notifications_config: config.notifications.clone(), + recent_errors, }) } @@ -324,7 +360,9 @@ impl LuaEngine { .map_err(|_| LuaError::external("missing filter function"))?; Some(lua.create_registry_value(filter_fn)?) } else { - return Err(LuaError::external("missing filter options")); + return Err(LuaError::external( + "bread.filter requires an opts table with a 'filter' function: bread.filter(pattern, fn, { filter = fn })", + )); }; let module = current_module .lock() @@ -503,6 +541,61 @@ impl LuaEngine { })?; bread.set("exec", exec_fn)?; + let notify_path = self.notifications_config.notify_send_path.clone(); + let default_urgency = self.notifications_config.default_urgency.clone(); + let default_timeout = self.notifications_config.default_timeout_ms; + let emit_tx = self.emit_tx.clone(); + let notify_fn = self + .lua + .create_function(move |_lua, (message, opts): (String, Option)| { + let title: String = opts + .as_ref() + .and_then(|o| o.get("title").ok()) + .unwrap_or_else(|| "bread".to_string()); + let urgency: String = opts + .as_ref() + .and_then(|o| o.get("urgency").ok()) + .unwrap_or_else(|| default_urgency.clone()); + let timeout: i64 = opts + .as_ref() + .and_then(|o| o.get("timeout").ok()) + .unwrap_or(default_timeout); + let icon: Option = opts.as_ref().and_then(|o| o.get("icon").ok()); + + let cmd_path = notify_path.clone(); + let title_clone = title.clone(); + let message_clone = message.clone(); + let urgency_clone = urgency.clone(); + task::spawn_blocking(move || { + let mut cmd = std::process::Command::new(cmd_path); + cmd.args([ + "--app-name", + "bread", + "--urgency", + &urgency_clone, + "--expire-time", + &timeout.to_string(), + ]); + if let Some(icon) = icon { + cmd.args(["--icon", &icon]); + } + let _ = cmd.args([&title_clone, &message_clone]).status(); + }); + + let _ = emit_tx.send(BreadEvent::new( + "bread.notify.sent", + AdapterSource::System, + serde_json::json!({ + "title": title, + "message": message, + "urgency": urgency, + }), + )); + + Ok(()) + })?; + bread.set("notify", notify_fn)?; + let timers = self.timers.clone(); let next_timer_id = self.next_timer_id.clone(); let lua_tx = self.lua_tx.clone(); @@ -556,7 +649,8 @@ impl LuaEngine { ); let lua_tx = lua_tx.clone(); task::spawn(async move { - let mut ticker = interval(Duration::from_millis(interval_ms)); + let start = Instant::now() + Duration::from_millis(interval_ms); + let mut ticker = interval_at(start, Duration::from_millis(interval_ms)); loop { tokio::select! { _ = ticker.tick() => { @@ -750,12 +844,22 @@ impl LuaEngine { } fn load_init_and_modules(&self) -> Result<()> { - self.load_lua_file(&self.entry_point, "init")?; + self.load_lua_file(&self.entry_point, "init", false)?; let mut files = list_lua_files(&self.module_path)?; files.sort(); + let disabled: HashSet = self + .modules_config + .disable + .iter() + .cloned() + .collect(); + let mut decls = Vec::new(); + if self.modules_config.builtin { + decls.extend(builtin_module_decls(&disabled)); + } for path in files.into_iter().filter(|p| !is_lib_path(&self.module_path, p)) { match self.scan_module_decl(&path) { Ok(decl) => decls.push(decl), @@ -765,6 +869,7 @@ impl LuaEngine { name, ModuleLoadState::LoadError, Some(err.to_string()), + false, ); } } @@ -784,7 +889,7 @@ impl LuaEngine { for (name, err) in dep_errors { self.state_handle - .set_module_status(name, ModuleLoadState::LoadError, Some(err)); + .set_module_status(name, ModuleLoadState::LoadError, Some(err), false); } let mut load_order = Vec::new(); @@ -792,14 +897,19 @@ impl LuaEngine { load_order.push(decl.name.clone()); match self.load_module(&decl) { Ok(()) => { - self.state_handle - .set_module_status(decl.name.clone(), ModuleLoadState::Loaded, None); + self.state_handle.set_module_status( + decl.name.clone(), + ModuleLoadState::Loaded, + None, + decl.builtin, + ); } Err(err) => { self.state_handle.set_module_status( decl.name.clone(), ModuleLoadState::LoadError, Some(err.to_string()), + decl.builtin, ); } } @@ -815,7 +925,11 @@ impl LuaEngine { fn load_module(&self, decl: &ModuleDecl) -> Result<()> { self.set_current_module(Some(decl.name.clone())); - let result = self.load_lua_file(&decl.path, &decl.name); + let result = if let Some(source) = decl.source.as_deref() { + self.load_lua_source(source, &decl.name) + } else { + self.load_lua_file(&decl.path, &decl.name, decl.builtin) + }; self.set_current_module(None); result?; @@ -827,13 +941,14 @@ impl LuaEngine { Ok(()) } - fn load_lua_file(&self, path: &Path, module_name: &str) -> Result<()> { + fn load_lua_file(&self, path: &Path, module_name: &str, builtin: bool) -> Result<()> { if !path.exists() { warn!(path = %path.display(), "lua file does not exist; skipping"); self.state_handle.set_module_status( module_name.to_string(), ModuleLoadState::NotFound, None, + builtin, ); return Ok(()); } @@ -843,6 +958,14 @@ impl LuaEngine { Ok(()) } + fn load_lua_source(&self, source: &str, module_name: &str) -> Result<()> { + self.lua + .load(source) + .set_name(module_name) + .exec() + .map_err(|e| anyhow!(e.to_string())) + } + fn handle_event(&self, id: SubscriptionId, event: BreadEvent) -> Result<()> { let (callback, filter, raw_kind, kind, module) = { let handlers = self.handlers.lock().expect("lua handlers mutex poisoned"); @@ -935,8 +1058,13 @@ impl LuaEngine { if let Some(hook) = self.get_module_hook(name, "on_load") { if let Err(err) = hook.call::<_, ()>(()) { error!(module = %name, error = %err, "module on_load failed"); - self.state_handle - .set_module_status(name.to_string(), ModuleLoadState::LoadError, Some(err.to_string())); + let builtin = self.module_is_builtin(name); + self.state_handle.set_module_status( + name.to_string(), + ModuleLoadState::LoadError, + Some(err.to_string()), + builtin, + ); } } } @@ -951,10 +1079,12 @@ impl LuaEngine { if let Some(hook) = self.get_module_hook(&name, "on_reload") { if let Err(err) = hook.call::<_, ()>(()) { error!(module = %name, error = %err, "module on_reload failed"); + let builtin = self.module_is_builtin(&name); self.state_handle.set_module_status( name.to_string(), ModuleLoadState::Degraded, Some(err.to_string()), + builtin, ); } } @@ -971,10 +1101,12 @@ impl LuaEngine { if let Some(hook) = self.get_module_hook(&name, "on_unload") { if let Err(err) = hook.call::<_, ()>(()) { error!(module = %name, error = %err, "module on_unload failed"); + let builtin = self.module_is_builtin(&name); self.state_handle.set_module_status( name.to_string(), ModuleLoadState::Degraded, Some(err.to_string()), + builtin, ); } } @@ -983,10 +1115,22 @@ impl LuaEngine { fn handle_callback_error(&self, module: Option<&str>, id: SubscriptionId, err: LuaError) { if let Some(module) = module { + let builtin = self.module_is_builtin(module); + if let Ok(mut buf) = self.recent_errors.lock() { + if buf.len() >= 50 { + buf.pop_front(); + } + buf.push_back(ErrorEntry { + timestamp: now_unix_ms(), + module: Some(module.to_string()), + message: err.to_string(), + }); + } self.state_handle.set_module_status( module.to_string(), ModuleLoadState::Degraded, Some(err.to_string()), + builtin, ); if let Some(hook) = self.get_module_hook(module, "on_error") { match hook.call::<_, bool>(err.to_string()) { @@ -1022,6 +1166,14 @@ impl LuaEngine { .unwrap_or(false) } + fn module_is_builtin(&self, name: &str) -> bool { + self.module_decls + .lock() + .ok() + .and_then(|map| map.get(name).map(|d| d.builtin)) + .unwrap_or(false) + } + fn set_current_module(&self, name: Option) { if let Ok(mut guard) = self.current_module.lock() { *guard = name; @@ -1052,6 +1204,8 @@ impl LuaEngine { version, after, path: module_path.clone(), + source: None, + builtin: false, }); Err(LuaError::RuntimeError(MODULE_DECL_ABORT.to_string())) })?; @@ -1119,6 +1273,14 @@ impl LuaEngine { self.lua .load( r#" + bread.spawn = function(fn) + local co = coroutine.create(fn) + local ok, err = coroutine.resume(co) + if not ok then + error(err) + end + end + bread.wait = function(pattern, opts) if type(pattern) ~= "string" then error("bread.wait requires a pattern string") @@ -1288,10 +1450,266 @@ fn module_store_set(state_arc: &Arc>, module: &str, key: St name: module.to_string(), status: ModuleLoadState::Loaded, last_error: None, + builtin: false, store, }); } +const BUILTIN_MONITORS: &str = r#" +local M = bread.module({ name = "bread.monitors", version = "1.0.0" }) + +local workflows = {} +local layouts = {} + +local function matches_when(event_name, when) + if when == "connected" then + return event_name == "bread.monitor.connected" + elseif when == "disconnected" then + return event_name == "bread.monitor.disconnected" + elseif when == "changed" then + return event_name == "bread.monitor.changed" + end + return false +end + +local function matches_monitors(list, event) + if not list or #list == 0 then + return true + end + local name = event.data and event.data.name + if not name then + return false + end + for _, monitor in ipairs(list) do + if monitor == name then + return true + end + end + return false +end + +local function run_workflow(wf, event) + if type(wf.run) == "function" then + wf.run(event) + elseif type(wf.run) == "string" then + bread.exec(wf.run) + end +end + +function M.on(opts) + table.insert(workflows, opts) +end + +function M.layout(name, fn) + layouts[name] = fn +end + +function M.apply(name) + return function() + local fn = layouts[name] + if fn then + fn() + end + end +end + +function M.on_load() + bread.on("bread.monitor.**", function(event) + for _, wf in ipairs(workflows) do + if matches_when(event.event, wf.when) and matches_monitors(wf.monitors, event) then + run_workflow(wf, event) + end + end + end) +end + +return M +"#; + +const BUILTIN_DEVICES: &str = r#" +local M = bread.module({ name = "bread.devices", version = "1.0.0" }) + +local rules = {} + +local function matches_rule(rule, event) + local class = rule.class + local when = rule.when + local data = event.data or {} + + if when == "connected" and event.event ~= "bread.device.connected" then + if not event.event:match("%.connected$") then + return false + end + elseif when == "disconnected" and event.event ~= "bread.device.disconnected" then + if not event.event:match("%.disconnected$") then + return false + end + end + + if class and data.class ~= class then + return false + end + + if rule.name and data.name and not tostring(data.name):match(rule.name) then + return false + end + + return true +end + +local function run_rule(rule, event) + if type(rule.run) == "function" then + rule.run(event) + elseif type(rule.run) == "string" then + bread.exec(rule.run) + end +end + +function M.on(opts) + table.insert(rules, opts) +end + +function M.on_load() + bread.on("bread.device.**", function(event) + for _, rule in ipairs(rules) do + if matches_rule(rule, event) then + run_rule(rule, event) + end + end + end) +end + +return M +"#; + +const BUILTIN_WORKSPACES: &str = r#" +local M = bread.module({ name = "bread.workspaces", version = "1.0.0", after = { "bread.monitors" } }) + +local assignments = {} +local rules = {} + +function M.assign(workspace, monitor) + table.insert(assignments, { workspace = workspace, monitor = monitor }) +end + +function M.pin(opts) + table.insert(rules, opts) +end + +function M.apply_assignments() + local monitors = bread.state.monitors() + local active = {} + for _, m in ipairs(monitors) do + if m.connected then + active[m.name] = true + end + end + + for _, a in ipairs(assignments) do + if active[a.monitor] then + bread.hyprland.dispatch("moveworkspacetomonitor", a.workspace .. " " .. a.monitor) + end + end +end + +function M.on_load() + bread.on("bread.monitor.**", function() + M.apply_assignments() + end) + + bread.on("bread.window.opened", function(event) + for _, rule in ipairs(rules) do + if event.data and event.data.class and event.data.class:match(rule.app) then + local address = event.data.address or "" + bread.hyprland.dispatch("movetoworkspacesilent", rule.workspace .. ",address:" .. address) + end + end + end) + + bread.once("bread.system.startup", function() + M.apply_assignments() + end) +end + +return M +"#; + +const BUILTIN_BINDS: &str = r#" +local M = bread.module({ name = "bread.binds", version = "1.0.0" }) + +local active = {} + +local function bind_string(opts) + local mods = table.concat(opts.mods or {}, " ") + local args = opts.args or "" + if mods ~= "" then + return mods .. ", " .. opts.key .. ", " .. opts.dispatch .. ", " .. args + end + return opts.key .. ", " .. opts.dispatch .. ", " .. args +end + +function M.add(opts) + local bind = bind_string(opts) + bread.hyprland.keyword("bind", bind) + active[opts.key] = opts + return opts.key +end + +function M.remove(key) + local bind = active[key] + if not bind then + return + end + bread.hyprland.keyword("unbind", bind_string(bind)) + active[key] = nil +end + +function M.replace(key, opts) + M.remove(key) + return M.add(opts) +end + +function M.on_unload() + for key, _ in pairs(active) do + M.remove(key) + end +end + +return M +"#; + +fn builtin_module_decls(disabled: &HashSet) -> Vec { + let mut out = Vec::new(); + + let entries = vec![ + ("bread.monitors", "1.0.0", Vec::new(), BUILTIN_MONITORS), + ("bread.devices", "1.0.0", Vec::new(), BUILTIN_DEVICES), + ( + "bread.workspaces", + "1.0.0", + vec!["bread.monitors".to_string()], + BUILTIN_WORKSPACES, + ), + ("bread.binds", "1.0.0", Vec::new(), BUILTIN_BINDS), + ]; + + for (name, version, after, source) in entries { + if disabled.contains(name) { + continue; + } + out.push(ModuleDecl { + name: name.to_string(), + version: Some(version.to_string()), + after, + path: PathBuf::from(format!("")), + source: Some(source), + builtin: true, + }); + } + + out +} + fn hyprland_request_socket() -> Result { let instance = std::env::var("HYPRLAND_INSTANCE_SIGNATURE") .map_err(|_| anyhow!("HYPRLAND_INSTANCE_SIGNATURE is not set"))?; @@ -1307,11 +1725,13 @@ fn hyprland_request(request: &str) -> Result { use std::os::unix::net::UnixStream; let socket = hyprland_request_socket()?; - let mut stream = UnixStream::connect(socket)?; - stream.write_all(request.as_bytes())?; - let mut buffer = String::new(); - stream.read_to_string(&mut buffer)?; - Ok(buffer) + tokio::task::block_in_place(|| { + let mut stream = UnixStream::connect(&socket)?; + stream.write_all(request.as_bytes())?; + let mut buffer = String::new(); + stream.read_to_string(&mut buffer)?; + Ok(buffer) + }) } fn list_lua_files(root: &Path) -> Result> { diff --git a/breadd/src/main.rs b/breadd/src/main.rs index c57fabd..bcb4daa 100644 --- a/breadd/src/main.rs +++ b/breadd/src/main.rs @@ -3,7 +3,9 @@ mod core; mod ipc; mod lua; +use std::collections::VecDeque; use std::sync::Arc; +use std::sync::atomic::AtomicU64; use anyhow::Result; use bread_shared::{AdapterSource, BreadEvent, RawEvent}; @@ -33,7 +35,8 @@ async fn main() -> Result<()> { let (event_stream_tx, _) = broadcast::channel(2048); let (shutdown_tx, shutdown_rx) = watch::channel(false); - let state_handle = StateHandle::new(state.clone(), state_cmd_tx); + let subscription_count = Arc::new(AtomicU64::new(0)); + let state_handle = StateHandle::new(state.clone(), state_cmd_tx, subscription_count.clone()); let lua_runtime = lua::spawn_runtime(config.clone(), state_handle.clone(), normalized_tx.clone())?; let lua_tx = lua_runtime.sender(); @@ -44,6 +47,7 @@ async fn main() -> Result<()> { state.clone(), lua_tx, event_stream_tx.clone(), + subscription_count.clone(), shutdown_rx.clone(), )); @@ -78,6 +82,28 @@ async fn main() -> Result<()> { let adapter_manager = adapters::Manager::new(raw_tx, config.clone(), shutdown_rx.clone()); adapter_manager.start_all().await?; + let adapter_status = adapter_manager.status_handle(); + + let event_buffer = Arc::new(std::sync::Mutex::new(VecDeque::with_capacity(1000))); + { + let mut rx = event_stream_tx.subscribe(); + let event_buffer = event_buffer.clone(); + tokio::spawn(async move { + loop { + let evt = match rx.recv().await { + Ok(evt) => evt, + Err(_) => break, + }; + if let Ok(mut buf) = event_buffer.lock() { + if buf.len() >= 1000 { + buf.pop_front(); + } + buf.push_back(evt); + } + } + }); + } + let _ = normalized_tx.send(BreadEvent::new( "bread.system.startup", AdapterSource::System, @@ -90,6 +116,9 @@ async fn main() -> Result<()> { event_stream_tx, lua_runtime.clone(), normalized_tx, + adapter_status, + subscription_count, + event_buffer, ); info!("breadd fully started");