added api
This commit is contained in:
@@ -0,0 +1,53 @@
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
from app.config import get_settings
|
||||
|
||||
|
||||
class NetdataClient:
|
||||
def __init__(self) -> None:
|
||||
settings = get_settings()
|
||||
self.base_url = settings.netdata_base_url.rstrip("/")
|
||||
self.enabled = settings.netdata_alerts_enabled
|
||||
|
||||
def fetch_alarms(self) -> dict[str, Any]:
|
||||
if not self.enabled:
|
||||
return {"ok": False, "error": "Netdata alerts disabled", "alarms": []}
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=15.0) as client:
|
||||
response = client.get(f"{self.base_url}/api/v1/alarms", params={"all": "true"})
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
except Exception as exc:
|
||||
return {"ok": False, "error": str(exc), "alarms": []}
|
||||
|
||||
alarms_raw = data.get("alarms") or {}
|
||||
alarms: list[dict[str, Any]] = []
|
||||
if isinstance(alarms_raw, dict):
|
||||
for name, info in alarms_raw.items():
|
||||
if not isinstance(info, dict):
|
||||
continue
|
||||
status = (info.get("status") or "").lower()
|
||||
if status in ("clear", "undefined", "uninitialized", ""):
|
||||
continue
|
||||
alarms.append({
|
||||
"name": name,
|
||||
"status": status,
|
||||
"value_string": info.get("value_string") or info.get("value") or "",
|
||||
"chart": info.get("chart") or "",
|
||||
"host": info.get("hostname") or info.get("host") or "localhost",
|
||||
"info": info.get("info") or "",
|
||||
})
|
||||
|
||||
return {"ok": True, "alarms": alarms}
|
||||
|
||||
def fetch_info(self) -> dict[str, Any]:
|
||||
try:
|
||||
with httpx.Client(timeout=10.0) as client:
|
||||
response = client.get(f"{self.base_url}/api/v1/info")
|
||||
response.raise_for_status()
|
||||
return {"ok": True, "info": response.json()}
|
||||
except Exception as exc:
|
||||
return {"ok": False, "error": str(exc)}
|
||||
Reference in New Issue
Block a user