Compare commits
No commits in common. "e746d55730d3956deeb4ef871e4289157f2a2c40" and "3e7a1fa064da24a21643e634dd6f9e22fc622ab8" have entirely different histories.
e746d55730
...
3e7a1fa064
14 changed files with 816 additions and 98 deletions
|
|
@ -10,7 +10,7 @@
|
||||||
# GPU_SERVER_URL — URL of your GPU inference server (Ollama, vLLM, or cf-orch coordinator).
|
# GPU_SERVER_URL — URL of your GPU inference server (Ollama, vLLM, or cf-orch coordinator).
|
||||||
# Paid+ users: leave unset to auto-default to https://orch.circuitforge.tech via CF_LICENSE_KEY.
|
# Paid+ users: leave unset to auto-default to https://orch.circuitforge.tech via CF_LICENSE_KEY.
|
||||||
# Local Ollama (default if unset): http://localhost:11434
|
# Local Ollama (default if unset): http://localhost:11434
|
||||||
# Local cf-orch coordinator: http://10.1.10.71:7700
|
# Local cf-orch coordinator: http://<YOUR_HOST_IP>:7700
|
||||||
# CF_ORCH_URL is also accepted as a backward-compatible alias.
|
# CF_ORCH_URL is also accepted as a backward-compatible alias.
|
||||||
# GPU_SERVER_URL=http://localhost:11434
|
# GPU_SERVER_URL=http://localhost:11434
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,64 +1,81 @@
|
||||||
"""Ollama embedding client with sqlite-vec storage — BSL licensed."""
|
"""Context chunk embedding — BSL licensed.
|
||||||
|
|
||||||
|
Thin wrapper around app.services.embeddings that handles the DB I/O for
|
||||||
|
context_chunks. All backend configuration (model, device, backend type) is
|
||||||
|
delegated to the service layer via TURNSTONE_EMBED_* env vars.
|
||||||
|
|
||||||
|
Re-exports EMBEDDING_AVAILABLE so callers that imported it from here continue
|
||||||
|
to work without changes.
|
||||||
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import sqlite3
|
import sqlite3
|
||||||
import struct
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import httpx
|
from app.services.embeddings import (
|
||||||
|
EMBEDDING_AVAILABLE, # re-export for backward compat
|
||||||
|
get_embedder,
|
||||||
|
pack_vector,
|
||||||
|
)
|
||||||
|
|
||||||
|
__all__ = ["EMBEDDING_AVAILABLE", "embed_chunks"]
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
EMBEDDING_AVAILABLE: bool = False
|
|
||||||
|
|
||||||
try:
|
|
||||||
import sqlite_vec # type: ignore[import] # noqa: F401
|
|
||||||
EMBEDDING_AVAILABLE = True
|
|
||||||
logger.debug("sqlite-vec loaded — embedding pipeline enabled")
|
|
||||||
except ImportError:
|
|
||||||
logger.debug("sqlite-vec not available — embedding pipeline disabled")
|
|
||||||
|
|
||||||
|
|
||||||
def embed_chunks(
|
def embed_chunks(
|
||||||
db_path: Path,
|
db_path: Path,
|
||||||
document_id: str,
|
document_id: str,
|
||||||
llm_url: str,
|
# Legacy params kept for backward compat — ignored when the ST backend is active.
|
||||||
model: str = "nomic-embed-text",
|
llm_url: str = "",
|
||||||
|
model: str = "",
|
||||||
timeout: float = 60.0,
|
timeout: float = 60.0,
|
||||||
) -> int:
|
) -> int:
|
||||||
"""Embed all unembedded chunks for a document. Returns count embedded. No-op when EMBEDDING_AVAILABLE is False."""
|
"""Embed all un-embedded chunks for *document_id*.
|
||||||
if not EMBEDDING_AVAILABLE:
|
|
||||||
|
Uses the configured embedder (sentence-transformers by default; Ollama when
|
||||||
|
TURNSTONE_EMBED_BACKEND=ollama). Returns the count of newly embedded chunks.
|
||||||
|
Returns 0 silently when no embedder is available.
|
||||||
|
|
||||||
|
The legacy ``llm_url`` and ``model`` parameters are accepted but ignored when
|
||||||
|
the sentence-transformers backend is active — configure via env vars instead.
|
||||||
|
"""
|
||||||
|
embedder = get_embedder()
|
||||||
|
if embedder is None:
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
conn = sqlite3.connect(str(db_path))
|
conn = sqlite3.connect(str(db_path))
|
||||||
conn.execute("PRAGMA journal_mode=WAL")
|
conn.execute("PRAGMA journal_mode=WAL")
|
||||||
conn.row_factory = sqlite3.Row
|
conn.row_factory = sqlite3.Row
|
||||||
|
|
||||||
rows = conn.execute(
|
rows = conn.execute(
|
||||||
"SELECT id, text FROM context_chunks WHERE document_id = ? AND embedding IS NULL",
|
"SELECT id, text FROM context_chunks WHERE document_id = ? AND embedding IS NULL",
|
||||||
(document_id,),
|
(document_id,),
|
||||||
).fetchall()
|
).fetchall()
|
||||||
|
|
||||||
|
if not rows:
|
||||||
|
conn.close()
|
||||||
|
return 0
|
||||||
|
|
||||||
|
texts = [r["text"] for r in rows]
|
||||||
|
ids = [r["id"] for r in rows]
|
||||||
|
|
||||||
count = 0
|
count = 0
|
||||||
for row in rows:
|
|
||||||
try:
|
try:
|
||||||
resp = httpx.post(
|
vectors = embedder.embed_batch(texts)
|
||||||
f"{llm_url.rstrip('/')}/api/embeddings",
|
for chunk_id, vec in zip(ids, vectors):
|
||||||
json={"model": model, "prompt": row["text"]},
|
blob = pack_vector(vec)
|
||||||
timeout=timeout,
|
|
||||||
)
|
|
||||||
resp.raise_for_status()
|
|
||||||
vector: list[float] = resp.json().get("embedding") or []
|
|
||||||
if vector:
|
|
||||||
blob = struct.pack(f"{len(vector)}f", *vector)
|
|
||||||
conn.execute(
|
conn.execute(
|
||||||
"UPDATE context_chunks SET embedding = ? WHERE id = ?",
|
"UPDATE context_chunks SET embedding = ? WHERE id = ?",
|
||||||
(blob, row["id"]),
|
(blob, chunk_id),
|
||||||
)
|
)
|
||||||
count += 1
|
count += 1
|
||||||
except Exception as exc:
|
|
||||||
logger.warning("Embedding chunk %s failed: %s", row["id"], exc)
|
|
||||||
|
|
||||||
conn.commit()
|
conn.commit()
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("Batch embedding failed for document %s: %s", document_id, exc)
|
||||||
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
logger.debug("Embedded %d chunk(s) for document %s", count, document_id)
|
||||||
return count
|
return count
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,30 @@
|
||||||
"""Context retrieval — structured keyword lookup (Free) + chunk search — MIT licensed."""
|
"""Context retrieval — structured keyword lookup (Free) + chunk search — MIT licensed.
|
||||||
|
|
||||||
|
Two retrieval modes for context_chunks:
|
||||||
|
Vector search — cosine similarity over stored embeddings (when available)
|
||||||
|
Keyword search — LIKE-based fallback when no embedder is configured
|
||||||
|
|
||||||
|
Both modes are called from retrieve_context(); the best available mode is used
|
||||||
|
automatically so callers need not check EMBEDDING_AVAILABLE themselves.
|
||||||
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
import sqlite3
|
import sqlite3
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from app.services.embeddings import (
|
||||||
|
EMBEDDING_AVAILABLE,
|
||||||
|
cosine_similarity,
|
||||||
|
get_embedder,
|
||||||
|
unpack_vector,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class RetrievedContext:
|
class RetrievedContext:
|
||||||
|
|
@ -12,6 +32,8 @@ class RetrievedContext:
|
||||||
chunks: list[dict[str, str]] = field(default_factory=list)
|
chunks: list[dict[str, str]] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Structured fact retrieval (always runs) ───────────────────────────────────
|
||||||
|
|
||||||
def get_relevant_facts(db_path: Path, query: str) -> list[dict[str, str]]:
|
def get_relevant_facts(db_path: Path, query: str) -> list[dict[str, str]]:
|
||||||
"""Keyword match against context_facts. Always runs — Free tier."""
|
"""Keyword match against context_facts. Always runs — Free tier."""
|
||||||
try:
|
try:
|
||||||
|
|
@ -42,8 +64,68 @@ def get_relevant_facts(db_path: Path, query: str) -> list[dict[str, str]]:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
|
||||||
def _search_chunks(db_path: Path, query: str) -> list[dict[str, str]]:
|
# ── Chunk retrieval: vector path ──────────────────────────────────────────────
|
||||||
"""Keyword search across context_chunks. Fallback when no embeddings."""
|
|
||||||
|
def _search_chunks_vector(
    db_path: Path,
    query: str,
    top_k: int = 3,
) -> list[dict[str, str]]:
    """Cosine similarity search over embedded context_chunks.

    Loads every stored embedding into memory and scores it in-process against
    the embedded query. Chunks whose BLOB does not match the current model
    dimension (stale embeddings from a previous model) are skipped.

    Args:
        db_path: SQLite database containing ``context_chunks`` and
            ``context_documents``.
        query: Free-text query to embed and compare against stored chunks.
        top_k: Maximum number of results to return.

    Returns:
        At most *top_k* ``{"text": ..., "filename": ...}`` dicts ordered by
        similarity descending. An empty list is returned when no embedder is
        configured, when embedding the query fails, or when SQLite raises
        ``sqlite3.OperationalError``.
    """
    embedder = get_embedder()
    if embedder is None:
        return []

    try:
        query_vec: np.ndarray = embedder.embed(query)
        model_dim: int = embedder.dim
    except Exception as exc:
        logger.warning("Query embedding failed: %s", exc)
        return []

    try:
        conn = sqlite3.connect(str(db_path))
        try:
            conn.execute("PRAGMA journal_mode=WAL")
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                "SELECT cc.id, cc.text, cc.embedding, cd.filename"
                " FROM context_chunks cc"
                " JOIN context_documents cd ON cc.document_id = cd.id"
                " WHERE cc.embedding IS NOT NULL"
            ).fetchall()
        finally:
            # Close on every path: previously a failing PRAGMA/SELECT skipped
            # the close() call and leaked the connection.
            conn.close()
    except sqlite3.OperationalError:
        return []

    scored: list[tuple[float, dict[str, str]]] = []
    for row in rows:
        blob: bytes = row["embedding"]
        # Guard against blobs from a different-dimension model.
        # NOTE(review): the // 4 assumes 4-byte float32 components — confirm
        # against pack_vector, which is not visible here.
        if len(blob) // 4 != model_dim:
            continue
        try:
            chunk_vec = unpack_vector(blob)
            score = cosine_similarity(query_vec, chunk_vec)
            scored.append((score, {"text": row["text"], "filename": row["filename"]}))
        except Exception:
            # Best-effort: one unreadable chunk must not abort the search.
            continue

    scored.sort(key=lambda t: t[0], reverse=True)
    return [item for _, item in scored[:top_k]]
