Compare commits

...

No commits in common. "e746d55730d3956deeb4ef871e4289157f2a2c40" and "3e7a1fa064da24a21643e634dd6f9e22fc622ab8" have entirely different histories.

14 changed files with 816 additions and 98 deletions

View file

@ -10,7 +10,7 @@
# GPU_SERVER_URL — URL of your GPU inference server (Ollama, vLLM, or cf-orch coordinator). # GPU_SERVER_URL — URL of your GPU inference server (Ollama, vLLM, or cf-orch coordinator).
# Paid+ users: leave unset to auto-default to https://orch.circuitforge.tech via CF_LICENSE_KEY. # Paid+ users: leave unset to auto-default to https://orch.circuitforge.tech via CF_LICENSE_KEY.
# Local Ollama (default if unset): http://localhost:11434 # Local Ollama (default if unset): http://localhost:11434
# Local cf-orch coordinator: http://10.1.10.71:7700 # Local cf-orch coordinator: http://<YOUR_HOST_IP>:7700
# CF_ORCH_URL is also accepted as a backward-compatible alias. # CF_ORCH_URL is also accepted as a backward-compatible alias.
# GPU_SERVER_URL=http://localhost:11434 # GPU_SERVER_URL=http://localhost:11434

View file

@ -1,64 +1,81 @@
"""Ollama embedding client with sqlite-vec storage — BSL licensed.""" """Context chunk embedding — BSL licensed.
Thin wrapper around app.services.embeddings that handles the DB I/O for
context_chunks. All backend configuration (model, device, backend type) is
delegated to the service layer via TURNSTONE_EMBED_* env vars.
Re-exports EMBEDDING_AVAILABLE so callers that imported it from here continue
to work without changes.
"""
from __future__ import annotations from __future__ import annotations
import logging import logging
import sqlite3 import sqlite3
import struct
from pathlib import Path from pathlib import Path
import httpx from app.services.embeddings import (
EMBEDDING_AVAILABLE, # re-export for backward compat
get_embedder,
pack_vector,
)
__all__ = ["EMBEDDING_AVAILABLE", "embed_chunks"]
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
EMBEDDING_AVAILABLE: bool = False
try:
import sqlite_vec # type: ignore[import] # noqa: F401
EMBEDDING_AVAILABLE = True
logger.debug("sqlite-vec loaded — embedding pipeline enabled")
except ImportError:
logger.debug("sqlite-vec not available — embedding pipeline disabled")
def embed_chunks( def embed_chunks(
db_path: Path, db_path: Path,
document_id: str, document_id: str,
llm_url: str, # Legacy params kept for backward compat — ignored when the ST backend is active.
model: str = "nomic-embed-text", llm_url: str = "",
model: str = "",
timeout: float = 60.0, timeout: float = 60.0,
) -> int: ) -> int:
"""Embed all unembedded chunks for a document. Returns count embedded. No-op when EMBEDDING_AVAILABLE is False.""" """Embed all un-embedded chunks for *document_id*.
if not EMBEDDING_AVAILABLE:
Uses the configured embedder (sentence-transformers by default; Ollama when
TURNSTONE_EMBED_BACKEND=ollama). Returns the count of newly embedded chunks.
Returns 0 silently when no embedder is available.
The legacy ``llm_url`` and ``model`` parameters are accepted but ignored when
the sentence-transformers backend is active configure via env vars instead.
"""
embedder = get_embedder()
if embedder is None:
return 0 return 0
conn = sqlite3.connect(str(db_path)) conn = sqlite3.connect(str(db_path))
conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA journal_mode=WAL")
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
rows = conn.execute( rows = conn.execute(
"SELECT id, text FROM context_chunks WHERE document_id=? AND embedding IS NULL", "SELECT id, text FROM context_chunks WHERE document_id = ? AND embedding IS NULL",
(document_id,), (document_id,),
).fetchall() ).fetchall()
if not rows:
conn.close()
return 0
texts = [r["text"] for r in rows]
ids = [r["id"] for r in rows]
count = 0 count = 0
for row in rows:
try: try:
resp = httpx.post( vectors = embedder.embed_batch(texts)
f"{llm_url.rstrip('/')}/api/embeddings", for chunk_id, vec in zip(ids, vectors):
json={"model": model, "prompt": row["text"]}, blob = pack_vector(vec)
timeout=timeout,
)
resp.raise_for_status()
vector: list[float] = resp.json().get("embedding") or []
if vector:
blob = struct.pack(f"{len(vector)}f", *vector)
conn.execute( conn.execute(
"UPDATE context_chunks SET embedding=? WHERE id=?", "UPDATE context_chunks SET embedding = ? WHERE id = ?",
(blob, row["id"]), (blob, chunk_id),
) )
count += 1 count += 1
except Exception as exc:
logger.warning("Embedding chunk %s failed: %s", row["id"], exc)
conn.commit() conn.commit()
except Exception as exc:
logger.warning("Batch embedding failed for document %s: %s", document_id, exc)
finally:
conn.close() conn.close()
logger.debug("Embedded %d chunk(s) for document %s", count, document_id)
return count return count

View file

