Compare commits

..

No commits in common. "3e7a1fa064da24a21643e634dd6f9e22fc622ab8" and "e746d55730d3956deeb4ef871e4289157f2a2c40" have entirely different histories.

14 changed files with 96 additions and 814 deletions

View file

@ -10,7 +10,7 @@
# GPU_SERVER_URL — URL of your GPU inference server (Ollama, vLLM, or cf-orch coordinator).
# Paid+ users: leave unset to auto-default to https://orch.circuitforge.tech via CF_LICENSE_KEY.
# Local Ollama (default if unset): http://localhost:11434
# Local cf-orch coordinator: http://<YOUR_HOST_IP>:7700
# Local cf-orch coordinator: http://10.1.10.71:7700
# CF_ORCH_URL is also accepted as a backward-compatible alias.
# GPU_SERVER_URL=http://localhost:11434

View file

@ -1,81 +1,64 @@
"""Context chunk embedding — BSL licensed.
Thin wrapper around app.services.embeddings that handles the DB I/O for
context_chunks. All backend configuration (model, device, backend type) is
delegated to the service layer via TURNSTONE_EMBED_* env vars.
Re-exports EMBEDDING_AVAILABLE so callers that imported it from here continue
to work without changes.
"""
"""Ollama embedding client with sqlite-vec storage — BSL licensed."""
from __future__ import annotations
import logging
import sqlite3
import struct
from pathlib import Path
from app.services.embeddings import (
EMBEDDING_AVAILABLE, # re-export for backward compat
get_embedder,
pack_vector,
)
__all__ = ["EMBEDDING_AVAILABLE", "embed_chunks"]
import httpx
logger = logging.getLogger(__name__)
EMBEDDING_AVAILABLE: bool = False
try:
import sqlite_vec # type: ignore[import] # noqa: F401
EMBEDDING_AVAILABLE = True
logger.debug("sqlite-vec loaded — embedding pipeline enabled")
except ImportError:
logger.debug("sqlite-vec not available — embedding pipeline disabled")
def embed_chunks(
db_path: Path,
document_id: str,
# Legacy params kept for backward compat — ignored when the ST backend is active.
llm_url: str = "",
model: str = "",
llm_url: str,
model: str = "nomic-embed-text",
timeout: float = 60.0,
) -> int:
"""Embed all un-embedded chunks for *document_id*.
Uses the configured embedder (sentence-transformers by default; Ollama when
TURNSTONE_EMBED_BACKEND=ollama). Returns the count of newly embedded chunks.
Returns 0 silently when no embedder is available.
The legacy ``llm_url`` and ``model`` parameters are accepted but ignored when
the sentence-transformers backend is active configure via env vars instead.
"""
embedder = get_embedder()
if embedder is None:
"""Embed all unembedded chunks for a document. Returns count embedded. No-op when EMBEDDING_AVAILABLE is False."""
if not EMBEDDING_AVAILABLE:
return 0
conn = sqlite3.connect(str(db_path))
conn.execute("PRAGMA journal_mode=WAL")
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT id, text FROM context_chunks WHERE document_id=? AND embedding IS NULL",
(document_id,),
).fetchall()
if not rows:
conn.close()
return 0
texts = [r["text"] for r in rows]
ids = [r["id"] for r in rows]
count = 0
for row in rows:
try:
vectors = embedder.embed_batch(texts)
for chunk_id, vec in zip(ids, vectors):
blob = pack_vector(vec)
resp = httpx.post(
f"{llm_url.rstrip('/')}/api/embeddings",
json={"model": model, "prompt": row["text"]},
timeout=timeout,
)
resp.raise_for_status()
vector: list[float] = resp.json().get("embedding") or []
if vector:
blob = struct.pack(f"{len(vector)}f", *vector)
conn.execute(
"UPDATE context_chunks SET embedding=? WHERE id=?",
(blob, chunk_id),
(blob, row["id"]),
)
count += 1
conn.commit()
except Exception as exc:
logger.warning("Batch embedding failed for document %s: %s", document_id, exc)
finally:
conn.close()
logger.warning("Embedding chunk %s failed: %s", row["id"], exc)
logger.debug("Embedded %d chunk(s) for document %s", count, document_id)
conn.commit()
conn.close()
return count

View file

