turnstone/scripts/ingest_corpus.py
pyr0ball f8a2f8007b feat: plain-text and Plex log ingestors
- app/ingest/plex.py: Plex Media Server log parser
  Regex-based line parser for 'Mon DD, YYYY HH:MM:SS.mmm [pid] LEVEL - msg'
  format. Handles multi-line entries (stack traces). Detects plex_eae_failure
  and all other patterns via shared pattern library.
- app/ingest/plaintext.py: generic fallback parser for unrecognized formats
  Extracts timestamps (ISO 8601, syslog, common log) and severity via regex.
- pipeline.py: detect plex format via is_plex_log(); fall back to plaintext
  instead of skipping; process *.log files alongside *.jsonl; add ingest_file()
  for single-file ingestion.
- scripts/ingest_corpus.py: accept single file or directory as target
- manage.sh: ingest-plex command SSHes to Cass (or HOST arg), pulls
  Plex Media Server.log, and ingests it directly
2026-05-08 17:50:01 -07:00
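
The Plex line format named in the commit message can be sketched as a single regex plus a continuation rule for multi-line entries. This is a minimal illustration, not the contents of app/ingest/plex.py: the names PLEX_LINE and parse_plex_lines, the dict keys, and the exact pattern are assumptions, and the bracketed field is kept as an opaque string.

```python
from __future__ import annotations

import re
from datetime import datetime
from typing import Iterable, Iterator

# Hypothetical pattern for 'Mon DD, YYYY HH:MM:SS.mmm [pid] LEVEL - msg'.
PLEX_LINE = re.compile(
    r"^(?P<ts>[A-Z][a-z]{2} \d{1,2}, \d{4} \d{2}:\d{2}:\d{2}\.\d{3}) "
    r"\[(?P<pid>[^\]]+)\] "
    r"(?P<level>[A-Z]+) - "
    r"(?P<msg>.*)$"
)


def parse_plex_lines(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one dict per log entry.

    A line that does not match PLEX_LINE is treated as a continuation
    (for example a stack trace) and appended to the previous entry.
    """
    current: dict | None = None
    for raw in lines:
        line = raw.rstrip("\n")
        m = PLEX_LINE.match(line)
        if m:
            if current is not None:
                yield current
            current = {
                # %b is locale-dependent; this assumes English month names.
                "timestamp": datetime.strptime(m["ts"], "%b %d, %Y %H:%M:%S.%f"),
                "pid": m["pid"],
                "level": m["level"],
                "message": m["msg"],
            }
        elif current is not None:
            current["message"] += "\n" + line
    if current is not None:
        yield current
```

Continuation lines seen before the first matching line are dropped in this sketch; a real parser might route them to the plaintext fallback instead.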

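The plaintext fallback described in the commit message (timestamps in ISO 8601, syslog, or common log format, plus a severity keyword) might look roughly like the following. The pattern table, the severity list, and the extract_fields helper are all hypothetical and only illustrate the approach; app/ingest/plaintext.py may differ.

```python
from __future__ import annotations

import re

# Hypothetical timestamp patterns, tried in insertion order.
TIMESTAMP_PATTERNS = {
    "iso8601": re.compile(
        r"\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:?\d{2})?"
    ),
    "syslog": re.compile(r"[A-Z][a-z]{2} [ \d]\d \d{2}:\d{2}:\d{2}"),
    "common_log": re.compile(r"\d{2}/[A-Z][a-z]{2}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4}"),
}

# Assumed severity vocabulary; matched case-insensitively as whole words.
SEVERITY = re.compile(r"\b(DEBUG|INFO|WARN(?:ING)?|ERROR|CRITICAL|FATAL)\b", re.IGNORECASE)


def extract_fields(line: str) -> dict:
    """Return the first timestamp found (as raw text) and the first severity keyword."""
    ts_kind = ts_text = None
    for kind, pattern in TIMESTAMP_PATTERNS.items():
        m = pattern.search(line)
        if m:
            ts_kind, ts_text = kind, m.group(0)
            break
    sev = SEVERITY.search(line)
    return {
        "timestamp": ts_text,
        "timestamp_format": ts_kind,
        "severity": sev.group(1).upper() if sev else None,
        "message": line.strip(),
    }
```

Timestamps are left as strings here because syslog timestamps carry no year, so converting them to datetime values needs a policy this sketch does not assume.
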

"""CLI: ingest a log file or corpus directory into the Turnstone SQLite database."""
from __future__ import annotations

import logging
import sys
from pathlib import Path

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")

# Make the repository root importable so `app` resolves when run as a script.
sys.path.insert(0, str(Path(__file__).parent.parent))

from app.ingest.pipeline import ingest, ingest_file

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: ingest_corpus.py <file_or_dir> [db_path]", file=sys.stderr)
        sys.exit(1)

    target = Path(sys.argv[1])
    db_path = Path(sys.argv[2]) if len(sys.argv) > 2 else Path("data/turnstone.db")
    db_path.parent.mkdir(parents=True, exist_ok=True)

    print(f"Ingesting {target} -> {db_path}")

    # A single file goes through ingest_file(); a directory goes through ingest().
    if target.is_file():
        stats = ingest_file(target, db_path)
    elif target.is_dir():
        stats = ingest(target, db_path)
    else:
        print(f"Error: {target} is not a file or directory", file=sys.stderr)
        sys.exit(1)

    # Per-file entry counts, then the grand total.
    total = sum(stats.values())
    for fname, count in sorted(stats.items()):
        print(f"  {fname}: {count:,}")
    print(f"  TOTAL: {total:,} entries")
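
The summary at the end of the script only works if ingest() and ingest_file() return a mapping from file name to entry count. That return shape is inferred from the calls to stats.values() and stats.items(), not confirmed from pipeline.py. Under that assumption, the reporting step can be pulled into a small helper (summarize is a hypothetical name) that is easy to test without touching the database:

```python
from __future__ import annotations


def summarize(stats: dict[str, int]) -> list[str]:
    """Format per-file counts (sorted by name) followed by a total line."""
    lines = [f"  {name}: {count:,}" for name, count in sorted(stats.items())]
    lines.append(f"  TOTAL: {sum(stats.values()):,} entries")
    return lines


if __name__ == "__main__":
    # Example with made-up counts.
    for line in summarize({"plex.log": 1200, "events.jsonl": 5}):
        print(line)
```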