Files
LoraMapTester/server/core/storage.py
T
2026-06-16 11:24:21 +03:00

747 lines
23 KiB
Python

import json
import logging
import sqlite3
import time
from contextlib import contextmanager
from typing import Any, Optional
from .config import DATABASE_PATH, TELEMETRY_LIMIT, TRACK_POINTS_LIMIT
from .devices import is_valid_device_id
from .elevation import fetch_elevation_m
from .models import ChatIn, TelemetryIn
from .schema import SCHEMA_VERSION, apply_migrations, check_db_ok, get_schema_version
# Sender id accepted in addition to device ids (see _is_valid_sender); also the
# default initiator of start_paired_track.
WEB_SENDER_ID = "web"
# Command kinds accepted by enqueue_command.
COMMAND_KINDS = frozenset({"at", "mode", "stats_push"})
# Default look-back window (seconds) used by _online_android_devices.
PAIRED_ONLINE_SEC = 30.0
# Seconds added to the current time to compute a paired session's start_at.
PAIRED_START_DELAY_SEC = 3.0
# Hide devices on map/UI after this many seconds without telemetry.
DEVICE_VISIBLE_SEC = 180.0
logger = logging.getLogger(__name__)
# Column list selected by get_telemetry; _row_to_history reads the same names.
_HISTORY_COLUMNS = (
"id, device_id, lat, lon, rssi, range_m, meta, role, ts, source"
)
def _connect() -> sqlite3.Connection:
    """Open a new connection to ``DATABASE_PATH`` whose rows are ``sqlite3.Row``."""
    connection = sqlite3.connect(DATABASE_PATH, check_same_thread=False)
    # Row factory gives name-based column access (row["col"]) to every caller.
    connection.row_factory = sqlite3.Row
    return connection
@contextmanager
def _db():
    """Yield a fresh connection; commit on a clean exit and always close it.

    When the ``with`` body raises, the commit is skipped and the connection
    is closed with its pending changes left uncommitted.
    """
    handle = _connect()
    try:
        yield handle
        # Only reached when the body finished without raising.
        handle.commit()
    finally:
        handle.close()
def init_db() -> None:
    """Apply schema migrations, log each reported line, then purge invalid devices."""
    with _db() as conn:
        migration_log = apply_migrations(conn)
        for entry in migration_log:
            logger.info("db migrate: %s", entry)
    # Runs on its own connection, after the migration transaction is committed.
    cleanup_invalid_devices()
def db_status() -> dict[str, Any]:
    """Report database health plus the stored and expected schema versions.

    Returns:
        Dict with keys ``db_ok``, ``schema_version``, ``expected_version``
        and ``database_path``.
    """
    with _db() as conn:
        healthy = check_db_ok(conn)
        current_version = get_schema_version(conn)
    return {
        "db_ok": healthy,
        "schema_version": current_version,
        "expected_version": SCHEMA_VERSION,
        "database_path": DATABASE_PATH,
    }
def cleanup_invalid_devices() -> int:
    """Delete telemetry and device rows whose id does not match ``android-????????``.

    Returns:
        Combined number of rows removed from ``telemetry`` and ``devices``.
    """
    # Telemetry first, then devices — same order as the statements are listed.
    statements = (
        "DELETE FROM telemetry WHERE device_id NOT GLOB 'android-????????'",
        "DELETE FROM devices WHERE device_id NOT GLOB 'android-????????'",
    )
    with _db() as conn:
        removed = sum(conn.execute(sql).rowcount for sql in statements)
    return removed