@ -1,30 +1,10 @@
"""Context retrieval — structured keyword lookup (Free) + chunk search — MIT licensed.
Two retrieval modes for context_chunks:
Vector search cosine similarity over stored embeddings (when available)
Keyword search LIKE-based fallback when no embedder is configured
Both modes are called from retrieve_context(); the best available mode is used
automatically so callers need not check EMBEDDING_AVAILABLE themselves.
"""
"""Context retrieval — structured keyword lookup (Free) + chunk search — MIT licensed."""
from __future__ import annotations
import logging
import sqlite3
from dataclasses import dataclass, field
from pathlib import Path
import numpy as np
from app.services.embeddings import (
EMBEDDING_AVAILABLE,
cosine_similarity,
get_embedder,
unpack_vector,
)
logger = logging.getLogger(__name__)
@dataclass
class RetrievedContext:
@ -32,8 +12,6 @@ class RetrievedContext:
chunks: list[dict[str, str]] = field(default_factory=list)
# ── Structured fact retrieval (always runs) ───────────────────────────────────
def get_relevant_facts(db_path: Path, query: str) -> list[dict[str, str]]:
"""Keyword match against context_facts. Always runs — Free tier."""
try:
@ -64,68 +42,8 @@ def get_relevant_facts(db_path: Path, query: str) -> list[dict[str, str]]:
return []
# ── Chunk retrieval: vector path ──────────────────────────────────────────────
def _search_chunks_vector(
db_path: Path,
query: str,
top_k: int = 3,
) -> list[dict[str, str]]:
"""Cosine similarity search over embedded context_chunks.
Loads all stored embeddings into memory and scores in-process with numpy.
Skips any chunk whose BLOB dimension does not match the current model dim
(stale embeddings from a previous model they will be re-embedded on the
next document upload).
Returns at most *top_k* results ordered by similarity descending.
"""
embedder = get_embedder()
if embedder is None:
return []
try:
query_vec: np.ndarray = embedder.embed(query)
model_dim: int = embedder.dim
except Exception as exc:
logger.warning("Query embedding failed: %s", exc)
return []
try:
conn = sqlite3.connect(str(db_path))
conn.execute("PRAGMA journal_mode=WAL")
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT cc.id, cc.text, cc.embedding, cd.filename"
" FROM context_chunks cc"
" JOIN context_documents cd ON cc.document_id = cd.id"
" WHERE cc.embedding IS NOT NULL"
).fetchall()
conn.close()
except sqlite3.OperationalError:
return []
scored: list[tuple[float, dict[str, str]]] = []
for row in rows:
blob: bytes = row["embedding"]
# Guard against blobs from a different-dimension model
if len(blob) // 4 != model_dim:
continue
try:
chunk_vec = unpack_vector(blob)
score = cosine_similarity(query_vec, chunk_vec)
scored.append((score, {"text": row["text"], "filename": row["filename"]}))
except Exception:
continue
scored.sort(key=lambda t: t[0], reverse=True)
return [item for _, item in scored[:top_k]]
# ── Chunk retrieval: keyword fallback ─────────────────────────────────────────
def _search_chunks_keyword(db_path: Path, query: str) -> list[dict[str, str]]:
"""LIKE-based keyword search across context_chunks. Fallback when no embedder."""
def _search_chunks(db_path: Path, query: str) -> list[dict[str, str]]:
"""Keyword search across context_chunks. Fallback when no embeddings."""
try:
conn = sqlite3.connect(str(db_path))
conn.execute("PRAGMA journal_mode=WAL")
@ -148,29 +66,16 @@ def _search_chunks_keyword(db_path: Path, query: str) -> list[dict[str, str]]:
return []
# ── Public interface ──────────────────────────────────────────────────────────
def retrieve_context(db_path: Path, query: str) -> RetrievedContext:
"""Retrieve structured facts and relevant chunks for a query.
Chunk retrieval uses vector search when an embedder is available and at
least one embedded chunk exists; falls back to keyword search otherwise.
"""
facts = get_relevant_facts(db_path, query)
if EMBEDDING_AVAILABLE:
chunks = _search_chunks_vector(db_path, query)
if not chunks:
# Vector search returned nothing (no embedded chunks yet) — fall back.
chunks = _search_chunks_keyword(db_path, query)
else:
chunks = _search_chunks_keyword(db_path, query)
return RetrievedContext(facts=facts, chunks=chunks)
"""Retrieve structured facts and relevant chunks for a query."""
return RetrievedContext(
facts=get_relevant_facts(db_path, query),
chunks=_search_chunks(db_path, query),
)
def format_context_block(ctx: RetrievedContext) -> str | None:
"""Format context for injection into an LLM prompt. Returns None when empty."""
"""Format context for injection into LLM prompt. Returns None when empty."""
lines: list[str] = []
if ctx.facts:
lines.append("Known environment facts:")

View file

