import json import logging import sqlite3 import time from contextlib import contextmanager from typing import Any, Optional from .config import DATABASE_PATH, TELEMETRY_LIMIT, TRACK_POINTS_LIMIT from .devices import is_valid_device_id from .elevation import fetch_elevation_m from .models import ChatIn, TelemetryIn from .schema import SCHEMA_VERSION, apply_migrations, check_db_ok, get_schema_version logger = logging.getLogger(__name__) _HISTORY_COLUMNS = ( "id, device_id, lat, lon, rssi, range_m, meta, role, ts, source" ) def _connect() -> sqlite3.Connection: conn = sqlite3.connect(DATABASE_PATH, check_same_thread=False) conn.row_factory = sqlite3.Row return conn @contextmanager def _db(): conn = _connect() try: yield conn conn.commit() finally: conn.close() def init_db() -> None: with _db() as conn: log = apply_migrations(conn) for line in log: logger.info("db migrate: %s", line) cleanup_invalid_devices() def db_status() -> dict[str, Any]: with _db() as conn: return { "db_ok": check_db_ok(conn), "schema_version": get_schema_version(conn), "expected_version": SCHEMA_VERSION, "database_path": DATABASE_PATH, } def cleanup_invalid_devices() -> int: with _db() as conn: cur_t = conn.execute( "DELETE FROM telemetry WHERE device_id NOT GLOB 'android-????????'" ) cur_d = conn.execute( "DELETE FROM devices WHERE device_id NOT GLOB 'android-????????'" ) return cur_t.rowcount + cur_d.rowcount def _sanitize_coords( lat: Optional[float], lon: Optional[float] ) -> tuple[Optional[float], Optional[float]]: if lat is None or lon is None: return lat, lon if abs(lat) < 1e-5 and abs(lon) < 1e-5: return None, None if not (-90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0): return None, None return lat, lon def record_telemetry(data: TelemetryIn) -> dict[str, Any]: if not is_valid_device_id(data.device_id): raise ValueError( f"invalid device_id '{data.device_id}', expected android-xxxxxxxx (8 hex)" ) ts = data.ts if data.ts is not None else time.time() lat, lon = _sanitize_coords(data.lat, data.lon) with _db() as conn: conn.execute( """ INSERT INTO devices (device_id, label, last_seen) VALUES (?, ?, ?) ON CONFLICT(device_id) DO UPDATE SET last_seen = excluded.last_seen """, (data.device_id, data.device_id, ts), ) conn.execute( """ INSERT INTO telemetry (device_id, lat, lon, rssi, range_m, raw_frame, meta, role, ts, source) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( data.device_id, lat, lon, data.rssi, data.range_m, data.raw_frame, data.meta, data.role, ts, data.source, ), ) _trim_telemetry(conn, data.device_id) return {"ok": True, "device_id": data.device_id, "ts": ts} def _trim_telemetry(conn: sqlite3.Connection, device_id: str) -> None: conn.execute( """ DELETE FROM telemetry WHERE device_id = ? AND id NOT IN ( SELECT id FROM telemetry WHERE device_id = ? ORDER BY ts DESC LIMIT ? ) """, (device_id, device_id, TELEMETRY_LIMIT), ) def list_devices() -> list[dict[str, Any]]: with _db() as conn: rows = conn.execute( """ SELECT d.device_id, d.last_seen, t.lat, t.lon, t.rssi, t.range_m, t.raw_frame, t.meta, t.role, t.ts, t.source FROM devices d INNER JOIN telemetry t ON t.id = ( SELECT id FROM telemetry WHERE device_id = d.device_id AND source = 'android' ORDER BY ts DESC LIMIT 1 ) WHERE d.device_id GLOB 'android-????????' ORDER BY d.last_seen DESC """ ).fetchall() devices = [_row_to_device(r) for r in rows] return [d for d in devices if not _is_null_island(d)] def _is_null_island(device: dict[str, Any]) -> bool: lat, lon = device.get("lat"), device.get("lon") if lat is None or lon is None: return False return abs(lat) < 1e-5 and abs(lon) < 1e-5 def _row_to_device(row: sqlite3.Row) -> dict[str, Any]: return { "device_id": row["device_id"], "last_seen": row["last_seen"], "lat": row["lat"], "lon": row["lon"], "rssi": row["rssi"], "range_m": row["range_m"], "raw_frame": row["raw_frame"], "meta": row["meta"] if "meta" in row.keys() else None, "role": row["role"] if "role" in row.keys() else None, "ts": row["ts"], "source": row["source"] if "source" in row.keys() else None, } def _row_to_history(row: sqlite3.Row) -> dict[str, Any]: return { "id": row["id"], "device_id": row["device_id"], "lat": row["lat"], "lon": row["lon"], "rssi": row["rssi"], "range_m": row["range_m"], "meta": row["meta"], "role": row["role"], "ts": row["ts"], "source": row["source"], } def get_telemetry( device_id: Optional[str] = None, limit: int = 100, since: Optional[float] = None, until: Optional[float] = None, role: Optional[str] = None, ) -> list[dict[str, Any]]: limit = min(max(1, limit), 500) clauses: list[str] = [] params: list[Any] = [] if device_id: clauses.append("device_id = ?") params.append(device_id) if since is not None: clauses.append("ts >= ?") params.append(since) if until is not None: clauses.append("ts <= ?") params.append(until) if role: clauses.append("role = ?") params.append(role) where = (" WHERE " + " AND ".join(clauses)) if clauses else "" sql = ( f"SELECT {_HISTORY_COLUMNS} FROM telemetry{where} " f"ORDER BY ts DESC LIMIT ?" ) params.append(limit) with _db() as conn: rows = conn.execute(sql, params).fetchall() return [_row_to_history(r) for r in rows] def start_track(device_id: str, label: Optional[str] = None) -> dict[str, Any]: if not is_valid_device_id(device_id): raise ValueError(f"invalid device_id '{device_id}'") ts = time.time() with _db() as conn: cur = conn.execute( """ INSERT INTO tracks (device_id, started_at, label) VALUES (?, ?, ?) """, (device_id, ts, label), ) track_id = cur.lastrowid return {"ok": True, "track_id": track_id, "device_id": device_id, "started_at": ts} def add_track_points(track_id: int, points: list[dict[str, Any]]) -> dict[str, Any]: if not points: return {"ok": True, "added": 0} with _db() as conn: track = conn.execute( "SELECT id, device_id, ended_at FROM tracks WHERE id = ?", (track_id,), ).fetchone() if not track: raise ValueError(f"track {track_id} not found") if track["ended_at"] is not None: raise ValueError(f"track {track_id} already finished") count = conn.execute( "SELECT COUNT(*) FROM track_points WHERE track_id = ?", (track_id,), ).fetchone()[0] added = 0 for p in points: if count + added >= TRACK_POINTS_LIMIT: break lat = float(p["lat"]) lon = float(p["lon"]) ts = float(p.get("ts") or time.time()) elev = fetch_elevation_m(lat, lon) meta = p.get("meta") if meta is not None and not isinstance(meta, str): meta = json.dumps(meta, ensure_ascii=False) conn.execute( """ INSERT INTO track_points (track_id, ts, lat, lon, altitude_gps, elevation_m, rssi, role, meta) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( track_id, ts, lat, lon, p.get("altitude_gps"), elev, p.get("rssi"), p.get("role"), meta, ), ) added += 1 return {"ok": True, "added": added, "track_id": track_id} def finish_track(track_id: int) -> dict[str, Any]: ts = time.time() with _db() as conn: cur = conn.execute( "UPDATE tracks SET ended_at = ? WHERE id = ? AND ended_at IS NULL", (ts, track_id), ) if cur.rowcount == 0: raise ValueError(f"track {track_id} not found or already finished") count = conn.execute( "SELECT COUNT(*) FROM track_points WHERE track_id = ?", (track_id,), ).fetchone()[0] return {"ok": True, "track_id": track_id, "ended_at": ts, "point_count": count} def list_tracks(device_id: Optional[str] = None, limit: int = 50) -> list[dict[str, Any]]: limit = min(max(1, limit), 200) with _db() as conn: role_sub = """ (SELECT p.role FROM track_points p WHERE p.track_id = t.id AND p.role IS NOT NULL AND p.role != '' ORDER BY p.ts DESC LIMIT 1) """ if device_id: rows = conn.execute( f""" SELECT t.id, t.device_id, t.started_at, t.ended_at, t.label, (SELECT COUNT(*) FROM track_points p WHERE p.track_id = t.id) AS point_count, {role_sub} AS role FROM tracks t WHERE t.device_id = ? ORDER BY t.started_at DESC LIMIT ? """, (device_id, limit), ).fetchall() else: rows = conn.execute( f""" SELECT t.id, t.device_id, t.started_at, t.ended_at, t.label, (SELECT COUNT(*) FROM track_points p WHERE p.track_id = t.id) AS point_count, {role_sub} AS role FROM tracks t ORDER BY t.started_at DESC LIMIT ? """, (limit,), ).fetchall() return [dict(r) for r in rows] def get_track(track_id: int) -> dict[str, Any]: with _db() as conn: track = conn.execute( """ SELECT id, device_id, started_at, ended_at, label FROM tracks WHERE id = ? """, (track_id,), ).fetchone() if not track: raise ValueError(f"track {track_id} not found") points = conn.execute( """ SELECT ts, lat, lon, altitude_gps, elevation_m, rssi, role, meta FROM track_points WHERE track_id = ? ORDER BY ts ASC """, (track_id,), ).fetchall() result = dict(track) result["points"] = [dict(p) for p in points] return result def add_chat(data: ChatIn) -> dict[str, Any]: ts = data.ts if data.ts is not None else time.time() with _db() as conn: cur = conn.execute( "INSERT INTO chat (device_id, text, ts) VALUES (?, ?, ?)", (data.device_id, data.text, ts), ) msg_id = cur.lastrowid return { "id": msg_id, "device_id": data.device_id, "text": data.text, "ts": ts, } def get_chat(since: float = 0.0, limit: int = 200) -> list[dict[str, Any]]: limit = min(max(1, limit), 500) with _db() as conn: rows = conn.execute( """ SELECT id, device_id, text, ts FROM chat WHERE ts > ? ORDER BY ts ASC LIMIT ? """, (since, limit), ).fetchall() return [dict(r) for r in rows]