def _sanitize_coords(
lat: Optional[float], lon: Optional[float]
) -> tuple[Optional[float], Optional[float]]:
if lat is None or lon is None:
return lat, lon
if abs(lat) < 1e-5 and abs(lon) < 1e-5:
return None, None
if not (-90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0):
return None, None
return lat, lon
def record_telemetry(data: TelemetryIn) -> dict[str, Any]:
    """Store one telemetry sample and refresh the sending device's row.

    The device row is upserted with ``last_seen`` set to the sample timestamp.
    A non-blank ``device_label`` also overwrites the stored label; otherwise
    the label is only set (to the device id) when the row is first created.
    The sample is then inserted and the device's history is trimmed.

    Args:
        data: Incoming sample; ``ts`` defaults to the current time when ``None``.

    Returns:
        ``{"ok": True, "device_id": ..., "ts": ...}``.

    Raises:
        ValueError: If ``data.device_id`` fails ``is_valid_device_id``.
    """
    device_id = data.device_id
    if not is_valid_device_id(device_id):
        raise ValueError(
            f"invalid device_id '{data.device_id}', expected android-xxxxxxxx (8 hex)"
        )

    ts = time.time() if data.ts is None else data.ts
    lat, lon = _sanitize_coords(data.lat, data.lon)
    label = (data.device_label or "").strip()

    # Pick the upsert variant up front so the transaction body stays linear.
    if label:
        upsert_sql = """
            INSERT INTO devices (device_id, label, last_seen)
            VALUES (?, ?, ?)
            ON CONFLICT(device_id) DO UPDATE SET
            last_seen = excluded.last_seen,
            label = excluded.label
        """
        upsert_args = (device_id, label, ts)
    else:
        upsert_sql = """
            INSERT INTO devices (device_id, label, last_seen)
            VALUES (?, ?, ?)
            ON CONFLICT(device_id) DO UPDATE SET last_seen = excluded.last_seen
        """
        upsert_args = (device_id, device_id, ts)

    sample = (
        device_id,
        lat,
        lon,
        data.rssi,
        data.range_m,
        data.raw_frame,
        data.meta,
        data.role,
        ts,
        data.source,
    )
    with _db() as conn:
        conn.execute(upsert_sql, upsert_args)
        conn.execute(
            """
            INSERT INTO telemetry
            (device_id, lat, lon, rssi, range_m, raw_frame, meta, role, ts, source)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            sample,
        )
        _trim_telemetry(conn, device_id)
    return {"ok": True, "device_id": device_id, "ts": ts}
def _trim_telemetry(conn: sqlite3.Connection, device_id: str) -> None:
    """Keep only the newest ``TELEMETRY_LIMIT`` telemetry rows (by ``ts``) of a device.

    Runs on the caller's connection and does not commit.
    """
    prune_sql = """
        DELETE FROM telemetry
        WHERE device_id = ? AND id NOT IN (
        SELECT id FROM telemetry
        WHERE device_id = ?
        ORDER BY ts DESC
        LIMIT ?
        )
    """
    # device_id is bound twice: once for the outer filter, once for the keep-set.
    conn.execute(prune_sql, (device_id, device_id, TELEMETRY_LIMIT))
def update_device_label(device_id: str, label: str) -> dict[str, Any]:
    """Set a device's label, creating the device row if it does not exist.

    A newly created row gets ``last_seen`` set to the current time; an
    existing row keeps its ``last_seen`` and only has its label replaced.

    Returns:
        ``{"ok": True, "device_id": ..., "label": <stripped label>}``.

    Raises:
        ValueError: If the device id is invalid or the label is blank.
    """
    if not is_valid_device_id(device_id):
        raise ValueError(f"invalid device_id '{device_id}'")
    stripped = (label or "").strip()
    if not stripped:
        raise ValueError("label required")

    created_at = time.time()
    upsert_sql = """
        INSERT INTO devices (device_id, label, last_seen)
        VALUES (?, ?, ?)
        ON CONFLICT(device_id) DO UPDATE SET label = excluded.label
    """
    with _db() as conn:
        conn.execute(upsert_sql, (device_id, stripped, created_at))
    return {"ok": True, "device_id": device_id, "label": stripped}
def list_devices() -> list[dict[str, Any]]:
    """Return currently visible devices with their latest android telemetry.

    Each device is joined (INNER JOIN) to its newest telemetry row whose
    ``source`` is ``'android'``, so devices without such a row are omitted.
    Devices sitting at the null-island placeholder, or not seen within
    ``DEVICE_VISIBLE_SEC`` seconds, are filtered out. Ordered by ``last_seen``
    descending.
    """
    with _db() as conn:
        rows = conn.execute(
            """
            SELECT d.device_id, d.label, d.last_seen,
            t.lat, t.lon, t.rssi, t.range_m, t.raw_frame, t.meta, t.role, t.ts, t.source
            FROM devices d
            INNER JOIN telemetry t ON t.id = (
            SELECT id FROM telemetry
            WHERE device_id = d.device_id AND source = 'android'
            ORDER BY ts DESC LIMIT 1
            )
            WHERE d.device_id GLOB 'android-????????'
            ORDER BY d.last_seen DESC
            """
        ).fetchall()

    oldest_visible = time.time() - DEVICE_VISIBLE_SEC
    visible: list[dict[str, Any]] = []
    for device in [_row_to_device(row) for row in rows]:
        if _is_null_island(device):
            continue
        if not (device.get("last_seen", 0) >= oldest_visible):
            continue
        visible.append(device)
    return visible
def _is_null_island(device: dict[str, Any]) -> bool:
lat, lon = device.get("lat"), device.get("lon")
if lat is None or lon is None:
return False
return abs(lat) < 1e-5 and abs(lon) < 1e-5
def _row_to_device(row: sqlite3.Row) -> dict[str, Any]:
return {
"device_id": row["device_id"],
"label": row["label"] if "label" in row.keys() else None,
"last_seen": row["last_seen"],
"lat": row["lat"],
"lon": row["lon"],
"rssi": row["rssi"],
"range_m": row["range_m"],
"raw_frame": row["raw_frame"],
"meta": row["meta"] if "meta" in row.keys() else None,
"role": row["role"] if "role" in row.keys() else None,
"ts": row["ts"],
"source": row["source"] if "source" in row.keys() else None,
}
def _row_to_history(row: sqlite3.Row) -> dict[str, Any]:
return {
"id": row["id"],
"device_id": row["device_id"],
"lat": row["lat"],
"lon": row["lon"],
"rssi": row["rssi"],
"range_m": row["range_m"],
"meta": row["meta"],
"role": row["role"],
"ts": row["ts"],
"source": row["source"],
}
def get_telemetry(
    device_id: Optional[str] = None,
    limit: int = 100,
    since: Optional[float] = None,
    until: Optional[float] = None,
    role: Optional[str] = None,
) -> list[dict[str, Any]]:
    """Return telemetry history rows, newest first.

    Args:
        device_id: Restrict to this device when truthy.
        limit: Maximum number of rows; clamped to 1..500.
        since: Inclusive lower bound on ``ts`` when not ``None``.
        until: Inclusive upper bound on ``ts`` when not ``None``.
        role: Restrict to this role when truthy.
    """
    row_cap = min(max(1, limit), 500)

    # (enabled, SQL condition, bound value) for every optional filter.
    filters = [
        (bool(device_id), "device_id = ?", device_id),
        (since is not None, "ts >= ?", since),
        (until is not None, "ts <= ?", until),
        (bool(role), "role = ?", role),
    ]
    conditions = [sql for enabled, sql, _ in filters if enabled]
    values: list[Any] = [value for enabled, _, value in filters if enabled]

    where = f" WHERE {' AND '.join(conditions)}" if conditions else ""
    query = f"SELECT {_HISTORY_COLUMNS} FROM telemetry{where} ORDER BY ts DESC LIMIT ?"
    values.append(row_cap)

    with _db() as conn:
        fetched = conn.execute(query, values).fetchall()
    return [_row_to_history(item) for item in fetched]
def start_track(device_id: str, label: Optional[str] = None) -> dict[str, Any]:
    """Create a new track for a device, started at the current time.

    Returns:
        ``{"ok": True, "track_id": ..., "device_id": ..., "started_at": ...}``.

    Raises:
        ValueError: If ``device_id`` fails ``is_valid_device_id``.
    """
    if not is_valid_device_id(device_id):
        raise ValueError(f"invalid device_id '{device_id}'")

    started_at = time.time()
    insert_sql = """
        INSERT INTO tracks (device_id, started_at, label)
        VALUES (?, ?, ?)
    """
    with _db() as conn:
        new_id = conn.execute(insert_sql, (device_id, started_at, label)).lastrowid
    return {
        "ok": True,
        "track_id": new_id,
        "device_id": device_id,
        "started_at": started_at,
    }
def _track_point_row(track_id: int, point: dict[str, Any]) -> tuple[Any, ...]:
    """Build the ``track_points`` INSERT parameters for one incoming point."""
    lat = float(point["lat"])
    lon = float(point["lon"])
    # A missing or falsy ``ts`` falls back to the current server time.
    ts = float(point.get("ts") or time.time())
    elev = fetch_elevation_m(lat, lon)
    meta = point.get("meta")
    if meta is not None and not isinstance(meta, str):
        # Non-string metadata is stored as JSON text.
        meta = json.dumps(meta, ensure_ascii=False)
    return (
        track_id,
        ts,
        lat,
        lon,
        point.get("altitude_gps"),
        elev,
        point.get("rssi"),
        point.get("role"),
        meta,
    )


def add_track_points(track_id: int, points: list[dict[str, Any]]) -> dict[str, Any]:
    """Append points to an unfinished track, up to ``TRACK_POINTS_LIMIT`` in total.

    Points beyond the remaining capacity are silently dropped (and not
    parsed). Either every accepted point is stored or, if building any row
    raises, none is.

    Args:
        track_id: Id of an existing track whose ``ended_at`` is NULL.
        points: Dicts with required ``lat``/``lon`` and optional ``ts``,
            ``altitude_gps``, ``rssi``, ``role`` and ``meta``.

    Returns:
        ``{"ok": True, "added": 0}`` for an empty ``points`` list, otherwise
        ``{"ok": True, "added": <count>, "track_id": track_id}``.

    Raises:
        ValueError: If the track does not exist or is already finished.
    """
    if not points:
        return {"ok": True, "added": 0}
    with _db() as conn:
        track = conn.execute(
            "SELECT id, device_id, ended_at FROM tracks WHERE id = ?",
            (track_id,),
        ).fetchone()
        if not track:
            raise ValueError(f"track {track_id} not found")
        if track["ended_at"] is not None:
            raise ValueError(f"track {track_id} already finished")
        count = conn.execute(
            "SELECT COUNT(*) FROM track_points WHERE track_id = ?",
            (track_id,),
        ).fetchone()[0]
        remaining = max(0, TRACK_POINTS_LIMIT - count)

        # Build every row — including the elevation lookup, an external call of
        # unknown latency (presumably network-backed; confirm) — BEFORE the first
        # INSERT. sqlite3 only opens the write transaction at the first INSERT,
        # so this keeps the database write lock from being held across lookups.
        rows = [_track_point_row(track_id, p) for p in points[:remaining]]

        conn.executemany(
            """
            INSERT INTO track_points
            (track_id, ts, lat, lon, altitude_gps, elevation_m,
            rssi, role, meta)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            rows,
        )
    return {"ok": True, "added": len(rows), "track_id": track_id}
