Renames the app/ingest/ package to app/glean/ and updates all references across Python modules, shell scripts, Vue components, tests, and documentation.

Intentionally preserved:
- SQLite column name ingest_time (avoids schema migration)
- RetrievedEntry.ingest_time field (maps to the column above)
- Any public-facing JSON keys that reference ingest_time

Changes by category:
- app/ingest/ → app/glean/ (full package move, all parsers)
- app/tasks/ingest_scheduler.py → app/tasks/glean_scheduler.py
- scripts/ingest_corpus.py → scripts/glean_corpus.py
- tests/test_ingest_*.py → tests/test_glean_*.py
- Docstrings, log messages, comments: ingest → glean
- Env var: TURNSTONE_INGEST_INTERVAL → TURNSTONE_GLEAN_INTERVAL
- Shell scripts: glean.log, glean_corpus.py references
- README.md: multi-source ingest → multi-source glean
- .env.example: updated env var name
- patterns/: new diagnostic patterns from 2026-05-20 SSH incident (service_crash_loop, pkg_daemon_restart, ssh_forward_conflict)
- SourcesView.vue: pipeline label updated
- All test import paths updated to app.glean.*

285 tests passing.
118 lines
4 KiB
Python
118 lines
4 KiB
Python
"""Tests for the Wazuh alert gleaner."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from datetime import datetime
|
|
|
|
from app.glean.wazuh import is_wazuh_alert, parse
|
|
from app.glean.pipeline import _detect_format
|
|
|
|
# Fully populated sample alert shared by the detector and parser tests.
# Key order is kept stable because the dict is serialised with json.dumps.
_ALERT = {
    "timestamp": "2024-01-15T10:23:45.123+0000",
    "rule": {
        "level": 7,
        "description": "SSH authentication failure.",
        "id": "5710",
        "firedtimes": 1,
        "groups": ["syslog", "sshd", "authentication_failed"],
    },
    "agent": {
        "id": "001",
        "name": "web-server-01",
        "ip": "192.168.1.100",
    },
    "manager": {"name": "wazuh-mgr"},
    "id": "1705312125.123456",
    "full_log": "Jan 15 10:23:45 web-server-01 sshd[1234]: Failed password for admin from 10.0.0.5",
    "location": "/var/log/auth.log",
    "data": {
        "srcip": "10.0.0.5",
        "srcuser": "admin",
    },
}
|
|
|
|
# Second sample alert with rule level 13; unlike _ALERT it carries no
# top-level "id" and no "data" section.
_CRITICAL_ALERT = {
    "timestamp": "2024-01-15T10:30:00.000+0000",
    "rule": {
        "level": 13,
        "description": "Rootkit detected.",
        "id": "510",
        "groups": ["rootcheck"],
    },
    "agent": {
        "id": "002",
        "name": "db-host",
        "ip": "192.168.1.200",
    },
    "manager": {"name": "wazuh-mgr"},
    "full_log": "rootkit patterns found",
    "location": "/var/ossec/logs/active-responses.log",
}
|
|
|
|
|
|
class TestDetector:
    """Detection tests.

    Covers which decoded JSON objects ``is_wazuh_alert`` accepts or rejects,
    and that the pipeline's ``_detect_format`` labels a serialised alert
    as ``"wazuh"``.
    """

    def test_detects_valid_alert(self):
        """The fully populated sample alert is recognised."""
        recognised = is_wazuh_alert(_ALERT)
        assert recognised

    def test_detects_minimal_alert(self):
        """An object with only timestamp, rule and agent is still recognised."""
        minimal = {
            "timestamp": "2024-01-15T10:23:45+0000",
            "rule": {"level": 5, "description": "test"},
            "agent": {"name": "host"},
        }
        assert is_wazuh_alert(minimal)

    def test_rejects_journald(self):
        """A journald-shaped record is not treated as a Wazuh alert."""
        journald_record = {"__REALTIME_TIMESTAMP": "123", "MESSAGE": "hi"}
        assert not is_wazuh_alert(journald_record)

    def test_rejects_caddy(self):
        """A Caddy-shaped access record is not treated as a Wazuh alert."""
        caddy_record = {"ts": 1234, "msg": "served", "request": {}}
        assert not is_wazuh_alert(caddy_record)

    def test_rejects_no_agent(self):
        """A rule and timestamp without an ``agent`` key are rejected."""
        agentless = {"rule": {"level": 5}, "timestamp": "2024-01-01T00:00:00Z"}
        assert not is_wazuh_alert(agentless)

    def test_pipeline_routes_to_wazuh(self):
        """The pipeline's format detection returns ``"wazuh"`` for the sample."""
        serialised = json.dumps(_ALERT)
        assert _detect_format(serialised) == "wazuh"
