From 9bb88b168f8d622e5bfaeed40f04c539dd8e7c5e Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Sun, 17 May 2026 11:28:33 -0700 Subject: [PATCH] feat(corpus): pipeline log ingest from shared dir (closes #67) Pull-side companion to kiwi#141. Ingests structured JSONL pipeline logs from /Library/Assets/logs/pipeline/ into the log corpus for Turnstone logreading model training. - app/data/log_corpus.py: add ingested_pipeline_files tracking table, _pipeline_ingest_dir() config helper, _ingest_one_file() parser, and POST /api/corpus/pipeline-ingest endpoint - source_host = "pipeline_scrape"; source_id from logger field; extra dict stored as matched_patterns; batch_type = "pipeline_log" - Idempotent by filename: skips files already in ingested_pipeline_files - config/label_tool.yaml.example: add corpus section with pipeline_ingest_dir and push sources comment block - tests/test_log_corpus.py: 8 new tests covering ingest, idempotency, non-JSONL filtering, malformed line resilience, incremental runs --- app/data/log_corpus.py | 110 ++++++++++++++++++++ config/label_tool.yaml.example | 16 +++ tests/test_log_corpus.py | 182 +++++++++++++++++++++++++++++++++ 3 files changed, 308 insertions(+) diff --git a/app/data/log_corpus.py b/app/data/log_corpus.py index 4212130..0f48f8a 100644 --- a/app/data/log_corpus.py +++ b/app/data/log_corpus.py @@ -34,6 +34,8 @@ router = APIRouter() _DB_PATH: Path = _ROOT / "data" / "corpus.db" +_PIPELINE_SOURCE_HOST = "pipeline_scrape" + _SCHEMA = """ CREATE TABLE IF NOT EXISTS corpus_sources ( token TEXT PRIMARY KEY, @@ -77,6 +79,12 @@ CREATE TABLE IF NOT EXISTS corpus_entries ( CREATE INDEX IF NOT EXISTS idx_ce_label_state ON corpus_entries(label_state); CREATE INDEX IF NOT EXISTS idx_ce_source ON corpus_entries(source_host); CREATE INDEX IF NOT EXISTS idx_ce_severity ON corpus_entries(severity); + +CREATE TABLE IF NOT EXISTS ingested_pipeline_files ( + filename TEXT PRIMARY KEY, + ingested_at TEXT NOT NULL, + entry_count INTEGER NOT NULL +); """ @@ 
def _pipeline_ingest_dir() -> Path | None:
    """Return the configured pipeline log ingest directory, or None if unset.

    Reads ``corpus.pipeline_ingest_dir`` from the file returned by
    ``_config_file()``.

    Returns:
        The configured directory as a ``Path``, or ``None`` when the config
        file does not exist, is not valid YAML, does not contain a mapping at
        the top level, has no usable ``corpus`` mapping, or leaves
        ``pipeline_ingest_dir`` empty.
    """
    f = _config_file()
    if not f.exists():
        return None
    try:
        raw = yaml.safe_load(f.read_text(encoding="utf-8")) or {}
    except yaml.YAMLError:
        return None
    if not isinstance(raw, dict):
        return None
    # A bare ``corpus:`` header whose children are all commented out loads as
    # None, so ``raw.get("corpus", {})`` would return None (the key exists)
    # and the chained ``.get`` would raise AttributeError.
    corpus = raw.get("corpus")
    if not isinstance(corpus, dict):
        return None
    val = corpus.get("pipeline_ingest_dir") or ""
    return Path(str(val)) if val else None


# ── POST /api/corpus/pipeline-ingest ─────────────────────────────────────────

def _ingest_one_file(conn: sqlite3.Connection, path: Path) -> int:
    """Parse a pipeline JSONL file and insert its entries.

    The file is read and parsed completely before any row is written, so a
    read or decode failure leaves the database untouched. One
    ``corpus_batches`` row is written for the file, one ``corpus_entries`` row
    per usable record, and finally one ``ingested_pipeline_files`` row keyed
    by the file name.

    Lines are skipped (with a debug log) when they are not valid JSON or do
    not decode to a JSON object. Records whose ``msg`` is missing, not a
    string, or blank are not stored.

    Args:
        conn: Open SQLite connection; the caller owns commit/rollback.
        path: JSONL file to ingest.

    Returns:
        Number of ``corpus_entries`` rows actually inserted.

    Raises:
        OSError: If the file cannot be read.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    batch_id = str(uuid.uuid4())
    entries_raw: list[dict] = []
    # Split on "\n" only: str.splitlines() also breaks on characters such as
    # U+2028/U+2029, which may appear unescaped inside a JSON string and would
    # tear one valid record into two malformed ones. strip() removes any "\r".
    for line in path.read_text(encoding="utf-8").split("\n"):
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            logger.debug("Skipping malformed line in %s", path.name)
            continue
        if not isinstance(record, dict):
            # Valid JSON but not an object (list, number, string, ...).
            logger.debug("Skipping non-object line in %s", path.name)
            continue
        entries_raw.append(record)

    conn.execute(
        "INSERT INTO corpus_batches (id, source_host, batch_type, received_at, entry_count, raw_json) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (batch_id, _PIPELINE_SOURCE_HOST, "pipeline_log", _now_iso(),
         len(entries_raw), json.dumps({"file": path.name})),
    )

    stored = 0
    for entry in entries_raw:
        msg = entry.get("msg")
        text = msg.strip() if isinstance(msg, str) else ""
        if not text:
            continue
        cur = conn.execute(
            "INSERT OR IGNORE INTO corpus_entries "
            "(id, batch_id, source_host, timestamp_iso, severity, source_id, text, matched_patterns) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (str(uuid.uuid4()), batch_id, _PIPELINE_SOURCE_HOST,
             entry.get("ts"),
             entry.get("level"),
             entry.get("logger"),
             text,
             json.dumps([entry["extra"]] if entry.get("extra") else [])),
        )
        # INSERT OR IGNORE reports rowcount 0 when the row was dropped by a
        # constraint; only count rows that were really written.
        if cur.rowcount > 0:
            stored += 1

    conn.execute(
        "INSERT INTO ingested_pipeline_files (filename, ingested_at, entry_count) VALUES (?, ?, ?)",
        (path.name, _now_iso(), stored),
    )
    return stored


@router.post("/pipeline-ingest")
def pipeline_ingest() -> dict:
    """Walk the configured pipeline log directory and ingest new JSONL files.

    Skips files already recorded in ingested_pipeline_files. Safe to call
    repeatedly — idempotent by filename. A file that cannot be read or is not
    valid UTF-8 is logged, reported under ``failed_files`` and left
    unrecorded, so a later call tries it again; the remaining files are still
    processed.

    Returns:
        A dict with ``ingested_files``, ``skipped_files`` and
        ``entries_stored`` counts, a per-file ``files`` list, and
        ``failed_files`` (names of files that could not be read).

    Raises:
        HTTPException: 404 when no pipeline ingest directory is configured.
    """
    ingest_dir = _pipeline_ingest_dir()
    if ingest_dir is None:
        raise HTTPException(404, "pipeline_ingest_dir not configured in label_tool.yaml")

    ingested = 0
    skipped = 0
    total_stored = 0
    files_detail: list[dict] = []
    failed_files: list[str] = []

    with _db() as conn:
        already_done: set[str] = {
            row[0]
            for row in conn.execute("SELECT filename FROM ingested_pipeline_files").fetchall()
        }

        for path in sorted(ingest_dir.glob("*.jsonl")):
            if path.name in already_done:
                skipped += 1
                continue
            if not path.is_file():
                # The glob also matches directories that happen to end in .jsonl.
                continue
            try:
                stored = _ingest_one_file(conn, path)
            except (OSError, UnicodeDecodeError) as exc:
                # Both errors come from reading the file, which happens before
                # any row is written, so nothing partial is left behind.
                logger.warning("Pipeline ingest: cannot read %s: %s", path.name, exc)
                failed_files.append(path.name)
                continue
            ingested += 1
            total_stored += stored
            files_detail.append({"file": path.name, "entries_stored": stored})

    logger.info("Pipeline ingest: %d files ingested, %d skipped, %d entries stored",
                ingested, skipped, total_stored)
    return {
        "ingested_files": ingested,
        "skipped_files": skipped,
        "entries_stored": total_stored,
        "files": files_detail,
        "failed_files": failed_files,
    }