This commit is contained in:
Breadway 2026-05-11 16:30:05 +08:00
parent 16f3765b65
commit 65f81db562
11 changed files with 1192 additions and 67 deletions

View file

@ -1,18 +1,22 @@
use std::collections::{HashMap, VecDeque};
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::path::PathBuf;
use std::process;
use std::time::Instant;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use anyhow::{anyhow, Result};
use bread_shared::{AdapterSource, BreadEvent};
use bread_shared::{now_unix_ms, AdapterSource, BreadEvent};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::{broadcast, mpsc, watch};
use tokio::sync::{broadcast, mpsc, watch, RwLock};
use tracing::{error, info, warn};
use crate::adapters::AdapterStatus;
use crate::core::state_engine::StateHandle;
use crate::lua::RuntimeHandle;
@ -23,6 +27,9 @@ pub struct Server {
event_tx: broadcast::Sender<BreadEvent>,
lua_runtime: RuntimeHandle,
emit_tx: mpsc::UnboundedSender<BreadEvent>,
adapter_status: Arc<RwLock<HashMap<String, AdapterStatus>>>,
subscription_count: Arc<AtomicU64>,
event_buffer: Arc<std::sync::Mutex<VecDeque<BreadEvent>>>,
started_at: Instant,
pid: u32,
}
@ -51,6 +58,9 @@ impl Server {
event_tx: broadcast::Sender<BreadEvent>,
lua_runtime: RuntimeHandle,
emit_tx: mpsc::UnboundedSender<BreadEvent>,
adapter_status: Arc<RwLock<HashMap<String, AdapterStatus>>>,
subscription_count: Arc<AtomicU64>,
event_buffer: Arc<std::sync::Mutex<VecDeque<BreadEvent>>>,
) -> Self {
Self {
socket_path,
@ -58,6 +68,9 @@ impl Server {
event_tx,
lua_runtime,
emit_tx,
adapter_status,
subscription_count,
event_buffer,
started_at: Instant::now(),
pid: process::id(),
}
@ -166,12 +179,25 @@ impl Server {
let full = self.state_handle.state_dump().await;
Ok(full.get("modules").cloned().unwrap_or_else(|| json!([])))
}
"modules.reload" => self
.lua_runtime
.reload()
.await
.map(|_| json!({ "reloaded": true }))
.map_err(|e| e.to_string()),
"modules.reload" => {
let started = Instant::now();
if let Err(err) = self.lua_runtime.reload().await {
return Err((id, err.to_string()));
}
let duration_ms = started.elapsed().as_millis();
let modules = self
.state_handle
.state_dump()
.await
.get("modules")
.cloned()
.unwrap_or_else(|| json!([]));
Ok(json!({
"ok": true,
"duration_ms": duration_ms,
"modules": modules,
}))
}
"profile.list" => {
let full = self.state_handle.state_dump().await;
let profiles = full
@ -224,13 +250,36 @@ impl Server {
}
"health" => {
let uptime_ms = self.started_at.elapsed().as_millis();
let state = self.state_handle.state_dump().await;
let modules = state.get("modules").cloned().unwrap_or_else(|| json!([]));
let adapters = self.adapter_status.read().await.clone();
let subscription_count = self.subscription_count.load(std::sync::atomic::Ordering::Relaxed);
let recent_errors = self.lua_runtime.recent_errors();
Ok(json!({
"ok": true,
"pid": self.pid,
"version": env!("CARGO_PKG_VERSION"),
"uptime_ms": uptime_ms,
"socket": self.socket_path.to_string_lossy(),
"adapters": adapters,
"modules": modules,
"subscriptions": subscription_count,
"recent_errors": recent_errors,
}))
}
"events.replay" => {
let since_ms = req.params.get("since_ms").and_then(Value::as_u64).unwrap_or(0);
let cutoff = now_unix_ms().saturating_sub(since_ms);
let mut replay = Vec::new();
if let Ok(buf) = self.event_buffer.lock() {
for event in buf.iter() {
if event.timestamp >= cutoff {
replay.push(event);
}
}
}
Ok(serde_json::to_value(replay).unwrap_or_else(|_| json!([])))
}
_ => Err("unknown method".to_string()),
};