From 0e3233009bfd595c0a6afb4e3c12112bde0c2e45 Mon Sep 17 00:00:00 2001 From: Breadway Date: Mon, 11 May 2026 16:03:05 +0800 Subject: [PATCH] Add lua runtime --- Cargo.lock | 526 --------------- breadd/src/core/state_engine.rs | 157 ++++- breadd/src/core/subscriptions.rs | 93 ++- breadd/src/core/types.rs | 5 + breadd/src/lua/mod.rs | 1083 ++++++++++++++++++++++++++++-- 5 files changed, 1251 insertions(+), 613 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a36c9da..0ab00f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,18 +2,6 @@ # It is not intended for manual editing. version = 4 -[[package]] -name = "ahash" -version = "0.8.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" -dependencies = [ - "cfg-if", - "once_cell", - "version_check", - "zerocopy", -] - [[package]] name = "aho-corasick" version = "1.1.4" @@ -260,12 +248,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" -[[package]] -name = "base64" -version = "0.22.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" - [[package]] name = "bitflags" version = "1.3.2" @@ -330,8 +312,6 @@ dependencies = [ "futures-util", "hex", "libc", - "metrics 0.23.1", - "metrics-exporter-prometheus", "mlua", "netlink-packet-core", "netlink-packet-route", @@ -357,12 +337,6 @@ dependencies = [ "serde", ] -[[package]] -name = "bumpalo" -version = "3.20.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" - [[package]] name = "byteorder" version = "1.5.0" @@ -446,22 +420,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "core-foundation" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" -dependencies = [ - "core-foundation-sys", - "libc", -] - -[[package]] -name = "core-foundation-sys" -version = "0.8.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" - [[package]] name = "cpufeatures" version = "0.2.17" @@ -471,15 +429,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crossbeam-epoch" -version = "0.9.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -636,33 +585,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" -[[package]] -name = "fnv" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" - [[package]] name = "foldhash" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" -[[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-shared" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" - [[package]] name = "futures" version = "0.3.32" @@ -813,34 +741,6 @@ dependencies = [ "wasip3", ] -[[package]] -name = "h2" -version = "0.4.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "171fefbc92fe4a4de27e0698d6a5b392d6a0e333506bc49133760b3bcf948733" -dependencies = [ - "atomic-waker", - "bytes", - "fnv", - "futures-core", - "futures-sink", - "http", - "indexmap", - "slab", - "tokio", - "tokio-util", - "tracing", -] - -[[package]] -name = "hashbrown" -version = "0.14.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" -dependencies = [ - "ahash", -] - [[package]] name = "hashbrown" version = "0.15.5" @@ -880,109 +780,6 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" -[[package]] -name = "http" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3ba2a386d7f85a81f119ad7498ebe444d2e22c2af0b86b069416ace48b3311a" -dependencies = [ - "bytes", - "itoa", -] - -[[package]] -name = "http-body" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" -dependencies = [ - "bytes", - "http", -] - -[[package]] -name = "http-body-util" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" -dependencies = [ - "bytes", - "futures-core", - "http", - "http-body", - "pin-project-lite", -] - -[[package]] -name = "httparse" -version = "1.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" - -[[package]] -name = "httpdate" -version = "1.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" - -[[package]] -name = "hyper" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca" -dependencies = [ - "atomic-waker", - "bytes", - "futures-channel", - "futures-core", - "h2", - "http", - "http-body", - "httparse", - "httpdate", - "itoa", - "pin-project-lite", - "smallvec", - "tokio", - "want", -] - -[[package]] -name = "hyper-tls" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" -dependencies = [ - "bytes", - "http-body-util", - "hyper", - "hyper-util", - "native-tls", - "tokio", - "tokio-native-tls", - "tower-service", -] - -[[package]] -name = "hyper-util" -version = "0.1.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" -dependencies = [ - "bytes", - "futures-channel", - "futures-util", - "http", - "http-body", - "hyper", - "libc", - "pin-project-lite", - "socket2 0.6.3", - "tokio", - "tower-service", - "tracing", -] - [[package]] name = "id-arena" version = "2.3.0" @@ -1021,12 +818,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "ipnet" -version = "2.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" - [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -1039,18 +830,6 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" -[[package]] -name = "js-sys" -version = "0.3.98" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67df7112613f8bfd9150013a0314e196f4800d3201ae742489d999db2f979f08" -dependencies = [ - "cfg-if", - "futures-util", - "once_cell", - "wasm-bindgen", -] - [[package]] name = "lazy_static" version = "1.5.0" @@ -1173,62 +952,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "metrics" -version = "0.22.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56d05972e8cbac2671e85aa9d04d9160d193f8bebd1a5c1a2f4542c62e65d1d0" -dependencies = [ - "ahash", - "portable-atomic", -] - -[[package]] -name = "metrics" -version = "0.23.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3045b4193fbdc5b5681f32f11070da9be3609f189a79f3390706d42587f46bb5" -dependencies = [ - "ahash", - "portable-atomic", -] - -[[package]] -name = "metrics-exporter-prometheus" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d58e362dc7206e9456ddbcdbd53c71ba441020e62104703075a69151e38d85f" -dependencies = [ - "base64", - "http-body-util", - "hyper", - "hyper-tls", - "hyper-util", - "indexmap", - "ipnet", - "metrics 0.22.4", - "metrics-util", - "quanta", - "thiserror", - "tokio", - "tracing", -] - -[[package]] -name = "metrics-util" -version = "0.16.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b07a5eb561b8cbc16be2d216faf7757f9baf3bfb94dbb0fae3df8387a5bb47f" -dependencies = [ - "crossbeam-epoch", - "crossbeam-utils", - "hashbrown 0.14.5", - "metrics 0.22.4", - "num_cpus", - "quanta", - "sketches-ddsketch", -] - [[package]] name = "mio" version = "1.2.0" @@ -1270,23 +993,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "native-tls" -version = "0.2.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "465500e14ea162429d264d44189adc38b199b62b1c21eea9f69e4b73cb03bbf2" -dependencies = [ - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework", - "security-framework-sys", - "tempfile", -] - [[package]] name = "netlink-packet-core" version = "0.4.2" @@ -1395,16 +1101,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "num_cpus" -version = "1.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" -dependencies = [ - "hermit-abi 0.5.2", - "libc", -] - [[package]] name = "once_cell" version = "1.21.4" @@ -1417,49 +1113,6 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" -[[package]] -name = "openssl" -version = "0.10.79" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf0b434746ee2832f4f0baf10137e1cabb18cbe6912c69e2e33263c45250f542" -dependencies = [ - "bitflags 2.11.1", - "cfg-if", - "foreign-types", - "libc", - "openssl-macros", - "openssl-sys", -] - -[[package]] -name = "openssl-macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.117", -] - -[[package]] -name = "openssl-probe" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" - -[[package]] -name = "openssl-sys" -version = "0.9.115" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "158fe5b292746440aa6e7a7e690e55aeb72d41505e2804c23c6973ad0e9c9781" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "ordered-float" version = "2.10.1" @@ -1567,12 +1220,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "portable-atomic" -version = "1.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" - [[package]] name = "ppv-lite86" version = "0.2.21" @@ -1611,21 +1258,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "quanta" -version = "0.12.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" -dependencies = [ - "crossbeam-utils", - "libc", - "once_cell", - "raw-cpuid", - "wasi", - "web-sys", - "winapi", -] - [[package]] name = "quote" version = "1.0.45" @@ -1671,15 +1303,6 @@ dependencies = [ "getrandom 0.2.17", ] -[[package]] -name = "raw-cpuid" -version = "11.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" -dependencies = [ - "bitflags 2.11.1", -] - [[package]] name = "redox_syscall" version = "0.5.18" @@ -1779,50 +1402,12 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "rustversion" -version = "1.0.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" - -[[package]] -name = "schannel" -version = "0.1.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91c1b7e4904c873ef0710c1f407dde2e6287de2bebc1bbbf7d430bb7cbffd939" -dependencies = [ - "windows-sys 0.61.2", -] - [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "security-framework" -version = "3.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d" -dependencies = [ - "bitflags 2.11.1", - "core-foundation", - "core-foundation-sys", - "libc", - "security-framework-sys", -] - -[[package]] -name = "security-framework-sys" -version = "2.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ce2691df843ecc5d231c0b14ece2acc3efb62c0a398c7e1d875f3983ce020e3" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "semver" version = "1.0.28" @@ -1938,12 +1523,6 @@ dependencies = [ "libc", ] -[[package]] -name = "sketches-ddsketch" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85636c14b73d81f541e525f585c0a2109e6744e1565b5c1668e31c70c10ed65c" - [[package]] name = "slab" version = "0.4.12" @@ -2081,29 +1660,6 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "tokio-native-tls" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" -dependencies = [ - "native-tls", - "tokio", -] - -[[package]] -name = "tokio-util" -version = "0.7.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" -dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "pin-project-lite", - "tokio", -] - [[package]] name = "toml" version = "0.8.23" @@ -2156,12 +1712,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" -[[package]] -name = "tower-service" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" - [[package]] name = "tracing" version = "0.1.44" @@ -2223,12 +1773,6 @@ dependencies = [ "tracing-log", ] -[[package]] -name = "try-lock" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" - [[package]] name = "typeid" version = "1.0.3" @@ -2288,12 +1832,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" -[[package]] -name = "vcpkg" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" - [[package]] name = "version_check" version = "0.9.5" @@ -2306,15 +1844,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7" -[[package]] -name = "want" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" -dependencies = [ - "try-lock", -] - [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -2339,51 +1868,6 @@ dependencies = [ "wit-bindgen 0.51.0", ] -[[package]] -name = "wasm-bindgen" -version = "0.2.121" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49ace1d07c165b0864824eee619580c4689389afa9dc9ed3a4c75040d82e6790" -dependencies = [ - "cfg-if", - "once_cell", - "rustversion", - "wasm-bindgen-macro", - "wasm-bindgen-shared", -] - -[[package]] -name = "wasm-bindgen-macro" -version = "0.2.121" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e68e6f4afd367a562002c05637acb8578ff2dea1943df76afb9e83d177c8578" -dependencies = [ - "quote", - "wasm-bindgen-macro-support", -] - -[[package]] -name = "wasm-bindgen-macro-support" -version = "0.2.121" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d95a9ec35c64b2a7cb35d3fead40c4238d0940c86d107136999567a4703259f2" -dependencies = [ - "bumpalo", - "proc-macro2", - "quote", - "syn 2.0.117", - "wasm-bindgen-shared", -] - -[[package]] -name = "wasm-bindgen-shared" -version = "0.2.121" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4e0100b01e9f0d03189a92b96772a1fb998639d981193d7dbab487302513441" -dependencies = [ - "unicode-ident", -] - [[package]] name = "wasm-encoder" version = "0.244.0" @@ -2418,16 +1902,6 @@ dependencies = [ "semver", ] -[[package]] -name = "web-sys" -version = "0.3.98" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b572dff8bcf38bad0fa19729c89bb5748b2b9b1d8be70cf90df697e3a8f32aa" -dependencies = [ - "js-sys", - "wasm-bindgen", -] - [[package]] name = "which" version = "7.0.3" diff --git a/breadd/src/core/state_engine.rs b/breadd/src/core/state_engine.rs index d824fd0..aecae3c 100644 --- a/breadd/src/core/state_engine.rs +++ b/breadd/src/core/state_engine.rs @@ -1,8 +1,9 @@ +use std::collections::HashMap; use std::sync::Arc; use anyhow::Result; -use bread_shared::BreadEvent; -use serde_json::Value; +use bread_shared::{AdapterSource, BreadEvent}; +use serde_json::{json, Value}; use tokio::sync::{broadcast, mpsc, watch, RwLock}; use tracing::warn; @@ -22,6 +23,16 @@ pub enum StateCommand { pattern: String, once: bool, }, + RemoveSubscription { + id: SubscriptionId, + }, + RegisterWatch { + id: SubscriptionId, + path: String, + }, + RemoveWatch { + id: SubscriptionId, + }, ClearSubscriptions, SetModuleStatus { name: String, @@ -72,6 +83,20 @@ impl StateHandle { .map_err(|_| anyhow::anyhow!("state engine command channel closed")) } + pub fn remove_subscription(&self, id: SubscriptionId) { + let _ = self.command_tx.send(StateCommand::RemoveSubscription { id }); + } + + pub fn register_watch(&self, id: SubscriptionId, path: String) -> Result<()> { + self.command_tx + .send(StateCommand::RegisterWatch { id, path }) + .map_err(|_| anyhow::anyhow!("state engine command channel closed")) + } + + pub fn remove_watch(&self, id: SubscriptionId) { + let _ = self.command_tx.send(StateCommand::RemoveWatch { id }); + } + pub fn clear_subscriptions(&self) { let _ = self.command_tx.send(StateCommand::ClearSubscriptions); } @@ -98,6 +123,7 @@ pub async fn run_state_engine( mut shutdown_rx: watch::Receiver, ) { let mut subscriptions = SubscriptionTable::default(); + let mut watches: HashMap = HashMap::new(); loop { tokio::select! { @@ -110,28 +136,47 @@ pub async fn run_state_engine( let Some(cmd) = maybe_cmd else { break; }; - handle_command(cmd, &state, &mut subscriptions).await; + handle_command(cmd, &state, &mut subscriptions, &mut watches).await; } maybe_event = event_rx.recv() => { let Some(event) = maybe_event else { break; }; - apply_event_to_state(&state, &event).await; + let (before_snapshot, after_snapshot) = if watches.is_empty() { + (None, None) + } else { + let mut guard = state.write().await; + let before = serde_json::to_value(&*guard).ok(); + apply_event_to_state(&mut guard, &event); + let after = serde_json::to_value(&*guard).ok(); + (before, after) + }; - let _ = event_stream_tx.send(event.clone()); - - let matches = subscriptions.match_event(&event.event); - for sub in &matches { - let _ = lua_tx.send(LuaMessage::Event { - subscription_id: sub.id, - event: event.clone(), - }); + if watches.is_empty() { + let mut guard = state.write().await; + apply_event_to_state(&mut guard, &event); } - for sub in matches.into_iter().filter(|s| s.once) { - subscriptions.remove(sub.id); - let _ = lua_tx.send(LuaMessage::SubscriptionCancelled { id: sub.id }); + dispatch_event(&event, &mut subscriptions, &lua_tx, &event_stream_tx); + + if let (Some(before), Some(after)) = (before_snapshot, after_snapshot) { + for (_id, path) in watches.iter() { + let old_val = value_at_path(&before, path).unwrap_or(Value::Null); + let new_val = value_at_path(&after, path).unwrap_or(Value::Null); + if old_val != new_val { + let synthetic = BreadEvent::new( + format!("bread.state.changed.{path}"), + AdapterSource::System, + json!({ + "path": path, + "new": new_val, + "old": old_val, + }), + ); + dispatch_event(&synthetic, &mut subscriptions, &lua_tx, &event_stream_tx); + } + } } } } @@ -144,13 +189,24 @@ async fn handle_command( cmd: StateCommand, state: &Arc>, subscriptions: &mut SubscriptionTable, + watches: &mut HashMap, ) { match cmd { StateCommand::RegisterSubscription { id, pattern, once } => { subscriptions.add_with_id(id, pattern, once); } + StateCommand::RemoveSubscription { id } => { + subscriptions.remove(id); + } + StateCommand::RegisterWatch { id, path } => { + watches.insert(id, path); + } + StateCommand::RemoveWatch { id } => { + watches.remove(&id); + } StateCommand::ClearSubscriptions => { subscriptions.clear(); + watches.clear(); } StateCommand::SetModuleStatus { name, @@ -166,6 +222,7 @@ async fn handle_command( name, status, last_error, + store: HashMap::new(), }); } } @@ -180,15 +237,47 @@ async fn handle_command( } } -async fn apply_event_to_state(state: &Arc>, event: &BreadEvent) { - let mut guard = state.write().await; +fn dispatch_event( + event: &BreadEvent, + subscriptions: &mut SubscriptionTable, + lua_tx: &mpsc::UnboundedSender, + event_stream_tx: &broadcast::Sender, +) { + let _ = event_stream_tx.send(event.clone()); + + let matches = subscriptions.match_event(&event.event); + for sub in &matches { + let _ = lua_tx.send(LuaMessage::Event { + subscription_id: sub.id, + event: event.clone(), + }); + } + + for sub in matches.into_iter().filter(|s| s.once) { + subscriptions.remove(sub.id); + let _ = lua_tx.send(LuaMessage::SubscriptionCancelled { id: sub.id }); + } +} + +fn value_at_path(value: &Value, path: &str) -> Option { + if path.is_empty() { + return Some(value.clone()); + } + let mut current = value; + for part in path.split('.') { + current = current.get(part)?; + } + Some(current.clone()) +} + +fn apply_event_to_state(state: &mut RuntimeState, event: &BreadEvent) { match event.event.as_str() { "bread.monitor.connected" => { if let Some(name) = event.data.get("name").and_then(Value::as_str) { - if let Some(m) = guard.monitors.iter_mut().find(|m| m.name == name) { + if let Some(m) = state.monitors.iter_mut().find(|m| m.name == name) { m.connected = true; } else { - guard.monitors.push(crate::core::types::Monitor { + state.monitors.push(crate::core::types::Monitor { name: name.to_string(), connected: true, resolution: event.data.get("resolution").and_then(Value::as_str).map(ToString::to_string), @@ -199,7 +288,7 @@ async fn apply_event_to_state(state: &Arc>, event: &BreadEv } "bread.monitor.disconnected" => { if let Some(name) = event.data.get("name").and_then(Value::as_str) { - if let Some(m) = guard.monitors.iter_mut().find(|m| m.name == name) { + if let Some(m) = state.monitors.iter_mut().find(|m| m.name == name) { m.connected = false; } } @@ -211,10 +300,10 @@ async fn apply_event_to_state(state: &Arc>, event: &BreadEv .or_else(|| event.data.get("id")) .and_then(Value::as_str) .map(ToString::to_string); - guard.active_workspace = ws; + state.active_workspace = ws; } "bread.window.focus.changed" => { - guard.active_window = event + state.active_window = event .data .get("window") .or_else(|| event.data.get("class")) @@ -222,20 +311,20 @@ async fn apply_event_to_state(state: &Arc>, event: &BreadEv .map(ToString::to_string); } "bread.device.connected" => { - apply_device_change(&mut guard, &event.data, true); + apply_device_change(state, &event.data, true); } "bread.device.disconnected" => { - apply_device_change(&mut guard, &event.data, false); + apply_device_change(state, &event.data, false); } "bread.network.connected" | "bread.network.disconnected" => { if let Some(online) = event.data.get("online").and_then(Value::as_bool) { - guard.network.online = online; + state.network.online = online; } if let Some(ifaces) = event.data.get("interfaces").and_then(Value::as_object) { - guard.network.interfaces.clear(); + state.network.interfaces.clear(); for (name, meta) in ifaces { let up = meta.get("up").and_then(Value::as_bool).unwrap_or(false); - guard.network.interfaces.insert(name.clone(), InterfaceState { up }); + state.network.interfaces.insert(name.clone(), InterfaceState { up }); } } } @@ -247,19 +336,19 @@ async fn apply_event_to_state(state: &Arc>, event: &BreadEv | "bread.power.battery.critical" | "bread.power.battery.full" => { if let Some(ac) = event.data.get("ac_connected").and_then(Value::as_bool) { - guard.power.ac_connected = ac; + state.power.ac_connected = ac; } if let Some(battery) = event.data.get("battery_percent").and_then(Value::as_u64) { - guard.power.battery_percent = Some(battery.min(100) as u8); - guard.power.battery_low = battery <= 20; + state.power.battery_percent = Some(battery.min(100) as u8); + state.power.battery_low = battery <= 20; } } "bread.profile.activated" => { if let Some(name) = event.data.get("name").and_then(Value::as_str) { - if guard.profile.active != name { - let previous = guard.profile.active.clone(); - guard.profile.history.push(previous); - guard.profile.active = name.to_string(); + if state.profile.active != name { + let previous = state.profile.active.clone(); + state.profile.history.push(previous); + state.profile.active = name.to_string(); } } } diff --git a/breadd/src/core/subscriptions.rs b/breadd/src/core/subscriptions.rs index d4e6925..a95d388 100644 --- a/breadd/src/core/subscriptions.rs +++ b/breadd/src/core/subscriptions.rs @@ -35,7 +35,7 @@ impl SubscriptionTable { // swap_remove moves the last element into `idx`. We need to update by_id // for that element. But first, remove its stale entry (it was at the last // position before the swap); then re-insert it at the new position. - let last_idx = self.entries.len() - 1; + let _last_idx = self.entries.len() - 1; self.entries.swap_remove(idx); if idx < self.entries.len() { @@ -68,5 +68,94 @@ fn matches_pattern(pattern: &str, event_name: &str) -> bool { return event_name.starts_with(prefix); } - pattern == event_name + if let Some(prefix) = pattern.strip_suffix(".**") { + if event_name == prefix { + return true; + } + } + + matches_glob(pattern.as_bytes(), event_name.as_bytes()) +} + +fn matches_glob(pattern: &[u8], text: &[u8]) -> bool { + if pattern.is_empty() { + return text.is_empty(); + } + + if pattern.len() >= 2 && pattern[0] == b'*' && pattern[1] == b'*' { + let mut idx = 2; + while pattern.len() >= idx + 2 && pattern[idx] == b'*' && pattern[idx + 1] == b'*' { + idx += 2; + } + let rest = &pattern[idx..]; + if rest.is_empty() { + return true; + } + for offset in 0..=text.len() { + if matches_glob(rest, &text[offset..]) { + return true; + } + } + return false; + } + + match pattern[0] { + b'*' => { + let mut offset = 0; + loop { + if matches_glob(&pattern[1..], &text[offset..]) { + return true; + } + if offset == text.len() || text[offset] == b'.' { + break; + } + offset += 1; + } + false + } + b'?' => { + if text.is_empty() || text[0] == b'.' { + return false; + } + matches_glob(&pattern[1..], &text[1..]) + } + ch => { + if text.first().copied() != Some(ch) { + return false; + } + matches_glob(&pattern[1..], &text[1..]) + } + } +} + +#[cfg(test)] +mod tests { + use super::matches_pattern; + + #[test] + fn exact_match() { + assert!(matches_pattern("bread.device.dock.connected", "bread.device.dock.connected")); + assert!(!matches_pattern("bread.device.dock.connected", "bread.device.dock.disconnected")); + } + + #[test] + fn single_segment_wildcard() { + assert!(matches_pattern("bread.device.*", "bread.device.dock.connected")); + assert!(matches_pattern("bread.device.*", "bread.device.foo")); + assert!(!matches_pattern("bread.device.*", "bread.device")); + } + + #[test] + fn recursive_wildcard() { + assert!(matches_pattern("bread.device.**", "bread.device.dock.connected")); + assert!(matches_pattern("bread.**", "bread.device.dock.connected")); + assert!(matches_pattern("bread.**", "bread")); + } + + #[test] + fn single_char_wildcard() { + assert!(matches_pattern("bread.monitor.?", "bread.monitor.1")); + assert!(!matches_pattern("bread.monitor.?", "bread.monitor.10")); + assert!(!matches_pattern("bread.monitor.?", "bread.monitor.")); + } } diff --git a/breadd/src/core/types.rs b/breadd/src/core/types.rs index 02886c9..254e075 100644 --- a/breadd/src/core/types.rs +++ b/breadd/src/core/types.rs @@ -1,6 +1,7 @@ use std::collections::{BTreeMap, HashMap}; use serde::{Deserialize, Serialize}; +use serde_json::Value; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RuntimeState { @@ -121,6 +122,8 @@ pub struct ModuleStatus { pub name: String, pub status: ModuleLoadState, pub last_error: Option, + #[serde(default)] + pub store: HashMap, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -129,4 +132,6 @@ pub enum ModuleLoadState { Loaded, LoadError, NotFound, + Degraded, + Disabled, } diff --git a/breadd/src/lua/mod.rs b/breadd/src/lua/mod.rs index beae215..a73de1a 100644 --- a/breadd/src/lua/mod.rs +++ b/breadd/src/lua/mod.rs @@ -1,20 +1,25 @@ -use std::collections::HashMap; +use std::cell::RefCell; +use std::collections::{HashMap, HashSet}; use std::fs; use std::path::{Path, PathBuf}; +use std::rc::Rc; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; +use std::time::Duration; use anyhow::{anyhow, Result}; use bread_shared::{AdapterSource, BreadEvent}; -use mlua::{Function, Lua, LuaSerdeExt, RegistryKey, Value}; -use tokio::sync::{mpsc, oneshot}; +use mlua::{Error as LuaError, Function, Lua, LuaSerdeExt, RegistryKey, Table, Value}; +use serde_json::Value as JsonValue; +use tokio::sync::{mpsc, oneshot, watch, RwLock}; use tokio::task; +use tokio::time::{interval, sleep}; use tracing::{error, info, warn}; use crate::core::config::Config; use crate::core::state_engine::StateHandle; use crate::core::subscriptions::SubscriptionId; -use crate::core::types::ModuleLoadState; +use crate::core::types::{ModuleLoadState, RuntimeState}; pub enum LuaMessage { Event { @@ -24,6 +29,9 @@ pub enum LuaMessage { SubscriptionCancelled { id: SubscriptionId, }, + TimerFired { + id: TimerId, + }, Reload { reply: oneshot::Sender>, }, @@ -75,7 +83,7 @@ pub fn spawn_runtime( .expect("failed to create lua runtime thread"); rt.block_on(async move { - let mut engine = match LuaEngine::new(config, state_handle, emit_tx) { + let mut engine = match LuaEngine::new(config, state_handle, emit_tx, thread_tx.clone()) { Ok(engine) => engine, Err(err) => { error!(error = %err, "failed to initialize lua engine"); @@ -100,6 +108,11 @@ pub fn spawn_runtime( LuaMessage::SubscriptionCancelled { id } => { engine.remove_handler(id); } + LuaMessage::TimerFired { id } => { + if let Err(err) = engine.handle_timer(id) { + error!(error = %err, "lua timer handler failed"); + } + } LuaMessage::Reload { reply } => { let result = engine.reload_internal().map_err(|e| e.to_string()); let _ = reply.send(result); @@ -118,39 +131,114 @@ pub fn spawn_runtime( Ok(handle) } +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +pub(crate) struct TimerId(u64); + +struct HandlerEntry { + callback: RegistryKey, + filter: Option, + module: Option, + raw_kind: Option, + kind: HandlerKind, +} + +#[derive(Clone, Copy, Eq, PartialEq)] +enum HandlerKind { + Event, + StateWatch, +} + +struct TimerEntry { + callback: RegistryKey, + repeating: bool, + cancel_tx: watch::Sender, +} + +#[derive(Clone)] +struct ModuleDecl { + name: String, + version: Option, + after: Vec, + path: PathBuf, +} + +struct ModuleInfo { + table_key: RegistryKey, +} + struct LuaEngine { lua: Lua, - handlers: Arc>>, + handlers: Arc>>, + watch_ids: Arc>>, + timers: Arc>>, next_sub_id: Arc, + next_timer_id: Arc, + current_module: Arc>>, + modules: Arc>>, + module_decls: Arc>>, + module_order: Arc>>, state_handle: StateHandle, emit_tx: mpsc::UnboundedSender, + lua_tx: mpsc::UnboundedSender, entry_point: PathBuf, module_path: PathBuf, } impl LuaEngine { - fn new(config: Config, state_handle: StateHandle, emit_tx: mpsc::UnboundedSender) -> Result { + fn new( + config: Config, + state_handle: StateHandle, + emit_tx: mpsc::UnboundedSender, + lua_tx: mpsc::UnboundedSender, + ) -> Result { Ok(Self { lua: Lua::new(), handlers: Arc::new(Mutex::new(HashMap::new())), + watch_ids: Arc::new(Mutex::new(HashSet::new())), + timers: Arc::new(Mutex::new(HashMap::new())), next_sub_id: Arc::new(AtomicU64::new(1)), + next_timer_id: Arc::new(AtomicU64::new(1)), + current_module: Arc::new(Mutex::new(None)), + modules: Arc::new(Mutex::new(HashMap::new())), + module_decls: Arc::new(Mutex::new(HashMap::new())), + module_order: Arc::new(Mutex::new(Vec::new())), state_handle, emit_tx, + lua_tx, entry_point: config.lua_entry_point(), module_path: config.lua_module_path(), }) } fn reload_internal(&mut self) -> Result<()> { + self.run_on_unload(); + self.cancel_all_timers(); self.state_handle.clear_subscriptions(); self.lua = Lua::new(); self.handlers .lock() .expect("lua handlers mutex poisoned") .clear(); + self.watch_ids + .lock() + .expect("lua watch ids mutex poisoned") + .clear(); + self.modules + .lock() + .expect("lua modules mutex poisoned") + .clear(); + self.module_decls + .lock() + .expect("lua module decls mutex poisoned") + .clear(); + self.module_order + .lock() + .expect("lua module order mutex poisoned") + .clear(); self.install_api()?; self.load_init_and_modules()?; + self.run_on_reload(); info!("lua runtime reloaded"); Ok(()) } @@ -162,16 +250,30 @@ impl LuaEngine { let handlers = self.handlers.clone(); let next_sub_id = self.next_sub_id.clone(); let state_handle = self.state_handle.clone(); + let current_module = self.current_module.clone(); let on_fn = self.lua.create_function(move |lua, (pattern, callback): (String, Function)| { let id = SubscriptionId(next_sub_id.fetch_add(1, Ordering::Relaxed)); let key = lua.create_registry_value(callback)?; + let module = current_module + .lock() + .map_err(|_| LuaError::external("module context lock poisoned"))? + .clone(); handlers .lock() - .map_err(|_| mlua::Error::external("handler lock poisoned"))? - .insert(id, key); + .map_err(|_| LuaError::external("handler lock poisoned"))? + .insert( + id, + HandlerEntry { + callback: key, + filter: None, + module, + raw_kind: None, + kind: HandlerKind::Event, + }, + ); state_handle .register_subscription(id, pattern, false) - .map_err(mlua::Error::external)?; + .map_err(LuaError::external)?; Ok(id.0) })?; bread.set("on", on_fn)?; @@ -179,20 +281,93 @@ impl LuaEngine { let handlers = self.handlers.clone(); let next_sub_id = self.next_sub_id.clone(); let state_handle = self.state_handle.clone(); + let current_module = self.current_module.clone(); let once_fn = self.lua.create_function(move |lua, (pattern, callback): (String, Function)| { let id = SubscriptionId(next_sub_id.fetch_add(1, Ordering::Relaxed)); let key = lua.create_registry_value(callback)?; + let module = current_module + .lock() + .map_err(|_| LuaError::external("module context lock poisoned"))? + .clone(); handlers .lock() - .map_err(|_| mlua::Error::external("handler lock poisoned"))? - .insert(id, key); + .map_err(|_| LuaError::external("handler lock poisoned"))? + .insert( + id, + HandlerEntry { + callback: key, + filter: None, + module, + raw_kind: None, + kind: HandlerKind::Event, + }, + ); state_handle .register_subscription(id, pattern, true) - .map_err(mlua::Error::external)?; + .map_err(LuaError::external)?; Ok(id.0) })?; bread.set("once", once_fn)?; + let handlers = self.handlers.clone(); + let next_sub_id = self.next_sub_id.clone(); + let state_handle = self.state_handle.clone(); + let current_module = self.current_module.clone(); + let filter_fn = self + .lua + .create_function(move |lua, (pattern, callback, opts): (String, Function, Option)| { + let id = SubscriptionId(next_sub_id.fetch_add(1, Ordering::Relaxed)); + let key = lua.create_registry_value(callback)?; + let filter = if let Some(opts) = opts { + let filter_fn: Function = opts + .get("filter") + .map_err(|_| LuaError::external("missing filter function"))?; + Some(lua.create_registry_value(filter_fn)?) + } else { + return Err(LuaError::external("missing filter options")); + }; + let module = current_module + .lock() + .map_err(|_| LuaError::external("module context lock poisoned"))? + .clone(); + handlers + .lock() + .map_err(|_| LuaError::external("handler lock poisoned"))? + .insert( + id, + HandlerEntry { + callback: key, + filter, + module, + raw_kind: None, + kind: HandlerKind::Event, + }, + ); + state_handle + .register_subscription(id, pattern, false) + .map_err(LuaError::external)?; + Ok(id.0) + })?; + bread.set("filter", filter_fn)?; + + let handlers = self.handlers.clone(); + let watch_ids = self.watch_ids.clone(); + let state_handle = self.state_handle.clone(); + let off_fn = self.lua.create_function(move |_lua, id: u64| { + let sub_id = SubscriptionId(id); + if let Ok(mut map) = handlers.lock() { + map.remove(&sub_id); + } + state_handle.remove_subscription(sub_id); + if let Ok(mut set) = watch_ids.lock() { + if set.remove(&sub_id) { + state_handle.remove_watch(sub_id); + } + } + Ok(()) + })?; + bread.set("off", off_fn)?; + let emit_tx = self.emit_tx.clone(); let emit_fn = self.lua.create_function(move |lua, (event_name, payload): (String, Value)| { let data = match payload { @@ -203,7 +378,7 @@ impl LuaEngine { }; emit_tx .send(BreadEvent::new(event_name, AdapterSource::System, data)) - .map_err(|_| mlua::Error::external("event channel closed"))?; + .map_err(|_| LuaError::external("event channel closed"))?; Ok(()) })?; bread.set("emit", emit_fn)?; @@ -211,24 +386,91 @@ impl LuaEngine { let state_arc = self.state_handle.state_arc(); let state_tbl = self.lua.create_table()?; let get_fn = self.lua.create_function(move |lua, path: String| { - let snapshot = state_arc.blocking_read(); - let mut value = serde_json::to_value(&*snapshot) - .map_err(|e| mlua::Error::external(e.to_string()))?; - if path.is_empty() { - return lua - .to_value(&value) - .map_err(|e| mlua::Error::external(e.to_string())); - } - for part in path.split('.') { - value = value - .get(part) - .cloned() - .ok_or_else(|| mlua::Error::external("state path not found"))?; - } - lua.to_value(&value) - .map_err(|e| mlua::Error::external(e.to_string())) + state_value_to_lua(lua, &state_arc, &path) })?; state_tbl.set("get", get_fn)?; + + let state_arc = self.state_handle.state_arc(); + let monitors_fn = self + .lua + .create_function(move |lua, ()| state_value_to_lua(lua, &state_arc, "monitors"))?; + state_tbl.set("monitors", monitors_fn)?; + + let state_arc = self.state_handle.state_arc(); + let active_ws_fn = self + .lua + .create_function(move |lua, ()| state_value_to_lua(lua, &state_arc, "active_workspace"))?; + state_tbl.set("active_workspace", active_ws_fn)?; + + let state_arc = self.state_handle.state_arc(); + let active_win_fn = self + .lua + .create_function(move |lua, ()| state_value_to_lua(lua, &state_arc, "active_window"))?; + state_tbl.set("active_window", active_win_fn)?; + + let state_arc = self.state_handle.state_arc(); + let devices_fn = self + .lua + .create_function(move |lua, ()| state_value_to_lua(lua, &state_arc, "devices"))?; + state_tbl.set("devices", devices_fn)?; + + let state_arc = self.state_handle.state_arc(); + let power_fn = self + .lua + .create_function(move |lua, ()| state_value_to_lua(lua, &state_arc, "power"))?; + state_tbl.set("power", power_fn)?; + + let state_arc = self.state_handle.state_arc(); + let network_fn = self + .lua + .create_function(move |lua, ()| state_value_to_lua(lua, &state_arc, "network"))?; + state_tbl.set("network", network_fn)?; + + let state_arc = self.state_handle.state_arc(); + let profile_state_fn = self + .lua + .create_function(move |lua, ()| state_value_to_lua(lua, &state_arc, "profile"))?; + state_tbl.set("profile", profile_state_fn)?; + + let handlers = self.handlers.clone(); + let watch_ids = self.watch_ids.clone(); + let next_sub_id = self.next_sub_id.clone(); + let state_handle = self.state_handle.clone(); + let current_module = self.current_module.clone(); + let watch_fn = self.lua.create_function(move |lua, (path, callback): (String, Function)| { + let id = SubscriptionId(next_sub_id.fetch_add(1, Ordering::Relaxed)); + let key = lua.create_registry_value(callback)?; + let module = current_module + .lock() + .map_err(|_| LuaError::external("module context lock poisoned"))? + .clone(); + handlers + .lock() + .map_err(|_| LuaError::external("handler lock poisoned"))? + .insert( + id, + HandlerEntry { + callback: key, + filter: None, + module, + raw_kind: None, + kind: HandlerKind::StateWatch, + }, + ); + watch_ids + .lock() + .map_err(|_| LuaError::external("watch id lock poisoned"))? + .insert(id); + state_handle + .register_watch(id, path.clone()) + .map_err(LuaError::external)?; + state_handle + .register_subscription(id, format!("bread.state.changed.{path}"), false) + .map_err(LuaError::external)?; + Ok(id.0) + })?; + state_tbl.set("watch", watch_fn)?; + bread.set("state", state_tbl)?; let profile_tbl = self.lua.create_table()?; @@ -240,9 +482,6 @@ impl LuaEngine { profile_tbl.set("activate", activate_fn)?; bread.set("profile", profile_tbl)?; - // Fire-and-forget: the process is launched on a blocking thread and the - // Lua handler returns immediately. The Lua runtime is never stalled waiting - // for a slow or hanging process. Exit code is logged but not returned to Lua. let exec_fn = self.lua.create_function(move |_lua, cmd: String| { task::spawn_blocking(move || { match std::process::Command::new("sh") @@ -264,7 +503,249 @@ impl LuaEngine { })?; bread.set("exec", exec_fn)?; + let timers = self.timers.clone(); + let next_timer_id = self.next_timer_id.clone(); + let lua_tx = self.lua_tx.clone(); + let after_fn = self.lua.create_function(move |lua, (delay_ms, callback): (u64, Function)| { + let id = TimerId(next_timer_id.fetch_add(1, Ordering::Relaxed)); + let key = lua.create_registry_value(callback)?; + let (cancel_tx, mut cancel_rx) = watch::channel(false); + timers + .lock() + .map_err(|_| LuaError::external("timer lock poisoned"))? + .insert( + id, + TimerEntry { + callback: key, + repeating: false, + cancel_tx, + }, + ); + let lua_tx = lua_tx.clone(); + task::spawn(async move { + tokio::select! { + _ = sleep(Duration::from_millis(delay_ms)) => { + if !*cancel_rx.borrow() { + let _ = lua_tx.send(LuaMessage::TimerFired { id }); + } + } + _ = cancel_rx.changed() => {} + } + }); + Ok(id.0) + })?; + bread.set("after", after_fn)?; + + let timers = self.timers.clone(); + let next_timer_id = self.next_timer_id.clone(); + let lua_tx = self.lua_tx.clone(); + let every_fn = self.lua.create_function(move |lua, (interval_ms, callback): (u64, Function)| { + let id = TimerId(next_timer_id.fetch_add(1, Ordering::Relaxed)); + let key = lua.create_registry_value(callback)?; + let (cancel_tx, mut cancel_rx) = watch::channel(false); + timers + .lock() + .map_err(|_| LuaError::external("timer lock poisoned"))? + .insert( + id, + TimerEntry { + callback: key, + repeating: true, + cancel_tx, + }, + ); + let lua_tx = lua_tx.clone(); + task::spawn(async move { + let mut ticker = interval(Duration::from_millis(interval_ms)); + loop { + tokio::select! { + _ = ticker.tick() => { + if *cancel_rx.borrow() { + break; + } + let _ = lua_tx.send(LuaMessage::TimerFired { id }); + } + _ = cancel_rx.changed() => { + if *cancel_rx.borrow() { + break; + } + } + } + } + }); + Ok(id.0) + })?; + bread.set("every", every_fn)?; + + let timers = self.timers.clone(); + let cancel_fn = self.lua.create_function(move |_lua, id: u64| { + let timer_id = TimerId(id); + if let Ok(mut map) = timers.lock() { + if let Some(entry) = map.remove(&timer_id) { + let _ = entry.cancel_tx.send(true); + } + } + Ok(()) + })?; + bread.set("cancel", cancel_fn)?; + + let hyprland_tbl = self.lua.create_table()?; + let dispatch_fn = self.lua.create_function(move |_lua, (cmd, args): (String, String)| { + let resp = hyprland_request(&format!("dispatch {cmd} {args}")) + .map_err(|e| LuaError::external(e.to_string()))?; + Ok(resp) + })?; + hyprland_tbl.set("dispatch", dispatch_fn)?; + + let keyword_fn = self.lua.create_function(move |_lua, (key, value): (String, String)| { + let resp = hyprland_request(&format!("keyword {key} {value}")) + .map_err(|e| LuaError::external(e.to_string()))?; + Ok(resp) + })?; + hyprland_tbl.set("keyword", keyword_fn)?; + + let active_window_fn = self.lua.create_function(move |lua, ()| { + let resp = hyprland_request("j/activewindow") + .map_err(|e| LuaError::external(e.to_string()))?; + let json: JsonValue = serde_json::from_str(&resp) + .map_err(|e| LuaError::external(e.to_string()))?; + lua.to_value(&json) + .map_err(|e| LuaError::external(e.to_string())) + })?; + hyprland_tbl.set("active_window", active_window_fn)?; + + let monitors_fn = self.lua.create_function(move |lua, ()| { + let resp = hyprland_request("j/monitors") + .map_err(|e| LuaError::external(e.to_string()))?; + let json: JsonValue = serde_json::from_str(&resp) + .map_err(|e| LuaError::external(e.to_string()))?; + lua.to_value(&json) + .map_err(|e| LuaError::external(e.to_string())) + })?; + hyprland_tbl.set("monitors", monitors_fn)?; + + let workspaces_fn = self.lua.create_function(move |lua, ()| { + let resp = hyprland_request("j/workspaces") + .map_err(|e| LuaError::external(e.to_string()))?; + let json: JsonValue = serde_json::from_str(&resp) + .map_err(|e| LuaError::external(e.to_string()))?; + lua.to_value(&json) + .map_err(|e| LuaError::external(e.to_string())) + })?; + hyprland_tbl.set("workspaces", workspaces_fn)?; + + let clients_fn = self.lua.create_function(move |lua, ()| { + let resp = hyprland_request("j/clients") + .map_err(|e| LuaError::external(e.to_string()))?; + let json: JsonValue = serde_json::from_str(&resp) + .map_err(|e| LuaError::external(e.to_string()))?; + lua.to_value(&json) + .map_err(|e| LuaError::external(e.to_string())) + })?; + hyprland_tbl.set("clients", clients_fn)?; + + let handlers = self.handlers.clone(); + let next_sub_id = self.next_sub_id.clone(); + let state_handle = self.state_handle.clone(); + let current_module = self.current_module.clone(); + let on_raw_fn = self + .lua + .create_function(move |lua, (event, callback): (String, Function)| { + let id = SubscriptionId(next_sub_id.fetch_add(1, Ordering::Relaxed)); + let key = lua.create_registry_value(callback)?; + let module = current_module + .lock() + .map_err(|_| LuaError::external("module context lock poisoned"))? + .clone(); + handlers + .lock() + .map_err(|_| LuaError::external("handler lock poisoned"))? + .insert( + id, + HandlerEntry { + callback: key, + filter: None, + module, + raw_kind: Some(event), + kind: HandlerKind::Event, + }, + ); + state_handle + .register_subscription(id, "bread.hyprland.event".to_string(), false) + .map_err(LuaError::external)?; + Ok(id.0) + })?; + hyprland_tbl.set("on_raw", on_raw_fn)?; + bread.set("hyprland", hyprland_tbl)?; + + let modules = self.modules.clone(); + let module_decls = self.module_decls.clone(); + let current_module = self.current_module.clone(); + let state_arc = self.state_handle.state_arc(); + let module_fn = self.lua.create_function(move |lua, decl: Table| { + let name: String = decl.get("name")?; + let expected = current_module + .lock() + .map_err(|_| LuaError::external("module context lock poisoned"))? + .clone(); + if expected.as_deref() != Some(&name) { + return Err(LuaError::external("module name does not match current load")); + } + + let decl = module_decls + .lock() + .map_err(|_| LuaError::external("module decls lock poisoned"))? + .get(&name) + .cloned() + .ok_or_else(|| LuaError::external("module declaration not found"))?; + + let module_tbl = lua.create_table()?; + module_tbl.set("name", decl.name.clone())?; + if let Some(version) = decl.version.clone() { + module_tbl.set("version", version)?; + } + + let store_tbl = lua.create_table()?; + let module_name = decl.name.clone(); + let state_arc_get = state_arc.clone(); + let get_fn = lua.create_function(move |lua, key: String| { + if let Some(value) = module_store_get(&state_arc_get, &module_name, &key) { + return lua + .to_value(&value) + .map_err(|e| LuaError::external(e.to_string())); + } + Ok(Value::Nil) + })?; + store_tbl.set("get", get_fn)?; + + let module_name = decl.name.clone(); + let state_arc_set = state_arc.clone(); + let set_fn = lua.create_function(move |lua, (key, value): (String, Value)| { + let json = lua + .from_value::(value) + .unwrap_or_else(|_| JsonValue::Null); + module_store_set(&state_arc_set, &module_name, key, json); + Ok(()) + })?; + store_tbl.set("set", set_fn)?; + module_tbl.set("store", store_tbl)?; + + let key = lua.create_registry_value(module_tbl.clone())?; + modules + .lock() + .map_err(|_| LuaError::external("module registry lock poisoned"))? + .insert( + decl.name.clone(), + ModuleInfo { table_key: key }, + ); + + Ok(module_tbl) + })?; + bread.set("module", module_fn)?; + globals.set("bread", bread)?; + self.install_require_loader()?; + self.install_wait_helper()?; Ok(()) } @@ -273,20 +754,15 @@ impl LuaEngine { let mut files = list_lua_files(&self.module_path)?; files.sort(); - for path in files { - let module_name = path - .file_stem() - .and_then(|v| v.to_str()) - .unwrap_or("unknown") - .to_string(); - match self.load_lua_file(&path, &module_name) { - Ok(()) => { - self.state_handle - .set_module_status(module_name, ModuleLoadState::Loaded, None); - } + + let mut decls = Vec::new(); + for path in files.into_iter().filter(|p| !is_lib_path(&self.module_path, p)) { + match self.scan_module_decl(&path) { + Ok(decl) => decls.push(decl), Err(err) => { + let name = module_name_from_path(&self.module_path, &path); self.state_handle.set_module_status( - module_name, + name, ModuleLoadState::LoadError, Some(err.to_string()), ); @@ -294,6 +770,60 @@ impl LuaEngine { } } + let (ordered, dep_errors) = order_module_decls(decls); + + let mut decl_map = self + .module_decls + .lock() + .expect("module decls mutex poisoned"); + decl_map.clear(); + for decl in &ordered { + decl_map.insert(decl.name.clone(), decl.clone()); + } + drop(decl_map); + + for (name, err) in dep_errors { + self.state_handle + .set_module_status(name, ModuleLoadState::LoadError, Some(err)); + } + + let mut load_order = Vec::new(); + for decl in ordered { + load_order.push(decl.name.clone()); + match self.load_module(&decl) { + Ok(()) => { + self.state_handle + .set_module_status(decl.name.clone(), ModuleLoadState::Loaded, None); + } + Err(err) => { + self.state_handle.set_module_status( + decl.name.clone(), + ModuleLoadState::LoadError, + Some(err.to_string()), + ); + } + } + } + + *self + .module_order + .lock() + .expect("module order mutex poisoned") = load_order; + + Ok(()) + } + + fn load_module(&self, decl: &ModuleDecl) -> Result<()> { + self.set_current_module(Some(decl.name.clone())); + let result = self.load_lua_file(&decl.path, &decl.name); + self.set_current_module(None); + result?; + + if !self.module_is_registered(&decl.name) { + return Err(anyhow!("module did not call bread.module")); + } + + self.run_on_load(&decl.name); Ok(()) } @@ -314,14 +844,83 @@ impl LuaEngine { } fn handle_event(&self, id: SubscriptionId, event: BreadEvent) -> Result<()> { - let handlers = self.handlers.lock().expect("lua handlers mutex poisoned"); - let Some(reg) = handlers.get(&id) else { - return Ok(()); + let (callback, filter, raw_kind, kind, module) = { + let handlers = self.handlers.lock().expect("lua handlers mutex poisoned"); + let Some(entry) = handlers.get(&id) else { + return Ok(()); + }; + let callback: Function = self.lua.registry_value(&entry.callback)?; + let filter = match entry.filter.as_ref() { + Some(key) => Some(self.lua.registry_value::(key)?), + None => None, + }; + ( + callback, + filter, + entry.raw_kind.clone(), + entry.kind, + entry.module.clone(), + ) }; - let callback: Function = self.lua.registry_value(reg)?; - let event_value = self.lua.to_value(&event)?; - if let Err(err) = callback.call::<_, ()>(event_value) { + + if let Some(kind) = raw_kind.as_deref() { + let matches = event + .data + .get("kind") + .and_then(JsonValue::as_str) + .map(|k| k == kind) + .unwrap_or(false); + if !matches { + return Ok(()); + } + } + + if let Some(filter) = filter { + let event_value = self.lua.to_value(&event)?; + let allowed = filter.call::<_, bool>(event_value).unwrap_or(false); + if !allowed { + return Ok(()); + } + } + + let result = match kind { + HandlerKind::Event => { + let event_value = self.lua.to_value(&event)?; + callback.call::<_, ()>(event_value) + } + HandlerKind::StateWatch => { + let new_val = event.data.get("new").cloned().unwrap_or(JsonValue::Null); + let old_val = event.data.get("old").cloned().unwrap_or(JsonValue::Null); + let new_lua = self.lua.to_value(&new_val)?; + let old_lua = self.lua.to_value(&old_val)?; + callback.call::<_, ()>((new_lua, old_lua)) + } + }; + + if let Err(err) = result { error!(subscription = id.0, error = %err, "lua callback failed"); + self.handle_callback_error(module.as_deref(), id, err); + } + Ok(()) + } + + fn handle_timer(&self, id: TimerId) -> Result<()> { + let (callback, repeating) = { + let timers = self.timers.lock().expect("lua timers mutex poisoned"); + let Some(entry) = timers.get(&id) else { + return Ok(()); + }; + let callback: Function = self.lua.registry_value(&entry.callback)?; + (callback, entry.repeating) + }; + if let Err(err) = callback.call::<_, ()>(()) { + error!(timer = id.0, error = %err, "lua timer callback failed"); + } + + if !repeating { + if let Ok(mut map) = self.timers.lock() { + map.remove(&id); + } } Ok(()) } @@ -331,6 +930,388 @@ impl LuaEngine { map.remove(&id); } } + + fn run_on_load(&self, name: &str) { + if let Some(hook) = self.get_module_hook(name, "on_load") { + if let Err(err) = hook.call::<_, ()>(()) { + error!(module = %name, error = %err, "module on_load failed"); + self.state_handle + .set_module_status(name.to_string(), ModuleLoadState::LoadError, Some(err.to_string())); + } + } + } + + fn run_on_reload(&self) { + let order = self + .module_order + .lock() + .expect("module order mutex poisoned") + .clone(); + for name in order { + if let Some(hook) = self.get_module_hook(&name, "on_reload") { + if let Err(err) = hook.call::<_, ()>(()) { + error!(module = %name, error = %err, "module on_reload failed"); + self.state_handle.set_module_status( + name.to_string(), + ModuleLoadState::Degraded, + Some(err.to_string()), + ); + } + } + } + } + + fn run_on_unload(&self) { + let order = self + .module_order + .lock() + .expect("module order mutex poisoned") + .clone(); + for name in order.into_iter().rev() { + if let Some(hook) = self.get_module_hook(&name, "on_unload") { + if let Err(err) = hook.call::<_, ()>(()) { + error!(module = %name, error = %err, "module on_unload failed"); + self.state_handle.set_module_status( + name.to_string(), + ModuleLoadState::Degraded, + Some(err.to_string()), + ); + } + } + } + } + + fn handle_callback_error(&self, module: Option<&str>, id: SubscriptionId, err: LuaError) { + if let Some(module) = module { + self.state_handle.set_module_status( + module.to_string(), + ModuleLoadState::Degraded, + Some(err.to_string()), + ); + if let Some(hook) = self.get_module_hook(module, "on_error") { + match hook.call::<_, bool>(err.to_string()) { + Ok(keep) => { + if !keep { + self.remove_handler(id); + self.state_handle.remove_subscription(id); + self.state_handle.remove_watch(id); + } + } + Err(hook_err) => { + error!(module = %module, error = %hook_err, "module on_error failed"); + } + } + } + } + } + + fn get_module_hook(&self, name: &str, hook: &str) -> Option> { + let modules = self.modules.lock().ok()?; + let info = modules.get(name)?; + let table: Table = self.lua.registry_value(&info.table_key).ok()?; + match table.get::<_, Value>(hook).ok()? { + Value::Function(func) => Some(func), + _ => None, + } + } + + fn module_is_registered(&self, name: &str) -> bool { + self.modules + .lock() + .map(|map| map.contains_key(name)) + .unwrap_or(false) + } + + fn set_current_module(&self, name: Option) { + if let Ok(mut guard) = self.current_module.lock() { + *guard = name; + } + } + + fn cancel_all_timers(&self) { + if let Ok(mut map) = self.timers.lock() { + for (_, entry) in map.drain() { + let _ = entry.cancel_tx.send(true); + } + } + } + + fn scan_module_decl(&self, path: &Path) -> Result { + const MODULE_DECL_ABORT: &str = "__bread_module_decl__"; + let lua = Lua::new(); + let decl_cell: Rc>> = Rc::new(RefCell::new(None)); + let decl_cell_cloned = decl_cell.clone(); + let module_path = path.to_path_buf(); + + let module_fn = lua.create_function(move |_lua, table: Table| -> mlua::Result<()> { + let name: String = table.get("name")?; + let version: Option = table.get("version").ok(); + let after: Vec = table.get("after").unwrap_or_default(); + *decl_cell_cloned.borrow_mut() = Some(ModuleDecl { + name, + version, + after, + path: module_path.clone(), + }); + Err(LuaError::RuntimeError(MODULE_DECL_ABORT.to_string())) + })?; + + let bread = lua.create_table()?; + bread.set("module", module_fn)?; + lua.globals().set("bread", bread)?; + + let src = fs::read_to_string(path)?; + let result = lua.load(&src).set_name(path.to_string_lossy().as_ref()).exec(); + if let Err(err) = result { + match err { + LuaError::RuntimeError(msg) if msg == MODULE_DECL_ABORT => {} + other => return Err(anyhow!(other.to_string())), + } + } + + let decl = decl_cell.borrow().clone(); + decl.ok_or_else(|| anyhow!("module missing bread.module declaration")) + } + + fn install_require_loader(&self) -> Result<()> { + let module_path = self.module_path.clone(); + let loader = self.lua.create_function(move |lua, name: String| { + if !name.starts_with("bread.") { + return Ok(Value::Nil); + } + + let rel = name.trim_start_matches("bread.").replace('.', "/"); + let path = module_path.join(format!("{rel}.lua")); + if !path.exists() { + return Ok(Value::Nil); + } + + let src = fs::read_to_string(&path) + .map_err(|e| LuaError::external(e.to_string()))?; + let func = lua + .load(&src) + .set_name(path.to_string_lossy().as_ref()) + .into_function() + .map_err(|e| LuaError::external(e.to_string()))?; + Ok(Value::Function(func)) + })?; + + let globals = self.lua.globals(); + let bread: Table = globals.get("bread")?; + bread.set("__require_loader", loader)?; + + self.lua.load( + r#" + local searchers = package.searchers or package.loaders + if searchers then + table.insert(searchers, 1, function(name) + return bread.__require_loader(name) + end) + end + "#, + ) + .exec()?; + + Ok(()) + } + + fn install_wait_helper(&self) -> Result<()> { + self.lua + .load( + r#" + bread.wait = function(pattern, opts) + if type(pattern) ~= "string" then + error("bread.wait requires a pattern string") + end + opts = opts or {} + local co = coroutine.running() + if not co then + error("bread.wait must be called inside a coroutine") + end + local id + local timer + id = bread.once(pattern, function(event) + if timer then + bread.cancel(timer) + end + coroutine.resume(co, event) + end) + if opts.timeout then + timer = bread.after(opts.timeout, function() + bread.off(id) + coroutine.resume(co, nil) + end) + end + return coroutine.yield() + end + "#, + ) + .exec()?; + Ok(()) + } +} + +fn order_module_decls(decls: Vec) -> (Vec, Vec<(String, String)>) { + let mut errors = Vec::new(); + let mut map: HashMap = HashMap::new(); + for decl in decls { + if map.contains_key(&decl.name) { + errors.push((decl.name.clone(), "duplicate module name".to_string())); + continue; + } + map.insert(decl.name.clone(), decl); + } + + let mut deps: HashMap> = HashMap::new(); + let mut reverse: HashMap> = HashMap::new(); + let mut invalid: HashSet = HashSet::new(); + + for (name, decl) in map.iter() { + let mut missing = Vec::new(); + for dep in &decl.after { + if map.contains_key(dep) { + deps.entry(name.clone()).or_default().insert(dep.clone()); + reverse.entry(dep.clone()).or_default().insert(name.clone()); + } else { + missing.push(dep.clone()); + } + } + if !missing.is_empty() { + errors.push(( + name.clone(), + format!("missing dependency: {}", missing.join(", ")), + )); + invalid.insert(name.clone()); + } + } + + let mut ready: Vec = map + .keys() + .filter(|name| !deps.contains_key(*name) && !invalid.contains(*name)) + .cloned() + .collect(); + ready.sort(); + + let mut ordered = Vec::new(); + let mut deps = deps; + + while let Some(name) = ready.pop() { + if let Some(decl) = map.get(&name) { + ordered.push(decl.clone()); + } + if let Some(children) = reverse.remove(&name) { + for child in children { + if invalid.contains(&child) { + continue; + } + if let Some(entry) = deps.get_mut(&child) { + entry.remove(&name); + if entry.is_empty() { + deps.remove(&child); + ready.push(child); + ready.sort(); + } + } + } + } + } + + for (name, _) in deps { + if !invalid.contains(&name) { + errors.push((name, "circular dependency".to_string())); + } + } + + (ordered, errors) +} + +fn module_name_from_path(module_root: &Path, path: &Path) -> String { + let rel = path.strip_prefix(module_root).unwrap_or(path); + let mut name = rel + .with_extension("") + .to_string_lossy() + .replace('/', "."); + if name.starts_with('.') { + name.remove(0); + } + name +} + +fn is_lib_path(module_root: &Path, path: &Path) -> bool { + let rel = path.strip_prefix(module_root).unwrap_or(path); + rel.components() + .next() + .and_then(|c| c.as_os_str().to_str()) + .map(|c| c == "lib") + .unwrap_or(false) +} + +fn state_value_to_lua<'lua>( + lua: &'lua Lua, + state_arc: &Arc>, + path: &str, +) -> mlua::Result> { + let snapshot = state_arc.blocking_read(); + let mut value = serde_json::to_value(&*snapshot) + .map_err(|e| LuaError::external(e.to_string()))?; + if path.is_empty() { + return lua + .to_value(&value) + .map_err(|e| LuaError::external(e.to_string())); + } + for part in path.split('.') { + value = value + .get(part) + .cloned() + .ok_or_else(|| LuaError::external("state path not found"))?; + } + lua.to_value(&value) + .map_err(|e| LuaError::external(e.to_string())) +} + +fn module_store_get(state_arc: &Arc>, module: &str, key: &str) -> Option { + let guard = state_arc.blocking_read(); + let entry = guard.modules.iter().find(|m| m.name == module)?; + entry.store.get(key).cloned() +} + +fn module_store_set(state_arc: &Arc>, module: &str, key: String, value: JsonValue) { + let mut guard = state_arc.blocking_write(); + if let Some(entry) = guard.modules.iter_mut().find(|m| m.name == module) { + entry.store.insert(key, value); + return; + } + + let mut store = HashMap::new(); + store.insert(key, value); + guard.modules.push(crate::core::types::ModuleStatus { + name: module.to_string(), + status: ModuleLoadState::Loaded, + last_error: None, + store, + }); +} + +fn hyprland_request_socket() -> Result { + let instance = std::env::var("HYPRLAND_INSTANCE_SIGNATURE") + .map_err(|_| anyhow!("HYPRLAND_INSTANCE_SIGNATURE is not set"))?; + let runtime = std::env::var("XDG_RUNTIME_DIR").unwrap_or_else(|_| "/tmp".to_string()); + Ok(PathBuf::from(runtime) + .join("hypr") + .join(instance) + .join(".socket.sock")) +} + +fn hyprland_request(request: &str) -> Result { + use std::io::{Read, Write}; + use std::os::unix::net::UnixStream; + + let socket = hyprland_request_socket()?; + let mut stream = UnixStream::connect(socket)?; + stream.write_all(request.as_bytes())?; + let mut buffer = String::new(); + stream.read_to_string(&mut buffer)?; + Ok(buffer) } fn list_lua_files(root: &Path) -> Result> {