fix: support hotio qBittorrent 5.x log format (N/I/W/C single-char level)

Contributor2's ghcr.io/hotio/qbittorrent:latest container uses a different format
than the classic GUI build: `(N) 2026-04-26T03:32:59 - message` with a
single-char level code before an ISO timestamp, not inside parens.

Added _HOTIO_RE alongside _CLASSIC_RE; unified via _match_line() helper so
parse() loop is unchanged. 28 tests passing, both formats covered.
This commit is contained in:
pyr0ball 2026-05-11 05:55:40 -07:00
parent 457b4fd7ae
commit 40bbc9225d
2 changed files with 154 additions and 62 deletions

View file

@ -1,11 +1,16 @@
"""qBittorrent log parser.
Handles the standard qBittorrent log format:
Handles two formats produced by different qBittorrent builds:
Classic (pre-5.x GUI builds):
(YYYY/MM/DD HH:MM:SS) [Level] Message text
(YYYY/MM/DD HH:MM:SS) Message text (no explicit level)
The level field is optional qBittorrent omits it for Normal/Info messages
in some versions. Parenthesised timestamp is the format fingerprint.
Hotio/headless (5.x container builds, e.g. ghcr.io/hotio/qbittorrent):
(N) 2026-04-26T03:32:59 - Message text
(W) 2026-04-26T03:33:00 - Warning message
Level codes for hotio format: N=Normal/Info, I=Info, W=Warning, C=Critical.
"""
from __future__ import annotations
@ -18,34 +23,58 @@ from app.ingest.base import (
)
from app.services.models import LogPattern, RetrievedEntry
# (2026/05/09 14:23:01) [Warning] Tracker 'http://...' is not working.
# (2026/05/09 14:23:01) qBittorrent v5.0.3 started
_LINE_RE = re.compile(
# Classic: (2026/05/09 14:23:01) [Warning] Tracker '...' is not working.
_CLASSIC_RE = re.compile(
r"^\((?P<ts>\d{4}[/-]\d{2}[/-]\d{2}\s+\d{2}:\d{2}:\d{2})\)"
r"(?:\s+\[(?P<level>[^\]]+)\])?"
r"\s+(?P<msg>.*)$"
)
_LEVEL_MAP = {
# Hotio/headless: (N) 2026-04-26T03:33:11 - Successfully listening on IP...
_HOTIO_RE = re.compile(
r"^\((?P<level>[NIWC])\)\s+"
r"(?P<ts>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})"
r"\s+-\s+(?P<msg>.*)$"
)
_LEVEL_MAP: dict[str, str] = {
# Classic bracket labels (lowercased for lookup)
"normal": "INFO",
"info": "INFO",
"warning": "WARN",
"critical": "CRITICAL",
# Hotio single-char codes (lowercased for lookup)
"n": "INFO",
"i": "INFO",
"w": "WARN",
"c": "CRITICAL",
}
def _parse_ts(ts_str: str) -> tuple[str, str]:
"""Return (raw, iso). Handles both YYYY/MM/DD and YYYY-MM-DD."""
normalized = ts_str.replace("/", "-")
try:
dt = datetime.strptime(normalized, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
return ts_str, dt.isoformat()
except ValueError:
return ts_str, ""
"""Return (raw, iso). Handles classic (space sep) and hotio (T sep) timestamps."""
for fmt in ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M:%S"):
try:
dt = datetime.strptime(ts_str, fmt).replace(tzinfo=timezone.utc)
return ts_str, dt.isoformat()
except ValueError:
continue
return ts_str, ""
def _match_line(line: str) -> dict | None:
"""Try both log formats. Returns {ts, level, msg} or None."""
m = _CLASSIC_RE.match(line)
if m:
return {"ts": m.group("ts"), "level": m.group("level") or "", "msg": m.group("msg")}
m = _HOTIO_RE.match(line)
if m:
return {"ts": m.group("ts"), "level": m.group("level"), "msg": m.group("msg")}
return None
def is_qbit_log(first_line: str) -> bool:
return bool(_LINE_RE.match(first_line.strip()))
return _match_line(first_line.strip()) is not None
def parse(
@ -78,22 +107,16 @@ def parse(
for raw_line in lines:
line = raw_line.rstrip("\n")
m = _LINE_RE.match(line)
m = _match_line(line)
if m:
if pending_text is not None:
yield _emit(pending_text, pending_meta)
ts_raw, ts_iso = _parse_ts(m.group("ts"))
level_raw = (m.group("level") or "").lower()
severity = _LEVEL_MAP.get(level_raw) or detect_severity(m.group("msg"))
pending_meta = {
"ts_raw": ts_raw,
"ts_iso": ts_iso,
"severity": severity,
}
pending_text = m.group("msg")
ts_raw, ts_iso = _parse_ts(m["ts"])
severity = _LEVEL_MAP.get(m["level"].lower()) or detect_severity(m["msg"])
pending_meta = {"ts_raw": ts_raw, "ts_iso": ts_iso, "severity": severity}
pending_text = m["msg"]
elif pending_text is not None:
# Continuation line (Qt stack trace or wrapped message)
pending_text += "\n" + line.strip()
if pending_text is not None:

View file

@ -5,7 +5,10 @@ import pytest
from app.ingest.qbittorrent import is_qbit_log, parse
SAMPLE_LOG = """\
# ---------------------------------------------------------------------------
# Classic format sample (pre-5.x GUI builds)
# ---------------------------------------------------------------------------
CLASSIC_LOG = """\
(2026/05/09 14:10:01) qBittorrent v5.0.3 started
(2026/05/09 14:10:02) [Warning] Tracker 'http://tracker.example.com/announce' is not working. Reason: Connection timed out
(2026/05/09 14:10:03) [Critical] Couldn't listen on any of the network interfaces. Aborting!
@ -17,15 +20,42 @@ SAMPLE_LOG = """\
(2026/05/09 14:10:07) Normal message without bracket level
"""
DASH_FORMAT = "(2026-05-09 14:10:01) qBittorrent v4.6.2 started\n"
CLASSIC_DASH = "(2026-05-09 14:10:01) qBittorrent v4.6.2 started\n"
# ---------------------------------------------------------------------------
# Hotio format sample (5.x headless container, ghcr.io/hotio/qbittorrent)
# ---------------------------------------------------------------------------
HOTIO_LOG = """\
(N) 2026-04-26T03:32:59 - qBittorrent termination initiated
(N) 2026-04-26T03:32:59 - Saving resume data completed.
(N) 2026-04-26T03:33:02 - BitTorrent session successfully finished.
(N) 2026-04-26T03:33:11 - qBittorrent v5.1.4 started. Process ID: 309
(I) 2026-04-26T03:33:11 - Peer ID: "-qB5140-"
(I) 2026-04-26T03:33:11 - HTTP User-Agent: "qBittorrent/5.1.4"
(W) 2026-04-26T03:33:15 - Tracker is not working. Reason: Connection refused
(C) 2026-04-26T03:33:20 - Couldn't listen on any of the network interfaces. Aborting!
(N) 2026-04-26T03:33:25 - Restored torrent. Torrent: "wikipedia_en_all_maxi_2026-02.zim"
"""
class TestDetector:
def test_detects_slash_format(self):
def test_detects_classic_slash_format(self):
assert is_qbit_log("(2026/05/09 14:10:01) qBittorrent started")
def test_detects_dash_format(self):
assert is_qbit_log(DASH_FORMAT.strip())
def test_detects_classic_dash_format(self):
assert is_qbit_log(CLASSIC_DASH.strip())
def test_detects_hotio_normal(self):
assert is_qbit_log("(N) 2026-04-26T03:32:59 - qBittorrent termination initiated")
def test_detects_hotio_info(self):
assert is_qbit_log("(I) 2026-04-26T03:33:11 - Peer ID: \"-qB5140-\"")
def test_detects_hotio_warning(self):
assert is_qbit_log("(W) 2026-04-26T03:33:15 - Tracker is not working")
def test_detects_hotio_critical(self):
assert is_qbit_log("(C) 2026-04-26T03:33:20 - Couldn't listen")
def test_rejects_plex_format(self):
assert not is_qbit_log("Jan 01, 2026 12:00:00.000 [12345] DEBUG - message")
@ -36,55 +66,94 @@ class TestDetector:
def test_rejects_plaintext(self):
assert not is_qbit_log("2026-05-09 14:10:01 some syslog line")
def test_rejects_hotio_wrapper_log(self):
# hotio container wrapper uses [YYYY-MM-DD HH:MM:SS] [INF] [VPN] format — not qbit
assert not is_qbit_log("[2026-05-11 05:14:38] [INF] [VPN] Script found. Executing...")
class TestParser:
class TestClassicParser:
def _parse(self, text: str) -> list:
return list(parse(iter(text.splitlines(keepends=True)), "qbit_test", []))
def test_entry_count(self):
entries = self._parse(SAMPLE_LOG)
assert len(entries) == 7
assert len(self._parse(CLASSIC_LOG)) == 7
def test_startup_entry(self):
e = self._parse(SAMPLE_LOG)[0]
e = self._parse(CLASSIC_LOG)[0]
assert "qBittorrent v5.0.3 started" in e.text
# No bracket level + no severity keyword in text → None (consistent with other ingestors)
assert e.severity is None
assert e.timestamp_iso == "2026-05-09T14:10:01+00:00"
def test_warning_severity(self):
entries = self._parse(SAMPLE_LOG)
tracker_entry = entries[1]
assert tracker_entry.severity == "WARN"
assert "not working" in tracker_entry.text
e = self._parse(CLASSIC_LOG)[1]
assert e.severity == "WARN"
assert "not working" in e.text
def test_critical_severity(self):
entries = self._parse(SAMPLE_LOG)
port_entry = entries[2]
assert port_entry.severity == "CRITICAL"
e = self._parse(CLASSIC_LOG)[2]
assert e.severity == "CRITICAL"
def test_multiline_continuation(self):
entries = self._parse(SAMPLE_LOG)
multiline = entries[5]
assert "continues on the next line" in multiline.text
assert "third line" in multiline.text
e = self._parse(CLASSIC_LOG)[5]
assert "continues on the next line" in e.text
assert "third line" in e.text
def test_no_level_bracket_falls_back_to_detect(self):
entries = self._parse(SAMPLE_LOG)
last = entries[6]
assert last.text == "Normal message without bracket level"
def test_source_id_propagated(self):
entries = self._parse(SAMPLE_LOG)
assert all(e.source_id == "qbit_test" for e in entries)
def test_sequence_is_monotonic(self):
entries = self._parse(SAMPLE_LOG)
sequences = [e.sequence for e in entries]
assert sequences == sorted(sequences)
assert len(set(sequences)) == len(sequences)
e = self._parse(CLASSIC_LOG)[6]
assert e.text == "Normal message without bracket level"
def test_dash_format_timestamp(self):
entries = list(parse(iter(DASH_FORMAT.splitlines(keepends=True)), "qbit", []))
assert len(entries) == 1
entries = list(parse(iter(CLASSIC_DASH.splitlines(keepends=True)), "qbit", []))
assert entries[0].timestamp_iso == "2026-05-09T14:10:01+00:00"
def test_source_id_propagated(self):
assert all(e.source_id == "qbit_test" for e in self._parse(CLASSIC_LOG))
def test_sequence_is_monotonic(self):
entries = self._parse(CLASSIC_LOG)
seqs = [e.sequence for e in entries]
assert seqs == sorted(seqs) and len(set(seqs)) == len(seqs)
class TestHotioParser:
def _parse(self, text: str) -> list:
return list(parse(iter(text.splitlines(keepends=True)), "qbit_hotio", []))
def test_entry_count(self):
assert len(self._parse(HOTIO_LOG)) == 9
def test_normal_maps_to_info(self):
e = self._parse(HOTIO_LOG)[0]
assert e.severity == "INFO"
assert "termination initiated" in e.text
def test_info_severity(self):
e = self._parse(HOTIO_LOG)[4]
assert e.severity == "INFO"
assert "Peer ID" in e.text
def test_warning_severity(self):
e = self._parse(HOTIO_LOG)[6]
assert e.severity == "WARN"
assert "not working" in e.text
def test_critical_severity(self):
e = self._parse(HOTIO_LOG)[7]
assert e.severity == "CRITICAL"
def test_iso_timestamp_parsed(self):
e = self._parse(HOTIO_LOG)[0]
assert e.timestamp_iso == "2026-04-26T03:32:59+00:00"
assert e.timestamp_raw == "2026-04-26T03:32:59"
def test_source_id_propagated(self):
assert all(e.source_id == "qbit_hotio" for e in self._parse(HOTIO_LOG))
def test_sequence_is_monotonic(self):
entries = self._parse(HOTIO_LOG)
seqs = [e.sequence for e in entries]
assert seqs == sorted(seqs) and len(set(seqs)) == len(seqs)
def test_detector_identifies_hotio_first_line(self):
first_line = HOTIO_LOG.splitlines()[0]
assert is_qbit_log(first_line)