# turnstone/app/services/pihole.py

"""Pi-hole API client supporting v5 (PHP) and v6 (REST) APIs."""
from __future__ import annotations

import dataclasses
import urllib.parse

import httpx
@dataclasses.dataclass
class PiholeClient:
    """Synchronous client for blocking/unblocking domains on a Pi-hole.

    Two API generations are supported, selected by ``version``:

    * ``"v5"`` - the PHP endpoint ``/admin/api.php`` authenticated with an
      API token passed as the ``auth`` query parameter.
    * anything else (default ``"v6"``) - the REST API under ``/api``, where
      ``api_key`` is sent as the password to ``/api/auth`` to obtain a
      session id (SID).

    Every v6 operation opens its own session and closes it again when done,
    so repeated calls do not pile up sessions on the server.
    """

    url: str  # Base URL of the Pi-hole; trailing slashes are stripped.
    api_key: str  # v5: API token; v6: password sent to /api/auth.
    version: str = "v6"  # "v5" | "v6"

    def __post_init__(self) -> None:
        """Normalise ``url`` so endpoint paths can be appended with ``/``."""
        self.url = self.url.rstrip("/")

    # ── Public API ────────────────────────────────────────────────────────
    def block(self, domain: str, comment: str = "Turnstone block") -> None:
        """Add ``domain`` to the Pi-hole deny list.

        ``comment`` is only transmitted on the v6 path. Raises whatever the
        underlying helpers raise (``httpx.HTTPError`` on transport or status
        errors, ``ValueError`` when v6 authentication yields no SID).
        """
        if self.version == "v5":
            self._v5_get("black", "add", domain)
            return
        sid = self._v6_auth()
        try:
            self._v6_post_domain(sid, domain, comment)
        finally:
            # Always release the session, even when the request failed.
            self._v6_logout(sid)

    def unblock(self, domain: str) -> None:
        """Remove ``domain`` from the Pi-hole deny list.

        Raises the same exceptions as :meth:`block`.
        """
        if self.version == "v5":
            self._v5_get("black", "sub", domain)
            return
        sid = self._v6_auth()
        try:
            self._v6_delete_domain(sid, domain)
        finally:
            self._v6_logout(sid)

    def test_connection(self) -> dict:
        """Probe the configured Pi-hole and report the outcome as a dict.

        Never raises: this is the reporting boundary, so any exception is
        converted into ``{"ok": False, ..., "error": "<message>"}``. On
        success the dict has ``ok=True``, the API ``version``, a
        ``domain_count`` and ``error=None``.
        """
        try:
            if self.version == "v5":
                return self._v5_test()
            return self._v6_test()
        except Exception as exc:
            return {"ok": False, "version": self.version, "domain_count": 0, "error": str(exc)}

    # ── v5 helpers ────────────────────────────────────────────────────────
    def _v5_get(self, list_type: str, action: str, domain: str) -> None:
        """Issue a v5 list-modification request.

        ``action`` becomes the query-parameter *name* carrying the domain
        (the callers in this class pass ``"add"`` or ``"sub"``).
        Raises ``httpx.HTTPError`` on transport errors or non-2xx status.
        """
        params = {"list": list_type, action: domain, "auth": self.api_key}
        with httpx.Client(timeout=10) as c:
            c.get(f"{self.url}/admin/api.php", params=params).raise_for_status()

    def _v5_test(self) -> dict:
        """Fetch the v5 ``summaryRaw`` data and build a success report.

        Raises ``ValueError`` when the response body is not a JSON object.
        """
        with httpx.Client(timeout=10) as c:
            r = c.get(f"{self.url}/admin/api.php", params={"summaryRaw": "", "auth": self.api_key})
            r.raise_for_status()
            data = r.json()
        if not isinstance(data, dict):
            # NOTE(review): v5 appears to answer HTTP 200 with a non-object
            # body (e.g. an empty list) when the token is rejected - surface
            # that clearly instead of failing on ``.get`` below.
            raise ValueError("Pi-hole v5 returned no summary data (check the API token)")
        return {
            "ok": True,
            "version": "v5",
            "domain_count": int(data.get("domains_being_blocked", 0)),
            "error": None,
        }

    # ── v6 helpers ────────────────────────────────────────────────────────
    @staticmethod
    def _v6_headers(sid: str) -> dict:
        """Return the request headers that authenticate a v6 call.

        The SID is sent as ``X-FTL-SID`` rather than as a cookie because
        cookie-based authentication additionally requires a CSRF token
        header according to the Pi-hole API documentation.
        """
        return {"X-FTL-SID": sid}

    def _v6_deny_url(self, domain: str = "") -> str:
        """Build the URL of the exact deny list, or of one entry in it.

        ``domain`` is percent-encoded in full (no "safe" characters) so that
        characters such as ``/`` or ``?`` cannot alter the request path.
        """
        base = f"{self.url}/api/domains/deny/exact"
        if not domain:
            return base
        return f"{base}/{urllib.parse.quote(domain, safe='')}"

    def _v6_auth(self) -> str:
        """Log in to the v6 API and return the session id.

        Raises ``httpx.HTTPError`` on transport errors or non-2xx status and
        ``ValueError`` when the response carries no SID.
        """
        with httpx.Client(timeout=10) as c:
            r = c.post(f"{self.url}/api/auth", json={"password": self.api_key})
            r.raise_for_status()
            data = r.json()
        # ``or {}`` also covers an explicit JSON null for "session".
        session = data.get("session") or {}
        sid = session.get("sid")
        if not sid:
            msg = session.get("message", "no sid returned")
            raise ValueError(f"Pi-hole v6 auth failed: {msg}")
        return sid

    def _v6_logout(self, sid: str) -> None:
        """End the session identified by ``sid`` (best effort).

        Failures are deliberately ignored: logout runs in ``finally`` blocks
        and must not mask the outcome of the operation it follows. A session
        that could not be closed is left to expire on the server.
        """
        try:
            with httpx.Client(timeout=10, headers=self._v6_headers(sid)) as c:
                c.delete(f"{self.url}/api/auth").raise_for_status()
        except httpx.HTTPError:
            pass

    def _v6_post_domain(self, sid: str, domain: str, comment: str) -> None:
        """Create an enabled exact deny-list entry for ``domain``.

        Raises ``httpx.HTTPError`` on transport errors or non-2xx status.
        """
        # The payload is a single JSON object, not a list of objects.
        body = {"domain": domain, "comment": comment, "enabled": True}
        with httpx.Client(timeout=10, headers=self._v6_headers(sid)) as c:
            c.post(self._v6_deny_url(), json=body).raise_for_status()

    def _v6_delete_domain(self, sid: str, domain: str) -> None:
        """Delete the exact deny-list entry for ``domain``.

        Raises ``httpx.HTTPError`` on transport errors or non-2xx status.
        """
        with httpx.Client(timeout=10, headers=self._v6_headers(sid)) as c:
            c.delete(self._v6_deny_url(domain)).raise_for_status()

    def _v6_test(self) -> dict:
        """Authenticate, list the deny entries and build a success report.

        ``domain_count`` is the number of entries returned by
        ``GET /api/domains/deny``.
        """
        sid = self._v6_auth()
        try:
            with httpx.Client(timeout=10, headers=self._v6_headers(sid)) as c:
                r = c.get(f"{self.url}/api/domains/deny")
                r.raise_for_status()
                data = r.json()
        finally:
            self._v6_logout(sid)
        # The v6 API lists entries under "domains"; "data" is kept as a
        # fallback for compatibility with the key this client read before.
        entries = data.get("domains")
        if entries is None:
            entries = data.get("data", [])
        return {
            "ok": True,
            "version": "v6",
            "domain_count": len(entries),
            "error": None,
        }