@ -1,10 +1,30 @@
"""Context retrieval — structured keyword lookup (Free) + chunk search — MIT licensed.""" """Context retrieval — structured keyword lookup (Free) + chunk search — MIT licensed.
Two retrieval modes for context_chunks:
Vector search cosine similarity over stored embeddings (when available)
Keyword search LIKE-based fallback when no embedder is configured
Both modes are called from retrieve_context(); the best available mode is used
automatically so callers need not check EMBEDDING_AVAILABLE themselves.
"""
from __future__ import annotations from __future__ import annotations
import logging
import sqlite3 import sqlite3
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
import numpy as np
from app.services.embeddings import (
EMBEDDING_AVAILABLE,
cosine_similarity,
get_embedder,
unpack_vector,
)
logger = logging.getLogger(__name__)
@dataclass @dataclass
class RetrievedContext: class RetrievedContext:
@ -12,6 +32,8 @@ class RetrievedContext:
chunks: list[dict[str, str]] = field(default_factory=list) chunks: list[dict[str, str]] = field(default_factory=list)
# ── Structured fact retrieval (always runs) ───────────────────────────────────
def get_relevant_facts(db_path: Path, query: str) -> list[dict[str, str]]: def get_relevant_facts(db_path: Path, query: str) -> list[dict[str, str]]:
"""Keyword match against context_facts. Always runs — Free tier.""" """Keyword match against context_facts. Always runs — Free tier."""
try: try:
@ -42,8 +64,68 @@ def get_relevant_facts(db_path: Path, query: str) -> list[dict[str, str]]:
return [] return []
def _search_chunks(db_path: Path, query: str) -> list[dict[str, str]]: # ── Chunk retrieval: vector path ──────────────────────────────────────────────
"""Keyword search across context_chunks. Fallback when no embeddings."""
def _search_chunks_vector(
db_path: Path,
query: str,
top_k: int = 3,
) -> list[dict[str, str]]:
"""Cosine similarity search over embedded context_chunks.
Loads all stored embeddings into memory and scores in-process with numpy.
Skips any chunk whose BLOB dimension does not match the current model dim
(stale embeddings from a previous model they will be re-embedded on the
next document upload).
Returns at most *top_k* results ordered by similarity descending.
"""
embedder = get_embedder()
if embedder is None:
return []
try:
query_vec: np.ndarray = embedder.embed(query)
model_dim: int = embedder.dim
except Exception as exc:
logger.warning("Query embedding failed: %s", exc)
return []
try:
conn = sqlite3.connect(str(db_path))
conn.execute("PRAGMA journal_mode=WAL")
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT cc.id, cc.text, cc.embedding, cd.filename"
" FROM context_chunks cc"
" JOIN context_documents cd ON cc.document_id = cd.id"
" WHERE cc.embedding IS NOT NULL"
).fetchall()
conn.close()
except sqlite3.OperationalError:
return []
scored: list[tuple[float, dict[str, str]]] = []
for row in rows:
blob: bytes = row["embedding"]
# Guard against blobs from a different-dimension model
if len(blob) // 4 != model_dim:
continue
try:
chunk_vec = unpack_vector(blob)
score = cosine_similarity(query_vec, chunk_vec)
scored.append((score, {"text": row["text"], "filename": row["filename"]}))
except Exception:
continue
scored.sort(key=lambda t: t[0], reverse=True)
return [item for _, item in scored[:top_k]]
# ── Chunk retrieval: keyword fallback ─────────────────────────────────────────
def _search_chunks_keyword(db_path: Path, query: str) -> list[dict[str, str]]:
"""LIKE-based keyword search across context_chunks. Fallback when no embedder."""
try: try:
conn = sqlite3.connect(str(db_path)) conn = sqlite3.connect(str(db_path))
conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA journal_mode=WAL")
@ -66,16 +148,29 @@ def _search_chunks(db_path: Path, query: str) -> list[dict[str, str]]:
return [] return []
# ── Public interface ──────────────────────────────────────────────────────────
def retrieve_context(db_path: Path, query: str) -> RetrievedContext: def retrieve_context(db_path: Path, query: str) -> RetrievedContext:
"""Retrieve structured facts and relevant chunks for a query.""" """Retrieve structured facts and relevant chunks for a query.
return RetrievedContext(
facts=get_relevant_facts(db_path, query), Chunk retrieval uses vector search when an embedder is available and at
chunks=_search_chunks(db_path, query), least one embedded chunk exists; falls back to keyword search otherwise.
) """
facts = get_relevant_facts(db_path, query)
if EMBEDDING_AVAILABLE:
chunks = _search_chunks_vector(db_path, query)
if not chunks:
# Vector search returned nothing (no embedded chunks yet) — fall back.
chunks = _search_chunks_keyword(db_path, query)
else:
chunks = _search_chunks_keyword(db_path, query)
return RetrievedContext(facts=facts, chunks=chunks)
def format_context_block(ctx: RetrievedContext) -> str | None: def format_context_block(ctx: RetrievedContext) -> str | None:
"""Format context for injection into LLM prompt. Returns None when empty.""" """Format context for injection into an LLM prompt. Returns None when empty."""
lines: list[str] = [] lines: list[str] = []
if ctx.facts: if ctx.facts:
lines.append("Known environment facts:") lines.append("Known environment facts:")

View file

