"""SQLite persistence for devices, telemetry, tracks, chat, commands and paired-track sessions."""

import json
import logging
import sqlite3
import time
from contextlib import contextmanager
from typing import Any, Iterator, Optional

from .config import DATABASE_PATH, TELEMETRY_LIMIT, TRACK_POINTS_LIMIT
from .devices import is_valid_device_id
from .elevation import fetch_elevation_m
from .models import ChatIn, TelemetryIn
from .schema import SCHEMA_VERSION, apply_migrations, check_db_ok, get_schema_version

WEB_SENDER_ID = "web"
COMMAND_KINDS = frozenset({"at", "mode", "stats_push"})
PAIRED_ONLINE_SEC = 30.0
PAIRED_START_DELAY_SEC = 3.0
# Hide devices on map/UI after this many seconds without telemetry.
DEVICE_VISIBLE_SEC = 180.0

logger = logging.getLogger(__name__)

_HISTORY_COLUMNS = (
    "id, device_id, lat, lon, rssi, range_m, meta, role, ts, source"
)

# Columns read by every paired-session lookup; must match _row_to_paired_session.
_PAIRED_SESSION_COLUMNS = (
    "id, device_a, device_b, initiator, status, start_at, "
    "track_id_a, track_id_b, created_at"
)

_COMMAND_COLUMNS = (
    "id, from_device_id, to_device_id, kind, payload, created_at, delivered_at"
)


