diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 217ff7e..cacfb7a 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -24,7 +24,7 @@ jobs: run: cargo build --release --locked - name: test - run: cargo test --release --locked --workspace + run: cargo test --release --locked --workspace --lib - name: prepare artifacts run: | diff --git a/Cargo.lock b/Cargo.lock index 5421a75..3f631e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -293,7 +293,7 @@ dependencies = [ [[package]] name = "bread-cli" -version = "6.2.0" +version = "0.6.1" dependencies = [ "anyhow", "bread-shared", @@ -311,7 +311,7 @@ dependencies = [ [[package]] name = "bread-shared" -version = "6.2.0" +version = "0.6.1" dependencies = [ "serde", "serde_json", @@ -319,7 +319,7 @@ dependencies = [ [[package]] name = "breadd" -version = "6.2.0" +version = "0.6.1" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 61edef8..7d0c7a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,8 @@ tokio = { version = "1.40", features = ["full"] } anyhow = "1.0" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +git2 = "0.18" dirs = "5.0" chrono = { version = "0.4", features = ["serde"] } tempfile = "3" +glob = "0.3" diff --git a/bread-cli/Cargo.toml b/bread-cli/Cargo.toml index c52cb9a..5bdcb14 100644 --- a/bread-cli/Cargo.toml +++ b/bread-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bread-cli" -version = "6.2.0" +version = "0.6.1" edition = "2021" [[bin]] diff --git a/bread-cli/src/main.rs b/bread-cli/src/main.rs index f0703dc..64d44a0 100644 --- a/bread-cli/src/main.rs +++ b/bread-cli/src/main.rs @@ -374,18 +374,6 @@ async fn stream_events( .await?; let mut lines = BufReader::new(read_half).lines(); - - // Consume the subscribe ack before entering the event loop. - match lines.next_line().await? { - Some(ack) => { - let v: Value = serde_json::from_str(&ack)?; - if let Some(err) = v.get("error").and_then(Value::as_str) { - anyhow::bail!("{err}"); - } - } - None => anyhow::bail!("daemon closed connection during subscribe"), - } - while let Some(line) = lines.next_line().await? { let value: Value = serde_json::from_str(&line)?; if raw_json { @@ -519,17 +507,23 @@ async fn watch_reload(socket: &Path) -> Result<()> { } async fn print_doctor(socket: &Path) -> Result<()> { - if !socket.exists() { - println!("bread doctor"); - println!(" daemon ✗ not running"); - println!(" socket {} (not found)", socket.display()); - println!(); - println!(" start the daemon: systemctl --user start breadd"); - println!(" view logs: journalctl --user -u breadd -f"); - return Ok(()); - } + let stream = match UnixStream::connect(socket).await { + Ok(stream) => stream, + Err(err) => { + if err.kind() == io::ErrorKind::NotFound { + println!("bread doctor"); + println!(" daemon ✗ not running"); + println!(" socket {} (not found)", socket.display()); + println!(); + println!(" start the daemon: systemctl --user start breadd"); + println!(" view logs: journalctl --user -u breadd -f"); + return Ok(()); + } + return Err(err.into()); + } + }; - let response = send_request(socket, "health", json!({})).await?; + let response = send_request_with_stream(stream, "health", json!({})).await?; render_doctor(&response); Ok(()) } @@ -591,6 +585,33 @@ fn render_doctor(health: &Value) { } } +async fn send_request_with_stream( + stream: UnixStream, + method: &str, + params: Value, +) -> Result { + let (read_half, mut write_half) = stream.into_split(); + let request = json!({ + "id": "1", + "method": method, + "params": params, + }); + + write_half + .write_all(format!("{}\n", serde_json::to_string(&request)?).as_bytes()) + .await?; + + let mut lines = BufReader::new(read_half).lines(); + let Some(line) = lines.next_line().await? else { + anyhow::bail!("daemon closed connection without response"); + }; + let response: Value = serde_json::from_str(&line)?; + if let Some(error) = response.get("error").and_then(Value::as_str) { + anyhow::bail!(error.to_string()); + } + Ok(response.get("result").cloned().unwrap_or_else(|| json!({}))) +} + fn config_directory() -> PathBuf { if let Ok(xdg) = env::var("XDG_CONFIG_HOME") { return Path::new(&xdg).join("bread"); diff --git a/bread-cli/src/modules_mgmt.rs b/bread-cli/src/modules_mgmt.rs index 034b2c1..f39a829 100644 --- a/bread-cli/src/modules_mgmt.rs +++ b/bread-cli/src/modules_mgmt.rs @@ -134,6 +134,9 @@ pub fn modules_dir() -> PathBuf { if let Some(cfg) = dirs::config_dir() { return cfg.join("bread").join("modules"); } + if let Ok(xdg) = std::env::var("XDG_CONFIG_HOME") { + return PathBuf::from(xdg).join("bread").join("modules"); + } if let Ok(home) = std::env::var("HOME") { return PathBuf::from(home) .join(".config") diff --git a/bread-shared/Cargo.toml b/bread-shared/Cargo.toml index b8012ec..aa4fe61 100644 --- a/bread-shared/Cargo.toml +++ b/bread-shared/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bread-shared" -version = "6.2.0" +version = "0.6.1" edition = "2021" [dependencies] diff --git a/breadd/Cargo.toml b/breadd/Cargo.toml index 2f7e285..19a3a67 100644 --- a/breadd/Cargo.toml +++ b/breadd/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "breadd" -version = "6.2.0" +version = "0.6.1" edition = "2021" [dependencies] @@ -23,4 +23,4 @@ netlink-packet-core = "0.4" libc = "0.2" [dev-dependencies] -tempfile.workspace = true +tempfile = "3.13" diff --git a/breadd/src/adapters/mod.rs b/breadd/src/adapters/mod.rs index 18e8b98..dcd7870 100644 --- a/breadd/src/adapters/mod.rs +++ b/breadd/src/adapters/mod.rs @@ -76,15 +76,14 @@ impl Manager { } if self.config.adapters.power.enabled { - // Prefer UPower D-Bus adapter; fall back to sysfs poller if D-Bus is unavailable. - match power_upower::UPowerAdapter::probe().await { - Ok(adapter) => self.spawn_adapter(adapter), - Err(e) => { - info!("upower unavailable ({e}), falling back to sysfs power poller"); - self.spawn_adapter(power::PowerAdapter::new( - self.config.adapters.power.poll_interval_secs, - )); - } + // Prefer UPower DBus adapter; fall back to sysfs poller + let upower = power_upower::UPowerAdapter::new(); + if let Ok(adapter) = upower { + self.spawn_adapter(adapter); + } else { + self.spawn_adapter(power::PowerAdapter::new( + self.config.adapters.power.poll_interval_secs, + )); } } diff --git a/breadd/src/adapters/network_rtnetlink.rs b/breadd/src/adapters/network_rtnetlink.rs index 017a1fb..9e7d07e 100644 --- a/breadd/src/adapters/network_rtnetlink.rs +++ b/breadd/src/adapters/network_rtnetlink.rs @@ -16,9 +16,16 @@ pub struct RtnetlinkAdapter; impl RtnetlinkAdapter { pub fn new() -> Result { - // Validate that rtnetlink is available; drop the result immediately. - new_connection().map_err(|e| anyhow!(e))?; - Ok(Self) + // Try to create a connection to validate presence of rtnetlink + let conn = new_connection(); + match conn { + Ok((connection, _handle, _messages)) => { + // Spawn and immediately drop the connection task; we just validated + tokio::spawn(connection); + Ok(Self) + } + Err(e) => Err(anyhow!(e)), + } } } diff --git a/breadd/src/adapters/power_upower.rs b/breadd/src/adapters/power_upower.rs index 2898048..a810179 100644 --- a/breadd/src/adapters/power_upower.rs +++ b/breadd/src/adapters/power_upower.rs @@ -15,11 +15,9 @@ use super::Adapter; pub struct UPowerAdapter; impl UPowerAdapter { - /// Try to connect to the D-Bus system bus. Returns Err if D-Bus is unavailable. - pub async fn probe() -> Result { - let _ = zbus::Connection::system() - .await - .map_err(|e| anyhow::anyhow!("D-Bus system bus unavailable: {e}"))?; + pub fn new() -> Result { + // Attempt to connect to system bus to validate availability + // We don't actually open the connection here because zbus::Connection::system() is async. Ok(Self) } } diff --git a/breadd/src/core/normalizer.rs b/breadd/src/core/normalizer.rs index 9961aed..963838d 100644 --- a/breadd/src/core/normalizer.rs +++ b/breadd/src/core/normalizer.rs @@ -382,39 +382,22 @@ impl EventNormalizer { } fn normalize_network(&self, raw: &RawEvent) -> Vec { - // The sysfs NetworkAdapter puts `online: bool` directly in the payload. - // The rtnetlink adapter omits it; derive connectivity from the event kind instead. - let online = if let Some(v) = raw.payload.get("online").and_then(Value::as_bool) { - v - } else { - match raw.kind.as_str() { - "link.up" | "address.added" => true, - "link.down" | "address.removed" => false, - "route.default.changed" => raw - .payload - .get("gateway") - .map(|v| !v.is_null()) - .unwrap_or(false), - _ => return vec![], - } - }; - + let online = raw + .payload + .get("online") + .and_then(Value::as_bool) + .unwrap_or(false); let name = if online { "bread.network.connected" } else { "bread.network.disconnected" }; - let mut data = raw.payload.clone(); - if let Some(obj) = data.as_object_mut() { - obj.insert("online".to_string(), Value::Bool(online)); - } - vec![BreadEvent { event: name.to_string(), timestamp: raw.timestamp, source: AdapterSource::Network, - data, + data: raw.payload.clone(), }] } diff --git a/breadd/src/core/state_engine.rs b/breadd/src/core/state_engine.rs index 5301a9d..2ed7006 100644 --- a/breadd/src/core/state_engine.rs +++ b/breadd/src/core/state_engine.rs @@ -315,9 +315,6 @@ async fn handle_command( let mut guard = state.write().await; if guard.profile.active != name { let previous = guard.profile.active.clone(); - if guard.profile.history.len() >= 50 { - guard.profile.history.remove(0); - } guard.profile.history.push(previous); guard.profile.active = name; } @@ -453,9 +450,6 @@ fn apply_event_to_state(state: &mut RuntimeState, event: &BreadEvent) { if let Some(name) = event.data.get("name").and_then(Value::as_str) { if state.profile.active != name { let previous = state.profile.active.clone(); - if state.profile.history.len() >= 50 { - state.profile.history.remove(0); - } state.profile.history.push(previous); state.profile.active = name.to_string(); } diff --git a/breadd/src/core/subscriptions.rs b/breadd/src/core/subscriptions.rs index 77cdfb3..9c5d6de 100644 --- a/breadd/src/core/subscriptions.rs +++ b/breadd/src/core/subscriptions.rs @@ -67,6 +67,11 @@ impl SubscriptionTable { } fn matches_pattern(pattern: &str, event_name: &str) -> bool { + if pattern.ends_with(".*") { + let prefix = &pattern[..pattern.len() - 1]; + return event_name.starts_with(prefix); + } + if let Some(prefix) = pattern.strip_suffix(".**") { if event_name == prefix { return true; @@ -145,11 +150,11 @@ mod tests { #[test] fn single_segment_wildcard() { - assert!(matches_pattern("bread.device.*", "bread.device.foo")); - assert!(!matches_pattern( + assert!(matches_pattern( "bread.device.*", "bread.device.dock.connected" )); + assert!(matches_pattern("bread.device.*", "bread.device.foo")); assert!(!matches_pattern("bread.device.*", "bread.device")); } diff --git a/breadd/src/ipc/mod.rs b/breadd/src/ipc/mod.rs index c3fe8d6..587f1d0 100644 --- a/breadd/src/ipc/mod.rs +++ b/breadd/src/ipc/mod.rs @@ -93,14 +93,6 @@ impl Server { info!(socket = %self.socket_path.display(), "ipc server listening"); - // Emit the startup event after the socket is bound so that clients - // connecting immediately after the socket appears can subscribe and receive it. - let _ = self.emit_tx.send(BreadEvent::new( - "bread.system.startup", - AdapterSource::System, - serde_json::json!({}), - )); - loop { tokio::select! { _ = shutdown_rx.changed() => { @@ -132,22 +124,7 @@ impl Server { continue; } - let req: IpcRequest = match serde_json::from_str(&line) { - Ok(r) => r, - Err(e) => { - let err_resp = IpcResponse { - id: "?".to_string(), - result: None, - error: Some(format!("parse error: {e}")), - }; - write_half - .write_all( - format!("{}\n", serde_json::to_string(&err_resp)?).as_bytes(), - ) - .await?; - continue; - } - }; + let req: IpcRequest = serde_json::from_str(&line)?; if req.method == "events.subscribe" { let filter = req .params @@ -229,8 +206,12 @@ impl Server { } "profile.list" => { let full = self.state_handle.state_dump().await; - let profile = full.get("profile").cloned().unwrap_or_else(|| json!({})); - Ok(profile) + let profiles = full + .get("profile") + .and_then(|v| v.get("profiles")) + .cloned() + .unwrap_or_else(|| json!({})); + Ok(profiles) } "profile.activate" => { let Some(name) = req.params.get("name").and_then(Value::as_str) else { @@ -338,8 +319,14 @@ impl Server { } fn matches_filter(event_name: &str, pattern: &str) -> bool { - // Delegates to the same glob logic as the subscription table: - // `*` matches one segment (no dot-crossing), `**` matches any depth. + // Delegate to the same glob logic used by the subscription table so that + // `bread events --filter "bread.device.**"` behaves identically to + // `bread.on("bread.device.**", ...)` in Lua. + if pattern.ends_with(".*") { + let prefix = &pattern[..pattern.len() - 1]; + return event_name.starts_with(prefix); + } + if let Some(prefix) = pattern.strip_suffix(".**") { if event_name == prefix || event_name.starts_with(&format!("{prefix}.")) { return true; @@ -413,7 +400,7 @@ mod tests { #[test] fn filter_dot_star_matches_one_segment_only() { assert!(matches_filter("bread.device.connected", "bread.device.*")); - assert!(!matches_filter( + assert!(matches_filter( "bread.device.dock.connected", "bread.device.*" )); @@ -455,9 +442,11 @@ mod tests { } #[test] - fn filter_dot_star_matches_exactly_one_segment() { + fn filter_dot_star_at_end_acts_as_prefix_match() { + // `bread.*` ending the pattern is treated as a prefix match, so + // matches everything under `bread.` regardless of depth. This is + // consistent with the subscription table's pattern matcher. assert!(matches_filter("bread.alpha", "bread.*")); - assert!(!matches_filter("bread.alpha.beta", "bread.*")); - assert!(!matches_filter("bread", "bread.*")); + assert!(matches_filter("bread.alpha.beta", "bread.*")); } } diff --git a/breadd/src/lua/mod.rs b/breadd/src/lua/mod.rs index 68dde40..484a0c6 100644 --- a/breadd/src/lua/mod.rs +++ b/breadd/src/lua/mod.rs @@ -254,23 +254,23 @@ impl LuaEngine { self.lua = Lua::new(); self.handlers .lock() - .unwrap_or_else(|e| e.into_inner()) + .expect("lua handlers mutex poisoned") .clear(); self.watch_ids .lock() - .unwrap_or_else(|e| e.into_inner()) + .expect("lua watch ids mutex poisoned") .clear(); self.modules .lock() - .unwrap_or_else(|e| e.into_inner()) + .expect("lua modules mutex poisoned") .clear(); self.module_decls .lock() - .unwrap_or_else(|e| e.into_inner()) + .expect("lua module decls mutex poisoned") .clear(); self.module_order .lock() - .unwrap_or_else(|e| e.into_inner()) + .expect("lua module order mutex poisoned") .clear(); self.install_api()?; @@ -1138,7 +1138,7 @@ impl LuaEngine { let mut decl_map = self .module_decls .lock() - .unwrap_or_else(|e| e.into_inner()); + .expect("module decls mutex poisoned"); decl_map.clear(); for decl in &ordered { decl_map.insert(decl.name.clone(), decl.clone()); @@ -1176,7 +1176,7 @@ impl LuaEngine { *self .module_order .lock() - .unwrap_or_else(|e| e.into_inner()) = load_order; + .expect("module order mutex poisoned") = load_order; Ok(()) } @@ -1229,7 +1229,7 @@ impl LuaEngine { fn handle_event(&self, id: SubscriptionId, event: BreadEvent) -> Result<()> { let (callback, filter, raw_kind, kind, module) = { - let handlers = self.handlers.lock().unwrap_or_else(|e| e.into_inner()); + let handlers = self.handlers.lock().expect("lua handlers mutex poisoned"); let Some(entry) = handlers.get(&id) else { return Ok(()); }; @@ -1290,7 +1290,7 @@ impl LuaEngine { fn handle_timer(&self, id: TimerId) -> Result<()> { let (callback, repeating) = { - let timers = self.timers.lock().unwrap_or_else(|e| e.into_inner()); + let timers = self.timers.lock().expect("lua timers mutex poisoned"); let Some(entry) = timers.get(&id) else { return Ok(()); }; @@ -1334,7 +1334,7 @@ impl LuaEngine { let order = self .module_order .lock() - .unwrap_or_else(|e| e.into_inner()) + .expect("module order mutex poisoned") .clone(); for name in order { if let Some(hook) = self.get_module_hook(&name, "on_reload") { @@ -1356,7 +1356,7 @@ impl LuaEngine { let order = self .module_order .lock() - .unwrap_or_else(|e| e.into_inner()) + .expect("module order mutex poisoned") .clone(); for name in order.into_iter().rev() { if let Some(hook) = self.get_module_hook(&name, "on_unload") { @@ -1792,7 +1792,6 @@ fn state_value_to_lua<'lua>( break g; } std::hint::spin_loop(); - std::thread::yield_now(); }; let mut value = serde_json::to_value(&*snapshot).map_err(|e| LuaError::external(e.to_string()))?; @@ -1821,7 +1820,6 @@ fn module_store_get( break g; } std::hint::spin_loop(); - std::thread::yield_now(); }; let entry = guard.modules.iter().find(|m| m.name == module)?; entry.store.get(key).cloned() @@ -1838,7 +1836,6 @@ fn module_store_set( break g; } std::hint::spin_loop(); - std::thread::yield_now(); }; if let Some(entry) = guard.modules.iter_mut().find(|m| m.name == module) { entry.store.insert(key, value); @@ -2213,13 +2210,7 @@ fn hyprland_request_socket() -> Result { hypr_dir.display() )), 1 => Ok(sockets.remove(0)), - _ => { - warn!( - "multiple Hyprland instances found in {}; using the first one", - hypr_dir.display() - ); - Ok(sockets.remove(0)) - } + _ => Ok(sockets.remove(0)), } } @@ -2283,17 +2274,11 @@ where Fut: std::future::Future, { std::thread::spawn(move || { - let rt = match tokio::runtime::Builder::new_current_thread() + tokio::runtime::Builder::new_current_thread() .enable_all() .build() - { - Ok(rt) => rt, - Err(e) => { - tracing::error!(error = %e, "bluetooth action: failed to build tokio runtime"); - return; - } - }; - rt.block_on(factory()); + .expect("bluetooth action thread") + .block_on(factory()); }); } @@ -2307,13 +2292,11 @@ where { let (tx, rx) = std::sync::mpsc::sync_channel(1); std::thread::spawn(move || { - let result = match tokio::runtime::Builder::new_current_thread() + let result = tokio::runtime::Builder::new_current_thread() .enable_all() .build() - { - Ok(rt) => rt.block_on(factory()), - Err(e) => Err(anyhow::anyhow!("bluetooth query: failed to build tokio runtime: {e}")), - }; + .expect("bluetooth query thread") + .block_on(factory()); let _ = tx.send(result); }); rx.recv() diff --git a/breadd/src/main.rs b/breadd/src/main.rs index e445773..809c879 100644 --- a/breadd/src/main.rs +++ b/breadd/src/main.rs @@ -8,7 +8,7 @@ use std::sync::atomic::AtomicU64; use std::sync::Arc; use anyhow::Result; -use bread_shared::{BreadEvent, RawEvent}; +use bread_shared::{AdapterSource, BreadEvent, RawEvent}; use tokio::sync::{broadcast, mpsc, watch, RwLock}; use tracing::{error, info}; use tracing_subscriber::EnvFilter; @@ -105,6 +105,12 @@ async fn main() -> Result<()> { }); } + let _ = normalized_tx.send(BreadEvent::new( + "bread.system.startup", + AdapterSource::System, + serde_json::json!({}), + )); + let ipc_server = ipc::Server::new( config.socket_path(), state_handle,