|
||||||
|
|
||||||
|
|
||||||
|
# ── Chunk retrieval: keyword fallback ─────────────────────────────────────────
|
||||||
|
|
||||||
|
def _search_chunks_keyword(db_path: Path, query: str) -> list[dict[str, str]]:
|
||||||
|
"""LIKE-based keyword search across context_chunks. Fallback when no embedder."""
|
||||||
try:
|
try:
|
||||||
conn = sqlite3.connect(str(db_path))
|
conn = sqlite3.connect(str(db_path))
|
||||||
conn.execute("PRAGMA journal_mode=WAL")
|
conn.execute("PRAGMA journal_mode=WAL")
|
||||||
|
|
@ -66,16 +148,29 @@ def _search_chunks(db_path: Path, query: str) -> list[dict[str, str]]:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
# ── Public interface ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
def retrieve_context(db_path: Path, query: str) -> RetrievedContext:
|
def retrieve_context(db_path: Path, query: str) -> RetrievedContext:
|
||||||
"""Retrieve structured facts and relevant chunks for a query."""
|
"""Retrieve structured facts and relevant chunks for a query.
|
||||||
return RetrievedContext(
|
|
||||||
facts=get_relevant_facts(db_path, query),
|
Chunk retrieval uses vector search when an embedder is available and at
|
||||||
chunks=_search_chunks(db_path, query),
|
least one embedded chunk exists; falls back to keyword search otherwise.
|
||||||
)
|
"""
|
||||||
|
facts = get_relevant_facts(db_path, query)
|
||||||
|
|
||||||
|
if EMBEDDING_AVAILABLE:
|
||||||
|
chunks = _search_chunks_vector(db_path, query)
|
||||||
|
if not chunks:
|
||||||
|
# Vector search returned nothing (no embedded chunks yet) — fall back.
|
||||||
|
chunks = _search_chunks_keyword(db_path, query)
|
||||||
|
else:
|
||||||
|
chunks = _search_chunks_keyword(db_path, query)
|
||||||
|
|
||||||
|
return RetrievedContext(facts=facts, chunks=chunks)
|
||||||
|
|
||||||
|
|
||||||
def format_context_block(ctx: RetrievedContext) -> str | None:
|
def format_context_block(ctx: RetrievedContext) -> str | None:
|
||||||
"""Format context for injection into LLM prompt. Returns None when empty."""
|
"""Format context for injection into an LLM prompt. Returns None when empty."""
|
||||||
lines: list[str] = []
|
lines: list[str] = []
|
||||||
if ctx.facts:
|
if ctx.facts:
|
||||||
lines.append("Known environment facts:")
|
lines.append("Known environment facts:")
|
||||||
|
|
|
||||||
|
|
@ -119,6 +119,13 @@ CREATE TABLE IF NOT EXISTS blocklist_candidates (
|
||||||
CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip);
|
CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip);
|
||||||
CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status);
|
CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status);
|
||||||
CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip);
|
CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS glean_fingerprints (
|
||||||
|
path TEXT PRIMARY KEY,
|
||||||
|
mtime REAL NOT NULL,
|
||||||
|
size INTEGER NOT NULL,
|
||||||
|
gleaned_at TEXT NOT NULL
|
||||||
|
);
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -139,6 +146,44 @@ def ensure_schema(db_path: Path) -> None:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _fingerprint(path: Path) -> tuple[float, int]:
|
||||||
|
"""Return (mtime, size) for a file — cheap identity check, no content read needed."""
|
||||||
|
st = path.stat()
|
||||||
|
return st.st_mtime, st.st_size
|
||||||
|
|
||||||
|
|
||||||
|
def _fp_unchanged(conn: sqlite3.Connection, path: Path, mtime: float, size: int) -> bool:
|
||||||
|
"""Return True only when the stored fingerprint exactly matches (mtime, size).
|
||||||
|
|
||||||
|
A smaller size (log rotation) or a larger size (new lines appended) both
|
||||||
|
return False so the caller re-gleams the file.
|
||||||
|
"""
|
||||||
|
row = conn.execute(
|
||||||
|
"SELECT mtime, size FROM glean_fingerprints WHERE path = ?",
|
||||||
|
(str(path),),
|
||||||
|
).fetchone()
|
||||||
|
if row is None:
|
||||||
|
return False
|
||||||
|
return row[0] == mtime and row[1] == size
|
||||||
|
|
||||||
|
|
||||||
|
def _save_fingerprint(
|
||||||
|
conn: sqlite3.Connection,
|
||||||
|
path: Path,
|
||||||
|
mtime: float,
|
||||||
|
size: int,
|
||||||
|
gleaned_at: str,
|
||||||
|
) -> None:
|
||||||
|
"""Upsert the fingerprint for *path* after a successful glean."""
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT OR REPLACE INTO glean_fingerprints (path, mtime, size, gleaned_at)
|
||||||
|
VALUES (?, ?, ?, ?)
|
||||||
|
""",
|
||||||
|
(str(path), mtime, size, gleaned_at),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _detect_format(first_line: str) -> str:
|
def _detect_format(first_line: str) -> str:
|
||||||
try:
|
try:
|
||||||
obj = json.loads(first_line)
|
obj = json.loads(first_line)
|
||||||
|
|
@ -236,6 +281,7 @@ def _glean_files(
|
||||||
pattern_file: Path | None = None,
|
pattern_file: Path | None = None,
|
||||||
batch_size: int = 1000,
|
batch_size: int = 1000,
|
||||||
source_id_map: dict[Path, str] | None = None,
|
source_id_map: dict[Path, str] | None = None,
|
||||||
|
force: bool = False,
|
||||||
) -> dict[str, int]:
|
) -> dict[str, int]:
|
||||||
pattern_file = pattern_file or Path("patterns/default.yaml")
|
pattern_file = pattern_file or Path("patterns/default.yaml")
|
||||||
patterns = load_patterns(pattern_file)
|
patterns = load_patterns(pattern_file)
|
||||||
|
|
@ -249,9 +295,19 @@ def _glean_files(
|
||||||
conn.commit()
|
conn.commit()
|
||||||
|
|
||||||
stats: dict[str, int] = {}
|
stats: dict[str, int] = {}
|
||||||
|
skipped: list[str] = []
|
||||||
|
|
||||||
for log_file in files:
|
for log_file in files:
|
||||||
source_id = source_id_map.get(log_file, log_file.stem)
|
source_id = source_id_map.get(log_file, log_file.stem)
|
||||||
|
|
||||||
|
# Fingerprint check — skip files whose mtime+size haven't changed.
|
||||||
|
mtime, size = _fingerprint(log_file)
|
||||||
|
if not force and _fp_unchanged(conn, log_file, mtime, size):
|
||||||
|
logger.debug("Skipping unchanged file: %s", log_file.name)
|
||||||
|
skipped.append(log_file.name)
|
||||||
|
stats[source_id] = stats.get(source_id, 0)
|
||||||
|
continue
|
||||||
|
|
||||||
count = 0
|
count = 0
|
||||||
batch: list[RetrievedEntry] = []
|
batch: list[RetrievedEntry] = []
|
||||||
for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id):
|
for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id):
|
||||||
|
|
@ -265,11 +321,18 @@ def _glean_files(
|
||||||
_write_batch(conn, batch)
|
_write_batch(conn, batch)
|
||||||
conn.commit()
|
conn.commit()
|
||||||
count += len(batch)
|
count += len(batch)
|
||||||
|
|
||||||
|
_save_fingerprint(conn, log_file, mtime, size, ingest_time)
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
stats[source_id] = stats.get(source_id, 0) + count
|
stats[source_id] = stats.get(source_id, 0) + count
|
||||||
logger.info("Gleaned %d entries from %s (source: %s)", count, log_file.name, source_id)
|
logger.info("Gleaned %d entries from %s (source: %s)", count, log_file.name, source_id)
|
||||||
|
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
if skipped:
|
||||||
|
logger.info("Skipped %d unchanged file(s): %s", len(skipped), ", ".join(skipped))
|
||||||
|
|
||||||
logger.info("Building FTS index...")
|
logger.info("Building FTS index...")
|
||||||
build_fts_index(db_path)
|
build_fts_index(db_path)
|
||||||
logger.info("FTS index ready")
|
logger.info("FTS index ready")
|
||||||
|
|
@ -429,19 +492,28 @@ def glean_dir(
|
||||||
db_path: Path,
|
db_path: Path,
|
||||||
pattern_file: Path | None = None,
|
pattern_file: Path | None = None,
|
||||||
batch_size: int = 1000,
|
batch_size: int = 1000,
|
||||||
|
force: bool = False,
|
||||||
) -> dict[str, int]:
|
) -> dict[str, int]:
|
||||||
"""Glean all .jsonl and .log files from a corpus directory."""
|
"""Glean all .jsonl and .log files from a corpus directory.
|
||||||
|
|
||||||
|
Pass ``force=True`` to bypass fingerprint checks and re-glean all files
|
||||||
|
regardless of whether they have changed since the last run.
|
||||||
|
"""
|
||||||
files = sorted(corpus_dir.glob("*.jsonl")) + sorted(corpus_dir.glob("*.log"))
|
files = sorted(corpus_dir.glob("*.jsonl")) + sorted(corpus_dir.glob("*.log"))
|
||||||
return _glean_files(files, db_path, pattern_file, batch_size)
|
return _glean_files(files, db_path, pattern_file, batch_size, force=force)
|
||||||
|
|
||||||
|
|
||||||
def glean_file(
|
def glean_file(
|
||||||
log_file: Path,
|
log_file: Path,
|
||||||
db_path: Path,
|
db_path: Path,
|
||||||
pattern_file: Path | None = None,
|
pattern_file: Path | None = None,
|
||||||
|
force: bool = False,
|
||||||
) -> dict[str, int]:
|
) -> dict[str, int]:
|
||||||
"""Glean a single log file (any supported format)."""
|
"""Glean a single log file (any supported format).
|
||||||
return _glean_files([log_file], db_path, pattern_file)
|
|
||||||
|
Pass ``force=True`` to re-glean even when the file fingerprint is unchanged.
|
||||||
|
"""
|
||||||
|
return _glean_files([log_file], db_path, pattern_file, force=force)
|
||||||
|
|
||||||
|
|
||||||
def glean_sources(
|
def glean_sources(
|
||||||
|
|
@ -449,6 +521,7 @@ def glean_sources(
|
||||||
db_path: Path,
|
db_path: Path,
|
||||||
pattern_file: Path | None = None,
|
pattern_file: Path | None = None,
|
||||||
batch_size: int = 1000,
|
batch_size: int = 1000,
|
||||||
|
force: bool = False,
|
||||||
) -> dict[str, int]:
|
) -> dict[str, int]:
|
||||||
"""Glean all sources listed in a sources.yaml config file.
|
"""Glean all sources listed in a sources.yaml config file.
|
||||||
|
|
||||||
|
|
@ -510,7 +583,7 @@ def glean_sources(
|
||||||
|
|
||||||
stats: dict[str, int] = {}
|
stats: dict[str, int] = {}
|
||||||
if files:
|
if files:
|
||||||
stats.update(_glean_files(files, db_path, pattern_file, batch_size, source_id_map))
|
stats.update(_glean_files(files, db_path, pattern_file, batch_size, source_id_map, force=force))
|
||||||
|
|
||||||
# ── SSH remote sources ─────────────────────────────────────────────────
|
# ── SSH remote sources ─────────────────────────────────────────────────
|
||||||
if not ssh_sources:
|
if not ssh_sources:
|
||||||
|
|
|
||||||
|
|
@ -93,7 +93,7 @@ def search_logs(
|
||||||
Example: '"connection refused" OR "connection lost"'
|
Example: '"connection refused" OR "connection lost"'
|
||||||
severity: Filter by level — EMERGENCY, ALERT, CRITICAL, ERROR, WARN, NOTICE, INFO, DEBUG.
|
severity: Filter by level — EMERGENCY, ALERT, CRITICAL, ERROR, WARN, NOTICE, INFO, DEBUG.
|
||||||
source: Partial match on source_id. Format is 'corpus:host:service'.
|
source: Partial match on source_id. Format is 'corpus:host:service'.
|
||||||
Example: 'xanderland:caddy' matches all Caddy entries from xanderland.
|
Example: 'example-node:caddy' matches all Caddy entries from example-node.
|
||||||
pattern: Filter by named pattern tag applied at glean time.
|
pattern: Filter by named pattern tag applied at glean time.
|
||||||
Known tags: auth_failure, connection_lost, oom, segfault, disk_full,
|
Known tags: auth_failure, connection_lost, oom, segfault, disk_full,
|
||||||
timeout, caddy_tls_error, caddy_config_error, caddy_auth_error,
|
timeout, caddy_tls_error, caddy_config_error, caddy_auth_error,
|
||||||
|
|
|
||||||
23
app/rest.py
23
app/rest.py
|
|
@ -515,13 +515,20 @@ def delete_source(source_id: str) -> dict:
|
||||||
|
|
||||||
|
|
||||||
@router.post("/api/sources/{source_id}/glean")
|
@router.post("/api/sources/{source_id}/glean")
|
||||||
def reglean_source(source_id: str, background_tasks: BackgroundTasks) -> dict:
|
def reglean_source(
|
||||||
|
source_id: str,
|
||||||
|
background_tasks: BackgroundTasks,
|
||||||
|
force: Annotated[bool, Query(description="Bypass fingerprint check and re-glean even if file is unchanged")] = False,
|
||||||
|
) -> dict:
|
||||||
"""Trigger a re-glean for a configured source from sources.yaml.
|
"""Trigger a re-glean for a configured source from sources.yaml.
|
||||||
|
|
||||||
Handles both local file sources and SSH remote sources. For SSH sources,
|
Handles both local file sources and SSH remote sources. For SSH sources,
|
||||||
the glean runs in the foreground and rebuilds the FTS index before returning
|
the glean runs in the foreground and rebuilds the FTS index before returning
|
||||||
(same behaviour as local sources — callers can rely on the count being final
|
(same behaviour as local sources — callers can rely on the count being final
|
||||||
when the response arrives).
|
when the response arrives).
|
||||||
|
|
||||||
|
Use ``?force=true`` to bypass the fingerprint cache and re-glean the file
|
||||||
|
even if mtime and size appear unchanged since the last run.
|
||||||
"""
|
"""
|
||||||
sources_file = PATTERN_DIR / "sources.yaml"
|
sources_file = PATTERN_DIR / "sources.yaml"
|
||||||
if not sources_file.exists():
|
if not sources_file.exists():
|
||||||
|
|
@ -536,6 +543,7 @@ def reglean_source(source_id: str, background_tasks: BackgroundTasks) -> dict:
|
||||||
|
|
||||||
if src.get("transport") == "ssh":
|
if src.get("transport") == "ssh":
|
||||||
# SSH sources: open connection, glean all items, rebuild FTS inline.
|
# SSH sources: open connection, glean all items, rebuild FTS inline.
|
||||||
|
# Fingerprint skipping applies only to local file sources.
|
||||||
stats = _glean_ssh_source(src, DB_PATH, PATTERN_FILE)
|
stats = _glean_ssh_source(src, DB_PATH, PATTERN_FILE)
|
||||||
return {"source_id": source_id, "gleaned": sum(stats.values())}
|
return {"source_id": source_id, "gleaned": sum(stats.values())}
|
||||||
|
|
||||||
|
|
@ -543,7 +551,7 @@ def reglean_source(source_id: str, background_tasks: BackgroundTasks) -> dict:
|
||||||
src_path = Path(src["path"])
|
src_path = Path(src["path"])
|
||||||
if not src_path.exists():
|
if not src_path.exists():
|
||||||
raise HTTPException(status_code=422, detail=f"Path does not exist: {src_path}")
|
raise HTTPException(status_code=422, detail=f"Path does not exist: {src_path}")
|
||||||
stats = _glean_file(src_path, DB_PATH, PATTERN_FILE)
|
stats = _glean_file(src_path, DB_PATH, PATTERN_FILE, force=force)
|
||||||
background_tasks.add_task(build_fts_index, DB_PATH)
|
background_tasks.add_task(build_fts_index, DB_PATH)
|
||||||
return {"source_id": source_id, "gleaned": stats.get(source_id, sum(stats.values()))}
|
return {"source_id": source_id, "gleaned": stats.get(source_id, sum(stats.values()))}
|
||||||
|
|
||||||
|
|
@ -656,8 +664,14 @@ def glean_task_status() -> dict:
|
||||||
|
|
||||||
|
|
||||||
@router.post("/api/tasks/glean")
|
@router.post("/api/tasks/glean")
|
||||||
async def trigger_glean() -> dict:
|
async def trigger_glean(
|
||||||
"""Manually trigger a glean of all configured sources. No-ops if already running."""
|
force: Annotated[bool, Query(description="Bypass fingerprint check and re-glean all sources")] = False,
|
||||||
|
) -> dict:
|
||||||
|
"""Manually trigger a glean of all configured sources. No-ops if already running.
|
||||||
|
|
||||||
|
Use ``?force=true`` to bypass the fingerprint cache and re-glean every local
|
||||||
|
file source even when mtime and size are unchanged since the last run.
|
||||||
|
"""
|
||||||
sources_file = PATTERN_DIR / "sources.yaml"
|
sources_file = PATTERN_DIR / "sources.yaml"
|
||||||
if not sources_file.exists():
|
if not sources_file.exists():
|
||||||
raise HTTPException(status_code=404, detail="sources.yaml not found — configure log sources first")
|
raise HTTPException(status_code=404, detail="sources.yaml not found — configure log sources first")
|
||||||
|
|
@ -665,6 +679,7 @@ async def trigger_glean() -> dict:
|
||||||
sources_file, DB_PATH, PATTERN_FILE,
|
sources_file, DB_PATH, PATTERN_FILE,
|
||||||
submit_endpoint=SUBMIT_ENDPOINT or None,
|
submit_endpoint=SUBMIT_ENDPOINT or None,
|
||||||
source_host=SOURCE_HOST,
|
source_host=SOURCE_HOST,
|
||||||
|
force=force,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
229
app/services/embeddings.py
Normal file
229
app/services/embeddings.py
Normal file
|
|
@ -0,0 +1,229 @@
|
||||||
|
"""Configurable embedding service — BSL licensed.
|
||||||
|
|
||||||
|
Backends:
|
||||||
|
sentence_transformers — local in-process inference (default, no server needed)
|
||||||
|
ollama — HTTP to a running Ollama instance
|
||||||
|
|
||||||
|
Configuration (env vars):
|
||||||
|
TURNSTONE_EMBED_BACKEND sentence_transformers | ollama (default: sentence_transformers)
|
||||||
|
TURNSTONE_EMBED_MODEL model name/path (backend-specific default)
|
||||||
|
TURNSTONE_EMBED_DEVICE cpu | cuda (default: cpu; ST backend only)
|
||||||
|
TURNSTONE_LLM_URL Ollama base URL (default: http://localhost:11434)
|
||||||
|
|
||||||
|
When no backend is importable/reachable, EMBEDDING_AVAILABLE is False and all
|
||||||
|
embed calls return empty arrays — callers must handle this gracefully.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import struct
|
||||||
|
from typing import Protocol, runtime_checkable
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# ── Public availability flag ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
EMBEDDING_AVAILABLE: bool = False
|
||||||
|
|
||||||
|
# ── Config ────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
_BACKEND = os.environ.get("TURNSTONE_EMBED_BACKEND", "sentence_transformers").lower()
|
||||||
|
_DEVICE = os.environ.get("TURNSTONE_EMBED_DEVICE", "cpu").lower()
|
||||||
|
_LLM_URL = os.environ.get("TURNSTONE_LLM_URL", "http://localhost:11434")
|
||||||
|
|
||||||
|
# BAAI/bge-small-en-v1.5: 33MB, MIT, 49M downloads/month, 384-dim, 512-token max.
|
||||||
|
# Benchmarked as the best quality-to-size ratio in the field (MTEB 62.17).
|
||||||
|
# all-MiniLM-L6-v2 is a viable lighter alternative (23MB, 256-token max) if
|
||||||
|
# inference speed is the primary constraint.
|
||||||
|
_DEFAULT_MODEL: dict[str, str] = {
|
||||||
|
"sentence_transformers": "BAAI/bge-small-en-v1.5",
|
||||||
|
"ollama": "nomic-embed-text",
|
||||||
|
}
|
||||||
|
_MODEL = os.environ.get(
|
||||||
|
"TURNSTONE_EMBED_MODEL",
|
||||||
|
_DEFAULT_MODEL.get(_BACKEND, "sentence-transformers/all-MiniLM-L6-v2"),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Protocol ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@runtime_checkable
class Embedder(Protocol):
    """Structural interface that every embedding backend implements."""

    @property
    def dim(self) -> int:
        """Length of the vectors this backend produces."""
        ...

    @property
    def model_name(self) -> str:
        """Identifier of the underlying model, suitable for display."""
        ...

    def embed(self, text: str) -> np.ndarray:
        """Embed one string; the result is a 1-D float32 array of length ``dim``."""
        ...

    def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
        """Embed several strings; the result is a list of 1-D float32 arrays."""
        ...
|
||||||
|
|
||||||
|
|
||||||
|
# ── sentence-transformers backend ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
class SentenceTransformerEmbedder:
    """In-process embedding backend built on the sentence-transformers library.

    On first instantiation the model is fetched from HuggingFace and cached
    under ~/.cache/huggingface/; later starts reuse that local cache.
    """

    def __init__(self, model_name: str = _MODEL, device: str = _DEVICE) -> None:
        # Imported here so that importing this module does not itself require
        # sentence-transformers to be installed.
        from sentence_transformers import SentenceTransformer  # type: ignore[import]

        logger.info("Loading embedding model %r on device %r ...", model_name, device)
        self._model = SentenceTransformer(model_name, device=device)
        self._model_name = model_name
        # The dimension is measured from a throwaway encode instead of being
        # hard-coded per model.
        probe = self._model.encode("test")
        self._dim: int = int(probe.shape[0])
        logger.info("Embedding model ready — dim=%d", self._dim)

    @property
    def dim(self) -> int:
        """Embedding dimension measured when the model was loaded."""
        return self._dim

    @property
    def model_name(self) -> str:
        """Model name/path this embedder was constructed with."""
        return self._model_name

    def embed(self, text: str) -> np.ndarray:
        """Encode *text* into a normalised float32 vector."""
        encoded = self._model.encode(
            text, convert_to_numpy=True, normalize_embeddings=True
        )
        return encoded.astype(np.float32)

    def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
        """Encode *texts* in batches of 32; an empty input yields an empty list."""
        if texts:
            encoded = self._model.encode(
                texts, convert_to_numpy=True, normalize_embeddings=True, batch_size=32
            )
            return [row.astype(np.float32) for row in encoded]
        return []
|
||||||
|
|
||||||
|
|
||||||
|
# ── Ollama backend ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class OllamaEmbedder:
    """HTTP embedding via a running Ollama instance.

    Each text is POSTed to ``<llm_url>/api/embeddings``. The embedding
    dimension is probed once at construction time with a test request.
    """

    def __init__(
        self,
        model_name: str = _MODEL,
        llm_url: str = _LLM_URL,
        timeout: float = 30.0,
    ) -> None:
        """Create the HTTP client and probe the model's embedding dimension.

        Args:
            model_name: Ollama model to request embeddings from.
            llm_url: Base URL of the Ollama server; a trailing slash is tolerated.
            timeout: Per-request timeout in seconds passed to the HTTP client.
        """
        import httpx  # already a project dependency

        self._model_name = model_name
        self._url = f"{llm_url.rstrip('/')}/api/embeddings"
        self._timeout = timeout
        self._client = httpx.Client(timeout=timeout)
        # Probe dimension with a test call
        self._dim = self._probe_dim()

    def _probe_dim(self) -> int:
        """Return the embedding length reported by the server, or 768 on failure.

        A response that carries no embedding is treated as a failed probe;
        otherwise ``dim`` would silently become 0.
        """
        try:
            probe_len = len(self._raw_embed("probe"))
        except Exception as exc:
            logger.warning("Ollama dim probe failed (%s) — defaulting to 768", exc)
            return 768
        if probe_len == 0:
            logger.warning(
                "Ollama dim probe failed (%s) — defaulting to 768",
                "empty embedding returned",
            )
            return 768
        return probe_len

    def _raw_embed(self, text: str) -> list[float]:
        """POST *text* to the embeddings endpoint and return the raw vector.

        Returns an empty list when the response has no ``embedding`` value.
        HTTP error statuses propagate via ``raise_for_status()``.
        """
        resp = self._client.post(
            self._url, json={"model": self._model_name, "prompt": text}
        )
        resp.raise_for_status()
        return resp.json().get("embedding") or []

    @property
    def dim(self) -> int:
        """Embedding dimension determined by the construction-time probe."""
        return self._dim

    @property
    def model_name(self) -> str:
        """Model name this embedder was constructed with."""
        return self._model_name

    def embed(self, text: str) -> np.ndarray:
        """Embed *text* and return it as a 1-D float32 array (possibly empty)."""
        vec = self._raw_embed(text)
        return np.array(vec, dtype=np.float32)

    def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
        """Embed each text with its own request, preserving input order."""
        return [self.embed(t) for t in texts]
|
||||||
|
|
||||||
|
|
||||||
|
# ── Singleton factory ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
_embedder: Embedder | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_embedder() -> Embedder | None:
    """Return the process-wide embedder, creating it on first use.

    The backend is chosen by ``_BACKEND``. When construction fails, or the
    backend name is not recognised, a warning is logged and ``None`` is
    returned; because nothing is cached in that case, the next call tries
    again. ``EMBEDDING_AVAILABLE`` is set to True only after a successful
    construction. Callers should check EMBEDDING_AVAILABLE or test for None
    rather than calling this unconditionally.
    """
    global _embedder, EMBEDDING_AVAILABLE

    if _embedder is None:
        if _BACKEND == "sentence_transformers":
            try:
                _embedder = SentenceTransformerEmbedder(_MODEL, _DEVICE)
            except ImportError:
                logger.warning(
                    "sentence-transformers not installed — embeddings disabled. "
                    "Install with: pip install sentence-transformers"
                )
            except Exception as exc:
                logger.warning("Failed to load sentence-transformers model %r: %s", _MODEL, exc)
            else:
                EMBEDDING_AVAILABLE = True
        elif _BACKEND == "ollama":
            try:
                _embedder = OllamaEmbedder(_MODEL, _LLM_URL)
            except Exception as exc:
                logger.warning("Ollama embedder init failed: %s", exc)
            else:
                EMBEDDING_AVAILABLE = True
        else:
            logger.warning("Unknown TURNSTONE_EMBED_BACKEND %r — embeddings disabled", _BACKEND)

    return _embedder
|
||||||
|
|
||||||
|
|
||||||
|
# ── BLOB serialisation helpers ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def pack_vector(vec: np.ndarray) -> bytes:
    """Serialise a numpy vector to a float32 SQLite BLOB.

    The input is converted to contiguous float32 and dumped with
    ``ndarray.tobytes()``: 4 bytes per element in native byte order, which is
    the same layout ``struct.pack(f"{n}f", ...)`` produces, without building an
    intermediate Python list of floats.

    Args:
        vec: The vector to store (expected to be one-dimensional).

    Returns:
        ``4 * len(vec)`` bytes; ``b""`` for an empty vector.
    """
    return np.ascontiguousarray(vec, dtype=np.float32).tobytes()
|
||||||
|
|
||||||
|
|
||||||
|
def unpack_vector(blob: bytes) -> np.ndarray:
    """Decode a BLOB of packed float32 values into a numpy vector.

    Raises:
        struct.error: if the blob length is not a whole number of 4-byte floats.
    """
    count = len(blob) // 4  # each float32 occupies 4 bytes
    values = struct.unpack(f"{count}f", blob)
    return np.asarray(values, dtype=np.float32)
|
||||||
|
|
||||||
|
|
||||||
|
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine similarity of two vectors.

    The inputs do not need to be pre-normalised: the dot product is divided by
    the product of both norms. The arithmetic is done in float64 so that
    float32 inputs with large components cannot overflow the norm (which would
    produce NaN), and the result is clipped to [-1.0, 1.0] so that rounding
    never yields a value just outside the valid range.

    Returns:
        The similarity in [-1.0, 1.0], or 0.0 when either vector has zero norm.
    """
    x = np.asarray(a, dtype=np.float64)
    y = np.asarray(b, dtype=np.float64)
    norm_x = np.linalg.norm(x)
    norm_y = np.linalg.norm(y)
    if norm_x == 0.0 or norm_y == 0.0:
        return 0.0
    score = np.dot(x, y) / (norm_x * norm_y)
    # np.clip (unlike min/max) lets NaN from NaN inputs pass through unchanged.
    return float(np.clip(score, -1.0, 1.0))
|
||||||
|
|
@ -88,7 +88,7 @@ def summarize(
|
||||||
logger.debug("Task endpoint unavailable (%s) — falling back to direct model", exc)
|
logger.debug("Task endpoint unavailable (%s) — falling back to direct model", exc)
|
||||||
|
|
||||||
# Fallback: OpenAI-compat endpoint with explicit model name (local instances,
|
# Fallback: OpenAI-compat endpoint with explicit model name (local instances,
|
||||||
# xanderland, or any cf-orch that doesn't have task assignments loaded).
|
# example-node, or any cf-orch that doesn't have task assignments loaded).
|
||||||
try:
|
try:
|
||||||
resp = httpx.post(
|
resp = httpx.post(
|
||||||
f"{llm_url.rstrip('/')}/v1/chat/completions",
|
f"{llm_url.rstrip('/')}/v1/chat/completions",
|
||||||
|
|
|
||||||
|
|
@ -121,8 +121,13 @@ async def run_once(
|
||||||
pattern_file: Path | None = None,
|
pattern_file: Path | None = None,
|
||||||
submit_endpoint: str | None = None,
|
submit_endpoint: str | None = None,
|
||||||
source_host: str = "unknown",
|
source_host: str = "unknown",
|
||||||
|
force: bool = False,
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
"""Ingest all sources once, then submit matched entries if configured."""
|
"""Ingest all sources once, then submit matched entries if configured.
|
||||||
|
|
||||||
|
Pass ``force=True`` to bypass fingerprint checks and re-glean all local
|
||||||
|
file sources regardless of whether they appear unchanged.
|
||||||
|
"""
|
||||||
if _lock.locked():
|
if _lock.locked():
|
||||||
return {"ok": False, "error": "glean already running", "skipped": True}
|
return {"ok": False, "error": "glean already running", "skipped": True}
|
||||||
|
|
||||||
|
|
@ -133,7 +138,7 @@ async def run_once(
|
||||||
loop = asyncio.get_running_loop()
|
loop = asyncio.get_running_loop()
|
||||||
stats: dict[str, int] = await loop.run_in_executor(
|
stats: dict[str, int] = await loop.run_in_executor(
|
||||||
None,
|
None,
|
||||||
lambda: glean_sources(sources_file, db_path, pattern_file),
|
lambda: glean_sources(sources_file, db_path, pattern_file, force=force),
|
||||||
)
|
)
|
||||||
duration = (datetime.now(tz=timezone.utc) - started).total_seconds()
|
duration = (datetime.now(tz=timezone.utc) - started).total_seconds()
|
||||||
_state.last_run_at = started.isoformat()
|
_state.last_run_at = started.isoformat()
|
||||||
|
|
|
||||||
|
|
@ -48,8 +48,8 @@ sources:
|
||||||
# ── Network syslog (router, switches, UniFi APs) ─────────────────────────────
|
# ── Network syslog (router, switches, UniFi APs) ─────────────────────────────
|
||||||
# Written by syslog-receiver.service (UDP 5140 → /devl/turnstone-cluster/data/network-syslog.txt).
|
# Written by syslog-receiver.service (UDP 5140 → /devl/turnstone-cluster/data/network-syslog.txt).
|
||||||
# Configure devices to send syslog to Heimdall:5140.
|
# Configure devices to send syslog to Heimdall:5140.
|
||||||
# UniFi: Settings → System → Remote Logging → Syslog Host = 10.1.10.71:5140
|
# UniFi: Settings → System → Remote Logging → Syslog Host = <YOUR_HOST_IP>:5140
|
||||||
# Ubiquiti EdgeRouter: set system syslog host 10.1.10.71 facility all level debug
|
# Ubiquiti EdgeRouter: set system syslog host <YOUR_HOST_IP> facility all level debug
|
||||||
# Managed switches: varies by vendor — target 10.1.10.71 UDP 5140
|
# Managed switches: varies by vendor — target <YOUR_HOST_IP> UDP 5140
|
||||||
- id: network-syslog
|
- id: network-syslog
|
||||||
path: /data/network-syslog.txt
|
path: /data/network-syslog.txt
|
||||||
|
|
|
||||||
|
|
@ -46,7 +46,7 @@
|
||||||
# ── Adding Caddy reverse proxy ────────────────────────────────────────────────
|
# ── Adding Caddy reverse proxy ────────────────────────────────────────────────
|
||||||
# Add to /etc/caddy/Caddyfile:
|
# Add to /etc/caddy/Caddyfile:
|
||||||
#
|
#
|
||||||
# turnstone.xanderland.tv {
|
# turnstone.example-node.tv {
|
||||||
# import protected
|
# import protected
|
||||||
# reverse_proxy 10.0.0.10:8534
|
# reverse_proxy 10.0.0.10:8534
|
||||||
# import cloudflare
|
# import cloudflare
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,17 @@
|
||||||
"""Tests for app/context/embedder.py — graceful no-op without sqlite-vec."""
|
"""Tests for app/context/embedder.py — delegates to app.services.embeddings."""
|
||||||
import sqlite3
|
import sqlite3
|
||||||
|
import struct
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from unittest.mock import patch
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from app.context import embedder as emb_mod
|
from app.context import embedder as emb_mod
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture()
|
||||||
def db(tmp_path):
|
def db(tmp_path: Path) -> Path:
|
||||||
db_path = tmp_path / "t.db"
|
db_path = tmp_path / "t.db"
|
||||||
conn = sqlite3.connect(str(db_path))
|
conn = sqlite3.connect(str(db_path))
|
||||||
conn.executescript("""
|
conn.executescript("""
|
||||||
|
|
@ -20,34 +24,78 @@ def db(tmp_path):
|
||||||
REFERENCES context_documents(id) ON DELETE CASCADE,
|
REFERENCES context_documents(id) ON DELETE CASCADE,
|
||||||
chunk_index INTEGER NOT NULL, text TEXT NOT NULL, embedding BLOB
|
chunk_index INTEGER NOT NULL, text TEXT NOT NULL, embedding BLOB
|
||||||
);
|
);
|
||||||
INSERT INTO context_documents VALUES ('d1','test.md','markdown','hello',5,'2026-01-01T00:00:00+00:00');
|
INSERT INTO context_documents
|
||||||
|
VALUES ('d1','test.md','markdown','hello',5,'2026-01-01T00:00:00+00:00');
|
||||||
INSERT INTO context_chunks VALUES ('c1','d1',0,'hello world',NULL);
|
INSERT INTO context_chunks VALUES ('c1','d1',0,'hello world',NULL);
|
||||||
|
INSERT INTO context_chunks VALUES ('c2','d1',1,'second chunk',NULL);
|
||||||
""")
|
""")
|
||||||
conn.commit()
|
conn.commit()
|
||||||
conn.close()
|
conn.close()
|
||||||
return db_path
|
return db_path
|
||||||
|
|
||||||
|
|
||||||
def test_embed_skipped_when_extension_absent(db):
|
def _mock_embedder(dim: int = 3) -> MagicMock:
|
||||||
with patch.object(emb_mod, "EMBEDDING_AVAILABLE", False):
|
"""Return a mock Embedder that returns constant dim-length vectors."""
|
||||||
count = emb_mod.embed_chunks(db, "d1", "http://localhost:11434")
|
m = MagicMock()
|
||||||
|
m.dim = dim
|
||||||
|
m.embed_batch.return_value = [np.zeros(dim, dtype=np.float32)] * 10
|
||||||
|
return m
|
||||||
|
|
||||||
|
|
||||||
|
class TestEmbedChunks:
|
||||||
|
def test_returns_zero_when_no_embedder(self, db: Path) -> None:
|
||||||
|
with patch("app.context.embedder.get_embedder", return_value=None):
|
||||||
|
count = emb_mod.embed_chunks(db, "d1")
|
||||||
assert count == 0
|
assert count == 0
|
||||||
|
|
||||||
|
def test_returns_zero_when_no_unembedded_chunks(self, db: Path) -> None:
|
||||||
def test_embed_calls_ollama_when_available(db):
|
# Pre-fill both chunks with a blob
|
||||||
import httpx
|
blob = struct.pack("3f", 0.1, 0.2, 0.3)
|
||||||
|
|
||||||
class FakeResponse:
|
|
||||||
status_code = 200
|
|
||||||
def raise_for_status(self): pass
|
|
||||||
def json(self): return {"embedding": [0.1, 0.2, 0.3]}
|
|
||||||
|
|
||||||
with patch.object(emb_mod, "EMBEDDING_AVAILABLE", True), \
|
|
||||||
patch("app.context.embedder.httpx.post", return_value=FakeResponse()):
|
|
||||||
count = emb_mod.embed_chunks(db, "d1", "http://localhost:11434")
|
|
||||||
assert count == 1
|
|
||||||
# Verify blob was written
|
|
||||||
conn = sqlite3.connect(str(db))
|
conn = sqlite3.connect(str(db))
|
||||||
row = conn.execute("SELECT embedding FROM context_chunks WHERE id='c1'").fetchone()
|
conn.execute("UPDATE context_chunks SET embedding=?", (blob,))
|
||||||
|
conn.commit()
|
||||||
conn.close()
|
conn.close()
|
||||||
assert row[0] is not None
|
|
||||||
|
embedder = _mock_embedder()
|
||||||
|
with patch("app.context.embedder.get_embedder", return_value=embedder):
|
||||||
|
count = emb_mod.embed_chunks(db, "d1")
|
||||||
|
assert count == 0
|
||||||
|
embedder.embed_batch.assert_not_called()
|
||||||
|
|
||||||
|
def test_embeds_all_null_chunks(self, db: Path) -> None:
|
||||||
|
embedder = _mock_embedder(dim=3)
|
||||||
|
with patch("app.context.embedder.get_embedder", return_value=embedder):
|
||||||
|
count = emb_mod.embed_chunks(db, "d1")
|
||||||
|
assert count == 2 # two chunks in fixture
|
||||||
|
|
||||||
|
def test_blobs_written_to_db(self, db: Path) -> None:
|
||||||
|
vec = np.array([0.1, 0.2, 0.3], dtype=np.float32)
|
||||||
|
embedder = _mock_embedder(dim=3)
|
||||||
|
embedder.embed_batch.return_value = [vec, vec]
|
||||||
|
|
||||||
|
with patch("app.context.embedder.get_embedder", return_value=embedder):
|
||||||
|
emb_mod.embed_chunks(db, "d1")
|
||||||
|
|
||||||
|
conn = sqlite3.connect(str(db))
|
||||||
|
rows = conn.execute(
|
||||||
|
"SELECT embedding FROM context_chunks WHERE document_id='d1'"
|
||||||
|
).fetchall()
|
||||||
|
conn.close()
|
||||||
|
for (blob,) in rows:
|
||||||
|
assert blob is not None
|
||||||
|
unpacked = struct.unpack(f"{len(blob)//4}f", blob)
|
||||||
|
assert len(unpacked) == 3
|
||||||
|
|
||||||
|
def test_legacy_llm_url_param_accepted(self, db: Path) -> None:
|
||||||
|
"""Ensure backward-compat signature still works (llm_url ignored)."""
|
||||||
|
embedder = _mock_embedder()
|
||||||
|
with patch("app.context.embedder.get_embedder", return_value=embedder):
|
||||||
|
count = emb_mod.embed_chunks(db, "d1", "http://localhost:11434", "nomic-embed-text")
|
||||||
|
assert count == 2
|
||||||
|
|
||||||
|
def test_embed_batch_error_returns_zero(self, db: Path) -> None:
|
||||||
|
embedder = _mock_embedder()
|
||||||
|
embedder.embed_batch.side_effect = RuntimeError("model exploded")
|
||||||
|
with patch("app.context.embedder.get_embedder", return_value=embedder):
|
||||||
|
count = emb_mod.embed_chunks(db, "d1")
|
||||||
|
assert count == 0
|
||||||
|
|
|
||||||
236
tests/test_glean_fingerprint.py
Normal file
236
tests/test_glean_fingerprint.py
Normal file
|
|
@ -0,0 +1,236 @@
|
||||||
|
"""Tests for fingerprint-based incremental glean skipping (issue #30).
|
||||||
|
|
||||||
|
Verifies that _glean_files() (and its public wrappers) skip local files whose
|
||||||
|
mtime+size fingerprint has not changed since the last glean, and that force=True
|
||||||
|
bypasses that check.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import sqlite3
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from app.glean.pipeline import (
|
||||||
|
_fingerprint,
|
||||||
|
_fp_unchanged,
|
||||||
|
_save_fingerprint,
|
||||||
|
ensure_schema,
|
||||||
|
glean_dir,
|
||||||
|
glean_file,
|
||||||
|
)
|
||||||
|
from app.glean.base import now_iso
|
||||||
|
|
||||||
|
|
||||||
|
# ── Fixtures ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@pytest.fixture()
def db_path(tmp_path: Path) -> Path:
    """Return the path of a per-test database initialised via ensure_schema."""
    database = tmp_path / "test.db"
    ensure_schema(database)
    return database
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
def log_file(tmp_path: Path) -> Path:
    """Write a one-line plaintext log under tmp_path and return its path."""
    target = tmp_path / "test.log"
    target.write_text("May 24 10:00:00 heimdall kernel: test message\n")
    return target
|
||||||
|
|
||||||
|
|
||||||
|
# ── Unit: fingerprint helpers ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestFingerprintHelpers:
    """Unit tests for _fingerprint, _fp_unchanged and _save_fingerprint."""

    @pytest.fixture()
    def conn(self, db_path: Path):
        """Yield a connection to the test database.

        Closing happens in fixture teardown, so the connection is released
        even when an assertion in the test body fails.
        """
        connection = sqlite3.connect(str(db_path))
        yield connection
        connection.close()

    def test_fingerprint_returns_mtime_and_size(self, log_file: Path) -> None:
        mtime, size = _fingerprint(log_file)
        st = log_file.stat()
        assert mtime == st.st_mtime
        assert size == st.st_size

    def test_fp_unchanged_returns_false_when_no_record(
        self, conn: sqlite3.Connection, log_file: Path
    ) -> None:
        mtime, size = _fingerprint(log_file)
        assert _fp_unchanged(conn, log_file, mtime, size) is False

    def test_fp_unchanged_returns_true_after_save(
        self, conn: sqlite3.Connection, log_file: Path
    ) -> None:
        mtime, size = _fingerprint(log_file)
        _save_fingerprint(conn, log_file, mtime, size, now_iso())
        conn.commit()
        assert _fp_unchanged(conn, log_file, mtime, size) is True

    def test_fp_unchanged_returns_false_on_size_change(
        self, conn: sqlite3.Connection, log_file: Path
    ) -> None:
        mtime, size = _fingerprint(log_file)
        _save_fingerprint(conn, log_file, mtime, size, now_iso())
        conn.commit()
        # Simulate size change (new content appended)
        assert _fp_unchanged(conn, log_file, mtime, size + 1) is False

    def test_fp_unchanged_returns_false_on_mtime_change(
        self, conn: sqlite3.Connection, log_file: Path
    ) -> None:
        mtime, size = _fingerprint(log_file)
        _save_fingerprint(conn, log_file, mtime, size, now_iso())
        conn.commit()
        assert _fp_unchanged(conn, log_file, mtime + 1.0, size) is False

    def test_save_fingerprint_upserts(
        self, conn: sqlite3.Connection, log_file: Path
    ) -> None:
        """Second save with different values replaces the first (UPSERT semantics)."""
        _save_fingerprint(conn, log_file, 1000.0, 100, "2026-01-01T00:00:00Z")
        conn.commit()
        _save_fingerprint(conn, log_file, 2000.0, 200, "2026-01-02T00:00:00Z")
        conn.commit()
        row = conn.execute(
            "SELECT mtime, size FROM glean_fingerprints WHERE path = ?",
            (str(log_file),),
        ).fetchone()
        assert row == (2000.0, 200)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Integration: glean_file skipping ─────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestGleanFileFingerprint:
    """Integration tests: glean_file honours and maintains stored fingerprints."""

    @staticmethod
    def _stored_fingerprint(db_path: Path, path: Path) -> tuple | None:
        """Return the (mtime, size, gleaned_at) row stored for *path*, or None.

        The connection is closed in a ``finally`` so it is never leaked, even
        if the query raises.
        """
        conn = sqlite3.connect(str(db_path))
        try:
            return conn.execute(
                "SELECT mtime, size, gleaned_at FROM glean_fingerprints WHERE path = ?",
                (str(path),),
            ).fetchone()
        finally:
            conn.close()

    def test_first_glean_writes_fingerprint(self, db_path: Path, log_file: Path) -> None:
        glean_file(log_file, db_path)
        row = self._stored_fingerprint(db_path, log_file)
        assert row is not None
        mtime, size = _fingerprint(log_file)
        assert row[:2] == (mtime, size)

    def test_second_glean_skips_unchanged_file(self, db_path: Path, log_file: Path) -> None:
        stats_first = glean_file(log_file, db_path)
        count_first = sum(stats_first.values())

        # Re-glean without touching the file — should produce 0 new entries.
        stats_second = glean_file(log_file, db_path)
        count_second = sum(stats_second.values())

        assert count_first >= 1, "First glean should find at least one entry"
        assert count_second == 0, "Second glean should skip unchanged file"

    def test_second_glean_runs_when_file_grows(self, db_path: Path, log_file: Path) -> None:
        glean_file(log_file, db_path)

        # Append a new line and update mtime by rewriting.
        original = log_file.read_text()
        log_file.write_text(original + "May 24 10:01:00 heimdall kernel: second message\n")

        # The returned stats are not asserted on: INSERT OR IGNORE means the
        # count is not a reliable signal, and the former ">= 0" check could
        # never fail. The refreshed fingerprint below is what proves the glean
        # actually ran instead of being skipped.
        glean_file(log_file, db_path)

        # Confirm fingerprint updated to new size.
        row = self._stored_fingerprint(db_path, log_file)
        assert row is not None
        assert row[1] == log_file.stat().st_size

    def test_force_bypasses_fingerprint(self, db_path: Path, log_file: Path) -> None:
        glean_file(log_file, db_path)

        # Without force: skipped.
        stats_no_force = glean_file(log_file, db_path)
        assert sum(stats_no_force.values()) == 0

        # With force: glean runs (INSERT OR IGNORE means count may be 0, but
        # we verify the fingerprint was re-saved with a fresh gleaned_at).
        before = self._stored_fingerprint(db_path, log_file)
        assert before is not None
        ts_before = before[2]

        time.sleep(0.01)  # ensure gleaned_at advances
        glean_file(log_file, db_path, force=True)

        after = self._stored_fingerprint(db_path, log_file)
        assert after is not None
        ts_after = after[2]

        assert ts_after > ts_before, "force=True should update gleaned_at timestamp"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Integration: glean_dir skipping ──────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestGleanDirFingerprint:
    """Integration tests: glean_dir applies the fingerprint check per file."""

    @staticmethod
    def _gleaned_at(db_path: Path, path: Path) -> str:
        """Return the stored gleaned_at value for *path*.

        Fails with a clear assertion when no fingerprint row exists, and closes
        the connection in a ``finally`` so it is not leaked on failure.
        """
        conn = sqlite3.connect(str(db_path))
        try:
            row = conn.execute(
                "SELECT gleaned_at FROM glean_fingerprints WHERE path = ?",
                (str(path),),
            ).fetchone()
        finally:
            conn.close()
        assert row is not None, f"no fingerprint stored for {path}"
        return row[0]

    def test_glean_dir_skips_unchanged_on_second_run(self, db_path: Path, tmp_path: Path) -> None:
        log1 = tmp_path / "a.log"
        log2 = tmp_path / "b.log"
        log1.write_text("May 24 10:00:00 heimdall kernel: msg one\n")
        log2.write_text("May 24 10:00:00 heimdall kernel: msg two\n")

        glean_dir(tmp_path, db_path)

        stats_second = glean_dir(tmp_path, db_path)
        assert sum(stats_second.values()) == 0, "Both unchanged files should be skipped"

    def test_glean_dir_force_reruns_all(self, db_path: Path, tmp_path: Path) -> None:
        log1 = tmp_path / "a.log"
        log1.write_text("May 24 10:00:00 heimdall kernel: msg one\n")

        glean_dir(tmp_path, db_path)

        # force=True: runs even though nothing changed; INSERT OR IGNORE keeps DB clean.
        ts_before = self._gleaned_at(db_path, log1)

        time.sleep(0.01)  # ensure gleaned_at advances
        glean_dir(tmp_path, db_path, force=True)

        ts_after = self._gleaned_at(db_path, log1)

        assert ts_after > ts_before
|
||||||
|
|
||||||
|
|
||||||
|
# ── Schema: ensure fingerprints table created ─────────────────────────────────
|
||||||
|
|
||||||
|
class TestEnsureSchema:
    """Schema checks: ensure_schema creates glean_fingerprints and can be re-run."""

    def test_fingerprints_table_exists_after_ensure_schema(self, tmp_path: Path) -> None:
        database = tmp_path / "fresh.db"
        ensure_schema(database)
        conn = sqlite3.connect(str(database))
        listing = conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table'"
        ).fetchall()
        conn.close()
        table_names = set()
        for (name,) in listing:
            table_names.add(name)
        assert "glean_fingerprints" in table_names

    def test_ensure_schema_idempotent(self, tmp_path: Path) -> None:
        """Calling ensure_schema twice on the same DB must not raise."""
        database = tmp_path / "fresh.db"
        for _ in range(2):  # the second pass should be a no-op
            ensure_schema(database)
|
||||||
|
|
@ -4,24 +4,24 @@ from __future__ import annotations
|
||||||
from app.glean.syslog import is_syslog, parse
|
from app.glean.syslog import is_syslog, parse
|
||||||
|
|
||||||
SYSLOG_SAMPLE = """\
|
SYSLOG_SAMPLE = """\
|
||||||
May 11 14:23:01 xanderland sshd[1234]: Accepted publickey for x from 192.168.1.1 port 54321 ssh2
|
May 11 14:23:01 example-node sshd[1234]: Accepted publickey for x from 192.168.1.1 port 54321 ssh2
|
||||||
May 11 14:23:05 xanderland sshd[1234]: Failed password for invalid user admin from 10.0.0.99 port 22 ssh2
|
May 11 14:23:05 example-node sshd[1234]: Failed password for invalid user admin from 10.0.0.99 port 22 ssh2
|
||||||
May 11 14:23:10 xanderland sudo[5678]: x : TTY=pts/0 ; PWD=/home/x ; USER=root ; COMMAND=/usr/bin/apt update
|
May 11 14:23:10 example-node sudo[5678]: x : TTY=pts/0 ; PWD=/home/x ; USER=root ; COMMAND=/usr/bin/apt update
|
||||||
May 11 14:23:15 xanderland kernel: [12345.678] usb 1-1: USB disconnect, device number 2
|
May 11 14:23:15 example-node kernel: [12345.678] usb 1-1: USB disconnect, device number 2
|
||||||
May 1 04:00:00 xanderland CRON[9999]: (root) CMD (/usr/local/sbin/backup.sh)
|
May 1 04:00:00 example-node CRON[9999]: (root) CMD (/usr/local/sbin/backup.sh)
|
||||||
May 11 14:24:00 xanderland systemd[1]: Started NetworkManager.
|
May 11 14:24:00 example-node systemd[1]: Started NetworkManager.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
class TestDetector:
|
class TestDetector:
|
||||||
def test_detects_standard_line(self):
|
def test_detects_standard_line(self):
|
||||||
assert is_syslog("May 11 14:23:01 xanderland sshd[1234]: message")
|
assert is_syslog("May 11 14:23:01 example-node sshd[1234]: message")
|
||||||
|
|
||||||
def test_detects_no_pid(self):
|
def test_detects_no_pid(self):
|
||||||
assert is_syslog("May 11 14:23:01 xanderland kernel: message")
|
assert is_syslog("May 11 14:23:01 example-node kernel: message")
|
||||||
|
|
||||||
def test_detects_space_padded_day(self):
|
def test_detects_space_padded_day(self):
|
||||||
assert is_syslog("May 1 04:00:00 xanderland CRON[9999]: message")
|
assert is_syslog("May 1 04:00:00 example-node CRON[9999]: message")
|
||||||
|
|
||||||
def test_rejects_servarr(self):
|
def test_rejects_servarr(self):
|
||||||
assert not is_syslog("2026-05-11 02:31:51.5|Info|ComponentName|Message")
|
assert not is_syslog("2026-05-11 02:31:51.5|Info|ComponentName|Message")
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue