Final Release of Version 1.0

This commit is contained in:
Breadway 2026-05-13 22:01:42 +08:00
parent d44ece3649
commit 9a471f3158
34 changed files with 3129 additions and 567 deletions

View file

@ -1,10 +1,11 @@
[package]
name = "breadd"
version = "0.1.0"
version = "1.0.0"
edition = "2021"
[dependencies]
bread-shared = { path = "../bread-shared" }
bread-sync = { path = "../bread-sync" }
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
@ -17,7 +18,6 @@ toml = "0.8"
udev = { version = "0.9", features = ["send"] }
rtnetlink = "0.9"
zbus = { version = "3.13", features = ["tokio"] }
hex = "0.4"
futures-util = "0.3"
netlink-packet-route = "0.11"
netlink-packet-core = "0.4"

View file

@ -71,7 +71,10 @@ fn hyprland_event_socket() -> Result<PathBuf> {
.collect();
match sockets.len() {
0 => Err(anyhow!("no Hyprland instance found in {}", hypr_dir.display())),
0 => Err(anyhow!(
"no Hyprland instance found in {}",
hypr_dir.display()
)),
1 => Ok(sockets.remove(0)),
n => {
warn!("found {n} Hyprland instances, using first");

View file

@ -1,21 +1,21 @@
use anyhow::Result;
use async_trait::async_trait;
use bread_shared::RawEvent;
use tokio::sync::{mpsc, watch, RwLock};
use tracing::info;
use serde::Serialize;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, watch, RwLock};
use tracing::info;
use crate::core::config::Config;
use crate::core::supervisor::spawn_supervised;
pub mod hyprland;
pub mod network;
pub mod power;
pub mod udev;
pub mod network_rtnetlink;
pub mod power;
pub mod power_upower;
pub mod udev;
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "snake_case")]
@ -71,7 +71,7 @@ impl Manager {
}
if self.config.adapters.hyprland.enabled {
self.spawn_adapter(hyprland::HyprlandAdapter::default());
self.spawn_adapter(hyprland::HyprlandAdapter);
}
if self.config.adapters.power.enabled {
@ -92,7 +92,7 @@ impl Manager {
if let Ok(adapter) = rt {
self.spawn_adapter(adapter);
} else {
self.spawn_adapter(network::NetworkAdapter::default());
self.spawn_adapter(network::NetworkAdapter);
}
}

View file

@ -70,7 +70,14 @@ impl Adapter for RtnetlinkAdapter {
"netns_id": netns_id,
"netns_fd": netns_fd
});
let _ = tx.send(RawEvent { source: AdapterSource::Network, kind: kind.to_string(), payload, timestamp: bread_shared::now_unix_ms() }).await;
let _ = tx
.send(RawEvent {
source: AdapterSource::Network,
kind: kind.to_string(),
payload,
timestamp: bread_shared::now_unix_ms(),
})
.await;
}
}
netlink_packet_core::NetlinkPayload::InnerMessage(RtnlMessage::NewRoute(route)) => {
@ -86,17 +93,32 @@ impl Adapter for RtnetlinkAdapter {
"gateway": gateway_ip,
"table": route.header.table
});
let _ = tx.send(RawEvent { source: AdapterSource::Network, kind: "route.default.changed".to_string(), payload, timestamp: bread_shared::now_unix_ms() }).await;
let _ = tx
.send(RawEvent {
source: AdapterSource::Network,
kind: "route.default.changed".to_string(),
payload,
timestamp: bread_shared::now_unix_ms(),
})
.await;
}
}
netlink_packet_core::NetlinkPayload::InnerMessage(RtnlMessage::NewAddress(addr)) => {
netlink_packet_core::NetlinkPayload::InnerMessage(RtnlMessage::NewAddress(
addr,
)) => {
let address = addr.nlas.iter().find_map(|nla| match nla {
netlink_packet_route::address::nlas::Nla::Address(bytes) => Some(bytes.clone()),
netlink_packet_route::address::nlas::Nla::Local(bytes) => Some(bytes.clone()),
netlink_packet_route::address::nlas::Nla::Address(bytes) => {
Some(bytes.clone())
}
netlink_packet_route::address::nlas::Nla::Local(bytes) => {
Some(bytes.clone())
}
_ => None,
});
let label = addr.nlas.iter().find_map(|nla| match nla {
netlink_packet_route::address::nlas::Nla::Label(label) => Some(label.clone()),
netlink_packet_route::address::nlas::Nla::Label(label) => {
Some(label.clone())
}
_ => None,
});
let ip = address.as_deref().and_then(ip_from_bytes);
@ -107,16 +129,31 @@ impl Adapter for RtnetlinkAdapter {
"address": ip,
"label": label
});
let _ = tx.send(RawEvent { source: AdapterSource::Network, kind: "address.added".to_string(), payload, timestamp: bread_shared::now_unix_ms() }).await;
let _ = tx
.send(RawEvent {
source: AdapterSource::Network,
kind: "address.added".to_string(),
payload,
timestamp: bread_shared::now_unix_ms(),
})
.await;
}
netlink_packet_core::NetlinkPayload::InnerMessage(RtnlMessage::DelAddress(addr)) => {
netlink_packet_core::NetlinkPayload::InnerMessage(RtnlMessage::DelAddress(
addr,
)) => {
let address = addr.nlas.iter().find_map(|nla| match nla {
netlink_packet_route::address::nlas::Nla::Address(bytes) => Some(bytes.clone()),
netlink_packet_route::address::nlas::Nla::Local(bytes) => Some(bytes.clone()),
netlink_packet_route::address::nlas::Nla::Address(bytes) => {
Some(bytes.clone())
}
netlink_packet_route::address::nlas::Nla::Local(bytes) => {
Some(bytes.clone())
}
_ => None,
});
let label = addr.nlas.iter().find_map(|nla| match nla {
netlink_packet_route::address::nlas::Nla::Label(label) => Some(label.clone()),
netlink_packet_route::address::nlas::Nla::Label(label) => {
Some(label.clone())
}
_ => None,
});
let ip = address.as_deref().and_then(ip_from_bytes);
@ -127,7 +164,14 @@ impl Adapter for RtnetlinkAdapter {
"address": ip,
"label": label
});
let _ = tx.send(RawEvent { source: AdapterSource::Network, kind: "address.removed".to_string(), payload, timestamp: bread_shared::now_unix_ms() }).await;
let _ = tx
.send(RawEvent {
source: AdapterSource::Network,
kind: "address.removed".to_string(),
payload,
timestamp: bread_shared::now_unix_ms(),
})
.await;
}
_ => {
debug!("unhandled netlink message");

View file

@ -6,8 +6,8 @@ use serde_json::json;
use std::collections::HashMap;
use tokio::sync::mpsc;
use tracing::{debug, info};
use zbus::{Message, MessageStream};
use zbus::zvariant::{OwnedObjectPath, OwnedValue};
use zbus::{Message, MessageStream};
use super::Adapter;

View file

@ -165,7 +165,11 @@ fn enumerate_with_udev(subsystems: &[String]) -> Result<Vec<ScannedDevice>> {
.or_else(|| dev.sysname().to_str().map(ToString::to_string))
.unwrap_or_else(|| "unknown".to_string());
let id = dev.syspath().to_string_lossy().to_string();
out.push(ScannedDevice { id, name, subsystem });
out.push(ScannedDevice {
id,
name,
subsystem,
});
}
Ok(out)

View file

@ -5,7 +5,7 @@ use std::path::{Path, PathBuf};
use anyhow::Result;
use serde::Deserialize;
#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Clone, Default, Deserialize)]
pub struct Config {
#[serde(default)]
pub daemon: DaemonConfig,
@ -45,7 +45,7 @@ pub struct ModulesConfig {
pub disable: Vec<String>,
}
#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Clone, Default, Deserialize)]
pub struct AdaptersConfig {
#[serde(default)]
pub hyprland: AdapterToggle,
@ -95,19 +95,6 @@ pub struct NotificationsConfig {
pub notify_send_path: String,
}
impl Default for Config {
fn default() -> Self {
Self {
daemon: DaemonConfig::default(),
lua: LuaConfig::default(),
modules: ModulesConfig::default(),
adapters: AdaptersConfig::default(),
notifications: NotificationsConfig::default(),
events: EventsConfig::default(),
}
}
}
impl Default for DaemonConfig {
fn default() -> Self {
Self {
@ -135,17 +122,6 @@ impl Default for ModulesConfig {
}
}
impl Default for AdaptersConfig {
fn default() -> Self {
Self {
hyprland: AdapterToggle::default(),
udev: UdevConfig::default(),
power: PowerConfig::default(),
network: AdapterToggle::default(),
}
}
}
impl Default for AdapterToggle {
fn default() -> Self {
Self {
@ -281,3 +257,241 @@ fn default_udev_subsystems() -> Vec<String> {
"power_supply".to_string(),
]
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
// Tests that mutate process env vars must serialize against each other
// — cargo runs tests in parallel by default and HOME/XDG_RUNTIME_DIR are
// process-global. Tests that don't touch env are free to run unguarded.
static ENV_LOCK: Mutex<()> = Mutex::new(());
struct EnvGuard {
saved: Vec<(&'static str, Option<String>)>,
_guard: std::sync::MutexGuard<'static, ()>,
}
impl EnvGuard {
fn new(vars: &[&'static str]) -> Self {
let guard = ENV_LOCK.lock().unwrap_or_else(|p| p.into_inner());
let saved = vars.iter().map(|k| (*k, std::env::var(k).ok())).collect();
Self {
saved,
_guard: guard,
}
}
}
impl Drop for EnvGuard {
fn drop(&mut self) {
for (key, value) in &self.saved {
match value {
Some(v) => std::env::set_var(key, v),
None => std::env::remove_var(key),
}
}
}
}
#[test]
fn default_config_uses_documented_defaults() {
let cfg = Config::default();
assert_eq!(cfg.daemon.log_level, "info");
assert!(cfg.daemon.socket_path.is_empty());
assert_eq!(cfg.lua.entry_point, "~/.config/bread/init.lua");
assert_eq!(cfg.lua.module_path, "~/.config/bread/modules");
assert!(cfg.adapters.hyprland.enabled);
assert!(cfg.adapters.udev.enabled);
assert!(cfg.adapters.power.enabled);
assert!(cfg.adapters.network.enabled);
assert_eq!(cfg.adapters.power.poll_interval_secs, 30);
assert_eq!(cfg.events.dedup_window_ms, 100);
assert_eq!(cfg.notifications.default_timeout_ms, 3000);
assert_eq!(cfg.notifications.default_urgency, "normal");
assert_eq!(cfg.notifications.notify_send_path, "notify-send");
assert!(cfg.modules.builtin);
assert!(cfg.modules.disable.is_empty());
}
#[test]
fn default_udev_subsystems_match_documented_list() {
assert_eq!(
default_udev_subsystems(),
vec!["usb", "input", "drm", "power_supply"]
);
}
#[test]
fn parse_empty_toml_yields_defaults() {
let cfg: Config = toml::from_str("").unwrap();
assert_eq!(cfg.daemon.log_level, "info");
assert!(cfg.adapters.hyprland.enabled);
}
#[test]
fn parse_full_toml_overrides_all_values() {
let raw = r#"
[daemon]
log_level = "debug"
socket_path = "/tmp/custom.sock"
[lua]
entry_point = "/abs/init.lua"
module_path = "/abs/mods"
[modules]
builtin = false
disable = ["foo", "bar"]
[adapters.hyprland]
enabled = false
[adapters.udev]
enabled = true
subsystems = ["usb"]
[adapters.power]
enabled = false
poll_interval_secs = 5
[adapters.network]
enabled = false
[events]
dedup_window_ms = 250
[notifications]
default_timeout_ms = 1000
default_urgency = "critical"
notify_send_path = "/usr/local/bin/notify-send"
"#;
let cfg: Config = toml::from_str(raw).unwrap();
assert_eq!(cfg.daemon.log_level, "debug");
assert_eq!(cfg.daemon.socket_path, "/tmp/custom.sock");
assert_eq!(cfg.lua.entry_point, "/abs/init.lua");
assert_eq!(cfg.lua.module_path, "/abs/mods");
assert!(!cfg.modules.builtin);
assert_eq!(cfg.modules.disable, vec!["foo", "bar"]);
assert!(!cfg.adapters.hyprland.enabled);
assert!(cfg.adapters.udev.enabled);
assert_eq!(cfg.adapters.udev.subsystems, vec!["usb"]);
assert!(!cfg.adapters.power.enabled);
assert_eq!(cfg.adapters.power.poll_interval_secs, 5);
assert!(!cfg.adapters.network.enabled);
assert_eq!(cfg.events.dedup_window_ms, 250);
assert_eq!(cfg.notifications.default_timeout_ms, 1000);
assert_eq!(cfg.notifications.default_urgency, "critical");
}
#[test]
fn parse_partial_toml_fills_missing_with_defaults() {
let raw = r#"
[daemon]
log_level = "trace"
"#;
let cfg: Config = toml::from_str(raw).unwrap();
assert_eq!(cfg.daemon.log_level, "trace");
// Untouched sections still get their defaults.
assert!(cfg.adapters.hyprland.enabled);
assert_eq!(cfg.events.dedup_window_ms, 100);
}
#[test]
fn invalid_toml_returns_error() {
let result: Result<Config, _> = toml::from_str("[daemon\nbroken");
assert!(result.is_err());
}
#[test]
fn socket_path_uses_explicit_path_verbatim() {
let mut cfg = Config::default();
cfg.daemon.socket_path = "/run/bread.sock".to_string();
assert_eq!(cfg.socket_path(), PathBuf::from("/run/bread.sock"));
}
#[test]
fn socket_path_expands_tilde_when_explicit() {
let _g = EnvGuard::new(&["HOME"]);
std::env::set_var("HOME", "/synthetic/home");
let mut cfg = Config::default();
cfg.daemon.socket_path = "~/sockets/bread.sock".to_string();
assert_eq!(
cfg.socket_path(),
PathBuf::from("/synthetic/home/sockets/bread.sock")
);
}
#[test]
fn socket_path_falls_back_to_xdg_runtime_dir() {
let _g = EnvGuard::new(&["XDG_RUNTIME_DIR"]);
std::env::set_var("XDG_RUNTIME_DIR", "/tmp/xdg");
let cfg = Config::default();
assert_eq!(
cfg.socket_path(),
PathBuf::from("/tmp/xdg/bread/breadd.sock")
);
}
#[test]
fn socket_path_uses_tmp_when_no_xdg_runtime_dir() {
let _g = EnvGuard::new(&["XDG_RUNTIME_DIR"]);
std::env::remove_var("XDG_RUNTIME_DIR");
let cfg = Config::default();
assert_eq!(cfg.socket_path(), PathBuf::from("/tmp/bread/breadd.sock"));
}
#[test]
fn lua_entry_point_and_module_path_expand_tilde() {
let _g = EnvGuard::new(&["HOME"]);
std::env::set_var("HOME", "/synthetic/home");
let cfg = Config::default();
assert_eq!(
cfg.lua_entry_point(),
PathBuf::from("/synthetic/home/.config/bread/init.lua")
);
assert_eq!(
cfg.lua_module_path(),
PathBuf::from("/synthetic/home/.config/bread/modules")
);
}
#[test]
fn lua_entry_point_returns_absolute_path_unchanged() {
let mut cfg = Config::default();
cfg.lua.entry_point = "/etc/bread/init.lua".to_string();
assert_eq!(cfg.lua_entry_point(), PathBuf::from("/etc/bread/init.lua"));
}
#[test]
fn expand_home_handles_missing_home_env() {
let _g = EnvGuard::new(&["HOME"]);
std::env::remove_var("HOME");
// Without HOME, ~/-prefixed paths fall back to the literal string.
assert_eq!(expand_home("~/foo"), PathBuf::from("~/foo"));
// Non-tilde paths are unchanged regardless.
assert_eq!(expand_home("/abs/path"), PathBuf::from("/abs/path"));
}
#[test]
fn config_path_respects_xdg_config_home() {
let _g = EnvGuard::new(&["XDG_CONFIG_HOME", "HOME"]);
std::env::set_var("XDG_CONFIG_HOME", "/synthetic/xdg-config");
assert_eq!(
config_path(),
PathBuf::from("/synthetic/xdg-config/bread/breadd.toml")
);
}
#[test]
fn config_path_falls_back_to_home_when_no_xdg() {
let _g = EnvGuard::new(&["XDG_CONFIG_HOME", "HOME"]);
std::env::remove_var("XDG_CONFIG_HOME");
std::env::set_var("HOME", "/synthetic/home");
assert_eq!(
config_path(),
PathBuf::from("/synthetic/home/.config/bread/breadd.toml")
);
}
}

View file

@ -44,7 +44,11 @@ impl EventNormalizer {
}
fn normalize_udev(&self, raw: &RawEvent) -> Vec<BreadEvent> {
let action = raw.payload.get("action").and_then(Value::as_str).unwrap_or("change");
let action = raw
.payload
.get("action")
.and_then(Value::as_str)
.unwrap_or("change");
// "bind" is the kernel attaching a driver to an interface — not a meaningful
// device state change for automation purposes.
@ -52,11 +56,31 @@ impl EventNormalizer {
return vec![];
}
let name = raw.payload.get("name").and_then(Value::as_str).unwrap_or("unknown");
let vendor = raw.payload.get("id_vendor").and_then(Value::as_str).unwrap_or_default();
let vendor_id = raw.payload.get("vendor_id").and_then(Value::as_str).unwrap_or_default();
let product_id = raw.payload.get("product_id").and_then(Value::as_str).unwrap_or_default();
let subsystem = raw.payload.get("subsystem").and_then(Value::as_str).unwrap_or_default();
let name = raw
.payload
.get("name")
.and_then(Value::as_str)
.unwrap_or("unknown");
let vendor = raw
.payload
.get("id_vendor")
.and_then(Value::as_str)
.unwrap_or_default();
let vendor_id = raw
.payload
.get("vendor_id")
.and_then(Value::as_str)
.unwrap_or_default();
let product_id = raw
.payload
.get("product_id")
.and_then(Value::as_str)
.unwrap_or_default();
let subsystem = raw
.payload
.get("subsystem")
.and_then(Value::as_str)
.unwrap_or_default();
// Drop anonymous child USB interfaces (e.g. 3-5:1.0, 3-5:1.1) that carry
// no identity information — they are USB protocol artefacts, not devices.
@ -74,7 +98,10 @@ impl EventNormalizer {
_ => "changed",
};
if (verb == "connected" || verb == "disconnected") && !vendor_id.is_empty() && !product_id.is_empty() {
if (verb == "connected" || verb == "disconnected")
&& !vendor_id.is_empty()
&& !product_id.is_empty()
{
let device_key = format!("{}:{}:{}", verb, vendor_id, product_id);
let now = raw.timestamp;
let already_seen = {
@ -89,13 +116,18 @@ impl EventNormalizer {
let mut seen = self.seen_devices.write().unwrap_or_else(|p| p.into_inner());
seen.insert(device_key, now);
// Evict stale entries
let evict_before = now.saturating_sub(self.dedup_window_ms.saturating_mul(EVICT_MULTIPLIER));
let evict_before =
now.saturating_sub(self.dedup_window_ms.saturating_mul(EVICT_MULTIPLIER));
if evict_before > 0 {
seen.retain(|_, &mut last| last >= evict_before);
}
}
let id = raw.payload.get("id").and_then(Value::as_str).unwrap_or("unknown");
let id = raw
.payload
.get("id")
.and_then(Value::as_str)
.unwrap_or("unknown");
// Device name is always "unknown" here; the state engine applies user-defined
// classification rules from devices.lua before dispatching to subscribers.
@ -117,7 +149,11 @@ impl EventNormalizer {
}
fn normalize_hyprland(&self, raw: &RawEvent) -> Vec<BreadEvent> {
let kind = raw.payload.get("kind").and_then(Value::as_str).unwrap_or("unknown");
let kind = raw
.payload
.get("kind")
.and_then(Value::as_str)
.unwrap_or("unknown");
let data = raw
.payload
.get("data")
@ -168,7 +204,7 @@ impl EventNormalizer {
timestamp: raw.timestamp,
source: AdapterSource::Hyprland,
data: json!({
"address": fields.get(0).unwrap_or(&"")
"address": fields.first().unwrap_or(&"")
}),
}]
}
@ -179,7 +215,7 @@ impl EventNormalizer {
timestamp: raw.timestamp,
source: AdapterSource::Hyprland,
data: json!({
"address": fields.get(0).unwrap_or(&""),
"address": fields.first().unwrap_or(&""),
"workspace": fields.get(1).unwrap_or(&""),
"class": fields.get(2).unwrap_or(&""),
"title": fields.get(3).unwrap_or(&""),
@ -192,7 +228,7 @@ impl EventNormalizer {
event: "bread.window.closed".to_string(),
timestamp: raw.timestamp,
source: AdapterSource::Hyprland,
data: json!({ "address": fields.get(0).unwrap_or(&"") }),
data: json!({ "address": fields.first().unwrap_or(&"") }),
}]
}
"movewindow" => {
@ -202,7 +238,7 @@ impl EventNormalizer {
timestamp: raw.timestamp,
source: AdapterSource::Hyprland,
data: json!({
"address": fields.get(0).unwrap_or(&""),
"address": fields.first().unwrap_or(&""),
"workspace": fields.get(1).unwrap_or(&""),
}),
}]
@ -268,7 +304,11 @@ impl EventNormalizer {
}
fn normalize_network(&self, raw: &RawEvent) -> Vec<BreadEvent> {
let online = raw.payload.get("online").and_then(Value::as_bool).unwrap_or(false);
let online = raw
.payload
.get("online")
.and_then(Value::as_bool)
.unwrap_or(false);
let name = if online {
"bread.network.connected"
} else {
@ -310,7 +350,8 @@ impl EventNormalizer {
recent.insert(key.clone(), now);
// Evict stale entries to prevent unbounded growth.
let evict_before = now.saturating_sub(self.dedup_window_ms.saturating_mul(EVICT_MULTIPLIER));
let evict_before =
now.saturating_sub(self.dedup_window_ms.saturating_mul(EVICT_MULTIPLIER));
if evict_before > 0 {
recent.retain(|_, &mut last| last >= evict_before);
}
@ -326,3 +367,403 @@ fn split_hyprland_fields(data: &str) -> Vec<&str> {
data.split(">>").collect()
}
#[cfg(test)]
mod tests {
use super::*;
fn raw(source: AdapterSource, kind: &str, payload: Value, ts: u64) -> RawEvent {
RawEvent {
source,
kind: kind.to_string(),
payload,
timestamp: ts,
}
}
// ─── Udev ─────────────────────────────────────────────────────────────
#[test]
fn udev_add_emits_connected_with_identity_fields() {
let n = EventNormalizer::new(100);
let ev = raw(
AdapterSource::Udev,
"udev",
json!({
"action": "add",
"name": "Logitech Mouse",
"id_vendor": "Logitech",
"vendor_id": "046d",
"product_id": "c52b",
"subsystem": "usb",
"id": "1-1.4",
}),
1000,
);
let out = n.normalize(&ev);
assert_eq!(out.len(), 1);
assert_eq!(out[0].event, "bread.device.connected");
assert_eq!(out[0].data.get("vendor_id").unwrap(), "046d");
assert_eq!(out[0].data.get("product_id").unwrap(), "c52b");
assert_eq!(out[0].data.get("name").unwrap(), "Logitech Mouse");
assert_eq!(out[0].data.get("subsystem").unwrap(), "usb");
assert_eq!(out[0].data.get("device").unwrap(), "unknown");
}
#[test]
fn udev_remove_emits_disconnected() {
let n = EventNormalizer::new(100);
let ev = raw(
AdapterSource::Udev,
"udev",
json!({
"action": "remove",
"name": "Logitech",
"vendor_id": "046d",
"product_id": "c52b",
"subsystem": "usb",
"id": "1-1.4",
}),
1000,
);
let out = n.normalize(&ev);
assert_eq!(out.len(), 1);
assert_eq!(out[0].event, "bread.device.disconnected");
}
#[test]
fn udev_bind_action_is_suppressed() {
let n = EventNormalizer::new(100);
let ev = raw(
AdapterSource::Udev,
"udev",
json!({
"action": "bind",
"name": "x",
"vendor_id": "046d",
"product_id": "c52b",
}),
1000,
);
assert!(n.normalize(&ev).is_empty());
}
#[test]
fn udev_anonymous_child_interface_is_dropped() {
let n = EventNormalizer::new(100);
// No name, no vendor — pure USB protocol artefact.
let ev = raw(
AdapterSource::Udev,
"udev",
json!({
"action": "add",
"id": "3-5:1.0",
}),
1000,
);
assert!(n.normalize(&ev).is_empty());
}
#[test]
fn udev_dedupes_child_nodes_of_same_physical_device() {
let n = EventNormalizer::new(1000);
let mk = |id: &str, ts: u64| {
raw(
AdapterSource::Udev,
"udev",
json!({
"action": "add",
"name": "Hub Device",
"vendor_id": "1d6b",
"product_id": "0002",
"subsystem": "usb",
"id": id,
}),
ts,
)
};
// First child fires
assert_eq!(n.normalize(&mk("usb-1", 1000)).len(), 1);
// Sibling within window is suppressed
assert_eq!(n.normalize(&mk("usb-2", 1050)).len(), 0);
// After the dedup window, a sibling fires again
assert_eq!(n.normalize(&mk("usb-3", 3000)).len(), 1);
}
#[test]
fn udev_disconnect_does_not_share_dedup_with_connect() {
let n = EventNormalizer::new(1000);
let connect = raw(
AdapterSource::Udev,
"udev",
json!({"action": "add", "name": "x", "vendor_id": "1", "product_id": "2", "id": "a"}),
1000,
);
let disconnect = raw(
AdapterSource::Udev,
"udev",
json!({"action": "remove", "name": "x", "vendor_id": "1", "product_id": "2", "id": "a"}),
1100,
);
assert_eq!(n.normalize(&connect).len(), 1);
// Disconnect uses a different verb in the dedup key, so it fires.
assert_eq!(n.normalize(&disconnect).len(), 1);
}
// ─── Hyprland ─────────────────────────────────────────────────────────
#[test]
fn hyprland_workspace_change() {
let n = EventNormalizer::new(0);
let ev = raw(
AdapterSource::Hyprland,
"hypr",
json!({"kind": "workspace", "data": "2"}),
1,
);
let out = n.normalize(&ev);
assert_eq!(out.len(), 1);
assert_eq!(out[0].event, "bread.workspace.changed");
}
#[test]
fn hyprland_active_window_v2_parses_address_from_fields() {
let n = EventNormalizer::new(0);
let ev = raw(
AdapterSource::Hyprland,
"hypr",
json!({"kind": "activewindowv2", "data": "0xdeadbeef"}),
1,
);
let out = n.normalize(&ev);
assert_eq!(out.len(), 1);
assert_eq!(out[0].event, "bread.window.focused");
assert_eq!(out[0].data.get("address").unwrap(), "0xdeadbeef");
}
#[test]
fn hyprland_openwindow_splits_all_fields() {
let n = EventNormalizer::new(0);
let ev = raw(
AdapterSource::Hyprland,
"hypr",
json!({"kind": "openwindow", "data": "0xabc>>2>>firefox>>Mozilla Firefox"}),
1,
);
let out = n.normalize(&ev);
assert_eq!(out.len(), 1);
assert_eq!(out[0].event, "bread.window.opened");
let d = &out[0].data;
assert_eq!(d.get("address").unwrap(), "0xabc");
assert_eq!(d.get("workspace").unwrap(), "2");
assert_eq!(d.get("class").unwrap(), "firefox");
assert_eq!(d.get("title").unwrap(), "Mozilla Firefox");
}
#[test]
fn hyprland_unknown_kind_falls_through_to_generic_event() {
let n = EventNormalizer::new(0);
let ev = raw(
AdapterSource::Hyprland,
"hypr",
json!({"kind": "submap", "data": "resize"}),
1,
);
let out = n.normalize(&ev);
assert_eq!(out.len(), 1);
assert_eq!(out[0].event, "bread.hyprland.event");
}
#[test]
fn hyprland_monitor_lifecycle() {
let n = EventNormalizer::new(0);
let added = n.normalize(&raw(
AdapterSource::Hyprland,
"hypr",
json!({"kind": "monitoradded", "data": "HDMI-A-1"}),
1,
));
let removed = n.normalize(&raw(
AdapterSource::Hyprland,
"hypr",
json!({"kind": "monitorremoved", "data": "HDMI-A-1"}),
2,
));
assert_eq!(added[0].event, "bread.monitor.connected");
assert_eq!(added[0].data.get("name").unwrap(), "HDMI-A-1");
assert_eq!(removed[0].event, "bread.monitor.disconnected");
}
// ─── Power ─────────────────────────────────────────────────────────────
#[test]
fn power_ac_connected_emits_named_event() {
let n = EventNormalizer::new(0);
let out = n.normalize(&raw(
AdapterSource::Power,
"power",
json!({"ac_connected": true}),
1,
));
assert_eq!(out.len(), 1);
assert_eq!(out[0].event, "bread.power.ac.connected");
}
#[test]
fn power_battery_thresholds_select_correct_event() {
let n = EventNormalizer::new(0);
let cases = [
(3, "bread.power.battery.critical"),
(5, "bread.power.battery.critical"),
(8, "bread.power.battery.very_low"),
(10, "bread.power.battery.very_low"),
(15, "bread.power.battery.low"),
(20, "bread.power.battery.low"),
(100, "bread.power.battery.full"),
];
for (level, expected) in cases {
let out = n.normalize(&raw(
AdapterSource::Power,
"power",
json!({"battery_percent": level}),
level * 1000,
));
assert_eq!(
out[0].event, expected,
"level {level} should map to {expected}"
);
}
}
#[test]
fn power_mid_range_battery_emits_generic_changed() {
let n = EventNormalizer::new(0);
let out = n.normalize(&raw(
AdapterSource::Power,
"power",
json!({"battery_percent": 50}),
1,
));
assert_eq!(out.len(), 1);
assert_eq!(out[0].event, "bread.power.changed");
}
#[test]
fn power_ac_and_battery_can_both_fire() {
let n = EventNormalizer::new(0);
let out = n.normalize(&raw(
AdapterSource::Power,
"power",
json!({"ac_connected": false, "battery_percent": 4}),
1,
));
let names: Vec<&str> = out.iter().map(|e| e.event.as_str()).collect();
assert!(names.contains(&"bread.power.ac.disconnected"));
assert!(names.contains(&"bread.power.battery.critical"));
}
// ─── Network ───────────────────────────────────────────────────────────
#[test]
fn network_online_and_offline() {
let n = EventNormalizer::new(0);
let online = n.normalize(&raw(
AdapterSource::Network,
"net",
json!({"online": true}),
1,
));
let offline = n.normalize(&raw(
AdapterSource::Network,
"net",
json!({"online": false}),
2,
));
assert_eq!(online[0].event, "bread.network.connected");
assert_eq!(offline[0].event, "bread.network.disconnected");
}
// ─── System pass-through ───────────────────────────────────────────────
#[test]
fn system_events_pass_through_unchanged() {
let n = EventNormalizer::new(0);
let out = n.normalize(&raw(
AdapterSource::System,
"bread.custom.event",
json!({"foo": "bar"}),
1,
));
assert_eq!(out.len(), 1);
assert_eq!(out[0].event, "bread.custom.event");
assert_eq!(out[0].source, AdapterSource::System);
assert_eq!(out[0].data.get("foo").unwrap(), "bar");
}
// ─── Dedup ─────────────────────────────────────────────────────────────
#[test]
fn dedup_drops_duplicate_within_window() {
let n = EventNormalizer::new(500);
let ev = raw(AdapterSource::Network, "net", json!({"online": true}), 1000);
assert_eq!(n.normalize(&ev).len(), 1);
let dup = raw(AdapterSource::Network, "net", json!({"online": true}), 1200);
assert_eq!(n.normalize(&dup).len(), 0);
}
#[test]
fn dedup_allows_after_window_elapses() {
let n = EventNormalizer::new(500);
let first = raw(AdapterSource::Network, "net", json!({"online": true}), 1000);
assert_eq!(n.normalize(&first).len(), 1);
let later = raw(AdapterSource::Network, "net", json!({"online": true}), 2000);
assert_eq!(n.normalize(&later).len(), 1);
}
#[test]
fn dedup_distinguishes_different_payloads() {
let n = EventNormalizer::new(10_000);
let a = raw(
AdapterSource::Hyprland,
"hypr",
json!({"kind": "workspace", "data": "1"}),
1000,
);
let b = raw(
AdapterSource::Hyprland,
"hypr",
json!({"kind": "workspace", "data": "2"}),
1100,
);
assert_eq!(n.normalize(&a).len(), 1);
// Different payloads = different dedup key
assert_eq!(n.normalize(&b).len(), 1);
}
#[test]
fn dedup_window_of_zero_allows_everything() {
let n = EventNormalizer::new(0);
for _ in 0..3 {
assert_eq!(
n.normalize(&raw(
AdapterSource::Network,
"net",
json!({"online": true}),
1000,
))
.len(),
1
);
}
}
// ─── Helper ────────────────────────────────────────────────────────────
#[test]
fn split_fields_handles_empty_and_single() {
assert!(split_hyprland_fields("").is_empty());
assert_eq!(split_hyprland_fields("only"), vec!["only"]);
assert_eq!(split_hyprland_fields("a>>b>>c"), vec!["a", "b", "c"]);
}
}

View file

@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use anyhow::Result;
use bread_shared::{AdapterSource, BreadEvent};
@ -9,14 +9,15 @@ use tokio::sync::{broadcast, mpsc, watch, RwLock};
use tracing::warn;
use crate::core::subscriptions::{SubscriptionId, SubscriptionTable};
use crate::core::types::{Device, DeviceRule, InterfaceState, MatchCondition, ModuleLoadState, RuntimeState};
use crate::core::types::{
Device, DeviceRule, InterfaceState, MatchCondition, ModuleLoadState, RuntimeState,
};
use crate::lua::LuaMessage;
#[derive(Clone)]
pub struct StateHandle {
state: Arc<RwLock<RuntimeState>>,
command_tx: mpsc::UnboundedSender<StateCommand>,
subscription_count: Arc<AtomicU64>,
}
pub enum StateCommand {
@ -53,13 +54,8 @@ impl StateHandle {
pub fn new(
state: Arc<RwLock<RuntimeState>>,
command_tx: mpsc::UnboundedSender<StateCommand>,
subscription_count: Arc<AtomicU64>,
) -> Self {
Self {
state,
command_tx,
subscription_count,
}
Self { state, command_tx }
}
pub fn state_arc(&self) -> Arc<RwLock<RuntimeState>> {
@ -86,18 +82,21 @@ impl StateHandle {
serde_json::to_value(&*state).unwrap_or_else(|_| serde_json::json!({}))
}
pub fn register_subscription(&self, id: SubscriptionId, pattern: String, once: bool) -> Result<()> {
pub fn register_subscription(
&self,
id: SubscriptionId,
pattern: String,
once: bool,
) -> Result<()> {
self.command_tx
.send(StateCommand::RegisterSubscription {
id,
pattern,
once,
})
.send(StateCommand::RegisterSubscription { id, pattern, once })
.map_err(|_| anyhow::anyhow!("state engine command channel closed"))
}
pub fn remove_subscription(&self, id: SubscriptionId) {
let _ = self.command_tx.send(StateCommand::RemoveSubscription { id });
let _ = self
.command_tx
.send(StateCommand::RemoveSubscription { id });
}
pub fn register_watch(&self, id: SubscriptionId, path: String) -> Result<()> {
@ -140,10 +139,6 @@ impl StateHandle {
pub fn set_device_rules(&self, rules: Vec<DeviceRule>) {
let _ = self.command_tx.send(StateCommand::SetDeviceRules(rules));
}
pub fn subscription_count(&self) -> Arc<AtomicU64> {
self.subscription_count.clone()
}
}
pub async fn run_state_engine(
@ -376,8 +371,16 @@ fn apply_event_to_state(state: &mut RuntimeState, event: &BreadEvent) {
state.monitors.push(crate::core::types::Monitor {
name: name.to_string(),
connected: true,
resolution: event.data.get("resolution").and_then(Value::as_str).map(ToString::to_string),
position: event.data.get("position").and_then(Value::as_str).map(ToString::to_string),
resolution: event
.data
.get("resolution")
.and_then(Value::as_str)
.map(ToString::to_string),
position: event
.data
.get("position")
.and_then(Value::as_str)
.map(ToString::to_string),
});
}
}
@ -403,7 +406,7 @@ fn apply_event_to_state(state: &mut RuntimeState, event: &BreadEvent) {
.data
.get("window")
.or_else(|| event.data.get("class"))
.or_else(|| event.data.get("address"))
.or_else(|| event.data.get("address"))
.and_then(Value::as_str)
.map(ToString::to_string);
}
@ -421,7 +424,10 @@ fn apply_event_to_state(state: &mut RuntimeState, event: &BreadEvent) {
state.network.interfaces.clear();
for (name, meta) in ifaces {
let up = meta.get("up").and_then(Value::as_bool).unwrap_or(false);
state.network.interfaces.insert(name.clone(), InterfaceState { up });
state
.network
.interfaces
.insert(name.clone(), InterfaceState { up });
}
}
}
@ -455,7 +461,8 @@ fn apply_event_to_state(state: &mut RuntimeState, event: &BreadEvent) {
fn resolve_device(rules: &[DeviceRule], data: &Value) -> String {
for rule in rules {
if !rule.conditions.is_empty() && rule.conditions.iter().all(|c| condition_matches(c, data)) {
if !rule.conditions.is_empty() && rule.conditions.iter().all(|c| condition_matches(c, data))
{
return rule.device.clone();
}
}
@ -476,37 +483,68 @@ fn condition_matches(cond: &MatchCondition, data: &Value) -> bool {
}
}
if let Some(ref expected) = cond.name {
let actual = data.get("name").and_then(Value::as_str).unwrap_or("").to_lowercase();
let actual = data
.get("name")
.and_then(Value::as_str)
.unwrap_or("")
.to_lowercase();
if actual != expected.to_lowercase() {
return false;
}
}
if let Some(ref expected) = cond.vendor {
let actual = data.get("vendor").and_then(Value::as_str).unwrap_or("").to_lowercase();
let actual = data
.get("vendor")
.and_then(Value::as_str)
.unwrap_or("")
.to_lowercase();
if actual != expected.to_lowercase() {
return false;
}
}
if let Some(ref contains) = cond.name_contains {
let name = data.get("name").and_then(Value::as_str).unwrap_or("").to_lowercase();
let vendor = data.get("vendor").and_then(Value::as_str).unwrap_or("").to_lowercase();
let name = data
.get("name")
.and_then(Value::as_str)
.unwrap_or("")
.to_lowercase();
let vendor = data
.get("vendor")
.and_then(Value::as_str)
.unwrap_or("")
.to_lowercase();
let combined = format!("{name} {vendor}");
if !combined.contains(contains.to_lowercase().as_str()) {
return false;
}
}
if let Some(expected) = cond.id_input_keyboard {
if data.get("id_input_keyboard").and_then(Value::as_bool).unwrap_or(false) != expected {
if data
.get("id_input_keyboard")
.and_then(Value::as_bool)
.unwrap_or(false)
!= expected
{
return false;
}
}
if let Some(expected) = cond.id_input_mouse {
if data.get("id_input_mouse").and_then(Value::as_bool).unwrap_or(false) != expected {
if data
.get("id_input_mouse")
.and_then(Value::as_bool)
.unwrap_or(false)
!= expected
{
return false;
}
}
if let Some(expected) = cond.id_input_tablet {
if data.get("id_input_tablet").and_then(Value::as_bool).unwrap_or(false) != expected {
if data
.get("id_input_tablet")
.and_then(Value::as_bool)
.unwrap_or(false)
!= expected
{
return false;
}
}
@ -526,7 +564,10 @@ fn condition_matches(cond: &MatchCondition, data: &Value) -> bool {
}
}
if let Some(ref expected) = cond.id_usb_class {
let actual = data.get("id_usb_class").and_then(Value::as_str).unwrap_or("");
let actual = data
.get("id_usb_class")
.and_then(Value::as_str)
.unwrap_or("");
if actual.to_lowercase() != expected.to_lowercase()
&& actual.to_lowercase() != format!("0x{}", expected.to_lowercase())
{
@ -534,7 +575,11 @@ fn condition_matches(cond: &MatchCondition, data: &Value) -> bool {
}
}
if let Some(ref expected) = cond.subsystem {
let actual = data.get("subsystem").and_then(Value::as_str).unwrap_or("").to_lowercase();
let actual = data
.get("subsystem")
.and_then(Value::as_str)
.unwrap_or("")
.to_lowercase();
if actual != expected.to_lowercase() {
return false;
}
@ -586,3 +631,407 @@ fn apply_device_change(state: &mut RuntimeState, data: &Value, connected: bool)
state.devices.connected.retain(|d| d.id != id);
}
}
#[cfg(test)]
mod tests {
use super::*;
fn ev(name: &str, data: Value) -> BreadEvent {
BreadEvent {
event: name.to_string(),
timestamp: 0,
source: AdapterSource::System,
data,
}
}
// ─── value_at_path ────────────────────────────────────────────────────
#[test]
fn value_at_path_returns_root_for_empty_path() {
let v = json!({"a": 1});
assert_eq!(value_at_path(&v, ""), Some(json!({"a": 1})));
}
#[test]
fn value_at_path_navigates_nested_keys() {
let v = json!({"a": {"b": {"c": 42}}});
assert_eq!(value_at_path(&v, "a.b.c"), Some(json!(42)));
}
#[test]
fn value_at_path_returns_none_on_missing_key() {
let v = json!({"a": 1});
assert!(value_at_path(&v, "missing").is_none());
assert!(value_at_path(&v, "a.b.c").is_none());
}
// ─── apply_event_to_state: monitors ───────────────────────────────────
#[test]
fn monitor_connect_adds_new_monitor() {
let mut state = RuntimeState::default();
apply_event_to_state(
&mut state,
&ev(
"bread.monitor.connected",
json!({"name": "DP-1", "resolution": "1920x1080", "position": "0x0"}),
),
);
assert_eq!(state.monitors.len(), 1);
assert_eq!(state.monitors[0].name, "DP-1");
assert!(state.monitors[0].connected);
assert_eq!(state.monitors[0].resolution.as_deref(), Some("1920x1080"));
assert_eq!(state.monitors[0].position.as_deref(), Some("0x0"));
}
#[test]
fn monitor_reconnect_does_not_duplicate() {
let mut state = RuntimeState::default();
let mk = || ev("bread.monitor.connected", json!({"name": "DP-1"}));
apply_event_to_state(&mut state, &mk());
apply_event_to_state(
&mut state,
&ev("bread.monitor.disconnected", json!({"name": "DP-1"})),
);
apply_event_to_state(&mut state, &mk());
assert_eq!(state.monitors.len(), 1);
assert!(state.monitors[0].connected);
}
#[test]
fn monitor_disconnect_keeps_record_but_flips_connected_flag() {
let mut state = RuntimeState::default();
apply_event_to_state(
&mut state,
&ev("bread.monitor.connected", json!({"name": "DP-1"})),
);
apply_event_to_state(
&mut state,
&ev("bread.monitor.disconnected", json!({"name": "DP-1"})),
);
assert_eq!(state.monitors.len(), 1);
assert!(!state.monitors[0].connected);
}
// ─── apply_event_to_state: workspace + window ─────────────────────────
#[test]
fn workspace_changed_updates_active_workspace() {
let mut state = RuntimeState::default();
apply_event_to_state(
&mut state,
&ev("bread.workspace.changed", json!({"workspace": "3"})),
);
assert_eq!(state.active_workspace.as_deref(), Some("3"));
// Falls back to `id` when `workspace` is absent.
apply_event_to_state(
&mut state,
&ev("bread.workspace.changed", json!({"id": "5"})),
);
assert_eq!(state.active_workspace.as_deref(), Some("5"));
}
#[test]
fn window_focus_change_updates_active_window() {
let mut state = RuntimeState::default();
apply_event_to_state(
&mut state,
&ev("bread.window.focus.changed", json!({"window": "firefox"})),
);
assert_eq!(state.active_window.as_deref(), Some("firefox"));
// Falls back to `class`, then `address`.
apply_event_to_state(
&mut state,
&ev("bread.window.focused", json!({"address": "0xdeadbeef"})),
);
assert_eq!(state.active_window.as_deref(), Some("0xdeadbeef"));
}
// ─── apply_device_change ──────────────────────────────────────────────
#[test]
fn device_connect_adds_device_with_all_fields() {
let mut state = RuntimeState::default();
apply_device_change(
&mut state,
&json!({
"id": "1-1.4",
"name": "Logitech Mouse",
"device": "mouse",
"subsystem": "usb",
"vendor_id": "046d",
"product_id": "c52b",
}),
true,
);
assert_eq!(state.devices.connected.len(), 1);
let d = &state.devices.connected[0];
assert_eq!(d.id, "1-1.4");
assert_eq!(d.name, "Logitech Mouse");
assert_eq!(d.device, "mouse");
assert_eq!(d.subsystem, "usb");
assert_eq!(d.vendor_id.as_deref(), Some("046d"));
assert_eq!(d.product_id.as_deref(), Some("c52b"));
}
#[test]
fn device_connect_is_idempotent_for_same_id() {
let mut state = RuntimeState::default();
let data = json!({"id": "x", "device": "dock", "name": "Dock"});
apply_device_change(&mut state, &data, true);
apply_device_change(&mut state, &data, true);
assert_eq!(state.devices.connected.len(), 1);
}
#[test]
fn device_disconnect_removes_matching_id() {
let mut state = RuntimeState::default();
apply_device_change(&mut state, &json!({"id": "a", "device": "x"}), true);
apply_device_change(&mut state, &json!({"id": "b", "device": "y"}), true);
assert_eq!(state.devices.connected.len(), 2);
apply_device_change(&mut state, &json!({"id": "a"}), false);
assert_eq!(state.devices.connected.len(), 1);
assert_eq!(state.devices.connected[0].id, "b");
}
#[test]
fn device_disconnect_of_unknown_id_is_noop() {
let mut state = RuntimeState::default();
apply_device_change(&mut state, &json!({"id": "a", "device": "x"}), true);
apply_device_change(&mut state, &json!({"id": "ghost"}), false);
assert_eq!(state.devices.connected.len(), 1);
}
// ─── apply_event_to_state: power ──────────────────────────────────────
#[test]
fn power_event_updates_ac_and_battery_low_flag() {
let mut state = RuntimeState::default();
apply_event_to_state(
&mut state,
&ev(
"bread.power.battery.low",
json!({"ac_connected": false, "battery_percent": 18}),
),
);
assert!(!state.power.ac_connected);
assert_eq!(state.power.battery_percent, Some(18));
assert!(state.power.battery_low);
// 25% is no longer "low"
apply_event_to_state(
&mut state,
&ev("bread.power.changed", json!({"battery_percent": 25})),
);
assert!(!state.power.battery_low);
}
#[test]
fn power_clamps_battery_percent_to_100() {
let mut state = RuntimeState::default();
apply_event_to_state(
&mut state,
&ev("bread.power.changed", json!({"battery_percent": 250u64})),
);
assert_eq!(state.power.battery_percent, Some(100));
}
// ─── apply_event_to_state: network ────────────────────────────────────
#[test]
fn network_event_updates_online_flag_and_interfaces() {
let mut state = RuntimeState::default();
apply_event_to_state(
&mut state,
&ev(
"bread.network.connected",
json!({
"online": true,
"interfaces": {
"wlan0": {"up": true},
"eth0": {"up": false},
}
}),
),
);
assert!(state.network.online);
assert_eq!(state.network.interfaces.len(), 2);
assert!(state.network.interfaces["wlan0"].up);
assert!(!state.network.interfaces["eth0"].up);
}
// ─── apply_event_to_state: profile ────────────────────────────────────
#[test]
fn profile_activated_pushes_previous_to_history() {
let mut state = RuntimeState::default();
// Initial active is "default".
apply_event_to_state(
&mut state,
&ev("bread.profile.activated", json!({"name": "battery"})),
);
assert_eq!(state.profile.active, "battery");
assert_eq!(state.profile.history, vec!["default"]);
apply_event_to_state(
&mut state,
&ev("bread.profile.activated", json!({"name": "ac"})),
);
assert_eq!(state.profile.active, "ac");
assert_eq!(state.profile.history, vec!["default", "battery"]);
}
#[test]
fn profile_activated_to_same_name_is_noop() {
let mut state = RuntimeState::default();
apply_event_to_state(
&mut state,
&ev("bread.profile.activated", json!({"name": "default"})),
);
assert_eq!(state.profile.active, "default");
assert!(state.profile.history.is_empty());
}
#[test]
fn unknown_event_does_not_mutate_state() {
let mut state = RuntimeState::default();
let before = serde_json::to_value(&state).unwrap();
apply_event_to_state(
&mut state,
&ev("bread.unknown.event", json!({"foo": "bar"})),
);
let after = serde_json::to_value(&state).unwrap();
assert_eq!(before, after);
}
// ─── condition_matches ────────────────────────────────────────────────
#[test]
fn condition_vendor_id_matches_case_insensitively() {
let cond = MatchCondition {
vendor_id: Some("046D".to_string()),
..Default::default()
};
assert!(condition_matches(&cond, &json!({"vendor_id": "046d"})));
assert!(!condition_matches(&cond, &json!({"vendor_id": "1234"})));
}
#[test]
fn condition_name_contains_searches_name_and_vendor() {
let cond = MatchCondition {
name_contains: Some("logi".to_string()),
..Default::default()
};
assert!(condition_matches(&cond, &json!({"name": "Logitech MX"})));
assert!(condition_matches(&cond, &json!({"vendor": "Logitech Inc"})));
assert!(!condition_matches(&cond, &json!({"name": "Apple"})));
}
#[test]
fn condition_input_flags_match_booleans() {
let cond = MatchCondition {
id_input_keyboard: Some(true),
..Default::default()
};
assert!(condition_matches(
&cond,
&json!({"id_input_keyboard": true})
));
assert!(!condition_matches(
&cond,
&json!({"id_input_keyboard": false})
));
// Missing field defaults to false.
assert!(!condition_matches(&cond, &json!({})));
}
#[test]
fn condition_usb_hub_requires_hub_and_secondary_class() {
let cond = MatchCondition {
usb_hub: Some(true),
..Default::default()
};
assert!(condition_matches(
&cond,
&json!({"id_usb_interfaces": ":0900:0e00:"})
));
// Hub alone is not enough.
assert!(!condition_matches(
&cond,
&json!({"id_usb_interfaces": ":0900:"})
));
// Secondary alone is not enough.
assert!(!condition_matches(
&cond,
&json!({"id_usb_interfaces": ":0e00:"})
));
}
#[test]
fn condition_id_usb_class_accepts_with_or_without_0x_prefix() {
let cond = MatchCondition {
id_usb_class: Some("0e".to_string()),
..Default::default()
};
assert!(condition_matches(&cond, &json!({"id_usb_class": "0e"})));
assert!(condition_matches(&cond, &json!({"id_usb_class": "0x0e"})));
assert!(!condition_matches(&cond, &json!({"id_usb_class": "ff"})));
}
#[test]
fn condition_empty_matches_anything() {
let cond = MatchCondition::default();
assert!(condition_matches(&cond, &json!({})));
assert!(condition_matches(&cond, &json!({"vendor_id": "anything"})));
}
// ─── resolve_device ───────────────────────────────────────────────────
#[test]
fn resolve_device_returns_first_matching_rule() {
let rules = vec![
DeviceRule {
device: "mouse".to_string(),
conditions: vec![MatchCondition {
vendor_id: Some("046d".to_string()),
..Default::default()
}],
},
DeviceRule {
device: "dock".to_string(),
conditions: vec![MatchCondition {
vendor_id: Some("17ef".to_string()),
..Default::default()
}],
},
];
assert_eq!(
resolve_device(&rules, &json!({"vendor_id": "046d"})),
"mouse"
);
assert_eq!(
resolve_device(&rules, &json!({"vendor_id": "17ef"})),
"dock"
);
assert_eq!(
resolve_device(&rules, &json!({"vendor_id": "0000"})),
"unknown"
);
}
#[test]
fn resolve_device_skips_rules_with_no_conditions() {
let rules = vec![DeviceRule {
device: "wildcard".to_string(),
conditions: vec![],
}];
assert_eq!(resolve_device(&rules, &json!({})), "unknown");
}
#[test]
fn resolve_device_with_empty_ruleset_returns_unknown() {
assert_eq!(resolve_device(&[], &json!({"vendor_id": "x"})), "unknown");
}
}

View file

@ -18,7 +18,12 @@ pub struct SubscriptionTable {
}
impl SubscriptionTable {
pub fn add_with_id(&mut self, id: SubscriptionId, pattern: String, once: bool) -> SubscriptionId {
pub fn add_with_id(
&mut self,
id: SubscriptionId,
pattern: String,
once: bool,
) -> SubscriptionId {
self.next_id = self.next_id.max(id.0.saturating_add(1));
let sub = Subscription { id, pattern, once };
@ -129,24 +134,36 @@ fn matches_glob(pattern: &[u8], text: &[u8]) -> bool {
#[cfg(test)]
mod tests {
use super::matches_pattern;
use super::*;
#[test]
fn exact_match() {
assert!(matches_pattern("bread.device.dock.connected", "bread.device.dock.connected"));
assert!(!matches_pattern("bread.device.dock.connected", "bread.device.dock.disconnected"));
assert!(matches_pattern(
"bread.device.dock.connected",
"bread.device.dock.connected"
));
assert!(!matches_pattern(
"bread.device.dock.connected",
"bread.device.dock.disconnected"
));
}
#[test]
fn single_segment_wildcard() {
assert!(matches_pattern("bread.device.*", "bread.device.dock.connected"));
assert!(matches_pattern(
"bread.device.*",
"bread.device.dock.connected"
));
assert!(matches_pattern("bread.device.*", "bread.device.foo"));
assert!(!matches_pattern("bread.device.*", "bread.device"));
}
#[test]
fn recursive_wildcard() {
assert!(matches_pattern("bread.device.**", "bread.device.dock.connected"));
assert!(matches_pattern(
"bread.device.**",
"bread.device.dock.connected"
));
assert!(matches_pattern("bread.**", "bread.device.dock.connected"));
assert!(matches_pattern("bread.**", "bread"));
}
@ -157,4 +174,120 @@ mod tests {
assert!(!matches_pattern("bread.monitor.?", "bread.monitor.10"));
assert!(!matches_pattern("bread.monitor.?", "bread.monitor."));
}
#[test]
fn star_does_not_cross_dot_segments() {
// `*` matches within a segment only.
assert!(matches_pattern(
"bread.*.connected",
"bread.device.connected"
));
assert!(!matches_pattern(
"bread.*.connected",
"bread.device.dock.connected"
));
}
#[test]
fn double_star_matches_zero_or_more_segments() {
assert!(matches_pattern("bread.**", "bread.a"));
assert!(matches_pattern("bread.**", "bread.a.b.c.d"));
}
#[test]
fn empty_pattern_matches_only_empty_text() {
assert!(matches_pattern("", ""));
assert!(!matches_pattern("", "bread"));
}
#[test]
fn empty_text_only_matches_wildcards() {
assert!(matches_pattern("**", ""));
assert!(!matches_pattern("bread.*", ""));
}
// ─── SubscriptionTable ────────────────────────────────────────────────
#[test]
fn table_add_assigns_provided_id_and_finds_match() {
let mut t = SubscriptionTable::default();
let id = t.add_with_id(SubscriptionId(7), "bread.window.*".into(), false);
assert_eq!(id, SubscriptionId(7));
let matches = t.match_event("bread.window.opened");
assert_eq!(matches.len(), 1);
assert_eq!(matches[0].id, SubscriptionId(7));
assert_eq!(matches[0].pattern, "bread.window.*");
assert!(!matches[0].once);
}
#[test]
fn table_match_returns_all_matching_subscriptions() {
let mut t = SubscriptionTable::default();
t.add_with_id(SubscriptionId(1), "bread.window.opened".into(), false);
t.add_with_id(SubscriptionId(2), "bread.window.*".into(), false);
t.add_with_id(SubscriptionId(3), "bread.**".into(), true);
t.add_with_id(SubscriptionId(4), "bread.device.*".into(), false);
let matches = t.match_event("bread.window.opened");
let ids: Vec<u64> = matches.iter().map(|s| s.id.0).collect();
assert!(ids.contains(&1));
assert!(ids.contains(&2));
assert!(ids.contains(&3));
assert!(!ids.contains(&4));
}
#[test]
fn table_remove_returns_true_only_for_known_ids() {
let mut t = SubscriptionTable::default();
t.add_with_id(SubscriptionId(1), "a".into(), false);
assert!(t.remove(SubscriptionId(1)));
// Second remove of the same id is false.
assert!(!t.remove(SubscriptionId(1)));
// Removing a never-known id is false.
assert!(!t.remove(SubscriptionId(999)));
}
#[test]
fn table_remove_preserves_other_entries_after_swap_remove() {
let mut t = SubscriptionTable::default();
t.add_with_id(SubscriptionId(1), "a".into(), false);
t.add_with_id(SubscriptionId(2), "b".into(), false);
t.add_with_id(SubscriptionId(3), "c".into(), false);
// Remove the middle entry — swap_remove will move entry 3 into the slot.
assert!(t.remove(SubscriptionId(2)));
// Subsequent removes still work, proving the by_id index was kept consistent.
assert!(t.remove(SubscriptionId(3)));
assert!(t.remove(SubscriptionId(1)));
}
#[test]
fn table_clear_removes_all() {
let mut t = SubscriptionTable::default();
t.add_with_id(SubscriptionId(1), "a".into(), false);
t.add_with_id(SubscriptionId(2), "b".into(), false);
t.clear();
assert!(t.match_event("a").is_empty());
assert!(t.match_event("b").is_empty());
// After clear, the ids are reusable.
assert!(!t.remove(SubscriptionId(1)));
}
#[test]
fn table_match_returns_empty_for_unmatched_event() {
let mut t = SubscriptionTable::default();
t.add_with_id(SubscriptionId(1), "bread.device.*".into(), false);
assert!(t.match_event("bread.window.opened").is_empty());
}
#[test]
fn table_once_flag_is_preserved_in_match_result() {
let mut t = SubscriptionTable::default();
t.add_with_id(SubscriptionId(1), "bread.test".into(), true);
let matches = t.match_event("bread.test");
assert_eq!(matches.len(), 1);
assert!(matches[0].once);
}
}

View file

@ -8,8 +8,7 @@ pub fn spawn_supervised<F, Fut>(
name: &'static str,
mut shutdown_rx: watch::Receiver<bool>,
mut task_factory: F,
)
where
) where
F: FnMut() -> Fut + Send + 'static,
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
{
@ -50,7 +49,11 @@ where
}
let wait_ms = 500u64.saturating_mul(2u64.saturating_pow(attempt.min(6)));
warn!(adapter = name, delay_ms = wait_ms, "restarting adapter after failure");
warn!(
adapter = name,
delay_ms = wait_ms,
"restarting adapter after failure"
);
tokio::select! {
_ = sleep(Duration::from_millis(wait_ms)) => {},
_ = shutdown_rx.changed() => {

View file

@ -3,7 +3,7 @@ use std::collections::{BTreeMap, HashMap};
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RuntimeState {
pub monitors: Vec<Monitor>,
pub workspaces: Vec<Workspace>,
@ -16,22 +16,6 @@ pub struct RuntimeState {
pub modules: Vec<ModuleStatus>,
}
impl Default for RuntimeState {
fn default() -> Self {
Self {
monitors: Vec::new(),
workspaces: Vec::new(),
active_workspace: None,
active_window: None,
devices: DeviceTopology::default(),
network: NetworkState::default(),
power: PowerState::default(),
profile: ProfileState::default(),
modules: Vec::new(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Monitor {
pub name: String,
@ -100,23 +84,13 @@ pub struct InterfaceState {
pub up: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct PowerState {
pub ac_connected: bool,
pub battery_percent: Option<u8>,
pub battery_low: bool,
}
impl Default for PowerState {
fn default() -> Self {
Self {
ac_connected: false,
battery_percent: None,
battery_low: false,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProfileState {
pub active: String,

View file

@ -3,9 +3,9 @@ use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::PathBuf;
use std::process;
use std::time::Instant;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use anyhow::{anyhow, Result};
use bread_shared::{now_unix_ms, AdapterSource, BreadEvent};
@ -52,6 +52,9 @@ struct IpcResponse {
}
impl Server {
// Server::new legitimately requires all 8 fields; a builder pattern here would be
// over-engineering for a single-call-site constructor.
#[allow(clippy::too_many_arguments)]
pub fn new(
socket_path: PathBuf,
state_handle: StateHandle,
@ -161,7 +164,10 @@ impl Server {
Ok(())
}
async fn handle_request(&self, req: IpcRequest) -> std::result::Result<(String, Value), (String, String)> {
async fn handle_request(
&self,
req: IpcRequest,
) -> std::result::Result<(String, Value), (String, String)> {
let id = req.id.clone();
let result = match req.method.as_str() {
"ping" => Ok(json!({ "ok": true })),
@ -208,11 +214,7 @@ impl Server {
Ok(profiles)
}
"profile.activate" => {
let Some(name) = req
.params
.get("name")
.and_then(Value::as_str)
else {
let Some(name) = req.params.get("name").and_then(Value::as_str) else {
return Err((id, "missing profile name".to_string()));
};
@ -231,11 +233,7 @@ impl Server {
Ok(json!({ "active": name }))
}
"emit" => {
let Some(event) = req
.params
.get("event")
.and_then(Value::as_str)
else {
let Some(event) = req.params.get("event").and_then(Value::as_str) else {
return Err((id, "missing event name".to_string()));
};
let data = req.params.get("data").cloned().unwrap_or_else(|| json!({}));
@ -253,7 +251,9 @@ impl Server {
let state = self.state_handle.state_dump().await;
let modules = state.get("modules").cloned().unwrap_or_else(|| json!([]));
let adapters = self.adapter_status.read().await.clone();
let subscription_count = self.subscription_count.load(std::sync::atomic::Ordering::Relaxed);
let subscription_count = self
.subscription_count
.load(std::sync::atomic::Ordering::Relaxed);
let recent_errors = self.lua_runtime.recent_errors();
Ok(json!({
"ok": true,
@ -268,14 +268,7 @@ impl Server {
}))
}
"sync.status" => {
let cfg_home = std::env::var("XDG_CONFIG_HOME")
.map(std::path::PathBuf::from)
.or_else(|_| {
std::env::var("HOME")
.map(|h| std::path::PathBuf::from(h).join(".config"))
})
.unwrap_or_else(|_| std::path::PathBuf::from(".config"));
let sync_path = cfg_home.join("bread").join("sync.toml");
let sync_path = bread_sync::config::bread_config_dir().join("sync.toml");
match std::fs::read_to_string(&sync_path)
.ok()
.and_then(|s| s.parse::<toml::Value>().ok())
@ -301,7 +294,11 @@ impl Server {
}
}
"events.replay" => {
let since_ms = req.params.get("since_ms").and_then(Value::as_u64).unwrap_or(0);
let since_ms = req
.params
.get("since_ms")
.and_then(Value::as_u64)
.unwrap_or(0);
let cutoff = now_unix_ms().saturating_sub(since_ms);
let replay: Vec<BreadEvent> = self
.event_buffer
@ -412,3 +409,70 @@ fn matches_glob_filter(pattern: &[u8], text: &[u8]) -> bool {
}
}
}
#[cfg(test)]
mod tests {
use super::matches_filter;
#[test]
fn filter_exact_match() {
assert!(matches_filter("bread.window.opened", "bread.window.opened"));
assert!(!matches_filter(
"bread.window.opened",
"bread.window.closed"
));
}
#[test]
fn filter_dot_star_matches_one_segment_only() {
assert!(matches_filter("bread.device.connected", "bread.device.*"));
assert!(matches_filter(
"bread.device.dock.connected",
"bread.device.*"
));
assert!(!matches_filter("bread.device", "bread.device.*"));
}
#[test]
fn filter_dot_double_star_matches_zero_or_more_segments() {
// Matches the exact prefix (zero segments after).
assert!(matches_filter("bread.device", "bread.device.**"));
// And matches deeper paths.
assert!(matches_filter(
"bread.device.dock.connected",
"bread.device.**"
));
// But not a sibling at the same depth.
assert!(!matches_filter(
"bread.network.connected",
"bread.device.**"
));
}
#[test]
fn filter_question_mark_matches_single_char_not_dot() {
assert!(matches_filter("bread.x", "bread.?"));
assert!(!matches_filter("bread.xy", "bread.?"));
assert!(!matches_filter("bread.", "bread.?"));
}
#[test]
fn filter_mid_pattern_star_does_not_cross_dots() {
// A `*` in the middle of the pattern (not the `.*` suffix shortcut)
// matches within a single segment only.
assert!(matches_filter("bread.alpha.connected", "bread.*.connected"));
assert!(!matches_filter(
"bread.alpha.beta.connected",
"bread.*.connected"
));
}
#[test]
fn filter_dot_star_at_end_acts_as_prefix_match() {
// `bread.*` ending the pattern is treated as a prefix match, so
// matches everything under `bread.` regardless of depth. This is
// consistent with the subscription table's pattern matcher.
assert!(matches_filter("bread.alpha", "bread.*"));
assert!(matches_filter("bread.alpha.beta", "bread.*"));
}
}

View file

@ -9,7 +9,6 @@ use std::time::Duration;
use anyhow::{anyhow, Result};
use bread_shared::{AdapterSource, BreadEvent};
use libc;
use mlua::{Error as LuaError, Function, Lua, LuaSerdeExt, RegistryKey, Table, Value};
use serde::Serialize;
use serde_json::Value as JsonValue;
@ -291,62 +290,66 @@ impl LuaEngine {
let next_sub_id = self.next_sub_id.clone();
let state_handle = self.state_handle.clone();
let current_module = self.current_module.clone();
let on_fn = self.lua.create_function(move |lua, (pattern, callback): (String, Function)| {
let id = SubscriptionId(next_sub_id.fetch_add(1, Ordering::Relaxed));
let key = lua.create_registry_value(callback)?;
let module = current_module
.lock()
.map_err(|_| LuaError::external("module context lock poisoned"))?
.clone();
handlers
.lock()
.map_err(|_| LuaError::external("handler lock poisoned"))?
.insert(
id,
HandlerEntry {
callback: key,
filter: None,
module,
raw_kind: None,
kind: HandlerKind::Event,
},
);
state_handle
.register_subscription(id, pattern, false)
.map_err(LuaError::external)?;
Ok(id.0)
})?;
let on_fn =
self.lua
.create_function(move |lua, (pattern, callback): (String, Function)| {
let id = SubscriptionId(next_sub_id.fetch_add(1, Ordering::Relaxed));
let key = lua.create_registry_value(callback)?;
let module = current_module
.lock()
.map_err(|_| LuaError::external("module context lock poisoned"))?
.clone();
handlers
.lock()
.map_err(|_| LuaError::external("handler lock poisoned"))?
.insert(
id,
HandlerEntry {
callback: key,
filter: None,
module,
raw_kind: None,
kind: HandlerKind::Event,
},
);
state_handle
.register_subscription(id, pattern, false)
.map_err(LuaError::external)?;
Ok(id.0)
})?;
bread.set("on", on_fn)?;
let handlers = self.handlers.clone();
let next_sub_id = self.next_sub_id.clone();
let state_handle = self.state_handle.clone();
let current_module = self.current_module.clone();
let once_fn = self.lua.create_function(move |lua, (pattern, callback): (String, Function)| {
let id = SubscriptionId(next_sub_id.fetch_add(1, Ordering::Relaxed));
let key = lua.create_registry_value(callback)?;
let module = current_module
.lock()
.map_err(|_| LuaError::external("module context lock poisoned"))?
.clone();
handlers
.lock()
.map_err(|_| LuaError::external("handler lock poisoned"))?
.insert(
id,
HandlerEntry {
callback: key,
filter: None,
module,
raw_kind: None,
kind: HandlerKind::Event,
},
);
state_handle
.register_subscription(id, pattern, true)
.map_err(LuaError::external)?;
Ok(id.0)
})?;
let once_fn =
self.lua
.create_function(move |lua, (pattern, callback): (String, Function)| {
let id = SubscriptionId(next_sub_id.fetch_add(1, Ordering::Relaxed));
let key = lua.create_registry_value(callback)?;
let module = current_module
.lock()
.map_err(|_| LuaError::external("module context lock poisoned"))?
.clone();
handlers
.lock()
.map_err(|_| LuaError::external("handler lock poisoned"))?
.insert(
id,
HandlerEntry {
callback: key,
filter: None,
module,
raw_kind: None,
kind: HandlerKind::Event,
},
);
state_handle
.register_subscription(id, pattern, true)
.map_err(LuaError::external)?;
Ok(id.0)
})?;
bread.set("once", once_fn)?;
let handlers = self.handlers.clone();
@ -411,25 +414,27 @@ impl LuaEngine {
bread.set("off", off_fn)?;
let emit_tx = self.emit_tx.clone();
let emit_fn = self.lua.create_function(move |lua, (event_name, payload): (String, Value)| {
let data = match payload {
Value::Nil => serde_json::json!({}),
other => lua
.from_value::<serde_json::Value>(other)
.unwrap_or_else(|_| serde_json::json!({})),
};
emit_tx
.send(BreadEvent::new(event_name, AdapterSource::System, data))
.map_err(|_| LuaError::external("event channel closed"))?;
Ok(())
})?;
let emit_fn =
self.lua
.create_function(move |lua, (event_name, payload): (String, Value)| {
let data = match payload {
Value::Nil => serde_json::json!({}),
other => lua
.from_value::<serde_json::Value>(other)
.unwrap_or_else(|_| serde_json::json!({})),
};
emit_tx
.send(BreadEvent::new(event_name, AdapterSource::System, data))
.map_err(|_| LuaError::external("event channel closed"))?;
Ok(())
})?;
bread.set("emit", emit_fn)?;
let state_arc = self.state_handle.state_arc();
let state_tbl = self.lua.create_table()?;
let get_fn = self.lua.create_function(move |lua, path: String| {
state_value_to_lua(lua, &state_arc, &path)
})?;
let get_fn = self
.lua
.create_function(move |lua, path: String| state_value_to_lua(lua, &state_arc, &path))?;
state_tbl.set("get", get_fn)?;
let state_arc = self.state_handle.state_arc();
@ -439,9 +444,9 @@ impl LuaEngine {
state_tbl.set("monitors", monitors_fn)?;
let state_arc = self.state_handle.state_arc();
let active_ws_fn = self
.lua
.create_function(move |lua, ()| state_value_to_lua(lua, &state_arc, "active_workspace"))?;
let active_ws_fn = self.lua.create_function(move |lua, ()| {
state_value_to_lua(lua, &state_arc, "active_workspace")
})?;
state_tbl.set("active_workspace", active_ws_fn)?;
let state_arc = self.state_handle.state_arc();
@ -479,38 +484,40 @@ impl LuaEngine {
let next_sub_id = self.next_sub_id.clone();
let state_handle = self.state_handle.clone();
let current_module = self.current_module.clone();
let watch_fn = self.lua.create_function(move |lua, (path, callback): (String, Function)| {
let id = SubscriptionId(next_sub_id.fetch_add(1, Ordering::Relaxed));
let key = lua.create_registry_value(callback)?;
let module = current_module
.lock()
.map_err(|_| LuaError::external("module context lock poisoned"))?
.clone();
handlers
.lock()
.map_err(|_| LuaError::external("handler lock poisoned"))?
.insert(
id,
HandlerEntry {
callback: key,
filter: None,
module,
raw_kind: None,
kind: HandlerKind::StateWatch,
},
);
watch_ids
.lock()
.map_err(|_| LuaError::external("watch id lock poisoned"))?
.insert(id);
state_handle
.register_watch(id, path.clone())
.map_err(LuaError::external)?;
state_handle
.register_subscription(id, format!("bread.state.changed.{path}"), false)
.map_err(LuaError::external)?;
Ok(id.0)
})?;
let watch_fn =
self.lua
.create_function(move |lua, (path, callback): (String, Function)| {
let id = SubscriptionId(next_sub_id.fetch_add(1, Ordering::Relaxed));
let key = lua.create_registry_value(callback)?;
let module = current_module
.lock()
.map_err(|_| LuaError::external("module context lock poisoned"))?
.clone();
handlers
.lock()
.map_err(|_| LuaError::external("handler lock poisoned"))?
.insert(
id,
HandlerEntry {
callback: key,
filter: None,
module,
raw_kind: None,
kind: HandlerKind::StateWatch,
},
);
watch_ids
.lock()
.map_err(|_| LuaError::external("watch id lock poisoned"))?
.insert(id);
state_handle
.register_watch(id, path.clone())
.map_err(LuaError::external)?;
state_handle
.register_subscription(id, format!("bread.state.changed.{path}"), false)
.map_err(LuaError::external)?;
Ok(id.0)
})?;
state_tbl.set("watch", watch_fn)?;
bread.set("state", state_tbl)?;
@ -555,130 +562,134 @@ impl LuaEngine {
let default_urgency = self.notifications_config.default_urgency.clone();
let default_timeout = self.notifications_config.default_timeout_ms;
let emit_tx = self.emit_tx.clone();
let notify_fn = self
.lua
.create_function(move |_lua, (message, opts): (String, Option<Table>)| {
let title: String = opts
.as_ref()
.and_then(|o| o.get("title").ok())
.unwrap_or_else(|| "bread".to_string());
let urgency: String = opts
.as_ref()
.and_then(|o| o.get("urgency").ok())
.unwrap_or_else(|| default_urgency.clone());
let timeout: i64 = opts
.as_ref()
.and_then(|o| o.get("timeout").ok())
.unwrap_or(default_timeout);
let icon: Option<String> = opts.as_ref().and_then(|o| o.get("icon").ok());
let notify_fn =
self.lua
.create_function(move |_lua, (message, opts): (String, Option<Table>)| {
let title: String = opts
.as_ref()
.and_then(|o| o.get("title").ok())
.unwrap_or_else(|| "bread".to_string());
let urgency: String = opts
.as_ref()
.and_then(|o| o.get("urgency").ok())
.unwrap_or_else(|| default_urgency.clone());
let timeout: i64 = opts
.as_ref()
.and_then(|o| o.get("timeout").ok())
.unwrap_or(default_timeout);
let icon: Option<String> = opts.as_ref().and_then(|o| o.get("icon").ok());
let cmd_path = notify_path.clone();
let title_clone = title.clone();
let message_clone = message.clone();
let urgency_clone = urgency.clone();
task::spawn_blocking(move || {
let mut cmd = std::process::Command::new(cmd_path);
cmd.args([
"--app-name",
"bread",
"--urgency",
&urgency_clone,
"--expire-time",
&timeout.to_string(),
]);
if let Some(icon) = icon {
cmd.args(["--icon", &icon]);
}
let _ = cmd.args([&title_clone, &message_clone]).status();
});
let cmd_path = notify_path.clone();
let title_clone = title.clone();
let message_clone = message.clone();
let urgency_clone = urgency.clone();
task::spawn_blocking(move || {
let mut cmd = std::process::Command::new(cmd_path);
cmd.args([
"--app-name",
"bread",
"--urgency",
&urgency_clone,
"--expire-time",
&timeout.to_string(),
]);
if let Some(icon) = icon {
cmd.args(["--icon", &icon]);
}
let _ = cmd.args([&title_clone, &message_clone]).status();
});
let _ = emit_tx.send(BreadEvent::new(
"bread.notify.sent",
AdapterSource::System,
serde_json::json!({
"title": title,
"message": message,
"urgency": urgency,
}),
));
let _ = emit_tx.send(BreadEvent::new(
"bread.notify.sent",
AdapterSource::System,
serde_json::json!({
"title": title,
"message": message,
"urgency": urgency,
}),
));
Ok(())
})?;
Ok(())
})?;
bread.set("notify", notify_fn)?;
let timers = self.timers.clone();
let next_timer_id = self.next_timer_id.clone();
let lua_tx = self.lua_tx.clone();
let after_fn = self.lua.create_function(move |lua, (delay_ms, callback): (u64, Function)| {
let id = TimerId(next_timer_id.fetch_add(1, Ordering::Relaxed));
let key = lua.create_registry_value(callback)?;
let (cancel_tx, mut cancel_rx) = watch::channel(false);
timers
.lock()
.map_err(|_| LuaError::external("timer lock poisoned"))?
.insert(
id,
TimerEntry {
callback: key,
repeating: false,
cancel_tx,
},
);
let lua_tx = lua_tx.clone();
task::spawn(async move {
tokio::select! {
_ = sleep(Duration::from_millis(delay_ms)) => {
if !*cancel_rx.borrow() {
let _ = lua_tx.send(LuaMessage::TimerFired { id });
let after_fn =
self.lua
.create_function(move |lua, (delay_ms, callback): (u64, Function)| {
let id = TimerId(next_timer_id.fetch_add(1, Ordering::Relaxed));
let key = lua.create_registry_value(callback)?;
let (cancel_tx, mut cancel_rx) = watch::channel(false);
timers
.lock()
.map_err(|_| LuaError::external("timer lock poisoned"))?
.insert(
id,
TimerEntry {
callback: key,
repeating: false,
cancel_tx,
},
);
let lua_tx = lua_tx.clone();
task::spawn(async move {
tokio::select! {
_ = sleep(Duration::from_millis(delay_ms)) => {
if !*cancel_rx.borrow() {
let _ = lua_tx.send(LuaMessage::TimerFired { id });
}
}
_ = cancel_rx.changed() => {}
}
}
_ = cancel_rx.changed() => {}
}
});
Ok(id.0)
})?;
});
Ok(id.0)
})?;
bread.set("after", after_fn)?;
let timers = self.timers.clone();
let next_timer_id = self.next_timer_id.clone();
let lua_tx = self.lua_tx.clone();
let every_fn = self.lua.create_function(move |lua, (interval_ms, callback): (u64, Function)| {
let id = TimerId(next_timer_id.fetch_add(1, Ordering::Relaxed));
let key = lua.create_registry_value(callback)?;
let (cancel_tx, mut cancel_rx) = watch::channel(false);
timers
.lock()
.map_err(|_| LuaError::external("timer lock poisoned"))?
.insert(
id,
TimerEntry {
callback: key,
repeating: true,
cancel_tx,
},
);
let lua_tx = lua_tx.clone();
task::spawn(async move {
let start = Instant::now() + Duration::from_millis(interval_ms);
let mut ticker = interval_at(start, Duration::from_millis(interval_ms));
loop {
tokio::select! {
_ = ticker.tick() => {
if *cancel_rx.borrow() {
break;
}
let _ = lua_tx.send(LuaMessage::TimerFired { id });
}
_ = cancel_rx.changed() => {
if *cancel_rx.borrow() {
break;
let every_fn =
self.lua
.create_function(move |lua, (interval_ms, callback): (u64, Function)| {
let id = TimerId(next_timer_id.fetch_add(1, Ordering::Relaxed));
let key = lua.create_registry_value(callback)?;
let (cancel_tx, mut cancel_rx) = watch::channel(false);
timers
.lock()
.map_err(|_| LuaError::external("timer lock poisoned"))?
.insert(
id,
TimerEntry {
callback: key,
repeating: true,
cancel_tx,
},
);
let lua_tx = lua_tx.clone();
task::spawn(async move {
let start = Instant::now() + Duration::from_millis(interval_ms);
let mut ticker = interval_at(start, Duration::from_millis(interval_ms));
loop {
tokio::select! {
_ = ticker.tick() => {
if *cancel_rx.borrow() {
break;
}
let _ = lua_tx.send(LuaMessage::TimerFired { id });
}
_ = cancel_rx.changed() => {
if *cancel_rx.borrow() {
break;
}
}
}
}
}
}
});
Ok(id.0)
})?;
});
Ok(id.0)
})?;
bread.set("every", every_fn)?;
let timers = self.timers.clone();
@ -694,18 +705,22 @@ impl LuaEngine {
bread.set("cancel", cancel_fn)?;
let hyprland_tbl = self.lua.create_table()?;
let dispatch_fn = self.lua.create_function(move |_lua, (cmd, args): (String, String)| {
let resp = hyprland_request(&format!("dispatch {cmd} {args}"))
.map_err(|e| LuaError::external(e.to_string()))?;
Ok(resp)
})?;
let dispatch_fn =
self.lua
.create_function(move |_lua, (cmd, args): (String, String)| {
let resp = hyprland_request(&format!("dispatch {cmd} {args}"))
.map_err(|e| LuaError::external(e.to_string()))?;
Ok(resp)
})?;
hyprland_tbl.set("dispatch", dispatch_fn)?;
let keyword_fn = self.lua.create_function(move |_lua, (key, value): (String, String)| {
let resp = hyprland_request(&format!("keyword {key} {value}"))
.map_err(|e| LuaError::external(e.to_string()))?;
Ok(resp)
})?;
let keyword_fn =
self.lua
.create_function(move |_lua, (key, value): (String, String)| {
let resp = hyprland_request(&format!("keyword {key} {value}"))
.map_err(|e| LuaError::external(e.to_string()))?;
Ok(resp)
})?;
hyprland_tbl.set("keyword", keyword_fn)?;
let eval_fn = self.lua.create_function(move |_lua, expr: String| {
@ -718,38 +733,38 @@ impl LuaEngine {
let active_window_fn = self.lua.create_function(move |lua, ()| {
let resp = hyprland_request("j/activewindow")
.map_err(|e| LuaError::external(e.to_string()))?;
let json: JsonValue = serde_json::from_str(&resp)
.map_err(|e| LuaError::external(e.to_string()))?;
let json: JsonValue =
serde_json::from_str(&resp).map_err(|e| LuaError::external(e.to_string()))?;
lua.to_value(&json)
.map_err(|e| LuaError::external(e.to_string()))
})?;
hyprland_tbl.set("active_window", active_window_fn)?;
let monitors_fn = self.lua.create_function(move |lua, ()| {
let resp = hyprland_request("j/monitors")
.map_err(|e| LuaError::external(e.to_string()))?;
let json: JsonValue = serde_json::from_str(&resp)
.map_err(|e| LuaError::external(e.to_string()))?;
let resp =
hyprland_request("j/monitors").map_err(|e| LuaError::external(e.to_string()))?;
let json: JsonValue =
serde_json::from_str(&resp).map_err(|e| LuaError::external(e.to_string()))?;
lua.to_value(&json)
.map_err(|e| LuaError::external(e.to_string()))
})?;
hyprland_tbl.set("monitors", monitors_fn)?;
let workspaces_fn = self.lua.create_function(move |lua, ()| {
let resp = hyprland_request("j/workspaces")
.map_err(|e| LuaError::external(e.to_string()))?;
let json: JsonValue = serde_json::from_str(&resp)
.map_err(|e| LuaError::external(e.to_string()))?;
let resp =
hyprland_request("j/workspaces").map_err(|e| LuaError::external(e.to_string()))?;
let json: JsonValue =
serde_json::from_str(&resp).map_err(|e| LuaError::external(e.to_string()))?;
lua.to_value(&json)
.map_err(|e| LuaError::external(e.to_string()))
})?;
hyprland_tbl.set("workspaces", workspaces_fn)?;
let clients_fn = self.lua.create_function(move |lua, ()| {
let resp = hyprland_request("j/clients")
.map_err(|e| LuaError::external(e.to_string()))?;
let json: JsonValue = serde_json::from_str(&resp)
.map_err(|e| LuaError::external(e.to_string()))?;
let resp =
hyprland_request("j/clients").map_err(|e| LuaError::external(e.to_string()))?;
let json: JsonValue =
serde_json::from_str(&resp).map_err(|e| LuaError::external(e.to_string()))?;
lua.to_value(&json)
.map_err(|e| LuaError::external(e.to_string()))
})?;
@ -759,33 +774,33 @@ impl LuaEngine {
let next_sub_id = self.next_sub_id.clone();
let state_handle = self.state_handle.clone();
let current_module = self.current_module.clone();
let on_raw_fn = self
.lua
.create_function(move |lua, (event, callback): (String, Function)| {
let id = SubscriptionId(next_sub_id.fetch_add(1, Ordering::Relaxed));
let key = lua.create_registry_value(callback)?;
let module = current_module
.lock()
.map_err(|_| LuaError::external("module context lock poisoned"))?
.clone();
handlers
.lock()
.map_err(|_| LuaError::external("handler lock poisoned"))?
.insert(
id,
HandlerEntry {
callback: key,
filter: None,
module,
raw_kind: Some(event),
kind: HandlerKind::Event,
},
);
state_handle
.register_subscription(id, "bread.hyprland.event".to_string(), false)
.map_err(LuaError::external)?;
Ok(id.0)
})?;
let on_raw_fn =
self.lua
.create_function(move |lua, (event, callback): (String, Function)| {
let id = SubscriptionId(next_sub_id.fetch_add(1, Ordering::Relaxed));
let key = lua.create_registry_value(callback)?;
let module = current_module
.lock()
.map_err(|_| LuaError::external("module context lock poisoned"))?
.clone();
handlers
.lock()
.map_err(|_| LuaError::external("handler lock poisoned"))?
.insert(
id,
HandlerEntry {
callback: key,
filter: None,
module,
raw_kind: Some(event),
kind: HandlerKind::Event,
},
);
state_handle
.register_subscription(id, "bread.hyprland.event".to_string(), false)
.map_err(LuaError::external)?;
Ok(id.0)
})?;
hyprland_tbl.set("on_raw", on_raw_fn)?;
bread.set("hyprland", hyprland_tbl)?;
@ -800,7 +815,9 @@ impl LuaEngine {
.map_err(|_| LuaError::external("module context lock poisoned"))?
.clone();
if expected.as_deref() != Some(&name) {
return Err(LuaError::external("module name does not match current load"));
return Err(LuaError::external(
"module name does not match current load",
));
}
let decl = module_decls
@ -834,7 +851,7 @@ impl LuaEngine {
let set_fn = lua.create_function(move |lua, (key, value): (String, Value)| {
let json = lua
.from_value::<JsonValue>(value)
.unwrap_or_else(|_| JsonValue::Null);
.unwrap_or(JsonValue::Null);
module_store_set(&state_arc_set, &module_name, key, json);
Ok(())
})?;
@ -845,10 +862,7 @@ impl LuaEngine {
modules
.lock()
.map_err(|_| LuaError::external("module registry lock poisoned"))?
.insert(
decl.name.clone(),
ModuleInfo { table_key: key },
);
.insert(decl.name.clone(), ModuleInfo { table_key: key });
// Register in package.loaded so require("bread.devices") etc. works
let package: Table = lua.globals().get("package")?;
@ -862,9 +876,9 @@ impl LuaEngine {
// bread.machine — machine name and tags from sync.toml
let machine_tbl = self.lua.create_table()?;
let name_fn = self.lua.create_function(|_lua, ()| {
Ok(lua_machine_name())
})?;
let name_fn = self
.lua
.create_function(|_lua, ()| Ok(lua_machine_name()))?;
machine_tbl.set("name", name_fn)?;
let tags_fn = self.lua.create_function(|lua, ()| {
@ -877,9 +891,9 @@ impl LuaEngine {
})?;
machine_tbl.set("tags", tags_fn)?;
let has_tag_fn = self.lua.create_function(|_lua, tag: String| {
Ok(lua_machine_tags().contains(&tag))
})?;
let has_tag_fn = self
.lua
.create_function(|_lua, tag: String| Ok(lua_machine_tags().contains(&tag)))?;
machine_tbl.set("has_tag", has_tag_fn)?;
bread.set("machine", machine_tbl)?;
@ -887,15 +901,16 @@ impl LuaEngine {
// bread.fs — file system helpers
let fs_tbl = self.lua.create_table()?;
let write_fn = self.lua.create_function(|_lua, (path, content): (String, String)| {
let expanded = lua_expand_path(&path);
if let Some(parent) = expanded.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| LuaError::external(e.to_string()))?;
}
std::fs::write(&expanded, content)
.map_err(|e| LuaError::external(e.to_string()))
})?;
let write_fn = self
.lua
.create_function(|_lua, (path, content): (String, String)| {
let expanded = lua_expand_path(&path);
if let Some(parent) = expanded.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| LuaError::external(e.to_string()))?;
}
std::fs::write(&expanded, content).map_err(|e| LuaError::external(e.to_string()))
})?;
fs_tbl.set("write", write_fn)?;
let read_fn = self.lua.create_function(|_lua, path: String| {
@ -907,9 +922,9 @@ impl LuaEngine {
})?;
fs_tbl.set("read", read_fn)?;
let exists_fn = self.lua.create_function(|_lua, path: String| {
Ok(lua_expand_path(&path).exists())
})?;
let exists_fn = self
.lua
.create_function(|_lua, path: String| Ok(lua_expand_path(&path).exists()))?;
fs_tbl.set("exists", exists_fn)?;
let expand_fn = self.lua.create_function(|_lua, path: String| {
@ -1025,18 +1040,16 @@ impl LuaEngine {
let mut files = list_lua_files(&self.module_path)?;
files.sort();
let disabled: HashSet<String> = self
.modules_config
.disable
.iter()
.cloned()
.collect();
let disabled: HashSet<String> = self.modules_config.disable.iter().cloned().collect();
let mut decls = Vec::new();
if self.modules_config.builtin {
decls.extend(builtin_module_decls(&disabled));
}
for path in files.into_iter().filter(|p| !is_lib_path(&self.module_path, p)) {
for path in files
.into_iter()
.filter(|p| !is_lib_path(&self.module_path, p))
{
match self.scan_module_decl(&path) {
Ok(decl) => decls.push(decl),
Err(err) => {
@ -1130,7 +1143,10 @@ impl LuaEngine {
}
let src = fs::read_to_string(path)?;
self.lua.load(&src).set_name(path.to_string_lossy().as_ref()).exec()?;
self.lua
.load(&src)
.set_name(path.to_string_lossy().as_ref())
.exec()?;
Ok(())
}
@ -1371,7 +1387,9 @@ impl LuaEngine {
//
// Each accepts any Lua value and coerces it to a string via tostring()
// so callers can do bread.log(some_table) without a crash.
self.lua.load(r#"
self.lua
.load(
r#"
local _bread = bread
local function stringify(v)
@ -1392,7 +1410,9 @@ impl LuaEngine {
function _bread.error(msg)
_bread.__log_error(stringify(msg))
end
"#).exec()?;
"#,
)
.exec()?;
// Register the raw Rust-backed log functions that the Lua wrappers call.
let globals = self.lua.globals();
@ -1429,7 +1449,9 @@ impl LuaEngine {
//
// Because the Lua runtime is single-threaded, we implement this in
// pure Lua using bread.cancel / bread.after.
self.lua.load(r#"
self.lua
.load(
r#"
function bread.debounce(delay_ms, fn)
local timer_id = nil
return function(...)
@ -1444,7 +1466,9 @@ impl LuaEngine {
end)
end
end
"#).exec()?;
"#,
)
.exec()?;
Ok(())
}
@ -1476,7 +1500,8 @@ impl LuaEngine {
let bread = lua.create_table()?;
bread.set("module", module_fn)?;
lua.globals().set("bread", bread)?;
lua.load(r#"
lua.load(
r#"
local _noop = function(...) end
local _noop_tbl_mt = { __index = function() return _noop end, __call = _noop }
local _noop_tbl = setmetatable({}, _noop_tbl_mt)
@ -1486,10 +1511,15 @@ impl LuaEngine {
return _noop_tbl
end
})
"#).exec()?;
"#,
)
.exec()?;
let src = fs::read_to_string(path)?;
let result = lua.load(&src).set_name(path.to_string_lossy().as_ref()).exec();
let result = lua
.load(&src)
.set_name(path.to_string_lossy().as_ref())
.exec();
// bread.module() throws MODULE_DECL_ABORT to abort scanning early.
// mlua may wrap the error in CallbackError, so match on string content.
if let Err(err) = result {
@ -1515,8 +1545,7 @@ impl LuaEngine {
return Ok(Value::Nil);
}
let src = fs::read_to_string(&path)
.map_err(|e| LuaError::external(e.to_string()))?;
let src = fs::read_to_string(&path).map_err(|e| LuaError::external(e.to_string()))?;
let func = lua
.load(&src)
.set_name(path.to_string_lossy().as_ref())
@ -1529,8 +1558,9 @@ impl LuaEngine {
let bread: Table = globals.get("bread")?;
bread.set("__require_loader", loader)?;
self.lua.load(
r#"
self.lua
.load(
r#"
local searchers = package.searchers or package.loaders
if searchers then
table.insert(searchers, 1, function(name)
@ -1538,8 +1568,8 @@ impl LuaEngine {
end)
end
"#,
)
.exec()?;
)
.exec()?;
Ok(())
}
@ -1664,10 +1694,7 @@ fn order_module_decls(decls: Vec<ModuleDecl>) -> (Vec<ModuleDecl>, Vec<(String,
fn module_name_from_path(module_root: &Path, path: &Path) -> String {
let rel = path.strip_prefix(module_root).unwrap_or(path);
let mut name = rel
.with_extension("")
.to_string_lossy()
.replace('/', ".");
let mut name = rel.with_extension("").to_string_lossy().replace('/', ".");
if name.starts_with('.') {
name.remove(0);
}
@ -1697,8 +1724,8 @@ fn state_value_to_lua<'lua>(
}
std::hint::spin_loop();
};
let mut value = serde_json::to_value(&*snapshot)
.map_err(|e| LuaError::external(e.to_string()))?;
let mut value =
serde_json::to_value(&*snapshot).map_err(|e| LuaError::external(e.to_string()))?;
if path.is_empty() {
return lua
.to_value(&value)
@ -1714,7 +1741,11 @@ fn state_value_to_lua<'lua>(
.map_err(|e| LuaError::external(e.to_string()))
}
fn module_store_get(state_arc: &Arc<RwLock<RuntimeState>>, module: &str, key: &str) -> Option<JsonValue> {
fn module_store_get(
state_arc: &Arc<RwLock<RuntimeState>>,
module: &str,
key: &str,
) -> Option<JsonValue> {
let guard = loop {
if let Ok(g) = state_arc.try_read() {
break g;
@ -1725,7 +1756,12 @@ fn module_store_get(state_arc: &Arc<RwLock<RuntimeState>>, module: &str, key: &s
entry.store.get(key).cloned()
}
fn module_store_set(state_arc: &Arc<RwLock<RuntimeState>>, module: &str, key: String, value: JsonValue) {
fn module_store_set(
state_arc: &Arc<RwLock<RuntimeState>>,
module: &str,
key: String,
value: JsonValue,
) {
let mut guard = loop {
if let Ok(g) = state_arc.try_write() {
break g;
@ -1824,9 +1860,7 @@ fn lua_machine_tags() -> Vec<String> {
fn read_sync_toml() -> anyhow::Result<toml::Value> {
let config_dir = std::env::var("XDG_CONFIG_HOME")
.map(std::path::PathBuf::from)
.or_else(|_| {
std::env::var("HOME").map(|h| std::path::PathBuf::from(h).join(".config"))
})
.or_else(|_| std::env::var("HOME").map(|h| std::path::PathBuf::from(h).join(".config")))
.unwrap_or_else(|_| std::path::PathBuf::from(".config"));
let path = config_dir.join("bread").join("sync.toml");
let raw = std::fs::read_to_string(path)?;
@ -2102,7 +2136,10 @@ fn hyprland_request_socket() -> Result<PathBuf> {
.collect();
match sockets.len() {
0 => Err(anyhow!("no Hyprland instance found in {}", hypr_dir.display())),
0 => Err(anyhow!(
"no Hyprland instance found in {}",
hypr_dir.display()
)),
1 => Ok(sockets.remove(0)),
_ => Ok(sockets.remove(0)),
}

View file

@ -4,8 +4,8 @@ mod ipc;
mod lua;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use anyhow::Result;
use bread_shared::{AdapterSource, BreadEvent, RawEvent};
@ -36,9 +36,10 @@ async fn main() -> Result<()> {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let subscription_count = Arc::new(AtomicU64::new(0));
let state_handle = StateHandle::new(state.clone(), state_cmd_tx, subscription_count.clone());
let state_handle = StateHandle::new(state.clone(), state_cmd_tx);
let lua_runtime = lua::spawn_runtime(config.clone(), state_handle.clone(), normalized_tx.clone())?;
let lua_runtime =
lua::spawn_runtime(config.clone(), state_handle.clone(), normalized_tx.clone())?;
let lua_tx = lua_runtime.sender();
tokio::spawn(run_state_engine(
@ -144,7 +145,8 @@ async fn wait_for_shutdown() {
#[cfg(unix)]
{
use tokio::signal::unix::{signal, SignalKind};
let mut sigterm = signal(SignalKind::terminate()).expect("failed to install SIGTERM handler");
let mut sigterm =
signal(SignalKind::terminate()).expect("failed to install SIGTERM handler");
tokio::select! {
_ = ctrl_c => {},
_ = sigterm.recv() => {},

View file

@ -31,6 +31,291 @@ async fn ping_and_state_dump_work() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn unknown_method_returns_error() -> Result<()> {
let harness = TestHarness::spawn()?;
harness.wait_until_ready().await?;
let result = harness.send_request("not.a.real.method", json!({})).await;
assert!(result.is_err(), "expected error for unknown method");
let msg = result.err().unwrap().to_string();
assert!(
msg.contains("unknown method"),
"expected 'unknown method', got: {msg}"
);
harness.shutdown();
Ok(())
}
#[tokio::test]
async fn profile_activate_updates_state() -> Result<()> {
let harness = TestHarness::spawn()?;
harness.wait_until_ready().await?;
let result = harness
.send_request("profile.activate", json!({"name": "battery"}))
.await?;
assert_eq!(
result.get("active").and_then(Value::as_str),
Some("battery")
);
let dump = harness.send_request("state.dump", json!({})).await?;
assert_eq!(
dump.get("profile")
.and_then(|v| v.get("active"))
.and_then(Value::as_str),
Some("battery")
);
harness.shutdown();
Ok(())
}
#[tokio::test]
async fn profile_activate_without_name_errors() -> Result<()> {
let harness = TestHarness::spawn()?;
harness.wait_until_ready().await?;
let result = harness.send_request("profile.activate", json!({})).await;
assert!(result.is_err());
let msg = result.err().unwrap().to_string();
assert!(msg.contains("missing profile name"), "got: {msg}");
harness.shutdown();
Ok(())
}
#[tokio::test]
async fn emit_without_event_errors() -> Result<()> {
let harness = TestHarness::spawn()?;
harness.wait_until_ready().await?;
let result = harness.send_request("emit", json!({})).await;
assert!(result.is_err());
harness.shutdown();
Ok(())
}
#[tokio::test]
async fn state_get_returns_specific_subtree() -> Result<()> {
let harness = TestHarness::spawn()?;
harness.wait_until_ready().await?;
let modules = harness
.send_request("state.get", json!({"key": "modules"}))
.await?;
assert!(modules.is_array(), "expected modules to be an array");
let active = harness
.send_request("state.get", json!({"key": "profile.active"}))
.await?;
assert!(
active.as_str().is_some(),
"expected profile.active to be a string, got: {active:?}"
);
harness.shutdown();
Ok(())
}
#[tokio::test]
async fn state_get_missing_key_returns_error() -> Result<()> {
let harness = TestHarness::spawn()?;
harness.wait_until_ready().await?;
let result = harness
.send_request("state.get", json!({"key": "does.not.exist"}))
.await;
assert!(result.is_err());
harness.shutdown();
Ok(())
}
#[tokio::test]
async fn modules_list_returns_array() -> Result<()> {
let harness = TestHarness::spawn()?;
harness.wait_until_ready().await?;
let result = harness.send_request("modules.list", json!({})).await?;
assert!(result.is_array());
harness.shutdown();
Ok(())
}
#[tokio::test]
async fn modules_reload_succeeds() -> Result<()> {
let harness = TestHarness::spawn()?;
harness.wait_until_ready().await?;
let result = harness.send_request("modules.reload", json!({})).await?;
assert_eq!(result.get("ok").and_then(Value::as_bool), Some(true));
assert!(result.get("duration_ms").is_some());
harness.shutdown();
Ok(())
}
#[tokio::test]
async fn sync_status_uninitialized_when_no_config() -> Result<()> {
let harness = TestHarness::spawn()?;
harness.wait_until_ready().await?;
let result = harness.send_request("sync.status", json!({})).await?;
assert_eq!(
result.get("initialized").and_then(Value::as_bool),
Some(false)
);
harness.shutdown();
Ok(())
}
#[tokio::test]
async fn sync_status_reports_initialized_with_config() -> Result<()> {
let harness = TestHarness::spawn_with_sync_config("myhost", "git@example.com:user/repo.git")?;
harness.wait_until_ready().await?;
let result = harness.send_request("sync.status", json!({})).await?;
assert_eq!(
result.get("initialized").and_then(Value::as_bool),
Some(true)
);
assert_eq!(
result.get("machine").and_then(Value::as_str),
Some("myhost")
);
assert_eq!(
result.get("remote").and_then(Value::as_str),
Some("git@example.com:user/repo.git")
);
harness.shutdown();
Ok(())
}
#[tokio::test]
async fn events_replay_returns_buffered_events() -> Result<()> {
let harness = TestHarness::spawn()?;
harness.wait_until_ready().await?;
// Emit a couple of events.
harness
.send_request("emit", json!({"event": "bread.replay.a", "data": {}}))
.await?;
harness
.send_request("emit", json!({"event": "bread.replay.b", "data": {}}))
.await?;
// Small delay so the events make it into the buffer.
sleep(Duration::from_millis(100)).await;
let result = harness
.send_request("events.replay", json!({"since_ms": 10_000}))
.await?;
let arr = result.as_array().expect("replay result should be array");
let names: Vec<&str> = arr
.iter()
.filter_map(|e| e.get("event").and_then(Value::as_str))
.collect();
assert!(names.contains(&"bread.replay.a"));
assert!(names.contains(&"bread.replay.b"));
harness.shutdown();
Ok(())
}
#[tokio::test]
async fn event_stream_filter_excludes_non_matching_events() -> Result<()> {
let harness = TestHarness::spawn()?;
harness.wait_until_ready().await?;
let stream = UnixStream::connect(harness.socket_path()).await?;
let (read_half, mut write_half) = stream.into_split();
let subscribe = json!({
"id": "sub-x",
"method": "events.subscribe",
"params": {
"filter": "bread.match.*"
}
});
write_half
.write_all(format!("{}\n", serde_json::to_string(&subscribe)?).as_bytes())
.await?;
let mut reader = BufReader::new(read_half).lines();
// Consume the ack line.
reader.next_line().await?;
// Emit one matching and one non-matching event.
harness
.send_request("emit", json!({"event": "bread.nomatch.x", "data": {}}))
.await?;
harness
.send_request("emit", json!({"event": "bread.match.yes", "data": {}}))
.await?;
let deadline = Instant::now() + Duration::from_secs(5);
let mut matched = false;
while Instant::now() < deadline {
let Some(line) = reader.next_line().await? else {
break;
};
let event: Value = serde_json::from_str(&line)?;
let name = event.get("event").and_then(Value::as_str).unwrap_or("");
assert!(
!name.starts_with("bread.nomatch"),
"filter let through non-matching event: {name}"
);
if name == "bread.match.yes" {
matched = true;
break;
}
}
assert!(matched, "did not receive matching event through filter");
harness.shutdown();
Ok(())
}
#[tokio::test]
async fn multiple_concurrent_clients_each_get_response() -> Result<()> {
let harness = TestHarness::spawn()?;
harness.wait_until_ready().await?;
let socket = harness.socket_path().to_path_buf();
let mut handles = Vec::new();
for i in 0..8 {
let socket = socket.clone();
handles.push(tokio::spawn(async move {
let stream = UnixStream::connect(&socket).await?;
let (read_half, mut write_half) = stream.into_split();
let req = json!({"id": i.to_string(), "method": "ping", "params": {}});
write_half
.write_all(format!("{}\n", serde_json::to_string(&req)?).as_bytes())
.await?;
let mut lines = BufReader::new(read_half).lines();
let line = lines.next_line().await?.ok_or_else(|| anyhow!("eof"))?;
let parsed: Value = serde_json::from_str(&line)?;
assert_eq!(
parsed.get("id").and_then(Value::as_str),
Some(i.to_string().as_str())
);
Ok::<(), anyhow::Error>(())
}));
}
for h in handles {
h.await??;
}
harness.shutdown();
Ok(())
}
#[tokio::test]
async fn events_stream_receives_emitted_events() -> Result<()> {
let harness = TestHarness::spawn()?;
@ -100,6 +385,14 @@ struct TestHarness {
impl TestHarness {
fn spawn() -> Result<Self> {
Self::spawn_inner(None)
}
fn spawn_with_sync_config(machine: &str, remote_url: &str) -> Result<Self> {
Self::spawn_inner(Some((machine.to_string(), remote_url.to_string())))
}
fn spawn_inner(sync_config: Option<(String, String)>) -> Result<Self> {
let temp = tempfile::tempdir()?;
let runtime_dir = temp.path().join("runtime");
let config_home = temp.path().join("config");
@ -140,6 +433,21 @@ enabled = false
"#,
)?;
if let Some((machine, remote_url)) = sync_config {
let sync_toml = format!(
r#"
[remote]
url = "{remote_url}"
branch = "main"
[machine]
name = "{machine}"
tags = []
"#
);
fs::write(bread_cfg.join("sync.toml"), sync_toml)?;
}
let socket_path = runtime_dir.join("bread").join("breadd.sock");
let child = Command::new(env!("CARGO_BIN_EXE_breadd"))
.env("XDG_RUNTIME_DIR", &runtime_dir)