def _connect() -> sqlite3.Connection:
    """Open a new connection to DATABASE_PATH whose rows are ``sqlite3.Row``."""
    conn = sqlite3.connect(DATABASE_PATH, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    return conn


@contextmanager
def _db() -> Iterator[sqlite3.Connection]:
    """Yield a fresh connection, committing on success and always closing it.

    If the body raises, ``commit()`` is skipped and the connection is closed
    with its pending changes uncommitted.
    """
    conn = _connect()
    try:
        yield conn
        conn.commit()
    finally:
        conn.close()


def init_db() -> None:
    """Apply schema migrations, log each migration line, then purge invalid devices."""
    with _db() as conn:
        for line in apply_migrations(conn):
            logger.info("db migrate: %s", line)
    # Runs on its own connection, after the migration connection is closed.
    cleanup_invalid_devices()


def db_status() -> dict[str, Any]:
    """Return a health summary: db check, stored/expected schema version, path."""
    with _db() as conn:
        return {
            "db_ok": check_db_ok(conn),
            "schema_version": get_schema_version(conn),
            "expected_version": SCHEMA_VERSION,
            "database_path": DATABASE_PATH,
        }


def cleanup_invalid_devices() -> int:
    """Delete telemetry and device rows whose id is not ``android-`` + 8 characters.

    Returns:
        Total number of rows deleted from both tables.
    """
    with _db() as conn:
        cur_t = conn.execute(
            "DELETE FROM telemetry WHERE device_id NOT GLOB 'android-????????'"
        )
        cur_d = conn.execute(
            "DELETE FROM devices WHERE device_id NOT GLOB 'android-????????'"
        )
        removed = cur_t.rowcount + cur_d.rowcount
    return removed


def _sanitize_coords(
    lat: Optional[float], lon: Optional[float]
) -> tuple[Optional[float], Optional[float]]:
    """Drop coordinates that are (almost) exactly 0/0 or outside valid ranges.

    If either value is ``None`` the pair is returned untouched. Otherwise
    ``(None, None)`` is returned when both values are within 1e-5 of zero or
    when latitude/longitude fall outside [-90, 90] / [-180, 180].
    """
    if lat is None or lon is None:
        return lat, lon
    if abs(lat) < 1e-5 and abs(lon) < 1e-5:
        return None, None
    if not (-90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0):
        return None, None
    return lat, lon


def record_telemetry(data: TelemetryIn) -> dict[str, Any]:
    """Upsert the device row and append one telemetry sample.

    The device label is overwritten only when ``data.device_label`` is
    non-blank; a brand-new device without a label gets its id as the label.
    After inserting, older samples beyond TELEMETRY_LIMIT are trimmed.

    Raises:
        ValueError: if ``data.device_id`` fails ``is_valid_device_id``.
    """
    if not is_valid_device_id(data.device_id):
        raise ValueError(
            f"invalid device_id '{data.device_id}', expected android-xxxxxxxx (8 hex)"
        )
    ts = data.ts if data.ts is not None else time.time()
    lat, lon = _sanitize_coords(data.lat, data.lon)
    phone_label = (data.device_label or "").strip()
    with _db() as conn:
        if phone_label:
            conn.execute(
                """
                INSERT INTO devices (device_id, label, last_seen)
                VALUES (?, ?, ?)
                ON CONFLICT(device_id) DO UPDATE SET
                    last_seen = excluded.last_seen,
                    label = excluded.label
                """,
                (data.device_id, phone_label, ts),
            )
        else:
            # No label supplied: keep whatever label an existing row has.
            conn.execute(
                """
                INSERT INTO devices (device_id, label, last_seen)
                VALUES (?, ?, ?)
                ON CONFLICT(device_id) DO UPDATE SET
                    last_seen = excluded.last_seen
                """,
                (data.device_id, data.device_id, ts),
            )
        conn.execute(
            """
            INSERT INTO telemetry
                (device_id, lat, lon, rssi, range_m, raw_frame, meta, role, ts, source)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                data.device_id,
                lat,
                lon,
                data.rssi,
                data.range_m,
                data.raw_frame,
                data.meta,
                data.role,
                ts,
                data.source,
            ),
        )
        _trim_telemetry(conn, data.device_id)
    return {"ok": True, "device_id": data.device_id, "ts": ts}


def _trim_telemetry(conn: sqlite3.Connection, device_id: str) -> None:
    """Keep only the TELEMETRY_LIMIT newest (by ``ts``) samples of one device."""
    conn.execute(
        """
        DELETE FROM telemetry
        WHERE device_id = ?
          AND id NOT IN (
              SELECT id FROM telemetry
              WHERE device_id = ?
              ORDER BY ts DESC
              LIMIT ?
          )
        """,
        (device_id, device_id, TELEMETRY_LIMIT),
    )


def list_devices() -> list[dict[str, Any]]:
    """Return recently seen devices joined with their latest 'android' sample.

    Devices are ordered by ``last_seen`` descending. Entries located at
    (almost) 0/0 or not seen within DEVICE_VISIBLE_SEC are filtered out.
    """
    with _db() as conn:
        rows = conn.execute(
            """
            SELECT d.device_id, d.label, d.last_seen,
                   t.lat, t.lon, t.rssi, t.range_m, t.raw_frame,
                   t.meta, t.role, t.ts, t.source
            FROM devices d
            INNER JOIN telemetry t ON t.id = (
                SELECT id FROM telemetry
                WHERE device_id = d.device_id AND source = 'android'
                ORDER BY ts DESC
                LIMIT 1
            )
            WHERE d.device_id GLOB 'android-????????'
            ORDER BY d.last_seen DESC
            """
        ).fetchall()
    cutoff = time.time() - DEVICE_VISIBLE_SEC
    devices = [_row_to_device(r) for r in rows]
    return [
        d
        for d in devices
        # `or 0` keeps a NULL last_seen from raising TypeError in the comparison.
        if not _is_null_island(d) and (d.get("last_seen") or 0) >= cutoff
    ]


def _is_null_island(device: dict[str, Any]) -> bool:
    """Return True when both lat and lon are present and within 1e-5 of zero."""
    lat, lon = device.get("lat"), device.get("lon")
    if lat is None or lon is None:
        return False
    return abs(lat) < 1e-5 and abs(lon) < 1e-5


def _row_to_device(row: sqlite3.Row) -> dict[str, Any]:
    """Convert a devices+telemetry row to a dict; optional columns default to None."""
    present = row.keys()

    def optional(column: str) -> Any:
        return row[column] if column in present else None

    return {
        "device_id": row["device_id"],
        "label": optional("label"),
        "last_seen": row["last_seen"],
        "lat": row["lat"],
        "lon": row["lon"],
        "rssi": row["rssi"],
        "range_m": row["range_m"],
        "raw_frame": row["raw_frame"],
        "meta": optional("meta"),
        "role": optional("role"),
        "ts": row["ts"],
        "source": optional("source"),
    }


def _row_to_history(row: sqlite3.Row) -> dict[str, Any]:
    """Convert a telemetry row (selected with _HISTORY_COLUMNS) to a dict."""
    return {
        "id": row["id"],
        "device_id": row["device_id"],
        "lat": row["lat"],
        "lon": row["lon"],
        "rssi": row["rssi"],
        "range_m": row["range_m"],
        "meta": row["meta"],
        "role": row["role"],
        "ts": row["ts"],
        "source": row["source"],
    }


def get_telemetry(
    device_id: Optional[str] = None,
    limit: int = 100,
    since: Optional[float] = None,
    until: Optional[float] = None,
    role: Optional[str] = None,
) -> list[dict[str, Any]]:
    """Return telemetry history, newest first, with optional filters.

    Args:
        device_id: restrict to one device when truthy.
        limit: maximum rows, clamped to 1..500.
        since: inclusive lower bound on ``ts``.
        until: inclusive upper bound on ``ts``.
        role: restrict to one role when truthy.
    """
    limit = min(max(1, limit), 500)
    clauses: list[str] = []
    params: list[Any] = []
    if device_id:
        clauses.append("device_id = ?")
        params.append(device_id)
    if since is not None:
        clauses.append("ts >= ?")
        params.append(since)
    if until is not None:
        clauses.append("ts <= ?")
        params.append(until)
    if role:
        clauses.append("role = ?")
        params.append(role)
    where = (" WHERE " + " AND ".join(clauses)) if clauses else ""
    sql = (
        f"SELECT {_HISTORY_COLUMNS} FROM telemetry{where} "
        f"ORDER BY ts DESC LIMIT ?"
    )
    params.append(limit)
    with _db() as conn:
        rows = conn.execute(sql, params).fetchall()
    return [_row_to_history(r) for r in rows]


def start_track(device_id: str, label: Optional[str] = None) -> dict[str, Any]:
    """Create a new track row for a device, started at the current time.

    Raises:
        ValueError: if ``device_id`` fails ``is_valid_device_id``.
    """
    if not is_valid_device_id(device_id):
        raise ValueError(f"invalid device_id '{device_id}'")
    ts = time.time()
    with _db() as conn:
        cur = conn.execute(
            """
            INSERT INTO tracks (device_id, started_at, label)
            VALUES (?, ?, ?)
            """,
            (device_id, ts, label),
        )
        track_id = cur.lastrowid
    return {"ok": True, "track_id": track_id, "device_id": device_id, "started_at": ts}


def add_track_points(track_id: int, points: list[dict[str, Any]]) -> dict[str, Any]:
    """Append points to an open track, stopping at TRACK_POINTS_LIMIT.

    Each point must carry ``lat`` and ``lon``; ``ts`` falls back to the
    current time when missing or falsy. Non-string ``meta`` is stored as JSON.
    Points beyond the per-track limit are silently dropped.

    Raises:
        ValueError: if the track does not exist or is already finished.
    """
    if not points:
        return {"ok": True, "added": 0}
    with _db() as conn:
        track = conn.execute(
            "SELECT id, device_id, ended_at FROM tracks WHERE id = ?",
            (track_id,),
        ).fetchone()
        if track is None:
            raise ValueError(f"track {track_id} not found")
        if track["ended_at"] is not None:
            raise ValueError(f"track {track_id} already finished")
        count = conn.execute(
            "SELECT COUNT(*) FROM track_points WHERE track_id = ?",
            (track_id,),
        ).fetchone()[0]
        added = 0
        for p in points:
            if count + added >= TRACK_POINTS_LIMIT:
                break
            lat = float(p["lat"])
            lon = float(p["lon"])
            ts = float(p.get("ts") or time.time())
            # NOTE(review): runs once per point while the transaction is open;
            # if fetch_elevation_m is slow this holds the DB for its duration.
            elev = fetch_elevation_m(lat, lon)
            meta = p.get("meta")
            if meta is not None and not isinstance(meta, str):
                meta = json.dumps(meta, ensure_ascii=False)
            conn.execute(
                """
                INSERT INTO track_points
                    (track_id, ts, lat, lon, altitude_gps, elevation_m, rssi, role, meta)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    track_id,
                    ts,
                    lat,
                    lon,
                    p.get("altitude_gps"),
                    elev,
                    p.get("rssi"),
                    p.get("role"),
                    meta,
                ),
            )
            added += 1
    return {"ok": True, "added": added, "track_id": track_id}


