robin/src-tauri/src/watcher/inotify.rs
pyr0ball 3c77969680 fix(m1): inotify — use read_to_end for UTF-8 resilience, await spawn_blocking
- read_to_end + from_utf8_lossy replaces read_to_string so Wine/game logs
  with Latin-1 bytes are handled via U+FFFD replacement instead of silently
  dropping all events from that file
- bytes_read from I/O call used for new_pos (not content.len()) for correct
  byte position accounting
- spawn_blocking handle is now awaited so panics inside the blocking task
  surface to the caller instead of being silently swallowed
2026-05-18 17:25:30 -07:00

124 lines
4 KiB
Rust

use super::{now_unix, EventSource, SystemEvent};
use notify::{recommended_watcher, RecursiveMode, Watcher};
use std::collections::HashMap;
use std::io::{Read, Seek};
use tokio::sync::mpsc;
pub async fn watch(log_paths: HashMap<String, String>, tx: mpsc::Sender<SystemEvent>) {
if log_paths.is_empty() {
return;
}
let expanded: HashMap<String, String> = log_paths
.iter()
.map(|(k, v)| (k.clone(), expand_tilde(v)))
.collect();
tokio::task::spawn_blocking(move || {
let (notify_tx, notify_rx) = std::sync::mpsc::channel();
let mut watcher = match recommended_watcher(notify_tx) {
Ok(w) => w,
Err(e) => {
log::error!("inotify watcher init failed: {e}");
return;
}
};
// positions: path -> (app_name, byte_offset)
let mut positions: HashMap<String, (String, u64)> = HashMap::new();
for (app_name, path) in &expanded {
let pb = std::path::Path::new(path);
if pb.exists() {
let len = std::fs::metadata(pb).map(|m| m.len()).unwrap_or(0);
positions.insert(path.clone(), (app_name.clone(), len));
watcher.watch(pb, RecursiveMode::NonRecursive).ok();
}
}
for result in notify_rx {
if let Ok(event) = result {
for path in event.paths {
let path_str = path.to_string_lossy().to_string();
if let Some((app_name, pos)) = positions.get_mut(&path_str) {
let (lines, new_pos) = read_new_lines(&path_str, *pos);
*pos = new_pos;
for line in lines {
tx.blocking_send(SystemEvent {
source: EventSource::AppLog {
app: app_name.clone(),
},
raw_line: line,
timestamp: now_unix(),
})
.ok();
}
}
}
}
}
})
.await
.ok();
}
/// Replaces a leading `~/` in `path` with the user's home directory.
///
/// The home directory is read from the `HOME` environment variable, falling
/// back to `/home/user` when the variable cannot be read. Any path that does
/// not start with `~/` is returned unchanged.
pub fn expand_tilde(path: &str) -> String {
    match path.strip_prefix("~/") {
        Some(tail) => {
            let home_dir =
                std::env::var("HOME").unwrap_or_else(|_| String::from("/home/user"));
            [home_dir.as_str(), tail].join("/")
        }
        None => path.to_owned(),
    }
}
/// Reads everything appended to `path` after byte offset `from_byte`.
///
/// Returns the non-empty lines found in that range together with the offset
/// to pass as `from_byte` on the next call. Bytes that are not valid UTF-8 are
/// replaced with U+FFFD instead of discarding the data. A final line that has
/// no trailing newline yet is returned as it stands.
///
/// If the file is now shorter than `from_byte` (it was truncated or replaced
/// by a fresh file), reading restarts from the beginning so the new content
/// is not skipped. If the file cannot be opened or positioned, no lines are
/// returned and the offset is left unchanged.
pub fn read_new_lines(path: &str, from_byte: u64) -> (Vec<String>, u64) {
    let mut file = match std::fs::File::open(path) {
        Ok(f) => f,
        Err(_) => return (Vec::new(), from_byte),
    };

    // Seeking past EOF succeeds and then reads nothing, so a stale offset
    // after truncation/rotation would hide all new output until the file grew
    // beyond it again. Detect the shrink and start over from byte 0.
    let start = match file.metadata() {
        Ok(meta) if meta.len() < from_byte => 0,
        _ => from_byte,
    };
    if file.seek(std::io::SeekFrom::Start(start)).is_err() {
        return (Vec::new(), from_byte);
    }

    let mut raw = Vec::new();
    // `read_to_end` keeps the bytes it managed to read even when it fails, so
    // the buffer length (not the call's return value) is the number of bytes
    // actually consumed. Using it keeps the offset in step with the lines
    // returned and avoids re-emitting them on the next call.
    let _ = file.read_to_end(&mut raw);
    let new_pos = start + raw.len() as u64;

    let content = String::from_utf8_lossy(&raw);
    let lines: Vec<String> = content
        .lines()
        .filter(|l| !l.is_empty())
        .map(str::to_owned)
        .collect();
    (lines, new_pos)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A leading `~/` is replaced by the home directory and the remainder of
    /// the path is kept.
    #[test]
    fn expand_tilde_home() {
        // Mirror the fallback used by `expand_tilde` when HOME is unset.
        let home = std::env::var("HOME").unwrap_or_else(|_| "/home/user".into());
        let expanded = expand_tilde("~/.config/retroarch/retroarch.log");
        assert!(expanded.starts_with(&home));
        assert!(expanded.ends_with(".config/retroarch/retroarch.log"));
    }

    /// Paths without a leading `~/` come back unchanged.
    #[test]
    fn expand_tilde_no_tilde() {
        let path = "/absolute/path/to/file.log";
        assert_eq!(expand_tilde(path), path);
    }

    /// Reading from offset 0 returns the written line and advances the offset
    /// by the number of bytes in the file.
    // Plain `#[test]`: nothing here awaits, so no async runtime is needed.
    #[test]
    fn reads_new_lines_from_file() {
        use std::io::Write;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.log");
        let mut f = std::fs::File::create(&path).unwrap();
        writeln!(f, "line one").unwrap();

        let (lines, pos) = read_new_lines(path.to_str().unwrap(), 0);
        assert_eq!(lines, vec!["line one".to_string()]);
        // "line one" is 8 bytes plus the newline.
        assert_eq!(pos, 9);
    }
}