Files
2026-06-10 10:29:21 +03:00

54 lines
2.0 KiB
Python

from typing import Any
import httpx
from app.config import get_settings
class NetdataClient:
def __init__(self) -> None:
settings = get_settings()
self.base_url = settings.netdata_base_url.rstrip("/")
self.enabled = settings.netdata_alerts_enabled
def fetch_alarms(self) -> dict[str, Any]:
if not self.enabled:
return {"ok": False, "error": "Netdata alerts disabled", "alarms": []}
try:
with httpx.Client(timeout=15.0) as client:
response = client.get(f"{self.base_url}/api/v1/alarms", params={"all": "true"})
response.raise_for_status()
data = response.json()
except Exception as exc:
return {"ok": False, "error": str(exc), "alarms": []}
alarms_raw = data.get("alarms") or {}
alarms: list[dict[str, Any]] = []
if isinstance(alarms_raw, dict):
for name, info in alarms_raw.items():
if not isinstance(info, dict):
continue
status = (info.get("status") or "").lower()
if status in ("clear", "undefined", "uninitialized", ""):
continue
alarms.append({
"name": name,
"status": status,
"value_string": info.get("value_string") or info.get("value") or "",
"chart": info.get("chart") or "",
"host": info.get("hostname") or info.get("host") or "localhost",
"info": info.get("info") or "",
})
return {"ok": True, "alarms": alarms}
def fetch_info(self) -> dict[str, Any]:
try:
with httpx.Client(timeout=10.0) as client:
response = client.get(f"{self.base_url}/api/v1/info")
response.raise_for_status()
return {"ok": True, "info": response.json()}
except Exception as exc:
return {"ok": False, "error": str(exc)}