def finish_track(track_id: int) -> dict[str, Any]:
    """Mark an open track as finished now and report how many points it holds.

    Returns:
        ``{"ok": True, "track_id": ..., "ended_at": ..., "point_count": ...}``.

    Raises:
        ValueError: If no open track with this id exists.
    """
    ended_at = time.time()
    with _db() as conn:
        # The ``ended_at IS NULL`` guard makes a second finish a no-op update.
        closed = conn.execute(
            "UPDATE tracks SET ended_at = ? WHERE id = ? AND ended_at IS NULL",
            (ended_at, track_id),
        ).rowcount
        if closed == 0:
            raise ValueError(f"track {track_id} not found or already finished")
        (point_count,) = conn.execute(
            "SELECT COUNT(*) FROM track_points WHERE track_id = ?",
            (track_id,),
        ).fetchone()
    return {
        "ok": True,
        "track_id": track_id,
        "ended_at": ended_at,
        "point_count": point_count,
    }
def list_tracks(device_id: Optional[str] = None, limit: int = 50) -> list[dict[str, Any]]:
    """List tracks, most recently started first.

    Each entry carries the track columns, the device's label, the number of
    points and ``role`` — the role of the latest point that has a non-empty
    role.

    Args:
        device_id: Restrict to this device when truthy.
        limit: Maximum number of tracks; clamped to 1..200.
    """
    row_cap = min(max(1, limit), 200)
    role_sub = """
        (SELECT p.role FROM track_points p
        WHERE p.track_id = t.id AND p.role IS NOT NULL AND p.role != ''
        ORDER BY p.ts DESC LIMIT 1)
    """
    track_cols = f"""
        SELECT t.id, t.device_id, t.started_at, t.ended_at, t.label,
        d.label AS device_label,
        (SELECT COUNT(*) FROM track_points p WHERE p.track_id = t.id) AS point_count,
        {role_sub} AS role
        FROM tracks t
        LEFT JOIN devices d ON d.device_id = t.device_id
    """
    if device_id:
        where_sql = "WHERE t.device_id = ?"
        args: tuple[Any, ...] = (device_id, row_cap)
    else:
        where_sql = ""
        args = (row_cap,)
    query = f"""
        {track_cols}
        {where_sql}
        ORDER BY t.started_at DESC
        LIMIT ?
    """
    with _db() as conn:
        found = conn.execute(query, args).fetchall()
    return [dict(item) for item in found]
