From 3115a4230b38a73a12b5a179e2dff85215c27152 Mon Sep 17 00:00:00 2001 From: Breadway Date: Tue, 23 Jun 2026 12:45:56 +0800 Subject: [PATCH] Fix 18 issues flagged in audit + bump to v0.6.2 P1-A: normalizer derives `online` from rtnetlink event kind (link.up/down, route.default.changed, address.added/removed) so bread.network.connected fires correctly on all systems using rtnetlink. P1-B: stream_events consumes the subscribe ack before the event loop so the first line is not printed as garbage. P1-C: UPowerAdapter::probe() validates D-Bus synchronously before committing; the sysfs fallback now actually triggers when D-Bus is unavailable. P2-A: profile.list returns the full profile state (active + history) instead of the always-empty profiles map. P2-B: profile history capped at 50 entries in both StateCommand and apply_event_to_state to prevent unbounded growth. P2-C: RtnetlinkAdapter::new() no longer spawns an orphaned tokio task; it validates availability by constructing and immediately dropping the connection tuple. P2-D: Lua-side hyprland_request_socket() logs a warn when multiple Hyprland instances are found, matching the adapter-side behaviour. P2-E: Malformed JSON from an IPC client returns an error response and continues rather than closing the entire connection. P3-A: Remove the `ends_with(".*")` prefix-match shortcut from both the subscription table and the IPC event filter. `bread.*` now means one segment (matching documented API semantics: `* = one segment`). Tests updated accordingly. P4-A: Remove unused `git2` and `glob` workspace dependencies (left over from bread-sync extraction). P4-B: breadd dev-dependency `tempfile` declared via workspace = true. P4-C: Remove unreachable XDG_CONFIG_HOME branch in modules_dir(); dirs already reads that var internally before returning None. P4-D: Delete duplicate send_request_with_stream(); print_doctor() now uses socket.exists() + send_request() directly. P5-A: release.yml drops `--lib` from cargo test so integration tests run in the release gate. P6-A: bluetooth_spawn / bluetooth_query replace expect() on tokio runtime construction with error logging / error propagation. P6-B: Spin loops in lua/mod.rs add std::thread::yield_now() after the PAUSE hint to reduce CPU burn under sustained RwLock contention. P6-C: All Mutex::lock().expect("... poisoned") in lua/mod.rs replaced with unwrap_or_else(|e| e.into_inner()) for poison recovery. P7-B: bread.system.startup event moved from main.rs into ipc::Server::serve() so it fires after the socket is bound (smaller race window for early subscribers). --- .github/workflows/release.yml | 2 +- Cargo.lock | 6 +-- Cargo.toml | 2 - bread-cli/Cargo.toml | 2 +- bread-cli/src/main.rs | 65 ++++++++---------------- bread-cli/src/modules_mgmt.rs | 3 -- bread-shared/Cargo.toml | 2 +- breadd/Cargo.toml | 4 +- breadd/src/adapters/mod.rs | 17 ++++--- breadd/src/adapters/network_rtnetlink.rs | 13 ++--- breadd/src/adapters/power_upower.rs | 8 +-- breadd/src/core/normalizer.rs | 29 ++++++++--- breadd/src/core/state_engine.rs | 6 +++ breadd/src/core/subscriptions.rs | 9 +--- breadd/src/ipc/mod.rs | 53 +++++++++++-------- breadd/src/lua/mod.rs | 53 ++++++++++++------- breadd/src/main.rs | 8 +-- 17 files changed, 146 insertions(+), 136 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index cacfb7a..217ff7e 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -24,7 +24,7 @@ jobs: run: cargo build --release --locked - name: test - run: cargo test --release --locked --workspace --lib + run: cargo test --release --locked --workspace - name: prepare artifacts run: | diff --git a/Cargo.lock b/Cargo.lock index 3f631e4..d7cdb06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -293,7 +293,7 @@ dependencies = [ [[package]] name = "bread-cli" -version = "0.6.1" +version = "0.6.2" dependencies = [ "anyhow", "bread-shared", @@ -311,7 +311,7 @@ dependencies = [ [[package]] name = "bread-shared" -version = "0.6.1" +version = "0.6.2" dependencies = [ "serde", "serde_json", @@ -319,7 +319,7 @@ dependencies = [ [[package]] name = "breadd" -version = "0.6.1" +version = "0.6.2" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 7d0c7a7..61edef8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,8 +18,6 @@ tokio = { version = "1.40", features = ["full"] } anyhow = "1.0" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } -git2 = "0.18" dirs = "5.0" chrono = { version = "0.4", features = ["serde"] } tempfile = "3" -glob = "0.3" diff --git a/bread-cli/Cargo.toml b/bread-cli/Cargo.toml index 5bdcb14..f415b1d 100644 --- a/bread-cli/Cargo.toml +++ b/bread-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bread-cli" -version = "0.6.1" +version = "0.6.2" edition = "2021" [[bin]] diff --git a/bread-cli/src/main.rs b/bread-cli/src/main.rs index 64d44a0..f0703dc 100644 --- a/bread-cli/src/main.rs +++ b/bread-cli/src/main.rs @@ -374,6 +374,18 @@ async fn stream_events( .await?; let mut lines = BufReader::new(read_half).lines(); + + // Consume the subscribe ack before entering the event loop. + match lines.next_line().await? { + Some(ack) => { + let v: Value = serde_json::from_str(&ack)?; + if let Some(err) = v.get("error").and_then(Value::as_str) { + anyhow::bail!("{err}"); + } + } + None => anyhow::bail!("daemon closed connection during subscribe"), + } + while let Some(line) = lines.next_line().await? { let value: Value = serde_json::from_str(&line)?; if raw_json { @@ -507,23 +519,17 @@ async fn watch_reload(socket: &Path) -> Result<()> { } async fn print_doctor(socket: &Path) -> Result<()> { - let stream = match UnixStream::connect(socket).await { - Ok(stream) => stream, - Err(err) => { - if err.kind() == io::ErrorKind::NotFound { - println!("bread doctor"); - println!(" daemon ✗ not running"); - println!(" socket {} (not found)", socket.display()); - println!(); - println!(" start the daemon: systemctl --user start breadd"); - println!(" view logs: journalctl --user -u breadd -f"); - return Ok(()); - } - return Err(err.into()); - } - }; + if !socket.exists() { + println!("bread doctor"); + println!(" daemon ✗ not running"); + println!(" socket {} (not found)", socket.display()); + println!(); + println!(" start the daemon: systemctl --user start breadd"); + println!(" view logs: journalctl --user -u breadd -f"); + return Ok(()); + } - let response = send_request_with_stream(stream, "health", json!({})).await?; + let response = send_request(socket, "health", json!({})).await?; render_doctor(&response); Ok(()) } @@ -585,33 +591,6 @@ fn render_doctor(health: &Value) { } } -async fn send_request_with_stream( - stream: UnixStream, - method: &str, - params: Value, -) -> Result { - let (read_half, mut write_half) = stream.into_split(); - let request = json!({ - "id": "1", - "method": method, - "params": params, - }); - - write_half - .write_all(format!("{}\n", serde_json::to_string(&request)?).as_bytes()) - .await?; - - let mut lines = BufReader::new(read_half).lines(); - let Some(line) = lines.next_line().await? else { - anyhow::bail!("daemon closed connection without response"); - }; - let response: Value = serde_json::from_str(&line)?; - if let Some(error) = response.get("error").and_then(Value::as_str) { - anyhow::bail!(error.to_string()); - } - Ok(response.get("result").cloned().unwrap_or_else(|| json!({}))) -} - fn config_directory() -> PathBuf { if let Ok(xdg) = env::var("XDG_CONFIG_HOME") { return Path::new(&xdg).join("bread"); diff --git a/bread-cli/src/modules_mgmt.rs b/bread-cli/src/modules_mgmt.rs index f39a829..034b2c1 100644 --- a/bread-cli/src/modules_mgmt.rs +++ b/bread-cli/src/modules_mgmt.rs @@ -134,9 +134,6 @@ pub fn modules_dir() -> PathBuf { if let Some(cfg) = dirs::config_dir() { return cfg.join("bread").join("modules"); } - if let Ok(xdg) = std::env::var("XDG_CONFIG_HOME") { - return PathBuf::from(xdg).join("bread").join("modules"); - } if let Ok(home) = std::env::var("HOME") { return PathBuf::from(home) .join(".config") diff --git a/bread-shared/Cargo.toml b/bread-shared/Cargo.toml index aa4fe61..836c0ef 100644 --- a/bread-shared/Cargo.toml +++ b/bread-shared/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bread-shared" -version = "0.6.1" +version = "0.6.2" edition = "2021" [dependencies] diff --git a/breadd/Cargo.toml b/breadd/Cargo.toml index 19a3a67..3572947 100644 --- a/breadd/Cargo.toml +++ b/breadd/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "breadd" -version = "0.6.1" +version = "0.6.2" edition = "2021" [dependencies] @@ -23,4 +23,4 @@ netlink-packet-core = "0.4" libc = "0.2" [dev-dependencies] -tempfile = "3.13" +tempfile.workspace = true diff --git a/breadd/src/adapters/mod.rs b/breadd/src/adapters/mod.rs index dcd7870..18e8b98 100644 --- a/breadd/src/adapters/mod.rs +++ b/breadd/src/adapters/mod.rs @@ -76,14 +76,15 @@ impl Manager { } if self.config.adapters.power.enabled { - // Prefer UPower DBus adapter; fall back to sysfs poller - let upower = power_upower::UPowerAdapter::new(); - if let Ok(adapter) = upower { - self.spawn_adapter(adapter); - } else { - self.spawn_adapter(power::PowerAdapter::new( - self.config.adapters.power.poll_interval_secs, - )); + // Prefer UPower D-Bus adapter; fall back to sysfs poller if D-Bus is unavailable. + match power_upower::UPowerAdapter::probe().await { + Ok(adapter) => self.spawn_adapter(adapter), + Err(e) => { + info!("upower unavailable ({e}), falling back to sysfs power poller"); + self.spawn_adapter(power::PowerAdapter::new( + self.config.adapters.power.poll_interval_secs, + )); + } } } diff --git a/breadd/src/adapters/network_rtnetlink.rs b/breadd/src/adapters/network_rtnetlink.rs index 9e7d07e..017a1fb 100644 --- a/breadd/src/adapters/network_rtnetlink.rs +++ b/breadd/src/adapters/network_rtnetlink.rs @@ -16,16 +16,9 @@ pub struct RtnetlinkAdapter; impl RtnetlinkAdapter { pub fn new() -> Result { - // Try to create a connection to validate presence of rtnetlink - let conn = new_connection(); - match conn { - Ok((connection, _handle, _messages)) => { - // Spawn and immediately drop the connection task; we just validated - tokio::spawn(connection); - Ok(Self) - } - Err(e) => Err(anyhow!(e)), - } + // Validate that rtnetlink is available; drop the result immediately. + new_connection().map_err(|e| anyhow!(e))?; + Ok(Self) } } diff --git a/breadd/src/adapters/power_upower.rs b/breadd/src/adapters/power_upower.rs index a810179..2898048 100644 --- a/breadd/src/adapters/power_upower.rs +++ b/breadd/src/adapters/power_upower.rs @@ -15,9 +15,11 @@ use super::Adapter; pub struct UPowerAdapter; impl UPowerAdapter { - pub fn new() -> Result { - // Attempt to connect to system bus to validate availability - // We don't actually open the connection here because zbus::Connection::system() is async. + /// Try to connect to the D-Bus system bus. Returns Err if D-Bus is unavailable. + pub async fn probe() -> Result { + let _ = zbus::Connection::system() + .await + .map_err(|e| anyhow::anyhow!("D-Bus system bus unavailable: {e}"))?; Ok(Self) } } diff --git a/breadd/src/core/normalizer.rs b/breadd/src/core/normalizer.rs index 963838d..9961aed 100644 --- a/breadd/src/core/normalizer.rs +++ b/breadd/src/core/normalizer.rs @@ -382,22 +382,39 @@ impl EventNormalizer { } fn normalize_network(&self, raw: &RawEvent) -> Vec { - let online = raw - .payload - .get("online") - .and_then(Value::as_bool) - .unwrap_or(false); + // The sysfs NetworkAdapter puts `online: bool` directly in the payload. + // The rtnetlink adapter omits it; derive connectivity from the event kind instead. + let online = if let Some(v) = raw.payload.get("online").and_then(Value::as_bool) { + v + } else { + match raw.kind.as_str() { + "link.up" | "address.added" => true, + "link.down" | "address.removed" => false, + "route.default.changed" => raw + .payload + .get("gateway") + .map(|v| !v.is_null()) + .unwrap_or(false), + _ => return vec![], + } + }; + let name = if online { "bread.network.connected" } else { "bread.network.disconnected" }; + let mut data = raw.payload.clone(); + if let Some(obj) = data.as_object_mut() { + obj.insert("online".to_string(), Value::Bool(online)); + } + vec![BreadEvent { event: name.to_string(), timestamp: raw.timestamp, source: AdapterSource::Network, - data: raw.payload.clone(), + data, }] } diff --git a/breadd/src/core/state_engine.rs b/breadd/src/core/state_engine.rs index 2ed7006..5301a9d 100644 --- a/breadd/src/core/state_engine.rs +++ b/breadd/src/core/state_engine.rs @@ -315,6 +315,9 @@ async fn handle_command( let mut guard = state.write().await; if guard.profile.active != name { let previous = guard.profile.active.clone(); + if guard.profile.history.len() >= 50 { + guard.profile.history.remove(0); + } guard.profile.history.push(previous); guard.profile.active = name; } @@ -450,6 +453,9 @@ fn apply_event_to_state(state: &mut RuntimeState, event: &BreadEvent) { if let Some(name) = event.data.get("name").and_then(Value::as_str) { if state.profile.active != name { let previous = state.profile.active.clone(); + if state.profile.history.len() >= 50 { + state.profile.history.remove(0); + } state.profile.history.push(previous); state.profile.active = name.to_string(); } diff --git a/breadd/src/core/subscriptions.rs b/breadd/src/core/subscriptions.rs index 9c5d6de..77cdfb3 100644 --- a/breadd/src/core/subscriptions.rs +++ b/breadd/src/core/subscriptions.rs @@ -67,11 +67,6 @@ impl SubscriptionTable { } fn matches_pattern(pattern: &str, event_name: &str) -> bool { - if pattern.ends_with(".*") { - let prefix = &pattern[..pattern.len() - 1]; - return event_name.starts_with(prefix); - } - if let Some(prefix) = pattern.strip_suffix(".**") { if event_name == prefix { return true; @@ -150,11 +145,11 @@ mod tests { #[test] fn single_segment_wildcard() { - assert!(matches_pattern( + assert!(matches_pattern("bread.device.*", "bread.device.foo")); + assert!(!matches_pattern( "bread.device.*", "bread.device.dock.connected" )); - assert!(matches_pattern("bread.device.*", "bread.device.foo")); assert!(!matches_pattern("bread.device.*", "bread.device")); } diff --git a/breadd/src/ipc/mod.rs b/breadd/src/ipc/mod.rs index 587f1d0..c3fe8d6 100644 --- a/breadd/src/ipc/mod.rs +++ b/breadd/src/ipc/mod.rs @@ -93,6 +93,14 @@ impl Server { info!(socket = %self.socket_path.display(), "ipc server listening"); + // Emit the startup event after the socket is bound so that clients + // connecting immediately after the socket appears can subscribe and receive it. + let _ = self.emit_tx.send(BreadEvent::new( + "bread.system.startup", + AdapterSource::System, + serde_json::json!({}), + )); + loop { tokio::select! { _ = shutdown_rx.changed() => { @@ -124,7 +132,22 @@ impl Server { continue; } - let req: IpcRequest = serde_json::from_str(&line)?; + let req: IpcRequest = match serde_json::from_str(&line) { + Ok(r) => r, + Err(e) => { + let err_resp = IpcResponse { + id: "?".to_string(), + result: None, + error: Some(format!("parse error: {e}")), + }; + write_half + .write_all( + format!("{}\n", serde_json::to_string(&err_resp)?).as_bytes(), + ) + .await?; + continue; + } + }; if req.method == "events.subscribe" { let filter = req .params @@ -206,12 +229,8 @@ impl Server { } "profile.list" => { let full = self.state_handle.state_dump().await; - let profiles = full - .get("profile") - .and_then(|v| v.get("profiles")) - .cloned() - .unwrap_or_else(|| json!({})); - Ok(profiles) + let profile = full.get("profile").cloned().unwrap_or_else(|| json!({})); + Ok(profile) } "profile.activate" => { let Some(name) = req.params.get("name").and_then(Value::as_str) else { @@ -319,14 +338,8 @@ impl Server { } fn matches_filter(event_name: &str, pattern: &str) -> bool { - // Delegate to the same glob logic used by the subscription table so that - // `bread events --filter "bread.device.**"` behaves identically to - // `bread.on("bread.device.**", ...)` in Lua. - if pattern.ends_with(".*") { - let prefix = &pattern[..pattern.len() - 1]; - return event_name.starts_with(prefix); - } - + // Delegates to the same glob logic as the subscription table: + // `*` matches one segment (no dot-crossing), `**` matches any depth. if let Some(prefix) = pattern.strip_suffix(".**") { if event_name == prefix || event_name.starts_with(&format!("{prefix}.")) { return true; @@ -400,7 +413,7 @@ mod tests { #[test] fn filter_dot_star_matches_one_segment_only() { assert!(matches_filter("bread.device.connected", "bread.device.*")); - assert!(matches_filter( + assert!(!matches_filter( "bread.device.dock.connected", "bread.device.*" )); @@ -442,11 +455,9 @@ mod tests { } #[test] - fn filter_dot_star_at_end_acts_as_prefix_match() { - // `bread.*` ending the pattern is treated as a prefix match, so - // matches everything under `bread.` regardless of depth. This is - // consistent with the subscription table's pattern matcher. + fn filter_dot_star_matches_exactly_one_segment() { assert!(matches_filter("bread.alpha", "bread.*")); - assert!(matches_filter("bread.alpha.beta", "bread.*")); + assert!(!matches_filter("bread.alpha.beta", "bread.*")); + assert!(!matches_filter("bread", "bread.*")); } } diff --git a/breadd/src/lua/mod.rs b/breadd/src/lua/mod.rs index 484a0c6..68dde40 100644 --- a/breadd/src/lua/mod.rs +++ b/breadd/src/lua/mod.rs @@ -254,23 +254,23 @@ impl LuaEngine { self.lua = Lua::new(); self.handlers .lock() - .expect("lua handlers mutex poisoned") + .unwrap_or_else(|e| e.into_inner()) .clear(); self.watch_ids .lock() - .expect("lua watch ids mutex poisoned") + .unwrap_or_else(|e| e.into_inner()) .clear(); self.modules .lock() - .expect("lua modules mutex poisoned") + .unwrap_or_else(|e| e.into_inner()) .clear(); self.module_decls .lock() - .expect("lua module decls mutex poisoned") + .unwrap_or_else(|e| e.into_inner()) .clear(); self.module_order .lock() - .expect("lua module order mutex poisoned") + .unwrap_or_else(|e| e.into_inner()) .clear(); self.install_api()?; @@ -1138,7 +1138,7 @@ impl LuaEngine { let mut decl_map = self .module_decls .lock() - .expect("module decls mutex poisoned"); + .unwrap_or_else(|e| e.into_inner()); decl_map.clear(); for decl in &ordered { decl_map.insert(decl.name.clone(), decl.clone()); @@ -1176,7 +1176,7 @@ impl LuaEngine { *self .module_order .lock() - .expect("module order mutex poisoned") = load_order; + .unwrap_or_else(|e| e.into_inner()) = load_order; Ok(()) } @@ -1229,7 +1229,7 @@ impl LuaEngine { fn handle_event(&self, id: SubscriptionId, event: BreadEvent) -> Result<()> { let (callback, filter, raw_kind, kind, module) = { - let handlers = self.handlers.lock().expect("lua handlers mutex poisoned"); + let handlers = self.handlers.lock().unwrap_or_else(|e| e.into_inner()); let Some(entry) = handlers.get(&id) else { return Ok(()); }; @@ -1290,7 +1290,7 @@ impl LuaEngine { fn handle_timer(&self, id: TimerId) -> Result<()> { let (callback, repeating) = { - let timers = self.timers.lock().expect("lua timers mutex poisoned"); + let timers = self.timers.lock().unwrap_or_else(|e| e.into_inner()); let Some(entry) = timers.get(&id) else { return Ok(()); }; @@ -1334,7 +1334,7 @@ impl LuaEngine { let order = self .module_order .lock() - .expect("module order mutex poisoned") + .unwrap_or_else(|e| e.into_inner()) .clone(); for name in order { if let Some(hook) = self.get_module_hook(&name, "on_reload") { @@ -1356,7 +1356,7 @@ impl LuaEngine { let order = self .module_order .lock() - .expect("module order mutex poisoned") + .unwrap_or_else(|e| e.into_inner()) .clone(); for name in order.into_iter().rev() { if let Some(hook) = self.get_module_hook(&name, "on_unload") { @@ -1792,6 +1792,7 @@ fn state_value_to_lua<'lua>( break g; } std::hint::spin_loop(); + std::thread::yield_now(); }; let mut value = serde_json::to_value(&*snapshot).map_err(|e| LuaError::external(e.to_string()))?; @@ -1820,6 +1821,7 @@ fn module_store_get( break g; } std::hint::spin_loop(); + std::thread::yield_now(); }; let entry = guard.modules.iter().find(|m| m.name == module)?; entry.store.get(key).cloned() @@ -1836,6 +1838,7 @@ fn module_store_set( break g; } std::hint::spin_loop(); + std::thread::yield_now(); }; if let Some(entry) = guard.modules.iter_mut().find(|m| m.name == module) { entry.store.insert(key, value); @@ -2210,7 +2213,13 @@ fn hyprland_request_socket() -> Result { hypr_dir.display() )), 1 => Ok(sockets.remove(0)), - _ => Ok(sockets.remove(0)), + _ => { + warn!( + "multiple Hyprland instances found in {}; using the first one", + hypr_dir.display() + ); + Ok(sockets.remove(0)) + } } } @@ -2274,11 +2283,17 @@ where Fut: std::future::Future, { std::thread::spawn(move || { - tokio::runtime::Builder::new_current_thread() + let rt = match tokio::runtime::Builder::new_current_thread() .enable_all() .build() - .expect("bluetooth action thread") - .block_on(factory()); + { + Ok(rt) => rt, + Err(e) => { + tracing::error!(error = %e, "bluetooth action: failed to build tokio runtime"); + return; + } + }; + rt.block_on(factory()); }); } @@ -2292,11 +2307,13 @@ where { let (tx, rx) = std::sync::mpsc::sync_channel(1); std::thread::spawn(move || { - let result = tokio::runtime::Builder::new_current_thread() + let result = match tokio::runtime::Builder::new_current_thread() .enable_all() .build() - .expect("bluetooth query thread") - .block_on(factory()); + { + Ok(rt) => rt.block_on(factory()), + Err(e) => Err(anyhow::anyhow!("bluetooth query: failed to build tokio runtime: {e}")), + }; let _ = tx.send(result); }); rx.recv() diff --git a/breadd/src/main.rs b/breadd/src/main.rs index 809c879..e445773 100644 --- a/breadd/src/main.rs +++ b/breadd/src/main.rs @@ -8,7 +8,7 @@ use std::sync::atomic::AtomicU64; use std::sync::Arc; use anyhow::Result; -use bread_shared::{AdapterSource, BreadEvent, RawEvent}; +use bread_shared::{BreadEvent, RawEvent}; use tokio::sync::{broadcast, mpsc, watch, RwLock}; use tracing::{error, info}; use tracing_subscriber::EnvFilter; @@ -105,12 +105,6 @@ async fn main() -> Result<()> { }); } - let _ = normalized_tx.send(BreadEvent::new( - "bread.system.startup", - AdapterSource::System, - serde_json::json!({}), - )); - let ipc_server = ipc::Server::new( config.socket_path(), state_handle,