Daemon release-ready

This commit is contained in:
Breadway 2026-05-11 12:21:23 +08:00
parent 730a8b61d7
commit d537fc9318
9 changed files with 358 additions and 1207 deletions

View file

@ -1,21 +1,24 @@
use std::collections::HashMap;
use std::sync::Mutex;
use std::sync::RwLock;
use bread_shared::{AdapterSource, BreadEvent, RawEvent};
use serde_json::{json, Value};
use crate::core::types::DeviceClass;
/// How many multiples of `dedup_window_ms` an entry must be idle before eviction.
const EVICT_MULTIPLIER: u64 = 60;
pub struct EventNormalizer {
dedup_window_ms: u64,
recent: Mutex<HashMap<String, u64>>,
recent: RwLock<HashMap<String, u64>>,
}
impl EventNormalizer {
pub fn new(dedup_window_ms: u64) -> Self {
Self {
dedup_window_ms,
recent: Mutex::new(HashMap::new()),
recent: RwLock::new(HashMap::new()),
}
}
@ -164,16 +167,36 @@ impl EventNormalizer {
fn accept(&self, event: &BreadEvent) -> bool {
let key = format!("{}:{}", event.event, event.data);
let mut recent = self.recent.lock().expect("normalizer dedup mutex poisoned");
let now = event.timestamp;
// Fast path: check under read lock first.
{
let recent = self.recent.read().unwrap_or_else(|p| p.into_inner());
if let Some(last) = recent.get(&key) {
if now.saturating_sub(*last) < self.dedup_window_ms {
return false;
}
}
}
// Slow path: acquire write lock, re-check, insert, and periodically evict.
let mut recent = self.recent.write().unwrap_or_else(|p| p.into_inner());
// Re-check after acquiring write lock (another thread may have inserted between locks).
if let Some(last) = recent.get(&key) {
if now.saturating_sub(*last) < self.dedup_window_ms {
return false;
}
}
recent.insert(key, now);
recent.insert(key.clone(), now);
// Evict stale entries to prevent unbounded growth.
let evict_before = now.saturating_sub(self.dedup_window_ms.saturating_mul(EVICT_MULTIPLIER));
if evict_before > 0 {
recent.retain(|_, &mut last| last >= evict_before);
}
true
}
}

View file

@ -32,10 +32,19 @@ impl SubscriptionTable {
return false;
};
// swap_remove moves the last element into `idx`. We need to update by_id
// for that element. But first, remove its stale entry (it was at the last
// position before the swap); then re-insert it at the new position.
let last_idx = self.entries.len() - 1;
self.entries.swap_remove(idx);
if let Some(swapped) = self.entries.get(idx) {
self.by_id.insert(swapped.id, idx);
if idx < self.entries.len() {
// The element that was at `last_idx` is now at `idx`.
let swapped_id = self.entries[idx].id;
self.by_id.remove(&swapped_id); // remove stale last_idx entry
self.by_id.insert(swapped_id, idx);
}
true
}