"""Terrain elevation via self-hosted Open-Meteo-compatible API.""" from __future__ import annotations import logging import math import time from typing import Any, Optional import httpx from .config import ( ELEVATION_API_URL, ELEVATION_CONNECT_TIMEOUT, ELEVATION_PROBE_TTL_SEC, ) logger = logging.getLogger(__name__) _BATCH_SIZE = 100 _MAX_PROFILE_POINTS = 500 _CACHE: dict[tuple[float, float], Optional[float]] = {} _probe_checked_at = 0.0 _probe_ok = False _probe_error: Optional[str] = None def _cache_key(lat: float, lon: float) -> tuple[float, float]: return (round(lat, 6), round(lon, 6)) def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float: r = 6_371_000.0 d_lat = math.radians(lat2 - lat1) d_lon = math.radians(lon2 - lon1) a = ( math.sin(d_lat / 2) ** 2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(d_lon / 2) ** 2 ) return r * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) def probe_elevation_api(force: bool = False) -> dict[str, Any]: """Ping elevation service before batch requests (cached for TTL).""" global _probe_checked_at, _probe_ok, _probe_error now = time.monotonic() if ( not force and _probe_checked_at > 0 and now - _probe_checked_at < ELEVATION_PROBE_TTL_SEC ): return { "ok": _probe_ok, "url": ELEVATION_API_URL, "error": _probe_error, } try: with httpx.Client(timeout=ELEVATION_CONNECT_TIMEOUT) as client: r = client.get( ELEVATION_API_URL, params={"latitude": "0.000000", "longitude": "0.000000"}, ) r.raise_for_status() data = r.json() if "elevation" not in data: raise ValueError("response has no elevation field") _probe_checked_at = now _probe_ok = True _probe_error = None logger.info("elevation API ok: %s", ELEVATION_API_URL) except Exception as e: _probe_checked_at = now _probe_ok = False _probe_error = str(e) logger.warning("elevation API unreachable %s: %s", ELEVATION_API_URL, e) return { "ok": _probe_ok, "url": ELEVATION_API_URL, "error": _probe_error, } def elevation_status(force: bool = False) -> dict[str, Any]: probe = probe_elevation_api(force=force) return { "elevation_ok": probe["ok"], "elevation_url": probe["url"], "elevation_error": probe["error"], } def _fetch_elevation_batch( batch_lat: list[float], batch_lon: list[float] ) -> list[Optional[float]]: if not batch_lat: return [] params = { "latitude": ",".join(f"{lat:.6f}" for lat in batch_lat), "longitude": ",".join(f"{lon:.6f}" for lon in batch_lon), } with httpx.Client(timeout=ELEVATION_CONNECT_TIMEOUT) as client: r = client.get(ELEVATION_API_URL, params=params) r.raise_for_status() data = r.json() elevations = data.get("elevation") or [] out: list[Optional[float]] = [] for j, elev in enumerate(elevations): if j >= len(batch_lat): break if elev is None: out.append(None) else: out.append(float(elev)) while len(out) < len(batch_lat): out.append(None) return out def fetch_elevation_m(lat: float, lon: float) -> Optional[float]: vals = fetch_elevations_batch([lat], [lon]) return vals[0] if vals else None def fetch_elevations_batch( lats: list[float], lons: list[float] ) -> list[Optional[float]]: if not lats or len(lats) != len(lons): return [] probe = probe_elevation_api() if not probe["ok"]: logger.warning( "skip elevation fetch: API unreachable (%s)", probe.get("error"), ) return [None] * len(lats) out: list[Optional[float]] = [None] * len(lats) pending_idx: list[int] = [] pending_lat: list[float] = [] pending_lon: list[float] = [] for i, (lat, lon) in enumerate(zip(lats, lons)): key = _cache_key(lat, lon) if key in _CACHE: out[i] = _CACHE[key] else: pending_idx.append(i) pending_lat.append(float(lat)) pending_lon.append(float(lon)) for start in range(0, len(pending_lat), _BATCH_SIZE): batch_i = pending_idx[start : start + _BATCH_SIZE] batch_lat = pending_lat[start : start + _BATCH_SIZE] batch_lon = pending_lon[start : start + _BATCH_SIZE] try: batch_vals = _fetch_elevation_batch(batch_lat, batch_lon) for j, val in enumerate(batch_vals): lat = batch_lat[j] lon = batch_lon[j] _CACHE[_cache_key(lat, lon)] = val out[batch_i[j]] = val logger.info( "elevation ok: %s points, sample=%s", len(batch_lat), batch_vals[0] if batch_vals else None, ) except Exception as e: logger.warning( "elevation batch failed (%s points): %s", len(batch_lat), e, ) for j in range(len(batch_lat)): try: single = _fetch_elevation_batch( [batch_lat[j]], [batch_lon[j]] ) val = single[0] if single else None except Exception as e2: logger.warning( "elevation single failed %.6f,%.6f: %s", batch_lat[j], batch_lon[j], e2, ) val = None _CACHE[_cache_key(batch_lat[j], batch_lon[j])] = val out[batch_i[j]] = val return out def _interp_at_dist( cleaned: list[tuple[float, float]], cum: list[float], dist_m: float ) -> tuple[float, float]: if dist_m <= 0: return cleaned[0] if dist_m >= cum[-1]: return cleaned[-1] for i in range(1, len(cum)): if dist_m <= cum[i]: seg = cum[i] - cum[i - 1] t = 0.0 if seg <= 0 else (dist_m - cum[i - 1]) / seg lat1, lon1 = cleaned[i - 1] lat2, lon2 = cleaned[i] return lat1 + (lat2 - lat1) * t, lon1 + (lon2 - lon1) * t return cleaned[-1] def resample_track_path( points: list[dict[str, Any]], step_m: float = 10.0 ) -> list[dict[str, float]]: """Sample (lat, lon, dist_m) along polyline every ~step_m meters.""" if not points or step_m <= 0: return [] cleaned: list[tuple[float, float]] = [] for p in points: lat = p.get("lat") lon = p.get("lon") if lat is None or lon is None: continue lat_f, lon_f = float(lat), float(lon) if not cleaned or haversine_m(cleaned[-1][0], cleaned[-1][1], lat_f, lon_f) > 0.5: cleaned.append((lat_f, lon_f)) if not cleaned: return [] if len(cleaned) == 1: return [{"lat": cleaned[0][0], "lon": cleaned[0][1], "dist_m": 0.0}] cum = [0.0] for i in range(1, len(cleaned)): cum.append( cum[-1] + haversine_m( cleaned[i - 1][0], cleaned[i - 1][1], cleaned[i][0], cleaned[i][1] ) ) total = cum[-1] samples: list[dict[str, float]] = [] dist = 0.0 while dist <= total + 1e-6: lat, lon = _interp_at_dist(cleaned, cum, dist) samples.append({"lat": lat, "lon": lon, "dist_m": round(dist, 1)}) if dist >= total: break dist += step_m return samples def resample_track_path_count( points: list[dict[str, Any]], count: int ) -> list[dict[str, float]]: """Sample exactly `count` points evenly spaced along polyline.""" if not points or count < 2: return [] cleaned: list[tuple[float, float]] = [] for p in points: lat = p.get("lat") lon = p.get("lon") if lat is None or lon is None: continue lat_f, lon_f = float(lat), float(lon) if not cleaned or haversine_m(cleaned[-1][0], cleaned[-1][1], lat_f, lon_f) > 0.5: cleaned.append((lat_f, lon_f)) if not cleaned: return [] if len(cleaned) == 1: return [{"lat": cleaned[0][0], "lon": cleaned[0][1], "dist_m": 0.0}] cum = [0.0] for i in range(1, len(cleaned)): cum.append( cum[-1] + haversine_m( cleaned[i - 1][0], cleaned[i - 1][1], cleaned[i][0], cleaned[i][1] ) ) total = cum[-1] if total < 1e-6: return [{"lat": cleaned[0][0], "lon": cleaned[0][1], "dist_m": 0.0}] n = max(2, min(_MAX_PROFILE_POINTS, int(count))) samples: list[dict[str, float]] = [] for i in range(n): dist = (total * i) / (n - 1) lat, lon = _interp_at_dist(cleaned, cum, dist) samples.append({"lat": lat, "lon": lon, "dist_m": round(dist, 1)}) return samples def build_elevation_profile( points: list[dict[str, Any]], step_m: float = 10.0, target_points: int | None = None, ) -> dict[str, Any]: """Resample track and fetch terrain elevations.""" if target_points is not None: n = max(2, min(_MAX_PROFILE_POINTS, int(target_points))) samples = resample_track_path_count(points, n) if len(samples) > 1: step_m = round( (samples[-1]["dist_m"] - samples[0]["dist_m"]) / (len(samples) - 1), 2, ) else: step_m = 0.0 else: step_m = max(5.0, min(10.0, float(step_m))) samples = resample_track_path(points, step_m) if not samples: return { "step_m": step_m, "points": [], "total_m": 0.0, "api_source": "elevation", "api_error": "no samples", } probe = probe_elevation_api() if not probe["ok"]: return { "step_m": step_m, "points": [], "total_m": 0.0, "api_source": "elevation", "api_error": f"elevation API unreachable: {probe['error']}", "elevation_url": ELEVATION_API_URL, } lats = [s["lat"] for s in samples] lons = [s["lon"] for s in samples] elevations = fetch_elevations_batch(lats, lons) profile: list[dict[str, Any]] = [] elev_vals: list[float] = [] for s, elev in zip(samples, elevations): item = { "dist_m": round(s["dist_m"], 1), "lat": round(s["lat"], 6), "lon": round(s["lon"], 6), "elevation_m": elev, } profile.append(item) if elev is not None: elev_vals.append(elev) total_m = profile[-1]["dist_m"] if profile else 0.0 result: dict[str, Any] = { "step_m": step_m, "total_m": total_m, "min_elevation_m": min(elev_vals) if elev_vals else None, "max_elevation_m": max(elev_vals) if elev_vals else None, "points": profile, "api_source": "elevation", "elevation_url": ELEVATION_API_URL, } if not elev_vals: result["api_error"] = "elevation API returned no values" return result def _offset_m(lat: float, lon: float, north_m: float, east_m: float) -> tuple[float, float]: dlat = north_m / 111_320.0 dlon = east_m / (111_320.0 * max(math.cos(math.radians(lat)), 1e-6)) return lat + dlat, lon + dlon _MAX_GRID_POINTS = 2500 def _auto_step_m(radius_m: float) -> float: if radius_m <= 150: return 10.0 if radius_m <= 300: return 15.0 return 20.0 def _sample_circular_grid( lat: float, lon: float, radius_m: float, step_m: float, ) -> list[tuple[int, int, float, float, float]]: steps = int(radius_m / step_m) cells: list[tuple[int, int, float, float, float]] = [] for i in range(-steps, steps + 1): for j in range(-steps, steps + 1): north = i * step_m east = j * step_m dist = math.hypot(north, east) if dist > radius_m: continue la, lo = _offset_m(lat, lon, north, east) cells.append((i, j, la, lo, dist)) return cells def _resolve_grid_step(lat: float, lon: float, radius_m: float, step_m: float) -> float: if step_m <= 0: step_m = _auto_step_m(radius_m) step_m = max(5.0, min(float(step_m), 100.0)) while len(_sample_circular_grid(lat, lon, radius_m, step_m)) > _MAX_GRID_POINTS: step_m = math.ceil(step_m * 1.25) if step_m >= radius_m: break return step_m def build_elevation_grid( lat: float, lon: float, radius_m: float = 200.0, step_m: float = 0.0, ) -> dict[str, Any]: """Circular elevation grid for heatmap (delta relative to center).""" probe = probe_elevation_api() if not probe["ok"]: return { "ok": False, "error": f"elevation API unreachable: {probe['error']}", "elevation_url": ELEVATION_API_URL, } radius_m = max(100.0, min(float(radius_m), 500.0)) step_m = _resolve_grid_step(lat, lon, radius_m, step_m) center_elev = fetch_elevation_m(lat, lon) if center_elev is None: return {"ok": False, "error": "no elevation at center"} grid_cells = _sample_circular_grid(lat, lon, radius_m, step_m) if not grid_cells: return {"ok": False, "error": "empty search grid"} lats = [c[2] for c in grid_cells] lons = [c[3] for c in grid_cells] elevations = fetch_elevations_batch(lats, lons) points: list[dict[str, Any]] = [] deltas: list[float] = [] for (i, j, la, lo, dist), elev in zip(grid_cells, elevations): if elev is None: continue delta = float(elev) - center_elev deltas.append(delta) points.append( { "i": i, "j": j, "lat": round(la, 6), "lon": round(lo, 6), "dist_m": round(dist, 1), "elevation_m": float(elev), "delta_m": round(delta, 1), } ) if not points: return {"ok": False, "error": "no elevation values in grid"} return { "ok": True, "center": { "lat": round(lat, 6), "lon": round(lon, 6), "elevation_m": center_elev, }, "radius_m": radius_m, "step_m": step_m, "points": points, "min_delta_m": round(min(deltas), 1), "max_delta_m": round(max(deltas), 1), "api_source": "elevation", "elevation_url": ELEVATION_API_URL, } def find_nearest_hill( lat: float, lon: float, radius_m: float = 5000.0, step_m: float = 300.0, min_prominence_m: float = 8.0, ) -> dict[str, Any]: """Find nearest local elevation maximum around a point.""" probe = probe_elevation_api() if not probe["ok"]: return { "ok": False, "error": f"elevation API unreachable: {probe['error']}", "elevation_url": ELEVATION_API_URL, } radius_m = max(500.0, min(float(radius_m), 15_000.0)) step_m = max(100.0, min(float(step_m), 500.0)) min_prominence_m = max(3.0, min(float(min_prominence_m), 100.0)) center_elev = fetch_elevation_m(lat, lon) if center_elev is None: return {"ok": False, "error": "no elevation at center"} grid_cells = _sample_circular_grid(lat, lon, radius_m, step_m) if not grid_cells: return {"ok": False, "error": "empty search grid"} lats = [c[2] for c in grid_cells] lons = [c[3] for c in grid_cells] elevations = fetch_elevations_batch(lats, lons) grid: dict[tuple[int, int], dict[str, Any]] = {} for (i, j, la, lo, dist), elev in zip(grid_cells, elevations): grid[(i, j)] = { "lat": round(la, 6), "lon": round(lo, 6), "dist_m": round(dist, 1), "elevation_m": elev, } def is_local_max(i: int, j: int, elev: float) -> bool: for di in (-1, 0, 1): for dj in (-1, 0, 1): if di == 0 and dj == 0: continue n = grid.get((i + di, j + dj)) if n and n["elevation_m"] is not None and n["elevation_m"] >= elev: return False return True candidates: list[dict[str, Any]] = [] for (i, j), cell in grid.items(): elev = cell.get("elevation_m") if elev is None: continue prominence = float(elev) - center_elev if prominence < min_prominence_m: continue if is_local_max(i, j, float(elev)): candidates.append({**cell, "prominence_m": round(prominence, 1)}) if not candidates: best = None for cell in grid.values(): elev = cell.get("elevation_m") if elev is None: continue prominence = float(elev) - center_elev if prominence < min_prominence_m * 0.5: continue if best is None or cell["dist_m"] < best["dist_m"]: best = { **cell, "prominence_m": round(prominence, 1), "is_local_max": False, } if best is None: return { "ok": False, "error": "no hill found in radius", "center": { "lat": round(lat, 6), "lon": round(lon, 6), "elevation_m": center_elev, }, "radius_m": radius_m, } hill = best else: candidates.sort(key=lambda c: c["dist_m"]) hill = {**candidates[0], "is_local_max": True} return { "ok": True, "center": { "lat": round(lat, 6), "lon": round(lon, 6), "elevation_m": center_elev, }, "hill": hill, "candidates": len(candidates), "radius_m": radius_m, "step_m": step_m, "api_source": "elevation", "elevation_url": ELEVATION_API_URL, }