From 2b990a603aeda90b910c02cc5151ade25f6c0075 Mon Sep 17 00:00:00 2001
From: pyr0ball
Date: Mon, 11 May 2026 17:07:54 -0700
Subject: [PATCH] =?UTF-8?q?feat:=20log=20corpus=20receiver=20=E2=80=94=20a?=
 =?UTF-8?q?ccept=20Turnstone=20push=20batches=20and=20label=20for=20logrea?=
 =?UTF-8?q?ding=20fine-tune?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Adds corpus.db (corpus_sources, corpus_batches, corpus_entries), a FastAPI
router at /api/corpus with receive/label/skip/stats/export endpoints, and
seeds consent tokens for xanderland + orchard nodes from label_tool.yaml.
PII flag excludes entries from JSONL export.

Closes avocet#61.
---
 .gitignore               |   3 +
 app/api.py               |   3 +
 app/data/log_corpus.py   | 411 +++++++++++++++++++++++++++++++++++++++
 tests/test_log_corpus.py | 309 +++++++++++++++++++++++++++++
 4 files changed, 726 insertions(+)
 create mode 100644 app/data/log_corpus.py
 create mode 100644 tests/test_log_corpus.py

diff --git a/.gitignore b/.gitignore
index b376926..a6a8b97 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,6 +8,9 @@ __pycache__/
 config/label_tool.yaml
 
 # Data files (user-generated, not for version control)
+data/corpus.db
+data/corpus.db-wal
+data/corpus.db-shm
 data/email_score.jsonl
 data/email_label_queue.jsonl
 data/email_compare_sample.jsonl
diff --git a/app/api.py b/app/api.py
index b485f09..eca231f 100644
--- a/app/api.py
+++ b/app/api.py
@@ -67,6 +67,9 @@ def finetune_cancel_compat() -> dict:
         return {"status": "nothing_running"}
     return cancel_job(row["id"])
 
+from app.data.log_corpus import router as log_corpus_router
+app.include_router(log_corpus_router, prefix="/api/corpus")
+
 from app.dashboard import router as dashboard_router
 app.include_router(dashboard_router, prefix="/api")
 
