feat: plain-text and Plex log ingestors
- app/ingest/plex.py: Plex Media Server log parser Regex-based line parser for 'Mon DD, YYYY HH:MM:SS.mmm [pid] LEVEL - msg' format. Handles multi-line entries (stack traces). Detects plex_eae_failure and all other patterns via shared pattern library. - app/ingest/plaintext.py: generic fallback parser for unrecognized formats Extracts timestamps (ISO 8601, syslog, common log) and severity via regex. - pipeline.py: detect plex format via is_plex_log(); fall back to plaintext instead of skipping; process *.log files alongside *.jsonl; add ingest_file() for single-file ingestion. - scripts/ingest_corpus.py: accept single file or directory as target - manage.sh: ingest-plex command SSHes to Cass (or HOST arg), pulls Plex Media Server.log, and ingests it directly
This commit is contained in:
parent
e579396eb8
commit
f8a2f8007b
5 changed files with 253 additions and 19 deletions
|
|
@ -8,7 +8,7 @@ import sqlite3
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Iterator
|
from typing import Iterator
|
||||||
|
|
||||||
from app.ingest import caddy, docker_log, journald
|
from app.ingest import caddy, docker_log, journald, plaintext, plex
|
||||||
from app.ingest.base import _compile, load_patterns, now_iso
|
from app.ingest.base import _compile, load_patterns, now_iso
|
||||||
from app.services.models import LogPattern, RetrievedEntry
|
from app.services.models import LogPattern, RetrievedEntry
|
||||||
from app.services.search import build_fts_index
|
from app.services.search import build_fts_index
|
||||||
|
|
@ -47,7 +47,9 @@ def _detect_format(first_line: str) -> str:
|
||||||
return "caddy"
|
return "caddy"
|
||||||
except (json.JSONDecodeError, AttributeError):
|
except (json.JSONDecodeError, AttributeError):
|
||||||
pass
|
pass
|
||||||
return "unknown"
|
if plex.is_plex_log(first_line):
|
||||||
|
return "plex"
|
||||||
|
return "plaintext"
|
||||||
|
|
||||||
|
|
||||||
def _parse_file(
|
def _parse_file(
|
||||||
|
|
@ -77,8 +79,10 @@ def _parse_file(
|
||||||
yield from docker_log.parse(all_lines(), source_id, compiled, ingest_time)
|
yield from docker_log.parse(all_lines(), source_id, compiled, ingest_time)
|
||||||
elif fmt == "caddy":
|
elif fmt == "caddy":
|
||||||
yield from caddy.parse(all_lines(), source_id, compiled, ingest_time)
|
yield from caddy.parse(all_lines(), source_id, compiled, ingest_time)
|
||||||
|
elif fmt == "plex":
|
||||||
|
yield from plex.parse(all_lines(), source_id, compiled, ingest_time)
|
||||||
else:
|
else:
|
||||||
logger.warning("Unknown format in %s — skipping", path.name)
|
yield from plaintext.parse(all_lines(), source_id, compiled, ingest_time)
|
||||||
|
|
||||||
|
|
||||||
def _write_batch(conn: sqlite3.Connection, batch: list[RetrievedEntry]) -> None:
|
def _write_batch(conn: sqlite3.Connection, batch: list[RetrievedEntry]) -> None:
|
||||||
|
|
@ -102,8 +106,8 @@ def _write_batch(conn: sqlite3.Connection, batch: list[RetrievedEntry]) -> None:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def ingest(
|
def _ingest_files(
|
||||||
corpus_dir: Path,
|
files: list[Path],
|
||||||
db_path: Path,
|
db_path: Path,
|
||||||
pattern_file: Path | None = None,
|
pattern_file: Path | None = None,
|
||||||
batch_size: int = 1000,
|
batch_size: int = 1000,
|
||||||
|
|
@ -120,10 +124,10 @@ def ingest(
|
||||||
|
|
||||||
stats: dict[str, int] = {}
|
stats: dict[str, int] = {}
|
||||||
|
|
||||||
for jsonl_file in sorted(corpus_dir.glob("*.jsonl")):
|
for log_file in files:
|
||||||
count = 0
|
count = 0
|
||||||
batch: list[RetrievedEntry] = []
|
batch: list[RetrievedEntry] = []
|
||||||
for entry in _parse_file(jsonl_file, compiled, ingest_time):
|
for entry in _parse_file(log_file, compiled, ingest_time):
|
||||||
batch.append(entry)
|
batch.append(entry)
|
||||||
if len(batch) >= batch_size:
|
if len(batch) >= batch_size:
|
||||||
_write_batch(conn, batch)
|
_write_batch(conn, batch)
|
||||||
|
|
@ -134,8 +138,8 @@ def ingest(
|
||||||
_write_batch(conn, batch)
|
_write_batch(conn, batch)
|
||||||
conn.commit()
|
conn.commit()
|
||||||
count += len(batch)
|
count += len(batch)
|
||||||
stats[jsonl_file.name] = count
|
stats[log_file.name] = count
|
||||||
logger.info("Ingested %d entries from %s", count, jsonl_file.name)
|
logger.info("Ingested %d entries from %s", count, log_file.name)
|
||||||
|
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
@ -144,3 +148,23 @@ def ingest(
|
||||||
logger.info("FTS index ready")
|
logger.info("FTS index ready")
|
||||||
|
|
||||||
return stats
|
return stats
|
||||||
|
|
||||||
|
|
||||||
|
def ingest(
|
||||||
|
corpus_dir: Path,
|
||||||
|
db_path: Path,
|
||||||
|
pattern_file: Path | None = None,
|
||||||
|
batch_size: int = 1000,
|
||||||
|
) -> dict[str, int]:
|
||||||
|
"""Ingest all .jsonl and .log files from a corpus directory."""
|
||||||
|
files = sorted(corpus_dir.glob("*.jsonl")) + sorted(corpus_dir.glob("*.log"))
|
||||||
|
return _ingest_files(files, db_path, pattern_file, batch_size)
|
||||||
|
|
||||||
|
|
||||||
|
def ingest_file(
|
||||||
|
log_file: Path,
|
||||||
|
db_path: Path,
|
||||||
|
pattern_file: Path | None = None,
|
||||||
|
) -> dict[str, int]:
|
||||||
|
"""Ingest a single log file (any supported format)."""
|
||||||
|
return _ingest_files([log_file], db_path, pattern_file)
|
||||||
|
|
|
||||||
79
app/ingest/plaintext.py
Normal file
79
app/ingest/plaintext.py
Normal file
|
|
@ -0,0 +1,79 @@
|
||||||
|
"""Generic plain-text log parser — fallback for unrecognized formats.
|
||||||
|
|
||||||
|
Attempts to extract a timestamp and severity from each line using common
|
||||||
|
patterns (syslog, ISO 8601, nginx/apache). Lines that don't match any
|
||||||
|
timestamp pattern are still ingested as plain text with no timestamp.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Iterator
|
||||||
|
|
||||||
|
from app.ingest.base import (
|
||||||
|
SourceState, apply_patterns, detect_severity, make_entry_id, now_iso,
|
||||||
|
)
|
||||||
|
from app.services.models import LogPattern, RetrievedEntry
|
||||||
|
|
||||||
|
# Ordered most-specific first
|
||||||
|
_TS_PATTERNS: list[tuple[re.Pattern, str]] = [
|
||||||
|
# ISO 8601: 2026-05-07T14:23:01.123Z or 2026-05-07 14:23:01
|
||||||
|
(re.compile(r"^(?P<ts>\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:?\d{2})?)"), "%Y-%m-%dT%H:%M:%S"),
|
||||||
|
# Syslog: May 7 14:23:01
|
||||||
|
(re.compile(r"^(?P<ts>\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})"), "%b %d %H:%M:%S"),
|
||||||
|
# Common log: 07/May/2026:14:23:01 +0000
|
||||||
|
(re.compile(r"^(?P<ts>\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2}\s+[+-]\d{4})"), "%d/%b/%Y:%H:%M:%S %z"),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_ts(line: str) -> tuple[str, str]:
|
||||||
|
for pattern, fmt in _TS_PATTERNS:
|
||||||
|
m = pattern.match(line)
|
||||||
|
if m:
|
||||||
|
ts_raw = m.group("ts")
|
||||||
|
try:
|
||||||
|
# Strip fractional seconds / TZ for strptime compat
|
||||||
|
clean = re.sub(r"(\.\d+)?([Zz]|[+-]\d{2}:?\d{2})?$", "", ts_raw).strip()
|
||||||
|
clean = clean.replace("T", " ")
|
||||||
|
dt = datetime.strptime(clean, fmt)
|
||||||
|
if dt.year == 1900:
|
||||||
|
dt = dt.replace(year=datetime.now().year)
|
||||||
|
dt = dt.replace(tzinfo=timezone.utc)
|
||||||
|
return ts_raw, dt.isoformat()
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
return "", ""
|
||||||
|
|
||||||
|
|
||||||
|
def parse(
|
||||||
|
lines: Iterator[str],
|
||||||
|
source_id: str,
|
||||||
|
compiled_patterns: list[tuple[LogPattern, object]],
|
||||||
|
ingest_time: str | None = None,
|
||||||
|
) -> Iterator[RetrievedEntry]:
|
||||||
|
ingest_time = ingest_time or now_iso()
|
||||||
|
state = SourceState()
|
||||||
|
|
||||||
|
for raw_line in lines:
|
||||||
|
text = raw_line.strip()
|
||||||
|
if not text:
|
||||||
|
continue
|
||||||
|
|
||||||
|
ts_raw, ts_iso = _extract_ts(text)
|
||||||
|
severity = detect_severity(text)
|
||||||
|
repeat, out_of_order = state.observe(text, ts_iso or None)
|
||||||
|
matched = apply_patterns(text, compiled_patterns)
|
||||||
|
|
||||||
|
yield RetrievedEntry(
|
||||||
|
entry_id=make_entry_id(source_id, state.sequence, text),
|
||||||
|
source_id=source_id,
|
||||||
|
sequence=state.sequence,
|
||||||
|
timestamp_raw=ts_raw,
|
||||||
|
timestamp_iso=ts_iso or None,
|
||||||
|
ingest_time=ingest_time,
|
||||||
|
severity=severity,
|
||||||
|
repeat_count=repeat,
|
||||||
|
out_of_order=out_of_order,
|
||||||
|
matched_patterns=matched,
|
||||||
|
text=text,
|
||||||
|
)
|
||||||
103
app/ingest/plex.py
Normal file
103
app/ingest/plex.py
Normal file
|
|
@ -0,0 +1,103 @@
|
||||||
|
"""Plex Media Server log parser.
|
||||||
|
|
||||||
|
Handles the standard Plex log format:
|
||||||
|
Jan 01, 2026 12:00:00.000 [12345] DEBUG - Message text here
|
||||||
|
|
||||||
|
Severity is read directly from the log level field. The EAE crash signature
|
||||||
|
(plex_eae_failure pattern) is matched by the shared pattern library.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Iterator
|
||||||
|
|
||||||
|
from app.ingest.base import (
|
||||||
|
SourceState, apply_patterns, make_entry_id, now_iso,
|
||||||
|
)
|
||||||
|
from app.services.models import LogPattern, RetrievedEntry
|
||||||
|
|
||||||
|
# Jan 01, 2026 12:00:00.000 [12345] DEBUG - Message
|
||||||
|
_LINE_RE = re.compile(
|
||||||
|
r"^(?P<month>\w{3})\s+(?P<day>\d{1,2}),\s+(?P<year>\d{4})"
|
||||||
|
r"\s+(?P<time>\d{2}:\d{2}:\d{2}\.\d+)"
|
||||||
|
r"\s+\[(?P<pid>\d+)\]"
|
||||||
|
r"\s+(?P<level>[A-Z]+)"
|
||||||
|
r"\s+-\s+(?P<msg>.*)$"
|
||||||
|
)
|
||||||
|
|
||||||
|
_LEVEL_MAP = {
|
||||||
|
"DEBUG": "DEBUG",
|
||||||
|
"INFO": "INFO",
|
||||||
|
"WARN": "WARN",
|
||||||
|
"WARNING": "WARN",
|
||||||
|
"ERROR": "ERROR",
|
||||||
|
"FATAL": "CRITICAL",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_ts(month: str, day: str, year: str, time: str) -> tuple[str, str]:
|
||||||
|
raw = f"{month} {day}, {year} {time}"
|
||||||
|
try:
|
||||||
|
# Plex logs are local time — treat as UTC for now (no TZ in log)
|
||||||
|
dt = datetime.strptime(raw, "%b %d, %Y %H:%M:%S.%f").replace(tzinfo=timezone.utc)
|
||||||
|
return raw, dt.isoformat()
|
||||||
|
except ValueError:
|
||||||
|
return raw, ""
|
||||||
|
|
||||||
|
|
||||||
|
def is_plex_log(first_line: str) -> bool:
|
||||||
|
return bool(_LINE_RE.match(first_line.strip()))
|
||||||
|
|
||||||
|
|
||||||
|
def parse(
|
||||||
|
lines: Iterator[str],
|
||||||
|
source_id: str,
|
||||||
|
compiled_patterns: list[tuple[LogPattern, object]],
|
||||||
|
ingest_time: str | None = None,
|
||||||
|
) -> Iterator[RetrievedEntry]:
|
||||||
|
ingest_time = ingest_time or now_iso()
|
||||||
|
state = SourceState()
|
||||||
|
pending_text: str | None = None
|
||||||
|
pending_meta: dict = {}
|
||||||
|
|
||||||
|
def _emit(text: str, meta: dict) -> RetrievedEntry:
|
||||||
|
repeat, out_of_order = state.observe(text, meta.get("ts_iso"))
|
||||||
|
matched = apply_patterns(text, compiled_patterns)
|
||||||
|
return RetrievedEntry(
|
||||||
|
entry_id=make_entry_id(source_id, state.sequence, text),
|
||||||
|
source_id=source_id,
|
||||||
|
sequence=state.sequence,
|
||||||
|
timestamp_raw=meta.get("ts_raw", ""),
|
||||||
|
timestamp_iso=meta.get("ts_iso", ""),
|
||||||
|
ingest_time=ingest_time,
|
||||||
|
severity=meta.get("severity"),
|
||||||
|
repeat_count=repeat,
|
||||||
|
out_of_order=out_of_order,
|
||||||
|
matched_patterns=matched,
|
||||||
|
text=text,
|
||||||
|
)
|
||||||
|
|
||||||
|
for raw_line in lines:
|
||||||
|
line = raw_line.rstrip("\n")
|
||||||
|
m = _LINE_RE.match(line)
|
||||||
|
if m:
|
||||||
|
# Flush any accumulated multi-line entry
|
||||||
|
if pending_text is not None:
|
||||||
|
yield _emit(pending_text, pending_meta)
|
||||||
|
|
||||||
|
ts_raw, ts_iso = _parse_ts(
|
||||||
|
m.group("month"), m.group("day"), m.group("year"), m.group("time")
|
||||||
|
)
|
||||||
|
pending_meta = {
|
||||||
|
"ts_raw": ts_raw,
|
||||||
|
"ts_iso": ts_iso,
|
||||||
|
"severity": _LEVEL_MAP.get(m.group("level").upper()),
|
||||||
|
}
|
||||||
|
pending_text = m.group("msg")
|
||||||
|
elif pending_text is not None:
|
||||||
|
# Continuation line (stack trace, wrapped message)
|
||||||
|
pending_text += "\n" + line.strip()
|
||||||
|
|
||||||
|
if pending_text is not None:
|
||||||
|
yield _emit(pending_text, pending_meta)
|
||||||
24
manage.sh
24
manage.sh
|
|
@ -80,7 +80,8 @@ usage() {
|
||||||
echo -e " ${GREEN}dev${NC} uvicorn --reload (:${API_PORT}) + Vite HMR (:${VITE_PORT})"
|
echo -e " ${GREEN}dev${NC} uvicorn --reload (:${API_PORT}) + Vite HMR (:${VITE_PORT})"
|
||||||
echo ""
|
echo ""
|
||||||
echo " Data:"
|
echo " Data:"
|
||||||
echo -e " ${GREEN}ingest CORPUS_DIR [DB]${NC} Ingest a corpus directory into the database"
|
echo -e " ${GREEN}ingest PATH [DB]${NC} Ingest a log file or corpus directory"
|
||||||
|
echo -e " ${GREEN}ingest-plex [HOST]${NC} Pull Plex log from Cass (or HOST) and ingest"
|
||||||
echo -e " ${GREEN}build-fts${NC} Rebuild the FTS search index"
|
echo -e " ${GREEN}build-fts${NC} Rebuild the FTS search index"
|
||||||
echo ""
|
echo ""
|
||||||
echo " Tests:"
|
echo " Tests:"
|
||||||
|
|
@ -187,12 +188,31 @@ case "$CMD" in
|
||||||
|
|
||||||
ingest)
|
ingest)
|
||||||
if [[ $# -lt 1 ]]; then
|
if [[ $# -lt 1 ]]; then
|
||||||
error "Usage: ./manage.sh ingest CORPUS_DIR [DB_PATH]"
|
error "Usage: ./manage.sh ingest <file_or_dir> [DB_PATH]"
|
||||||
fi
|
fi
|
||||||
info "Ingesting $1 → ${2:-$DB}…"
|
info "Ingesting $1 → ${2:-$DB}…"
|
||||||
"$PYTHON" scripts/ingest_corpus.py "$1" "${2:-$DB}"
|
"$PYTHON" scripts/ingest_corpus.py "$1" "${2:-$DB}"
|
||||||
;;
|
;;
|
||||||
|
|
||||||
|
ingest-plex)
|
||||||
|
PLEX_HOST="${1:-cass}"
|
||||||
|
PLEX_LOG_PATH="/var/lib/plexmediaserver/Library/Application Support/Plex Media Server/Logs/Plex Media Server.log"
|
||||||
|
TMP_LOG="/tmp/turnstone-plex-$(date +%s).log"
|
||||||
|
|
||||||
|
info "Pulling Plex log from ${PLEX_HOST}…"
|
||||||
|
if ! ssh "$PLEX_HOST" "cat '${PLEX_LOG_PATH}'" > "$TMP_LOG" 2>&1; then
|
||||||
|
rm -f "$TMP_LOG"
|
||||||
|
error "SSH to ${PLEX_HOST} failed. Ensure 'ssh ${PLEX_HOST}' works without a password prompt."
|
||||||
|
fi
|
||||||
|
LINES=$(wc -l < "$TMP_LOG")
|
||||||
|
success "Pulled ${LINES} lines from ${PLEX_HOST}"
|
||||||
|
|
||||||
|
info "Ingesting into ${DB}…"
|
||||||
|
"$PYTHON" scripts/ingest_corpus.py "$TMP_LOG" "$DB"
|
||||||
|
rm -f "$TMP_LOG"
|
||||||
|
success "Done. Restart the server to refresh: ./manage.sh restart"
|
||||||
|
;;
|
||||||
|
|
||||||
build-fts)
|
build-fts)
|
||||||
info "Rebuilding FTS index for ${DB}…"
|
info "Rebuilding FTS index for ${DB}…"
|
||||||
TURNSTONE_DB="$DB" "$PYTHON" scripts/build_fts_index.py "$DB"
|
TURNSTONE_DB="$DB" "$PYTHON" scripts/build_fts_index.py "$DB"
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
"""CLI: ingest a corpus directory into the Turnstone SQLite database."""
|
"""CLI: ingest a log file or corpus directory into the Turnstone SQLite database."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
@ -7,20 +7,28 @@ from pathlib import Path
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
|
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
|
||||||
|
|
||||||
# Allow running from repo root
|
|
||||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||||
|
|
||||||
from app.ingest.pipeline import ingest
|
from app.ingest.pipeline import ingest, ingest_file
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
corpus_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("corpus/raw")
|
if len(sys.argv) < 2:
|
||||||
db_path = Path(sys.argv[2]) if len(sys.argv) > 2 else Path("data/turnstone.db")
|
print("Usage: ingest_corpus.py <file_or_dir> [db_path]", file=sys.stderr)
|
||||||
pattern_file = Path("patterns/default.yaml")
|
sys.exit(1)
|
||||||
|
|
||||||
|
target = Path(sys.argv[1])
|
||||||
|
db_path = Path(sys.argv[2]) if len(sys.argv) > 2 else Path("data/turnstone.db")
|
||||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
print(f"Ingesting {corpus_dir} → {db_path}")
|
print(f"Ingesting {target} → {db_path}")
|
||||||
stats = ingest(corpus_dir, db_path, pattern_file)
|
|
||||||
|
if target.is_file():
|
||||||
|
stats = ingest_file(target, db_path)
|
||||||
|
elif target.is_dir():
|
||||||
|
stats = ingest(target, db_path)
|
||||||
|
else:
|
||||||
|
print(f"Error: {target} is not a file or directory", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
total = sum(stats.values())
|
total = sum(stats.values())
|
||||||
for fname, count in sorted(stats.items()):
|
for fname, count in sorted(stats.items()):
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue