turnstone/app/services/pihole.py
pyr0ball dceb2d30ca feat(blocklist): Pi-hole v5/v6 API client + tests
PiholeClient dataclass supporting both Pi-hole v5 (PHP /admin/api.php)
and v6 (REST /api/) with public block/unblock/test_connection methods.
9 tests covering both API versions, auth flow, and error handling.
2026-05-15 21:00:01 -07:00

89 lines
3.4 KiB
Python

"""Pi-hole API client supporting v5 (PHP) and v6 (REST) APIs."""
from __future__ import annotations
import dataclasses
import httpx
@dataclasses.dataclass
class PiholeClient:
url: str
api_key: str
version: str = "v6" # "v5" | "v6"
def __post_init__(self) -> None:
self.url = self.url.rstrip("/")
# ── Public API ────────────────────────────────────────────────────────
def block(self, domain: str, comment: str = "Turnstone block") -> None:
if self.version == "v5":
self._v5_get("black", "add", domain)
else:
sid = self._v6_auth()
self._v6_post_domain(sid, domain, comment)
def unblock(self, domain: str) -> None:
if self.version == "v5":
self._v5_get("black", "sub", domain)
else:
sid = self._v6_auth()
self._v6_delete_domain(sid, domain)
def test_connection(self) -> dict:
try:
if self.version == "v5":
return self._v5_test()
return self._v6_test()
except Exception as exc:
return {"ok": False, "version": self.version, "domain_count": 0, "error": str(exc)}
# ── v5 helpers ────────────────────────────────────────────────────────
def _v5_get(self, list_type: str, action: str, domain: str) -> None:
params = {"list": list_type, action: domain, "auth": self.api_key}
with httpx.Client(timeout=10) as c:
c.get(f"{self.url}/admin/api.php", params=params).raise_for_status()
def _v5_test(self) -> dict:
with httpx.Client(timeout=10) as c:
r = c.get(f"{self.url}/admin/api.php", params={"summaryRaw": "", "auth": self.api_key})
r.raise_for_status()
data = r.json()
return {
"ok": True,
"version": "v5",
"domain_count": int(data.get("domains_being_blocked", 0)),
"error": None,
}
# ── v6 helpers ────────────────────────────────────────────────────────
def _v6_auth(self) -> str:
with httpx.Client(timeout=10) as c:
r = c.post(f"{self.url}/api/auth", json={"password": self.api_key})
r.raise_for_status()
return r.json()["session"]["sid"]
def _v6_post_domain(self, sid: str, domain: str, comment: str) -> None:
body = [{"domain": domain, "comment": comment, "enabled": True}]
with httpx.Client(timeout=10, cookies={"sid": sid}) as c:
c.post(f"{self.url}/api/domains/deny", json=body).raise_for_status()
def _v6_delete_domain(self, sid: str, domain: str) -> None:
with httpx.Client(timeout=10, cookies={"sid": sid}) as c:
c.delete(f"{self.url}/api/domains/deny/{domain}").raise_for_status()
def _v6_test(self) -> dict:
sid = self._v6_auth()
with httpx.Client(timeout=10, cookies={"sid": sid}) as c:
r = c.get(f"{self.url}/api/domains/deny")
r.raise_for_status()
data = r.json()
return {
"ok": True,
"version": "v6",
"domain_count": len(data.get("data", [])),
"error": None,
}