diff --git a/app/data/log_corpus.py b/app/data/log_corpus.py
new file mode 100644
index 0000000..4212130
--- /dev/null
+++ b/app/data/log_corpus.py
@@ -0,0 +1,411 @@
+"""Avocet — Log Corpus receiver and labeling API.
+
+Receives push batches from consented Turnstone nodes, stores entries for labeling,
+and exports labeled data as JSONL for the logreading fine-tune pipeline.
+
+DB: data/corpus.db (separate from train_jobs.db — different lifecycle)
+Auth: Bearer token validated against corpus_sources table (seeded from label_tool.yaml).
+
+All endpoints registered on `router`. api.py includes this with prefix="/api/corpus".
+"""
+from __future__ import annotations
+
+import json
+import logging
+import sqlite3
+import uuid
+from contextlib import contextmanager
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Generator
+
+import yaml
+from fastapi import APIRouter, Depends, HTTPException
+from fastapi.requests import Request
+from fastapi.responses import StreamingResponse
+
+logger = logging.getLogger(__name__)
+
+_ROOT = Path(__file__).parent.parent.parent
+_CONFIG_DIR: Path | None = None
+_DATA_DIR: Path = _ROOT / "data"
+
+router = APIRouter()
+
+_DB_PATH: Path = _ROOT / "data" / "corpus.db"
+
+# Hard cap on rows returned by GET /entries, whatever limit the caller asks for.
+_MAX_PAGE_SIZE = 100
+
+# Values accepted for failure_type by POST /entries/{id}/label.
+_VALID_FAILURE_TYPES = frozenset(
+    {"hardware", "software", "network", "security", "application", "none", "other"}
+)
+
+_SCHEMA = """
+CREATE TABLE IF NOT EXISTS corpus_sources (
+    token           TEXT PRIMARY KEY,
+    source_host     TEXT NOT NULL,
+    owner           TEXT NOT NULL,
+    consent_date    TEXT NOT NULL,
+    consent_method  TEXT NOT NULL,
+    active          INTEGER NOT NULL DEFAULT 1
+);
+
+CREATE TABLE IF NOT EXISTS corpus_batches (
+    id              TEXT PRIMARY KEY,
+    source_host     TEXT NOT NULL,
+    batch_type      TEXT NOT NULL,
+    received_at     TEXT NOT NULL,
+    entry_count     INTEGER NOT NULL,
+    watermark_from  TEXT,
+    watermark_to    TEXT,
+    raw_json        TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS corpus_entries (
+    id                 TEXT PRIMARY KEY,
+    batch_id           TEXT NOT NULL REFERENCES corpus_batches(id),
+    source_host        TEXT NOT NULL,
+    origin_entry_id    TEXT,
+    timestamp_iso      TEXT,
+    severity           TEXT,
+    source_id          TEXT,
+    text               TEXT NOT NULL,
+    matched_patterns   TEXT DEFAULT '[]',
+    label_state        TEXT NOT NULL DEFAULT 'unlabeled',
+    failure_type       TEXT,
+    plain_explanation  TEXT,
+    known_pattern      TEXT,
+    labeled_at         TEXT,
+    labeled_by         TEXT DEFAULT 'alan',
+    pii_flagged        INTEGER NOT NULL DEFAULT 0
+);
+
+CREATE INDEX IF NOT EXISTS idx_ce_label_state ON corpus_entries(label_state);
+CREATE INDEX IF NOT EXISTS idx_ce_source ON corpus_entries(source_host);
+CREATE INDEX IF NOT EXISTS idx_ce_severity ON corpus_entries(severity);
+"""
+
+
+# ── Testability seams ──────────────────────────────────────────────────────────
+
+def set_config_dir(path: Path | None) -> None:
+    """Point the module at a different config directory (None restores the default)."""
+    global _CONFIG_DIR
+    _CONFIG_DIR = path
+
+
+def set_data_dir(path: Path) -> None:
+    """Redirect the corpus database to ``path / "corpus.db"``."""
+    global _DATA_DIR, _DB_PATH
+    _DATA_DIR = path
+    _DB_PATH = path / "corpus.db"
+
+
+# ── Internal helpers ───────────────────────────────────────────────────────────
+
+def _config_file() -> Path:
+    """Return the label_tool.yaml path, honouring any set_config_dir() override."""
+    if _CONFIG_DIR is not None:
+        return _CONFIG_DIR / "label_tool.yaml"
+    return _ROOT / "config" / "label_tool.yaml"
+
+
+@contextmanager
+def _db() -> Generator[sqlite3.Connection, None, None]:
+    """Yield a corpus.db connection; commit on success, roll back on any error."""
+    # data/ is not tracked in git, so it may not exist on a fresh checkout.
+    _DB_PATH.parent.mkdir(parents=True, exist_ok=True)
+    conn = sqlite3.connect(str(_DB_PATH))
+    conn.row_factory = sqlite3.Row
+    conn.execute("PRAGMA journal_mode=WAL")
+    try:
+        yield conn
+        conn.commit()
+    except Exception:
+        conn.rollback()
+        raise
+    finally:
+        conn.close()
+
+
+def _init_db() -> None:
+    """Create tables and indexes if missing, then seed consent tokens from config."""
+    with _db() as conn:
+        conn.executescript(_SCHEMA)
+        _seed_sources(conn)
+
+
+def _load_corpus_config() -> list[dict]:
+    """Return the ``corpus.sources`` list from label_tool.yaml, or [] if unavailable."""
+    f = _config_file()
+    if not f.exists():
+        return []
+    try:
+        raw = yaml.safe_load(f.read_text(encoding="utf-8")) or {}
+    except yaml.YAMLError as exc:
+        logger.warning("Failed to parse corpus config: %s", exc)
+        return []
+    # Any level may be missing, null, or the wrong type in a hand-edited file.
+    corpus = raw.get("corpus") if isinstance(raw, dict) else None
+    sources = corpus.get("sources") if isinstance(corpus, dict) else None
+    return sources if isinstance(sources, list) else []
+
+
+def _seed_sources(conn: sqlite3.Connection) -> None:
+    """Insert configured sources; existing tokens are left untouched."""
+    for src in _load_corpus_config():
+        try:
+            params = (src["token"], src["source_host"], src["owner"],
+                      src["consent_date"], src["consent_method"])
+        except (KeyError, TypeError) as exc:
+            # One bad entry must not stop the API from importing; never log the token.
+            logger.warning("Skipping malformed corpus source: %r", exc)
+            continue
+        conn.execute(
+            "INSERT OR IGNORE INTO corpus_sources (token, source_host, owner, consent_date, consent_method) "
+            "VALUES (?, ?, ?, ?, ?)",
+            params,
+        )
+
+
+def _validate_token(token: str, conn: sqlite3.Connection) -> str:
+    """Return source_host for token, or raise 403."""
+    row = conn.execute(
+        "SELECT source_host FROM corpus_sources WHERE token = ? AND active = 1",
+        (token,),
+    ).fetchone()
+    if row is None:
+        raise HTTPException(status_code=403, detail="Unknown or revoked consent token")
+    return row["source_host"]
+
+
+def _extract_bearer(request: Request) -> str:
+    """Return the bearer token from the Authorization header, or raise 401."""
+    scheme, _, credentials = request.headers.get("Authorization", "").partition(" ")
+    token = credentials.strip()
+    # The auth scheme name is case-insensitive (RFC 7235, section 2.1).
+    if scheme.lower() != "bearer" or not token:
+        raise HTTPException(status_code=401, detail="Bearer token required")
+    return token
+
+
+def _now_iso() -> str:
+    """Return the current UTC time as an ISO-8601 string."""
+    return datetime.now(timezone.utc).isoformat()
+
+
+def _optional_str(value: object) -> str | None:
+    """Stringify value, keeping a missing/None value as SQL NULL."""
+    return None if value is None else str(value)
+
+
+# ── Startup ────────────────────────────────────────────────────────────────────
+
+_init_db()
+
+
+# ── POST /api/corpus/log-batch ─────────────────────────────────────────────────
+
+@router.post("/log-batch")
+def receive_batch(request: Request, payload: dict) -> dict:
+    """Accept a push batch from a Turnstone node.
+
+    Raises 401/403 for a missing or unknown token, 422 when ``entries`` is not
+    a list, and 409 when the batch_id has already been received. Entries that
+    are not objects or have no text are skipped rather than rejected.
+    """
+    token = _extract_bearer(request)
+
+    batch_type = str(payload.get("batch_type") or "raw_entries")
+    entries_raw = payload.get("entries") or []
+    batch_id = str(payload.get("batch_id") or uuid.uuid4())
+
+    with _db() as conn:
+        source_host = _validate_token(token, conn)
+        # Checked after auth so unauthenticated callers learn nothing about the schema.
+        if not isinstance(entries_raw, list):
+            raise HTTPException(status_code=422, detail="entries must be a list")
+
+        try:
+            conn.execute(
+                "INSERT INTO corpus_batches (id, source_host, batch_type, received_at, entry_count, "
+                "watermark_from, watermark_to, raw_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
+                (batch_id, source_host, batch_type, _now_iso(), len(entries_raw),
+                 _optional_str(payload.get("watermark_from")),
+                 _optional_str(payload.get("watermark_to")),
+                 json.dumps(payload)),
+            )
+        except sqlite3.IntegrityError:
+            # A re-sent batch_id collides with the primary key; report it instead of a 500.
+            raise HTTPException(status_code=409, detail="Batch already received") from None
+
+        stored = 0
+        for entry in entries_raw:
+            if not isinstance(entry, dict):
+                continue
+            text = str(entry.get("text") or "").strip()
+            if not text:
+                continue
+            conn.execute(
+                "INSERT OR IGNORE INTO corpus_entries "
+                "(id, batch_id, source_host, origin_entry_id, timestamp_iso, severity, "
+                "source_id, text, matched_patterns) "
+                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
+                (str(uuid.uuid4()), batch_id, source_host,
+                 _optional_str(entry.get("entry_id") or entry.get("id")),
+                 _optional_str(entry.get("timestamp_iso")),
+                 _optional_str(entry.get("severity")),
+                 _optional_str(entry.get("source_id")),
+                 text,
+                 json.dumps(entry.get("matched_patterns") or [])),
+            )
+            stored += 1
+
+    logger.info("Received batch %s from %s: %d/%d entries stored",
+                batch_id, source_host, stored, len(entries_raw))
+    return {"received": True, "batch_id": batch_id, "entries_stored": stored}
+
+
+# ── GET /api/corpus/entries ────────────────────────────────────────────────────
+
+@router.get("/entries")
+def list_entries(
+    state: str = "unlabeled",
+    source_host: str | None = None,
+    limit: int = 25,
+) -> dict:
+    """Return entries for labeling. Default: unlabeled entries, oldest first."""
+    # SQLite treats a negative LIMIT as "no limit", so clamp at both ends.
+    page_size = max(1, min(limit, _MAX_PAGE_SIZE))
+    with _db() as conn:
+        query = "SELECT * FROM corpus_entries WHERE label_state = ?"
+        params: list = [state]
+        if source_host:
+            query += " AND source_host = ?"
+            params.append(source_host)
+        query += " ORDER BY rowid LIMIT ?"
+        params.append(page_size)
+        rows = conn.execute(query, params).fetchall()
+    return {"entries": [dict(r) for r in rows], "count": len(rows)}
+
+
+# ── POST /api/corpus/entries/{id}/label ───────────────────────────────────────
+
+@router.post("/entries/{entry_id}/label")
+def label_entry(entry_id: str, body: dict) -> dict:
+    """Submit a label for a corpus entry.
+
+    Raises 422 for a missing or unknown failure_type and 404 for an unknown entry.
+    """
+    failure_type = body.get("failure_type")
+    # An explicit JSON null must not reach .strip().
+    plain_explanation = str(body.get("plain_explanation") or "").strip()
+    known_pattern = body.get("known_pattern")
+    pii_flagged = int(bool(body.get("pii_flagged", False)))
+
+    if not failure_type:
+        raise HTTPException(status_code=422, detail="failure_type is required")
+    if not isinstance(failure_type, str) or failure_type not in _VALID_FAILURE_TYPES:
+        raise HTTPException(
+            status_code=422,
+            detail=f"failure_type must be one of {sorted(_VALID_FAILURE_TYPES)}",
+        )
+
+    with _db() as conn:
+        row = conn.execute("SELECT id FROM corpus_entries WHERE id = ?", (entry_id,)).fetchone()
+        if row is None:
+            raise HTTPException(status_code=404, detail="Entry not found")
+        conn.execute(
+            "UPDATE corpus_entries SET label_state='labeled', failure_type=?, plain_explanation=?, "
+            "known_pattern=?, labeled_at=?, pii_flagged=? WHERE id=?",
+            (failure_type, plain_explanation, known_pattern, _now_iso(), pii_flagged, entry_id),
+        )
+    return {"labeled": True, "entry_id": entry_id}
+
+
+# ── POST /api/corpus/entries/{id}/skip ────────────────────────────────────────
+
+@router.post("/entries/{entry_id}/skip")
+def skip_entry(entry_id: str) -> dict:
+    """Mark an entry as skipped so it leaves the unlabeled queue; 404 if unknown."""
+    with _db() as conn:
+        row = conn.execute("SELECT id FROM corpus_entries WHERE id = ?", (entry_id,)).fetchone()
+        if row is None:
+            raise HTTPException(status_code=404, detail="Entry not found")
+        conn.execute(
+            "UPDATE corpus_entries SET label_state='skipped' WHERE id=?", (entry_id,)
+        )
+    return {"skipped": True, "entry_id": entry_id}
+
+
+# ── GET /api/corpus/stats ──────────────────────────────────────────────────────
+
+@router.get("/stats")
+def get_stats() -> dict:
+    """Return entry counts overall and by label state, source and severity."""
+    with _db() as conn:
+        total = conn.execute("SELECT COUNT(*) FROM corpus_entries").fetchone()[0]
+        by_state = {
+            r["label_state"]: r["cnt"]
+            for r in conn.execute(
+                "SELECT label_state, COUNT(*) AS cnt FROM corpus_entries GROUP BY label_state"
+            ).fetchall()
+        }
+        by_source = {
+            r["source_host"]: r["cnt"]
+            for r in conn.execute(
+                "SELECT source_host, COUNT(*) AS cnt FROM corpus_entries GROUP BY source_host"
+            ).fetchall()
+        }
+        by_severity = {
+            r["severity"]: r["cnt"]
+            for r in conn.execute(
+                "SELECT severity, COUNT(*) AS cnt FROM corpus_entries "
+                "WHERE severity IS NOT NULL GROUP BY severity"
+            ).fetchall()
+        }
+        batch_count = conn.execute("SELECT COUNT(*) FROM corpus_batches").fetchone()[0]
+    return {
+        "total_entries": total,
+        "batch_count": batch_count,
+        "by_label_state": by_state,
+        "by_source": by_source,
+        "by_severity": by_severity,
+    }
+
+
+# ── GET /api/corpus/export ────────────────────────────────────────────────────
+
+@router.get("/export")
+def export_labeled() -> StreamingResponse:
+    """Stream labeled, non-PII entries as JSONL for SFT harness."""
+    with _db() as conn:
+        rows = conn.execute(
+            "SELECT source_host, source_id, severity, text, failure_type, plain_explanation, known_pattern "
+            "FROM corpus_entries "
+            "WHERE label_state = 'labeled' AND pii_flagged = 0 AND plain_explanation != '' "
+            "ORDER BY rowid"
+        ).fetchall()
+
+    def _generate():
+        for row in rows:
+            record = {
+                "input": row["text"],
+                "output": row["plain_explanation"],
+                "metadata": {
+                    "failure_type": row["failure_type"],
+                    "source": row["source_host"],
+                    "source_id": row["source_id"],
+                    "severity": row["severity"],
+                    "known_pattern": row["known_pattern"],
+                },
+            }
+            yield json.dumps(record) + "\n"
+
+    return StreamingResponse(
+        _generate(),
+        media_type="application/x-ndjson",
+        headers={"Content-Disposition": "attachment; filename=log_corpus_labeled.jsonl"},
+    )
diff --git a/tests/test_log_corpus.py b/tests/test_log_corpus.py
new file mode 100644
index 0000000..b50dcb6
--- /dev/null
+++ b/tests/test_log_corpus.py
@@ -0,0 +1,309 @@
+"""Tests for app/data/log_corpus.py — corpus receiver and labeling endpoints."""
+from __future__ import annotations
+
+import json
+import uuid
+from pathlib import Path
+
+import pytest
+from fastapi.testclient import TestClient
+
+from app.data import log_corpus as lc
+
+
+VALID_TOKEN = str(uuid.uuid4())
+VALID_HOST = "testnode.local"
+
+
+@pytest.fixture(autouse=True)
+def isolated_db(tmp_path, monkeypatch):
+    """Each test gets its own fresh corpus DB and config dir."""
+    monkeypatch.setattr(lc, "_DATA_DIR", tmp_path)
+    monkeypatch.setattr(lc, "_DB_PATH", tmp_path / "corpus.db")
+    # Config dir pointing to a temp yaml with one test source
+    config_dir = tmp_path / "config"
+    config_dir.mkdir()
+    (config_dir / "label_tool.yaml").write_text(
+        f"corpus:\n  sources:\n"
+        f"    - token: \"{VALID_TOKEN}\"\n"
+        f"      source_host: \"{VALID_HOST}\"\n"
+        f"      owner: TestOwner\n"
+        f"      consent_date: \"2026-05-11\"\n"
+        f"      consent_method: signal_chat\n"
+    )
+    monkeypatch.setattr(lc, "_CONFIG_DIR", config_dir)
+    lc._init_db()
+
+
+@pytest.fixture()
+def client():
+    from fastapi import FastAPI
+    app = FastAPI()
+    app.include_router(lc.router, prefix="/api/corpus")
+    return TestClient(app)
+
+
+def _batch(batch_type="raw_entries", entries=None, source_host=VALID_HOST):
+    return {
+        "batch_version": 1,
+        "batch_id": str(uuid.uuid4()),
+        "pushed_at": "2026-05-11T10:00:00Z",
+        "source_host": source_host,
+        "batch_type": batch_type,
+        "watermark_from": 0,
+        "watermark_to": 5,
+        "entries": entries or [
+            {
+                "entry_id": str(uuid.uuid4()),
+                "source_id": "sonarr",
+                "timestamp_iso": "2026-05-11T09:58:00Z",
+                "severity": "ERROR",
+                "text": "Connection refused to indexer",
+                "matched_patterns": [],
+            }
+        ],
+    }
+
+
+# ── Receive endpoint ───────────────────────────────────────────────────────────
+
+def test_receive_missing_auth(client):
+    resp = client.post("/api/corpus/log-batch", json=_batch())
+    assert resp.status_code == 401
+
+
+def test_receive_invalid_token(client):
+    resp = client.post(
+        "/api/corpus/log-batch",
+        json=_batch(),
+        headers={"Authorization": "Bearer bad-token"},
+    )
+    assert resp.status_code == 403
+
+
+def test_receive_valid_batch(client):
+    resp = client.post(
+        "/api/corpus/log-batch",
+        json=_batch(),
+        headers={"Authorization": f"Bearer {VALID_TOKEN}"},
+    )
+    assert resp.status_code == 200
+    data = resp.json()
+    assert data["received"] is True
+    assert data["entries_stored"] == 1
+
+
+def test_receive_stores_source_host_from_token_not_payload(client):
+    """source_host is always taken from the DB lookup, not the payload."""
+    payload = _batch(source_host="attacker-injected-host")
+    resp = client.post(
+        "/api/corpus/log-batch",
+        json=payload,
+        headers={"Authorization": f"Bearer {VALID_TOKEN}"},
+    )
+    assert resp.status_code == 200
+    entries_resp = client.get("/api/corpus/entries")
+    entry = entries_resp.json()["entries"][0]
+    assert entry["source_host"] == VALID_HOST
+
+
+def test_receive_skips_empty_text_entries(client):
+    payload = _batch(entries=[
+        {"entry_id": "e1", "source_id": "svc", "severity": "ERROR", "text": ""},
+        {"entry_id": "e2", "source_id": "svc", "severity": "ERROR", "text": " "},
+        {"entry_id": "e3", "source_id": "svc", "severity": "ERROR", "text": "real error"},
+    ])
+    resp = client.post(
+        "/api/corpus/log-batch",
+        json=payload,
+        headers={"Authorization": f"Bearer {VALID_TOKEN}"},
+    )
+    assert resp.json()["entries_stored"] == 1
+
+
+def test_receive_incident_bundle(client):
+    payload = _batch(batch_type="incident_bundles", entries=[
+        {"id": "inc-1", "label": "plex crash", "issue_type": "plex",
+         "started_at": "2026-05-11T09:00:00", "ended_at": "2026-05-11T09:30:00",
+         "notes": "audio dropped", "created_at": "2026-05-11T09:35:00",
+         "severity": "high", "text": "plex crash"},
+    ])
+    resp = client.post(
+        "/api/corpus/log-batch",
+        json=payload,
+        headers={"Authorization": f"Bearer {VALID_TOKEN}"},
+    )
+    assert resp.status_code == 200
+    assert resp.json()["entries_stored"] == 1
+
+
+def test_receive_tolerates_malformed_entries(client):
+    """Null text and non-object entries are skipped instead of causing a 500."""
+    payload = _batch(entries=[
+        {"entry_id": "e1", "text": None},
+        "not-an-object",
+        {"entry_id": "e2", "text": "real error"},
+    ])
+    resp = client.post(
+        "/api/corpus/log-batch",
+        json=payload,
+        headers={"Authorization": f"Bearer {VALID_TOKEN}"},
+    )
+    assert resp.status_code == 200
+    assert resp.json()["entries_stored"] == 1
+
+
+def test_receive_duplicate_batch_id_conflicts(client):
+    payload = _batch()
+    headers = {"Authorization": f"Bearer {VALID_TOKEN}"}
+    assert client.post("/api/corpus/log-batch", json=payload, headers=headers).status_code == 200
+    assert client.post("/api/corpus/log-batch", json=payload, headers=headers).status_code == 409
+
+
+# ── Labeling endpoints ─────────────────────────────────────────────────────────
+
+def test_label_entry(client):
+    client.post(
+        "/api/corpus/log-batch",
+        json=_batch(),
+        headers={"Authorization": f"Bearer {VALID_TOKEN}"},
+    )
+    entry_id = client.get("/api/corpus/entries").json()["entries"][0]["id"]
+
+    resp = client.post(f"/api/corpus/entries/{entry_id}/label", json={
+        "failure_type": "software",
+        "plain_explanation": "Sonarr lost connection to its indexer — restart the service.",
+        "known_pattern": "y",
+    })
+    assert resp.status_code == 200
+    assert resp.json()["labeled"] is True
+
+    entries = client.get("/api/corpus/entries", params={"state": "labeled"}).json()["entries"]
+    assert len(entries) == 1
+    assert entries[0]["failure_type"] == "software"
+
+
+def test_label_entry_invalid_failure_type(client):
+    client.post(
+        "/api/corpus/log-batch",
+        json=_batch(),
+        headers={"Authorization": f"Bearer {VALID_TOKEN}"},
+    )
+    entry_id = client.get("/api/corpus/entries").json()["entries"][0]["id"]
+    resp = client.post(f"/api/corpus/entries/{entry_id}/label", json={"failure_type": "aliens"})
+    assert resp.status_code == 422
+
+
+def test_label_entry_missing_failure_type(client):
+    client.post(
+        "/api/corpus/log-batch",
+        json=_batch(),
+        headers={"Authorization": f"Bearer {VALID_TOKEN}"},
+    )
+    entry_id = client.get("/api/corpus/entries").json()["entries"][0]["id"]
+    resp = client.post(f"/api/corpus/entries/{entry_id}/label", json={})
+    assert resp.status_code == 422
+
+
+def test_label_entry_not_found(client):
+    resp = client.post("/api/corpus/entries/nonexistent/label", json={"failure_type": "software"})
+    assert resp.status_code == 404
+
+
+def test_skip_entry(client):
+    client.post(
+        "/api/corpus/log-batch",
+        json=_batch(),
+        headers={"Authorization": f"Bearer {VALID_TOKEN}"},
+    )
+    entry_id = client.get("/api/corpus/entries").json()["entries"][0]["id"]
+    resp = client.post(f"/api/corpus/entries/{entry_id}/skip")
+    assert resp.status_code == 200
+
+    unlabeled = client.get("/api/corpus/entries").json()["entries"]
+    assert len(unlabeled) == 0
+
+
+def test_list_entries_clamps_negative_limit(client):
+    """limit=-1 must not turn into SQLite's "no limit"."""
+    payload = _batch(entries=[
+        {"entry_id": f"e{i}", "text": f"error {i}"} for i in range(3)
+    ])
+    client.post(
+        "/api/corpus/log-batch",
+        json=payload,
+        headers={"Authorization": f"Bearer {VALID_TOKEN}"},
+    )
+    resp = client.get("/api/corpus/entries", params={"limit": -1})
+    assert resp.json()["count"] == 1
+
+
+# ── Stats ──────────────────────────────────────────────────────────────────────
+
+def test_stats_empty(client):
+    stats = client.get("/api/corpus/stats").json()
+    assert stats["total_entries"] == 0
+    assert stats["batch_count"] == 0
+
+
+def test_stats_after_receive(client):
+    client.post(
+        "/api/corpus/log-batch",
+        json=_batch(),
+        headers={"Authorization": f"Bearer {VALID_TOKEN}"},
+    )
+    stats = client.get("/api/corpus/stats").json()
+    assert stats["total_entries"] == 1
+    assert stats["batch_count"] == 1
+    assert stats["by_label_state"].get("unlabeled", 0) == 1
+
+
+# ── Export ─────────────────────────────────────────────────────────────────────
+
+def test_export_excludes_unlabeled(client):
+    client.post(
+        "/api/corpus/log-batch",
+        json=_batch(),
+        headers={"Authorization": f"Bearer {VALID_TOKEN}"},
+    )
+    resp = client.get("/api/corpus/export")
+    assert resp.status_code == 200
+    assert resp.text.strip() == ""
+
+
+def test_export_includes_labeled(client):
+    client.post(
+        "/api/corpus/log-batch",
+        json=_batch(),
+        headers={"Authorization": f"Bearer {VALID_TOKEN}"},
+    )
+    entry_id = client.get("/api/corpus/entries").json()["entries"][0]["id"]
+    client.post(f"/api/corpus/entries/{entry_id}/label", json={
+        "failure_type": "software",
+        "plain_explanation": "Sonarr lost connection to indexer.",
+    })
+
+    resp = client.get("/api/corpus/export")
+    assert resp.status_code == 200
+    lines = [l for l in resp.text.strip().splitlines() if l]
+    assert len(lines) == 1
+    record = json.loads(lines[0])
+    assert record["output"] == "Sonarr lost connection to indexer."
+    assert record["metadata"]["failure_type"] == "software"
+
+
+def test_export_excludes_pii_flagged(client):
+    client.post(
+        "/api/corpus/log-batch",
+        json=_batch(),
+        headers={"Authorization": f"Bearer {VALID_TOKEN}"},
+    )
+    entry_id = client.get("/api/corpus/entries").json()["entries"][0]["id"]
+    client.post(f"/api/corpus/entries/{entry_id}/label", json={
+        "failure_type": "software",
+        "plain_explanation": "Contains username — should not export.",
+        "pii_flagged": True,
+    })
+
+    resp = client.get("/api/corpus/export")
+    assert resp.text.strip() == ""