Add lua runtime

This commit is contained in:
Breadway 2026-05-11 16:03:05 +08:00
parent 6237f3d7e7
commit 0e3233009b
5 changed files with 1251 additions and 613 deletions

View file

@ -1,8 +1,9 @@
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Result;
use bread_shared::BreadEvent;
use serde_json::Value;
use bread_shared::{AdapterSource, BreadEvent};
use serde_json::{json, Value};
use tokio::sync::{broadcast, mpsc, watch, RwLock};
use tracing::warn;
@ -22,6 +23,16 @@ pub enum StateCommand {
pattern: String,
once: bool,
},
RemoveSubscription {
id: SubscriptionId,
},
RegisterWatch {
id: SubscriptionId,
path: String,
},
RemoveWatch {
id: SubscriptionId,
},
ClearSubscriptions,
SetModuleStatus {
name: String,
@ -72,6 +83,20 @@ impl StateHandle {
.map_err(|_| anyhow::anyhow!("state engine command channel closed"))
}
pub fn remove_subscription(&self, id: SubscriptionId) {
let _ = self.command_tx.send(StateCommand::RemoveSubscription { id });
}
pub fn register_watch(&self, id: SubscriptionId, path: String) -> Result<()> {
self.command_tx
.send(StateCommand::RegisterWatch { id, path })
.map_err(|_| anyhow::anyhow!("state engine command channel closed"))
}
pub fn remove_watch(&self, id: SubscriptionId) {
let _ = self.command_tx.send(StateCommand::RemoveWatch { id });
}
pub fn clear_subscriptions(&self) {
let _ = self.command_tx.send(StateCommand::ClearSubscriptions);
}
@ -98,6 +123,7 @@ pub async fn run_state_engine(
mut shutdown_rx: watch::Receiver<bool>,
) {
let mut subscriptions = SubscriptionTable::default();
let mut watches: HashMap<SubscriptionId, String> = HashMap::new();
loop {
tokio::select! {
@ -110,28 +136,47 @@ pub async fn run_state_engine(
let Some(cmd) = maybe_cmd else {
break;
};
handle_command(cmd, &state, &mut subscriptions).await;
handle_command(cmd, &state, &mut subscriptions, &mut watches).await;
}
maybe_event = event_rx.recv() => {
let Some(event) = maybe_event else {
break;
};
apply_event_to_state(&state, &event).await;
let (before_snapshot, after_snapshot) = if watches.is_empty() {
(None, None)
} else {
let mut guard = state.write().await;
let before = serde_json::to_value(&*guard).ok();
apply_event_to_state(&mut guard, &event);
let after = serde_json::to_value(&*guard).ok();
(before, after)
};
let _ = event_stream_tx.send(event.clone());
let matches = subscriptions.match_event(&event.event);
for sub in &matches {
let _ = lua_tx.send(LuaMessage::Event {
subscription_id: sub.id,
event: event.clone(),
});
if watches.is_empty() {
let mut guard = state.write().await;
apply_event_to_state(&mut guard, &event);
}
for sub in matches.into_iter().filter(|s| s.once) {
subscriptions.remove(sub.id);
let _ = lua_tx.send(LuaMessage::SubscriptionCancelled { id: sub.id });
dispatch_event(&event, &mut subscriptions, &lua_tx, &event_stream_tx);
if let (Some(before), Some(after)) = (before_snapshot, after_snapshot) {
for (_id, path) in watches.iter() {
let old_val = value_at_path(&before, path).unwrap_or(Value::Null);
let new_val = value_at_path(&after, path).unwrap_or(Value::Null);
if old_val != new_val {
let synthetic = BreadEvent::new(
format!("bread.state.changed.{path}"),
AdapterSource::System,
json!({
"path": path,
"new": new_val,
"old": old_val,
}),
);
dispatch_event(&synthetic, &mut subscriptions, &lua_tx, &event_stream_tx);
}
}
}
}
}
@ -144,13 +189,24 @@ async fn handle_command(
cmd: StateCommand,
state: &Arc<RwLock<RuntimeState>>,
subscriptions: &mut SubscriptionTable,
watches: &mut HashMap<SubscriptionId, String>,
) {
match cmd {
StateCommand::RegisterSubscription { id, pattern, once } => {
subscriptions.add_with_id(id, pattern, once);
}
StateCommand::RemoveSubscription { id } => {
subscriptions.remove(id);
}
StateCommand::RegisterWatch { id, path } => {
watches.insert(id, path);
}
StateCommand::RemoveWatch { id } => {
watches.remove(&id);
}
StateCommand::ClearSubscriptions => {
subscriptions.clear();
watches.clear();
}
StateCommand::SetModuleStatus {
name,
@ -166,6 +222,7 @@ async fn handle_command(
name,
status,
last_error,
store: HashMap::new(),
});
}
}
@ -180,15 +237,47 @@ async fn handle_command(
}
}
async fn apply_event_to_state(state: &Arc<RwLock<RuntimeState>>, event: &BreadEvent) {
let mut guard = state.write().await;
fn dispatch_event(
event: &BreadEvent,
subscriptions: &mut SubscriptionTable,
lua_tx: &mpsc::UnboundedSender<LuaMessage>,
event_stream_tx: &broadcast::Sender<BreadEvent>,
) {
let _ = event_stream_tx.send(event.clone());
let matches = subscriptions.match_event(&event.event);
for sub in &matches {
let _ = lua_tx.send(LuaMessage::Event {
subscription_id: sub.id,
event: event.clone(),
});
}
for sub in matches.into_iter().filter(|s| s.once) {
subscriptions.remove(sub.id);
let _ = lua_tx.send(LuaMessage::SubscriptionCancelled { id: sub.id });
}
}
fn value_at_path(value: &Value, path: &str) -> Option<Value> {
if path.is_empty() {
return Some(value.clone());
}
let mut current = value;
for part in path.split('.') {
current = current.get(part)?;
}
Some(current.clone())
}
fn apply_event_to_state(state: &mut RuntimeState, event: &BreadEvent) {
match event.event.as_str() {
"bread.monitor.connected" => {
if let Some(name) = event.data.get("name").and_then(Value::as_str) {
if let Some(m) = guard.monitors.iter_mut().find(|m| m.name == name) {
if let Some(m) = state.monitors.iter_mut().find(|m| m.name == name) {
m.connected = true;
} else {
guard.monitors.push(crate::core::types::Monitor {
state.monitors.push(crate::core::types::Monitor {
name: name.to_string(),
connected: true,
resolution: event.data.get("resolution").and_then(Value::as_str).map(ToString::to_string),
@ -199,7 +288,7 @@ async fn apply_event_to_state(state: &Arc<RwLock<RuntimeState>>, event: &BreadEv
}
"bread.monitor.disconnected" => {
if let Some(name) = event.data.get("name").and_then(Value::as_str) {
if let Some(m) = guard.monitors.iter_mut().find(|m| m.name == name) {
if let Some(m) = state.monitors.iter_mut().find(|m| m.name == name) {
m.connected = false;
}
}
@ -211,10 +300,10 @@ async fn apply_event_to_state(state: &Arc<RwLock<RuntimeState>>, event: &BreadEv
.or_else(|| event.data.get("id"))
.and_then(Value::as_str)
.map(ToString::to_string);
guard.active_workspace = ws;
state.active_workspace = ws;
}
"bread.window.focus.changed" => {
guard.active_window = event
state.active_window = event
.data
.get("window")
.or_else(|| event.data.get("class"))
@ -222,20 +311,20 @@ async fn apply_event_to_state(state: &Arc<RwLock<RuntimeState>>, event: &BreadEv
.map(ToString::to_string);
}
"bread.device.connected" => {
apply_device_change(&mut guard, &event.data, true);
apply_device_change(state, &event.data, true);
}
"bread.device.disconnected" => {
apply_device_change(&mut guard, &event.data, false);
apply_device_change(state, &event.data, false);
}
"bread.network.connected" | "bread.network.disconnected" => {
if let Some(online) = event.data.get("online").and_then(Value::as_bool) {
guard.network.online = online;
state.network.online = online;
}
if let Some(ifaces) = event.data.get("interfaces").and_then(Value::as_object) {
guard.network.interfaces.clear();
state.network.interfaces.clear();
for (name, meta) in ifaces {
let up = meta.get("up").and_then(Value::as_bool).unwrap_or(false);
guard.network.interfaces.insert(name.clone(), InterfaceState { up });
state.network.interfaces.insert(name.clone(), InterfaceState { up });
}
}
}
@ -247,19 +336,19 @@ async fn apply_event_to_state(state: &Arc<RwLock<RuntimeState>>, event: &BreadEv
| "bread.power.battery.critical"
| "bread.power.battery.full" => {
if let Some(ac) = event.data.get("ac_connected").and_then(Value::as_bool) {
guard.power.ac_connected = ac;
state.power.ac_connected = ac;
}
if let Some(battery) = event.data.get("battery_percent").and_then(Value::as_u64) {
guard.power.battery_percent = Some(battery.min(100) as u8);
guard.power.battery_low = battery <= 20;
state.power.battery_percent = Some(battery.min(100) as u8);
state.power.battery_low = battery <= 20;
}
}
"bread.profile.activated" => {
if let Some(name) = event.data.get("name").and_then(Value::as_str) {
if guard.profile.active != name {
let previous = guard.profile.active.clone();
guard.profile.history.push(previous);
guard.profile.active = name.to_string();
if state.profile.active != name {
let previous = state.profile.active.clone();
state.profile.history.push(previous);
state.profile.active = name.to_string();
}
}
}