feat: enhance device normalization and classification

- Introduced a new mechanism in EventNormalizer to suppress duplicate events from child nodes of the same physical device.
- Removed the device classification logic from the normalizer and replaced it with a rule-based system using Lua scripts.
- Added support for user-defined device rules in Lua, allowing for flexible device naming based on various conditions.
- Updated the state engine to handle device rules and resolve device names before dispatching events.
- Modified the installation script to set up default configuration files for the daemon and Lua modules.
- Improved the handling of systemd user services to dynamically set the ExecStart path based on the installation directory.
This commit is contained in:
Breadway 2026-05-12 21:27:07 +08:00
parent acbf8e1b1b
commit d44ece3649
12 changed files with 719 additions and 476 deletions

View file

@ -1,12 +1,9 @@
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::os::unix::io::AsRawFd;
use anyhow::Result;
use bread_shared::{now_unix_ms, AdapterSource, RawEvent};
use serde_json::json;
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
use tracing::debug;
use crate::adapters::Adapter;
@ -22,10 +19,7 @@ impl UdevAdapter {
}
pub async fn enumerate_existing(&self, tx: &mpsc::Sender<RawEvent>) -> Result<()> {
let devices = enumerate_with_udev(&self.subsystems).unwrap_or_else(|_| {
scan_devices(&self.subsystems).unwrap_or_default()
});
let devices = enumerate_with_udev(&self.subsystems)?;
for device in devices {
tx.send(RawEvent {
source: AdapterSource::Udev,
@ -52,122 +46,106 @@ impl Adapter for UdevAdapter {
async fn run(&self, tx: mpsc::Sender<RawEvent>) -> Result<()> {
debug!("udev adapter started");
match run_udev_monitor(self.subsystems.clone(), tx.clone()).await {
Ok(()) => return Ok(()),
Err(err) => {
tracing::warn!(error = %err, "udev netlink monitor unavailable, falling back to sysfs polling (add user to 'plugdev' group for real-time events)");
}
}
// Fallback: poll sysfs every 2 seconds for environments where the
// netlink socket is unavailable (missing plugdev membership, containers, etc).
let mut known: HashMap<String, ScannedDevice> = scan_devices(&self.subsystems)
.unwrap_or_default()
.into_iter()
.map(|d| (d.id.clone(), d))
.collect();
loop {
let current = scan_devices(&self.subsystems).unwrap_or_default();
let current_map: HashMap<String, ScannedDevice> = current
.into_iter()
.map(|d| (d.id.clone(), d))
.collect();
for (id, dev) in &current_map {
if !known.contains_key(id) {
if tx.send(raw_change_event("add", dev)).await.is_err() {
return Ok(());
}
}
}
for (id, dev) in &known {
if !current_map.contains_key(id) {
if tx.send(raw_change_event("remove", dev)).await.is_err() {
return Ok(());
}
}
}
known = current_map;
sleep(Duration::from_secs(2)).await;
}
run_udev_monitor(self.subsystems.clone(), tx).await
}
}
#[derive(Clone, Debug)]
struct ScannedDevice {
id: String,
name: String,
subsystem: String,
vendor_id: Option<String>,
product_id: Option<String>,
}
// udev::MonitorSocket uses a non-blocking socket; calling iter().next() without
// first polling the fd returns None immediately and exits the loop — which is
// why the old code silently fell back to sysfs on every start. We use poll(2)
// inside spawn_blocking so the thread truly blocks until events are available.
async fn run_udev_monitor(subsystems: Vec<String>, tx: mpsc::Sender<RawEvent>) -> Result<()> {
tokio::task::spawn_blocking(move || -> Result<()> {
let mut builder = udev::MonitorBuilder::new()?;
for subsystem in &subsystems {
builder = builder.match_subsystem(subsystem)?;
}
let monitor = builder.listen()?;
let socket = builder.listen()?;
let fd = socket.as_raw_fd();
for event in monitor.iter() {
let action = event
.action()
.map(|a| a.to_string_lossy().to_string())
.unwrap_or_else(|| "change".to_string());
let subsystem = event
.subsystem()
.map(|s| s.to_string_lossy().to_string())
.unwrap_or_else(|| "unknown".to_string());
let name = event
.property_value("ID_MODEL")
.or_else(|| event.property_value("NAME"))
.map(|v| v.to_string_lossy().to_string())
.or_else(|| event.devnode().map(|n| n.display().to_string()))
.unwrap_or_else(|| "unknown".to_string());
let id = event
.syspath()
.to_string_lossy()
.to_string();
let msg = RawEvent {
source: AdapterSource::Udev,
kind: "udev.change".to_string(),
payload: json!({
"action": action,
"id": id,
"name": name,
"subsystem": subsystem,
"id_input_keyboard": prop_bool(&event, "ID_INPUT_KEYBOARD"),
"id_input_mouse": prop_bool(&event, "ID_INPUT_MOUSE"),
"id_input_joystick": prop_bool(&event, "ID_INPUT_JOYSTICK"),
"id_input_touchpad": prop_bool(&event, "ID_INPUT_TOUCHPAD"),
"id_input_tablet": prop_bool(&event, "ID_INPUT_TABLET"),
"id_usb_class": prop_str(&event, "ID_USB_CLASS"),
"id_usb_interfaces": prop_str(&event, "ID_USB_INTERFACES"),
"id_vendor": prop_str(&event, "ID_VENDOR"),
"id_model": prop_str(&event, "ID_MODEL"),
"vendor_id": prop_str(&event, "ID_VENDOR_ID"),
"product_id": prop_str(&event, "ID_MODEL_ID"),
}),
timestamp: now_unix_ms(),
loop {
let mut pfd = libc::pollfd {
fd,
events: libc::POLLIN,
revents: 0,
};
if tx.blocking_send(msg).is_err() {
break;
let ret = unsafe { libc::poll(&mut pfd, 1, 1000) };
if ret < 0 {
let err = std::io::Error::last_os_error();
if err.kind() == std::io::ErrorKind::Interrupted {
continue;
}
return Err(err.into());
}
if ret == 0 {
// Timeout: bail if the downstream channel has been dropped.
if tx.is_closed() {
return Ok(());
}
continue;
}
if pfd.revents & libc::POLLIN != 0 {
while let Some(event) = socket.iter().next() {
if tx.blocking_send(build_event(&event)).is_err() {
return Ok(());
}
}
}
}
Ok(())
})
.await??;
Ok(())
}
fn build_event(event: &udev::Event) -> RawEvent {
let action = event
.action()
.map(|a| a.to_string_lossy().to_string())
.unwrap_or_else(|| "change".to_string());
let subsystem = event
.subsystem()
.map(|s| s.to_string_lossy().to_string())
.unwrap_or_else(|| "unknown".to_string());
let name = event
.property_value("ID_MODEL")
.or_else(|| event.property_value("NAME"))
.map(|v| v.to_string_lossy().to_string())
.or_else(|| event.devnode().map(|n| n.display().to_string()))
.unwrap_or_else(|| "unknown".to_string());
let id = event.syspath().to_string_lossy().to_string();
RawEvent {
source: AdapterSource::Udev,
kind: "udev.change".to_string(),
payload: json!({
"action": action,
"id": id,
"name": name,
"subsystem": subsystem,
"id_input_keyboard": prop_bool(event, "ID_INPUT_KEYBOARD"),
"id_input_mouse": prop_bool(event, "ID_INPUT_MOUSE"),
"id_input_joystick": prop_bool(event, "ID_INPUT_JOYSTICK"),
"id_input_touchpad": prop_bool(event, "ID_INPUT_TOUCHPAD"),
"id_input_tablet": prop_bool(event, "ID_INPUT_TABLET"),
"id_usb_class": prop_str(event, "ID_USB_CLASS"),
"id_usb_interfaces": prop_str(event, "ID_USB_INTERFACES"),
"id_vendor": prop_str(event, "ID_VENDOR"),
"id_model": prop_str(event, "ID_MODEL"),
"vendor_id": prop_str(event, "ID_VENDOR_ID"),
"product_id": prop_str(event, "ID_MODEL_ID"),
}),
timestamp: now_unix_ms(),
}
}
fn enumerate_with_udev(subsystems: &[String]) -> Result<Vec<ScannedDevice>> {
let mut enumerator = udev::Enumerator::new()?;
for subsystem in subsystems {
@ -187,125 +165,7 @@ fn enumerate_with_udev(subsystems: &[String]) -> Result<Vec<ScannedDevice>> {
.or_else(|| dev.sysname().to_str().map(ToString::to_string))
.unwrap_or_else(|| "unknown".to_string());
let id = dev.syspath().to_string_lossy().to_string();
let vendor_id = dev
.property_value("ID_VENDOR_ID")
.map(|v| v.to_string_lossy().to_string());
let product_id = dev
.property_value("ID_MODEL_ID")
.map(|v| v.to_string_lossy().to_string());
out.push(ScannedDevice {
id,
name,
subsystem,
vendor_id,
product_id,
});
}
Ok(out)
}
fn raw_change_event(action: &str, dev: &ScannedDevice) -> RawEvent {
RawEvent {
source: AdapterSource::Udev,
kind: "udev.change".to_string(),
payload: json!({
"action": action,
"id": dev.id,
"name": dev.name,
"subsystem": dev.subsystem,
"vendor_id": dev.vendor_id,
"product_id": dev.product_id,
}),
timestamp: now_unix_ms(),
}
}
fn scan_devices(subsystems: &[String]) -> Result<Vec<ScannedDevice>> {
let mut out = Vec::new();
if subsystems.iter().any(|s| s == "drm") {
let drm_dir = Path::new("/sys/class/drm");
if drm_dir.exists() {
for entry in fs::read_dir(drm_dir)? {
let entry = entry?;
let name = entry.file_name().to_string_lossy().to_string();
if !name.contains('-') {
continue;
}
let status = fs::read_to_string(entry.path().join("status")).unwrap_or_default();
if status.trim() == "connected" {
out.push(ScannedDevice {
id: format!("drm:{name}"),
name,
subsystem: "drm".to_string(),
vendor_id: None,
product_id: None,
});
}
}
}
}
if subsystems.iter().any(|s| s == "input") {
let input_dir = Path::new("/dev/input/by-id");
if input_dir.exists() {
for entry in fs::read_dir(input_dir)? {
let entry = entry?;
let name = entry.file_name().to_string_lossy().to_string();
out.push(ScannedDevice {
id: format!("input:{name}"),
name,
subsystem: "input".to_string(),
vendor_id: None,
product_id: None,
});
}
}
}
if subsystems.iter().any(|s| s == "power_supply") {
let pwr_dir = Path::new("/sys/class/power_supply");
if pwr_dir.exists() {
for entry in fs::read_dir(pwr_dir)? {
let entry = entry?;
let name = entry.file_name().to_string_lossy().to_string();
out.push(ScannedDevice {
id: format!("power_supply:{name}"),
name,
subsystem: "power_supply".to_string(),
vendor_id: None,
product_id: None,
});
}
}
}
if subsystems.iter().any(|s| s == "usb") {
let usb_dir = Path::new("/sys/bus/usb/devices");
if usb_dir.exists() {
for entry in fs::read_dir(usb_dir)? {
let entry = entry?;
let name = entry.file_name().to_string_lossy().to_string();
if !name.contains(':') && name.chars().any(|c| c.is_ascii_digit()) {
let syspath = entry.path();
let vendor_id = fs::read_to_string(syspath.join("idVendor"))
.ok()
.map(|s| s.trim().to_string());
let product_id = fs::read_to_string(syspath.join("idProduct"))
.ok()
.map(|s| s.trim().to_string());
out.push(ScannedDevice {
id: format!("usb:{name}"),
name,
subsystem: "usb".to_string(),
vendor_id,
product_id,
});
}
}
}
out.push(ScannedDevice { id, name, subsystem });
}
Ok(out)