from typing import Any

import httpx

from app.config import get_settings


class NetdataClient:
    """Small synchronous client for the Netdata v1 HTTP API.

    The base URL and the "alerts enabled" flag are read from the
    application settings when the client is constructed. Both fetch methods
    report failures through an ``{"ok": False, "error": ...}`` result dict
    instead of raising.
    """

    # Lower-cased alarm statuses that are treated as inactive and dropped
    # from the result of ``fetch_alarms``.
    _INACTIVE_STATUSES = frozenset({"clear", "undefined", "uninitialized", ""})

    def __init__(self) -> None:
        """Load the Netdata connection settings from the app configuration."""
        settings = get_settings()
        # Strip trailing slashes so request paths can be appended with "/".
        self.base_url = settings.netdata_base_url.rstrip("/")
        self.enabled = settings.netdata_alerts_enabled

    def fetch_alarms(self) -> dict[str, Any]:
        """Fetch alarms from Netdata and return the ones that are active.

        Returns:
            ``{"ok": True, "alarms": [...]}`` on success, where each alarm is
            a dict with the keys ``name``, ``status``, ``value_string``,
            ``chart``, ``host`` and ``info``. On any failure (alerts
            disabled, request/HTTP/JSON error, unexpected payload type) the
            result is ``{"ok": False, "error": <message>, "alarms": []}``.
        """
        if not self.enabled:
            return {"ok": False, "error": "Netdata alerts disabled", "alarms": []}

        try:
            with httpx.Client(timeout=15.0) as client:
                response = client.get(
                    f"{self.base_url}/api/v1/alarms",
                    params={"all": "true"},
                )
                response.raise_for_status()
                data = response.json()
        except Exception as exc:
            # Deliberate catch-all: callers get an error result, never an
            # exception, whatever went wrong while talking to Netdata.
            return {"ok": False, "error": str(exc), "alarms": []}

        # A JSON body that is not an object (list, string, null, ...) has no
        # ``.get``; report it as an error instead of raising AttributeError.
        if not isinstance(data, dict):
            return {
                "ok": False,
                "error": f"Unexpected Netdata response type: {type(data).__name__}",
                "alarms": [],
            }

        return {"ok": True, "alarms": self._active_alarms(data.get("alarms"))}

    @classmethod
    def _active_alarms(cls, alarms_raw: Any) -> list[dict[str, Any]]:
        """Convert the raw ``alarms`` mapping into a list of active alarms.

        Anything that is not a dict (including ``None``) yields an empty
        list; entries whose value is not a dict are skipped.
        """
        if not isinstance(alarms_raw, dict):
            return []

        alarms: list[dict[str, Any]] = []
        for name, info in alarms_raw.items():
            if not isinstance(info, dict):
                continue
            # ``str(...)`` guards against a non-string status value, which
            # would otherwise fail on ``.lower()``.
            status = str(info.get("status") or "").lower()
            if status in cls._INACTIVE_STATUSES:
                continue
            alarms.append(
                {
                    "name": name,
                    "status": status,
                    # Falsy candidates (empty string, None, numeric 0) fall
                    # through to the next one, ending at "".
                    "value_string": info.get("value_string") or info.get("value") or "",
                    "chart": info.get("chart") or "",
                    "host": info.get("hostname") or info.get("host") or "localhost",
                    "info": info.get("info") or "",
                }
            )
        return alarms

    def fetch_info(self) -> dict[str, Any]:
        """Fetch the Netdata ``/api/v1/info`` document.

        This method does not consult ``self.enabled``.

        Returns:
            ``{"ok": True, "info": <decoded JSON>}`` on success, otherwise
            ``{"ok": False, "error": <message>}``.
        """
        try:
            with httpx.Client(timeout=10.0) as client:
                response = client.get(f"{self.base_url}/api/v1/info")
                response.raise_for_status()
                return {"ok": True, "info": response.json()}
        except Exception as exc:
            # Same catch-all contract as fetch_alarms: report, don't raise.
            return {"ok": False, "error": str(exc)}