generated from Grigo/AndroidTemplate
596 lines
18 KiB
Python
596 lines
18 KiB
Python
"""Terrain elevation via self-hosted Open-Meteo-compatible API."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import math
|
|
import time
|
|
from typing import Any, Optional
|
|
|
|
import httpx
|
|
|
|
from .config import (
|
|
ELEVATION_API_URL,
|
|
ELEVATION_CONNECT_TIMEOUT,
|
|
ELEVATION_PROBE_TTL_SEC,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_BATCH_SIZE = 100
|
|
_MAX_PROFILE_POINTS = 500
|
|
_CACHE: dict[tuple[float, float], Optional[float]] = {}
|
|
_probe_checked_at = 0.0
|
|
_probe_ok = False
|
|
_probe_error: Optional[str] = None
|
|
|
|
|
|
def _cache_key(lat: float, lon: float) -> tuple[float, float]:
|
|
return (round(lat, 6), round(lon, 6))
|
|
|
|
|
|
def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
|
|
r = 6_371_000.0
|
|
d_lat = math.radians(lat2 - lat1)
|
|
d_lon = math.radians(lon2 - lon1)
|
|
a = (
|
|
math.sin(d_lat / 2) ** 2
|
|
+ math.cos(math.radians(lat1))
|
|
* math.cos(math.radians(lat2))
|
|
* math.sin(d_lon / 2) ** 2
|
|
)
|
|
return r * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
|
|
|
|
|
|
def probe_elevation_api(force: bool = False) -> dict[str, Any]:
|
|
"""Ping elevation service before batch requests (cached for TTL)."""
|
|
global _probe_checked_at, _probe_ok, _probe_error
|
|
|
|
now = time.monotonic()
|
|
if (
|
|
not force
|
|
and _probe_checked_at > 0
|
|
and now - _probe_checked_at < ELEVATION_PROBE_TTL_SEC
|
|
):
|
|
return {
|
|
"ok": _probe_ok,
|
|
"url": ELEVATION_API_URL,
|
|
"error": _probe_error,
|
|
}
|
|
|
|
try:
|
|
with httpx.Client(timeout=ELEVATION_CONNECT_TIMEOUT) as client:
|
|
r = client.get(
|
|
ELEVATION_API_URL,
|
|
params={"latitude": "0.000000", "longitude": "0.000000"},
|
|
)
|
|
r.raise_for_status()
|
|
data = r.json()
|
|
if "elevation" not in data:
|
|
raise ValueError("response has no elevation field")
|
|
_probe_checked_at = now
|
|
_probe_ok = True
|
|
_probe_error = None
|
|
logger.info("elevation API ok: %s", ELEVATION_API_URL)
|
|
except Exception as e:
|
|
_probe_checked_at = now
|
|
_probe_ok = False
|
|
_probe_error = str(e)
|
|
logger.warning("elevation API unreachable %s: %s", ELEVATION_API_URL, e)
|
|
|
|
return {
|
|
"ok": _probe_ok,
|
|
"url": ELEVATION_API_URL,
|
|
"error": _probe_error,
|
|
}
|
|
|
|
|
|
def elevation_status(force: bool = False) -> dict[str, Any]:
|
|
probe = probe_elevation_api(force=force)
|
|
return {
|
|
"elevation_ok": probe["ok"],
|
|
"elevation_url": probe["url"],
|
|
"elevation_error": probe["error"],
|
|
}
|
|
|
|
|
|
def _fetch_elevation_batch(
|
|
batch_lat: list[float], batch_lon: list[float]
|
|
) -> list[Optional[float]]:
|
|
if not batch_lat:
|
|
return []
|
|
params = {
|
|
"latitude": ",".join(f"{lat:.6f}" for lat in batch_lat),
|
|
"longitude": ",".join(f"{lon:.6f}" for lon in batch_lon),
|
|
}
|
|
with httpx.Client(timeout=ELEVATION_CONNECT_TIMEOUT) as client:
|
|
r = client.get(ELEVATION_API_URL, params=params)
|
|
r.raise_for_status()
|
|
data = r.json()
|
|
elevations = data.get("elevation") or []
|
|
out: list[Optional[float]] = []
|
|
for j, elev in enumerate(elevations):
|
|
if j >= len(batch_lat):
|
|
break
|
|
if elev is None:
|
|
out.append(None)
|
|
else:
|
|
out.append(float(elev))
|
|
while len(out) < len(batch_lat):
|
|
out.append(None)
|
|
return out
|
|
|
|
|
|
def fetch_elevation_m(lat: float, lon: float) -> Optional[float]:
|
|
vals = fetch_elevations_batch([lat], [lon])
|
|
return vals[0] if vals else None
|
|
|
|
|
|
def fetch_elevations_batch(
|
|
lats: list[float], lons: list[float]
|
|
) -> list[Optional[float]]:
|
|
if not lats or len(lats) != len(lons):
|
|
return []
|
|
|
|
probe = probe_elevation_api()
|
|
if not probe["ok"]:
|
|
logger.warning(
|
|
"skip elevation fetch: API unreachable (%s)",
|
|
probe.get("error"),
|
|
)
|
|
return [None] * len(lats)
|
|
|
|
out: list[Optional[float]] = [None] * len(lats)
|
|
pending_idx: list[int] = []
|
|
pending_lat: list[float] = []
|
|
pending_lon: list[float] = []
|
|
|
|
for i, (lat, lon) in enumerate(zip(lats, lons)):
|
|
key = _cache_key(lat, lon)
|
|
if key in _CACHE:
|
|
out[i] = _CACHE[key]
|
|
else:
|
|
pending_idx.append(i)
|
|
pending_lat.append(float(lat))
|
|
pending_lon.append(float(lon))
|
|
|
|
for start in range(0, len(pending_lat), _BATCH_SIZE):
|
|
batch_i = pending_idx[start : start + _BATCH_SIZE]
|
|
batch_lat = pending_lat[start : start + _BATCH_SIZE]
|
|
batch_lon = pending_lon[start : start + _BATCH_SIZE]
|
|
try:
|
|
batch_vals = _fetch_elevation_batch(batch_lat, batch_lon)
|
|
for j, val in enumerate(batch_vals):
|
|
lat = batch_lat[j]
|
|
lon = batch_lon[j]
|
|
_CACHE[_cache_key(lat, lon)] = val
|
|
out[batch_i[j]] = val
|
|
logger.info(
|
|
"elevation ok: %s points, sample=%s",
|
|
len(batch_lat),
|
|
batch_vals[0] if batch_vals else None,
|
|
)
|
|
except Exception as e:
|
|
logger.warning(
|
|
"elevation batch failed (%s points): %s",
|
|
len(batch_lat),
|
|
e,
|
|
)
|
|
for j in range(len(batch_lat)):
|
|
try:
|
|
single = _fetch_elevation_batch(
|
|
[batch_lat[j]], [batch_lon[j]]
|
|
)
|
|
val = single[0] if single else None
|
|
except Exception as e2:
|
|
logger.warning(
|
|
"elevation single failed %.6f,%.6f: %s",
|
|
batch_lat[j],
|
|
batch_lon[j],
|
|
e2,
|
|
)
|
|
val = None
|
|
_CACHE[_cache_key(batch_lat[j], batch_lon[j])] = val
|
|
out[batch_i[j]] = val
|
|
return out
|
|
|
|
|
|
def _interp_at_dist(
|
|
cleaned: list[tuple[float, float]], cum: list[float], dist_m: float
|
|
) -> tuple[float, float]:
|
|
if dist_m <= 0:
|
|
return cleaned[0]
|
|
if dist_m >= cum[-1]:
|
|
return cleaned[-1]
|
|
for i in range(1, len(cum)):
|
|
if dist_m <= cum[i]:
|
|
seg = cum[i] - cum[i - 1]
|
|
t = 0.0 if seg <= 0 else (dist_m - cum[i - 1]) / seg
|
|
lat1, lon1 = cleaned[i - 1]
|
|
lat2, lon2 = cleaned[i]
|
|
return lat1 + (lat2 - lat1) * t, lon1 + (lon2 - lon1) * t
|
|
return cleaned[-1]
|
|
|
|
|
|
def resample_track_path(
|
|
points: list[dict[str, Any]], step_m: float = 10.0
|
|
) -> list[dict[str, float]]:
|
|
"""Sample (lat, lon, dist_m) along polyline every ~step_m meters."""
|
|
if not points or step_m <= 0:
|
|
return []
|
|
cleaned: list[tuple[float, float]] = []
|
|
for p in points:
|
|
lat = p.get("lat")
|
|
lon = p.get("lon")
|
|
if lat is None or lon is None:
|
|
continue
|
|
lat_f, lon_f = float(lat), float(lon)
|
|
if not cleaned or haversine_m(cleaned[-1][0], cleaned[-1][1], lat_f, lon_f) > 0.5:
|
|
cleaned.append((lat_f, lon_f))
|
|
if not cleaned:
|
|
return []
|
|
if len(cleaned) == 1:
|
|
return [{"lat": cleaned[0][0], "lon": cleaned[0][1], "dist_m": 0.0}]
|
|
|
|
cum = [0.0]
|
|
for i in range(1, len(cleaned)):
|
|
cum.append(
|
|
cum[-1]
|
|
+ haversine_m(
|
|
cleaned[i - 1][0], cleaned[i - 1][1], cleaned[i][0], cleaned[i][1]
|
|
)
|
|
)
|
|
total = cum[-1]
|
|
samples: list[dict[str, float]] = []
|
|
dist = 0.0
|
|
while dist <= total + 1e-6:
|
|
lat, lon = _interp_at_dist(cleaned, cum, dist)
|
|
samples.append({"lat": lat, "lon": lon, "dist_m": round(dist, 1)})
|
|
if dist >= total:
|
|
break
|
|
dist += step_m
|
|
return samples
|
|
|
|
|
|
def resample_track_path_count(
|
|
points: list[dict[str, Any]], count: int
|
|
) -> list[dict[str, float]]:
|
|
"""Sample exactly `count` points evenly spaced along polyline."""
|
|
if not points or count < 2:
|
|
return []
|
|
cleaned: list[tuple[float, float]] = []
|
|
for p in points:
|
|
lat = p.get("lat")
|
|
lon = p.get("lon")
|
|
if lat is None or lon is None:
|
|
continue
|
|
lat_f, lon_f = float(lat), float(lon)
|
|
if not cleaned or haversine_m(cleaned[-1][0], cleaned[-1][1], lat_f, lon_f) > 0.5:
|
|
cleaned.append((lat_f, lon_f))
|
|
if not cleaned:
|
|
return []
|
|
if len(cleaned) == 1:
|
|
return [{"lat": cleaned[0][0], "lon": cleaned[0][1], "dist_m": 0.0}]
|
|
|
|
cum = [0.0]
|
|
for i in range(1, len(cleaned)):
|
|
cum.append(
|
|
cum[-1]
|
|
+ haversine_m(
|
|
cleaned[i - 1][0], cleaned[i - 1][1], cleaned[i][0], cleaned[i][1]
|
|
)
|
|
)
|
|
total = cum[-1]
|
|
if total < 1e-6:
|
|
return [{"lat": cleaned[0][0], "lon": cleaned[0][1], "dist_m": 0.0}]
|
|
|
|
n = max(2, min(_MAX_PROFILE_POINTS, int(count)))
|
|
samples: list[dict[str, float]] = []
|
|
for i in range(n):
|
|
dist = (total * i) / (n - 1)
|
|
lat, lon = _interp_at_dist(cleaned, cum, dist)
|
|
samples.append({"lat": lat, "lon": lon, "dist_m": round(dist, 1)})
|
|
return samples
|
|
|
|
|
|
def build_elevation_profile(
|
|
points: list[dict[str, Any]],
|
|
step_m: float = 10.0,
|
|
target_points: int | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Resample track and fetch terrain elevations."""
|
|
if target_points is not None:
|
|
n = max(2, min(_MAX_PROFILE_POINTS, int(target_points)))
|
|
samples = resample_track_path_count(points, n)
|
|
if len(samples) > 1:
|
|
step_m = round(
|
|
(samples[-1]["dist_m"] - samples[0]["dist_m"]) / (len(samples) - 1),
|
|
2,
|
|
)
|
|
else:
|
|
step_m = 0.0
|
|
else:
|
|
step_m = max(5.0, min(10.0, float(step_m)))
|
|
samples = resample_track_path(points, step_m)
|
|
if not samples:
|
|
return {
|
|
"step_m": step_m,
|
|
"points": [],
|
|
"total_m": 0.0,
|
|
"api_source": "elevation",
|
|
"api_error": "no samples",
|
|
}
|
|
|
|
probe = probe_elevation_api()
|
|
if not probe["ok"]:
|
|
return {
|
|
"step_m": step_m,
|
|
"points": [],
|
|
"total_m": 0.0,
|
|
"api_source": "elevation",
|
|
"api_error": f"elevation API unreachable: {probe['error']}",
|
|
"elevation_url": ELEVATION_API_URL,
|
|
}
|
|
|
|
lats = [s["lat"] for s in samples]
|
|
lons = [s["lon"] for s in samples]
|
|
elevations = fetch_elevations_batch(lats, lons)
|
|
|
|
profile: list[dict[str, Any]] = []
|
|
elev_vals: list[float] = []
|
|
for s, elev in zip(samples, elevations):
|
|
item = {
|
|
"dist_m": round(s["dist_m"], 1),
|
|
"lat": round(s["lat"], 6),
|
|
"lon": round(s["lon"], 6),
|
|
"elevation_m": elev,
|
|
}
|
|
profile.append(item)
|
|
if elev is not None:
|
|
elev_vals.append(elev)
|
|
|
|
total_m = profile[-1]["dist_m"] if profile else 0.0
|
|
result: dict[str, Any] = {
|
|
"step_m": step_m,
|
|
"total_m": total_m,
|
|
"min_elevation_m": min(elev_vals) if elev_vals else None,
|
|
"max_elevation_m": max(elev_vals) if elev_vals else None,
|
|
"points": profile,
|
|
"api_source": "elevation",
|
|
"elevation_url": ELEVATION_API_URL,
|
|
}
|
|
if not elev_vals:
|
|
result["api_error"] = "elevation API returned no values"
|
|
return result
|
|
|
|
|
|
def _offset_m(lat: float, lon: float, north_m: float, east_m: float) -> tuple[float, float]:
|
|
dlat = north_m / 111_320.0
|
|
dlon = east_m / (111_320.0 * max(math.cos(math.radians(lat)), 1e-6))
|
|
return lat + dlat, lon + dlon
|
|
|
|
|
|
_MAX_GRID_POINTS = 2500
|
|
_MAX_GRID_POINTS_FINE = 12000
|
|
|
|
|
|
def _auto_step_m(radius_m: float) -> float:
|
|
if radius_m <= 150:
|
|
return 10.0
|
|
if radius_m <= 300:
|
|
return 15.0
|
|
return 20.0
|
|
|
|
|
|
def _sample_circular_grid(
|
|
lat: float,
|
|
lon: float,
|
|
radius_m: float,
|
|
step_m: float,
|
|
) -> list[tuple[int, int, float, float, float]]:
|
|
steps = int(radius_m / step_m)
|
|
cells: list[tuple[int, int, float, float, float]] = []
|
|
for i in range(-steps, steps + 1):
|
|
for j in range(-steps, steps + 1):
|
|
north = i * step_m
|
|
east = j * step_m
|
|
dist = math.hypot(north, east)
|
|
if dist > radius_m:
|
|
continue
|
|
la, lo = _offset_m(lat, lon, north, east)
|
|
cells.append((i, j, la, lo, dist))
|
|
return cells
|
|
|
|
|
|
def _resolve_grid_step(
|
|
lat: float, lon: float, radius_m: float, step_m: float
|
|
) -> float:
|
|
if step_m <= 0:
|
|
step_m = _auto_step_m(radius_m)
|
|
min_step = 1.0 if radius_m <= 100.0 else 5.0
|
|
step_m = max(min_step, min(float(step_m), 100.0))
|
|
max_points = _MAX_GRID_POINTS_FINE if radius_m <= 100.0 and step_m <= 1.0 else _MAX_GRID_POINTS
|
|
while len(_sample_circular_grid(lat, lon, radius_m, step_m)) > max_points:
|
|
step_m = math.ceil(step_m * 1.25)
|
|
if step_m >= radius_m:
|
|
break
|
|
return step_m
|
|
|
|
|
|
def build_elevation_grid(
|
|
lat: float,
|
|
lon: float,
|
|
radius_m: float = 200.0,
|
|
step_m: float = 0.0,
|
|
) -> dict[str, Any]:
|
|
"""Circular elevation grid for heatmap (delta relative to center)."""
|
|
probe = probe_elevation_api()
|
|
if not probe["ok"]:
|
|
return {
|
|
"ok": False,
|
|
"error": f"elevation API unreachable: {probe['error']}",
|
|
"elevation_url": ELEVATION_API_URL,
|
|
}
|
|
|
|
radius_m = max(50.0, min(float(radius_m), 500.0))
|
|
step_m = _resolve_grid_step(lat, lon, radius_m, step_m)
|
|
|
|
center_elev = fetch_elevation_m(lat, lon)
|
|
if center_elev is None:
|
|
return {"ok": False, "error": "no elevation at center"}
|
|
|
|
grid_cells = _sample_circular_grid(lat, lon, radius_m, step_m)
|
|
if not grid_cells:
|
|
return {"ok": False, "error": "empty search grid"}
|
|
|
|
lats = [c[2] for c in grid_cells]
|
|
lons = [c[3] for c in grid_cells]
|
|
elevations = fetch_elevations_batch(lats, lons)
|
|
|
|
points: list[dict[str, Any]] = []
|
|
deltas: list[float] = []
|
|
for (i, j, la, lo, dist), elev in zip(grid_cells, elevations):
|
|
if elev is None:
|
|
continue
|
|
delta = float(elev) - center_elev
|
|
deltas.append(delta)
|
|
points.append(
|
|
{
|
|
"i": i,
|
|
"j": j,
|
|
"lat": round(la, 6),
|
|
"lon": round(lo, 6),
|
|
"dist_m": round(dist, 1),
|
|
"elevation_m": float(elev),
|
|
"delta_m": round(delta, 1),
|
|
}
|
|
)
|
|
|
|
if not points:
|
|
return {"ok": False, "error": "no elevation values in grid"}
|
|
|
|
return {
|
|
"ok": True,
|
|
"center": {
|
|
"lat": round(lat, 6),
|
|
"lon": round(lon, 6),
|
|
"elevation_m": center_elev,
|
|
},
|
|
"radius_m": radius_m,
|
|
"step_m": step_m,
|
|
"points": points,
|
|
"min_delta_m": round(min(deltas), 1),
|
|
"max_delta_m": round(max(deltas), 1),
|
|
"api_source": "elevation",
|
|
"elevation_url": ELEVATION_API_URL,
|
|
}
|
|
|
|
|
|
def find_nearest_hill(
|
|
lat: float,
|
|
lon: float,
|
|
radius_m: float = 5000.0,
|
|
step_m: float = 300.0,
|
|
min_prominence_m: float = 8.0,
|
|
) -> dict[str, Any]:
|
|
"""Find nearest local elevation maximum around a point."""
|
|
probe = probe_elevation_api()
|
|
if not probe["ok"]:
|
|
return {
|
|
"ok": False,
|
|
"error": f"elevation API unreachable: {probe['error']}",
|
|
"elevation_url": ELEVATION_API_URL,
|
|
}
|
|
|
|
radius_m = max(500.0, min(float(radius_m), 15_000.0))
|
|
step_m = max(100.0, min(float(step_m), 500.0))
|
|
min_prominence_m = max(3.0, min(float(min_prominence_m), 100.0))
|
|
|
|
center_elev = fetch_elevation_m(lat, lon)
|
|
if center_elev is None:
|
|
return {"ok": False, "error": "no elevation at center"}
|
|
|
|
grid_cells = _sample_circular_grid(lat, lon, radius_m, step_m)
|
|
if not grid_cells:
|
|
return {"ok": False, "error": "empty search grid"}
|
|
|
|
lats = [c[2] for c in grid_cells]
|
|
lons = [c[3] for c in grid_cells]
|
|
elevations = fetch_elevations_batch(lats, lons)
|
|
|
|
grid: dict[tuple[int, int], dict[str, Any]] = {}
|
|
for (i, j, la, lo, dist), elev in zip(grid_cells, elevations):
|
|
grid[(i, j)] = {
|
|
"lat": round(la, 6),
|
|
"lon": round(lo, 6),
|
|
"dist_m": round(dist, 1),
|
|
"elevation_m": elev,
|
|
}
|
|
|
|
def is_local_max(i: int, j: int, elev: float) -> bool:
|
|
for di in (-1, 0, 1):
|
|
for dj in (-1, 0, 1):
|
|
if di == 0 and dj == 0:
|
|
continue
|
|
n = grid.get((i + di, j + dj))
|
|
if n and n["elevation_m"] is not None and n["elevation_m"] >= elev:
|
|
return False
|
|
return True
|
|
|
|
candidates: list[dict[str, Any]] = []
|
|
for (i, j), cell in grid.items():
|
|
elev = cell.get("elevation_m")
|
|
if elev is None:
|
|
continue
|
|
prominence = float(elev) - center_elev
|
|
if prominence < min_prominence_m:
|
|
continue
|
|
if is_local_max(i, j, float(elev)):
|
|
candidates.append({**cell, "prominence_m": round(prominence, 1)})
|
|
|
|
if not candidates:
|
|
best = None
|
|
for cell in grid.values():
|
|
elev = cell.get("elevation_m")
|
|
if elev is None:
|
|
continue
|
|
prominence = float(elev) - center_elev
|
|
if prominence < min_prominence_m * 0.5:
|
|
continue
|
|
if best is None or cell["dist_m"] < best["dist_m"]:
|
|
best = {
|
|
**cell,
|
|
"prominence_m": round(prominence, 1),
|
|
"is_local_max": False,
|
|
}
|
|
if best is None:
|
|
return {
|
|
"ok": False,
|
|
"error": "no hill found in radius",
|
|
"center": {
|
|
"lat": round(lat, 6),
|
|
"lon": round(lon, 6),
|
|
"elevation_m": center_elev,
|
|
},
|
|
"radius_m": radius_m,
|
|
}
|
|
hill = best
|
|
else:
|
|
candidates.sort(key=lambda c: c["dist_m"])
|
|
hill = {**candidates[0], "is_local_max": True}
|
|
|
|
return {
|
|
"ok": True,
|
|
"center": {
|
|
"lat": round(lat, 6),
|
|
"lon": round(lon, 6),
|
|
"elevation_m": center_elev,
|
|
},
|
|
"hill": hill,
|
|
"candidates": len(candidates),
|
|
"radius_m": radius_m,
|
|
"step_m": step_m,
|
|
"api_source": "elevation",
|
|
"elevation_url": ELEVATION_API_URL,
|
|
}
|