added local api

This commit is contained in:
2026-06-11 08:38:08 +03:00
parent 81eaa95df3
commit 8fd7e85c83
39 changed files with 3224 additions and 723 deletions
Binary file not shown.
Binary file not shown.
+10
View File
@@ -9,3 +9,13 @@ HOST = os.environ.get("LORATESTER_HOST", "0.0.0.0")
PORT = int(os.environ.get("LORATESTER_PORT", "7634"))
TELEMETRY_LIMIT = int(os.environ.get("LORATESTER_TELEMETRY_LIMIT", "5000"))
TRACK_POINTS_LIMIT = int(os.environ.get("LORATESTER_TRACK_POINTS_LIMIT", "10000"))
ELEVATION_API_URL = os.environ.get(
"LORATESTER_ELEVATION_URL",
"http://192.168.1.109:8085/v1/elevation",
).rstrip("/")
ELEVATION_PROBE_TTL_SEC = float(
os.environ.get("LORATESTER_ELEVATION_PROBE_TTL", "60")
)
ELEVATION_CONNECT_TIMEOUT = float(
os.environ.get("LORATESTER_ELEVATION_TIMEOUT", "8")
)
+288 -19
View File
@@ -1,40 +1,309 @@
"""Terrain elevation via Open-Meteo (cached per coordinate)."""
"""Terrain elevation via self-hosted Open-Meteo-compatible API."""
from __future__ import annotations
import logging
from typing import Optional
import math
import time
from typing import Any, Optional
import httpx
from .config import (
ELEVATION_API_URL,
ELEVATION_CONNECT_TIMEOUT,
ELEVATION_PROBE_TTL_SEC,
)
logger = logging.getLogger(__name__)
_BATCH_SIZE = 100
_CACHE: dict[tuple[float, float], Optional[float]] = {}
_TIMEOUT = 3.0
_probe_checked_at = 0.0
_probe_ok = False
_probe_error: Optional[str] = None
def _cache_key(lat: float, lon: float) -> tuple[float, float]:
return (round(lat, 4), round(lon, 4))
return (round(lat, 6), round(lon, 6))
def fetch_elevation_m(lat: float, lon: float) -> Optional[float]:
key = _cache_key(lat, lon)
if key in _CACHE:
return _CACHE[key]
def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
r = 6_371_000.0
d_lat = math.radians(lat2 - lat1)
d_lon = math.radians(lon2 - lon1)
a = (
math.sin(d_lat / 2) ** 2
+ math.cos(math.radians(lat1))
* math.cos(math.radians(lat2))
* math.sin(d_lon / 2) ** 2
)
return r * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def probe_elevation_api(force: bool = False) -> dict[str, Any]:
"""Ping elevation service before batch requests (cached for TTL)."""
global _probe_checked_at, _probe_ok, _probe_error
now = time.monotonic()
if (
not force
and _probe_checked_at > 0
and now - _probe_checked_at < ELEVATION_PROBE_TTL_SEC
):
return {
"ok": _probe_ok,
"url": ELEVATION_API_URL,
"error": _probe_error,
}
try:
with httpx.Client(timeout=_TIMEOUT) as client:
with httpx.Client(timeout=ELEVATION_CONNECT_TIMEOUT) as client:
r = client.get(
"https://api.open-meteo.com/v1/elevation",
params={"latitude": lat, "longitude": lon},
ELEVATION_API_URL,
params={"latitude": "0.000000", "longitude": "0.000000"},
)
r.raise_for_status()
data = r.json()
elevations = data.get("elevation") or []
if elevations:
val = float(elevations[0])
_CACHE[key] = val
return val
if "elevation" not in data:
raise ValueError("response has no elevation field")
_probe_checked_at = now
_probe_ok = True
_probe_error = None
logger.info("elevation API ok: %s", ELEVATION_API_URL)
except Exception as e:
logger.warning("open-meteo elevation failed for %s,%s: %s", lat, lon, e)
_CACHE[key] = None
return None
_probe_checked_at = now
_probe_ok = False
_probe_error = str(e)
logger.warning("elevation API unreachable %s: %s", ELEVATION_API_URL, e)
return {
"ok": _probe_ok,
"url": ELEVATION_API_URL,
"error": _probe_error,
}
def elevation_status(force: bool = False) -> dict[str, Any]:
probe = probe_elevation_api(force=force)
return {
"elevation_ok": probe["ok"],
"elevation_url": probe["url"],
"elevation_error": probe["error"],
}
def _fetch_elevation_batch(
batch_lat: list[float], batch_lon: list[float]
) -> list[Optional[float]]:
if not batch_lat:
return []
params = {
"latitude": ",".join(f"{lat:.6f}" for lat in batch_lat),
"longitude": ",".join(f"{lon:.6f}" for lon in batch_lon),
}
with httpx.Client(timeout=ELEVATION_CONNECT_TIMEOUT) as client:
r = client.get(ELEVATION_API_URL, params=params)
r.raise_for_status()
data = r.json()
elevations = data.get("elevation") or []
out: list[Optional[float]] = []
for j, elev in enumerate(elevations):
if j >= len(batch_lat):
break
if elev is None:
out.append(None)
else:
out.append(float(elev))
while len(out) < len(batch_lat):
out.append(None)
return out
def fetch_elevation_m(lat: float, lon: float) -> Optional[float]:
vals = fetch_elevations_batch([lat], [lon])
return vals[0] if vals else None
def fetch_elevations_batch(
lats: list[float], lons: list[float]
) -> list[Optional[float]]:
if not lats or len(lats) != len(lons):
return []
probe = probe_elevation_api()
if not probe["ok"]:
logger.warning(
"skip elevation fetch: API unreachable (%s)",
probe.get("error"),
)
return [None] * len(lats)
out: list[Optional[float]] = [None] * len(lats)
pending_idx: list[int] = []
pending_lat: list[float] = []
pending_lon: list[float] = []
for i, (lat, lon) in enumerate(zip(lats, lons)):
key = _cache_key(lat, lon)
if key in _CACHE:
out[i] = _CACHE[key]
else:
pending_idx.append(i)
pending_lat.append(float(lat))
pending_lon.append(float(lon))
for start in range(0, len(pending_lat), _BATCH_SIZE):
batch_i = pending_idx[start : start + _BATCH_SIZE]
batch_lat = pending_lat[start : start + _BATCH_SIZE]
batch_lon = pending_lon[start : start + _BATCH_SIZE]
try:
batch_vals = _fetch_elevation_batch(batch_lat, batch_lon)
for j, val in enumerate(batch_vals):
lat = batch_lat[j]
lon = batch_lon[j]
_CACHE[_cache_key(lat, lon)] = val
out[batch_i[j]] = val
logger.info(
"elevation ok: %s points, sample=%s",
len(batch_lat),
batch_vals[0] if batch_vals else None,
)
except Exception as e:
logger.warning(
"elevation batch failed (%s points): %s",
len(batch_lat),
e,
)
for j in range(len(batch_lat)):
try:
single = _fetch_elevation_batch(
[batch_lat[j]], [batch_lon[j]]
)
val = single[0] if single else None
except Exception as e2:
logger.warning(
"elevation single failed %.6f,%.6f: %s",
batch_lat[j],
batch_lon[j],
e2,
)
val = None
_CACHE[_cache_key(batch_lat[j], batch_lon[j])] = val
out[batch_i[j]] = val
return out
def _interp_at_dist(
cleaned: list[tuple[float, float]], cum: list[float], dist_m: float
) -> tuple[float, float]:
if dist_m <= 0:
return cleaned[0]
if dist_m >= cum[-1]:
return cleaned[-1]
for i in range(1, len(cum)):
if dist_m <= cum[i]:
seg = cum[i] - cum[i - 1]
t = 0.0 if seg <= 0 else (dist_m - cum[i - 1]) / seg
lat1, lon1 = cleaned[i - 1]
lat2, lon2 = cleaned[i]
return lat1 + (lat2 - lat1) * t, lon1 + (lon2 - lon1) * t
return cleaned[-1]
def resample_track_path(
points: list[dict[str, Any]], step_m: float = 10.0
) -> list[dict[str, float]]:
"""Sample (lat, lon, dist_m) along polyline every ~step_m meters."""
if not points or step_m <= 0:
return []
cleaned: list[tuple[float, float]] = []
for p in points:
lat = p.get("lat")
lon = p.get("lon")
if lat is None or lon is None:
continue
lat_f, lon_f = float(lat), float(lon)
if not cleaned or haversine_m(cleaned[-1][0], cleaned[-1][1], lat_f, lon_f) > 0.5:
cleaned.append((lat_f, lon_f))
if not cleaned:
return []
if len(cleaned) == 1:
return [{"lat": cleaned[0][0], "lon": cleaned[0][1], "dist_m": 0.0}]
cum = [0.0]
for i in range(1, len(cleaned)):
cum.append(
cum[-1]
+ haversine_m(
cleaned[i - 1][0], cleaned[i - 1][1], cleaned[i][0], cleaned[i][1]
)
)
total = cum[-1]
samples: list[dict[str, float]] = []
dist = 0.0
while dist <= total + 1e-6:
lat, lon = _interp_at_dist(cleaned, cum, dist)
samples.append({"lat": lat, "lon": lon, "dist_m": round(dist, 1)})
if dist >= total:
break
dist += step_m
return samples
def build_elevation_profile(
points: list[dict[str, Any]], step_m: float = 10.0
) -> dict[str, Any]:
"""Resample track and fetch terrain elevations."""
step_m = max(5.0, min(10.0, float(step_m)))
samples = resample_track_path(points, step_m)
if not samples:
return {
"step_m": step_m,
"points": [],
"total_m": 0.0,
"api_source": "elevation",
"api_error": "no samples",
}
probe = probe_elevation_api()
if not probe["ok"]:
return {
"step_m": step_m,
"points": [],
"total_m": 0.0,
"api_source": "elevation",
"api_error": f"elevation API unreachable: {probe['error']}",
"elevation_url": ELEVATION_API_URL,
}
lats = [s["lat"] for s in samples]
lons = [s["lon"] for s in samples]
elevations = fetch_elevations_batch(lats, lons)
profile: list[dict[str, Any]] = []
elev_vals: list[float] = []
for s, elev in zip(samples, elevations):
item = {
"dist_m": round(s["dist_m"], 1),
"lat": round(s["lat"], 6),
"lon": round(s["lon"], 6),
"elevation_m": elev,
}
profile.append(item)
if elev is not None:
elev_vals.append(elev)
total_m = profile[-1]["dist_m"] if profile else 0.0
result: dict[str, Any] = {
"step_m": step_m,
"total_m": total_m,
"min_elevation_m": min(elev_vals) if elev_vals else None,
"max_elevation_m": max(elev_vals) if elev_vals else None,
"points": profile,
"api_source": "elevation",
"elevation_url": ELEVATION_API_URL,
}
if not elev_vals:
result["api_error"] = "elevation API returned no values"
return result