def finish_track(track_id: int) -> dict[str, Any]:
    """Mark an open track as ended now and report its point count.

    Raises:
        ValueError: if no open track with this id exists.
    """
    ts = time.time()
    with _db() as conn:
        cur = conn.execute(
            "UPDATE tracks SET ended_at = ? WHERE id = ? AND ended_at IS NULL",
            (ts, track_id),
        )
        if cur.rowcount == 0:
            raise ValueError(f"track {track_id} not found or already finished")
        count = conn.execute(
            "SELECT COUNT(*) FROM track_points WHERE track_id = ?",
            (track_id,),
        ).fetchone()[0]
    return {"ok": True, "track_id": track_id, "ended_at": ts, "point_count": count}


def list_tracks(device_id: Optional[str] = None, limit: int = 50) -> list[dict[str, Any]]:
    """List tracks, newest first, with device label, point count and last role.

    Args:
        device_id: restrict to one device when truthy.
        limit: maximum rows, clamped to 1..200.
    """
    limit = min(max(1, limit), 200)
    where = ""
    params: list[Any] = []
    if device_id:
        where = "WHERE t.device_id = ?"
        params.append(device_id)
    params.append(limit)
    # Role of a track = role of its most recent point that has a non-empty role.
    sql = f"""
        SELECT t.id, t.device_id, t.started_at, t.ended_at, t.label,
               d.label AS device_label,
               (SELECT COUNT(*) FROM track_points p WHERE p.track_id = t.id)
                   AS point_count,
               (SELECT p.role FROM track_points p
                WHERE p.track_id = t.id AND p.role IS NOT NULL AND p.role != ''
                ORDER BY p.ts DESC LIMIT 1) AS role
        FROM tracks t
        LEFT JOIN devices d ON d.device_id = t.device_id
        {where}
        ORDER BY t.started_at DESC
        LIMIT ?
    """
    with _db() as conn:
        rows = conn.execute(sql, params).fetchall()
    return [dict(r) for r in rows]