def get_track(track_id: int) -> dict[str, Any]:
    """Return one track with its points ordered by ascending ``ts``.

    Returns:
        The track columns plus ``device_label`` and a ``points`` list.

    Raises:
        ValueError: If the track does not exist.
    """
    header_sql = """
        SELECT t.id, t.device_id, t.started_at, t.ended_at, t.label,
        d.label AS device_label
        FROM tracks t
        LEFT JOIN devices d ON d.device_id = t.device_id
        WHERE t.id = ?
    """
    points_sql = """
        SELECT ts, lat, lon, altitude_gps, elevation_m, rssi, role, meta
        FROM track_points
        WHERE track_id = ?
        ORDER BY ts ASC
    """
    with _db() as conn:
        header = conn.execute(header_sql, (track_id,)).fetchone()
        if not header:
            raise ValueError(f"track {track_id} not found")
        point_rows = conn.execute(points_sql, (track_id,)).fetchall()
    payload = dict(header)
    payload["points"] = [dict(point) for point in point_rows]
    return payload
def add_chat(data: ChatIn) -> dict[str, Any]:
    """Store a chat message and return it with its new id.

    ``data.ts`` is used as the timestamp when given, else the current time.
    """
    sent_at = time.time() if data.ts is None else data.ts
    with _db() as conn:
        inserted = conn.execute(
            "INSERT INTO chat (device_id, text, ts) VALUES (?, ?, ?)",
            (data.device_id, data.text, sent_at),
        )
        new_id = inserted.lastrowid
    message: dict[str, Any] = {"id": new_id}
    message["device_id"] = data.device_id
    message["text"] = data.text
    message["ts"] = sent_at
    return message