@ -119,13 +119,6 @@ CREATE TABLE IF NOT EXISTS blocklist_candidates (
CREATE INDEX IF NOT EXISTS idx_blocklist_device ON blocklist_candidates(source_device_ip);
CREATE INDEX IF NOT EXISTS idx_blocklist_status ON blocklist_candidates(status);
CREATE INDEX IF NOT EXISTS idx_blocklist_domain ON blocklist_candidates(domain_or_ip);
CREATE TABLE IF NOT EXISTS glean_fingerprints (
path TEXT PRIMARY KEY,
mtime REAL NOT NULL,
size INTEGER NOT NULL,
gleaned_at TEXT NOT NULL
);
"""
@ -146,44 +139,6 @@ def ensure_schema(db_path: Path) -> None:
conn.close()
def _fingerprint(path: Path) -> tuple[float, int]:
"""Return (mtime, size) for a file — cheap identity check, no content read needed."""
st = path.stat()
return st.st_mtime, st.st_size
def _fp_unchanged(conn: sqlite3.Connection, path: Path, mtime: float, size: int) -> bool:
"""Return True only when the stored fingerprint exactly matches (mtime, size).
A smaller size (log rotation) or a larger size (new lines appended) both
return False so the caller re-gleams the file.
"""
row = conn.execute(
"SELECT mtime, size FROM glean_fingerprints WHERE path = ?",
(str(path),),
).fetchone()
if row is None:
return False
return row[0] == mtime and row[1] == size
def _save_fingerprint(
conn: sqlite3.Connection,
path: Path,
mtime: float,
size: int,
gleaned_at: str,
) -> None:
"""Upsert the fingerprint for *path* after a successful glean."""
conn.execute(
"""
INSERT OR REPLACE INTO glean_fingerprints (path, mtime, size, gleaned_at)
VALUES (?, ?, ?, ?)
""",
(str(path), mtime, size, gleaned_at),
)
def _detect_format(first_line: str) -> str:
try:
obj = json.loads(first_line)
@ -281,7 +236,6 @@ def _glean_files(
pattern_file: Path | None = None,
batch_size: int = 1000,
source_id_map: dict[Path, str] | None = None,
force: bool = False,
) -> dict[str, int]:
pattern_file = pattern_file or Path("patterns/default.yaml")
patterns = load_patterns(pattern_file)
@ -295,19 +249,9 @@ def _glean_files(
conn.commit()
stats: dict[str, int] = {}
skipped: list[str] = []
for log_file in files:
source_id = source_id_map.get(log_file, log_file.stem)
# Fingerprint check — skip files whose mtime+size haven't changed.
mtime, size = _fingerprint(log_file)
if not force and _fp_unchanged(conn, log_file, mtime, size):
logger.debug("Skipping unchanged file: %s", log_file.name)
skipped.append(log_file.name)
stats[source_id] = stats.get(source_id, 0)
continue
count = 0
batch: list[RetrievedEntry] = []
for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id):
@ -321,18 +265,11 @@ def _glean_files(
_write_batch(conn, batch)
conn.commit()
count += len(batch)
_save_fingerprint(conn, log_file, mtime, size, ingest_time)
conn.commit()
stats[source_id] = stats.get(source_id, 0) + count
logger.info("Gleaned %d entries from %s (source: %s)", count, log_file.name, source_id)
conn.close()
if skipped:
logger.info("Skipped %d unchanged file(s): %s", len(skipped), ", ".join(skipped))
logger.info("Building FTS index...")
build_fts_index(db_path)
logger.info("FTS index ready")
@ -492,28 +429,19 @@ def glean_dir(
db_path: Path,
pattern_file: Path | None = None,
batch_size: int = 1000,
force: bool = False,
) -> dict[str, int]:
"""Glean all .jsonl and .log files from a corpus directory.
Pass ``force=True`` to bypass fingerprint checks and re-glean all files
regardless of whether they have changed since the last run.
"""
"""Glean all .jsonl and .log files from a corpus directory."""
files = sorted(corpus_dir.glob("*.jsonl")) + sorted(corpus_dir.glob("*.log"))
return _glean_files(files, db_path, pattern_file, batch_size, force=force)
return _glean_files(files, db_path, pattern_file, batch_size)
def glean_file(
log_file: Path,
db_path: Path,
pattern_file: Path | None = None,
force: bool = False,
) -> dict[str, int]:
"""Glean a single log file (any supported format).
Pass ``force=True`` to re-glean even when the file fingerprint is unchanged.
"""
return _glean_files([log_file], db_path, pattern_file, force=force)
"""Glean a single log file (any supported format)."""
return _glean_files([log_file], db_path, pattern_file)
def glean_sources(
@ -521,7 +449,6 @@ def glean_sources(
db_path: Path,
pattern_file: Path | None = None,
batch_size: int = 1000,
force: bool = False,
) -> dict[str, int]:
"""Glean all sources listed in a sources.yaml config file.
@ -583,7 +510,7 @@ def glean_sources(
stats: dict[str, int] = {}
if files:
stats.update(_glean_files(files, db_path, pattern_file, batch_size, source_id_map, force=force))
stats.update(_glean_files(files, db_path, pattern_file, batch_size, source_id_map))
# ── SSH remote sources ─────────────────────────────────────────────────
if not ssh_sources:

View file

@ -93,7 +93,7 @@ def search_logs(
Example: '"connection refused" OR "connection lost"'
severity: Filter by level EMERGENCY, ALERT, CRITICAL, ERROR, WARN, NOTICE, INFO, DEBUG.
source: Partial match on source_id. Format is 'corpus:host:service'.
Example: 'example-node:caddy' matches all Caddy entries from example-node.
Example: 'xanderland:caddy' matches all Caddy entries from xanderland.
pattern: Filter by named pattern tag applied at glean time.
Known tags: auth_failure, connection_lost, oom, segfault, disk_full,
timeout, caddy_tls_error, caddy_config_error, caddy_auth_error,

View file

@ -515,20 +515,13 @@ def delete_source(source_id: str) -> dict:
@router.post("/api/sources/{source_id}/glean")
def reglean_source(
source_id: str,
background_tasks: BackgroundTasks,
force: Annotated[bool, Query(description="Bypass fingerprint check and re-glean even if file is unchanged")] = False,
) -> dict:
def reglean_source(source_id: str, background_tasks: BackgroundTasks) -> dict:
"""Trigger a re-glean for a configured source from sources.yaml.
Handles both local file sources and SSH remote sources. For SSH sources,
the glean runs in the foreground and rebuilds the FTS index before returning
(same behaviour as local sources callers can rely on the count being final
when the response arrives).
Use ``?force=true`` to bypass the fingerprint cache and re-glean the file
even if mtime and size appear unchanged since the last run.
"""
sources_file = PATTERN_DIR / "sources.yaml"
if not sources_file.exists():
@ -543,7 +536,6 @@ def reglean_source(
if src.get("transport") == "ssh":
# SSH sources: open connection, glean all items, rebuild FTS inline.
# Fingerprint skipping applies only to local file sources.
stats = _glean_ssh_source(src, DB_PATH, PATTERN_FILE)
return {"source_id": source_id, "gleaned": sum(stats.values())}
@ -551,7 +543,7 @@ def reglean_source(
src_path = Path(src["path"])
if not src_path.exists():
raise HTTPException(status_code=422, detail=f"Path does not exist: {src_path}")
stats = _glean_file(src_path, DB_PATH, PATTERN_FILE, force=force)
stats = _glean_file(src_path, DB_PATH, PATTERN_FILE)
background_tasks.add_task(build_fts_index, DB_PATH)
return {"source_id": source_id, "gleaned": stats.get(source_id, sum(stats.values()))}
@ -664,14 +656,8 @@ def glean_task_status() -> dict:
@router.post("/api/tasks/glean")
async def trigger_glean(
force: Annotated[bool, Query(description="Bypass fingerprint check and re-glean all sources")] = False,
) -> dict:
"""Manually trigger a glean of all configured sources. No-ops if already running.
Use ``?force=true`` to bypass the fingerprint cache and re-glean every local
file source even when mtime and size are unchanged since the last run.
"""
async def trigger_glean() -> dict:
"""Manually trigger a glean of all configured sources. No-ops if already running."""
sources_file = PATTERN_DIR / "sources.yaml"
if not sources_file.exists():
raise HTTPException(status_code=404, detail="sources.yaml not found — configure log sources first")
@ -679,7 +665,6 @@ async def trigger_glean(
sources_file, DB_PATH, PATTERN_FILE,
submit_endpoint=SUBMIT_ENDPOINT or None,
source_host=SOURCE_HOST,
force=force,
)

View file

@ -1,229 +0,0 @@
"""Configurable embedding service — BSL licensed.
Backends:
sentence_transformers local in-process inference (default, no server needed)
ollama HTTP to a running Ollama instance
Configuration (env vars):
TURNSTONE_EMBED_BACKEND sentence_transformers | ollama (default: sentence_transformers)
TURNSTONE_EMBED_MODEL model name/path (backend-specific default)
TURNSTONE_EMBED_DEVICE cpu | cuda (default: cpu; ST backend only)
TURNSTONE_LLM_URL Ollama base URL (default: http://localhost:11434)
When no backend is importable/reachable, EMBEDDING_AVAILABLE is False and all
embed calls return empty arrays callers must handle this gracefully.
"""
from __future__ import annotations
import logging
import os
import struct
from typing import Protocol, runtime_checkable
import numpy as np
logger = logging.getLogger(__name__)
# ── Public availability flag ──────────────────────────────────────────────────
EMBEDDING_AVAILABLE: bool = False
# ── Config ────────────────────────────────────────────────────────────────────
_BACKEND = os.environ.get("TURNSTONE_EMBED_BACKEND", "sentence_transformers").lower()
_DEVICE = os.environ.get("TURNSTONE_EMBED_DEVICE", "cpu").lower()
_LLM_URL = os.environ.get("TURNSTONE_LLM_URL", "http://localhost:11434")
# BAAI/bge-small-en-v1.5: 33MB, MIT, 49M downloads/month, 384-dim, 512-token max.
# Benchmarked as the best quality-to-size ratio in the field (MTEB 62.17).
# all-MiniLM-L6-v2 is a viable lighter alternative (23MB, 256-token max) if
# inference speed is the primary constraint.
_DEFAULT_MODEL: dict[str, str] = {
"sentence_transformers": "BAAI/bge-small-en-v1.5",
"ollama": "nomic-embed-text",
}
_MODEL = os.environ.get(
"TURNSTONE_EMBED_MODEL",
_DEFAULT_MODEL.get(_BACKEND, "sentence-transformers/all-MiniLM-L6-v2"),
)
# ── Protocol ──────────────────────────────────────────────────────────────────
@runtime_checkable
class Embedder(Protocol):
"""Minimal interface all embedding backends must satisfy."""
@property
def dim(self) -> int:
"""Embedding dimension produced by this model."""
...
@property
def model_name(self) -> str:
"""Human-readable model identifier."""
...
def embed(self, text: str) -> np.ndarray:
"""Embed a single string. Returns 1-D float32 array of length dim."""
...
def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
"""Embed a list of strings. Returns list of 1-D float32 arrays."""
...
# ── sentence-transformers backend ─────────────────────────────────────────────
class SentenceTransformerEmbedder:
"""Local in-process embedding via the sentence-transformers library.
The model is downloaded from HuggingFace on first instantiation and cached
at ~/.cache/huggingface/. Subsequent starts use the local cache.
"""
def __init__(self, model_name: str = _MODEL, device: str = _DEVICE) -> None:
from sentence_transformers import SentenceTransformer # type: ignore[import]
logger.info("Loading embedding model %r on device %r ...", model_name, device)
self._model = SentenceTransformer(model_name, device=device)
self._model_name = model_name
# Infer dimension from a test embed rather than hard-coding
self._dim: int = int(self._model.encode("test").shape[0])
logger.info("Embedding model ready — dim=%d", self._dim)
@property
def dim(self) -> int:
return self._dim
@property
def model_name(self) -> str:
return self._model_name
def embed(self, text: str) -> np.ndarray:
vec = self._model.encode(text, convert_to_numpy=True, normalize_embeddings=True)
return vec.astype(np.float32)
def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
if not texts:
return []
vecs = self._model.encode(
texts, convert_to_numpy=True, normalize_embeddings=True, batch_size=32
)
return [v.astype(np.float32) for v in vecs]
# ── Ollama backend ────────────────────────────────────────────────────────────
class OllamaEmbedder:
"""HTTP embedding via a running Ollama instance."""
def __init__(
self,
model_name: str = _MODEL,
llm_url: str = _LLM_URL,
timeout: float = 30.0,
) -> None:
import httpx # already a project dependency
self._model_name = model_name
self._url = f"{llm_url.rstrip('/')}/api/embeddings"
self._timeout = timeout
self._client = httpx.Client(timeout=timeout)
# Probe dimension with a test call
self._dim = self._probe_dim()
def _probe_dim(self) -> int:
try:
vec = self._raw_embed("probe")
return len(vec)
except Exception as exc:
logger.warning("Ollama dim probe failed (%s) — defaulting to 768", exc)
return 768
def _raw_embed(self, text: str) -> list[float]:
resp = self._client.post(
self._url, json={"model": self._model_name, "prompt": text}
)
resp.raise_for_status()
return resp.json().get("embedding") or []
@property
def dim(self) -> int:
return self._dim
@property
def model_name(self) -> str:
return self._model_name
def embed(self, text: str) -> np.ndarray:
vec = self._raw_embed(text)
return np.array(vec, dtype=np.float32)
def embed_batch(self, texts: list[str]) -> list[np.ndarray]:
return [self.embed(t) for t in texts]
# ── Singleton factory ─────────────────────────────────────────────────────────
_embedder: Embedder | None = None
def get_embedder() -> Embedder | None:
"""Return the configured embedder singleton, or None when unavailable.
Lazy-initialises on first call. Callers should check EMBEDDING_AVAILABLE
or test for None rather than calling this unconditionally.
"""
global _embedder, EMBEDDING_AVAILABLE
if _embedder is not None:
return _embedder
if _BACKEND == "sentence_transformers":
try:
_embedder = SentenceTransformerEmbedder(_MODEL, _DEVICE)
EMBEDDING_AVAILABLE = True
except ImportError:
logger.warning(
"sentence-transformers not installed — embeddings disabled. "
"Install with: pip install sentence-transformers"
)
except Exception as exc:
logger.warning("Failed to load sentence-transformers model %r: %s", _MODEL, exc)
elif _BACKEND == "ollama":
try:
_embedder = OllamaEmbedder(_MODEL, _LLM_URL)
EMBEDDING_AVAILABLE = True
except Exception as exc:
logger.warning("Ollama embedder init failed: %s", exc)
else:
logger.warning("Unknown TURNSTONE_EMBED_BACKEND %r — embeddings disabled", _BACKEND)
return _embedder
# ── BLOB serialisation helpers ────────────────────────────────────────────────
def pack_vector(vec: np.ndarray) -> bytes:
"""Serialise a float32 numpy vector to a SQLite BLOB."""
arr = vec.astype(np.float32)
return struct.pack(f"{len(arr)}f", *arr.tolist())
def unpack_vector(blob: bytes) -> np.ndarray:
"""Deserialise a SQLite BLOB back to a float32 numpy vector."""
n = len(blob) // 4 # 4 bytes per float32
return np.array(struct.unpack(f"{n}f", blob), dtype=np.float32)
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
"""Cosine similarity between two L2-normalised vectors.
Both vectors are re-normalised defensively so callers need not pre-normalise.
Returns 0.0 when either vector has zero norm.
"""
norm_a = np.linalg.norm(a)
norm_b = np.linalg.norm(b)
if norm_a == 0.0 or norm_b == 0.0:
return 0.0
return float(np.dot(a, b) / (norm_a * norm_b))

View file

@ -88,7 +88,7 @@ def summarize(
logger.debug("Task endpoint unavailable (%s) — falling back to direct model", exc)
# Fallback: OpenAI-compat endpoint with explicit model name (local instances,
# example-node, or any cf-orch that doesn't have task assignments loaded).
# xanderland, or any cf-orch that doesn't have task assignments loaded).
try:
resp = httpx.post(
f"{llm_url.rstrip('/')}/v1/chat/completions",

View file

@ -121,13 +121,8 @@ async def run_once(
pattern_file: Path | None = None,
submit_endpoint: str | None = None,
source_host: str = "unknown",
force: bool = False,
) -> dict[str, Any]:
"""Ingest all sources once, then submit matched entries if configured.
Pass ``force=True`` to bypass fingerprint checks and re-glean all local
file sources regardless of whether they appear unchanged.
"""
"""Ingest all sources once, then submit matched entries if configured."""
if _lock.locked():
return {"ok": False, "error": "glean already running", "skipped": True}
@ -138,7 +133,7 @@ async def run_once(
loop = asyncio.get_running_loop()
stats: dict[str, int] = await loop.run_in_executor(
None,
lambda: glean_sources(sources_file, db_path, pattern_file, force=force),
lambda: glean_sources(sources_file, db_path, pattern_file),
)
duration = (datetime.now(tz=timezone.utc) - started).total_seconds()
_state.last_run_at = started.isoformat()

View file

@ -48,8 +48,8 @@ sources:
# ── Network syslog (router, switches, UniFi APs) ─────────────────────────────
# Written by syslog-receiver.service (UDP 5140 → /devl/turnstone-cluster/data/network-syslog.txt).
# Configure devices to send syslog to Heimdall:5140.
# UniFi: Settings → System → Remote Logging → Syslog Host = <YOUR_HOST_IP>:5140
# Ubiquiti EdgeRouter: set system syslog host <YOUR_HOST_IP> facility all level debug
# Managed switches: varies by vendor — target <YOUR_HOST_IP> UDP 5140
# UniFi: Settings → System → Remote Logging → Syslog Host = 10.1.10.71:5140
# Ubiquiti EdgeRouter: set system syslog host 10.1.10.71 facility all level debug
# Managed switches: varies by vendor — target 10.1.10.71 UDP 5140
- id: network-syslog
path: /data/network-syslog.txt

View file

@ -46,7 +46,7 @@
# ── Adding Caddy reverse proxy ────────────────────────────────────────────────
# Add to /etc/caddy/Caddyfile:
#
# turnstone.example-node.tv {
# turnstone.xanderland.tv {
# import protected
# reverse_proxy 10.0.0.10:8534
# import cloudflare

View file

@ -1,17 +1,13 @@
"""Tests for app/context/embedder.py — delegates to app.services.embeddings."""
"""Tests for app/context/embedder.py — graceful no-op without sqlite-vec."""
import sqlite3
import struct
from pathlib import Path
from unittest.mock import MagicMock, patch
import numpy as np
from unittest.mock import patch
import pytest
from app.context import embedder as emb_mod
@pytest.fixture()
def db(tmp_path: Path) -> Path:
@pytest.fixture
def db(tmp_path):
db_path = tmp_path / "t.db"
conn = sqlite3.connect(str(db_path))
conn.executescript("""
@ -24,78 +20,34 @@ def db(tmp_path: Path) -> Path:
REFERENCES context_documents(id) ON DELETE CASCADE,
chunk_index INTEGER NOT NULL, text TEXT NOT NULL, embedding BLOB
);
INSERT INTO context_documents
VALUES ('d1','test.md','markdown','hello',5,'2026-01-01T00:00:00+00:00');
INSERT INTO context_documents VALUES ('d1','test.md','markdown','hello',5,'2026-01-01T00:00:00+00:00');
INSERT INTO context_chunks VALUES ('c1','d1',0,'hello world',NULL);
INSERT INTO context_chunks VALUES ('c2','d1',1,'second chunk',NULL);
""")
conn.commit()
conn.close()
return db_path
def _mock_embedder(dim: int = 3) -> MagicMock:
"""Return a mock Embedder that returns constant dim-length vectors."""
m = MagicMock()
m.dim = dim
m.embed_batch.return_value = [np.zeros(dim, dtype=np.float32)] * 10
return m
class TestEmbedChunks:
def test_returns_zero_when_no_embedder(self, db: Path) -> None:
with patch("app.context.embedder.get_embedder", return_value=None):
count = emb_mod.embed_chunks(db, "d1")
def test_embed_skipped_when_extension_absent(db):
with patch.object(emb_mod, "EMBEDDING_AVAILABLE", False):
count = emb_mod.embed_chunks(db, "d1", "http://localhost:11434")
assert count == 0
def test_returns_zero_when_no_unembedded_chunks(self, db: Path) -> None:
# Pre-fill both chunks with a blob
blob = struct.pack("3f", 0.1, 0.2, 0.3)
def test_embed_calls_ollama_when_available(db):
import httpx
class FakeResponse:
status_code = 200
def raise_for_status(self): pass
def json(self): return {"embedding": [0.1, 0.2, 0.3]}
with patch.object(emb_mod, "EMBEDDING_AVAILABLE", True), \
patch("app.context.embedder.httpx.post", return_value=FakeResponse()):
count = emb_mod.embed_chunks(db, "d1", "http://localhost:11434")
assert count == 1
# Verify blob was written
conn = sqlite3.connect(str(db))
conn.execute("UPDATE context_chunks SET embedding=?", (blob,))
conn.commit()
row = conn.execute("SELECT embedding FROM context_chunks WHERE id='c1'").fetchone()
conn.close()
embedder = _mock_embedder()
with patch("app.context.embedder.get_embedder", return_value=embedder):
count = emb_mod.embed_chunks(db, "d1")
assert count == 0
embedder.embed_batch.assert_not_called()
def test_embeds_all_null_chunks(self, db: Path) -> None:
embedder = _mock_embedder(dim=3)
with patch("app.context.embedder.get_embedder", return_value=embedder):
count = emb_mod.embed_chunks(db, "d1")
assert count == 2 # two chunks in fixture
def test_blobs_written_to_db(self, db: Path) -> None:
vec = np.array([0.1, 0.2, 0.3], dtype=np.float32)
embedder = _mock_embedder(dim=3)
embedder.embed_batch.return_value = [vec, vec]
with patch("app.context.embedder.get_embedder", return_value=embedder):
emb_mod.embed_chunks(db, "d1")
conn = sqlite3.connect(str(db))
rows = conn.execute(
"SELECT embedding FROM context_chunks WHERE document_id='d1'"
).fetchall()
conn.close()
for (blob,) in rows:
assert blob is not None
unpacked = struct.unpack(f"{len(blob)//4}f", blob)
assert len(unpacked) == 3
def test_legacy_llm_url_param_accepted(self, db: Path) -> None:
"""Ensure backward-compat signature still works (llm_url ignored)."""
embedder = _mock_embedder()
with patch("app.context.embedder.get_embedder", return_value=embedder):
count = emb_mod.embed_chunks(db, "d1", "http://localhost:11434", "nomic-embed-text")
assert count == 2
def test_embed_batch_error_returns_zero(self, db: Path) -> None:
embedder = _mock_embedder()
embedder.embed_batch.side_effect = RuntimeError("model exploded")
with patch("app.context.embedder.get_embedder", return_value=embedder):
count = emb_mod.embed_chunks(db, "d1")
assert count == 0
assert row[0] is not None

View file

@ -1,236 +0,0 @@
"""Tests for fingerprint-based incremental glean skipping (issue #30).
Verifies that _glean_files() (and its public wrappers) skip local files whose
mtime+size fingerprint has not changed since the last glean, and that force=True
bypasses that check.
"""
from __future__ import annotations
import sqlite3
import time
from pathlib import Path
import pytest
from app.glean.pipeline import (
_fingerprint,
_fp_unchanged,
_save_fingerprint,
ensure_schema,
glean_dir,
glean_file,
)
from app.glean.base import now_iso
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture()
def db_path(tmp_path: Path) -> Path:
path = tmp_path / "test.db"
ensure_schema(path)
return path
@pytest.fixture()
def log_file(tmp_path: Path) -> Path:
"""A minimal plaintext log file."""
f = tmp_path / "test.log"
f.write_text("May 24 10:00:00 heimdall kernel: test message\n")
return f
# ── Unit: fingerprint helpers ──────────────────────────────────────────────────
class TestFingerprintHelpers:
def test_fingerprint_returns_mtime_and_size(self, log_file: Path) -> None:
mtime, size = _fingerprint(log_file)
st = log_file.stat()
assert mtime == st.st_mtime
assert size == st.st_size
def test_fp_unchanged_returns_false_when_no_record(self, db_path: Path, log_file: Path) -> None:
conn = sqlite3.connect(str(db_path))
mtime, size = _fingerprint(log_file)
assert _fp_unchanged(conn, log_file, mtime, size) is False
conn.close()
def test_fp_unchanged_returns_true_after_save(self, db_path: Path, log_file: Path) -> None:
conn = sqlite3.connect(str(db_path))
mtime, size = _fingerprint(log_file)
_save_fingerprint(conn, log_file, mtime, size, now_iso())
conn.commit()
assert _fp_unchanged(conn, log_file, mtime, size) is True
conn.close()
def test_fp_unchanged_returns_false_on_size_change(self, db_path: Path, log_file: Path) -> None:
conn = sqlite3.connect(str(db_path))
mtime, size = _fingerprint(log_file)
_save_fingerprint(conn, log_file, mtime, size, now_iso())
conn.commit()
# Simulate size change (new content appended)
assert _fp_unchanged(conn, log_file, mtime, size + 1) is False
conn.close()
def test_fp_unchanged_returns_false_on_mtime_change(self, db_path: Path, log_file: Path) -> None:
conn = sqlite3.connect(str(db_path))
mtime, size = _fingerprint(log_file)
_save_fingerprint(conn, log_file, mtime, size, now_iso())
conn.commit()
assert _fp_unchanged(conn, log_file, mtime + 1.0, size) is False
conn.close()
def test_save_fingerprint_upserts(self, db_path: Path, log_file: Path) -> None:
"""Second save with different values replaces the first (UPSERT semantics)."""
conn = sqlite3.connect(str(db_path))
_save_fingerprint(conn, log_file, 1000.0, 100, "2026-01-01T00:00:00Z")
conn.commit()
_save_fingerprint(conn, log_file, 2000.0, 200, "2026-01-02T00:00:00Z")
conn.commit()
row = conn.execute(
"SELECT mtime, size FROM glean_fingerprints WHERE path = ?",
(str(log_file),),
).fetchone()
assert row == (2000.0, 200)
conn.close()
# ── Integration: glean_file skipping ─────────────────────────────────────────
class TestGleanFileFingerprint:
def test_first_glean_writes_fingerprint(self, db_path: Path, log_file: Path) -> None:
glean_file(log_file, db_path)
conn = sqlite3.connect(str(db_path))
row = conn.execute(
"SELECT mtime, size FROM glean_fingerprints WHERE path = ?",
(str(log_file),),
).fetchone()
conn.close()
assert row is not None
mtime, size = _fingerprint(log_file)
assert row == (mtime, size)
def test_second_glean_skips_unchanged_file(self, db_path: Path, log_file: Path) -> None:
stats_first = glean_file(log_file, db_path)
count_first = sum(stats_first.values())
# Re-glean without touching the file — should produce 0 new entries.
stats_second = glean_file(log_file, db_path)
count_second = sum(stats_second.values())
assert count_first >= 1, "First glean should find at least one entry"
assert count_second == 0, "Second glean should skip unchanged file"
def test_second_glean_runs_when_file_grows(self, db_path: Path, log_file: Path) -> None:
glean_file(log_file, db_path)
# Append a new line and update mtime by rewriting.
original = log_file.read_text()
log_file.write_text(original + "May 24 10:01:00 heimdall kernel: second message\n")
stats_second = glean_file(log_file, db_path)
# INSERT OR IGNORE means the original entry won't re-count, but parsing
# does happen — at minimum the new line is processed.
assert sum(stats_second.values()) >= 0 # glean ran (not skipped)
# Confirm fingerprint updated to new size.
conn = sqlite3.connect(str(db_path))
row = conn.execute(
"SELECT size FROM glean_fingerprints WHERE path = ?",
(str(log_file),),
).fetchone()
conn.close()
assert row is not None
assert row[0] == log_file.stat().st_size
def test_force_bypasses_fingerprint(self, db_path: Path, log_file: Path) -> None:
glean_file(log_file, db_path)
# Without force: skipped.
stats_no_force = glean_file(log_file, db_path)
assert sum(stats_no_force.values()) == 0
# With force: glean runs (INSERT OR IGNORE means count may be 0, but
# we verify the fingerprint was re-saved with a fresh gleaned_at).
conn_before = sqlite3.connect(str(db_path))
ts_before = conn_before.execute(
"SELECT gleaned_at FROM glean_fingerprints WHERE path = ?",
(str(log_file),),
).fetchone()[0]
conn_before.close()
time.sleep(0.01) # ensure gleaned_at advances
glean_file(log_file, db_path, force=True)
conn_after = sqlite3.connect(str(db_path))
ts_after = conn_after.execute(
"SELECT gleaned_at FROM glean_fingerprints WHERE path = ?",
(str(log_file),),
).fetchone()[0]
conn_after.close()
assert ts_after > ts_before, "force=True should update gleaned_at timestamp"
# ── Integration: glean_dir skipping ──────────────────────────────────────────
class TestGleanDirFingerprint:
def test_glean_dir_skips_unchanged_on_second_run(self, db_path: Path, tmp_path: Path) -> None:
log1 = tmp_path / "a.log"
log2 = tmp_path / "b.log"
log1.write_text("May 24 10:00:00 heimdall kernel: msg one\n")
log2.write_text("May 24 10:00:00 heimdall kernel: msg two\n")
glean_dir(tmp_path, db_path)
stats_second = glean_dir(tmp_path, db_path)
assert sum(stats_second.values()) == 0, "Both unchanged files should be skipped"
def test_glean_dir_force_reruns_all(self, db_path: Path, tmp_path: Path) -> None:
log1 = tmp_path / "a.log"
log1.write_text("May 24 10:00:00 heimdall kernel: msg one\n")
glean_dir(tmp_path, db_path)
# force=True: runs even though nothing changed; INSERT OR IGNORE keeps DB clean.
conn_before = sqlite3.connect(str(db_path))
ts_before = conn_before.execute(
"SELECT gleaned_at FROM glean_fingerprints WHERE path = ?",
(str(log1),),
).fetchone()[0]
conn_before.close()
time.sleep(0.01)
glean_dir(tmp_path, db_path, force=True)
conn_after = sqlite3.connect(str(db_path))
ts_after = conn_after.execute(
"SELECT gleaned_at FROM glean_fingerprints WHERE path = ?",
(str(log1),),
).fetchone()[0]
conn_after.close()
assert ts_after > ts_before
# ── Schema: ensure fingerprints table created ─────────────────────────────────
class TestEnsureSchema:
def test_fingerprints_table_exists_after_ensure_schema(self, tmp_path: Path) -> None:
db = tmp_path / "fresh.db"
ensure_schema(db)
conn = sqlite3.connect(str(db))
tables = {
row[0]
for row in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
}
conn.close()
assert "glean_fingerprints" in tables
def test_ensure_schema_idempotent(self, tmp_path: Path) -> None:
"""Calling ensure_schema twice on the same DB must not raise."""
db = tmp_path / "fresh.db"
ensure_schema(db)
ensure_schema(db) # second call — should be a no-op

View file

@ -4,24 +4,24 @@ from __future__ import annotations
from app.glean.syslog import is_syslog, parse
SYSLOG_SAMPLE = """\
May 11 14:23:01 example-node sshd[1234]: Accepted publickey for x from 192.168.1.1 port 54321 ssh2
May 11 14:23:05 example-node sshd[1234]: Failed password for invalid user admin from 10.0.0.99 port 22 ssh2
May 11 14:23:10 example-node sudo[5678]: x : TTY=pts/0 ; PWD=/home/x ; USER=root ; COMMAND=/usr/bin/apt update
May 11 14:23:15 example-node kernel: [12345.678] usb 1-1: USB disconnect, device number 2
May 1 04:00:00 example-node CRON[9999]: (root) CMD (/usr/local/sbin/backup.sh)
May 11 14:24:00 example-node systemd[1]: Started NetworkManager.
May 11 14:23:01 xanderland sshd[1234]: Accepted publickey for x from 192.168.1.1 port 54321 ssh2
May 11 14:23:05 xanderland sshd[1234]: Failed password for invalid user admin from 10.0.0.99 port 22 ssh2
May 11 14:23:10 xanderland sudo[5678]: x : TTY=pts/0 ; PWD=/home/x ; USER=root ; COMMAND=/usr/bin/apt update
May 11 14:23:15 xanderland kernel: [12345.678] usb 1-1: USB disconnect, device number 2
May 1 04:00:00 xanderland CRON[9999]: (root) CMD (/usr/local/sbin/backup.sh)
May 11 14:24:00 xanderland systemd[1]: Started NetworkManager.
"""
class TestDetector:
def test_detects_standard_line(self):
assert is_syslog("May 11 14:23:01 example-node sshd[1234]: message")
assert is_syslog("May 11 14:23:01 xanderland sshd[1234]: message")
def test_detects_no_pid(self):
assert is_syslog("May 11 14:23:01 example-node kernel: message")
assert is_syslog("May 11 14:23:01 xanderland kernel: message")
def test_detects_space_padded_day(self):
assert is_syslog("May 1 04:00:00 example-node CRON[9999]: message")
assert is_syslog("May 1 04:00:00 xanderland CRON[9999]: message")
def test_rejects_servarr(self):
assert not is_syslog("2026-05-11 02:31:51.5|Info|ComponentName|Message")