ais_hub
Central AIS/GPS/radio telemetry aggregation service for embedded Linux.
ais_hub is a Python 3.11 asyncio service. It ingests raw AIS (UDP), GPS
(UART) and radio-telemetry (UDP) streams, reassembles multi-fragment AIS
sentences, maintains a single authoritative in-memory state keyed by MMSI,
writes history to SQLite (WAL + retention), and publishes data to external
processes (BLE server, web UI, SPI bridge) via stable interfaces.
No BLE, web UI or MCU code lives here; those are separate processes that consume the interfaces below.
Interfaces
Полный JSON-контракт с примерами для фронтенда — в docs/API.md. Короткая сводка ниже.
HTTP (REST, GET unless noted)
| Path | Purpose |
|---|---|
/api/v1/health |
Liveness + uptime + version. |
/api/v1/stats |
Counters, gauges, uptime. |
/api/v1/ownship |
Latest GPS fix. |
/api/v1/vessels |
All merged targets. Query: since, limit. |
/api/v1/vessels/{mmsi} |
One merged target. |
/api/v1/base_stations |
AIS base station snapshots. |
/api/v1/atons |
AtoN snapshots. |
/api/v1/slots |
TDMA slot occupancy + per-slot detail from AIS-catcher. |
/api/v1/radio |
Latest RSSI pair (channel A/B) from AIS-catcher. |
/api/v1/logs |
Service logs tail, from in-memory ring. Query: level, limit, since. |
/api/v1/nmea/tail |
Raw NMEA tail (separate from /logs). Query: source=ais|gps|radio, limit. |
/api/v1/history/positions |
AIS dynamic history. Query: mmsi, from, to, limit. |
POST /api/v1/tx |
Inject NMEA sentence(s) into the TX outbox. Body: {"payload": "!AIVDM,..."} or {"nmea": ["...", "..."]}. |
WebSocket
GET /ws— on connect, clients receive a fullstate.snapshotevent (ownship, vessels, base_stations, atons, slots, stats). Then a live stream ofEventenvelopes:ownship.update,target.update,base_station.update,aton.update,radio.update,slots.update,slots.detail,signal.update,stats.update.
- Each client has its own bounded queue; slow consumers drop oldest
events and the
ws_droppedcounter increments.
Local UDP
| Endpoint | Direction | Contract |
|---|---|---|
127.0.0.1:7001 |
ais_hub → external | JSON Event, one event per datagram, dropped if > max_datagram_bytes. |
127.0.0.1:6007 |
ais_hub → SPI bridge | Raw NMEA fan-out, one sentence per datagram (<line>\r\n). Untouched from ingest. |
127.0.0.1:6010 |
ais_hub → SPI bridge | TX publish outbox. One NMEA sentence per datagram. Fed by POST /api/v1/tx and internal emitters. |
ais_hub does not listen on :6010; it is publish-only.
Layout
src/ais_hub/ingest/— AIS UDP / GPS UART / radio UDP listeners, raw tap.src/ais_hub/parser/— NMEA utils, AIS multi-fragment assembler, GPS, radio telemetry parsers.src/ais_hub/core/— domain models, in-memory state, eager per-MMSI merge, stats counters, internal pub/sub bus.src/ais_hub/storage/— aiosqlite DB (WAL), batched writer with retry, retention cleanup, read queries for REST.src/ais_hub/publish/— aiohttp REST + WS server, UDP publishers, bounded queue helpers with drop-oldest policy.src/ais_hub/app.py— orchestrator (supervisors, graceful shutdown).
Install
pip install -e .
Runtime dependencies: aiohttp, pyais, pyserial-asyncio, aiosqlite,
PyYAML.
Run
ais_hub --config config/config.example.yaml
Environment variable overrides
Env vars override any YAML value. Prefix AIS_HUB_, level separator
__, upper-case. Examples:
AIS_HUB_INGEST__AIS_UDP__PORT=4011
AIS_HUB_STORAGE__PATH=/data/ais_hub.db
AIS_HUB_STORAGE__STORE_RAW_NMEA=true
AIS_HUB_LOGGING__LEVEL=DEBUG
Deploy on embedded Linux (systemd)
-
Create user and directories:
useradd -r -s /sbin/nologin ais_hub install -d -o ais_hub -g ais_hub /var/lib/ais_hub /var/log/ais_hub /etc/ais_hub -
Install package (system-wide or in a venv the unit points at):
pip install . -
Copy config:
install -m 0644 config/config.example.yaml /etc/ais_hub/config.yaml -
Install unit and enable:
install -m 0644 systemd/ais_hub.service /etc/systemd/system/ systemctl daemon-reload systemctl enable --now ais_hub journalctl -u ais_hub -f
For UART access, add the ais_hub user to the serial/dialout group or
grant access via a udev rule on /dev/ttyS1.
Resilience notes
- Each subsystem runs under a supervisor task: on exception it is logged and restarted with exponential backoff; the process itself never dies because of a single subsystem failure.
- All inter-subsystem queues are bounded with drop-oldest. Per-channel
drop counters are exposed via
/api/v1/stats:parser_in_dropped,udp_nmea_dropped,storage_in_dropped,tx_outbox_dropped,bus_dropped,ws_dropped.
- SQLite transient errors are retried up to 3 times with exponential backoff; on persistent failure the batch is dropped, the writer keeps draining the queue, and REST history may return an error while the rest of the service continues to serve live data.
- Multi-fragment AIS is keyed by
(source_tag, talker, channel, seq_id, total_fragments). Fragment ordering is strictly validated; any deviation resets the per-key buffer and incrementsais_fragment_errors. Stale buffers are GC'd after 60 s (ais_fragment_timeouts).
Tests
pip install -e .[dev]
pytest