def get_track(track_id: int) -> dict[str, Any]:
    """Return one track with its points ordered by ``ts`` ascending.

    Raises:
        ValueError: if the track does not exist.
    """
    with _db() as conn:
        track = conn.execute(
            """
            SELECT t.id, t.device_id, t.started_at, t.ended_at, t.label,
                   d.label AS device_label
            FROM tracks t
            LEFT JOIN devices d ON d.device_id = t.device_id
            WHERE t.id = ?
            """,
            (track_id,),
        ).fetchone()
        if track is None:
            raise ValueError(f"track {track_id} not found")
        points = conn.execute(
            """
            SELECT ts, lat, lon, altitude_gps, elevation_m, rssi, role, meta
            FROM track_points
            WHERE track_id = ?
            ORDER BY ts ASC
            """,
            (track_id,),
        ).fetchall()
    result = dict(track)
    result["points"] = [dict(p) for p in points]
    return result


def add_chat(data: ChatIn) -> dict[str, Any]:
    """Store a chat message (timestamped now if ``data.ts`` is None) and echo it."""
    ts = data.ts if data.ts is not None else time.time()
    with _db() as conn:
        cur = conn.execute(
            "INSERT INTO chat (device_id, text, ts) VALUES (?, ?, ?)",
            (data.device_id, data.text, ts),
        )
        msg_id = cur.lastrowid
    return {
        "id": msg_id,
        "device_id": data.device_id,
        "text": data.text,
        "ts": ts,
    }


