From 1a00daf6a8644e8642dc8d00fd61065e1c7f536c Mon Sep 17 00:00:00 2001 From: Breadway Date: Mon, 11 May 2026 16:30:05 +0800 Subject: [PATCH 1/4] unsure --- Cargo.lock | 130 ++++++++- bread-cli/Cargo.toml | 1 + bread-cli/src/main.rs | 317 +++++++++++++++++++++- breadd/src/adapters/mod.rs | 28 +- breadd/src/core/config.rs | 55 ++++ breadd/src/core/normalizer.rs | 117 ++++++-- breadd/src/core/state_engine.rs | 53 +++- breadd/src/core/types.rs | 2 + breadd/src/ipc/mod.rs | 65 ++++- breadd/src/lua/mod.rs | 460 ++++++++++++++++++++++++++++++-- breadd/src/main.rs | 31 ++- 11 files changed, 1192 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0ab00f1..d987076 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -289,6 +289,7 @@ dependencies = [ "anyhow", "bread-shared", "clap", + "notify", "serde", "serde_json", "tokio", @@ -429,6 +430,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -579,6 +589,16 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" +[[package]] +name = "filetime" +version = "0.2.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d5b2eef6fafbf69f877e55509ce5b11a760690ac9700a2921be067aa6afaef6" +dependencies = [ + "cfg-if", + "libc", +] + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -591,6 +611,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures" version = "0.3.32" @@ -798,6 +827,26 @@ dependencies = [ "serde_core", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags 1.3.2", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.13" @@ -830,6 +879,26 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "kqueue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "285efcf12ef41bec907b3000d5ffaeb54191d4d9d83c0d6157e6cbc2db255e64" +dependencies = [ + "bitflags 2.11.1", + "libc", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -952,6 +1021,18 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys 0.48.0", +] + [[package]] name = "mio" version = "1.2.0" @@ -1083,6 +1164,25 @@ dependencies = [ "memoffset 0.7.1", ] +[[package]] +name = "notify" +version = "6.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" +dependencies = [ + "bitflags 2.11.1", + "crossbeam-channel", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio 0.8.11", + "walkdir", + "windows-sys 0.48.0", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -1402,6 +1502,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1639,7 +1748,7 @@ checksum = "8fc7f01b389ac15039e4dc9531aa973a135d7a4135281b12d7c1bc79fd57fffe" dependencies = [ "bytes", "libc", - "mio", + "mio 1.2.0", "parking_lot", "pin-project-lite", "signal-hook-registry", @@ -1844,6 +1953,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -1930,6 +2049,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/bread-cli/Cargo.toml b/bread-cli/Cargo.toml index 0550c57..6d2541e 100644 --- a/bread-cli/Cargo.toml +++ b/bread-cli/Cargo.toml @@ -10,3 +10,4 @@ serde_json.workspace = true tokio.workspace = true anyhow.workspace = true clap = { version = "4.5", features = ["derive"] } +notify = "6.1" diff --git a/bread-cli/src/main.rs b/bread-cli/src/main.rs index e4af194..6e1982a 100644 --- a/bread-cli/src/main.rs +++ b/bread-cli/src/main.rs @@ -1,10 +1,14 @@ use anyhow::Result; use clap::{Parser, Subcommand}; +use notify::{RecommendedWatcher, RecursiveMode, Watcher}; use serde_json::{json, Value}; use std::env; +use std::io; use std::path::{Path, PathBuf}; +use std::time::{Duration, UNIX_EPOCH}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::UnixStream; +use tokio::sync::mpsc; #[derive(Parser, Debug)] #[command(author, version, about = "Bread CLI - the reactive desktop automation fabric")] @@ -16,13 +20,32 @@ struct Cli { #[derive(Subcommand, Debug)] enum Commands { /// Hot-reload all Lua modules - Reload, + Reload { + /// Watch config directory and reload on changes + #[arg(long)] + watch: bool, + }, /// Dump current runtime state - State, + State { + /// Optional dotted path into RuntimeState + path: Option, + /// Output raw JSON + #[arg(long)] + json: bool, + }, /// Stream live normalized events Events { #[arg(long)] filter: Option, + /// Output raw JSON + #[arg(long)] + json: bool, + /// Comma-separated fields to display + #[arg(long)] + fields: Option, + /// Replay events from the last N seconds + #[arg(long)] + since: Option, }, /// List loaded modules and status Modules, @@ -40,6 +63,12 @@ enum Commands { Ping, /// Fetch daemon health details Health, + /// Diagnose daemon and module health + Doctor { + /// Output raw JSON + #[arg(long)] + json: bool, + }, } #[tokio::main] @@ -48,16 +77,38 @@ async fn main() -> Result<()> { let socket = daemon_socket_path(); match &cli.command { - Commands::Reload => { - let response = send_request(&socket, "modules.reload", json!({})).await?; - print_json(&response)?; + Commands::Reload { watch } => { + if *watch { + watch_reload(&socket).await?; + } else { + let response = send_request(&socket, "modules.reload", json!({})).await?; + print_reload(&response); + } } - Commands::State => { - let response = send_request(&socket, "state.dump", json!({})).await?; - print_json(&response)?; + Commands::State { path, json } => { + if *json { + let response = if let Some(path) = path { + send_request(&socket, "state.get", json!({ "key": path })).await? + } else { + send_request(&socket, "state.dump", json!({})).await? + }; + print_json(&response)?; + } else { + let response = if let Some(path) = path { + send_request(&socket, "state.get", json!({ "key": path })).await? + } else { + send_request(&socket, "state.dump", json!({})).await? + }; + print_state_formatted(path.as_deref(), &response); + } } - Commands::Events { filter } => { - stream_events(&socket, filter.clone()).await?; + Commands::Events { + filter, + json, + fields, + since, + } => { + stream_events(&socket, filter.clone(), *json, fields.clone(), *since).await?; } Commands::Modules => { let response = send_request(&socket, "modules.list", json!({})).await?; @@ -92,6 +143,14 @@ async fn main() -> Result<()> { let response = send_request(&socket, "health", json!({})).await?; print_json(&response)?; } + Commands::Doctor { json } => { + if *json { + let response = send_request(&socket, "health", json!({})).await?; + print_json(&response)?; + } else { + print_doctor(&socket).await?; + } + } } Ok(()) @@ -128,7 +187,26 @@ async fn send_request(socket: &Path, method: &str, params: Value) -> Result) -> Result<()> { +async fn stream_events( + socket: &Path, + filter: Option, + raw_json: bool, + fields: Option, + since: Option, +) -> Result<()> { + if let Some(seconds) = since { + let replay = send_request(socket, "events.replay", json!({ "since_ms": seconds * 1000 })).await?; + if let Some(list) = replay.as_array() { + for item in list { + if raw_json { + println!("{}", serde_json::to_string_pretty(item)?); + } else { + print_event(item, fields.as_deref()); + } + } + } + } + let stream = UnixStream::connect(socket).await?; let (read_half, mut write_half) = stream.into_split(); let request = json!({ @@ -146,7 +224,11 @@ async fn stream_events(socket: &Path, filter: Option) -> Result<()> { let mut lines = BufReader::new(read_half).lines(); while let Some(line) = lines.next_line().await? { let value: Value = serde_json::from_str(&line)?; - println!("{}", serde_json::to_string_pretty(&value)?); + if raw_json { + println!("{}", serde_json::to_string_pretty(&value)?); + } else { + print_event(&value, fields.as_deref()); + } } Ok(()) @@ -156,3 +238,214 @@ fn print_json(value: &Value) -> Result<()> { println!("{}", serde_json::to_string_pretty(value)?); Ok(()) } + +fn print_state_formatted(path: Option<&str>, value: &Value) { + if let Some(path) = path { + println!("{path}"); + } + print_value(value, 0); +} + +fn print_value(value: &Value, indent: usize) { + let pad = " ".repeat(indent); + match value { + Value::Object(map) => { + for (key, val) in map { + println!("{pad}{key}"); + print_value(val, indent + 2); + } + } + Value::Array(list) => { + for (idx, val) in list.iter().enumerate() { + println!("{pad}[{idx}]"); + print_value(val, indent + 2); + } + } + other => { + println!("{pad}{}", other); + } + } +} + +fn print_event(event: &Value, fields: Option<&str>) { + if let Some(fields) = fields { + let mut out = serde_json::Map::new(); + for field in fields.split(',') { + let field = field.trim(); + if field.is_empty() { + continue; + } + if let Some(val) = event.get(field) { + out.insert(field.to_string(), val.clone()); + } + } + println!("{}", Value::Object(out)); + return; + } + + let ts = event.get("timestamp").and_then(Value::as_u64).unwrap_or(0); + let event_name = event.get("event").and_then(Value::as_str).unwrap_or("?"); + let source = event.get("source").and_then(Value::as_str).unwrap_or("?"); + let time = format_timestamp(ts); + println!("{time} {event_name} source={source}"); + if let Some(data) = event.get("data") { + println!(" data: {}", data); + } +} + +fn format_timestamp(ms: u64) -> String { + let secs = ms / 1000; + let millis = ms % 1000; + let time = UNIX_EPOCH + Duration::from_secs(secs); + let datetime = time.duration_since(UNIX_EPOCH).unwrap_or_default(); + let seconds = datetime.as_secs() % 60; + let minutes = (datetime.as_secs() / 60) % 60; + let hours = (datetime.as_secs() / 3600) % 24; + format!("{:02}:{:02}:{:02}.{:03}", hours, minutes, seconds, millis) +} + +fn print_reload(value: &Value) { + println!("reloading lua runtime..."); + if let Some(mods) = value.get("modules").and_then(Value::as_array) { + for module in mods { + let name = module.get("name").and_then(Value::as_str).unwrap_or("?"); + let status = module.get("status").and_then(Value::as_str).unwrap_or("?"); + let error = module.get("last_error").and_then(Value::as_str); + if let Some(error) = error { + println!(" ✗ {name} {status}"); + println!(" {error}"); + } else { + println!(" ✓ {name} {status}"); + } + } + } +} + +async fn watch_reload(socket: &Path) -> Result<()> { + let config_dir = config_directory(); + println!("watching {} for changes...", config_dir.display()); + + let (tx, mut rx) = mpsc::unbounded_channel(); + let mut watcher: RecommendedWatcher = notify::recommended_watcher(move |res| { + let _ = tx.send(res); + })?; + watcher.watch(&config_dir, RecursiveMode::Recursive)?; + + while let Some(msg) = rx.recv().await { + if msg.is_ok() { + let response = send_request(socket, "modules.reload", json!({})).await?; + print_reload(&response); + } + } + + Ok(()) +} + +async fn print_doctor(socket: &Path) -> Result<()> { + let stream = match UnixStream::connect(socket).await { + Ok(stream) => stream, + Err(err) => { + if err.kind() == io::ErrorKind::NotFound { + println!("bread doctor"); + println!(" daemon ✗ not running"); + println!(" socket {} (not found)", socket.display()); + println!(); + println!(" start the daemon: systemctl --user start breadd"); + println!(" view logs: journalctl --user -u breadd -f"); + return Ok(()); + } + return Err(err.into()); + } + }; + + let response = send_request_with_stream(stream, "health", json!({})).await?; + render_doctor(&response); + Ok(()) +} + +fn render_doctor(health: &Value) { + println!("bread doctor"); + let ok = health.get("ok").and_then(Value::as_bool).unwrap_or(false); + let pid = health.get("pid").and_then(Value::as_u64).unwrap_or(0); + let version = health.get("version").and_then(Value::as_str).unwrap_or("unknown"); + let uptime_ms = health.get("uptime_ms").and_then(Value::as_u64).unwrap_or(0); + let socket = health.get("socket").and_then(Value::as_str).unwrap_or("?"); + println!(" daemon {} (pid {})", if ok { "✓ running" } else { "✗ unreachable" }, pid); + println!(" version {version}"); + println!(" uptime {}s", uptime_ms / 1000); + println!(" socket {socket}"); + + if let Some(adapters) = health.get("adapters").and_then(Value::as_object) { + println!(); + println!("adapters"); + for (name, status) in adapters { + println!(" {:20} {}", name, status); + } + } + + if let Some(modules) = health.get("modules").and_then(Value::as_array) { + println!(); + println!("modules"); + for module in modules { + let name = module.get("name").and_then(Value::as_str).unwrap_or("?"); + let status = module.get("status").and_then(Value::as_str).unwrap_or("?"); + let error = module.get("last_error").and_then(Value::as_str); + println!(" {:30} {}", name, status); + if let Some(error) = error { + println!(" └ {error}"); + } + } + } + + if let Some(count) = health.get("subscriptions").and_then(Value::as_u64) { + println!(); + println!("subscriptions {count}"); + } + + if let Some(errors) = health.get("recent_errors").and_then(Value::as_array) { + if !errors.is_empty() { + println!(); + println!("recent errors ({} total)", errors.len()); + for entry in errors.iter().take(5) { + println!(" {entry}"); + } + } + } +} + +async fn send_request_with_stream( + stream: UnixStream, + method: &str, + params: Value, +) -> Result { + let (read_half, mut write_half) = stream.into_split(); + let request = json!({ + "id": "1", + "method": method, + "params": params, + }); + + write_half + .write_all(format!("{}\n", serde_json::to_string(&request)?).as_bytes()) + .await?; + + let mut lines = BufReader::new(read_half).lines(); + let Some(line) = lines.next_line().await? else { + anyhow::bail!("daemon closed connection without response"); + }; + let response: Value = serde_json::from_str(&line)?; + if let Some(error) = response.get("error").and_then(Value::as_str) { + anyhow::bail!(error.to_string()); + } + Ok(response.get("result").cloned().unwrap_or_else(|| json!({}))) +} + +fn config_directory() -> PathBuf { + if let Ok(xdg) = env::var("XDG_CONFIG_HOME") { + return Path::new(&xdg).join("bread"); + } + if let Ok(home) = env::var("HOME") { + return Path::new(&home).join(".config/bread"); + } + PathBuf::from(".config/bread") +} diff --git a/breadd/src/adapters/mod.rs b/breadd/src/adapters/mod.rs index d3a8a3f..b33569b 100644 --- a/breadd/src/adapters/mod.rs +++ b/breadd/src/adapters/mod.rs @@ -1,8 +1,11 @@ use anyhow::Result; use async_trait::async_trait; use bread_shared::RawEvent; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{mpsc, watch, RwLock}; use tracing::info; +use serde::Serialize; +use std::collections::HashMap; +use std::sync::Arc; use crate::core::config::Config; use crate::core::supervisor::spawn_supervised; @@ -14,6 +17,13 @@ pub mod udev; pub mod network_rtnetlink; pub mod power_upower; +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum AdapterStatus { + Connected, + Disconnected, +} + #[async_trait] pub trait Adapter: Send + Sync { fn name(&self) -> &'static str; @@ -30,6 +40,7 @@ pub struct Manager { raw_tx: mpsc::Sender, config: Config, shutdown_rx: watch::Receiver, + status: Arc>>, } impl Manager { @@ -42,9 +53,14 @@ impl Manager { raw_tx, config, shutdown_rx, + status: Arc::new(RwLock::new(HashMap::new())), } } + pub fn status_handle(&self) -> Arc>> { + self.status.clone() + } + pub async fn start_all(&self) -> Result<()> { info!("starting adapters"); @@ -91,17 +107,27 @@ impl Manager { let tx = self.raw_tx.clone(); let shutdown_rx = self.shutdown_rx.clone(); let shutdown_for_task = shutdown_rx.clone(); + let status = self.status.clone(); spawn_supervised(name, shutdown_rx, move || { let adapter = adapter.clone(); let tx = tx.clone(); let mut shutdown_rx = shutdown_for_task.clone(); + let status = status.clone(); async move { adapter.on_connect().await?; + { + let mut guard = status.write().await; + guard.insert(adapter.name().to_string(), AdapterStatus::Connected); + } let result = tokio::select! { result = adapter.run(tx) => result, _ = shutdown_rx.changed() => Ok(()), }; adapter.on_disconnect().await?; + { + let mut guard = status.write().await; + guard.insert(adapter.name().to_string(), AdapterStatus::Disconnected); + } result } }); diff --git a/breadd/src/core/config.rs b/breadd/src/core/config.rs index dedd0d4..1c756a9 100644 --- a/breadd/src/core/config.rs +++ b/breadd/src/core/config.rs @@ -12,8 +12,12 @@ pub struct Config { #[serde(default)] pub lua: LuaConfig, #[serde(default)] + pub modules: ModulesConfig, + #[serde(default)] pub adapters: AdaptersConfig, #[serde(default)] + pub notifications: NotificationsConfig, + #[serde(default)] pub events: EventsConfig, } @@ -33,6 +37,14 @@ pub struct LuaConfig { pub module_path: String, } +#[derive(Debug, Clone, Deserialize)] +pub struct ModulesConfig { + #[serde(default = "default_true")] + pub builtin: bool, + #[serde(default)] + pub disable: Vec, +} + #[derive(Debug, Clone, Deserialize)] pub struct AdaptersConfig { #[serde(default)] @@ -73,12 +85,24 @@ pub struct EventsConfig { pub dedup_window_ms: u64, } +#[derive(Debug, Clone, Deserialize)] +pub struct NotificationsConfig { + #[serde(default = "default_notify_timeout")] + pub default_timeout_ms: i64, + #[serde(default = "default_notify_urgency")] + pub default_urgency: String, + #[serde(default = "default_notify_path")] + pub notify_send_path: String, +} + impl Default for Config { fn default() -> Self { Self { daemon: DaemonConfig::default(), lua: LuaConfig::default(), + modules: ModulesConfig::default(), adapters: AdaptersConfig::default(), + notifications: NotificationsConfig::default(), events: EventsConfig::default(), } } @@ -102,6 +126,15 @@ impl Default for LuaConfig { } } +impl Default for ModulesConfig { + fn default() -> Self { + Self { + builtin: default_true(), + disable: Vec::new(), + } + } +} + impl Default for AdaptersConfig { fn default() -> Self { Self { @@ -147,6 +180,16 @@ impl Default for EventsConfig { } } +impl Default for NotificationsConfig { + fn default() -> Self { + Self { + default_timeout_ms: default_notify_timeout(), + default_urgency: default_notify_urgency(), + notify_send_path: default_notify_path(), + } + } +} + impl Config { pub fn load() -> Result { let path = config_path(); @@ -218,6 +261,18 @@ fn default_dedup_window() -> u64 { 100 } +fn default_notify_timeout() -> i64 { + 3000 +} + +fn default_notify_urgency() -> String { + "normal".to_string() +} + +fn default_notify_path() -> String { + "notify-send".to_string() +} + fn default_udev_subsystems() -> Vec { vec![ "usb".to_string(), diff --git a/breadd/src/core/normalizer.rs b/breadd/src/core/normalizer.rs index d0f7b42..b424e1c 100644 --- a/breadd/src/core/normalizer.rs +++ b/breadd/src/core/normalizer.rs @@ -80,22 +80,102 @@ impl EventNormalizer { fn normalize_hyprland(&self, raw: &RawEvent) -> Vec { let kind = raw.payload.get("kind").and_then(Value::as_str).unwrap_or("unknown"); - let mapped = match kind { - "workspace" | "workspacev2" => "bread.workspace.changed", - "monitoradded" => "bread.monitor.connected", - "monitorremoved" => "bread.monitor.disconnected", - "activewindow" | "activewindowv2" => "bread.window.focus.changed", - "openwindow" => "bread.window.opened", - "closewindow" => "bread.window.closed", - _ => "bread.hyprland.event", - }; + let data = raw + .payload + .get("data") + .and_then(Value::as_str) + .unwrap_or(""); - vec![BreadEvent { - event: mapped.to_string(), - timestamp: raw.timestamp, - source: AdapterSource::Hyprland, - data: raw.payload.clone(), - }] + match kind { + "workspace" | "workspacev2" => vec![BreadEvent { + event: "bread.workspace.changed".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: raw.payload.clone(), + }], + "createworkspace" => vec![BreadEvent { + event: "bread.workspace.created".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: json!({ "workspace": data }), + }], + "destroyworkspace" => vec![BreadEvent { + event: "bread.workspace.destroyed".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: json!({ "workspace": data }), + }], + "monitoradded" => vec![BreadEvent { + event: "bread.monitor.connected".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: raw.payload.clone(), + }], + "monitorremoved" => vec![BreadEvent { + event: "bread.monitor.disconnected".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: raw.payload.clone(), + }], + "activewindow" => vec![BreadEvent { + event: "bread.window.focus.changed".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: raw.payload.clone(), + }], + "activewindowv2" => { + let fields = split_hyprland_fields(data); + vec![BreadEvent { + event: "bread.window.focused".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: json!({ + "address": fields.get(0).unwrap_or(&"") + }), + }] + } + "openwindow" => { + let fields = split_hyprland_fields(data); + vec![BreadEvent { + event: "bread.window.opened".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: json!({ + "address": fields.get(0).unwrap_or(&""), + "workspace": fields.get(1).unwrap_or(&""), + "class": fields.get(2).unwrap_or(&""), + "title": fields.get(3).unwrap_or(&""), + }), + }] + } + "closewindow" => { + let fields = split_hyprland_fields(data); + vec![BreadEvent { + event: "bread.window.closed".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: json!({ "address": fields.get(0).unwrap_or(&"") }), + }] + } + "movewindow" => { + let fields = split_hyprland_fields(data); + vec![BreadEvent { + event: "bread.window.moved".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: json!({ + "address": fields.get(0).unwrap_or(&""), + "workspace": fields.get(1).unwrap_or(&""), + }), + }] + } + _ => vec![BreadEvent { + event: "bread.hyprland.event".to_string(), + timestamp: raw.timestamp, + source: AdapterSource::Hyprland, + data: raw.payload.clone(), + }], + } } fn normalize_power(&self, raw: &RawEvent) -> Vec { @@ -201,6 +281,13 @@ impl EventNormalizer { } } +fn split_hyprland_fields(data: &str) -> Vec<&str> { + if data.is_empty() { + return Vec::new(); + } + data.split(">>").collect() +} + fn classify_device(payload: &Value) -> DeviceClass { let name = payload .get("name") diff --git a/breadd/src/core/state_engine.rs b/breadd/src/core/state_engine.rs index aecae3c..caea8dc 100644 --- a/breadd/src/core/state_engine.rs +++ b/breadd/src/core/state_engine.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use anyhow::Result; use bread_shared::{AdapterSource, BreadEvent}; @@ -15,6 +16,7 @@ use crate::lua::LuaMessage; pub struct StateHandle { state: Arc>, command_tx: mpsc::UnboundedSender, + subscription_count: Arc, } pub enum StateCommand { @@ -38,6 +40,7 @@ pub enum StateCommand { name: String, status: ModuleLoadState, last_error: Option, + builtin: bool, }, SetProfile { name: String, @@ -45,8 +48,16 @@ pub enum StateCommand { } impl StateHandle { - pub fn new(state: Arc>, command_tx: mpsc::UnboundedSender) -> Self { - Self { state, command_tx } + pub fn new( + state: Arc>, + command_tx: mpsc::UnboundedSender, + subscription_count: Arc, + ) -> Self { + Self { + state, + command_tx, + subscription_count, + } } pub fn state_arc(&self) -> Arc> { @@ -101,17 +112,28 @@ impl StateHandle { let _ = self.command_tx.send(StateCommand::ClearSubscriptions); } - pub fn set_module_status(&self, name: String, status: ModuleLoadState, last_error: Option) { + pub fn set_module_status( + &self, + name: String, + status: ModuleLoadState, + last_error: Option, + builtin: bool, + ) { let _ = self.command_tx.send(StateCommand::SetModuleStatus { name, status, last_error, + builtin, }); } pub fn set_profile(&self, name: String) { let _ = self.command_tx.send(StateCommand::SetProfile { name }); } + + pub fn subscription_count(&self) -> Arc { + self.subscription_count.clone() + } } pub async fn run_state_engine( @@ -120,6 +142,7 @@ pub async fn run_state_engine( state: Arc>, lua_tx: mpsc::UnboundedSender, event_stream_tx: broadcast::Sender, + subscription_count: Arc, mut shutdown_rx: watch::Receiver, ) { let mut subscriptions = SubscriptionTable::default(); @@ -136,7 +159,7 @@ pub async fn run_state_engine( let Some(cmd) = maybe_cmd else { break; }; - handle_command(cmd, &state, &mut subscriptions, &mut watches).await; + handle_command(cmd, &state, &mut subscriptions, &mut watches, &subscription_count).await; } maybe_event = event_rx.recv() => { let Some(event) = maybe_event else { @@ -158,7 +181,7 @@ pub async fn run_state_engine( apply_event_to_state(&mut guard, &event); } - dispatch_event(&event, &mut subscriptions, &lua_tx, &event_stream_tx); + dispatch_event(&event, &mut subscriptions, &lua_tx, &event_stream_tx, &subscription_count); if let (Some(before), Some(after)) = (before_snapshot, after_snapshot) { for (_id, path) in watches.iter() { @@ -174,7 +197,7 @@ pub async fn run_state_engine( "old": old_val, }), ); - dispatch_event(&synthetic, &mut subscriptions, &lua_tx, &event_stream_tx); + dispatch_event(&synthetic, &mut subscriptions, &lua_tx, &event_stream_tx, &subscription_count); } } } @@ -190,13 +213,17 @@ async fn handle_command( state: &Arc>, subscriptions: &mut SubscriptionTable, watches: &mut HashMap, + subscription_count: &Arc, ) { match cmd { StateCommand::RegisterSubscription { id, pattern, once } => { subscriptions.add_with_id(id, pattern, once); + subscription_count.fetch_add(1, Ordering::Relaxed); } StateCommand::RemoveSubscription { id } => { - subscriptions.remove(id); + if subscriptions.remove(id) { + subscription_count.fetch_sub(1, Ordering::Relaxed); + } } StateCommand::RegisterWatch { id, path } => { watches.insert(id, path); @@ -207,21 +234,25 @@ async fn handle_command( StateCommand::ClearSubscriptions => { subscriptions.clear(); watches.clear(); + subscription_count.store(0, Ordering::Relaxed); } StateCommand::SetModuleStatus { name, status, last_error, + builtin, } => { let mut guard = state.write().await; if let Some(existing) = guard.modules.iter_mut().find(|m| m.name == name) { existing.status = status; existing.last_error = last_error; + existing.builtin = builtin; } else { guard.modules.push(crate::core::types::ModuleStatus { name, status, last_error, + builtin, store: HashMap::new(), }); } @@ -242,6 +273,7 @@ fn dispatch_event( subscriptions: &mut SubscriptionTable, lua_tx: &mpsc::UnboundedSender, event_stream_tx: &broadcast::Sender, + subscription_count: &Arc, ) { let _ = event_stream_tx.send(event.clone()); @@ -254,7 +286,9 @@ fn dispatch_event( } for sub in matches.into_iter().filter(|s| s.once) { - subscriptions.remove(sub.id); + if subscriptions.remove(sub.id) { + subscription_count.fetch_sub(1, Ordering::Relaxed); + } let _ = lua_tx.send(LuaMessage::SubscriptionCancelled { id: sub.id }); } } @@ -302,11 +336,12 @@ fn apply_event_to_state(state: &mut RuntimeState, event: &BreadEvent) { .map(ToString::to_string); state.active_workspace = ws; } - "bread.window.focus.changed" => { + "bread.window.focus.changed" | "bread.window.focused" => { state.active_window = event .data .get("window") .or_else(|| event.data.get("class")) + .or_else(|| event.data.get("address")) .and_then(Value::as_str) .map(ToString::to_string); } diff --git a/breadd/src/core/types.rs b/breadd/src/core/types.rs index 254e075..45ccfa5 100644 --- a/breadd/src/core/types.rs +++ b/breadd/src/core/types.rs @@ -123,6 +123,8 @@ pub struct ModuleStatus { pub status: ModuleLoadState, pub last_error: Option, #[serde(default)] + pub builtin: bool, + #[serde(default)] pub store: HashMap, } diff --git a/breadd/src/ipc/mod.rs b/breadd/src/ipc/mod.rs index 1ac245b..f4aa092 100644 --- a/breadd/src/ipc/mod.rs +++ b/breadd/src/ipc/mod.rs @@ -1,18 +1,22 @@ +use std::collections::{HashMap, VecDeque}; use std::fs; use std::os::unix::fs::PermissionsExt; use std::path::PathBuf; use std::process; use std::time::Instant; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use anyhow::{anyhow, Result}; -use bread_shared::{AdapterSource, BreadEvent}; +use bread_shared::{now_unix_ms, AdapterSource, BreadEvent}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{UnixListener, UnixStream}; -use tokio::sync::{broadcast, mpsc, watch}; +use tokio::sync::{broadcast, mpsc, watch, RwLock}; use tracing::{error, info, warn}; +use crate::adapters::AdapterStatus; use crate::core::state_engine::StateHandle; use crate::lua::RuntimeHandle; @@ -23,6 +27,9 @@ pub struct Server { event_tx: broadcast::Sender, lua_runtime: RuntimeHandle, emit_tx: mpsc::UnboundedSender, + adapter_status: Arc>>, + subscription_count: Arc, + event_buffer: Arc>>, started_at: Instant, pid: u32, } @@ -51,6 +58,9 @@ impl Server { event_tx: broadcast::Sender, lua_runtime: RuntimeHandle, emit_tx: mpsc::UnboundedSender, + adapter_status: Arc>>, + subscription_count: Arc, + event_buffer: Arc>>, ) -> Self { Self { socket_path, @@ -58,6 +68,9 @@ impl Server { event_tx, lua_runtime, emit_tx, + adapter_status, + subscription_count, + event_buffer, started_at: Instant::now(), pid: process::id(), } @@ -166,12 +179,25 @@ impl Server { let full = self.state_handle.state_dump().await; Ok(full.get("modules").cloned().unwrap_or_else(|| json!([]))) } - "modules.reload" => self - .lua_runtime - .reload() - .await - .map(|_| json!({ "reloaded": true })) - .map_err(|e| e.to_string()), + "modules.reload" => { + let started = Instant::now(); + if let Err(err) = self.lua_runtime.reload().await { + return Err((id, err.to_string())); + } + let duration_ms = started.elapsed().as_millis(); + let modules = self + .state_handle + .state_dump() + .await + .get("modules") + .cloned() + .unwrap_or_else(|| json!([])); + Ok(json!({ + "ok": true, + "duration_ms": duration_ms, + "modules": modules, + })) + } "profile.list" => { let full = self.state_handle.state_dump().await; let profiles = full @@ -224,13 +250,36 @@ impl Server { } "health" => { let uptime_ms = self.started_at.elapsed().as_millis(); + let state = self.state_handle.state_dump().await; + let modules = state.get("modules").cloned().unwrap_or_else(|| json!([])); + let adapters = self.adapter_status.read().await.clone(); + let subscription_count = self.subscription_count.load(std::sync::atomic::Ordering::Relaxed); + let recent_errors = self.lua_runtime.recent_errors(); Ok(json!({ "ok": true, "pid": self.pid, "version": env!("CARGO_PKG_VERSION"), "uptime_ms": uptime_ms, + "socket": self.socket_path.to_string_lossy(), + "adapters": adapters, + "modules": modules, + "subscriptions": subscription_count, + "recent_errors": recent_errors, })) } + "events.replay" => { + let since_ms = req.params.get("since_ms").and_then(Value::as_u64).unwrap_or(0); + let cutoff = now_unix_ms().saturating_sub(since_ms); + let mut replay = Vec::new(); + if let Ok(buf) = self.event_buffer.lock() { + for event in buf.iter() { + if event.timestamp >= cutoff { + replay.push(event); + } + } + } + Ok(serde_json::to_value(replay).unwrap_or_else(|_| json!([]))) + } _ => Err("unknown method".to_string()), }; diff --git a/breadd/src/lua/mod.rs b/breadd/src/lua/mod.rs index a73de1a..cf94a57 100644 --- a/breadd/src/lua/mod.rs +++ b/breadd/src/lua/mod.rs @@ -1,5 +1,5 @@ use std::cell::RefCell; -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::fs; use std::path::{Path, PathBuf}; use std::rc::Rc; @@ -10,16 +10,18 @@ use std::time::Duration; use anyhow::{anyhow, Result}; use bread_shared::{AdapterSource, BreadEvent}; use mlua::{Error as LuaError, Function, Lua, LuaSerdeExt, RegistryKey, Table, Value}; +use serde::Serialize; use serde_json::Value as JsonValue; use tokio::sync::{mpsc, oneshot, watch, RwLock}; use tokio::task; -use tokio::time::{interval, sleep}; +use tokio::time::{interval_at, sleep, Instant}; use tracing::{error, info, warn}; -use crate::core::config::Config; +use crate::core::config::{Config, ModulesConfig, NotificationsConfig}; use crate::core::state_engine::StateHandle; use crate::core::subscriptions::SubscriptionId; use crate::core::types::{ModuleLoadState, RuntimeState}; +use bread_shared::now_unix_ms; pub enum LuaMessage { Event { @@ -38,9 +40,17 @@ pub enum LuaMessage { Shutdown, } +#[derive(Debug, Clone, Serialize)] +pub struct ErrorEntry { + pub timestamp: u64, + pub module: Option, + pub message: String, +} + #[derive(Clone)] pub struct RuntimeHandle { tx: mpsc::UnboundedSender, + recent_errors: Arc>>, } impl RuntimeHandle { @@ -63,6 +73,13 @@ impl RuntimeHandle { pub fn shutdown(&self) { let _ = self.tx.send(LuaMessage::Shutdown); } + + pub fn recent_errors(&self) -> Vec { + self.recent_errors + .lock() + .map(|buf| buf.iter().cloned().collect()) + .unwrap_or_default() + } } pub fn spawn_runtime( @@ -71,7 +88,11 @@ pub fn spawn_runtime( emit_tx: mpsc::UnboundedSender, ) -> Result { let (tx, mut rx) = mpsc::unbounded_channel(); - let handle = RuntimeHandle { tx }; + let recent_errors = Arc::new(Mutex::new(VecDeque::with_capacity(50))); + let handle = RuntimeHandle { + tx, + recent_errors: recent_errors.clone(), + }; let thread_tx = handle.tx.clone(); std::thread::Builder::new() @@ -83,7 +104,13 @@ pub fn spawn_runtime( .expect("failed to create lua runtime thread"); rt.block_on(async move { - let mut engine = match LuaEngine::new(config, state_handle, emit_tx, thread_tx.clone()) { + let mut engine = match LuaEngine::new( + config, + state_handle, + emit_tx, + thread_tx.clone(), + recent_errors, + ) { Ok(engine) => engine, Err(err) => { error!(error = %err, "failed to initialize lua engine"); @@ -160,6 +187,8 @@ struct ModuleDecl { version: Option, after: Vec, path: PathBuf, + source: Option<&'static str>, + builtin: bool, } struct ModuleInfo { @@ -182,6 +211,9 @@ struct LuaEngine { lua_tx: mpsc::UnboundedSender, entry_point: PathBuf, module_path: PathBuf, + modules_config: ModulesConfig, + notifications_config: NotificationsConfig, + recent_errors: Arc>>, } impl LuaEngine { @@ -190,6 +222,7 @@ impl LuaEngine { state_handle: StateHandle, emit_tx: mpsc::UnboundedSender, lua_tx: mpsc::UnboundedSender, + recent_errors: Arc>>, ) -> Result { Ok(Self { lua: Lua::new(), @@ -207,6 +240,9 @@ impl LuaEngine { lua_tx, entry_point: config.lua_entry_point(), module_path: config.lua_module_path(), + modules_config: config.modules.clone(), + notifications_config: config.notifications.clone(), + recent_errors, }) } @@ -324,7 +360,9 @@ impl LuaEngine { .map_err(|_| LuaError::external("missing filter function"))?; Some(lua.create_registry_value(filter_fn)?) } else { - return Err(LuaError::external("missing filter options")); + return Err(LuaError::external( + "bread.filter requires an opts table with a 'filter' function: bread.filter(pattern, fn, { filter = fn })", + )); }; let module = current_module .lock() @@ -503,6 +541,61 @@ impl LuaEngine { })?; bread.set("exec", exec_fn)?; + let notify_path = self.notifications_config.notify_send_path.clone(); + let default_urgency = self.notifications_config.default_urgency.clone(); + let default_timeout = self.notifications_config.default_timeout_ms; + let emit_tx = self.emit_tx.clone(); + let notify_fn = self + .lua + .create_function(move |_lua, (message, opts): (String, Option)| { + let title: String = opts + .as_ref() + .and_then(|o| o.get("title").ok()) + .unwrap_or_else(|| "bread".to_string()); + let urgency: String = opts + .as_ref() + .and_then(|o| o.get("urgency").ok()) + .unwrap_or_else(|| default_urgency.clone()); + let timeout: i64 = opts + .as_ref() + .and_then(|o| o.get("timeout").ok()) + .unwrap_or(default_timeout); + let icon: Option = opts.as_ref().and_then(|o| o.get("icon").ok()); + + let cmd_path = notify_path.clone(); + let title_clone = title.clone(); + let message_clone = message.clone(); + let urgency_clone = urgency.clone(); + task::spawn_blocking(move || { + let mut cmd = std::process::Command::new(cmd_path); + cmd.args([ + "--app-name", + "bread", + "--urgency", + &urgency_clone, + "--expire-time", + &timeout.to_string(), + ]); + if let Some(icon) = icon { + cmd.args(["--icon", &icon]); + } + let _ = cmd.args([&title_clone, &message_clone]).status(); + }); + + let _ = emit_tx.send(BreadEvent::new( + "bread.notify.sent", + AdapterSource::System, + serde_json::json!({ + "title": title, + "message": message, + "urgency": urgency, + }), + )); + + Ok(()) + })?; + bread.set("notify", notify_fn)?; + let timers = self.timers.clone(); let next_timer_id = self.next_timer_id.clone(); let lua_tx = self.lua_tx.clone(); @@ -556,7 +649,8 @@ impl LuaEngine { ); let lua_tx = lua_tx.clone(); task::spawn(async move { - let mut ticker = interval(Duration::from_millis(interval_ms)); + let start = Instant::now() + Duration::from_millis(interval_ms); + let mut ticker = interval_at(start, Duration::from_millis(interval_ms)); loop { tokio::select! { _ = ticker.tick() => { @@ -750,12 +844,22 @@ impl LuaEngine { } fn load_init_and_modules(&self) -> Result<()> { - self.load_lua_file(&self.entry_point, "init")?; + self.load_lua_file(&self.entry_point, "init", false)?; let mut files = list_lua_files(&self.module_path)?; files.sort(); + let disabled: HashSet = self + .modules_config + .disable + .iter() + .cloned() + .collect(); + let mut decls = Vec::new(); + if self.modules_config.builtin { + decls.extend(builtin_module_decls(&disabled)); + } for path in files.into_iter().filter(|p| !is_lib_path(&self.module_path, p)) { match self.scan_module_decl(&path) { Ok(decl) => decls.push(decl), @@ -765,6 +869,7 @@ impl LuaEngine { name, ModuleLoadState::LoadError, Some(err.to_string()), + false, ); } } @@ -784,7 +889,7 @@ impl LuaEngine { for (name, err) in dep_errors { self.state_handle - .set_module_status(name, ModuleLoadState::LoadError, Some(err)); + .set_module_status(name, ModuleLoadState::LoadError, Some(err), false); } let mut load_order = Vec::new(); @@ -792,14 +897,19 @@ impl LuaEngine { load_order.push(decl.name.clone()); match self.load_module(&decl) { Ok(()) => { - self.state_handle - .set_module_status(decl.name.clone(), ModuleLoadState::Loaded, None); + self.state_handle.set_module_status( + decl.name.clone(), + ModuleLoadState::Loaded, + None, + decl.builtin, + ); } Err(err) => { self.state_handle.set_module_status( decl.name.clone(), ModuleLoadState::LoadError, Some(err.to_string()), + decl.builtin, ); } } @@ -815,7 +925,11 @@ impl LuaEngine { fn load_module(&self, decl: &ModuleDecl) -> Result<()> { self.set_current_module(Some(decl.name.clone())); - let result = self.load_lua_file(&decl.path, &decl.name); + let result = if let Some(source) = decl.source.as_deref() { + self.load_lua_source(source, &decl.name) + } else { + self.load_lua_file(&decl.path, &decl.name, decl.builtin) + }; self.set_current_module(None); result?; @@ -827,13 +941,14 @@ impl LuaEngine { Ok(()) } - fn load_lua_file(&self, path: &Path, module_name: &str) -> Result<()> { + fn load_lua_file(&self, path: &Path, module_name: &str, builtin: bool) -> Result<()> { if !path.exists() { warn!(path = %path.display(), "lua file does not exist; skipping"); self.state_handle.set_module_status( module_name.to_string(), ModuleLoadState::NotFound, None, + builtin, ); return Ok(()); } @@ -843,6 +958,14 @@ impl LuaEngine { Ok(()) } + fn load_lua_source(&self, source: &str, module_name: &str) -> Result<()> { + self.lua + .load(source) + .set_name(module_name) + .exec() + .map_err(|e| anyhow!(e.to_string())) + } + fn handle_event(&self, id: SubscriptionId, event: BreadEvent) -> Result<()> { let (callback, filter, raw_kind, kind, module) = { let handlers = self.handlers.lock().expect("lua handlers mutex poisoned"); @@ -935,8 +1058,13 @@ impl LuaEngine { if let Some(hook) = self.get_module_hook(name, "on_load") { if let Err(err) = hook.call::<_, ()>(()) { error!(module = %name, error = %err, "module on_load failed"); - self.state_handle - .set_module_status(name.to_string(), ModuleLoadState::LoadError, Some(err.to_string())); + let builtin = self.module_is_builtin(name); + self.state_handle.set_module_status( + name.to_string(), + ModuleLoadState::LoadError, + Some(err.to_string()), + builtin, + ); } } } @@ -951,10 +1079,12 @@ impl LuaEngine { if let Some(hook) = self.get_module_hook(&name, "on_reload") { if let Err(err) = hook.call::<_, ()>(()) { error!(module = %name, error = %err, "module on_reload failed"); + let builtin = self.module_is_builtin(&name); self.state_handle.set_module_status( name.to_string(), ModuleLoadState::Degraded, Some(err.to_string()), + builtin, ); } } @@ -971,10 +1101,12 @@ impl LuaEngine { if let Some(hook) = self.get_module_hook(&name, "on_unload") { if let Err(err) = hook.call::<_, ()>(()) { error!(module = %name, error = %err, "module on_unload failed"); + let builtin = self.module_is_builtin(&name); self.state_handle.set_module_status( name.to_string(), ModuleLoadState::Degraded, Some(err.to_string()), + builtin, ); } } @@ -983,10 +1115,22 @@ impl LuaEngine { fn handle_callback_error(&self, module: Option<&str>, id: SubscriptionId, err: LuaError) { if let Some(module) = module { + let builtin = self.module_is_builtin(module); + if let Ok(mut buf) = self.recent_errors.lock() { + if buf.len() >= 50 { + buf.pop_front(); + } + buf.push_back(ErrorEntry { + timestamp: now_unix_ms(), + module: Some(module.to_string()), + message: err.to_string(), + }); + } self.state_handle.set_module_status( module.to_string(), ModuleLoadState::Degraded, Some(err.to_string()), + builtin, ); if let Some(hook) = self.get_module_hook(module, "on_error") { match hook.call::<_, bool>(err.to_string()) { @@ -1022,6 +1166,14 @@ impl LuaEngine { .unwrap_or(false) } + fn module_is_builtin(&self, name: &str) -> bool { + self.module_decls + .lock() + .ok() + .and_then(|map| map.get(name).map(|d| d.builtin)) + .unwrap_or(false) + } + fn set_current_module(&self, name: Option) { if let Ok(mut guard) = self.current_module.lock() { *guard = name; @@ -1052,6 +1204,8 @@ impl LuaEngine { version, after, path: module_path.clone(), + source: None, + builtin: false, }); Err(LuaError::RuntimeError(MODULE_DECL_ABORT.to_string())) })?; @@ -1119,6 +1273,14 @@ impl LuaEngine { self.lua .load( r#" + bread.spawn = function(fn) + local co = coroutine.create(fn) + local ok, err = coroutine.resume(co) + if not ok then + error(err) + end + end + bread.wait = function(pattern, opts) if type(pattern) ~= "string" then error("bread.wait requires a pattern string") @@ -1288,10 +1450,266 @@ fn module_store_set(state_arc: &Arc>, module: &str, key: St name: module.to_string(), status: ModuleLoadState::Loaded, last_error: None, + builtin: false, store, }); } +const BUILTIN_MONITORS: &str = r#" +local M = bread.module({ name = "bread.monitors", version = "1.0.0" }) + +local workflows = {} +local layouts = {} + +local function matches_when(event_name, when) + if when == "connected" then + return event_name == "bread.monitor.connected" + elseif when == "disconnected" then + return event_name == "bread.monitor.disconnected" + elseif when == "changed" then + return event_name == "bread.monitor.changed" + end + return false +end + +local function matches_monitors(list, event) + if not list or #list == 0 then + return true + end + local name = event.data and event.data.name + if not name then + return false + end + for _, monitor in ipairs(list) do + if monitor == name then + return true + end + end + return false +end + +local function run_workflow(wf, event) + if type(wf.run) == "function" then + wf.run(event) + elseif type(wf.run) == "string" then + bread.exec(wf.run) + end +end + +function M.on(opts) + table.insert(workflows, opts) +end + +function M.layout(name, fn) + layouts[name] = fn +end + +function M.apply(name) + return function() + local fn = layouts[name] + if fn then + fn() + end + end +end + +function M.on_load() + bread.on("bread.monitor.**", function(event) + for _, wf in ipairs(workflows) do + if matches_when(event.event, wf.when) and matches_monitors(wf.monitors, event) then + run_workflow(wf, event) + end + end + end) +end + +return M +"#; + +const BUILTIN_DEVICES: &str = r#" +local M = bread.module({ name = "bread.devices", version = "1.0.0" }) + +local rules = {} + +local function matches_rule(rule, event) + local class = rule.class + local when = rule.when + local data = event.data or {} + + if when == "connected" and event.event ~= "bread.device.connected" then + if not event.event:match("%.connected$") then + return false + end + elseif when == "disconnected" and event.event ~= "bread.device.disconnected" then + if not event.event:match("%.disconnected$") then + return false + end + end + + if class and data.class ~= class then + return false + end + + if rule.name and data.name and not tostring(data.name):match(rule.name) then + return false + end + + return true +end + +local function run_rule(rule, event) + if type(rule.run) == "function" then + rule.run(event) + elseif type(rule.run) == "string" then + bread.exec(rule.run) + end +end + +function M.on(opts) + table.insert(rules, opts) +end + +function M.on_load() + bread.on("bread.device.**", function(event) + for _, rule in ipairs(rules) do + if matches_rule(rule, event) then + run_rule(rule, event) + end + end + end) +end + +return M +"#; + +const BUILTIN_WORKSPACES: &str = r#" +local M = bread.module({ name = "bread.workspaces", version = "1.0.0", after = { "bread.monitors" } }) + +local assignments = {} +local rules = {} + +function M.assign(workspace, monitor) + table.insert(assignments, { workspace = workspace, monitor = monitor }) +end + +function M.pin(opts) + table.insert(rules, opts) +end + +function M.apply_assignments() + local monitors = bread.state.monitors() + local active = {} + for _, m in ipairs(monitors) do + if m.connected then + active[m.name] = true + end + end + + for _, a in ipairs(assignments) do + if active[a.monitor] then + bread.hyprland.dispatch("moveworkspacetomonitor", a.workspace .. " " .. a.monitor) + end + end +end + +function M.on_load() + bread.on("bread.monitor.**", function() + M.apply_assignments() + end) + + bread.on("bread.window.opened", function(event) + for _, rule in ipairs(rules) do + if event.data and event.data.class and event.data.class:match(rule.app) then + local address = event.data.address or "" + bread.hyprland.dispatch("movetoworkspacesilent", rule.workspace .. ",address:" .. address) + end + end + end) + + bread.once("bread.system.startup", function() + M.apply_assignments() + end) +end + +return M +"#; + +const BUILTIN_BINDS: &str = r#" +local M = bread.module({ name = "bread.binds", version = "1.0.0" }) + +local active = {} + +local function bind_string(opts) + local mods = table.concat(opts.mods or {}, " ") + local args = opts.args or "" + if mods ~= "" then + return mods .. ", " .. opts.key .. ", " .. opts.dispatch .. ", " .. args + end + return opts.key .. ", " .. opts.dispatch .. ", " .. args +end + +function M.add(opts) + local bind = bind_string(opts) + bread.hyprland.keyword("bind", bind) + active[opts.key] = opts + return opts.key +end + +function M.remove(key) + local bind = active[key] + if not bind then + return + end + bread.hyprland.keyword("unbind", bind_string(bind)) + active[key] = nil +end + +function M.replace(key, opts) + M.remove(key) + return M.add(opts) +end + +function M.on_unload() + for key, _ in pairs(active) do + M.remove(key) + end +end + +return M +"#; + +fn builtin_module_decls(disabled: &HashSet) -> Vec { + let mut out = Vec::new(); + + let entries = vec![ + ("bread.monitors", "1.0.0", Vec::new(), BUILTIN_MONITORS), + ("bread.devices", "1.0.0", Vec::new(), BUILTIN_DEVICES), + ( + "bread.workspaces", + "1.0.0", + vec!["bread.monitors".to_string()], + BUILTIN_WORKSPACES, + ), + ("bread.binds", "1.0.0", Vec::new(), BUILTIN_BINDS), + ]; + + for (name, version, after, source) in entries { + if disabled.contains(name) { + continue; + } + out.push(ModuleDecl { + name: name.to_string(), + version: Some(version.to_string()), + after, + path: PathBuf::from(format!("")), + source: Some(source), + builtin: true, + }); + } + + out +} + fn hyprland_request_socket() -> Result { let instance = std::env::var("HYPRLAND_INSTANCE_SIGNATURE") .map_err(|_| anyhow!("HYPRLAND_INSTANCE_SIGNATURE is not set"))?; @@ -1307,11 +1725,13 @@ fn hyprland_request(request: &str) -> Result { use std::os::unix::net::UnixStream; let socket = hyprland_request_socket()?; - let mut stream = UnixStream::connect(socket)?; - stream.write_all(request.as_bytes())?; - let mut buffer = String::new(); - stream.read_to_string(&mut buffer)?; - Ok(buffer) + tokio::task::block_in_place(|| { + let mut stream = UnixStream::connect(&socket)?; + stream.write_all(request.as_bytes())?; + let mut buffer = String::new(); + stream.read_to_string(&mut buffer)?; + Ok(buffer) + }) } fn list_lua_files(root: &Path) -> Result> { diff --git a/breadd/src/main.rs b/breadd/src/main.rs index c57fabd..bcb4daa 100644 --- a/breadd/src/main.rs +++ b/breadd/src/main.rs @@ -3,7 +3,9 @@ mod core; mod ipc; mod lua; +use std::collections::VecDeque; use std::sync::Arc; +use std::sync::atomic::AtomicU64; use anyhow::Result; use bread_shared::{AdapterSource, BreadEvent, RawEvent}; @@ -33,7 +35,8 @@ async fn main() -> Result<()> { let (event_stream_tx, _) = broadcast::channel(2048); let (shutdown_tx, shutdown_rx) = watch::channel(false); - let state_handle = StateHandle::new(state.clone(), state_cmd_tx); + let subscription_count = Arc::new(AtomicU64::new(0)); + let state_handle = StateHandle::new(state.clone(), state_cmd_tx, subscription_count.clone()); let lua_runtime = lua::spawn_runtime(config.clone(), state_handle.clone(), normalized_tx.clone())?; let lua_tx = lua_runtime.sender(); @@ -44,6 +47,7 @@ async fn main() -> Result<()> { state.clone(), lua_tx, event_stream_tx.clone(), + subscription_count.clone(), shutdown_rx.clone(), )); @@ -78,6 +82,28 @@ async fn main() -> Result<()> { let adapter_manager = adapters::Manager::new(raw_tx, config.clone(), shutdown_rx.clone()); adapter_manager.start_all().await?; + let adapter_status = adapter_manager.status_handle(); + + let event_buffer = Arc::new(std::sync::Mutex::new(VecDeque::with_capacity(1000))); + { + let mut rx = event_stream_tx.subscribe(); + let event_buffer = event_buffer.clone(); + tokio::spawn(async move { + loop { + let evt = match rx.recv().await { + Ok(evt) => evt, + Err(_) => break, + }; + if let Ok(mut buf) = event_buffer.lock() { + if buf.len() >= 1000 { + buf.pop_front(); + } + buf.push_back(evt); + } + } + }); + } + let _ = normalized_tx.send(BreadEvent::new( "bread.system.startup", AdapterSource::System, @@ -90,6 +116,9 @@ async fn main() -> Result<()> { event_stream_tx, lua_runtime.clone(), normalized_tx, + adapter_status, + subscription_count, + event_buffer, ); info!("breadd fully started"); From 05123a5989dcb5b3538f941a8c101e1e002e0d08 Mon Sep 17 00:00:00 2001 From: Breadway Date: Mon, 11 May 2026 16:40:49 +0800 Subject: [PATCH 2/4] Enhance timestamp formatting and add reload watcher functionality --- bread-cli/Cargo.toml | 1 + bread-cli/src/main.rs | 50 +++++++++++++++++++++++++++++++++++-------- 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/bread-cli/Cargo.toml b/bread-cli/Cargo.toml index 6d2541e..5f18678 100644 --- a/bread-cli/Cargo.toml +++ b/bread-cli/Cargo.toml @@ -11,3 +11,4 @@ tokio.workspace = true anyhow.workspace = true clap = { version = "4.5", features = ["derive"] } notify = "6.1" +libc = "0.2" diff --git a/bread-cli/src/main.rs b/bread-cli/src/main.rs index 6e1982a..9382494 100644 --- a/bread-cli/src/main.rs +++ b/bread-cli/src/main.rs @@ -296,12 +296,22 @@ fn print_event(event: &Value, fields: Option<&str>) { fn format_timestamp(ms: u64) -> String { let secs = ms / 1000; let millis = ms % 1000; - let time = UNIX_EPOCH + Duration::from_secs(secs); - let datetime = time.duration_since(UNIX_EPOCH).unwrap_or_default(); - let seconds = datetime.as_secs() % 60; - let minutes = (datetime.as_secs() / 60) % 60; - let hours = (datetime.as_secs() / 3600) % 24; - format!("{:02}:{:02}:{:02}.{:03}", hours, minutes, seconds, millis) + + // SAFETY: localtime_r is thread-safe. We pass a valid pointer to a + // zeroed tm struct and read the result only after the call returns. + let local_secs = unsafe { + let mut tm: libc::tm = std::mem::zeroed(); + let t = secs as libc::time_t; + libc::localtime_r(&t, &mut tm); + tm.tm_hour as u64 * 3600 + + tm.tm_min as u64 * 60 + + tm.tm_sec as u64 + }; + + let h = (local_secs / 3600) % 24; + let m = (local_secs / 60) % 60; + let s = local_secs % 60; + format!("{:02}:{:02}:{:02}.{:03}", h, m, s, millis) } fn print_reload(value: &Value) { @@ -321,6 +331,18 @@ fn print_reload(value: &Value) { } } +async fn watch_reload(socket: &Path) -> Result<()> { + let config_dir = config_directory(); + println!("watching {} for changes...", config_dir.display()); + + let (tx, mut rx) = mpsc::unbounded_channel(); + let mut watcher: RecommendedWatcher = notify::recommended_watcher(move |res| { + let _ = tx.send(res); + })?; + watcher.watch(&config_dir, RecursiveMode::Recursive)?; + + use tokio::time::{sleep, Duration}; + async fn watch_reload(socket: &Path) -> Result<()> { let config_dir = config_directory(); println!("watching {} for changes...", config_dir.display()); @@ -332,15 +354,25 @@ async fn watch_reload(socket: &Path) -> Result<()> { watcher.watch(&config_dir, RecursiveMode::Recursive)?; while let Some(msg) = rx.recv().await { - if msg.is_ok() { - let response = send_request(socket, "modules.reload", json!({})).await?; - print_reload(&response); + if msg.is_err() { + continue; } + + // Debounce: drain any follow-up events that arrive within 150ms. + // A single file save typically generates 2-3 fs events in rapid succession. + sleep(Duration::from_millis(150)).await; + while rx.try_recv().is_ok() {} + + let response = send_request(socket, "modules.reload", json!({})).await?; + print_reload(&response); } Ok(()) } + Ok(()) +} + async fn print_doctor(socket: &Path) -> Result<()> { let stream = match UnixStream::connect(socket).await { Ok(stream) => stream, From edb2ba338a49b4260fc2b2388c746c3ac4dc1a8c Mon Sep 17 00:00:00 2001 From: Breadway Date: Mon, 11 May 2026 16:57:22 +0800 Subject: [PATCH 3/4] Refactor subscription table logic and enhance Lua logging and debounce functionality --- breadd/src/core/subscriptions.rs | 1 - breadd/src/ipc/mod.rs | 60 +++++++++++++++++++++- breadd/src/lua/mod.rs | 86 ++++++++++++++++++++++++++++++++ 3 files changed, 145 insertions(+), 2 deletions(-) diff --git a/breadd/src/core/subscriptions.rs b/breadd/src/core/subscriptions.rs index a95d388..9b218ff 100644 --- a/breadd/src/core/subscriptions.rs +++ b/breadd/src/core/subscriptions.rs @@ -35,7 +35,6 @@ impl SubscriptionTable { // swap_remove moves the last element into `idx`. We need to update by_id // for that element. But first, remove its stale entry (it was at the last // position before the swap); then re-insert it at the new position. - let _last_idx = self.entries.len() - 1; self.entries.swap_remove(idx); if idx < self.entries.len() { diff --git a/breadd/src/ipc/mod.rs b/breadd/src/ipc/mod.rs index f4aa092..f99ea74 100644 --- a/breadd/src/ipc/mod.rs +++ b/breadd/src/ipc/mod.rs @@ -313,9 +313,67 @@ impl Server { } fn matches_filter(event_name: &str, pattern: &str) -> bool { + // Delegate to the same glob logic used by the subscription table so that + // `bread events --filter "bread.device.**"` behaves identically to + // `bread.on("bread.device.**", ...)` in Lua. if pattern.ends_with(".*") { let prefix = &pattern[..pattern.len() - 1]; return event_name.starts_with(prefix); } - event_name == pattern + + if let Some(prefix) = pattern.strip_suffix(".**") { + if event_name == prefix || event_name.starts_with(&format!("{prefix}.")) { + return true; + } + return false; + } + + matches_glob_filter(pattern.as_bytes(), event_name.as_bytes()) +} + +fn matches_glob_filter(pattern: &[u8], text: &[u8]) -> bool { + if pattern.is_empty() { + return text.is_empty(); + } + + if pattern.len() >= 2 && pattern[0] == b'*' && pattern[1] == b'*' { + let rest = &pattern[2..]; + if rest.is_empty() { + return true; + } + for offset in 0..=text.len() { + if matches_glob_filter(rest, &text[offset..]) { + return true; + } + } + return false; + } + + match pattern[0] { + b'*' => { + let mut offset = 0; + loop { + if matches_glob_filter(&pattern[1..], &text[offset..]) { + return true; + } + if offset == text.len() || text[offset] == b'.' { + break; + } + offset += 1; + } + false + } + b'?' => { + if text.is_empty() || text[0] == b'.' { + return false; + } + matches_glob_filter(&pattern[1..], &text[1..]) + } + ch => { + if text.first().copied() != Some(ch) { + return false; + } + matches_glob_filter(&pattern[1..], &text[1..]) + } + } } diff --git a/breadd/src/lua/mod.rs b/breadd/src/lua/mod.rs index cf94a57..9acc814 100644 --- a/breadd/src/lua/mod.rs +++ b/breadd/src/lua/mod.rs @@ -840,6 +840,8 @@ impl LuaEngine { globals.set("bread", bread)?; self.install_require_loader()?; self.install_wait_helper()?; + self.install_log_helpers()?; + self.install_debounce()?; Ok(()) } @@ -1188,6 +1190,90 @@ impl LuaEngine { } } + fn install_log_helpers(&self) -> Result<()> { + // bread.log(msg) → tracing::info + // bread.warn(msg) → tracing::warn + // bread.error(msg) → tracing::error + // + // Each accepts any Lua value and coerces it to a string via tostring() + // so callers can do bread.log(some_table) without a crash. + self.lua.load(r#" + local _bread = bread + + local function stringify(v) + if type(v) == "string" then + return v + end + return tostring(v) + end + + function _bread.log(msg) + _bread.__log_info(stringify(msg)) + end + + function _bread.warn(msg) + _bread.__log_warn(stringify(msg)) + end + + function _bread.error(msg) + _bread.__log_error(stringify(msg)) + end + "#).exec()?; + + // Register the raw Rust-backed log functions that the Lua wrappers call. + let globals = self.lua.globals(); + let bread: mlua::Table = globals.get("bread")?; + + let info_fn = self.lua.create_function(|_, msg: String| { + tracing::info!(target: "bread.lua", "{}", msg); + Ok(()) + })?; + bread.set("__log_info", info_fn)?; + + let warn_fn = self.lua.create_function(|_, msg: String| { + tracing::warn!(target: "bread.lua", "{}", msg); + Ok(()) + })?; + bread.set("__log_warn", warn_fn)?; + + let error_fn = self.lua.create_function(|_, msg: String| { + tracing::error!(target: "bread.lua", "{}", msg); + Ok(()) + })?; + bread.set("__log_error", error_fn)?; + + Ok(()) + } + + fn install_debounce(&self) -> Result<()> { + // bread.debounce(delay_ms, fn) → wrapped_fn + // + // Returns a new function. When that function is called, it resets a + // timer. The original function is only called once the timer expires + // without being reset. Useful for rapid hardware events (e.g. monitor + // topology changes that fire multiple events in quick succession). + // + // Because the Lua runtime is single-threaded, we implement this in + // pure Lua using bread.cancel / bread.after. + self.lua.load(r#" + function bread.debounce(delay_ms, fn) + local timer_id = nil + return function(...) + local args = { ... } + if timer_id then + bread.cancel(timer_id) + timer_id = nil + end + timer_id = bread.after(delay_ms, function() + timer_id = nil + fn(table.unpack(args)) + end) + end + end + "#).exec()?; + Ok(()) + } + fn scan_module_decl(&self, path: &Path) -> Result { const MODULE_DECL_ABORT: &str = "__bread_module_decl__"; let lua = Lua::new(); From 55d103b3cfd2216040f2bc79501ad5b7ed55cdc7 Mon Sep 17 00:00:00 2001 From: Breadway Date: Mon, 11 May 2026 18:39:39 +0800 Subject: [PATCH 4/4] Enhance installation process, update service paths, and improve device classification --- .gitignore | 5 +- Cargo.lock | 1 + README.md | 84 ++++++++++++++++++++++++---- bread-cli/Cargo.toml | 4 ++ bread-cli/src/main.rs | 19 +------ breadd/src/adapters/udev.rs | 46 +++++++++++++--- breadd/src/core/normalizer.rs | 95 ++++++++++++++++++++++++++++---- breadd/src/ipc/mod.rs | 18 +++--- breadd/src/lua/mod.rs | 81 +++++++++++++++++++++++---- packaging/arch/PKGBUILD | 2 +- packaging/systemd/breadd.service | 2 +- scripts/install.sh | 36 ++++++++++++ 12 files changed, 323 insertions(+), 70 deletions(-) create mode 100755 scripts/install.sh diff --git a/.gitignore b/.gitignore index fdd4532..9472698 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ target/ Overview.md -DAEMON.md \ No newline at end of file +DAEMON.md +.claude +CLAUDE.md +.github/ \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index d987076..313315f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -289,6 +289,7 @@ dependencies = [ "anyhow", "bread-shared", "clap", + "libc", "notify", "serde", "serde_json", diff --git a/README.md b/README.md index 8da6518..73512df 100644 --- a/README.md +++ b/README.md @@ -72,16 +72,20 @@ Optional but preferred: ```bash git clone https://github.com/Breadway/bread.git cd bread -cargo build --release ``` -Binaries will be at `target/release/breadd` and `target/release/bread`. - -Install them: +Run the install script — it builds, installs to `/usr/bin`, sets up the systemd user service, and starts the daemon: ```bash -sudo install -Dm755 target/release/breadd /usr/local/bin/breadd -sudo install -Dm755 target/release/bread /usr/local/bin/bread +bash scripts/install.sh +``` + +Or do it step by step: + +```bash +cargo build --release +sudo install -Dm755 target/release/breadd /usr/bin/breadd +sudo install -Dm755 target/release/bread /usr/bin/bread ``` ### Arch Linux (PKGBUILD) @@ -130,6 +134,15 @@ enabled = true [events] dedup_window_ms = 100 + +[notifications] +default_timeout_ms = 5000 +default_urgency = "normal" +notify_send_path = "notify-send" + +[modules] +builtin = true # load built-in modules (monitors, devices, etc.) +disable = [] # list of built-in module names to disable ``` Your automation lives in `~/.config/bread/init.lua`: @@ -153,15 +166,18 @@ All commands communicate with the running daemon over a Unix socket at `$XDG_RUN ```bash bread reload # Hot-reload all Lua modules +bread reload --watch # Watch config dir and reload on changes bread state # Dump full runtime state as JSON bread events # Stream live normalized events bread events --filter bread.device.* # Stream filtered events +bread events --since 60 # Replay events from the last 60 seconds bread modules # List loaded modules and status bread profile-list # List defined profiles bread profile-activate # Activate a named profile bread emit --data '{}' # Manually fire an event (for testing) bread ping # Check daemon connectivity bread health # Daemon version, uptime, PID +bread doctor # Diagnose daemon and module health ``` --- @@ -201,16 +217,26 @@ Events follow the namespace convention `bread...`. ### Events ```lua --- Subscribe to an event -bread.on("bread.monitor.connected", function(event) +-- Subscribe to an event; returns a numeric ID +local id = bread.on("bread.monitor.connected", function(event) print(event.data.name) end) +-- Unsubscribe by ID +bread.off(id) + -- Subscribe once, then auto-unsubscribe bread.once("bread.system.startup", function(event) -- runs exactly once end) +-- Subscribe with a predicate filter +bread.filter("bread.device.connected", function(event) + return event.data.class == "keyboard" +end, function(event) + bread.exec("xset r rate 200 40") +end) + -- Emit a custom event (for cross-module communication) bread.emit("mymodule.something", { key = "value" }) ``` @@ -222,6 +248,12 @@ bread.emit("mymodule.something", { key = "value" }) local monitors = bread.state.get("monitors") local workspace = bread.state.get("active_workspace") local power = bread.state.get("power") +local devices = bread.state.get("devices") + +-- Watch a state key and fire on changes +bread.state.watch("active_workspace", function(new, old) + print("workspace changed from " .. tostring(old) .. " to " .. tostring(new)) +end) ``` ### Profiles @@ -231,12 +263,42 @@ bread.profile.activate("desk") bread.profile.activate("default") ``` -### Execution +### Execution and notifications ```lua -- Fire-and-forget: returns immediately, process runs in background bread.exec("kitty") -bread.exec("notify-send 'Dock connected'") + +-- Desktop notification +bread.notify("Dock connected", { urgency = "normal", timeout = 3000 }) +``` + +### Timers + +```lua +-- Run once after a delay (ms) +bread.after(500, function() + bread.exec("some-delayed-command") +end) + +-- Run on a repeating interval (ms); returns a timer ID +local id = bread.every(60000, function() + bread.log("tick") +end) +bread.cancel(id) + +-- Debounce a rapidly-firing handler +local fn = bread.debounce(200, function(event) + reconfigure_monitors() +end) +``` + +### Logging + +```lua +bread.log("Module loaded") +bread.warn("Unexpected state") +bread.error("Something failed") ``` --- @@ -255,7 +317,7 @@ Response: { "id": "1", "result": [ { "name": "HDMI-A-1", "connected": true } ] } ``` -Available methods: `ping`, `health`, `state.get`, `state.dump`, `modules.list`, `modules.reload`, `profile.list`, `profile.activate`, `events.subscribe`, `emit`. +Available methods: `ping`, `health`, `state.get`, `state.dump`, `modules.list`, `modules.reload`, `profile.list`, `profile.activate`, `events.subscribe`, `events.replay`, `emit`. `events.subscribe` upgrades the connection to a streaming mode — the daemon pushes events line by line until the client disconnects. diff --git a/bread-cli/Cargo.toml b/bread-cli/Cargo.toml index 5f18678..69a2c49 100644 --- a/bread-cli/Cargo.toml +++ b/bread-cli/Cargo.toml @@ -3,6 +3,10 @@ name = "bread-cli" version = "0.1.0" edition = "2021" +[[bin]] +name = "bread" +path = "src/main.rs" + [dependencies] bread-shared = { path = "../bread-shared" } serde.workspace = true diff --git a/bread-cli/src/main.rs b/bread-cli/src/main.rs index 9382494..0ca91df 100644 --- a/bread-cli/src/main.rs +++ b/bread-cli/src/main.rs @@ -5,7 +5,7 @@ use serde_json::{json, Value}; use std::env; use std::io; use std::path::{Path, PathBuf}; -use std::time::{Duration, UNIX_EPOCH}; +use std::time::Duration; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::UnixStream; use tokio::sync::mpsc; @@ -331,18 +331,6 @@ fn print_reload(value: &Value) { } } -async fn watch_reload(socket: &Path) -> Result<()> { - let config_dir = config_directory(); - println!("watching {} for changes...", config_dir.display()); - - let (tx, mut rx) = mpsc::unbounded_channel(); - let mut watcher: RecommendedWatcher = notify::recommended_watcher(move |res| { - let _ = tx.send(res); - })?; - watcher.watch(&config_dir, RecursiveMode::Recursive)?; - - use tokio::time::{sleep, Duration}; - async fn watch_reload(socket: &Path) -> Result<()> { let config_dir = config_directory(); println!("watching {} for changes...", config_dir.display()); @@ -360,7 +348,7 @@ async fn watch_reload(socket: &Path) -> Result<()> { // Debounce: drain any follow-up events that arrive within 150ms. // A single file save typically generates 2-3 fs events in rapid succession. - sleep(Duration::from_millis(150)).await; + tokio::time::sleep(Duration::from_millis(150)).await; while rx.try_recv().is_ok() {} let response = send_request(socket, "modules.reload", json!({})).await?; @@ -370,9 +358,6 @@ async fn watch_reload(socket: &Path) -> Result<()> { Ok(()) } - Ok(()) -} - async fn print_doctor(socket: &Path) -> Result<()> { let stream = match UnixStream::connect(socket).await { Ok(stream) => stream, diff --git a/breadd/src/adapters/udev.rs b/breadd/src/adapters/udev.rs index ffe4d15..ade7d2f 100644 --- a/breadd/src/adapters/udev.rs +++ b/breadd/src/adapters/udev.rs @@ -52,18 +52,23 @@ impl Adapter for UdevAdapter { async fn run(&self, tx: mpsc::Sender) -> Result<()> { debug!("udev adapter started"); - if let Ok(()) = run_udev_monitor(self.subsystems.clone(), tx.clone()).await { - return Ok(()); + match run_udev_monitor(self.subsystems.clone(), tx.clone()).await { + Ok(()) => return Ok(()), + Err(err) => { + tracing::warn!(error = %err, "udev netlink monitor unavailable, falling back to sysfs polling (add user to 'plugdev' group for real-time events)"); + } } - // Fallback for environments where monitor sockets are unavailable. - let mut known: HashMap = scan_devices(&self.subsystems)? + // Fallback: poll sysfs every 2 seconds for environments where the + // netlink socket is unavailable (missing plugdev membership, containers, etc). + let mut known: HashMap = scan_devices(&self.subsystems) + .unwrap_or_default() .into_iter() .map(|d| (d.id.clone(), d)) .collect(); loop { - let current = scan_devices(&self.subsystems)?; + let current = scan_devices(&self.subsystems).unwrap_or_default(); let current_map: HashMap = current .into_iter() .map(|d| (d.id.clone(), d)) @@ -71,13 +76,17 @@ impl Adapter for UdevAdapter { for (id, dev) in ¤t_map { if !known.contains_key(id) { - tx.send(raw_change_event("add", dev)).await?; + if tx.send(raw_change_event("add", dev)).await.is_err() { + return Ok(()); + } } } for (id, dev) in &known { if !current_map.contains_key(id) { - tx.send(raw_change_event("remove", dev)).await?; + if tx.send(raw_change_event("remove", dev)).await.is_err() { + return Ok(()); + } } } @@ -130,6 +139,15 @@ async fn run_udev_monitor(subsystems: Vec, tx: mpsc::Sender) - "id": id, "name": name, "subsystem": subsystem, + "id_input_keyboard": prop_bool(&event, "ID_INPUT_KEYBOARD"), + "id_input_mouse": prop_bool(&event, "ID_INPUT_MOUSE"), + "id_input_joystick": prop_bool(&event, "ID_INPUT_JOYSTICK"), + "id_input_touchpad": prop_bool(&event, "ID_INPUT_TOUCHPAD"), + "id_input_tablet": prop_bool(&event, "ID_INPUT_TABLET"), + "id_usb_class": prop_str(&event, "ID_USB_CLASS"), + "id_usb_interfaces": prop_str(&event, "ID_USB_INTERFACES"), + "id_vendor": prop_str(&event, "ID_VENDOR"), + "id_model": prop_str(&event, "ID_MODEL"), }), timestamp: now_unix_ms(), }; @@ -263,3 +281,17 @@ fn scan_devices(subsystems: &[String]) -> Result> { Ok(out) } + +fn prop_bool(event: &udev::Event, key: &str) -> bool { + event + .property_value(key) + .and_then(|v| v.to_str()) + .map(|v| v == "1") + .unwrap_or(false) +} + +fn prop_str(event: &udev::Event, key: &str) -> Option { + event + .property_value(key) + .map(|v| v.to_string_lossy().to_string()) +} diff --git a/breadd/src/core/normalizer.rs b/breadd/src/core/normalizer.rs index b424e1c..3eaef88 100644 --- a/breadd/src/core/normalizer.rs +++ b/breadd/src/core/normalizer.rs @@ -289,33 +289,104 @@ fn split_hyprland_fields(data: &str) -> Vec<&str> { } fn classify_device(payload: &Value) -> DeviceClass { - let name = payload - .get("name") - .and_then(Value::as_str) - .unwrap_or_default() - .to_lowercase(); let subsystem = payload .get("subsystem") .and_then(Value::as_str) .unwrap_or_default() .to_lowercase(); - if name.contains("dock") { - return DeviceClass::Dock; - } - if subsystem == "input" && name.contains("keyboard") { + // --- Property-based classification (reliable, hardware-agnostic) --- + + // udev sets ID_INPUT_KEYBOARD=1 for anything that presents as a keyboard HID device. + if payload.get("id_input_keyboard").and_then(Value::as_bool).unwrap_or(false) { return DeviceClass::Keyboard; } - if subsystem == "input" && name.contains("mouse") { + + // ID_INPUT_MOUSE=1 covers mice and trackballs. + if payload.get("id_input_mouse").and_then(Value::as_bool).unwrap_or(false) { return DeviceClass::Mouse; } + + // ID_INPUT_TABLET=1 covers drawing tablets (Wacom etc). + if payload.get("id_input_tablet").and_then(Value::as_bool).unwrap_or(false) { + return DeviceClass::Tablet; + } + + // USB class 0x09 = Hub. Docks expose a hub interface; they also typically + // expose video (0x0e), audio (0x01), and ethernet (CDC 0x02) interfaces. + // We check for hub + at least one of those secondary interfaces. + if let Some(ifaces) = payload.get("id_usb_interfaces").and_then(Value::as_str) { + let ifaces_lc = ifaces.to_lowercase(); + let has_hub = ifaces_lc.contains(":0900") || ifaces_lc.contains(":0902"); + let has_secondary = ifaces_lc.contains(":0e") // video + || ifaces_lc.contains(":0200") // CDC ethernet + || ifaces_lc.contains(":0100") // audio + || ifaces_lc.contains(":0801"); // mass storage + if has_hub && has_secondary { + return DeviceClass::Dock; + } + } + + // USB class 0x01 = Audio. + if let Some(cls) = payload.get("id_usb_class").and_then(Value::as_str) { + if cls == "01" || cls.to_lowercase() == "0x01" { + return DeviceClass::Audio; + } + // USB class 0x08 = Mass Storage. + if cls == "08" || cls.to_lowercase() == "0x08" { + return DeviceClass::Storage; + } + } + + // DRM subsystem = display connector. if subsystem == "drm" { return DeviceClass::Display; } - if subsystem == "sound" || name.contains("audio") { + + // Block devices = storage. + if subsystem == "block" { + return DeviceClass::Storage; + } + + // Sound subsystem = audio. + if subsystem == "sound" { return DeviceClass::Audio; } - if subsystem == "block" || name.contains("storage") { + + // --- Name-based fallback (catches user-registered patterns and obvious names) --- + // This runs last so the property-based rules above always win. + + let name = payload + .get("name") + .and_then(Value::as_str) + .or_else(|| payload.get("id_model").and_then(Value::as_str)) + .unwrap_or_default() + .to_lowercase(); + + let vendor = payload + .get("id_vendor") + .and_then(Value::as_str) + .unwrap_or_default() + .to_lowercase(); + + let combined = format!("{name} {vendor}"); + + if combined.contains("dock") || combined.contains("hub") || combined.contains("thunderbolt") { + return DeviceClass::Dock; + } + if combined.contains("keyboard") || combined.contains("kbd") { + return DeviceClass::Keyboard; + } + if combined.contains("mouse") || combined.contains("trackball") || combined.contains("trackpoint") { + return DeviceClass::Mouse; + } + if combined.contains("tablet") || combined.contains("wacom") || combined.contains("stylus") { + return DeviceClass::Tablet; + } + if combined.contains("audio") || combined.contains("headset") || combined.contains("speaker") || combined.contains("dac") { + return DeviceClass::Audio; + } + if combined.contains("storage") || combined.contains("drive") || combined.contains("flash") || combined.contains("disk") { return DeviceClass::Storage; } diff --git a/breadd/src/ipc/mod.rs b/breadd/src/ipc/mod.rs index f99ea74..fff3368 100644 --- a/breadd/src/ipc/mod.rs +++ b/breadd/src/ipc/mod.rs @@ -270,14 +270,16 @@ impl Server { "events.replay" => { let since_ms = req.params.get("since_ms").and_then(Value::as_u64).unwrap_or(0); let cutoff = now_unix_ms().saturating_sub(since_ms); - let mut replay = Vec::new(); - if let Ok(buf) = self.event_buffer.lock() { - for event in buf.iter() { - if event.timestamp >= cutoff { - replay.push(event); - } - } - } + let replay: Vec = self + .event_buffer + .lock() + .map(|buf| { + buf.iter() + .filter(|e| e.timestamp >= cutoff) + .cloned() + .collect() + }) + .unwrap_or_default(); Ok(serde_json::to_value(replay).unwrap_or_else(|_| json!([]))) } _ => Err("unknown method".to_string()), diff --git a/breadd/src/lua/mod.rs b/breadd/src/lua/mod.rs index 9acc814..228ac61 100644 --- a/breadd/src/lua/mod.rs +++ b/breadd/src/lua/mod.rs @@ -1499,7 +1499,15 @@ fn state_value_to_lua<'lua>( state_arc: &Arc>, path: &str, ) -> mlua::Result> { - let snapshot = state_arc.blocking_read(); + // The Lua thread runs a current_thread runtime. blocking_read and block_in_place + // both require the multi-thread runtime and panic here. try_read succeeds + // immediately in the common case; the write lock is held for microseconds. + let snapshot = loop { + if let Ok(g) = state_arc.try_read() { + break g; + } + std::hint::spin_loop(); + }; let mut value = serde_json::to_value(&*snapshot) .map_err(|e| LuaError::external(e.to_string()))?; if path.is_empty() { @@ -1518,13 +1526,23 @@ fn state_value_to_lua<'lua>( } fn module_store_get(state_arc: &Arc>, module: &str, key: &str) -> Option { - let guard = state_arc.blocking_read(); + let guard = loop { + if let Ok(g) = state_arc.try_read() { + break g; + } + std::hint::spin_loop(); + }; let entry = guard.modules.iter().find(|m| m.name == module)?; entry.store.get(key).cloned() } fn module_store_set(state_arc: &Arc>, module: &str, key: String, value: JsonValue) { - let mut guard = state_arc.blocking_write(); + let mut guard = loop { + if let Ok(g) = state_arc.try_write() { + break g; + } + std::hint::spin_loop(); + }; if let Some(entry) = guard.modules.iter_mut().find(|m| m.name == module) { entry.store.insert(key, value); return; @@ -1616,6 +1634,7 @@ const BUILTIN_DEVICES: &str = r#" local M = bread.module({ name = "bread.devices", version = "1.0.0" }) local rules = {} +local user_patterns = {} -- { { pattern = "...", class = "..." }, ... } local function matches_rule(rule, event) local class = rule.class @@ -1651,15 +1670,55 @@ local function run_rule(rule, event) end end +-- Reclassify an event's data.class based on user-registered name patterns. +-- Called before rule matching so that user-registered patterns take effect +-- even for devices that the daemon classified as Unknown. +local function apply_user_patterns(event) + if not event.data then return event end + local name = tostring(event.data.name or ""):lower() + local vendor = tostring(event.data.vendor or ""):lower() + local combined = name .. " " .. vendor + for _, p in ipairs(user_patterns) do + if combined:find(p.pattern, 1, true) then + -- Return a shallow copy with the class overridden so we don't + -- mutate the original event that other handlers may receive. + local patched = {} + for k, v in pairs(event) do patched[k] = v end + patched.data = {} + for k, v in pairs(event.data) do patched.data[k] = v end + patched.data.class = p.class + return patched + end + end + return event +end + function M.on(opts) table.insert(rules, opts) end +-- Register a user-defined device pattern so the daemon can correctly classify +-- hardware that the automatic classifier doesn't recognise. +-- +-- Usage: +-- local devices = require("bread.devices") +-- devices.register("CalDigit", "dock") +-- devices.register("Keychron", "keyboard") +-- devices.register("MX Master", "mouse") +-- +-- The pattern is matched case-insensitively against the device name and vendor +-- combined. The class must be one of: dock, keyboard, mouse, tablet, display, +-- storage, audio, unknown. +function M.register(pattern, class) + table.insert(user_patterns, { pattern = pattern:lower(), class = class }) +end + function M.on_load() bread.on("bread.device.**", function(event) + local patched = apply_user_patterns(event) for _, rule in ipairs(rules) do - if matches_rule(rule, event) then - run_rule(rule, event) + if matches_rule(rule, patched) then + run_rule(rule, patched) end end end) @@ -1811,13 +1870,11 @@ fn hyprland_request(request: &str) -> Result { use std::os::unix::net::UnixStream; let socket = hyprland_request_socket()?; - tokio::task::block_in_place(|| { - let mut stream = UnixStream::connect(&socket)?; - stream.write_all(request.as_bytes())?; - let mut buffer = String::new(); - stream.read_to_string(&mut buffer)?; - Ok(buffer) - }) + let mut stream = UnixStream::connect(&socket)?; + stream.write_all(request.as_bytes())?; + let mut buffer = String::new(); + stream.read_to_string(&mut buffer)?; + Ok(buffer) } fn list_lua_files(root: &Path) -> Result> { diff --git a/packaging/arch/PKGBUILD b/packaging/arch/PKGBUILD index 1a36db0..8ce69ee 100644 --- a/packaging/arch/PKGBUILD +++ b/packaging/arch/PKGBUILD @@ -20,6 +20,6 @@ build() { package() { cd "${srcdir}/${pkgname}-${pkgver}" install -Dm755 target/release/breadd "${pkgdir}/usr/bin/breadd" - install -Dm755 target/release/bread-cli "${pkgdir}/usr/bin/bread-cli" + install -Dm755 target/release/bread "${pkgdir}/usr/bin/bread" install -Dm644 packaging/systemd/breadd.service "${pkgdir}/usr/lib/systemd/user/breadd.service" } diff --git a/packaging/systemd/breadd.service b/packaging/systemd/breadd.service index 9f36697..95f0942 100644 --- a/packaging/systemd/breadd.service +++ b/packaging/systemd/breadd.service @@ -5,7 +5,7 @@ Wants=graphical-session.target [Service] Type=simple -ExecStart=%h/.cargo/bin/breadd +ExecStart=/usr/bin/breadd Restart=on-failure RestartSec=2 UMask=0077 diff --git a/scripts/install.sh b/scripts/install.sh new file mode 100755 index 0000000..961f792 --- /dev/null +++ b/scripts/install.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash +set -euo pipefail + +REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +INSTALL_PREFIX="${INSTALL_PREFIX:-/usr/bin}" +SERVICE_DIR="${HOME}/.config/systemd/user" + +# ── build ────────────────────────────────────────────────────────────────────── +echo "building bread (release)..." +cargo build --release --manifest-path "$REPO_ROOT/Cargo.toml" + +# ── install binaries ─────────────────────────────────────────────────────────── +echo "installing binaries to $INSTALL_PREFIX (requires sudo)..." +sudo install -Dm755 "$REPO_ROOT/target/release/breadd" "$INSTALL_PREFIX/breadd" +sudo install -Dm755 "$REPO_ROOT/target/release/bread" "$INSTALL_PREFIX/bread" +echo " installed $INSTALL_PREFIX/breadd" +echo " installed $INSTALL_PREFIX/bread" + +# ── systemd user service ─────────────────────────────────────────────────────── +echo "installing systemd user service..." +mkdir -p "$SERVICE_DIR" +install -Dm644 "$REPO_ROOT/packaging/systemd/breadd.service" "$SERVICE_DIR/breadd.service" +echo " installed $SERVICE_DIR/breadd.service" + +systemctl --user daemon-reload +systemctl --user enable --now breadd +echo " breadd enabled and started" + +# ── verify ───────────────────────────────────────────────────────────────────── +sleep 0.5 +if bread ping &>/dev/null; then + echo "" + bread doctor +else + echo "warning: daemon did not respond to ping — check: journalctl --user -u breadd -n 20" +fi