|
|
|
|
|
|
class TestParser:
    """Parser tests: feed JSON-encoded alert lines to ``parse`` and inspect the entries."""

    def _parse(self, *alerts) -> list:
        """JSON-encode each alert as one line, run ``parse`` and return its results as a list."""
        serialised = list(map(json.dumps, alerts))
        return [*parse(iter(serialised), "wazuh", [])]

    def test_single_entry_parsed(self):
        """One alert line produces exactly one entry."""
        parsed = self._parse(_ALERT)
        assert len(parsed) == 1

    def test_severity_from_level(self):
        """The sample alert's rule level maps to WARN."""
        first = self._parse(_ALERT)[0]
        assert first.severity == "WARN"  # level 7

    def test_critical_severity(self):
        """The critical sample's rule level maps to CRITICAL."""
        first = self._parse(_CRITICAL_ALERT)[0]
        assert first.severity == "CRITICAL"  # level 13

    def test_source_id_includes_agent(self):
        """The source id is ``wazuh:`` followed by the agent name."""
        first = self._parse(_ALERT)[0]
        assert first.source_id == "wazuh:web-server-01"

    def test_text_contains_rule_description(self):
        """The entry text carries the rule description."""
        body = self._parse(_ALERT)[0].text
        assert "SSH authentication failure" in body

    def test_text_contains_agent_name(self):
        """The entry text carries the agent name."""
        body = self._parse(_ALERT)[0].text
        assert "web-server-01" in body

    def test_text_contains_decoded_data(self):
        """The entry text carries a value from the alert's ``data`` section."""
        body = self._parse(_ALERT)[0].text
        assert "10.0.0.5" in body

    def test_text_contains_full_log(self):
        """The entry text carries part of the original ``full_log`` line."""
        body = self._parse(_ALERT)[0].text
        assert "Failed password" in body

    def test_timestamp_parsed_to_utc(self):
        """``timestamp_iso`` is timezone-aware and keeps the 10:23:45 wall time."""
        first = self._parse(_ALERT)[0]
        moment = datetime.fromisoformat(first.timestamp_iso)
        assert moment.utcoffset() is not None
        assert (moment.hour, moment.minute, moment.second) == (10, 23, 45)

    def test_skips_malformed_json(self):
        """A line that is not JSON is dropped; the valid alert is still parsed."""
        raw_lines = ["not json\n", json.dumps(_ALERT)]
        parsed = list(parse(iter(raw_lines), "wazuh", []))
        assert len(parsed) == 1

    def test_skips_empty_lines(self):
        """Blank and whitespace-only lines are dropped."""
        raw_lines = ["\n", "  \n", json.dumps(_ALERT)]
        parsed = list(parse(iter(raw_lines), "wazuh", []))
        assert len(parsed) == 1

    def test_multi_alert_sequence(self):
        """Two alerts give two entries whose ``sequence`` values are in ascending order."""
        parsed = self._parse(_ALERT, _CRITICAL_ALERT)
        assert len(parsed) == 2
        seqs = [item.sequence for item in parsed]
        in_order = sorted(seqs)
        assert seqs == in_order
|