From cbdd681faf1fa9bd6c890dc00fc5e97c9e6143ca Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Fri, 8 May 2026 17:50:01 -0700 Subject: [PATCH] feat: plain-text and Plex log ingestors - app/ingest/plex.py: Plex Media Server log parser Regex-based line parser for 'Mon DD, YYYY HH:MM:SS.mmm [pid] LEVEL - msg' format. Handles multi-line entries (stack traces). Detects plex_eae_failure and all other patterns via shared pattern library. - app/ingest/plaintext.py: generic fallback parser for unrecognized formats Extracts timestamps (ISO 8601, syslog, common log) and severity via regex. - pipeline.py: detect plex format via is_plex_log(); fall back to plaintext instead of skipping; process *.log files alongside *.jsonl; add ingest_file() for single-file ingestion. - scripts/ingest_corpus.py: accept single file or directory as target - manage.sh: ingest-plex command SSHes to Cass (or HOST arg), pulls Plex Media Server.log, and ingests it directly --- app/ingest/pipeline.py | 42 ++++++++++++---- app/ingest/plaintext.py | 79 ++++++++++++++++++++++++++++++ app/ingest/plex.py | 103 +++++++++++++++++++++++++++++++++++++++ manage.sh | 24 ++++++++- scripts/ingest_corpus.py | 24 ++++++--- 5 files changed, 253 insertions(+), 19 deletions(-) create mode 100644 app/ingest/plaintext.py create mode 100644 app/ingest/plex.py diff --git a/app/ingest/pipeline.py b/app/ingest/pipeline.py index ea25806..f85dabe 100644 --- a/app/ingest/pipeline.py +++ b/app/ingest/pipeline.py @@ -8,7 +8,7 @@ import sqlite3 from pathlib import Path from typing import Iterator -from app.ingest import caddy, docker_log, journald +from app.ingest import caddy, docker_log, journald, plaintext, plex from app.ingest.base import _compile, load_patterns, now_iso from app.services.models import LogPattern, RetrievedEntry from app.services.search import build_fts_index @@ -47,7 +47,9 @@ def _detect_format(first_line: str) -> str: return "caddy" except (json.JSONDecodeError, AttributeError): pass - return "unknown" + if 
def ingest(
    corpus_dir: Path,
    db_path: Path,
    pattern_file: Path | None = None,
    batch_size: int = 1000,
) -> dict[str, int]:
    """Ingest every ``*.jsonl`` and ``*.log`` file found directly in *corpus_dir*.

    The two globs are sorted independently and concatenated, so all ``.jsonl``
    files are processed before any ``.log`` file. The actual work is delegated
    to ``_ingest_files``; its return value is passed through unchanged.
    """
    jsonl_files = sorted(corpus_dir.glob("*.jsonl"))
    log_files = sorted(corpus_dir.glob("*.log"))
    return _ingest_files(
        [*jsonl_files, *log_files],
        db_path,
        pattern_file,
        batch_size,
    )
# Ordered most-specific first.  Every pattern captures the raw timestamp in a
# group named "ts", which is the name _extract_ts reads back.  Each strptime
# format describes the timestamp *after* normalisation: fractional seconds and
# the UTC offset are peeled off separately and the ISO "T" separator is turned
# into a space, so none of the formats mention them.
_TS_PATTERNS: list[tuple[re.Pattern, str]] = [
    # ISO 8601: 2026-05-07T14:23:01.123Z or 2026-05-07 14:23:01
    (
        re.compile(
            r"^(?P<ts>\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}"
            r"(?:\.\d+)?(?:Z|[+-]\d{2}:?\d{2})?)"
        ),
        "%Y-%m-%d %H:%M:%S",
    ),
    # Syslog: May  7 14:23:01 (no year, no zone)
    (
        re.compile(r"^(?P<ts>\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})"),
        "%b %d %H:%M:%S",
    ),
    # Common log: 07/May/2026:14:23:01 +0000
    (
        re.compile(r"^(?P<ts>\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2}\s+[+-]\d{4})"),
        "%d/%b/%Y:%H:%M:%S",
    ),
]

# Trailing zone designator ("Z", "+02:00", "+0200"), optionally preceded by
# whitespace as in the common log format.
_TZ_SUFFIX_RE = re.compile(r"\s*(?P<tz>[Zz]|[+-]\d{2}:?\d{2})$")
# Trailing fractional seconds, looked for once the zone has been removed.
_FRACTION_SUFFIX_RE = re.compile(r"\.(?P<frac>\d+)$")