def get_chat(since: float = 0.0, limit: int = 200) -> list[dict[str, Any]]:
    """Return chat messages with ``ts`` strictly after ``since``, oldest first.

    ``limit`` is clamped to 1..500.
    """
    limit = min(max(1, limit), 500)
    with _db() as conn:
        rows = conn.execute(
            """
            SELECT c.id, c.device_id, c.text, c.ts, d.label AS device_label
            FROM chat c
            LEFT JOIN devices d ON d.device_id = c.device_id
            WHERE c.ts > ?
            ORDER BY c.ts ASC
            LIMIT ?
            """,
            (since, limit),
        ).fetchall()
    return [dict(r) for r in rows]


def _is_valid_sender(device_id: str) -> bool:
    """Return True for the web sender id or any id accepted by is_valid_device_id."""
    d = (device_id or "").strip()
    return d == WEB_SENDER_ID or is_valid_device_id(d)


def _row_to_command(row: sqlite3.Row) -> dict[str, Any]:
    """Convert a device_commands row to a dict, decoding the JSON payload.

    A payload that is not valid JSON is returned as stored.
    """
    payload = row["payload"]
    if payload:
        try:
            payload = json.loads(payload)
        except json.JSONDecodeError:
            # Deliberate best effort: hand back the raw text.
            pass
    return {
        "id": row["id"],
        "from_device_id": row["from_device_id"],
        "to_device_id": row["to_device_id"],
        "kind": row["kind"],
        "payload": payload,
        "created_at": row["created_at"],
        "delivered_at": row["delivered_at"],
    }