@ -119,6 +119,13 @@ CREATE TABLE IF NOT EXISTS blocklist_candidates (
CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip); CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip);
CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status); CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status);
CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip); CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip);
CREATE TABLE IF NOT EXISTS glean_fingerprints (
path TEXT PRIMARY KEY,
mtime REAL NOT NULL,
size INTEGER NOT NULL,
gleaned_at TEXT NOT NULL
);
""" """
@ -139,6 +146,44 @@ def ensure_schema(db_path: Path) -> None:
conn.close() conn.close()
def _fingerprint(path: Path) -> tuple[float, int]:
"""Return (mtime, size) for a file — cheap identity check, no content read needed."""
st = path.stat()
return st.st_mtime, st.st_size
def _fp_unchanged(conn: sqlite3.Connection, path: Path, mtime: float, size: int) -> bool:
"""Return True only when the stored fingerprint exactly matches (mtime, size).
A smaller size (log rotation) or a larger size (new lines appended) both
return False so the caller re-gleams the file.
"""
row = conn.execute(
"SELECT mtime, size FROM glean_fingerprints WHERE path = ?",
(str(path),),
).fetchone()
if row is None:
return False
return row[0] == mtime and row[1] == size
def _save_fingerprint(
conn: sqlite3.Connection,
path: Path,
mtime: float,
size: int,
gleaned_at: str,
) -> None:
"""Upsert the fingerprint for *path* after a successful glean."""
conn.execute(
"""
INSERT OR REPLACE INTO glean_fingerprints (path, mtime, size, gleaned_at)
VALUES (?, ?, ?, ?)
""",
(str(path), mtime, size, gleaned_at),
)
def _detect_format(first_line: str) -> str: def _detect_format(first_line: str) -> str:
try: try:
obj = json.loads(first_line) obj = json.loads(first_line)
@ -236,6 +281,7 @@ def _glean_files(
pattern_file: Path | None = None, pattern_file: Path | None = None,
batch_size: int = 1000, batch_size: int = 1000,
source_id_map: dict[Path, str] | None = None, source_id_map: dict[Path, str] | None = None,
force: bool = False,
) -> dict[str, int]: ) -> dict[str, int]:
pattern_file = pattern_file or Path("patterns/default.yaml") pattern_file = pattern_file or Path("patterns/default.yaml")
patterns = load_patterns(pattern_file) patterns = load_patterns(pattern_file)
@ -249,9 +295,19 @@ def _glean_files(
conn.commit() conn.commit()
stats: dict[str, int] = {} stats: dict[str, int] = {}
skipped: list[str] = []
for log_file in files: for log_file in files:
source_id = source_id_map.get(log_file, log_file.stem) source_id = source_id_map.get(log_file, log_file.stem)
# Fingerprint check — skip files whose mtime+size haven't changed.
mtime, size = _fingerprint(log_file)
if not force and _fp_unchanged(conn, log_file, mtime, size):
logger.debug("Skipping unchanged file: %s", log_file.name)
skipped.append(log_file.name)
stats[source_id] = stats.get(source_id, 0)
continue
count = 0 count = 0
batch: list[RetrievedEntry] = [] batch: list[RetrievedEntry] = []
for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id): for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id):
@ -265,11 +321,18 @@ def _glean_files(
_write_batch(conn, batch) _write_batch(conn, batch)
conn.commit() conn.commit()
count += len(batch) count += len(batch)
_save_fingerprint(conn, log_file, mtime, size, ingest_time)
conn.commit()
stats[source_id] = stats.get(source_id, 0) + count stats[source_id] = stats.get(source_id, 0) + count
logger.info("Gleaned %d entries from %s (source: %s)", count, log_file.name, source_id) logger.info("Gleaned %d entries from %s (source: %s)", count, log_file.name, source_id)
conn.close() conn.close()
if skipped:
logger.info("Skipped %d unchanged file(s): %s", len(skipped), ", ".join(skipped))
logger.info("Building FTS index...") logger.info("Building FTS index...")
build_fts_index(db_path) build_fts_index(db_path)
logger.info("FTS index ready") logger.info("FTS index ready")
@ -429,19 +492,28 @@ def glean_dir(
db_path: Path, db_path: Path,
pattern_file: Path | None = None, pattern_file: Path | None = None,
batch_size: int = 1000, batch_size: int = 1000,
force: bool = False,
) -> dict[str, int]: ) -> dict[str, int]:
"""Glean all .jsonl and .log files from a corpus directory.""" """Glean all .jsonl and .log files from a corpus directory.
Pass ``force=True`` to bypass fingerprint checks and re-glean all files
regardless of whether they have changed since the last run.
"""
files = sorted(corpus_dir.glob("*.jsonl")) + sorted(corpus_dir.glob("*.log")) files = sorted(corpus_dir.glob("*.jsonl")) + sorted(corpus_dir.glob("*.log"))
return _glean_files(files, db_path, pattern_file, batch_size) return _glean_files(files, db_path, pattern_file, batch_size, force=force)
def glean_file( def glean_file(
log_file: Path, log_file: Path,
db_path: Path, db_path: Path,
pattern_file: Path | None = None, pattern_file: Path | None = None,
force: bool = False,
) -> dict[str, int]: ) -> dict[str, int]:
"""Glean a single log file (any supported format).""" """Glean a single log file (any supported format).
return _glean_files([log_file], db_path, pattern_file)
Pass ``force=True`` to re-glean even when the file fingerprint is unchanged.
"""
return _glean_files([log_file], db_path, pattern_file, force=force)
def glean_sources( def glean_sources(
@ -449,6 +521,7 @@ def glean_sources(
db_path: Path, db_path: Path,
pattern_file: Path | None = None, pattern_file: Path | None = None,
batch_size: int = 1000, batch_size: int = 1000,
force: bool = False,
) -> dict[str, int]: ) -> dict[str, int]:
"""Glean all sources listed in a sources.yaml config file. """Glean all sources listed in a sources.yaml config file.
@ -510,7 +583,7 @@ def glean_sources(
stats: dict[str, int] = {} stats: dict[str, int] = {}
if files: if files:
stats.update(_glean_files(files, db_path, pattern_file, batch_size, source_id_map)) stats.update(_glean_files(files, db_path, pattern_file, batch_size, source_id_map, force=force))
# ── SSH remote sources ───────────────────────────────────────────────── # ── SSH remote sources ─────────────────────────────────────────────────
if not ssh_sources: if not ssh_sources:

View file

@ -93,7 +93,7 @@ def search_logs(
Example: '"connection refused" OR "connection lost"' Example: '"connection refused" OR "connection lost"'
severity: Filter by level EMERGENCY, ALERT, CRITICAL, ERROR, WARN, NOTICE, INFO, DEBUG. severity: Filter by level EMERGENCY, ALERT, CRITICAL, ERROR, WARN, NOTICE, INFO, DEBUG.
source: Partial match on source_id. Format is 'corpus:host:service'. source: Partial match on source_id. Format is 'corpus:host:service'.
Example: 'xanderland:caddy' matches all Caddy entries from xanderland. Example: 'example-node:caddy' matches all Caddy entries from example-node.
pattern: Filter by named pattern tag applied at glean time. pattern: Filter by named pattern tag applied at glean time.
Known tags: auth_failure, connection_lost, oom, segfault, disk_full, Known tags: auth_failure, connection_lost, oom, segfault, disk_full,
timeout, caddy_tls_error, caddy_config_error, caddy_auth_error, timeout, caddy_tls_error, caddy_config_error, caddy_auth_error,

View file

@ -515,13 +515,20 @@ def delete_source(source_id: str) -> dict:
@router.post("/api/sources/{source_id}/glean") @router.post("/api/sources/{source_id}/glean")
def reglean_source(source_id: str, background_tasks: BackgroundTasks) -> dict: def reglean_source(
source_id: str,
background_tasks: BackgroundTasks,
force: Annotated[bool, Query(description="Bypass fingerprint check and re-glean even if file is unchanged")] = False,
) -> dict:
"""Trigger a re-glean for a configured source from sources.yaml. """Trigger a re-glean for a configured source from sources.yaml.
Handles both local file sources and SSH remote sources. For SSH sources, Handles both local file sources and SSH remote sources. For SSH sources,
the glean runs in the foreground and rebuilds the FTS index before returning the glean runs in the foreground and rebuilds the FTS index before returning
(same behaviour as local sources callers can rely on the count being final (same behaviour as local sources callers can rely on the count being final
when the response arrives). when the response arrives).
Use ``?force=true`` to bypass the fingerprint cache and re-glean the file
even if mtime and size appear unchanged since the last run.
""" """
sources_file = PATTERN_DIR / "sources.yaml" sources_file = PATTERN_DIR / "sources.yaml"
if not sources_file.exists(): if not sources_file.exists():
@ -536,6 +543,7 @@ def reglean_source(source_id: str, background_tasks: BackgroundTasks) -> dict:
if src.get("transport") == "ssh": if src.get("transport") == "ssh":
# SSH sources: open connection, glean all items, rebuild FTS inline. # SSH sources: open connection, glean all items, rebuild FTS inline.
# Fingerprint skipping applies only to local file sources.
stats = _glean_ssh_source(src, DB_PATH, PATTERN_FILE) stats = _glean_ssh_source(src, DB_PATH, PATTERN_FILE)
return {"source_id": source_id, "gleaned": sum(stats.values())} return {"source_id": source_id, "gleaned": sum(stats.values())}
@ -543,7 +551,7 @@ def reglean_source(source_id: str, background_tasks: BackgroundTasks) -> dict:
src_path = Path(src["path"]) src_path = Path(src["path"])
if not src_path.exists(): if not src_path.exists():
raise HTTPException(status_code=422, detail=f"Path does not exist: {src_path}") raise HTTPException(status_code=422, detail=f"Path does not exist: {src_path}")
stats = _glean_file(src_path, DB_PATH, PATTERN_FILE) stats = _glean_file(src_path, DB_PATH, PATTERN_FILE, force=force)
background_tasks.add_task(build_fts_index, DB_PATH) background_tasks.add_task(build_fts_index, DB_PATH)
return {"source_id": source_id, "gleaned": stats.get(source_id, sum(stats.values()))} return {"source_id": source_id, "gleaned": stats.get(source_id, sum(stats.values()))}
@ -656,8 +664,14 @@ def glean_task_status() -> dict:
@router.post("/api/tasks/glean") @router.post("/api/tasks/glean")
async def trigger_glean() -> dict: async def trigger_glean(
"""Manually trigger a glean of all configured sources. No-ops if already running.""" force: Annotated[bool, Query(description="Bypass fingerprint check and re-glean all sources")] = False,
) -> dict:
"""Manually trigger a glean of all configured sources. No-ops if already running.
Use ``?force=true`` to bypass the fingerprint cache and re-glean every local
file source even when mtime and size are unchanged since the last run.
"""
sources_file = PATTERN_DIR / "sources.yaml" sources_file = PATTERN_DIR / "sources.yaml"
if not sources_file.exists(): if not sources_file.exists():
raise HTTPException(status_code=404, detail="sources.yaml not found — configure log sources first") raise HTTPException(status_code=404, detail="sources.yaml not found — configure log sources first")
@ -665,6 +679,7 @@ async def trigger_glean() -> dict:
sources_file, DB_PATH, PATTERN_FILE, sources_file, DB_PATH, PATTERN_FILE,
submit_endpoint=SUBMIT_ENDPOINT or None, submit_endpoint=SUBMIT_ENDPOINT or None,
source_host=SOURCE_HOST, source_host=SOURCE_HOST,
force=force,
) )

229
app/services/embeddings.py Normal file
View file

@ -0,0 +1,229 @@
"""Configurable embedding service — BSL licensed.
Backends:
sentence_transformers local in-process inference (default, no server needed)
ollama HTTP to a running Ollama instance
Configuration (env vars):
TURNSTONE_EMBED_BACKEND sentence_transformers | ollama (default: sentence_transformers)
TURNSTONE_EMBED_MODEL model name/path (backend-specific default)
TURNSTONE_EMBED_DEVICE cpu | cuda (default: cpu; ST backend only)
TURNSTONE_LLM_URL Ollama base URL (default: http://localhost:11434)
When no backend is importable/reachable, EMBEDDING_AVAILABLE is False and all
embed calls return empty arrays callers must handle this gracefully.
"""
from __future__ import annotations
import logging
import os
import struct
from typing import Protocol, runtime_checkable
import numpy as np
logger = logging.getLogger(__name__)
# ── Public availability flag ──────────────────────────────────────────────────
EMBEDDING_AVAILABLE: bool = False
# ── Config ────────────────────────────────────────────────────────────────────
_BACKEND = os.environ.get("TURNSTONE_EMBED_BACKEND", "sentence_transformers").lower()
_DEVICE = os.environ.get("TURNSTONE_EMBED_DEVICE", "cpu").lower()
_LLM_URL = os.environ.get("TURNSTONE_LLM_URL", "http://localhost:11434")
# BAAI/bge-small-en-v1.5: 33MB, MIT, 49M downloads/month, 384-dim, 512-token max.
# Benchmarked as the best quality-to-size ratio in the field (MTEB 62.17).
# all-MiniLM-L6-v2 is a viable lighter alternative (23MB, 256-token max) if
# inference speed is the primary constraint.
_DEFAULT_MODEL: dict[str, str] = {
"sentence_transformers": "BAAI/bge-small-en-v1.5",
"ollama": "nomic-embed-text",
}
_MODEL = os.environ.get(
"TURNSTONE_EMBED_MODEL",
_DEFAULT_MODEL.get(_BACKEND, "sentence-transformers/all-MiniLM-L6-v2"),
)
# ── Protocol ──────────────────────────────────────────────────────────────────
@runtime_checkable
class Embedder(Protocol):
"""Minimal interface all embedding backends must satisfy."""
@property
def dim(self) -> int:
"""Embedding dimension produced by this model."""
...
@property
def model_name(self) -> str:
"""Human-readable model identifier."""
...
def embed(self, text: str) -> np.ndarray:
"""Embed a single string. Returns 1-D float32 array of length dim."""
...
def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
"""Embed a list of strings. Returns list of 1-D float32 arrays."""
...
# ── sentence-transformers backend ─────────────────────────────────────────────
class SentenceTransformerEmbedder:
"""Local in-process embedding via the sentence-transformers library.
The model is downloaded from HuggingFace on first instantiation and cached
at ~/.cache/huggingface/. Subsequent starts use the local cache.
"""
def __init__(self, model_name: str = _MODEL, device: str = _DEVICE) -> None:
from sentence_transformers import SentenceTransformer # type: ignore[import]
logger.info("Loading embedding model %r on device %r ...", model_name, device)
self._model = SentenceTransformer(model_name, device=device)
self._model_name = model_name
# Infer dimension from a test embed rather than hard-coding
self._dim: int = int(self._model.encode("test").shape[0])
logger.info("Embedding model ready — dim=%d", self._dim)
@property
def dim(self) -> int:
return self._dim
@property
def model_name(self) -> str:
return self._model_name
def embed(self, text: str) -> np.ndarray:
vec = self._model.encode(text, convert_to_numpy=True, normalize_embeddings=True)
return vec.astype(np.float32)
def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
if not texts:
return []
vecs = self._model.encode(
texts, convert_to_numpy=True, normalize_embeddings=True, batch_size=32
)
return [v.astype(np.float32) for v in vecs]
# ── Ollama backend ────────────────────────────────────────────────────────────
class OllamaEmbedder:
"""HTTP embedding via a running Ollama instance."""
def __init__(
self,
model_name: str = _MODEL,
llm_url: str = _LLM_URL,
timeout: float = 30.0,
) -> None:
import httpx # already a project dependency
self._model_name = model_name
self._url = f"{llm_url.rstrip('/')}/api/embeddings"
self._timeout = timeout
self._client = httpx.Client(timeout=timeout)
# Probe dimension with a test call
self._dim = self._probe_dim()
def _probe_dim(self) -> int:
try:
vec = self._raw_embed("probe")
return len(vec)
except Exception as exc:
logger.warning("Ollama dim probe failed (%s) — defaulting to 768", exc)
return 768
def _raw_embed(self, text: str) -> list[float]:
resp = self._client.post(
self._url, json={"model": self._model_name, "prompt": text}
)
resp.raise_for_status()
return resp.json().get("embedding") or []
@property
def dim(self) -> int:
return self._dim
@property
def model_name(self) -> str:
return self._model_name
def embed(self, text: str) -> np.ndarray:
vec = self._raw_embed(text)
return np.array(vec, dtype=np.float32)
def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
return [self.embed(t) for t in texts]
# ── Singleton factory ─────────────────────────────────────────────────────────
_embedder: Embedder | None = None
def get_embedder() -> Embedder | None:
"""Return the configured embedder singleton, or None when unavailable.
Lazy-initialises on first call. Callers should check EMBEDDING_AVAILABLE
or test for None rather than calling this unconditionally.
"""
global _embedder, EMBEDDING_AVAILABLE
if _embedder is not None:
return _embedder
if _BACKEND == "sentence_transformers":
try:
_embedder = SentenceTransformerEmbedder(_MODEL, _DEVICE)
EMBEDDING_AVAILABLE = True
except ImportError:
logger.warning(
"sentence-transformers not installed — embeddings disabled. "
"Install with: pip install sentence-transformers"
)
except Exception as exc:
logger.warning("Failed to load sentence-transformers model %r: %s", _MODEL, exc)
elif _BACKEND == "ollama":
try:
_embedder = OllamaEmbedder(_MODEL, _LLM_URL)
EMBEDDING_AVAILABLE = True
except Exception as exc:
logger.warning("Ollama embedder init failed: %s", exc)
else:
logger.warning("Unknown TURNSTONE_EMBED_BACKEND %r — embeddings disabled", _BACKEND)
return _embedder
# ── BLOB serialisation helpers ────────────────────────────────────────────────
def pack_vector(vec: np.ndarray) -> bytes:
"""Serialise a float32 numpy vector to a SQLite BLOB."""
arr = vec.astype(np.float32)
return struct.pack(f"{len(arr)}f", *arr.tolist())
def unpack_vector(blob: bytes) -> np.ndarray:
"""Deserialise a SQLite BLOB back to a float32 numpy vector."""
n = len(blob) // 4 # 4 bytes per float32
return np.array(struct.unpack(f"{n}f", blob), dtype=np.float32)
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""Cosine similarity between two L2-normalised vectors.
Both vectors are re-normalised defensively so callers need not pre-normalise.
Returns 0.0 when either vector has zero norm.
"""
norm_a = np.linalg.norm(a)
norm_b = np.linalg.norm(b)
if norm_a == 0.0 or norm_b == 0.0:
return 0.0
return float(np.dot(a, b) / (norm_a * norm_b))

View file

@ -88,7 +88,7 @@ def summarize(
logger.debug("Task endpoint unavailable (%s) — falling back to direct model", exc) logger.debug("Task endpoint unavailable (%s) — falling back to direct model", exc)
# Fallback: OpenAI-compat endpoint with explicit model name (local instances, # Fallback: OpenAI-compat endpoint with explicit model name (local instances,
# xanderland, or any cf-orch that doesn't have task assignments loaded). # example-node, or any cf-orch that doesn't have task assignments loaded).
try: try:
resp = httpx.post( resp = httpx.post(
f"{llm_url.rstrip('/')}/v1/chat/completions", f"{llm_url.rstrip('/')}/v1/chat/completions",

View file

@ -121,8 +121,13 @@ async def run_once(
pattern_file: Path | None = None, pattern_file: Path | None = None,
submit_endpoint: str | None = None, submit_endpoint: str | None = None,
source_host: str = "unknown", source_host: str = "unknown",
force: bool = False,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Ingest all sources once, then submit matched entries if configured.""" """Ingest all sources once, then submit matched entries if configured.
Pass ``force=True`` to bypass fingerprint checks and re-glean all local
file sources regardless of whether they appear unchanged.
"""
if _lock.locked(): if _lock.locked():
return {"ok": False, "error": "glean already running", "skipped": True} return {"ok": False, "error": "glean already running", "skipped": True}
@ -133,7 +138,7 @@ async def run_once(
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
stats: dict[str, int] = await loop.run_in_executor( stats: dict[str, int] = await loop.run_in_executor(
None, None,
lambda: glean_sources(sources_file, db_path, pattern_file), lambda: glean_sources(sources_file, db_path, pattern_file, force=force),
) )
duration = (datetime.now(tz=timezone.utc) - started).total_seconds() duration = (datetime.now(tz=timezone.utc) - started).total_seconds()
_state.last_run_at = started.isoformat() _state.last_run_at = started.isoformat()

View file

@ -48,8 +48,8 @@ sources:
# ── Network syslog (router, switches, UniFi APs) ───────────────────────────── # ── Network syslog (router, switches, UniFi APs) ─────────────────────────────
# Written by syslog-receiver.service (UDP 5140 → /devl/turnstone-cluster/data/network-syslog.txt). # Written by syslog-receiver.service (UDP 5140 → /devl/turnstone-cluster/data/network-syslog.txt).
# Configure devices to send syslog to Heimdall:5140. # Configure devices to send syslog to Heimdall:5140.
# UniFi: Settings → System → Remote Logging → Syslog Host = 10.1.10.71:5140 # UniFi: Settings → System → Remote Logging → Syslog Host = <YOUR_HOST_IP>:5140
# Ubiquiti EdgeRouter: set system syslog host 10.1.10.71 facility all level debug # Ubiquiti EdgeRouter: set system syslog host <YOUR_HOST_IP> facility all level debug
# Managed switches: varies by vendor — target 10.1.10.71 UDP 5140 # Managed switches: varies by vendor — target <YOUR_HOST_IP> UDP 5140
- id: network-syslog - id: network-syslog
path: /data/network-syslog.txt path: /data/network-syslog.txt

View file

@ -46,7 +46,7 @@
# ── Adding Caddy reverse proxy ──────────────────────────────────────────────── # ── Adding Caddy reverse proxy ────────────────────────────────────────────────
# Add to /etc/caddy/Caddyfile: # Add to /etc/caddy/Caddyfile:
# #
# turnstone.xanderland.tv { # turnstone.example-node.tv {
# import protected # import protected
# reverse_proxy 10.0.0.10:8534 # reverse_proxy 10.0.0.10:8534
# import cloudflare # import cloudflare

View file

@ -1,13 +1,17 @@
"""Tests for app/context/embedder.py — graceful no-op without sqlite-vec.""" """Tests for app/context/embedder.py — delegates to app.services.embeddings."""
import sqlite3 import sqlite3
import struct
from pathlib import Path from pathlib import Path
from unittest.mock import patch from unittest.mock import MagicMock, patch
import numpy as np
import pytest import pytest
from app.context import embedder as emb_mod from app.context import embedder as emb_mod
@pytest.fixture @pytest.fixture()
def db(tmp_path): def db(tmp_path: Path) -> Path:
db_path = tmp_path / "t.db" db_path = tmp_path / "t.db"
conn = sqlite3.connect(str(db_path)) conn = sqlite3.connect(str(db_path))
conn.executescript(""" conn.executescript("""
@ -20,34 +24,78 @@ def db(tmp_path):
REFERENCES context_documents(id) ON DELETE CASCADE, REFERENCES context_documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL, text TEXT NOT NULL, embedding BLOB chunk_index INTEGER NOT NULL, text TEXT NOT NULL, embedding BLOB
); );
INSERT INTO context_documents VALUES ('d1','test.md','markdown','hello',5,'2026-01-01T00:00:00+00:00'); INSERT INTO context_documents
VALUES ('d1','test.md','markdown','hello',5,'2026-01-01T00:00:00+00:00');
INSERT INTO context_chunks VALUES ('c1','d1',0,'hello world',NULL); INSERT INTO context_chunks VALUES ('c1','d1',0,'hello world',NULL);
INSERT INTO context_chunks VALUES ('c2','d1',1,'second chunk',NULL);
""") """)
conn.commit() conn.commit()
conn.close() conn.close()
return db_path return db_path
def test_embed_skipped_when_extension_absent(db): def _mock_embedder(dim: int = 3) -> MagicMock:
with patch.object(emb_mod, "EMBEDDING_AVAILABLE", False): """Return a mock Embedder that returns constant dim-length vectors."""
count = emb_mod.embed_chunks(db, "d1", "http://localhost:11434") m = MagicMock()
m.dim = dim
m.embed_batch.return_value = [np.zeros(dim, dtype=np.float32)] * 10
return m
class TestEmbedChunks:
def test_returns_zero_when_no_embedder(self, db: Path) -> None:
with patch("app.context.embedder.get_embedder", return_value=None):
count = emb_mod.embed_chunks(db, "d1")
assert count == 0 assert count == 0
def test_returns_zero_when_no_unembedded_chunks(self, db: Path) -> None:
def test_embed_calls_ollama_when_available(db): # Pre-fill both chunks with a blob
import httpx blob = struct.pack("3f", 0.1, 0.2, 0.3)
class FakeResponse:
status_code = 200
def raise_for_status(self): pass
def json(self): return {"embedding": [0.1, 0.2, 0.3]}
with patch.object(emb_mod, "EMBEDDING_AVAILABLE", True), \
patch("app.context.embedder.httpx.post", return_value=FakeResponse()):
count = emb_mod.embed_chunks(db, "d1", "http://localhost:11434")
assert count == 1
# Verify blob was written
conn = sqlite3.connect(str(db)) conn = sqlite3.connect(str(db))
row = conn.execute("SELECT embedding FROM context_chunks WHERE id='c1'").fetchone() conn.execute("UPDATE context_chunks SET embedding=?", (blob,))
conn.commit()
conn.close() conn.close()
assert row[0] is not None
embedder = _mock_embedder()
with patch("app.context.embedder.get_embedder", return_value=embedder):
count = emb_mod.embed_chunks(db, "d1")
assert count == 0
embedder.embed_batch.assert_not_called()
def test_embeds_all_null_chunks(self, db: Path) -> None:
embedder = _mock_embedder(dim=3)
with patch("app.context.embedder.get_embedder", return_value=embedder):
count = emb_mod.embed_chunks(db, "d1")
assert count == 2 # two chunks in fixture
def test_blobs_written_to_db(self, db: Path) -> None:
vec = np.array([0.1, 0.2, 0.3], dtype=np.float32)
embedder = _mock_embedder(dim=3)
embedder.embed_batch.return_value = [vec, vec]
with patch("app.context.embedder.get_embedder", return_value=embedder):
emb_mod.embed_chunks(db, "d1")
conn = sqlite3.connect(str(db))
rows = conn.execute(
"SELECT embedding FROM context_chunks WHERE document_id='d1'"
).fetchall()
conn.close()
for (blob,) in rows:
assert blob is not None
unpacked = struct.unpack(f"{len(blob)//4}f", blob)
assert len(unpacked) == 3
def test_legacy_llm_url_param_accepted(self, db: Path) -> None:
"""Ensure backward-compat signature still works (llm_url ignored)."""
embedder = _mock_embedder()
with patch("app.context.embedder.get_embedder", return_value=embedder):
count = emb_mod.embed_chunks(db, "d1", "http://localhost:11434", "nomic-embed-text")
assert count == 2
def test_embed_batch_error_returns_zero(self, db: Path) -> None:
embedder = _mock_embedder()
embedder.embed_batch.side_effect = RuntimeError("model exploded")
with patch("app.context.embedder.get_embedder", return_value=embedder):
count = emb_mod.embed_chunks(db, "d1")
assert count == 0

