"""SQLite schema creation and migrations.""" from __future__ import annotations import sqlite3 SCHEMA_VERSION = 4 def table_exists(conn: sqlite3.Connection, name: str) -> bool: row = conn.execute( "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?", (name,), ).fetchone() return row is not None def column_exists(conn: sqlite3.Connection, table: str, column: str) -> bool: if not table_exists(conn, table): return False cols = conn.execute(f"PRAGMA table_info({table})").fetchall() return any(c[1] == column for c in cols) def ensure_column( conn: sqlite3.Connection, table: str, column: str, ddl: str, log: list[str] ) -> None: if not column_exists(conn, table, column): conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {ddl}") log.append(f"ALTER {table} ADD {column}") def get_schema_version(conn: sqlite3.Connection) -> int: if not table_exists(conn, "schema_version"): return 0 row = conn.execute("SELECT version FROM schema_version LIMIT 1").fetchone() return int(row[0]) if row else 0 def set_schema_version(conn: sqlite3.Connection, version: int) -> None: conn.execute( """ CREATE TABLE IF NOT EXISTS schema_version (version INTEGER NOT NULL); """ ) conn.execute("DELETE FROM schema_version") conn.execute("INSERT INTO schema_version (version) VALUES (?)", (version,)) def apply_migrations(conn: sqlite3.Connection) -> list[str]: log: list[str] = [] conn.executescript( """ CREATE TABLE IF NOT EXISTS devices ( device_id TEXT PRIMARY KEY, label TEXT, last_seen REAL NOT NULL ); CREATE TABLE IF NOT EXISTS telemetry ( id INTEGER PRIMARY KEY AUTOINCREMENT, device_id TEXT NOT NULL, lat REAL, lon REAL, rssi REAL, range_m REAL, raw_frame TEXT, ts REAL NOT NULL ); CREATE INDEX IF NOT EXISTS idx_telemetry_device_ts ON telemetry(device_id, ts DESC); CREATE TABLE IF NOT EXISTS chat ( id INTEGER PRIMARY KEY AUTOINCREMENT, device_id TEXT NOT NULL, text TEXT NOT NULL, ts REAL NOT NULL ); CREATE INDEX IF NOT EXISTS idx_chat_ts ON chat(ts); """ ) log.append("ensure base tables") ensure_column(conn, "telemetry", "source", "TEXT", log) ensure_column(conn, "telemetry", "meta", "TEXT", log) ensure_column(conn, "telemetry", "role", "TEXT", log) if not table_exists(conn, "tracks"): conn.executescript( """ CREATE TABLE tracks ( id INTEGER PRIMARY KEY AUTOINCREMENT, device_id TEXT NOT NULL, started_at REAL NOT NULL, ended_at REAL, label TEXT ); CREATE INDEX IF NOT EXISTS idx_tracks_device ON tracks(device_id, started_at DESC); """ ) log.append("CREATE tracks") if not table_exists(conn, "track_points"): conn.executescript( """ CREATE TABLE track_points ( id INTEGER PRIMARY KEY AUTOINCREMENT, track_id INTEGER NOT NULL, ts REAL NOT NULL, lat REAL NOT NULL, lon REAL NOT NULL, altitude_gps REAL, elevation_m REAL, rssi REAL, role TEXT, meta TEXT, FOREIGN KEY (track_id) REFERENCES tracks(id) ); CREATE INDEX IF NOT EXISTS idx_track_points_track_ts ON track_points(track_id, ts); """ ) log.append("CREATE track_points") if not table_exists(conn, "device_commands"): conn.executescript( """ CREATE TABLE device_commands ( id INTEGER PRIMARY KEY AUTOINCREMENT, from_device_id TEXT NOT NULL, to_device_id TEXT NOT NULL, kind TEXT NOT NULL, payload TEXT, created_at REAL NOT NULL, delivered_at REAL ); CREATE INDEX IF NOT EXISTS idx_commands_to_pending ON device_commands(to_device_id, delivered_at, created_at); """ ) log.append("CREATE device_commands") if not table_exists(conn, "paired_track_sessions"): conn.executescript( """ CREATE TABLE paired_track_sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, device_a TEXT NOT NULL, device_b TEXT NOT NULL, initiator TEXT NOT NULL, status TEXT NOT NULL, start_at REAL NOT NULL, track_id_a INTEGER, track_id_b INTEGER, created_at REAL NOT NULL ); CREATE INDEX IF NOT EXISTS idx_paired_status ON paired_track_sessions(status, created_at DESC); """ ) log.append("CREATE paired_track_sessions") set_schema_version(conn, SCHEMA_VERSION) log.append(f"schema_version={SCHEMA_VERSION}") return log def check_db_ok(conn: sqlite3.Connection) -> bool: required = [ ("devices", None), ("telemetry", "meta"), ("tracks", None), ("track_points", "elevation_m"), ("device_commands", None), ("paired_track_sessions", None), ] for table, col in required: if not table_exists(conn, table): return False if col and not column_exists(conn, table, col): return False return True