def enqueue_command(
    from_device_id: str,
    to_device_id: str,
    kind: str,
    payload: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Queue a command from one sender to a target device.

    Ids are stripped and ``kind`` is stripped and lower-cased before
    validation. The payload (``{}`` when omitted) is stored as JSON.

    Raises:
        ValueError: on an invalid sender, target or kind, or when sender
            and target are the same.
    """
    from_id = (from_device_id or "").strip()
    to_id = (to_device_id or "").strip()
    kind = (kind or "").strip().lower()
    if not _is_valid_sender(from_id):
        raise ValueError(f"invalid from_device_id '{from_id}'")
    if not is_valid_device_id(to_id):
        raise ValueError(f"invalid to_device_id '{to_id}'")
    if kind not in COMMAND_KINDS:
        # Derived from COMMAND_KINDS so the message cannot drift from the set.
        expected = "|".join(sorted(COMMAND_KINDS))
        raise ValueError(f"invalid kind '{kind}', expected {expected}")
    if from_id == to_id:
        raise ValueError("from and to device must differ")
    ts = time.time()
    payload_json = json.dumps(payload or {}, ensure_ascii=False)
    with _db() as conn:
        cur = conn.execute(
            """
            INSERT INTO device_commands
                (from_device_id, to_device_id, kind, payload, created_at)
            VALUES (?, ?, ?, ?, ?)
            """,
            (from_id, to_id, kind, payload_json, ts),
        )
        cmd_id = cur.lastrowid
    return {
        "ok": True,
        "id": cmd_id,
        "from_device_id": from_id,
        "to_device_id": to_id,
        "kind": kind,
        "created_at": ts,
    }


def poll_pending_commands(device_id: str, limit: int = 20) -> list[dict[str, Any]]:
    """Fetch undelivered commands for a device (oldest first) and mark them delivered.

    ``limit`` is clamped to 1..50. The returned dicts reflect the rows as read
    before the update, so their ``delivered_at`` is still None.

    Raises:
        ValueError: if ``device_id`` fails ``is_valid_device_id``.
    """
    if not is_valid_device_id(device_id):
        raise ValueError(f"invalid device_id '{device_id}'")
    limit = min(max(1, limit), 50)
    now = time.time()
    with _db() as conn:
        rows = conn.execute(
            f"""
            SELECT {_COMMAND_COLUMNS}
            FROM device_commands
            WHERE to_device_id = ? AND delivered_at IS NULL
            ORDER BY created_at ASC
            LIMIT ?
            """,
            (device_id, limit),
        ).fetchall()
        ids = [r["id"] for r in rows]
        if ids:
            # Only "?" placeholders are interpolated; ids are bound as parameters.
            placeholders = ",".join("?" * len(ids))
            conn.execute(
                f"UPDATE device_commands SET delivered_at = ? WHERE id IN ({placeholders})",
                [now, *ids],
            )
    return [_row_to_command(r) for r in rows]


def list_commands(
    to_device_id: Optional[str] = None, limit: int = 50
) -> list[dict[str, Any]]:
    """List commands, newest first, optionally only those addressed to one device.

    ``limit`` is clamped to 1..200.
    """
    limit = min(max(1, limit), 200)
    where = ""
    params: list[Any] = []
    if to_device_id:
        where = "WHERE to_device_id = ?"
        params.append(to_device_id)
    params.append(limit)
    sql = f"""
        SELECT {_COMMAND_COLUMNS}
        FROM device_commands
        {where}
        ORDER BY created_at DESC
        LIMIT ?
    """
    with _db() as conn:
        rows = conn.execute(sql, params).fetchall()
    return [_row_to_command(r) for r in rows]


def _online_android_devices(within_sec: float = PAIRED_ONLINE_SEC) -> list[str]:
    """Return ids of listed devices whose ``last_seen`` is within ``within_sec``."""
    cutoff = time.time() - within_sec
    return [
        d["device_id"]
        for d in list_devices()
        # `or 0` keeps a NULL last_seen from raising TypeError in the comparison.
        if (d.get("last_seen") or 0) >= cutoff
    ]


def _row_to_paired_session(row: sqlite3.Row) -> dict[str, Any]:
    """Convert a paired_track_sessions row to a plain dict."""
    return {
        "id": row["id"],
        "device_a": row["device_a"],
        "device_b": row["device_b"],
        "initiator": row["initiator"],
        "status": row["status"],
        "start_at": row["start_at"],
        "track_id_a": row["track_id_a"],
        "track_id_b": row["track_id_b"],
        "created_at": row["created_at"],
    }


def _decorate_paired_session(row: sqlite3.Row) -> dict[str, Any]:
    """Build a session dict and add ``server_time`` and ``ready``.

    ``ready`` is True when the session is still 'armed' and its ``start_at``
    has been reached.
    """
    session = _row_to_paired_session(row)
    now = time.time()
    session["server_time"] = now
    session["ready"] = session["status"] == "armed" and now >= session["start_at"]
    return session


def _cancel_active_paired_sessions(conn: sqlite3.Connection) -> None:
    """Set every 'armed' or 'recording' session to 'cancelled'."""
    conn.execute(
        """
        UPDATE paired_track_sessions
        SET status = 'cancelled'
        WHERE status IN ('armed', 'recording')
        """
    )


def start_paired_track(
    device_ids: Optional[list[str]] = None,
    initiator: str = WEB_SENDER_ID,
) -> dict[str, Any]:
    """Arm a new paired-track session, cancelling any active one.

    With exactly two ``device_ids`` those devices are paired; otherwise the
    pair is the two devices seen within PAIRED_ONLINE_SEC (sorted by id).
    The session starts PAIRED_START_DELAY_SEC from now.

    Raises:
        ValueError: on an invalid initiator, invalid or identical device ids,
            or when auto-detection does not find exactly two online devices.
    """
    initiator = (initiator or WEB_SENDER_ID).strip()
    if not _is_valid_sender(initiator):
        raise ValueError(f"invalid initiator '{initiator}'")
    # NOTE(review): a non-empty device_ids whose length is not 2 is ignored and
    # falls through to auto-detection; confirm callers rely on this.
    if device_ids and len(device_ids) == 2:
        a, b = [str(x).strip() for x in device_ids]
        if not is_valid_device_id(a) or not is_valid_device_id(b):
            raise ValueError("device_ids must be two valid android-* ids")
        if a == b:
            raise ValueError("device_ids must differ")
    else:
        online = _online_android_devices()
        if len(online) != 2:
            raise ValueError(
                f"expected exactly 2 online devices, found {len(online)}"
            )
        a, b = sorted(online)
    now = time.time()
    start_at = now + PAIRED_START_DELAY_SEC
    with _db() as conn:
        _cancel_active_paired_sessions(conn)
        cur = conn.execute(
            """
            INSERT INTO paired_track_sessions
                (device_a, device_b, initiator, status, start_at, created_at)
            VALUES (?, ?, ?, 'armed', ?, ?)
            """,
            (a, b, initiator, start_at, now),
        )
        session_id = cur.lastrowid
    # Read back on a new connection, after the insert has been committed.
    return {
        "ok": True,
        "session": get_paired_track_session(session_id),
    }


def get_active_paired_track() -> Optional[dict[str, Any]]:
    """Return the newest 'armed' or 'recording' session, or None if there is none."""
    with _db() as conn:
        row = conn.execute(
            f"""
            SELECT {_PAIRED_SESSION_COLUMNS}
            FROM paired_track_sessions
            WHERE status IN ('armed', 'recording')
            ORDER BY id DESC
            LIMIT 1
            """
        ).fetchone()
    if row is None:
        return None
    return _decorate_paired_session(row)


def get_paired_track_session(session_id: int) -> dict[str, Any]:
    """Return one session by id, with ``server_time`` and ``ready`` added.

    Raises:
        ValueError: if the session does not exist.
    """
    with _db() as conn:
        row = conn.execute(
            f"""
            SELECT {_PAIRED_SESSION_COLUMNS}
            FROM paired_track_sessions
            WHERE id = ?
            """,
            (session_id,),
        ).fetchone()
    if row is None:
        raise ValueError(f"session {session_id} not found")
    return _decorate_paired_session(row)


def ack_paired_track(
    session_id: int, device_id: str, track_id: int
) -> dict[str, Any]:
    """Record the track a device started for a session.

    Once both track ids are set, the session status becomes 'recording'.

    Raises:
        ValueError: on an invalid device id, an unknown or inactive session,
            or a device that is not part of the session.
    """
    if not is_valid_device_id(device_id):
        raise ValueError(f"invalid device_id '{device_id}'")
    with _db() as conn:
        row = conn.execute(
            """
            SELECT id, device_a, device_b, status, track_id_a, track_id_b
            FROM paired_track_sessions
            WHERE id = ?
            """,
            (session_id,),
        ).fetchone()
        if row is None:
            raise ValueError(f"session {session_id} not found")
        if row["status"] not in ("armed", "recording"):
            raise ValueError(f"session {session_id} not active")
        if device_id == row["device_a"]:
            col = "track_id_a"
        elif device_id == row["device_b"]:
            col = "track_id_b"
        else:
            raise ValueError(f"device {device_id} not in session")
        # `col` is one of two fixed literals above, never caller-supplied text.
        conn.execute(
            f"UPDATE paired_track_sessions SET {col} = ? WHERE id = ?",
            (track_id, session_id),
        )
        updated = conn.execute(
            """
            SELECT track_id_a, track_id_b, status
            FROM paired_track_sessions
            WHERE id = ?
            """,
            (session_id,),
        ).fetchone()
        if updated["track_id_a"] and updated["track_id_b"]:
            conn.execute(
                "UPDATE paired_track_sessions SET status = 'recording' WHERE id = ?",
                (session_id,),
            )
    # Read back on a new connection, after the updates have been committed.
    return {"ok": True, "session": get_paired_track_session(session_id)}


def cancel_paired_track(session_id: Optional[int] = None) -> dict[str, Any]:
    """Cancel one active session by id, or every active session when id is None.

    Returns:
        ``{"ok": True, "active": ...}`` where ``active`` is whatever
        ``get_active_paired_track`` reports after the cancellation.

    Raises:
        ValueError: if ``session_id`` is given but matches no active session.
    """
    with _db() as conn:
        if session_id is not None:
            cur = conn.execute(
                """
                UPDATE paired_track_sessions
                SET status = 'cancelled'
                WHERE id = ? AND status IN ('armed', 'recording')
                """,
                (session_id,),
            )
            if cur.rowcount == 0:
                raise ValueError(f"session {session_id} not found or not active")
        else:
            _cancel_active_paired_sessions(conn)
    # Read on a new connection, after the cancellation has been committed.
    return {"ok": True, "active": get_active_paired_track()}