def _to_utc_iso(ts_raw: str, fmt: str) -> str:
    """Convert one matched raw timestamp to an ISO 8601 string in UTC.

    Args:
        ts_raw: The text captured by one of the ``_TS_PATTERNS`` regexes.
        fmt: The strptime format paired with that regex.

    Returns:
        ``datetime.isoformat()`` of the timestamp expressed in UTC.

    Raises:
        ValueError: If the digits matched the regex but are not a real
            date/time (for example month 13).
    """
    body = ts_raw

    tz_text = ""
    tz_match = _TZ_SUFFIX_RE.search(body)
    if tz_match:
        tz_text = tz_match.group("tz").upper()
        body = body[: tz_match.start()]

    microsecond = 0
    frac_match = _FRACTION_SUFFIX_RE.search(body)
    if frac_match:
        # %f accepts at most six digits, so truncate/pad by hand instead.
        microsecond = int(frac_match.group("frac")[:6].ljust(6, "0"))
        body = body[: frac_match.start()]

    body = body.replace("T", " ")

    if "%Y" not in fmt:
        # Year-less (syslog) stamps: assume the current year.  Parsing with an
        # explicit year also lets "Feb 29" succeed in leap years, which the
        # implicit default year 1900 would reject.
        body = f"{datetime.now().year} {body}"
        fmt = f"%Y {fmt}"

    if tz_text:
        # %z understands "Z", "+HHMM" and "+HH:MM".
        parsed = datetime.strptime(body + tz_text, fmt + "%z")
    else:
        # No zone information in the log line: treat the time as UTC.
        parsed = datetime.strptime(body, fmt).replace(tzinfo=timezone.utc)

    return parsed.replace(microsecond=microsecond).astimezone(timezone.utc).isoformat()


def _extract_ts(line: str) -> tuple[str, str]:
    """Find a leading timestamp in *line*.

    The patterns in ``_TS_PATTERNS`` are tried in order; the first one whose
    match also parses as a valid date/time wins.

    Args:
        line: One log line; the timestamp must start at column 0.

    Returns:
        ``(raw, iso)`` where ``raw`` is the timestamp text exactly as it
        appears in the line and ``iso`` is the same instant as an ISO 8601
        string in UTC. ``("", "")`` when no pattern yields a valid timestamp.
    """
    for pattern, fmt in _TS_PATTERNS:
        m = pattern.match(line)
        if m is None:
            continue
        ts_raw = m.group("ts")
        try:
            return ts_raw, _to_utc_iso(ts_raw, fmt)
        except ValueError:
            # Looked like a timestamp but is not a real date; try the next
            # pattern rather than failing the whole line.
            continue
    return "", ""
def parse(
    lines: Iterator[str],
    source_id: str,
    compiled_patterns: list[tuple[LogPattern, object]],
    ingest_time: str | None = None,
) -> Iterator[RetrievedEntry]:
    """Yield one ``RetrievedEntry`` per non-blank line of a plain-text log.

    Each line is stripped of surrounding whitespace; lines that are empty
    after stripping are skipped. For the rest, a leading timestamp is looked
    up with ``_extract_ts``, severity comes from ``detect_severity``, repeat
    and ordering flags come from a per-call ``SourceState``, and pattern
    matches come from ``apply_patterns``. Lines without a recognised timestamp
    are still emitted, with ``timestamp_raw`` empty and ``timestamp_iso`` set
    to ``None``.

    Args:
        lines: Raw log lines.
        source_id: Identifier stored on every entry and fed into the entry id.
        compiled_patterns: Pattern pairs handed unchanged to ``apply_patterns``.
        ingest_time: Ingest timestamp for every entry; ``now_iso()`` is used
            when this is ``None`` or empty.
    """
    stamp = ingest_time if ingest_time else now_iso()
    tracker = SourceState()

    for candidate in lines:
        body = candidate.strip()
        if body:
            raw_ts, iso_ts = _extract_ts(body)
            iso_or_none = iso_ts or None
            level = detect_severity(body)
            # observe() runs before tracker.sequence is read for the entry id.
            repeats, late = tracker.observe(body, iso_or_none)
            hits = apply_patterns(body, compiled_patterns)

            yield RetrievedEntry(
                entry_id=make_entry_id(source_id, tracker.sequence, body),
                source_id=source_id,
                sequence=tracker.sequence,
                timestamp_raw=raw_ts,
                timestamp_iso=iso_or_none,
                ingest_time=stamp,
                severity=level,
                repeat_count=repeats,
                out_of_order=late,
                matched_patterns=hits,
                text=body,
            )