View file

@ -0,0 +1,236 @@
"""Tests for fingerprint-based incremental glean skipping (issue #30).
Verifies that _glean_files() (and its public wrappers) skip local files whose
mtime+size fingerprint has not changed since the last glean, and that force=True
bypasses that check.
"""
from __future__ import annotations
import sqlite3
import time
from pathlib import Path
import pytest
from app.glean.pipeline import (
_fingerprint,
_fp_unchanged,
_save_fingerprint,
ensure_schema,
glean_dir,
glean_file,
)
from app.glean.base import now_iso
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture()
def db_path(tmp_path: Path) -> Path:
path = tmp_path / "test.db"
ensure_schema(path)
return path
@pytest.fixture()
def log_file(tmp_path: Path) -> Path:
"""A minimal plaintext log file."""
f = tmp_path / "test.log"
f.write_text("May 24 10:00:00 heimdall kernel: test message\n")
return f
# ── Unit: fingerprint helpers ──────────────────────────────────────────────────
class TestFingerprintHelpers:
def test_fingerprint_returns_mtime_and_size(self, log_file: Path) -> None:
mtime, size = _fingerprint(log_file)
st = log_file.stat()
assert mtime == st.st_mtime
assert size == st.st_size
def test_fp_unchanged_returns_false_when_no_record(self, db_path: Path, log_file: Path) -> None:
conn = sqlite3.connect(str(db_path))
mtime, size = _fingerprint(log_file)
assert _fp_unchanged(conn, log_file, mtime, size) is False
conn.close()
def test_fp_unchanged_returns_true_after_save(self, db_path: Path, log_file: Path) -> None:
conn = sqlite3.connect(str(db_path))
mtime, size = _fingerprint(log_file)
_save_fingerprint(conn, log_file, mtime, size, now_iso())
conn.commit()
assert _fp_unchanged(conn, log_file, mtime, size) is True
conn.close()
def test_fp_unchanged_returns_false_on_size_change(self, db_path: Path, log_file: Path) -> None:
conn = sqlite3.connect(str(db_path))
mtime, size = _fingerprint(log_file)
_save_fingerprint(conn, log_file, mtime, size, now_iso())
conn.commit()
# Simulate size change (new content appended)
assert _fp_unchanged(conn, log_file, mtime, size + 1) is False
conn.close()
def test_fp_unchanged_returns_false_on_mtime_change(self, db_path: Path, log_file: Path) -> None:
conn = sqlite3.connect(str(db_path))
mtime, size = _fingerprint(log_file)
_save_fingerprint(conn, log_file, mtime, size, now_iso())
conn.commit()
assert _fp_unchanged(conn, log_file, mtime + 1.0, size) is False
conn.close()
def test_save_fingerprint_upserts(self, db_path: Path, log_file: Path) -> None:
"""Second save with different values replaces the first (UPSERT semantics)."""
conn = sqlite3.connect(str(db_path))
_save_fingerprint(conn, log_file, 1000.0, 100, "2026-01-01T00:00:00Z")
conn.commit()
_save_fingerprint(conn, log_file, 2000.0, 200, "2026-01-02T00:00:00Z")
conn.commit()
row = conn.execute(
"SELECT mtime, size FROM glean_fingerprints WHERE path = ?",
(str(log_file),),
).fetchone()
assert row == (2000.0, 200)
conn.close()
# ── Integration: glean_file skipping ─────────────────────────────────────────
class TestGleanFileFingerprint:
def test_first_glean_writes_fingerprint(self, db_path: Path, log_file: Path) -> None:
glean_file(log_file, db_path)
conn = sqlite3.connect(str(db_path))
row = conn.execute(
"SELECT mtime, size FROM glean_fingerprints WHERE path = ?",
(str(log_file),),
).fetchone()
conn.close()
assert row is not None
mtime, size = _fingerprint(log_file)
assert row == (mtime, size)
def test_second_glean_skips_unchanged_file(self, db_path: Path, log_file: Path) -> None:
stats_first = glean_file(log_file, db_path)
count_first = sum(stats_first.values())
# Re-glean without touching the file — should produce 0 new entries.
stats_second = glean_file(log_file, db_path)
count_second = sum(stats_second.values())
assert count_first >= 1, "First glean should find at least one entry"
assert count_second == 0, "Second glean should skip unchanged file"
def test_second_glean_runs_when_file_grows(self, db_path: Path, log_file: Path) -> None:
glean_file(log_file, db_path)
# Append a new line and update mtime by rewriting.
original = log_file.read_text()
log_file.write_text(original + "May 24 10:01:00 heimdall kernel: second message\n")
stats_second = glean_file(log_file, db_path)
# INSERT OR IGNORE means the original entry won't re-count, but parsing
# does happen — at minimum the new line is processed.
assert sum(stats_second.values()) >= 0 # glean ran (not skipped)
# Confirm fingerprint updated to new size.
conn = sqlite3.connect(str(db_path))
row = conn.execute(
"SELECT size FROM glean_fingerprints WHERE path = ?",
(str(log_file),),
).fetchone()
conn.close()
assert row is not None
assert row[0] == log_file.stat().st_size
def test_force_bypasses_fingerprint(self, db_path: Path, log_file: Path) -> None:
glean_file(log_file, db_path)
# Without force: skipped.
stats_no_force = glean_file(log_file, db_path)
assert sum(stats_no_force.values()) == 0
# With force: glean runs (INSERT OR IGNORE means count may be 0, but
# we verify the fingerprint was re-saved with a fresh gleaned_at).
conn_before = sqlite3.connect(str(db_path))
ts_before = conn_before.execute(
"SELECT gleaned_at FROM glean_fingerprints WHERE path = ?",
(str(log_file),),
).fetchone()[0]
conn_before.close()
time.sleep(0.01) # ensure gleaned_at advances
glean_file(log_file, db_path, force=True)
conn_after = sqlite3.connect(str(db_path))
ts_after = conn_after.execute(
"SELECT gleaned_at FROM glean_fingerprints WHERE path = ?",
(str(log_file),),
).fetchone()[0]
conn_after.close()
assert ts_after > ts_before, "force=True should update gleaned_at timestamp"
# ── Integration: glean_dir skipping ──────────────────────────────────────────
class TestGleanDirFingerprint:
def test_glean_dir_skips_unchanged_on_second_run(self, db_path: Path, tmp_path: Path) -> None:
log1 = tmp_path / "a.log"
log2 = tmp_path / "b.log"
log1.write_text("May 24 10:00:00 heimdall kernel: msg one\n")
log2.write_text("May 24 10:00:00 heimdall kernel: msg two\n")
glean_dir(tmp_path, db_path)
stats_second = glean_dir(tmp_path, db_path)
assert sum(stats_second.values()) == 0, "Both unchanged files should be skipped"
def test_glean_dir_force_reruns_all(self, db_path: Path, tmp_path: Path) -> None:
log1 = tmp_path / "a.log"
log1.write_text("May 24 10:00:00 heimdall kernel: msg one\n")
glean_dir(tmp_path, db_path)
# force=True: runs even though nothing changed; INSERT OR IGNORE keeps DB clean.
conn_before = sqlite3.connect(str(db_path))
ts_before = conn_before.execute(
"SELECT gleaned_at FROM glean_fingerprints WHERE path = ?",
(str(log1),),
).fetchone()[0]
conn_before.close()
time.sleep(0.01)
glean_dir(tmp_path, db_path, force=True)
conn_after = sqlite3.connect(str(db_path))
ts_after = conn_after.execute(
"SELECT gleaned_at FROM glean_fingerprints WHERE path = ?",
(str(log1),),
).fetchone()[0]
conn_after.close()
assert ts_after > ts_before
# ── Schema: ensure fingerprints table created ─────────────────────────────────
class TestEnsureSchema:
def test_fingerprints_table_exists_after_ensure_schema(self, tmp_path: Path) -> None:
db = tmp_path / "fresh.db"
ensure_schema(db)
conn = sqlite3.connect(str(db))
tables = {
row[0]
for row in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
}
conn.close()
assert "glean_fingerprints" in tables
def test_ensure_schema_idempotent(self, tmp_path: Path) -> None:
"""Calling ensure_schema twice on the same DB must not raise."""
db = tmp_path / "fresh.db"
ensure_schema(db)
ensure_schema(db) # second call — should be a no-op