def get_chat(since: float = 0.0, limit: int = 200) -> list[dict[str, Any]]:
    """Return chat messages with ``ts`` strictly after ``since``, oldest first.

    Each message includes the sender's device label (``None`` when the sender
    has no device row). ``limit`` is clamped to 1..500.
    """
    row_cap = min(max(1, limit), 500)
    query = """
        SELECT c.id, c.device_id, c.text, c.ts, d.label AS device_label
        FROM chat c
        LEFT JOIN devices d ON d.device_id = c.device_id
        WHERE c.ts > ?
        ORDER BY c.ts ASC LIMIT ?
    """
    with _db() as conn:
        messages = conn.execute(query, (since, row_cap)).fetchall()
    return [dict(message) for message in messages]
def _is_valid_sender(device_id: str) -> bool:
    """Accept the web sender id or any id that passes ``is_valid_device_id``.

    The id is stripped of surrounding whitespace first; ``None`` is treated
    as an empty string.
    """
    sender = (device_id or "").strip()
    if sender == WEB_SENDER_ID:
        return True
    return is_valid_device_id(sender)
def _row_to_command(row: sqlite3.Row) -> dict[str, Any]:
payload = row["payload"]
if payload:
try:
payload = json.loads(payload)
except json.JSONDecodeError:
pass
return {
"id": row["id"],
"from_device_id": row["from_device_id"],
"to_device_id": row["to_device_id"],
"kind": row["kind"],
"payload": payload,
"created_at": row["created_at"],
"delivered_at": row["delivered_at"],
}
def enqueue_command(
    from_device_id: str,
    to_device_id: str,
    kind: str,
    payload: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Queue a command from one sender to a target device.

    Ids are stripped and ``kind`` is stripped and lower-cased before
    validation. The payload (``{}`` when falsy) is stored as JSON text.

    Returns:
        ``{"ok": True, "id": ..., "from_device_id": ..., "to_device_id": ...,
        "kind": ..., "created_at": ...}``.

    Raises:
        ValueError: If the sender, target or kind is invalid, or sender and
            target are the same id.
    """
    sender = (from_device_id or "").strip()
    target = (to_device_id or "").strip()
    normalized_kind = (kind or "").strip().lower()

    if not _is_valid_sender(sender):
        raise ValueError(f"invalid from_device_id '{sender}'")
    if not is_valid_device_id(target):
        raise ValueError(f"invalid to_device_id '{target}'")
    if normalized_kind not in COMMAND_KINDS:
        raise ValueError(f"invalid kind '{normalized_kind}', expected at|mode|stats_push")
    if sender == target:
        raise ValueError("from and to device must differ")

    created_at = time.time()
    encoded = json.dumps(payload or {}, ensure_ascii=False)
    insert_sql = """
        INSERT INTO device_commands
        (from_device_id, to_device_id, kind, payload, created_at)
        VALUES (?, ?, ?, ?, ?)
    """
    with _db() as conn:
        command_id = conn.execute(
            insert_sql, (sender, target, normalized_kind, encoded, created_at)
        ).lastrowid
    return {
        "ok": True,
        "id": command_id,
        "from_device_id": sender,
        "to_device_id": target,
        "kind": normalized_kind,
        "created_at": created_at,
    }
def poll_pending_commands(device_id: str, limit: int = 20) -> list[dict[str, Any]]:
    """Fetch undelivered commands for a device and mark them delivered.

    Commands are returned oldest first; ``limit`` is clamped to 1..50. The
    returned dicts are built from the rows as read before the update, so
    their ``delivered_at`` is still ``None``.

    Raises:
        ValueError: If ``device_id`` fails ``is_valid_device_id``.
    """
    if not is_valid_device_id(device_id):
        raise ValueError(f"invalid device_id '{device_id}'")

    row_cap = min(max(1, limit), 50)
    delivered_at = time.time()
    with _db() as conn:
        pending = conn.execute(
            """
            SELECT id, from_device_id, to_device_id, kind, payload, created_at, delivered_at
            FROM device_commands
            WHERE to_device_id = ? AND delivered_at IS NULL
            ORDER BY created_at ASC
            LIMIT ?
            """,
            (device_id, row_cap),
        ).fetchall()
        command_ids = [item["id"] for item in pending]
        if command_ids:
            # One "?" per id keeps the IN (...) list fully parameterized.
            placeholders = ",".join("?" for _ in command_ids)
            conn.execute(
                f"UPDATE device_commands SET delivered_at = ? WHERE id IN ({placeholders})",
                [delivered_at] + command_ids,
            )
    return [_row_to_command(item) for item in pending]
def list_commands(
    to_device_id: Optional[str] = None, limit: int = 50
) -> list[dict[str, Any]]:
    """List queued commands, newest first, delivered or not.

    Args:
        to_device_id: Restrict to this target device when truthy.
        limit: Maximum number of commands; clamped to 1..200.
    """
    row_cap = min(max(1, limit), 200)
    if to_device_id:
        where_sql = "WHERE to_device_id = ?"
        args: tuple[Any, ...] = (to_device_id, row_cap)
    else:
        where_sql = ""
        args = (row_cap,)
    query = f"""
        SELECT id, from_device_id, to_device_id, kind, payload, created_at, delivered_at
        FROM device_commands
        {where_sql}
        ORDER BY created_at DESC
        LIMIT ?
    """
    with _db() as conn:
        found = conn.execute(query, args).fetchall()
    return [_row_to_command(item) for item in found]
def _online_android_devices(within_sec: float = PAIRED_ONLINE_SEC) -> list[str]:
    """Return ids of listed devices whose ``last_seen`` is within ``within_sec`` seconds.

    Candidates come from ``list_devices``, so its visibility filters apply too.
    """
    oldest_allowed = time.time() - within_sec
    online: list[str] = []
    for device in list_devices():
        if device.get("last_seen", 0) >= oldest_allowed:
            online.append(device["device_id"])
    return online
def _row_to_paired_session(row: sqlite3.Row) -> dict[str, Any]:
return {
"id": row["id"],
"device_a": row["device_a"],
"device_b": row["device_b"],
"initiator": row["initiator"],
"status": row["status"],
"start_at": row["start_at"],
"track_id_a": row["track_id_a"],
"track_id_b": row["track_id_b"],
"created_at": row["created_at"],
}
def _cancel_active_paired_sessions(conn: sqlite3.Connection) -> None:
conn.execute(
"""
UPDATE paired_track_sessions
SET status = 'cancelled'
WHERE status IN ('armed', 'recording')
"""
)
def start_paired_track(
    device_ids: Optional[list[str]] = None,
    initiator: str = WEB_SENDER_ID,
) -> dict[str, Any]:
    """Arm a new paired-track session for two devices.

    When ``device_ids`` holds exactly two ids they are used (after
    stripping). In every other case — ``None``, empty, or any other length —
    the pair is auto-detected and must be exactly the two devices currently
    online. Any session still ``armed`` or ``recording`` is cancelled first.
    The new session starts ``PAIRED_START_DELAY_SEC`` seconds from now.

    Returns:
        ``{"ok": True, "session": <session dict>}``.

    Raises:
        ValueError: On an invalid initiator, invalid or identical explicit
            ids, or when auto-detection does not find exactly two devices.
    """
    initiator = (initiator or WEB_SENDER_ID).strip()
    if not _is_valid_sender(initiator):
        raise ValueError(f"invalid initiator '{initiator}'")

    explicit_pair = bool(device_ids) and len(device_ids) == 2
    if explicit_pair:
        a, b = (str(item).strip() for item in device_ids)
        if not (is_valid_device_id(a) and is_valid_device_id(b)):
            raise ValueError("device_ids must be two valid android-* ids")
        if a == b:
            raise ValueError("device_ids must differ")
    else:
        online = _online_android_devices()
        if len(online) != 2:
            raise ValueError(
                f"expected exactly 2 online devices, found {len(online)}"
            )
        # Sorted so the a/b assignment does not depend on listing order.
        a, b = sorted(online)

    created_at = time.time()
    start_at = created_at + PAIRED_START_DELAY_SEC
    with _db() as conn:
        _cancel_active_paired_sessions(conn)
        session_id = conn.execute(
            """
            INSERT INTO paired_track_sessions
            (device_a, device_b, initiator, status, start_at, created_at)
            VALUES (?, ?, ?, 'armed', ?, ?)
            """,
            (a, b, initiator, start_at, created_at),
        ).lastrowid
    # Read back on a fresh connection, after the insert has been committed.
    return {"ok": True, "session": get_paired_track_session(session_id)}
def get_active_paired_track() -> Optional[dict[str, Any]]:
    """Return the newest ``armed``/``recording`` paired session, or ``None``.

    The session dict is extended with ``server_time`` (current time) and
    ``ready`` — True when the session is ``armed`` and ``start_at`` has passed.
    """
    query = """
        SELECT id, device_a, device_b, initiator, status, start_at,
        track_id_a, track_id_b, created_at
        FROM paired_track_sessions
        WHERE status IN ('armed', 'recording')
        ORDER BY id DESC
        LIMIT 1
    """
    with _db() as conn:
        newest = conn.execute(query).fetchone()
    if newest is None:
        return None

    info = _row_to_paired_session(newest)
    server_time = time.time()
    info["server_time"] = server_time
    info["ready"] = info["status"] == "armed" and server_time >= info["start_at"]
    return info
def get_paired_track_session(session_id: int) -> dict[str, Any]:
    """Return one paired session by id, whatever its status.

    The session dict is extended with ``server_time`` (current time) and
    ``ready`` — True when the session is ``armed`` and ``start_at`` has passed.

    Raises:
        ValueError: If no session with this id exists.
    """
    query = """
        SELECT id, device_a, device_b, initiator, status, start_at,
        track_id_a, track_id_b, created_at
        FROM paired_track_sessions WHERE id = ?
    """
    with _db() as conn:
        found = conn.execute(query, (session_id,)).fetchone()
    if not found:
        raise ValueError(f"session {session_id} not found")

    info = _row_to_paired_session(found)
    server_time = time.time()
    info["server_time"] = server_time
    info["ready"] = info["status"] == "armed" and server_time >= info["start_at"]
    return info
def ack_paired_track(
    session_id: int, device_id: str, track_id: int
) -> dict[str, Any]:
    """Record the track a device started for an active paired session.

    The track id is stored in the session's ``track_id_a`` or ``track_id_b``
    column, depending on which side the device is. Once both columns are
    set the session status becomes ``recording``.

    Returns:
        ``{"ok": True, "session": <session dict>}``.

    Raises:
        ValueError: If the device id is invalid, the session is missing or
            not active, or the device is not part of the session.
    """
    if not is_valid_device_id(device_id):
        raise ValueError(f"invalid device_id '{device_id}'")

    with _db() as conn:
        session = conn.execute(
            """
            SELECT id, device_a, device_b, status, track_id_a, track_id_b
            FROM paired_track_sessions WHERE id = ?
            """,
            (session_id,),
        ).fetchone()
        if not session:
            raise ValueError(f"session {session_id} not found")
        if session["status"] not in ("armed", "recording"):
            raise ValueError(f"session {session_id} not active")
        if device_id not in (session["device_a"], session["device_b"]):
            raise ValueError(f"device {device_id} not in session")

        # Column name comes from this fixed pair only, never from input.
        column = "track_id_a" if device_id == session["device_a"] else "track_id_b"
        conn.execute(
            f"UPDATE paired_track_sessions SET {column} = ? WHERE id = ?",
            (track_id, session_id),
        )
        current = conn.execute(
            """
            SELECT track_id_a, track_id_b, status FROM paired_track_sessions
            WHERE id = ?
            """,
            (session_id,),
        ).fetchone()
        both_acked = current["track_id_a"] and current["track_id_b"]
        if both_acked:
            conn.execute(
                "UPDATE paired_track_sessions SET status = 'recording' WHERE id = ?",
                (session_id,),
            )
    # Read back on a fresh connection, after the updates have been committed.
    return {"ok": True, "session": get_paired_track_session(session_id)}
def cancel_paired_track(session_id: Optional[int] = None) -> dict[str, Any]:
    """Cancel one active paired session, or all of them when no id is given.

    Returns:
        ``{"ok": True, "active": <remaining active session or None>}``.

    Raises:
        ValueError: If ``session_id`` is given but no ``armed``/``recording``
            session has that id.
    """
    with _db() as conn:
        if session_id is None:
            _cancel_active_paired_sessions(conn)
        else:
            cancelled = conn.execute(
                """
                UPDATE paired_track_sessions
                SET status = 'cancelled'
                WHERE id = ? AND status IN ('armed', 'recording')
                """,
                (session_id,),
            ).rowcount
            if cancelled == 0:
                raise ValueError(f"session {session_id} not found or not active")
    return {"ok": True, "active": get_active_paired_track()}