Compare commits
No commits in common. "e746d55730d3956deeb4ef871e4289157f2a2c40" and "3e7a1fa064da24a21643e634dd6f9e22fc622ab8" have entirely different histories.
e746d55730
...
3e7a1fa064
14 changed files with 816 additions and 98 deletions
|
|
@ -10,7 +10,7 @@
|
|||
# GPU_SERVER_URL — URL of your GPU inference server (Ollama, vLLM, or cf-orch coordinator).
|
||||
# Paid+ users: leave unset to auto-default to https://orch.circuitforge.tech via CF_LICENSE_KEY.
|
||||
# Local Ollama (default if unset): http://localhost:11434
|
||||
# Local cf-orch coordinator: http://10.1.10.71:7700
|
||||
# Local cf-orch coordinator: http://<YOUR_HOST_IP>:7700
|
||||
# CF_ORCH_URL is also accepted as a backward-compatible alias.
|
||||
# GPU_SERVER_URL=http://localhost:11434
|
||||
|
||||
|
|
|
|||
|
|
@ -1,64 +1,81 @@
|
|||
"""Ollama embedding client with sqlite-vec storage — BSL licensed."""
|
||||
"""Context chunk embedding — BSL licensed.
|
||||
|
||||
Thin wrapper around app.services.embeddings that handles the DB I/O for
|
||||
context_chunks. All backend configuration (model, device, backend type) is
|
||||
delegated to the service layer via TURNSTONE_EMBED_* env vars.
|
||||
|
||||
Re-exports EMBEDDING_AVAILABLE so callers that imported it from here continue
|
||||
to work without changes.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import sqlite3
|
||||
import struct
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
from app.services.embeddings import (
|
||||
EMBEDDING_AVAILABLE, # re-export for backward compat
|
||||
get_embedder,
|
||||
pack_vector,
|
||||
)
|
||||
|
||||
__all__ = ["EMBEDDING_AVAILABLE", "embed_chunks"]
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
EMBEDDING_AVAILABLE: bool = False
|
||||
|
||||
try:
|
||||
import sqlite_vec # type: ignore[import] # noqa: F401
|
||||
EMBEDDING_AVAILABLE = True
|
||||
logger.debug("sqlite-vec loaded — embedding pipeline enabled")
|
||||
except ImportError:
|
||||
logger.debug("sqlite-vec not available — embedding pipeline disabled")
|
||||
|
||||
|
||||
def embed_chunks(
|
||||
db_path: Path,
|
||||
document_id: str,
|
||||
llm_url: str,
|
||||
model: str = "nomic-embed-text",
|
||||
# Legacy params kept for backward compat — ignored when the ST backend is active.
|
||||
llm_url: str = "",
|
||||
model: str = "",
|
||||
timeout: float = 60.0,
|
||||
) -> int:
|
||||
"""Embed all unembedded chunks for a document. Returns count embedded. No-op when EMBEDDING_AVAILABLE is False."""
|
||||
if not EMBEDDING_AVAILABLE:
|
||||
"""Embed all un-embedded chunks for *document_id*.
|
||||
|
||||
Uses the configured embedder (sentence-transformers by default; Ollama when
|
||||
TURNSTONE_EMBED_BACKEND=ollama). Returns the count of newly embedded chunks.
|
||||
Returns 0 silently when no embedder is available.
|
||||
|
||||
The legacy ``llm_url`` and ``model`` parameters are accepted but ignored when
|
||||
the sentence-transformers backend is active — configure via env vars instead.
|
||||
"""
|
||||
embedder = get_embedder()
|
||||
if embedder is None:
|
||||
return 0
|
||||
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.row_factory = sqlite3.Row
|
||||
|
||||
rows = conn.execute(
|
||||
"SELECT id, text FROM context_chunks WHERE document_id = ? AND embedding IS NULL",
|
||||
(document_id,),
|
||||
).fetchall()
|
||||
|
||||
if not rows:
|
||||
conn.close()
|
||||
return 0
|
||||
|
||||
texts = [r["text"] for r in rows]
|
||||
ids = [r["id"] for r in rows]
|
||||
|
||||
count = 0
|
||||
for row in rows:
|
||||
try:
|
||||
resp = httpx.post(
|
||||
f"{llm_url.rstrip('/')}/api/embeddings",
|
||||
json={"model": model, "prompt": row["text"]},
|
||||
timeout=timeout,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
vector: list[float] = resp.json().get("embedding") or []
|
||||
if vector:
|
||||
blob = struct.pack(f"{len(vector)}f", *vector)
|
||||
vectors = embedder.embed_batch(texts)
|
||||
for chunk_id, vec in zip(ids, vectors):
|
||||
blob = pack_vector(vec)
|
||||
conn.execute(
|
||||
"UPDATE context_chunks SET embedding = ? WHERE id = ?",
|
||||
(blob, row["id"]),
|
||||
(blob, chunk_id),
|
||||
)
|
||||
count += 1
|
||||
except Exception as exc:
|
||||
logger.warning("Embedding chunk %s failed: %s", row["id"], exc)
|
||||
|
||||
conn.commit()
|
||||
except Exception as exc:
|
||||
logger.warning("Batch embedding failed for document %s: %s", document_id, exc)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
logger.debug("Embedded %d chunk(s) for document %s", count, document_id)
|
||||
return count
|
||||
|
|
|
|||
|
|
@ -1,10 +1,30 @@
|
|||
"""Context retrieval — structured keyword lookup (Free) + chunk search — MIT licensed."""
|
||||
"""Context retrieval — structured keyword lookup (Free) + chunk search — MIT licensed.
|
||||
|
||||
Two retrieval modes for context_chunks:
|
||||
Vector search — cosine similarity over stored embeddings (when available)
|
||||
Keyword search — LIKE-based fallback when no embedder is configured
|
||||
|
||||
Both modes are called from retrieve_context(); the best available mode is used
|
||||
automatically so callers need not check EMBEDDING_AVAILABLE themselves.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import sqlite3
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
|
||||
from app.services.embeddings import (
|
||||
EMBEDDING_AVAILABLE,
|
||||
cosine_similarity,
|
||||
get_embedder,
|
||||
unpack_vector,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class RetrievedContext:
|
||||
|
|
@ -12,6 +32,8 @@ class RetrievedContext:
|
|||
chunks: list[dict[str, str]] = field(default_factory=list)
|
||||
|
||||
|
||||
# ── Structured fact retrieval (always runs) ───────────────────────────────────
|
||||
|
||||
def get_relevant_facts(db_path: Path, query: str) -> list[dict[str, str]]:
|
||||
"""Keyword match against context_facts. Always runs — Free tier."""
|
||||
try:
|
||||
|
|
@ -42,8 +64,68 @@ def get_relevant_facts(db_path: Path, query: str) -> list[dict[str, str]]:
|
|||
return []
|
||||
|
||||
|
||||
def _search_chunks(db_path: Path, query: str) -> list[dict[str, str]]:
|
||||
"""Keyword search across context_chunks. Fallback when no embeddings."""
|
||||
# ── Chunk retrieval: vector path ──────────────────────────────────────────────
|
||||
|
||||
def _search_chunks_vector(
|
||||
db_path: Path,
|
||||
query: str,
|
||||
top_k: int = 3,
|
||||
) -> list[dict[str, str]]:
|
||||
"""Cosine similarity search over embedded context_chunks.
|
||||
|
||||
Loads all stored embeddings into memory and scores in-process with numpy.
|
||||
Skips any chunk whose BLOB dimension does not match the current model dim
|
||||
(stale embeddings from a previous model — they will be re-embedded on the
|
||||
next document upload).
|
||||
|
||||
Returns at most *top_k* results ordered by similarity descending.
|
||||
"""
|
||||
embedder = get_embedder()
|
||||
if embedder is None:
|
||||
return []
|
||||
|
||||
try:
|
||||
query_vec: np.ndarray = embedder.embed(query)
|
||||
model_dim: int = embedder.dim
|
||||
except Exception as exc:
|
||||
logger.warning("Query embedding failed: %s", exc)
|
||||
return []
|
||||
|
||||
try:
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.row_factory = sqlite3.Row
|
||||
rows = conn.execute(
|
||||
"SELECT cc.id, cc.text, cc.embedding, cd.filename"
|
||||
" FROM context_chunks cc"
|
||||
" JOIN context_documents cd ON cc.document_id = cd.id"
|
||||
" WHERE cc.embedding IS NOT NULL"
|
||||
).fetchall()
|
||||
conn.close()
|
||||
except sqlite3.OperationalError:
|
||||
return []
|
||||
|
||||
scored: list[tuple[float, dict[str, str]]] = []
|
||||
for row in rows:
|
||||
blob: bytes = row["embedding"]
|
||||
# Guard against blobs from a different-dimension model
|
||||
if len(blob) // 4 != model_dim:
|
||||
continue
|
||||
try:
|
||||
chunk_vec = unpack_vector(blob)
|
||||
score = cosine_similarity(query_vec, chunk_vec)
|
||||
scored.append((score, {"text": row["text"], "filename": row["filename"]}))
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
scored.sort(key=lambda t: t[0], reverse=True)
|
||||
return [item for _, item in scored[:top_k]]
|
||||
|
||||
|
||||
# ── Chunk retrieval: keyword fallback ─────────────────────────────────────────
|
||||
|
||||
def _search_chunks_keyword(db_path: Path, query: str) -> list[dict[str, str]]:
|
||||
"""LIKE-based keyword search across context_chunks. Fallback when no embedder."""
|
||||
try:
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
|
|
@ -66,16 +148,29 @@ def _search_chunks(db_path: Path, query: str) -> list[dict[str, str]]:
|
|||
return []
|
||||
|
||||
|
||||
# ── Public interface ──────────────────────────────────────────────────────────
|
||||
|
||||
def retrieve_context(db_path: Path, query: str) -> RetrievedContext:
|
||||
"""Retrieve structured facts and relevant chunks for a query."""
|
||||
return RetrievedContext(
|
||||
facts=get_relevant_facts(db_path, query),
|
||||
chunks=_search_chunks(db_path, query),
|
||||
)
|
||||
"""Retrieve structured facts and relevant chunks for a query.
|
||||
|
||||
Chunk retrieval uses vector search when an embedder is available and at
|
||||
least one embedded chunk exists; falls back to keyword search otherwise.
|
||||
"""
|
||||
facts = get_relevant_facts(db_path, query)
|
||||
|
||||
if EMBEDDING_AVAILABLE:
|
||||
chunks = _search_chunks_vector(db_path, query)
|
||||
if not chunks:
|
||||
# Vector search returned nothing (no embedded chunks yet) — fall back.
|
||||
chunks = _search_chunks_keyword(db_path, query)
|
||||
else:
|
||||
chunks = _search_chunks_keyword(db_path, query)
|
||||
|
||||
return RetrievedContext(facts=facts, chunks=chunks)
|
||||
|
||||
|
||||
def format_context_block(ctx: RetrievedContext) -> str | None:
|
||||
"""Format context for injection into LLM prompt. Returns None when empty."""
|
||||
"""Format context for injection into an LLM prompt. Returns None when empty."""
|
||||
lines: list[str] = []
|
||||
if ctx.facts:
|
||||
lines.append("Known environment facts:")
|
||||
|
|
|
|||
|
|
@ -119,6 +119,13 @@ CREATE TABLE IF NOT EXISTS blocklist_candidates (
|
|||
CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip);
|
||||
CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status);
|
||||
CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS glean_fingerprints (
|
||||
path TEXT PRIMARY KEY,
|
||||
mtime REAL NOT NULL,
|
||||
size INTEGER NOT NULL,
|
||||
gleaned_at TEXT NOT NULL
|
||||
);
|
||||
"""
|
||||
|
||||
|
||||
|
|
@ -139,6 +146,44 @@ def ensure_schema(db_path: Path) -> None:
|
|||
conn.close()
|
||||
|
||||
|
||||
def _fingerprint(path: Path) -> tuple[float, int]:
|
||||
"""Return (mtime, size) for a file — cheap identity check, no content read needed."""
|
||||
st = path.stat()
|
||||
return st.st_mtime, st.st_size
|
||||
|
||||
|
||||
def _fp_unchanged(conn: sqlite3.Connection, path: Path, mtime: float, size: int) -> bool:
|
||||
"""Return True only when the stored fingerprint exactly matches (mtime, size).
|
||||
|
||||
A smaller size (log rotation) or a larger size (new lines appended) both
|
||||
return False so the caller re-gleams the file.
|
||||
"""
|
||||
row = conn.execute(
|
||||
"SELECT mtime, size FROM glean_fingerprints WHERE path = ?",
|
||||
(str(path),),
|
||||
).fetchone()
|
||||
if row is None:
|
||||
return False
|
||||
return row[0] == mtime and row[1] == size
|
||||
|
||||
|
||||
def _save_fingerprint(
|
||||
conn: sqlite3.Connection,
|
||||
path: Path,
|
||||
mtime: float,
|
||||
size: int,
|
||||
gleaned_at: str,
|
||||
) -> None:
|
||||
"""Upsert the fingerprint for *path* after a successful glean."""
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT OR REPLACE INTO glean_fingerprints (path, mtime, size, gleaned_at)
|
||||
VALUES (?, ?, ?, ?)
|
||||
""",
|
||||
(str(path), mtime, size, gleaned_at),
|
||||
)
|
||||
|
||||
|
||||
def _detect_format(first_line: str) -> str:
|
||||
try:
|
||||
obj = json.loads(first_line)
|
||||
|
|
@ -236,6 +281,7 @@ def _glean_files(
|
|||
pattern_file: Path | None = None,
|
||||
batch_size: int = 1000,
|
||||
source_id_map: dict[Path, str] | None = None,
|
||||
force: bool = False,
|
||||
) -> dict[str, int]:
|
||||
pattern_file = pattern_file or Path("patterns/default.yaml")
|
||||
patterns = load_patterns(pattern_file)
|
||||
|
|
@ -249,9 +295,19 @@ def _glean_files(
|
|||
conn.commit()
|
||||
|
||||
stats: dict[str, int] = {}
|
||||
skipped: list[str] = []
|
||||
|
||||
for log_file in files:
|
||||
source_id = source_id_map.get(log_file, log_file.stem)
|
||||
|
||||
# Fingerprint check — skip files whose mtime+size haven't changed.
|
||||
mtime, size = _fingerprint(log_file)
|
||||
if not force and _fp_unchanged(conn, log_file, mtime, size):
|
||||
logger.debug("Skipping unchanged file: %s", log_file.name)
|
||||
skipped.append(log_file.name)
|
||||
stats[source_id] = stats.get(source_id, 0)
|
||||
continue
|
||||
|
||||
count = 0
|
||||
batch: list[RetrievedEntry] = []
|
||||
for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id):
|
||||
|
|
@ -265,11 +321,18 @@ def _glean_files(
|
|||
_write_batch(conn, batch)
|
||||
conn.commit()
|
||||
count += len(batch)
|
||||
|
||||
_save_fingerprint(conn, log_file, mtime, size, ingest_time)
|
||||
conn.commit()
|
||||
|
||||
stats[source_id] = stats.get(source_id, 0) + count
|
||||
logger.info("Gleaned %d entries from %s (source: %s)", count, log_file.name, source_id)
|
||||
|
||||
conn.close()
|
||||
|
||||
if skipped:
|
||||
logger.info("Skipped %d unchanged file(s): %s", len(skipped), ", ".join(skipped))
|
||||
|
||||
logger.info("Building FTS index...")
|
||||
build_fts_index(db_path)
|
||||
logger.info("FTS index ready")
|
||||
|
|
@ -429,19 +492,28 @@ def glean_dir(
|
|||
db_path: Path,
|
||||
pattern_file: Path | None = None,
|
||||
batch_size: int = 1000,
|
||||
force: bool = False,
|
||||
) -> dict[str, int]:
|
||||
"""Glean all .jsonl and .log files from a corpus directory."""
|
||||
"""Glean all .jsonl and .log files from a corpus directory.
|
||||
|
||||
Pass ``force=True`` to bypass fingerprint checks and re-glean all files
|
||||
regardless of whether they have changed since the last run.
|
||||
"""
|
||||
files = sorted(corpus_dir.glob("*.jsonl")) + sorted(corpus_dir.glob("*.log"))
|
||||
return _glean_files(files, db_path, pattern_file, batch_size)
|
||||
return _glean_files(files, db_path, pattern_file, batch_size, force=force)
|
||||
|
||||
|
||||
def glean_file(
|
||||
log_file: Path,
|
||||
db_path: Path,
|
||||
pattern_file: Path | None = None,
|
||||
force: bool = False,
|
||||
) -> dict[str, int]:
|
||||
"""Glean a single log file (any supported format)."""
|
||||
return _glean_files([log_file], db_path, pattern_file)
|
||||
"""Glean a single log file (any supported format).
|
||||
|
||||
Pass ``force=True`` to re-glean even when the file fingerprint is unchanged.
|
||||
"""
|
||||
return _glean_files([log_file], db_path, pattern_file, force=force)
|
||||
|
||||
|
||||
def glean_sources(
|
||||
|
|
@ -449,6 +521,7 @@ def glean_sources(
|
|||
db_path: Path,
|
||||
pattern_file: Path | None = None,
|
||||
batch_size: int = 1000,
|
||||
force: bool = False,
|
||||
) -> dict[str, int]:
|
||||
"""Glean all sources listed in a sources.yaml config file.
|
||||
|
||||
|
|
@ -510,7 +583,7 @@ def glean_sources(
|
|||
|
||||
stats: dict[str, int] = {}
|
||||
if files:
|
||||
stats.update(_glean_files(files, db_path, pattern_file, batch_size, source_id_map))
|
||||
stats.update(_glean_files(files, db_path, pattern_file, batch_size, source_id_map, force=force))
|
||||
|
||||
# ── SSH remote sources ─────────────────────────────────────────────────
|
||||
if not ssh_sources:
|
||||
|
|
|
|||
|
|
@ -93,7 +93,7 @@ def search_logs(
|
|||
Example: '"connection refused" OR "connection lost"'
|
||||
severity: Filter by level — EMERGENCY, ALERT, CRITICAL, ERROR, WARN, NOTICE, INFO, DEBUG.
|
||||
source: Partial match on source_id. Format is 'corpus:host:service'.
|
||||
Example: 'xanderland:caddy' matches all Caddy entries from xanderland.
|
||||
Example: 'example-node:caddy' matches all Caddy entries from example-node.
|
||||
pattern: Filter by named pattern tag applied at glean time.
|
||||
Known tags: auth_failure, connection_lost, oom, segfault, disk_full,
|
||||
timeout, caddy_tls_error, caddy_config_error, caddy_auth_error,
|
||||
|
|
|
|||
23
app/rest.py
23
app/rest.py
|
|
@ -515,13 +515,20 @@ def delete_source(source_id: str) -> dict:
|
|||
|
||||
|
||||
@router.post("/api/sources/{source_id}/glean")
|
||||
def reglean_source(source_id: str, background_tasks: BackgroundTasks) -> dict:
|
||||
def reglean_source(
|
||||
source_id: str,
|
||||
background_tasks: BackgroundTasks,
|
||||
force: Annotated[bool, Query(description="Bypass fingerprint check and re-glean even if file is unchanged")] = False,
|
||||
) -> dict:
|
||||
"""Trigger a re-glean for a configured source from sources.yaml.
|
||||
|
||||
Handles both local file sources and SSH remote sources. For SSH sources,
|
||||
the glean runs in the foreground and rebuilds the FTS index before returning
|
||||
(same behaviour as local sources — callers can rely on the count being final
|
||||
when the response arrives).
|
||||
|
||||
Use ``?force=true`` to bypass the fingerprint cache and re-glean the file
|
||||
even if mtime and size appear unchanged since the last run.
|
||||
"""
|
||||
sources_file = PATTERN_DIR / "sources.yaml"
|
||||
if not sources_file.exists():
|
||||
|
|
@ -536,6 +543,7 @@ def reglean_source(source_id: str, background_tasks: BackgroundTasks) -> dict:
|
|||
|
||||
if src.get("transport") == "ssh":
|
||||
# SSH sources: open connection, glean all items, rebuild FTS inline.
|
||||
# Fingerprint skipping applies only to local file sources.
|
||||
stats = _glean_ssh_source(src, DB_PATH, PATTERN_FILE)
|
||||
return {"source_id": source_id, "gleaned": sum(stats.values())}
|
||||
|
||||
|
|
@ -543,7 +551,7 @@ def reglean_source(source_id: str, background_tasks: BackgroundTasks) -> dict:
|
|||
src_path = Path(src["path"])
|
||||
if not src_path.exists():
|
||||
raise HTTPException(status_code=422, detail=f"Path does not exist: {src_path}")
|
||||
stats = _glean_file(src_path, DB_PATH, PATTERN_FILE)
|
||||
stats = _glean_file(src_path, DB_PATH, PATTERN_FILE, force=force)
|
||||
background_tasks.add_task(build_fts_index, DB_PATH)
|
||||
return {"source_id": source_id, "gleaned": stats.get(source_id, sum(stats.values()))}
|
||||
|
||||
|
|
@ -656,8 +664,14 @@ def glean_task_status() -> dict:
|
|||
|
||||
|
||||
@router.post("/api/tasks/glean")
|
||||
async def trigger_glean() -> dict:
|
||||
"""Manually trigger a glean of all configured sources. No-ops if already running."""
|
||||
async def trigger_glean(
|
||||
force: Annotated[bool, Query(description="Bypass fingerprint check and re-glean all sources")] = False,
|
||||
) -> dict:
|
||||
"""Manually trigger a glean of all configured sources. No-ops if already running.
|
||||
|
||||
Use ``?force=true`` to bypass the fingerprint cache and re-glean every local
|
||||
file source even when mtime and size are unchanged since the last run.
|
||||
"""
|
||||
sources_file = PATTERN_DIR / "sources.yaml"
|
||||
if not sources_file.exists():
|
||||
raise HTTPException(status_code=404, detail="sources.yaml not found — configure log sources first")
|
||||
|
|
@ -665,6 +679,7 @@ async def trigger_glean() -> dict:
|
|||
sources_file, DB_PATH, PATTERN_FILE,
|
||||
submit_endpoint=SUBMIT_ENDPOINT or None,
|
||||
source_host=SOURCE_HOST,
|
||||
force=force,
|
||||
)
|
||||
|
||||
|
||||
|
|
|
|||
229
app/services/embeddings.py
Normal file
229
app/services/embeddings.py
Normal file
|
|
@ -0,0 +1,229 @@
|
|||
"""Configurable embedding service — BSL licensed.
|
||||
|
||||
Backends:
|
||||
sentence_transformers — local in-process inference (default, no server needed)
|
||||
ollama — HTTP to a running Ollama instance
|
||||
|
||||
Configuration (env vars):
|
||||
TURNSTONE_EMBED_BACKEND sentence_transformers | ollama (default: sentence_transformers)
|
||||
TURNSTONE_EMBED_MODEL model name/path (backend-specific default)
|
||||
TURNSTONE_EMBED_DEVICE cpu | cuda (default: cpu; ST backend only)
|
||||
TURNSTONE_LLM_URL Ollama base URL (default: http://localhost:11434)
|
||||
|
||||
When no backend is importable/reachable, EMBEDDING_AVAILABLE is False and all
|
||||
embed calls return empty arrays — callers must handle this gracefully.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import struct
|
||||
from typing import Protocol, runtime_checkable
|
||||
|
||||
import numpy as np
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ── Public availability flag ──────────────────────────────────────────────────
|
||||
|
||||
EMBEDDING_AVAILABLE: bool = False
|
||||
|
||||
# ── Config ────────────────────────────────────────────────────────────────────
|
||||
|
||||
_BACKEND = os.environ.get("TURNSTONE_EMBED_BACKEND", "sentence_transformers").lower()
|
||||
_DEVICE = os.environ.get("TURNSTONE_EMBED_DEVICE", "cpu").lower()
|
||||
_LLM_URL = os.environ.get("TURNSTONE_LLM_URL", "http://localhost:11434")
|
||||
|
||||
# BAAI/bge-small-en-v1.5: 33MB, MIT, 49M downloads/month, 384-dim, 512-token max.
|
||||
# Benchmarked as the best quality-to-size ratio in the field (MTEB 62.17).
|
||||
# all-MiniLM-L6-v2 is a viable lighter alternative (23MB, 256-token max) if
|
||||
# inference speed is the primary constraint.
|
||||
_DEFAULT_MODEL: dict[str, str] = {
|
||||
"sentence_transformers": "BAAI/bge-small-en-v1.5",
|
||||
"ollama": "nomic-embed-text",
|
||||
}
|
||||
_MODEL = os.environ.get(
|
||||
"TURNSTONE_EMBED_MODEL",
|
||||
_DEFAULT_MODEL.get(_BACKEND, "sentence-transformers/all-MiniLM-L6-v2"),
|
||||
)
|
||||
|
||||
|
||||
# ── Protocol ──────────────────────────────────────────────────────────────────
|
||||
|
||||
@runtime_checkable
|
||||
class Embedder(Protocol):
|
||||
"""Minimal interface all embedding backends must satisfy."""
|
||||
|
||||
@property
|
||||
def dim(self) -> int:
|
||||
"""Embedding dimension produced by this model."""
|
||||
...
|
||||
|
||||
@property
|
||||
def model_name(self) -> str:
|
||||
"""Human-readable model identifier."""
|
||||
...
|
||||
|
||||
def embed(self, text: str) -> np.ndarray:
|
||||
"""Embed a single string. Returns 1-D float32 array of length dim."""
|
||||
...
|
||||
|
||||
def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
|
||||
"""Embed a list of strings. Returns list of 1-D float32 arrays."""
|
||||
...
|
||||
|
||||
|
||||
# ── sentence-transformers backend ─────────────────────────────────────────────
|
||||
|
||||
class SentenceTransformerEmbedder:
|
||||
"""Local in-process embedding via the sentence-transformers library.
|
||||
|
||||
The model is downloaded from HuggingFace on first instantiation and cached
|
||||
at ~/.cache/huggingface/. Subsequent starts use the local cache.
|
||||
"""
|
||||
|
||||
def __init__(self, model_name: str = _MODEL, device: str = _DEVICE) -> None:
|
||||
from sentence_transformers import SentenceTransformer # type: ignore[import]
|
||||
logger.info("Loading embedding model %r on device %r ...", model_name, device)
|
||||
self._model = SentenceTransformer(model_name, device=device)
|
||||
self._model_name = model_name
|
||||
# Infer dimension from a test embed rather than hard-coding
|
||||
self._dim: int = int(self._model.encode("test").shape[0])
|
||||
logger.info("Embedding model ready — dim=%d", self._dim)
|
||||
|
||||
@property
|
||||
def dim(self) -> int:
|
||||
return self._dim
|
||||
|
||||
@property
|
||||
def model_name(self) -> str:
|
||||
return self._model_name
|
||||
|
||||
def embed(self, text: str) -> np.ndarray:
|
||||
vec = self._model.encode(text, convert_to_numpy=True, normalize_embeddings=True)
|
||||
return vec.astype(np.float32)
|
||||
|
||||
def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
|
||||
if not texts:
|
||||
return []
|
||||
vecs = self._model.encode(
|
||||
texts, convert_to_numpy=True, normalize_embeddings=True, batch_size=32
|
||||
)
|
||||
return [v.astype(np.float32) for v in vecs]
|
||||
|
||||
|
||||
# ── Ollama backend ────────────────────────────────────────────────────────────
|
||||
|
||||
class OllamaEmbedder:
|
||||
"""HTTP embedding via a running Ollama instance."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
model_name: str = _MODEL,
|
||||
llm_url: str = _LLM_URL,
|
||||
timeout: float = 30.0,
|
||||
) -> None:
|
||||
import httpx # already a project dependency
|
||||
self._model_name = model_name
|
||||
self._url = f"{llm_url.rstrip('/')}/api/embeddings"
|
||||
self._timeout = timeout
|
||||
self._client = httpx.Client(timeout=timeout)
|
||||
# Probe dimension with a test call
|
||||
self._dim = self._probe_dim()
|
||||
|
||||
def _probe_dim(self) -> int:
|
||||
try:
|
||||
vec = self._raw_embed("probe")
|
||||
return len(vec)
|
||||
except Exception as exc:
|
||||
logger.warning("Ollama dim probe failed (%s) — defaulting to 768", exc)
|
||||
return 768
|
||||
|
||||
def _raw_embed(self, text: str) -> list[float]:
|
||||
resp = self._client.post(
|
||||
self._url, json={"model": self._model_name, "prompt": text}
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json().get("embedding") or []
|
||||
|
||||
@property
|
||||
def dim(self) -> int:
|
||||
return self._dim
|
||||
|
||||
@property
|
||||
def model_name(self) -> str:
|
||||
return self._model_name
|
||||
|
||||
def embed(self, text: str) -> np.ndarray:
|
||||
vec = self._raw_embed(text)
|
||||
return np.array(vec, dtype=np.float32)
|
||||
|
||||
def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
|
||||
return [self.embed(t) for t in texts]
|
||||
|
||||
|
||||
# ── Singleton factory ─────────────────────────────────────────────────────────
|
||||
|
||||
_embedder: Embedder | None = None
|
||||
|
||||
|
||||
def get_embedder() -> Embedder | None:
|
||||
"""Return the configured embedder singleton, or None when unavailable.
|
||||
|
||||
Lazy-initialises on first call. Callers should check EMBEDDING_AVAILABLE
|
||||
or test for None rather than calling this unconditionally.
|
||||
"""
|
||||
global _embedder, EMBEDDING_AVAILABLE
|
||||
if _embedder is not None:
|
||||
return _embedder
|
||||
|
||||
if _BACKEND == "sentence_transformers":
|
||||
try:
|
||||
_embedder = SentenceTransformerEmbedder(_MODEL, _DEVICE)
|
||||
EMBEDDING_AVAILABLE = True
|
||||
except ImportError:
|
||||
logger.warning(
|
||||
"sentence-transformers not installed — embeddings disabled. "
|
||||
"Install with: pip install sentence-transformers"
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to load sentence-transformers model %r: %s", _MODEL, exc)
|
||||
|
||||
elif _BACKEND == "ollama":
|
||||
try:
|
||||
_embedder = OllamaEmbedder(_MODEL, _LLM_URL)
|
||||
EMBEDDING_AVAILABLE = True
|
||||
except Exception as exc:
|
||||
logger.warning("Ollama embedder init failed: %s", exc)
|
||||
|
||||
else:
|
||||
logger.warning("Unknown TURNSTONE_EMBED_BACKEND %r — embeddings disabled", _BACKEND)
|
||||
|
||||
return _embedder
|
||||
|
||||
|
||||
# ── BLOB serialisation helpers ────────────────────────────────────────────────
|
||||
|
||||
def pack_vector(vec: np.ndarray) -> bytes:
|
||||
"""Serialise a float32 numpy vector to a SQLite BLOB."""
|
||||
arr = vec.astype(np.float32)
|
||||
return struct.pack(f"{len(arr)}f", *arr.tolist())
|
||||
|
||||
|
||||
def unpack_vector(blob: bytes) -> np.ndarray:
|
||||
"""Deserialise a SQLite BLOB back to a float32 numpy vector."""
|
||||
n = len(blob) // 4 # 4 bytes per float32
|
||||
return np.array(struct.unpack(f"{n}f", blob), dtype=np.float32)
|
||||
|
||||
|
||||
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
|
||||
"""Cosine similarity between two L2-normalised vectors.
|
||||
|
||||
Both vectors are re-normalised defensively so callers need not pre-normalise.
|
||||
Returns 0.0 when either vector has zero norm.
|
||||
"""
|
||||
norm_a = np.linalg.norm(a)
|
||||
norm_b = np.linalg.norm(b)
|
||||
if norm_a == 0.0 or norm_b == 0.0:
|
||||
return 0.0
|
||||
return float(np.dot(a, b) / (norm_a * norm_b))
|
||||
|
|
@ -88,7 +88,7 @@ def summarize(
|
|||
logger.debug("Task endpoint unavailable (%s) — falling back to direct model", exc)
|
||||
|
||||
# Fallback: OpenAI-compat endpoint with explicit model name (local instances,
|
||||
# xanderland, or any cf-orch that doesn't have task assignments loaded).
|
||||
# example-node, or any cf-orch that doesn't have task assignments loaded).
|
||||
try:
|
||||
resp = httpx.post(
|
||||
f"{llm_url.rstrip('/')}/v1/chat/completions",
|
||||
|
|
|
|||
|
|
@ -121,8 +121,13 @@ async def run_once(
|
|||
pattern_file: Path | None = None,
|
||||
submit_endpoint: str | None = None,
|
||||
source_host: str = "unknown",
|
||||
force: bool = False,
|
||||
) -> dict[str, Any]:
|
||||
"""Ingest all sources once, then submit matched entries if configured."""
|
||||
"""Ingest all sources once, then submit matched entries if configured.
|
||||
|
||||
Pass ``force=True`` to bypass fingerprint checks and re-glean all local
|
||||
file sources regardless of whether they appear unchanged.
|
||||
"""
|
||||
if _lock.locked():
|
||||
return {"ok": False, "error": "glean already running", "skipped": True}
|
||||
|
||||
|
|
@ -133,7 +138,7 @@ async def run_once(
|
|||
loop = asyncio.get_running_loop()
|
||||
stats: dict[str, int] = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: glean_sources(sources_file, db_path, pattern_file),
|
||||
lambda: glean_sources(sources_file, db_path, pattern_file, force=force),
|
||||
)
|
||||
duration = (datetime.now(tz=timezone.utc) - started).total_seconds()
|
||||
_state.last_run_at = started.isoformat()
|
||||
|
|
|
|||
|
|
@ -48,8 +48,8 @@ sources:
|
|||
# ── Network syslog (router, switches, UniFi APs) ─────────────────────────────
|
||||
# Written by syslog-receiver.service (UDP 5140 → /devl/turnstone-cluster/data/network-syslog.txt).
|
||||
# Configure devices to send syslog to Heimdall:5140.
|
||||
# UniFi: Settings → System → Remote Logging → Syslog Host = 10.1.10.71:5140
|
||||
# Ubiquiti EdgeRouter: set system syslog host 10.1.10.71 facility all level debug
|
||||
# Managed switches: varies by vendor — target 10.1.10.71 UDP 5140
|
||||
# UniFi: Settings → System → Remote Logging → Syslog Host = <YOUR_HOST_IP>:5140
|
||||
# Ubiquiti EdgeRouter: set system syslog host <YOUR_HOST_IP> facility all level debug
|
||||
# Managed switches: varies by vendor — target <YOUR_HOST_IP> UDP 5140
|
||||
- id: network-syslog
|
||||
path: /data/network-syslog.txt
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@
|
|||
# ── Adding Caddy reverse proxy ────────────────────────────────────────────────
|
||||
# Add to /etc/caddy/Caddyfile:
|
||||
#
|
||||
# turnstone.xanderland.tv {
|
||||
# turnstone.example-node.tv {
|
||||
# import protected
|
||||
# reverse_proxy 10.0.0.10:8534
|
||||
# import cloudflare
|
||||
|
|
|
|||
|
|
@ -1,13 +1,17 @@
|
|||
"""Tests for app/context/embedder.py — graceful no-op without sqlite-vec."""
|
||||
"""Tests for app/context/embedder.py — delegates to app.services.embeddings."""
|
||||
import sqlite3
|
||||
import struct
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from app.context import embedder as emb_mod
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def db(tmp_path):
|
||||
@pytest.fixture()
|
||||
def db(tmp_path: Path) -> Path:
|
||||
db_path = tmp_path / "t.db"
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.executescript("""
|
||||
|
|
@ -20,34 +24,78 @@ def db(tmp_path):
|
|||
REFERENCES context_documents(id) ON DELETE CASCADE,
|
||||
chunk_index INTEGER NOT NULL, text TEXT NOT NULL, embedding BLOB
|
||||
);
|
||||
INSERT INTO context_documents VALUES ('d1','test.md','markdown','hello',5,'2026-01-01T00:00:00+00:00');
|
||||
INSERT INTO context_documents
|
||||
VALUES ('d1','test.md','markdown','hello',5,'2026-01-01T00:00:00+00:00');
|
||||
INSERT INTO context_chunks VALUES ('c1','d1',0,'hello world',NULL);
|
||||
INSERT INTO context_chunks VALUES ('c2','d1',1,'second chunk',NULL);
|
||||
""")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return db_path
|
||||
|
||||
|
||||
def test_embed_skipped_when_extension_absent(db):
|
||||
with patch.object(emb_mod, "EMBEDDING_AVAILABLE", False):
|
||||
count = emb_mod.embed_chunks(db, "d1", "http://localhost:11434")
|
||||
def _mock_embedder(dim: int = 3) -> MagicMock:
|
||||
"""Return a mock Embedder that returns constant dim-length vectors."""
|
||||
m = MagicMock()
|
||||
m.dim = dim
|
||||
m.embed_batch.return_value = [np.zeros(dim, dtype=np.float32)] * 10
|
||||
return m
|
||||
|
||||
|
||||
class TestEmbedChunks:
|
||||
def test_returns_zero_when_no_embedder(self, db: Path) -> None:
|
||||
with patch("app.context.embedder.get_embedder", return_value=None):
|
||||
count = emb_mod.embed_chunks(db, "d1")
|
||||
assert count == 0
|
||||
|
||||
|
||||
def test_embed_calls_ollama_when_available(db):
|
||||
import httpx
|
||||
|
||||
class FakeResponse:
|
||||
status_code = 200
|
||||
def raise_for_status(self): pass
|
||||
def json(self): return {"embedding": [0.1, 0.2, 0.3]}
|
||||
|
||||
with patch.object(emb_mod, "EMBEDDING_AVAILABLE", True), \
|
||||
patch("app.context.embedder.httpx.post", return_value=FakeResponse()):
|
||||
count = emb_mod.embed_chunks(db, "d1", "http://localhost:11434")
|
||||
assert count == 1
|
||||
# Verify blob was written
|
||||
def test_returns_zero_when_no_unembedded_chunks(self, db: Path) -> None:
|
||||
# Pre-fill both chunks with a blob
|
||||
blob = struct.pack("3f", 0.1, 0.2, 0.3)
|
||||
conn = sqlite3.connect(str(db))
|
||||
row = conn.execute("SELECT embedding FROM context_chunks WHERE id='c1'").fetchone()
|
||||
conn.execute("UPDATE context_chunks SET embedding=?", (blob,))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
assert row[0] is not None
|
||||
|
||||
embedder = _mock_embedder()
|
||||
with patch("app.context.embedder.get_embedder", return_value=embedder):
|
||||
count = emb_mod.embed_chunks(db, "d1")
|
||||
assert count == 0
|
||||
embedder.embed_batch.assert_not_called()
|
||||
|
||||
def test_embeds_all_null_chunks(self, db: Path) -> None:
|
||||
embedder = _mock_embedder(dim=3)
|
||||
with patch("app.context.embedder.get_embedder", return_value=embedder):
|
||||
count = emb_mod.embed_chunks(db, "d1")
|
||||
assert count == 2 # two chunks in fixture
|
||||
|
||||
def test_blobs_written_to_db(self, db: Path) -> None:
|
||||
vec = np.array([0.1, 0.2, 0.3], dtype=np.float32)
|
||||
embedder = _mock_embedder(dim=3)
|
||||
embedder.embed_batch.return_value = [vec, vec]
|
||||
|
||||
with patch("app.context.embedder.get_embedder", return_value=embedder):
|
||||
emb_mod.embed_chunks(db, "d1")
|
||||
|
||||
conn = sqlite3.connect(str(db))
|
||||
rows = conn.execute(
|
||||
"SELECT embedding FROM context_chunks WHERE document_id='d1'"
|
||||
).fetchall()
|
||||
conn.close()
|
||||
for (blob,) in rows:
|
||||
assert blob is not None
|
||||
unpacked = struct.unpack(f"{len(blob)//4}f", blob)
|
||||
assert len(unpacked) == 3
|
||||
|
||||
def test_legacy_llm_url_param_accepted(self, db: Path) -> None:
|
||||
"""Ensure backward-compat signature still works (llm_url ignored)."""
|
||||
embedder = _mock_embedder()
|
||||
with patch("app.context.embedder.get_embedder", return_value=embedder):
|
||||
count = emb_mod.embed_chunks(db, "d1", "http://localhost:11434", "nomic-embed-text")
|
||||
assert count == 2
|
||||
|
||||
def test_embed_batch_error_returns_zero(self, db: Path) -> None:
|
||||
embedder = _mock_embedder()
|
||||
embedder.embed_batch.side_effect = RuntimeError("model exploded")
|
||||
with patch("app.context.embedder.get_embedder", return_value=embedder):
|
||||
count = emb_mod.embed_chunks(db, "d1")
|
||||
assert count == 0
|
||||
|
|
|
|||
236
tests/test_glean_fingerprint.py
Normal file
236
tests/test_glean_fingerprint.py
Normal file
|
|
@ -0,0 +1,236 @@
|
|||
"""Tests for fingerprint-based incremental glean skipping (issue #30).
|
||||
|
||||
Verifies that _glean_files() (and its public wrappers) skip local files whose
|
||||
mtime+size fingerprint has not changed since the last glean, and that force=True
|
||||
bypasses that check.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from app.glean.pipeline import (
|
||||
_fingerprint,
|
||||
_fp_unchanged,
|
||||
_save_fingerprint,
|
||||
ensure_schema,
|
||||
glean_dir,
|
||||
glean_file,
|
||||
)
|
||||
from app.glean.base import now_iso
|
||||
|
||||
|
||||
# ── Fixtures ──────────────────────────────────────────────────────────────────
|
||||
|
||||
@pytest.fixture()
|
||||
def db_path(tmp_path: Path) -> Path:
|
||||
path = tmp_path / "test.db"
|
||||
ensure_schema(path)
|
||||
return path
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def log_file(tmp_path: Path) -> Path:
|
||||
"""A minimal plaintext log file."""
|
||||
f = tmp_path / "test.log"
|
||||
f.write_text("May 24 10:00:00 heimdall kernel: test message\n")
|
||||
return f
|
||||
|
||||
|
||||
# ── Unit: fingerprint helpers ──────────────────────────────────────────────────
|
||||
|
||||
class TestFingerprintHelpers:
|
||||
def test_fingerprint_returns_mtime_and_size(self, log_file: Path) -> None:
|
||||
mtime, size = _fingerprint(log_file)
|
||||
st = log_file.stat()
|
||||
assert mtime == st.st_mtime
|
||||
assert size == st.st_size
|
||||
|
||||
def test_fp_unchanged_returns_false_when_no_record(self, db_path: Path, log_file: Path) -> None:
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
mtime, size = _fingerprint(log_file)
|
||||
assert _fp_unchanged(conn, log_file, mtime, size) is False
|
||||
conn.close()
|
||||
|
||||
def test_fp_unchanged_returns_true_after_save(self, db_path: Path, log_file: Path) -> None:
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
mtime, size = _fingerprint(log_file)
|
||||
_save_fingerprint(conn, log_file, mtime, size, now_iso())
|
||||
conn.commit()
|
||||
assert _fp_unchanged(conn, log_file, mtime, size) is True
|
||||
conn.close()
|
||||
|
||||
def test_fp_unchanged_returns_false_on_size_change(self, db_path: Path, log_file: Path) -> None:
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
mtime, size = _fingerprint(log_file)
|
||||
_save_fingerprint(conn, log_file, mtime, size, now_iso())
|
||||
conn.commit()
|
||||
# Simulate size change (new content appended)
|
||||
assert _fp_unchanged(conn, log_file, mtime, size + 1) is False
|
||||
conn.close()
|
||||
|
||||
def test_fp_unchanged_returns_false_on_mtime_change(self, db_path: Path, log_file: Path) -> None:
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
mtime, size = _fingerprint(log_file)
|
||||
_save_fingerprint(conn, log_file, mtime, size, now_iso())
|
||||
conn.commit()
|
||||
assert _fp_unchanged(conn, log_file, mtime + 1.0, size) is False
|
||||
conn.close()
|
||||
|
||||
def test_save_fingerprint_upserts(self, db_path: Path, log_file: Path) -> None:
|
||||
"""Second save with different values replaces the first (UPSERT semantics)."""
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
_save_fingerprint(conn, log_file, 1000.0, 100, "2026-01-01T00:00:00Z")
|
||||
conn.commit()
|
||||
_save_fingerprint(conn, log_file, 2000.0, 200, "2026-01-02T00:00:00Z")
|
||||
conn.commit()
|
||||
row = conn.execute(
|
||||
"SELECT mtime, size FROM glean_fingerprints WHERE path = ?",
|
||||
(str(log_file),),
|
||||
).fetchone()
|
||||
assert row == (2000.0, 200)
|
||||
conn.close()
|
||||
|
||||
|
||||
# ── Integration: glean_file skipping ─────────────────────────────────────────
|
||||
|
||||
class TestGleanFileFingerprint:
|
||||
def test_first_glean_writes_fingerprint(self, db_path: Path, log_file: Path) -> None:
|
||||
glean_file(log_file, db_path)
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
row = conn.execute(
|
||||
"SELECT mtime, size FROM glean_fingerprints WHERE path = ?",
|
||||
(str(log_file),),
|
||||
).fetchone()
|
||||
conn.close()
|
||||
assert row is not None
|
||||
mtime, size = _fingerprint(log_file)
|
||||
assert row == (mtime, size)
|
||||
|
||||
def test_second_glean_skips_unchanged_file(self, db_path: Path, log_file: Path) -> None:
|
||||
stats_first = glean_file(log_file, db_path)
|
||||
count_first = sum(stats_first.values())
|
||||
|
||||
# Re-glean without touching the file — should produce 0 new entries.
|
||||
stats_second = glean_file(log_file, db_path)
|
||||
count_second = sum(stats_second.values())
|
||||
|
||||
assert count_first >= 1, "First glean should find at least one entry"
|
||||
assert count_second == 0, "Second glean should skip unchanged file"
|
||||
|
||||
def test_second_glean_runs_when_file_grows(self, db_path: Path, log_file: Path) -> None:
|
||||
glean_file(log_file, db_path)
|
||||
|
||||
# Append a new line and update mtime by rewriting.
|
||||
original = log_file.read_text()
|
||||
log_file.write_text(original + "May 24 10:01:00 heimdall kernel: second message\n")
|
||||
|
||||
stats_second = glean_file(log_file, db_path)
|
||||
# INSERT OR IGNORE means the original entry won't re-count, but parsing
|
||||
# does happen — at minimum the new line is processed.
|
||||
assert sum(stats_second.values()) >= 0 # glean ran (not skipped)
|
||||
|
||||
# Confirm fingerprint updated to new size.
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
row = conn.execute(
|
||||
"SELECT size FROM glean_fingerprints WHERE path = ?",
|
||||
(str(log_file),),
|
||||
).fetchone()
|
||||
conn.close()
|
||||
assert row is not None
|
||||
assert row[0] == log_file.stat().st_size
|
||||
|
||||
def test_force_bypasses_fingerprint(self, db_path: Path, log_file: Path) -> None:
|
||||
glean_file(log_file, db_path)
|
||||
|
||||
# Without force: skipped.
|
||||
stats_no_force = glean_file(log_file, db_path)
|
||||
assert sum(stats_no_force.values()) == 0
|
||||
|
||||
# With force: glean runs (INSERT OR IGNORE means count may be 0, but
|
||||
# we verify the fingerprint was re-saved with a fresh gleaned_at).
|
||||
conn_before = sqlite3.connect(str(db_path))
|
||||
ts_before = conn_before.execute(
|
||||
"SELECT gleaned_at FROM glean_fingerprints WHERE path = ?",
|
||||
(str(log_file),),
|
||||
).fetchone()[0]
|
||||
conn_before.close()
|
||||
|
||||
time.sleep(0.01) # ensure gleaned_at advances
|
||||
glean_file(log_file, db_path, force=True)
|
||||
|
||||
conn_after = sqlite3.connect(str(db_path))
|
||||
ts_after = conn_after.execute(
|
||||
"SELECT gleaned_at FROM glean_fingerprints WHERE path = ?",
|
||||
(str(log_file),),
|
||||
).fetchone()[0]
|
||||
conn_after.close()
|
||||
|
||||
assert ts_after > ts_before, "force=True should update gleaned_at timestamp"
|
||||
|
||||
|
||||
# ── Integration: glean_dir skipping ──────────────────────────────────────────
|
||||
|
||||
class TestGleanDirFingerprint:
|
||||
def test_glean_dir_skips_unchanged_on_second_run(self, db_path: Path, tmp_path: Path) -> None:
|
||||
log1 = tmp_path / "a.log"
|
||||
log2 = tmp_path / "b.log"
|
||||
log1.write_text("May 24 10:00:00 heimdall kernel: msg one\n")
|
||||
log2.write_text("May 24 10:00:00 heimdall kernel: msg two\n")
|
||||
|
||||
glean_dir(tmp_path, db_path)
|
||||
|
||||
stats_second = glean_dir(tmp_path, db_path)
|
||||
assert sum(stats_second.values()) == 0, "Both unchanged files should be skipped"
|
||||
|
||||
def test_glean_dir_force_reruns_all(self, db_path: Path, tmp_path: Path) -> None:
|
||||
log1 = tmp_path / "a.log"
|
||||
log1.write_text("May 24 10:00:00 heimdall kernel: msg one\n")
|
||||
|
||||
glean_dir(tmp_path, db_path)
|
||||
|
||||
# force=True: runs even though nothing changed; INSERT OR IGNORE keeps DB clean.
|
||||
conn_before = sqlite3.connect(str(db_path))
|
||||
ts_before = conn_before.execute(
|
||||
"SELECT gleaned_at FROM glean_fingerprints WHERE path = ?",
|
||||
(str(log1),),
|
||||
).fetchone()[0]
|
||||
conn_before.close()
|
||||
|
||||
time.sleep(0.01)
|
||||
glean_dir(tmp_path, db_path, force=True)
|
||||
|
||||
conn_after = sqlite3.connect(str(db_path))
|
||||
ts_after = conn_after.execute(
|
||||
"SELECT gleaned_at FROM glean_fingerprints WHERE path = ?",
|
||||
(str(log1),),
|
||||
).fetchone()[0]
|
||||
conn_after.close()
|
||||
|
||||
assert ts_after > ts_before
|
||||
|
||||
|
||||
# ── Schema: ensure fingerprints table created ─────────────────────────────────
|
||||
|
||||
class TestEnsureSchema:
|
||||
def test_fingerprints_table_exists_after_ensure_schema(self, tmp_path: Path) -> None:
|
||||
db = tmp_path / "fresh.db"
|
||||
ensure_schema(db)
|
||||
conn = sqlite3.connect(str(db))
|
||||
tables = {
|
||||
row[0]
|
||||
for row in conn.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='table'"
|
||||
).fetchall()
|
||||
}
|
||||
conn.close()
|
||||
assert "glean_fingerprints" in tables
|
||||
|
||||
def test_ensure_schema_idempotent(self, tmp_path: Path) -> None:
|
||||
"""Calling ensure_schema twice on the same DB must not raise."""
|
||||
db = tmp_path / "fresh.db"
|
||||
ensure_schema(db)
|
||||
ensure_schema(db) # second call — should be a no-op
|
||||
|
|
@ -4,24 +4,24 @@ from __future__ import annotations
|
|||
from app.glean.syslog import is_syslog, parse
|
||||
|
||||
SYSLOG_SAMPLE = """\
|
||||
May 11 14:23:01 xanderland sshd[1234]: Accepted publickey for x from 192.168.1.1 port 54321 ssh2
|
||||
May 11 14:23:05 xanderland sshd[1234]: Failed password for invalid user admin from 10.0.0.99 port 22 ssh2
|
||||
May 11 14:23:10 xanderland sudo[5678]: x : TTY=pts/0 ; PWD=/home/x ; USER=root ; COMMAND=/usr/bin/apt update
|
||||
May 11 14:23:15 xanderland kernel: [12345.678] usb 1-1: USB disconnect, device number 2
|
||||
May 1 04:00:00 xanderland CRON[9999]: (root) CMD (/usr/local/sbin/backup.sh)
|
||||
May 11 14:24:00 xanderland systemd[1]: Started NetworkManager.
|
||||
May 11 14:23:01 example-node sshd[1234]: Accepted publickey for x from 192.168.1.1 port 54321 ssh2
|
||||
May 11 14:23:05 example-node sshd[1234]: Failed password for invalid user admin from 10.0.0.99 port 22 ssh2
|
||||
May 11 14:23:10 example-node sudo[5678]: x : TTY=pts/0 ; PWD=/home/x ; USER=root ; COMMAND=/usr/bin/apt update
|
||||
May 11 14:23:15 example-node kernel: [12345.678] usb 1-1: USB disconnect, device number 2
|
||||
May 1 04:00:00 example-node CRON[9999]: (root) CMD (/usr/local/sbin/backup.sh)
|
||||
May 11 14:24:00 example-node systemd[1]: Started NetworkManager.
|
||||
"""
|
||||
|
||||
|
||||
class TestDetector:
|
||||
def test_detects_standard_line(self):
|
||||
assert is_syslog("May 11 14:23:01 xanderland sshd[1234]: message")
|
||||
assert is_syslog("May 11 14:23:01 example-node sshd[1234]: message")
|
||||
|
||||
def test_detects_no_pid(self):
|
||||
assert is_syslog("May 11 14:23:01 xanderland kernel: message")
|
||||
assert is_syslog("May 11 14:23:01 example-node kernel: message")
|
||||
|
||||
def test_detects_space_padded_day(self):
|
||||
assert is_syslog("May 1 04:00:00 xanderland CRON[9999]: message")
|
||||
assert is_syslog("May 1 04:00:00 example-node CRON[9999]: message")
|
||||
|
||||
def test_rejects_servarr(self):
|
||||
assert not is_syslog("2026-05-11 02:31:51.5|Info|ComponentName|Message")
|
||||
|
|
|
|||
Loading…
Reference in a new issue