View file

@ -4,24 +4,24 @@ from __future__ import annotations
from app.glean.syslog import is_syslog, parse from app.glean.syslog import is_syslog, parse
SYSLOG_SAMPLE = """\ SYSLOG_SAMPLE = """\
May 11 14:23:01 xanderland sshd[1234]: Accepted publickey for x from 192.168.1.1 port 54321 ssh2 May 11 14:23:01 example-node sshd[1234]: Accepted publickey for x from 192.168.1.1 port 54321 ssh2
May 11 14:23:05 xanderland sshd[1234]: Failed password for invalid user admin from 10.0.0.99 port 22 ssh2 May 11 14:23:05 example-node sshd[1234]: Failed password for invalid user admin from 10.0.0.99 port 22 ssh2
May 11 14:23:10 xanderland sudo[5678]: x : TTY=pts/0 ; PWD=/home/x ; USER=root ; COMMAND=/usr/bin/apt update May 11 14:23:10 example-node sudo[5678]: x : TTY=pts/0 ; PWD=/home/x ; USER=root ; COMMAND=/usr/bin/apt update
May 11 14:23:15 xanderland kernel: [12345.678] usb 1-1: USB disconnect, device number 2 May 11 14:23:15 example-node kernel: [12345.678] usb 1-1: USB disconnect, device number 2
May 1 04:00:00 xanderland CRON[9999]: (root) CMD (/usr/local/sbin/backup.sh) May 1 04:00:00 example-node CRON[9999]: (root) CMD (/usr/local/sbin/backup.sh)
May 11 14:24:00 xanderland systemd[1]: Started NetworkManager. May 11 14:24:00 example-node systemd[1]: Started NetworkManager.
""" """
class TestDetector: class TestDetector:
def test_detects_standard_line(self): def test_detects_standard_line(self):
assert is_syslog("May 11 14:23:01 xanderland sshd[1234]: message") assert is_syslog("May 11 14:23:01 example-node sshd[1234]: message")
def test_detects_no_pid(self): def test_detects_no_pid(self):
assert is_syslog("May 11 14:23:01 xanderland kernel: message") assert is_syslog("May 11 14:23:01 example-node kernel: message")
def test_detects_space_padded_day(self): def test_detects_space_padded_day(self):
assert is_syslog("May 1 04:00:00 xanderland CRON[9999]: message") assert is_syslog("May 1 04:00:00 example-node CRON[9999]: message")
def test_rejects_servarr(self): def test_rejects_servarr(self):
assert not is_syslog("2026-05-11 02:31:51.5|Info|ComponentName|Message") assert not is_syslog("2026-05-11 02:31:51.5|Info|ComponentName|Message")