generated from Grigo/AndroidTemplate
1539 lines
61 KiB
Python
1539 lines
61 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import dbus
|
||
import dbus.exceptions
|
||
import dbus.mainloop.glib
|
||
import dbus.service
|
||
import json
|
||
import os
|
||
import queue
|
||
import random
|
||
import threading
|
||
import struct
|
||
import sys
|
||
import time
|
||
from collections import OrderedDict
|
||
|
||
try:
|
||
import msgpack # type: ignore
|
||
_HAS_MSGPACK = True
|
||
except Exception:
|
||
msgpack = None # type: ignore
|
||
_HAS_MSGPACK = False
|
||
from datetime import datetime
|
||
from gi.repository import GLib
|
||
from typing import Any, Optional
|
||
|
||
BLUEZ_SERVICE_NAME = 'org.bluez'
|
||
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
|
||
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
|
||
LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
|
||
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
|
||
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
|
||
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
|
||
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
|
||
|
||
MAIN_LOOP = None
|
||
|
||
# ============ Логирование ============
|
||
|
||
def log(level, message):
|
||
"""Логирование с временной меткой"""
|
||
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
|
||
print(f'[{timestamp}] [{level}] {message}', flush=True)
|
||
|
||
def log_info(message):
|
||
log('INFO', message)
|
||
|
||
def log_warn(message):
|
||
log('WARN', message)
|
||
|
||
def log_error(message):
|
||
log('ERROR', message)
|
||
|
||
def log_debug(message):
|
||
log('DEBUG', message)
|
||
|
||
def _hex_preview(b: bytes, max_len: int = 32) -> str:
|
||
if len(b) <= max_len:
|
||
return b.hex()
|
||
return b[:max_len].hex() + f"...(+{len(b)-max_len}B)"
|
||
|
||
# ============ AIS Hub BLE protocol v2 UUIDs ============
|
||
|
||
# NOTE: UUIDs are placeholders; adjust if you already have assigned ones.
|
||
AIS_HUB_SERVICE_UUID = '34b5f2a0-5b23-4c5a-9b2a-3c4c1a9c0001'
|
||
AIS_HUB_CONTROL_UUID = '34b5f2a0-5b23-4c5a-9b2a-3c4c1a9c0002'
|
||
AIS_HUB_DATA_UUID = '34b5f2a0-5b23-4c5a-9b2a-3c4c1a9c0003'
|
||
AIS_HUB_STATUS_UUID = '34b5f2a0-5b23-4c5a-9b2a-3c4c1a9c0004'
|
||
|
||
# ============ Protocol constants ============
|
||
|
||
# Version byte semantics:
|
||
# 0x01 = protocol v1, payload = JSON (UTF-8)
|
||
# 0x02 = protocol v1, payload = MessagePack (opt-in; requires client support)
|
||
PROTO_VERSION_JSON = 0x01
|
||
PROTO_VERSION_MSGPACK = 0x02
|
||
PROTO_VERSION = PROTO_VERSION_JSON # kept for backward compatibility with existing code paths
|
||
|
||
MSG_HELLO_ACK = 0x01
|
||
MSG_SNAPSHOT_BEGIN = 0x02
|
||
MSG_SNAPSHOT_CHUNK = 0x03
|
||
MSG_SNAPSHOT_END = 0x04
|
||
MSG_EVENT = 0x05
|
||
MSG_STATUS = 0x06
|
||
MSG_ERROR = 0x07
|
||
MSG_PONG = 0x08
|
||
|
||
DEFAULT_MAX_PAYLOAD_BYTES = 120
|
||
DEFAULT_TX_QUEUE_MAX = 256
|
||
|
||
# Pacing for broadcast (CCCD / PropertiesChanged) path.
|
||
# BlueZ serializes PropertiesChanged over D-Bus -> hci0 and coalesces or drops
|
||
# notifications if we flood it. 6-8 ms between frames keeps Android happy
|
||
# without hurting snapshot throughput noticeably (~125-160 frames/sec).
|
||
BROADCAST_FRAME_GAP_SEC = float(os.environ.get("AIS_BLE_BROADCAST_GAP_SEC", "0.008"))
|
||
BROADCAST_QUEUE_MAX = int(os.environ.get("AIS_BLE_BROADCAST_QUEUE_MAX", "4096"))
|
||
|
||
# Broadcast payload size (bytes inside a single BLE notification payload).
|
||
# Android negotiates ATT MTU up to 247 -> payload up to ~244B. Default 180 is
|
||
# safe for most Android devices and cuts frame count ~1.5x vs 120. If all your
|
||
# clients negotiate high MTU, bump to 200-220 via env.
|
||
BROADCAST_MAX_PAYLOAD_BYTES = int(os.environ.get("AIS_BLE_BROADCAST_PAYLOAD", "180"))
|
||
|
||
# Encoding for broadcast path. Must match what Android client can decode.
|
||
# "json" - safe default, works with legacy clients (PROTO_VERSION=0x01)
|
||
# "msgpack" - ~30-40% smaller; requires client to recognize PROTO_VERSION=0x02
|
||
# and have a MessagePack decoder (org.msgpack:msgpack-core on Android).
|
||
BROADCAST_ENCODING = os.environ.get("AIS_BLE_BROADCAST_ENCODING", "json").lower()
|
||
if BROADCAST_ENCODING == "msgpack" and not _HAS_MSGPACK:
|
||
# Fail loud at import; better than silently sending JSON while client expects msgpack.
|
||
raise RuntimeError(
|
||
"AIS_BLE_BROADCAST_ENCODING=msgpack but 'msgpack' Python package is not installed. "
|
||
"Run: pip install msgpack"
|
||
)
|
||
|
||
# Snapshot producer throttling: pause scheduling new chunks while the broadcast
|
||
# queue is above this fraction of its capacity. Prevents silent frame drop
|
||
# (old behavior cut snapshots at ~699 vessels because _bcast_queue overflowed).
|
||
SNAPSHOT_DRAIN_THRESHOLD_FRAC = float(os.environ.get("AIS_BLE_SNAPSHOT_DRAIN_FRAC", "0.25"))
|
||
SNAPSHOT_DRAIN_POLL_SEC = float(os.environ.get("AIS_BLE_SNAPSHOT_DRAIN_POLL_SEC", "0.05"))
|
||
|
||
# Vessels per SNAPSHOT_CHUNK. Larger batches amortize header overhead; with
|
||
# msgpack + MTU ~185 a single chunk still fits many frames. 40 is a good
|
||
# middle ground for JSON; msgpack can go higher without hurting latency.
|
||
SNAPSHOT_VESSELS_PER_CHUNK = int(os.environ.get(
|
||
"AIS_BLE_SNAPSHOT_BATCH",
|
||
"60" if BROADCAST_ENCODING == "msgpack" else "40",
|
||
))
|
||
DEFAULT_SNAPSHOT_MAX_VESSELS = int(os.environ.get("AIS_BLE_SNAPSHOT_MAX_VESSELS", "5000"))
|
||
SNAPSHOT_ENQUEUE_TIMEOUT_SEC = float(os.environ.get("AIS_BLE_SNAPSHOT_ENQUEUE_TIMEOUT_SEC", "30.0"))
|
||
|
||
# Live target updates can arrive as large bursts. Do not drop by aggregate time
|
||
# throttle; keep the latest event per MMSI and drain them at a BLE-safe pace.
|
||
TARGET_UPDATE_FLUSH_INTERVAL_SEC = float(os.environ.get("AIS_BLE_TARGET_UPDATE_GAP_SEC", "0.02"))
|
||
TARGET_UPDATE_BACKPRESSURE_FRAC = float(os.environ.get("AIS_BLE_TARGET_UPDATE_DRAIN_FRAC", "0.25"))
|
||
|
||
DEFAULT_SUBSCRIPTIONS = {
|
||
"ownship.update",
|
||
"target.update",
|
||
"base_station.update",
|
||
"aton.update",
|
||
"stats.update",
|
||
}
|
||
|
||
AIS_HUB_BASE_URL = os.environ.get("AIS_HUB_BASE_URL", "http://127.0.0.1:8081")
|
||
AIS_HUB_WS_URL = os.environ.get("AIS_HUB_WS_URL", "ws://127.0.0.1:8081/ws")
|
||
|
||
|
||
# ============ D-Bus Exceptions ============
|
||
|
||
class InvalidArgsException(dbus.exceptions.DBusException):
|
||
_dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
|
||
|
||
|
||
class NotSupportedException(dbus.exceptions.DBusException):
|
||
_dbus_error_name = 'org.bluez.Error.NotSupported'
|
||
|
||
class FailedException(dbus.exceptions.DBusException):
|
||
_dbus_error_name = 'org.bluez.Error.Failed'
|
||
|
||
|
||
# ============ Application (ObjectManager) ============
|
||
|
||
class Application(dbus.service.Object):
|
||
"""
|
||
Корневой объект, который отдаёт BlueZ все наши сервисы/характеристики через GetManagedObjects.
|
||
"""
|
||
def __init__(self, bus):
|
||
self.path = '/org/bluez/example/app'
|
||
self.services = []
|
||
dbus.service.Object.__init__(self, bus, self.path)
|
||
|
||
def get_path(self):
|
||
return dbus.ObjectPath(self.path)
|
||
|
||
def add_service(self, service):
|
||
self.services.append(service)
|
||
|
||
def get_services(self):
|
||
return self.services
|
||
|
||
@dbus.service.method(DBUS_OM_IFACE,
|
||
out_signature='a{oa{sa{sv}}}')
|
||
def GetManagedObjects(self):
|
||
log_info('🔵 GetManagedObjects вызван - клиент запрашивает список сервисов')
|
||
log_info(' Это означает, что подключение установлено и начинается дискаверинг')
|
||
managed_objects = {}
|
||
for service in self.services:
|
||
managed_objects[service.get_path()] = service.get_properties()
|
||
for chrc in service.get_characteristics():
|
||
managed_objects[chrc.get_path()] = chrc.get_properties()
|
||
log_info(f' Возвращаем {len(managed_objects)} объектов (сервисы + характеристики)')
|
||
return managed_objects
|
||
|
||
|
||
# ============ GATT Service ============
|
||
|
||
class Service(dbus.service.Object):
|
||
def __init__(self, bus, index, uuid, primary):
|
||
self.path = f'/org/bluez/example/service{index}'
|
||
self.bus = bus
|
||
self.uuid = uuid
|
||
self.primary = primary
|
||
self.characteristics = []
|
||
dbus.service.Object.__init__(self, bus, self.path)
|
||
|
||
def get_path(self):
|
||
return dbus.ObjectPath(self.path)
|
||
|
||
def add_characteristic(self, chrc):
|
||
self.characteristics.append(chrc)
|
||
|
||
def get_characteristics(self):
|
||
return self.characteristics
|
||
|
||
def get_properties(self):
|
||
return {
|
||
GATT_SERVICE_IFACE: {
|
||
'UUID': self.uuid,
|
||
'Primary': dbus.Boolean(self.primary),
|
||
'Includes': dbus.Array([], signature='o'),
|
||
}
|
||
}
|
||
|
||
|
||
# ============ GATT Characteristic (base) ============
|
||
|
||
class Characteristic(dbus.service.Object):
|
||
def __init__(self, bus, index, uuid, flags, service):
|
||
self.path = service.path + f'/char{index}'
|
||
self.bus = bus
|
||
self.uuid = uuid
|
||
self.flags = flags
|
||
self.service = service
|
||
self.notifying = False
|
||
self.value_bytes = bytes([0x00]) # дефолт
|
||
dbus.service.Object.__init__(self, bus, self.path)
|
||
|
||
def get_path(self):
|
||
return dbus.ObjectPath(self.path)
|
||
|
||
def get_properties(self):
|
||
return {
|
||
GATT_CHRC_IFACE: {
|
||
'Service': self.service.get_path(),
|
||
'UUID': self.uuid,
|
||
'Flags': dbus.Array(self.flags, signature='s'),
|
||
# Стартовое Value, чтобы клиенты видели, что характеристика живая
|
||
'Value': dbus.Array([dbus.Byte(b) for b in self.value_bytes], signature='y'),
|
||
}
|
||
}
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='a{sv}',
|
||
out_signature='ay')
|
||
def ReadValue(self, options):
|
||
print(f'{self.uuid}: ReadValue (base stub)')
|
||
raise NotSupportedException('Read not implemented')
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='aya{sv}',
|
||
out_signature='')
|
||
def WriteValue(self, value, options):
|
||
print(f'{self.uuid}: WriteValue (base stub)')
|
||
raise NotSupportedException('Write not implemented')
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='',
|
||
out_signature='')
|
||
def StartNotify(self):
|
||
print(f'{self.uuid}: StartNotify (base stub)')
|
||
raise NotSupportedException('Notifications not supported')
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='',
|
||
out_signature='')
|
||
def StopNotify(self):
|
||
print(f'{self.uuid}: StopNotify (base stub)')
|
||
raise NotSupportedException('Notifications not supported')
|
||
|
||
# helper для рассылки notify
|
||
def _update_value_and_notify(self, new_bytes: bytes):
|
||
self.value_bytes = new_bytes
|
||
self.PropertiesChanged(
|
||
GATT_CHRC_IFACE,
|
||
{'Value': dbus.Array([dbus.Byte(b) for b in self.value_bytes], signature='y')},
|
||
[]
|
||
)
|
||
|
||
@dbus.service.signal(DBUS_PROP_IFACE,
|
||
signature='sa{sv}as')
|
||
def PropertiesChanged(self, interface, changed, invalidated):
|
||
pass
|
||
|
||
|
||
# ============ AIS Hub v2 server core ============
|
||
|
||
def _dbus_bytes(data: bytes) -> dbus.Array:
|
||
return dbus.Array([dbus.Byte(b) for b in data], signature='y')
|
||
|
||
def _now_ts() -> float:
|
||
return time.time()
|
||
|
||
def _safe_json_dumps(obj: Any) -> str:
|
||
return json.dumps(obj, ensure_ascii=False, separators=(",", ":"))
|
||
|
||
def _encode_payload(obj: Any, encoding: str) -> tuple[bytes, int]:
|
||
"""
|
||
Returns (payload_bytes, proto_version_byte).
|
||
encoding: "json" | "msgpack"
|
||
"""
|
||
if encoding == "msgpack":
|
||
if not _HAS_MSGPACK:
|
||
# Should not happen: validated at import time. Fallback to JSON to avoid crash.
|
||
return _safe_json_dumps(obj).encode("utf-8"), PROTO_VERSION_JSON
|
||
return msgpack.packb(obj, use_bin_type=True), PROTO_VERSION_MSGPACK
|
||
return _safe_json_dumps(obj).encode("utf-8"), PROTO_VERSION_JSON
|
||
|
||
def pack_frames(
|
||
*,
|
||
msg_type: int,
|
||
session_msg_id: int,
|
||
payload_obj: Any,
|
||
max_payload_bytes: int,
|
||
encoding: str = "json",
|
||
) -> list[bytes]:
|
||
"""
|
||
Frames a single logical message into BLE-sized chunks.
|
||
|
||
Header layout (10 bytes, little-endian):
|
||
<B proto_version (0x01=JSON, 0x02=MessagePack)
|
||
B msg_type
|
||
H session_msg_id
|
||
H chunk_idx
|
||
H chunk_count
|
||
H chunk_len>
|
||
"""
|
||
payload_bytes, proto_byte = _encode_payload(payload_obj, encoding)
|
||
if max_payload_bytes <= 0:
|
||
max_payload_bytes = DEFAULT_MAX_PAYLOAD_BYTES
|
||
chunks = [payload_bytes[i:i + max_payload_bytes] for i in range(0, len(payload_bytes), max_payload_bytes)]
|
||
if not chunks:
|
||
chunks = [b""]
|
||
chunk_count = len(chunks)
|
||
|
||
frames: list[bytes] = []
|
||
for idx, chunk in enumerate(chunks):
|
||
header = struct.pack(
|
||
"<BBHHHH",
|
||
proto_byte,
|
||
msg_type & 0xFF,
|
||
session_msg_id & 0xFFFF,
|
||
idx & 0xFFFF,
|
||
chunk_count & 0xFFFF,
|
||
len(chunk) & 0xFFFF,
|
||
)
|
||
frames.append(header + chunk)
|
||
return frames
|
||
|
||
# Backward-compat alias (legacy callers).
|
||
def pack_frames_json(
|
||
*,
|
||
msg_type: int,
|
||
session_msg_id: int,
|
||
payload_obj: Any,
|
||
max_payload_bytes: int,
|
||
) -> list[bytes]:
|
||
return pack_frames(
|
||
msg_type=msg_type,
|
||
session_msg_id=session_msg_id,
|
||
payload_obj=payload_obj,
|
||
max_payload_bytes=max_payload_bytes,
|
||
encoding="json",
|
||
)
|
||
|
||
|
||
class ClientSession:
|
||
def __init__(self, device_path: str):
|
||
self.device_path = device_path
|
||
self.connected_at = _now_ts()
|
||
self.proto = PROTO_VERSION
|
||
self.subscriptions: set[str] = set(DEFAULT_SUBSCRIPTIONS)
|
||
self.snapshot_in_progress = False
|
||
self.snapshot_id = 0
|
||
self.filters: dict[str, Any] = {}
|
||
self.last_ping_ts: float | None = None
|
||
|
||
self._next_msg_id = 1
|
||
self._tx_queue: "queue.Queue[bytes]" = queue.Queue(maxsize=DEFAULT_TX_QUEUE_MAX)
|
||
self.tx_dropped = 0
|
||
self.max_payload_bytes = DEFAULT_MAX_PAYLOAD_BYTES
|
||
self.att_mtu = 23
|
||
# Per-session encoding for the AcquireNotify path. Negotiated via "hello" cmd.
|
||
# Broadcast (CCCD) path uses BROADCAST_ENCODING globally.
|
||
self.encoding: str = "json"
|
||
|
||
self._notify_fd: int | None = None
|
||
self._notify_thread: threading.Thread | None = None
|
||
self._notify_stop = threading.Event()
|
||
|
||
def next_msg_id(self) -> int:
|
||
mid = self._next_msg_id
|
||
self._next_msg_id = (self._next_msg_id + 1) & 0xFFFF
|
||
if self._next_msg_id == 0:
|
||
self._next_msg_id = 1
|
||
return mid
|
||
|
||
def set_notify_fd(self, fd: int, att_mtu: int):
|
||
old_fd = self._notify_fd
|
||
self._notify_fd = fd
|
||
if old_fd is not None and old_fd != fd:
|
||
try:
|
||
os.close(old_fd)
|
||
except OSError:
|
||
pass
|
||
self.att_mtu = att_mtu
|
||
max_att_value = max(20, att_mtu - 3)
|
||
self.max_payload_bytes = max(1, min(DEFAULT_MAX_PAYLOAD_BYTES, max_att_value - 10))
|
||
|
||
if self._notify_thread and self._notify_thread.is_alive():
|
||
return
|
||
|
||
self._notify_stop.clear()
|
||
self._notify_thread = threading.Thread(target=self._notify_writer_loop, daemon=True)
|
||
self._notify_thread.start()
|
||
|
||
def close_notify(self):
|
||
self._notify_stop.set()
|
||
fd = self._notify_fd
|
||
self._notify_fd = None
|
||
if fd is not None:
|
||
try:
|
||
os.close(fd)
|
||
except OSError:
|
||
pass
|
||
|
||
def enqueue_frames(self, frames: list[bytes]):
|
||
for frame in frames:
|
||
try:
|
||
self._tx_queue.put_nowait(frame)
|
||
except queue.Full:
|
||
try:
|
||
_ = self._tx_queue.get_nowait()
|
||
except queue.Empty:
|
||
pass
|
||
try:
|
||
self._tx_queue.put_nowait(frame)
|
||
except queue.Full:
|
||
self.tx_dropped += 1
|
||
continue
|
||
self.tx_dropped += 1
|
||
if frames:
|
||
log_debug(
|
||
f"[TXQ] device={self.device_path} +{len(frames)} frames "
|
||
f"depth={self.tx_queue_depth()} dropped={self.tx_dropped}"
|
||
)
|
||
|
||
async def enqueue_frames_lossless(self, frames: list[bytes], timeout_sec: float) -> bool:
|
||
for frame in frames:
|
||
deadline = time.monotonic() + timeout_sec
|
||
while True:
|
||
if self._notify_fd is None or self._notify_stop.is_set():
|
||
log_warn(f"[TXQ] snapshot enqueue aborted device={self.device_path}; notify fd closed")
|
||
return False
|
||
try:
|
||
self._tx_queue.put_nowait(frame)
|
||
break
|
||
except queue.Full:
|
||
if time.monotonic() > deadline:
|
||
log_warn(
|
||
f"[TXQ] snapshot enqueue timeout device={self.device_path} "
|
||
f"depth={self.tx_queue_depth()}"
|
||
)
|
||
return False
|
||
await asyncio.sleep(SNAPSHOT_DRAIN_POLL_SEC)
|
||
if frames:
|
||
log_debug(
|
||
f"[TXQ] snapshot device={self.device_path} +{len(frames)} frames "
|
||
f"depth={self.tx_queue_depth()} dropped={self.tx_dropped}"
|
||
)
|
||
return True
|
||
|
||
def tx_queue_depth(self) -> int:
|
||
return self._tx_queue.qsize()
|
||
|
||
def _notify_writer_loop(self):
|
||
while not self._notify_stop.is_set():
|
||
fd = self._notify_fd
|
||
if fd is None:
|
||
time.sleep(0.05)
|
||
continue
|
||
try:
|
||
frame = self._tx_queue.get(timeout=0.25)
|
||
except queue.Empty:
|
||
continue
|
||
try:
|
||
os.write(fd, frame)
|
||
except OSError:
|
||
log_warn(f"[DATA] os.write failed device={self.device_path}; closing notify fd")
|
||
self.close_notify()
|
||
return
|
||
|
||
|
||
class AisHubBridge:
|
||
"""
|
||
Owns an asyncio loop in a background thread.
|
||
- REST fetch for snapshot sections
|
||
- One WS client to ais_hub /ws, fan-out events to sessions
|
||
"""
|
||
def __init__(self):
|
||
self._loop: asyncio.AbstractEventLoop | None = None
|
||
self._thread: threading.Thread | None = None
|
||
self._stop = threading.Event()
|
||
|
||
self.ws_alive = False
|
||
self.last_ws_msg_ts: float | None = None
|
||
|
||
self._sessions: dict[str, ClientSession] = {}
|
||
self._sessions_lock = threading.Lock()
|
||
self.data_char: DataCharacteristic | None = None
|
||
|
||
# Global snapshot serialization lock for the broadcast (CCCD) channel.
|
||
# Android clients share a single notify stream; we MUST NOT interleave
|
||
# frames from two simultaneous snapshots or each client will receive a
|
||
# mashup. Per-fd (AcquireNotify) sessions don't need this lock - they
|
||
# have dedicated pipes. Implemented as an asyncio.Lock used by do_snapshot.
|
||
self._broadcast_snapshot_lock: asyncio.Lock | None = None
|
||
|
||
# Simple throttling to avoid overwhelming BLE notify path (esp. Android).
|
||
# Values are minimum interval (seconds) between forwarded events of that type.
|
||
self._event_min_interval_sec: dict[str, float] = {
|
||
"ownship.update": 0.33, # ~3 Hz
|
||
"stats.update": 1.0, # <= 1 Hz (source is ~0.2 Hz anyway)
|
||
}
|
||
self._event_last_sent_ts: dict[str, float] = {}
|
||
self._pending_target_events: "OrderedDict[str, dict[str, Any]]" = OrderedDict()
|
||
self._target_flush_sent = 0
|
||
|
||
# Paced broadcast queue: draining via a dedicated thread that schedules
|
||
# PropertiesChanged emissions on the GLib main loop with BROADCAST_FRAME_GAP_SEC
|
||
# gap. This prevents BlueZ from coalescing/delaying notifications when snapshot
|
||
# or bursty events get pushed at once.
|
||
self._bcast_queue: "queue.Queue[bytes]" = queue.Queue(maxsize=BROADCAST_QUEUE_MAX)
|
||
self._bcast_dropped = 0
|
||
self._bcast_sent = 0
|
||
self._bcast_stop = threading.Event()
|
||
self._bcast_thread: threading.Thread | None = None
|
||
self._bcast_msg_id_lock = threading.Lock()
|
||
self._bcast_next_msg_id = 1
|
||
|
||
def attach_data_characteristic(self, data_char: "DataCharacteristic"):
|
||
self.data_char = data_char
|
||
|
||
def next_broadcast_msg_id(self) -> int:
|
||
with self._bcast_msg_id_lock:
|
||
msg_id = self._bcast_next_msg_id
|
||
self._bcast_next_msg_id += 1
|
||
if self._bcast_next_msg_id > 0xFFFF:
|
||
self._bcast_next_msg_id = 1
|
||
return msg_id
|
||
|
||
def send_broadcast_frame(self, frame: bytes):
|
||
"""
|
||
Fallback for clients that only use standard CCCD notifications (Android).
|
||
Enqueues the frame into the paced broadcast queue; a dedicated thread
|
||
will schedule it on the GLib main loop with BROADCAST_FRAME_GAP_SEC gap.
|
||
"""
|
||
if not self.data_char:
|
||
return
|
||
if not self.data_char.notifying:
|
||
return
|
||
try:
|
||
self._bcast_queue.put_nowait(frame)
|
||
except queue.Full:
|
||
try:
|
||
_ = self._bcast_queue.get_nowait()
|
||
except queue.Empty:
|
||
pass
|
||
self._bcast_dropped += 1
|
||
try:
|
||
self._bcast_queue.put_nowait(frame)
|
||
except queue.Full:
|
||
self._bcast_dropped += 1
|
||
|
||
async def enqueue_broadcast_frame_lossless(self, frame: bytes, timeout_sec: float) -> bool:
|
||
deadline = time.monotonic() + timeout_sec
|
||
while True:
|
||
if not self.data_char or not self.data_char.notifying:
|
||
log_warn("[BCAST] snapshot enqueue aborted: no active DATA notifier")
|
||
return False
|
||
try:
|
||
self._bcast_queue.put_nowait(frame)
|
||
return True
|
||
except queue.Full:
|
||
if time.monotonic() > deadline:
|
||
log_warn(
|
||
f"[BCAST] snapshot enqueue timeout: qsize={self._bcast_queue.qsize()} "
|
||
f"capacity={BROADCAST_QUEUE_MAX}"
|
||
)
|
||
return False
|
||
await asyncio.sleep(SNAPSHOT_DRAIN_POLL_SEC)
|
||
|
||
async def wait_broadcast_drain(self, threshold: int, timeout_sec: float = 10.0):
|
||
"""
|
||
Async backpressure for snapshot producer: block until broadcast queue
|
||
depth drops below `threshold`. Prevents silent drops when the snapshot
|
||
would otherwise flood the queue faster than BROADCAST_FRAME_GAP_SEC drains it.
|
||
"""
|
||
deadline = time.monotonic() + timeout_sec
|
||
while self._bcast_queue.qsize() > threshold:
|
||
if time.monotonic() > deadline:
|
||
log_warn(
|
||
f"[BCAST] wait_broadcast_drain timeout: qsize={self._bcast_queue.qsize()} "
|
||
f"threshold={threshold}"
|
||
)
|
||
return
|
||
await asyncio.sleep(SNAPSHOT_DRAIN_POLL_SEC)
|
||
|
||
def _broadcast_writer_loop(self):
|
||
gap = max(0.0, BROADCAST_FRAME_GAP_SEC)
|
||
while not self._bcast_stop.is_set():
|
||
try:
|
||
frame = self._bcast_queue.get(timeout=0.25)
|
||
except queue.Empty:
|
||
continue
|
||
if not self.data_char or not self.data_char.notifying:
|
||
# Drop silently if no subscribers (avoid growing backlog when idle).
|
||
continue
|
||
GLib.idle_add(self.data_char.send_frame_broadcast, frame)
|
||
self._bcast_sent += 1
|
||
if self._bcast_sent <= 10 or (self._bcast_sent % 500) == 0:
|
||
log_debug(
|
||
f"[BCAST] paced emit #{self._bcast_sent} qdepth={self._bcast_queue.qsize()} "
|
||
f"dropped={self._bcast_dropped} gap_ms={gap * 1000:.1f}"
|
||
)
|
||
if gap > 0:
|
||
time.sleep(gap)
|
||
|
||
def start(self):
|
||
if self._thread and self._thread.is_alive():
|
||
return
|
||
self._stop.clear()
|
||
self._thread = threading.Thread(target=self._run_loop_thread, daemon=True)
|
||
self._thread.start()
|
||
self._bcast_stop.clear()
|
||
if not self._bcast_thread or not self._bcast_thread.is_alive():
|
||
self._bcast_thread = threading.Thread(
|
||
target=self._broadcast_writer_loop,
|
||
name="ais_ble_bcast_writer",
|
||
daemon=True,
|
||
)
|
||
self._bcast_thread.start()
|
||
|
||
def stop(self):
|
||
self._stop.set()
|
||
self._bcast_stop.set()
|
||
loop = self._loop
|
||
if loop:
|
||
loop.call_soon_threadsafe(loop.stop)
|
||
|
||
def get_or_create_session(self, device_path: str) -> ClientSession:
|
||
with self._sessions_lock:
|
||
sess = self._sessions.get(device_path)
|
||
if sess is None:
|
||
sess = ClientSession(device_path)
|
||
self._sessions[device_path] = sess
|
||
return sess
|
||
|
||
def remove_session(self, device_path: str):
|
||
with self._sessions_lock:
|
||
sess = self._sessions.pop(device_path, None)
|
||
if sess:
|
||
sess.close_notify()
|
||
|
||
def sessions_snapshot(self) -> list[ClientSession]:
|
||
with self._sessions_lock:
|
||
return list(self._sessions.values())
|
||
|
||
def session_count(self) -> int:
|
||
with self._sessions_lock:
|
||
return len(self._sessions)
|
||
|
||
def submit_coroutine(self, coro: "asyncio.Future[Any]"):
|
||
loop = self._loop
|
||
if not loop:
|
||
raise RuntimeError("AisHubBridge loop not started")
|
||
return asyncio.run_coroutine_threadsafe(coro, loop)
|
||
|
||
def _run_loop_thread(self):
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
self._loop = loop
|
||
|
||
async def runner():
|
||
# asyncio primitives must be created inside the loop they belong to.
|
||
self._broadcast_snapshot_lock = asyncio.Lock()
|
||
tasks = [
|
||
asyncio.create_task(self._ws_loop(), name="ais_hub_ws_loop"),
|
||
asyncio.create_task(self._target_update_flush_loop(), name="ais_hub_target_flush_loop"),
|
||
]
|
||
try:
|
||
while not self._stop.is_set():
|
||
await asyncio.sleep(0.25)
|
||
finally:
|
||
for t in tasks:
|
||
t.cancel()
|
||
await asyncio.gather(*tasks, return_exceptions=True)
|
||
|
||
try:
|
||
loop.run_until_complete(runner())
|
||
finally:
|
||
try:
|
||
loop.close()
|
||
except Exception:
|
||
pass
|
||
|
||
async def _ws_loop(self):
|
||
try:
|
||
import aiohttp
|
||
except Exception as e:
|
||
log_error(f"[ais_hub] aiohttp not installed: {e}")
|
||
self.ws_alive = False
|
||
return
|
||
|
||
backoff = 1.0
|
||
while not self._stop.is_set():
|
||
try:
|
||
async with aiohttp.ClientSession() as session:
|
||
log_info(f"[ais_hub] WS connecting: {AIS_HUB_WS_URL}")
|
||
async with session.ws_connect(AIS_HUB_WS_URL, heartbeat=30) as ws:
|
||
self.ws_alive = True
|
||
log_info("[ais_hub] WS connected")
|
||
backoff = 1.0
|
||
async for msg in ws:
|
||
if msg.type == aiohttp.WSMsgType.TEXT:
|
||
self.last_ws_msg_ts = _now_ts()
|
||
try:
|
||
ev = json.loads(msg.data)
|
||
except Exception:
|
||
continue
|
||
ev_type = ev.get("type")
|
||
if not ev_type or ev_type == "state.snapshot":
|
||
continue
|
||
self._fanout_event(ev)
|
||
elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
|
||
break
|
||
except Exception:
|
||
self.ws_alive = False
|
||
log_warn(f"[ais_hub] WS disconnected; retry in {backoff:.1f}s")
|
||
await asyncio.sleep(backoff)
|
||
backoff = min(15.0, backoff * 1.5)
|
||
|
||
async def _target_update_flush_loop(self):
|
||
gap = max(0.0, TARGET_UPDATE_FLUSH_INTERVAL_SEC)
|
||
drain_threshold = max(64, int(BROADCAST_QUEUE_MAX * TARGET_UPDATE_BACKPRESSURE_FRAC))
|
||
while not self._stop.is_set():
|
||
if not self._pending_target_events:
|
||
await asyncio.sleep(0.05)
|
||
continue
|
||
|
||
sessions = self.sessions_snapshot()
|
||
if any(s.snapshot_in_progress for s in sessions):
|
||
await asyncio.sleep(0.10)
|
||
continue
|
||
if self._bcast_queue.qsize() > drain_threshold:
|
||
await asyncio.sleep(SNAPSHOT_DRAIN_POLL_SEC)
|
||
continue
|
||
|
||
_, ev = self._pending_target_events.popitem(last=False)
|
||
self._fanout_event_direct(ev)
|
||
self._target_flush_sent += 1
|
||
if self._target_flush_sent <= 10 or (self._target_flush_sent % 500) == 0:
|
||
log_debug(
|
||
f"[FANOUT] target flush sent={self._target_flush_sent} "
|
||
f"pending={len(self._pending_target_events)} bcast_q={self._bcast_queue.qsize()}"
|
||
)
|
||
if gap > 0:
|
||
await asyncio.sleep(gap)
|
||
|
||
def _fanout_event(self, ev: dict[str, Any]):
|
||
ev_type = ev.get("type")
|
||
if ev_type == "target.update":
|
||
data = ev.get("data")
|
||
mmsi = None
|
||
if isinstance(data, dict):
|
||
mmsi = data.get("mmsi")
|
||
if mmsi is not None:
|
||
key = str(mmsi)
|
||
if key in self._pending_target_events:
|
||
del self._pending_target_events[key]
|
||
self._pending_target_events[key] = ev
|
||
return
|
||
self._fanout_event_direct(ev)
|
||
|
||
def _fanout_event_direct(self, ev: dict[str, Any]):
|
||
ev_type = ev.get("type")
|
||
if not ev_type:
|
||
return
|
||
|
||
now = _now_ts()
|
||
min_iv = self._event_min_interval_sec.get(ev_type)
|
||
if min_iv is not None:
|
||
last = self._event_last_sent_ts.get(ev_type)
|
||
if last is not None and (now - last) < min_iv:
|
||
return
|
||
self._event_last_sent_ts[ev_type] = now
|
||
|
||
sessions = self.sessions_snapshot()
|
||
|
||
# Suppress low-priority live events (ownship.update, stats.update) while any
|
||
# session has a snapshot transfer in progress. It's OK for the client to rely
|
||
# on the snapshot data; the next live update will catch up once snapshot ends.
|
||
# This prevents interleaving small frames into the snapshot stream and
|
||
# overwhelming BlueZ / Android's BLE stack (which then drops the link).
|
||
snapshot_active = any(s.snapshot_in_progress for s in sessions)
|
||
suppress_on_snapshot = ev_type in ("ownship.update", "stats.update")
|
||
if snapshot_active and suppress_on_snapshot:
|
||
log_debug(
|
||
f"[FANOUT] suppress ev_type={ev_type} during active snapshot "
|
||
f"sessions={len(sessions)}"
|
||
)
|
||
return
|
||
|
||
# Broadcast path (CCCD notify): do it ONCE per event if at least one CCCD-client wants it.
|
||
# Skip broadcast entirely if all CCCD-clients are currently in snapshot.
|
||
wants_broadcast = any(
|
||
(s._notify_fd is None)
|
||
and (ev_type in s.subscriptions)
|
||
and not s.snapshot_in_progress
|
||
for s in sessions
|
||
)
|
||
if wants_broadcast:
|
||
msg_id = self.next_broadcast_msg_id()
|
||
frames = pack_frames(
|
||
msg_type=MSG_EVENT,
|
||
session_msg_id=msg_id,
|
||
payload_obj=ev,
|
||
max_payload_bytes=BROADCAST_MAX_PAYLOAD_BYTES,
|
||
encoding=BROADCAST_ENCODING,
|
||
)
|
||
for fr in frames:
|
||
self.send_broadcast_frame(fr)
|
||
log_debug(
|
||
f"[DATA] broadcast frames={len(frames)} msg_type=0x{MSG_EVENT:02X} "
|
||
f"msg_id={msg_id} ev_type={ev_type} sessions={len(sessions)} enc={BROADCAST_ENCODING}"
|
||
)
|
||
|
||
# Per-client fd path (AcquireNotify): send only to those sessions.
|
||
for sess in sessions:
|
||
if sess._notify_fd is None:
|
||
continue
|
||
if ev_type not in sess.subscriptions:
|
||
continue
|
||
if sess.snapshot_in_progress:
|
||
continue
|
||
msg_id = sess.next_msg_id()
|
||
frames = pack_frames(
|
||
msg_type=MSG_EVENT,
|
||
session_msg_id=msg_id,
|
||
payload_obj=ev,
|
||
max_payload_bytes=sess.max_payload_bytes,
|
||
encoding=sess.encoding,
|
||
)
|
||
sess.enqueue_frames(frames)
|
||
|
||
async def fetch_snapshot_sections(self, include: list[str], max_vessels: int) -> dict[str, Any]:
|
||
try:
|
||
import aiohttp
|
||
except Exception as e:
|
||
raise RuntimeError(f"aiohttp not installed: {e}")
|
||
|
||
sections: dict[str, Any] = {}
|
||
async with aiohttp.ClientSession() as s:
|
||
if "ownship" in include:
|
||
async with s.get(f"{AIS_HUB_BASE_URL}/api/v1/ownship") as r:
|
||
r.raise_for_status()
|
||
sections["ownship"] = await r.json()
|
||
if "vessels" in include:
|
||
async with s.get(f"{AIS_HUB_BASE_URL}/api/v1/vessels", params={"limit": max_vessels}) as r:
|
||
r.raise_for_status()
|
||
sections["vessels"] = await r.json()
|
||
if "base_stations" in include:
|
||
async with s.get(f"{AIS_HUB_BASE_URL}/api/v1/base_stations") as r:
|
||
r.raise_for_status()
|
||
sections["base_stations"] = await r.json()
|
||
if "atons" in include:
|
||
async with s.get(f"{AIS_HUB_BASE_URL}/api/v1/atons") as r:
|
||
r.raise_for_status()
|
||
sections["atons"] = await r.json()
|
||
if "stats" in include:
|
||
async with s.get(f"{AIS_HUB_BASE_URL}/api/v1/stats") as r:
|
||
r.raise_for_status()
|
||
sections["stats"] = await r.json()
|
||
return sections
|
||
|
||
|
||
class AisHubService(Service):
|
||
def __init__(self, bus, index, bridge: AisHubBridge):
|
||
Service.__init__(self, bus, index, AIS_HUB_SERVICE_UUID, True)
|
||
self.bridge = bridge
|
||
self.control = ControlCharacteristic(bus, 0, self, bridge)
|
||
self.data = DataCharacteristic(bus, 1, self, bridge)
|
||
self.status = StatusCharacteristic(bus, 2, self, bridge)
|
||
self.bridge.attach_data_characteristic(self.data)
|
||
self.add_characteristic(self.control)
|
||
self.add_characteristic(self.data)
|
||
self.add_characteristic(self.status)
|
||
|
||
|
||
class ControlCharacteristic(Characteristic):
|
||
def __init__(self, bus, index, service: Service, bridge: AisHubBridge):
|
||
Characteristic.__init__(self, bus, index, AIS_HUB_CONTROL_UUID, ["write", "write-without-response"], service)
|
||
self.bridge = bridge
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE, in_signature="aya{sv}", out_signature="")
|
||
def WriteValue(self, value, options):
|
||
raw = bytes(value)
|
||
device_path = str(options.get("device", "unknown"))
|
||
sess = self.bridge.get_or_create_session(device_path)
|
||
log_info(
|
||
f"[CONTROL] WriteValue device={device_path} bytes={len(raw)} "
|
||
f"preview={_hex_preview(raw)}"
|
||
)
|
||
try:
|
||
cmd_obj = json.loads(raw.decode("utf-8"))
|
||
except Exception:
|
||
self._send_error(sess, "bad_command", "invalid JSON")
|
||
return
|
||
|
||
cmd = cmd_obj.get("cmd")
|
||
log_info(f"[CONTROL] cmd={cmd} device={device_path} json={cmd_obj}")
|
||
if cmd == "hello":
|
||
# Optional encoding negotiation. Client can pass {"encodings": ["msgpack","json"]}.
|
||
# We pick the first one we support. Default: "json" for backward compat.
|
||
client_encodings = cmd_obj.get("encodings")
|
||
chosen = "json"
|
||
if isinstance(client_encodings, list):
|
||
for enc in client_encodings:
|
||
enc_s = str(enc).lower()
|
||
if enc_s == "msgpack" and _HAS_MSGPACK:
|
||
chosen = "msgpack"
|
||
break
|
||
if enc_s == "json":
|
||
chosen = "json"
|
||
break
|
||
sess.encoding = chosen
|
||
payload = {
|
||
"ok": True,
|
||
"proto": PROTO_VERSION,
|
||
"server": "ais_ble",
|
||
"server_time": _now_ts(),
|
||
"encoding": chosen,
|
||
"features": {
|
||
"snapshot": True,
|
||
"live_events": True,
|
||
"filters": True,
|
||
"compression": False,
|
||
"msgpack": _HAS_MSGPACK,
|
||
"multi_client": True,
|
||
},
|
||
}
|
||
# HELLO_ACK is always JSON so a fresh client can decode it before it
|
||
# knows which encoding the server picked.
|
||
msg_id = sess.next_msg_id()
|
||
frames = pack_frames(
|
||
msg_type=MSG_HELLO_ACK,
|
||
session_msg_id=msg_id,
|
||
payload_obj=payload,
|
||
max_payload_bytes=sess.max_payload_bytes,
|
||
encoding="json",
|
||
)
|
||
if sess._notify_fd is not None:
|
||
sess.enqueue_frames(frames)
|
||
else:
|
||
for fr in frames:
|
||
self.bridge.send_broadcast_frame(fr)
|
||
return
|
||
|
||
if cmd == "ping":
|
||
payload = {"id": cmd_obj.get("id"), "server_time": _now_ts()}
|
||
self._send(sess, MSG_PONG, payload)
|
||
sess.last_ping_ts = _now_ts()
|
||
return
|
||
|
||
if cmd == "subscribe":
|
||
events = cmd_obj.get("events")
|
||
if isinstance(events, list) and events:
|
||
sess.subscriptions = set(str(e) for e in events)
|
||
else:
|
||
sess.subscriptions = set(DEFAULT_SUBSCRIPTIONS)
|
||
self._send(sess, MSG_STATUS, {"ok": True, "subscriptions": sorted(sess.subscriptions)})
|
||
return
|
||
|
||
if cmd == "unsubscribe":
|
||
events = cmd_obj.get("events")
|
||
if isinstance(events, list):
|
||
for e in events:
|
||
sess.subscriptions.discard(str(e))
|
||
self._send(sess, MSG_STATUS, {"ok": True, "subscriptions": sorted(sess.subscriptions)})
|
||
return
|
||
|
||
if cmd == "set_filters":
|
||
filt = cmd_obj.get("targets")
|
||
if isinstance(filt, dict):
|
||
sess.filters["targets"] = filt
|
||
self._send(sess, MSG_STATUS, {"ok": True, "filters": sess.filters})
|
||
return
|
||
|
||
if cmd == "get_snapshot":
|
||
include = cmd_obj.get("include") or ["ownship", "vessels", "base_stations", "atons", "stats"]
|
||
if not isinstance(include, list):
|
||
include = ["ownship", "vessels", "base_stations", "atons", "stats"]
|
||
include = [str(x) for x in include]
|
||
max_vessels = cmd_obj.get("max_vessels")
|
||
if max_vessels is None:
|
||
max_vessels = DEFAULT_SNAPSHOT_MAX_VESSELS
|
||
try:
|
||
max_vessels = int(max_vessels)
|
||
except Exception:
|
||
max_vessels = DEFAULT_SNAPSHOT_MAX_VESSELS
|
||
if max_vessels <= 0:
|
||
max_vessels = DEFAULT_SNAPSHOT_MAX_VESSELS
|
||
|
||
if sess.snapshot_in_progress:
|
||
self._send_error(sess, "snapshot_busy", "snapshot already in progress")
|
||
return
|
||
|
||
sess.snapshot_in_progress = True
|
||
sess.snapshot_id = (sess.snapshot_id + 1) & 0xFFFF or random.randint(1, 0xFFFF)
|
||
|
||
async def do_snapshot():
|
||
# Serialize broadcast snapshots: only one CCCD-snapshot at a time
|
||
# across ALL Android clients, otherwise two clients requesting
|
||
# snapshots simultaneously would receive interleaved frames on
|
||
# the shared notify stream and decode garbage. Per-fd sessions
|
||
# have private pipes and don't need this lock.
|
||
if sess._notify_fd is None and self.bridge._broadcast_snapshot_lock is not None:
|
||
log_debug(
|
||
f"[SNAPSHOT] waiting broadcast lock sess={sess.device_path} "
|
||
f"snapshot_id={sess.snapshot_id}"
|
||
)
|
||
await self.bridge._broadcast_snapshot_lock.acquire()
|
||
log_debug(f"[SNAPSHOT] acquired broadcast lock sess={sess.device_path}")
|
||
try:
|
||
await _do_snapshot_inner()
|
||
finally:
|
||
if sess._notify_fd is None and self.bridge._broadcast_snapshot_lock is not None:
|
||
try:
|
||
self.bridge._broadcast_snapshot_lock.release()
|
||
except Exception:
|
||
pass
|
||
|
||
async def _do_snapshot_inner():
|
||
try:
|
||
sections = await self.bridge.fetch_snapshot_sections(include, max_vessels)
|
||
except Exception as e:
|
||
GLib.idle_add(self._send_error, sess, "snapshot_failed", str(e))
|
||
GLib.idle_add(self._snapshot_done, sess)
|
||
return
|
||
|
||
total_objects = {}
|
||
if "ownship" in sections:
|
||
total_objects["ownship"] = 1
|
||
if "stats" in sections:
|
||
total_objects["stats"] = 1
|
||
if "vessels" in sections:
|
||
total_objects["vessels"] = len(sections["vessels"]) if isinstance(sections["vessels"], list) else 0
|
||
if "base_stations" in sections:
|
||
total_objects["base_stations"] = (
|
||
len(sections["base_stations"]) if isinstance(sections["base_stations"], list) else 0
|
||
)
|
||
if "atons" in sections:
|
||
total_objects["atons"] = len(sections["atons"]) if isinstance(sections["atons"], list) else 0
|
||
|
||
# Backpressure threshold: pause producer once broadcast queue
|
||
# exceeds this depth. Keeps memory bounded AND prevents the
|
||
# silent-drop-at-1024 bug that capped snapshots at ~699 vessels.
|
||
drain_threshold = max(64, int(BROADCAST_QUEUE_MAX * SNAPSHOT_DRAIN_THRESHOLD_FRAC))
|
||
|
||
def abort_snapshot(reason: str) -> bool:
|
||
log_warn(
|
||
f"[SNAPSHOT] abort sess={sess.device_path} "
|
||
f"snapshot_id={sess.snapshot_id}: {reason}"
|
||
)
|
||
GLib.idle_add(self._send_error, sess, "snapshot_failed", reason)
|
||
GLib.idle_add(self._snapshot_done, sess)
|
||
return False
|
||
|
||
async def gated_send(msg_type: int, payload_obj: Any) -> bool:
|
||
# Snapshot frames must be in the downstream queue before we
|
||
# wait on it. Deferring _send through GLib.idle_add made the
|
||
# producer outrun the queue and declare the snapshot done
|
||
# while its tail was still only scheduled.
|
||
msg_id = sess.next_msg_id()
|
||
if sess._notify_fd is None:
|
||
await self.bridge.wait_broadcast_drain(drain_threshold)
|
||
frames = pack_frames(
|
||
msg_type=msg_type,
|
||
session_msg_id=msg_id,
|
||
payload_obj=payload_obj,
|
||
max_payload_bytes=BROADCAST_MAX_PAYLOAD_BYTES,
|
||
encoding=BROADCAST_ENCODING,
|
||
)
|
||
for fr in frames:
|
||
ok = await self.bridge.enqueue_broadcast_frame_lossless(
|
||
fr,
|
||
SNAPSHOT_ENQUEUE_TIMEOUT_SEC,
|
||
)
|
||
if not ok:
|
||
return abort_snapshot("broadcast DATA queue unavailable")
|
||
log_debug(
|
||
f"[DATA] snapshot broadcast frames={len(frames)} msg_type=0x{msg_type:02X} "
|
||
f"msg_id={msg_id} device={sess.device_path} enc={BROADCAST_ENCODING}"
|
||
)
|
||
return True
|
||
|
||
# Per-fd queue is small (256). Wait if it's mostly full and
|
||
# then enqueue without dropping old snapshot frames.
|
||
per_fd_threshold = max(32, DEFAULT_TX_QUEUE_MAX // 4)
|
||
deadline = time.monotonic() + 10.0
|
||
while sess.tx_queue_depth() > per_fd_threshold:
|
||
if time.monotonic() > deadline:
|
||
log_warn(
|
||
f"[SNAPSHOT] per-fd drain timeout sess={sess.device_path} "
|
||
f"depth={sess.tx_queue_depth()}"
|
||
)
|
||
break
|
||
await asyncio.sleep(SNAPSHOT_DRAIN_POLL_SEC)
|
||
frames = pack_frames(
|
||
msg_type=msg_type,
|
||
session_msg_id=msg_id,
|
||
payload_obj=payload_obj,
|
||
max_payload_bytes=sess.max_payload_bytes,
|
||
encoding=sess.encoding,
|
||
)
|
||
ok = await sess.enqueue_frames_lossless(frames, SNAPSHOT_ENQUEUE_TIMEOUT_SEC)
|
||
if not ok:
|
||
return abort_snapshot("per-client DATA queue unavailable")
|
||
log_debug(
|
||
f"[DATA] snapshot fd frames={len(frames)} msg_type=0x{msg_type:02X} "
|
||
f"msg_id={msg_id} device={sess.device_path} enc={sess.encoding}"
|
||
)
|
||
return True
|
||
|
||
if not await gated_send(
|
||
MSG_SNAPSHOT_BEGIN,
|
||
{
|
||
"snapshot_id": sess.snapshot_id,
|
||
"sections": include,
|
||
"total_objects": total_objects,
|
||
},
|
||
):
|
||
return
|
||
|
||
if "ownship" in sections:
|
||
if not await gated_send(
|
||
MSG_SNAPSHOT_CHUNK,
|
||
{
|
||
"snapshot_id": sess.snapshot_id,
|
||
"section": "ownship",
|
||
"seq": 1,
|
||
"more": False,
|
||
"item": sections["ownship"],
|
||
},
|
||
):
|
||
return
|
||
|
||
if "stats" in sections:
|
||
if not await gated_send(
|
||
MSG_SNAPSHOT_CHUNK,
|
||
{
|
||
"snapshot_id": sess.snapshot_id,
|
||
"section": "stats",
|
||
"seq": 1,
|
||
"more": False,
|
||
"item": sections["stats"],
|
||
},
|
||
):
|
||
return
|
||
|
||
async def send_list_section(section_name: str) -> bool:
|
||
if section_name not in sections or not isinstance(sections[section_name], list):
|
||
return True
|
||
items_all: list[Any] = sections[section_name]
|
||
batch_size = SNAPSHOT_VESSELS_PER_CHUNK
|
||
seq = 1
|
||
sent = 0
|
||
for i in range(0, len(items_all), batch_size):
|
||
items = items_all[i:i + batch_size]
|
||
more = (i + batch_size) < len(items_all)
|
||
if not await gated_send(
|
||
MSG_SNAPSHOT_CHUNK,
|
||
{
|
||
"snapshot_id": sess.snapshot_id,
|
||
"section": section_name,
|
||
"seq": seq,
|
||
"more": more,
|
||
"items": items,
|
||
},
|
||
):
|
||
return False
|
||
seq += 1
|
||
sent += len(items)
|
||
if (seq % 10) == 0:
|
||
log_debug(
|
||
f"[SNAPSHOT] {section_name} progress sess={sess.device_path} "
|
||
f"sent={sent}/{len(items_all)} seq={seq} "
|
||
f"bcast_q={self.bridge._bcast_queue.qsize()}"
|
||
)
|
||
return True
|
||
|
||
for section_name in ("vessels", "base_stations", "atons"):
|
||
if not await send_list_section(section_name):
|
||
return
|
||
|
||
if not await gated_send(MSG_SNAPSHOT_END, {"snapshot_id": sess.snapshot_id, "ok": True}):
|
||
return
|
||
# Wait for the broadcast queue to actually drain before clearing
|
||
# snapshot_in_progress. Otherwise stats.update / ownship.update
|
||
# interleave into the still-emitting snapshot tail and the next
|
||
# CCCD client's snapshot can start while frames are still flying.
|
||
if sess._notify_fd is None:
|
||
await self.bridge.wait_broadcast_drain(0, timeout_sec=30.0)
|
||
else:
|
||
deadline = time.monotonic() + 30.0
|
||
while sess.tx_queue_depth() > 0:
|
||
if time.monotonic() > deadline:
|
||
log_warn(
|
||
f"[SNAPSHOT] per-fd final drain timeout sess={sess.device_path} "
|
||
f"depth={sess.tx_queue_depth()}"
|
||
)
|
||
break
|
||
await asyncio.sleep(SNAPSHOT_DRAIN_POLL_SEC)
|
||
GLib.idle_add(self._snapshot_done, sess)
|
||
log_info(
|
||
f"[SNAPSHOT] done sess={sess.device_path} snapshot_id={sess.snapshot_id} "
|
||
f"vessels={total_objects.get('vessels', 0)} "
|
||
f"base_stations={total_objects.get('base_stations', 0)} "
|
||
f"atons={total_objects.get('atons', 0)} "
|
||
f"bcast_dropped={self.bridge._bcast_dropped}"
|
||
)
|
||
|
||
try:
|
||
self.bridge.submit_coroutine(do_snapshot())
|
||
except Exception as e:
|
||
self._send_error(sess, "snapshot_failed", str(e))
|
||
self._snapshot_done(sess)
|
||
return
|
||
|
||
self._send_error(sess, "bad_command", "unknown cmd")
|
||
|
||
def _snapshot_done(self, sess: ClientSession):
|
||
sess.snapshot_in_progress = False
|
||
return False
|
||
|
||
def _send(self, sess: ClientSession, msg_type: int, payload_obj: Any):
|
||
msg_id = sess.next_msg_id()
|
||
# Per-fd path uses negotiated session encoding; broadcast path uses the
|
||
# global broadcast encoding (must match what ALL CCCD clients can decode).
|
||
if sess._notify_fd is not None:
|
||
frames = pack_frames(
|
||
msg_type=msg_type,
|
||
session_msg_id=msg_id,
|
||
payload_obj=payload_obj,
|
||
max_payload_bytes=sess.max_payload_bytes,
|
||
encoding=sess.encoding,
|
||
)
|
||
sess.enqueue_frames(frames)
|
||
else:
|
||
frames = pack_frames(
|
||
msg_type=msg_type,
|
||
session_msg_id=msg_id,
|
||
payload_obj=payload_obj,
|
||
max_payload_bytes=BROADCAST_MAX_PAYLOAD_BYTES,
|
||
encoding=BROADCAST_ENCODING,
|
||
)
|
||
for fr in frames:
|
||
self.bridge.send_broadcast_frame(fr)
|
||
log_debug(
|
||
f"[DATA] broadcast frames={len(frames)} msg_type=0x{msg_type:02X} "
|
||
f"msg_id={msg_id} device={sess.device_path} enc={BROADCAST_ENCODING}"
|
||
)
|
||
return False
|
||
|
||
def _send_error(self, sess: ClientSession, code: str, message: str):
|
||
return self._send(sess, MSG_ERROR, {"code": code, "message": message})
|
||
|
||
|
||
class DataCharacteristic(Characteristic):
|
||
"""
|
||
Per-client notifications via BlueZ AcquireNotify().
|
||
"""
|
||
def __init__(self, bus, index, service: Service, bridge: AisHubBridge):
|
||
Characteristic.__init__(self, bus, index, AIS_HUB_DATA_UUID, ["notify"], service)
|
||
self.bridge = bridge
|
||
self._broadcast_sent = 0
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE, in_signature="a{sv}", out_signature="hq")
|
||
def AcquireNotify(self, options):
|
||
device_path = str(options.get("device", "unknown"))
|
||
sess = self.bridge.get_or_create_session(device_path)
|
||
|
||
# BlueZ expects us to provide a writable fd (unix).
|
||
rfd, wfd = os.pipe()
|
||
att_mtu = int(options.get("mtu", 23))
|
||
sess.set_notify_fd(wfd, att_mtu)
|
||
log_info(f"[DATA] AcquireNotify device={device_path} mtu={att_mtu} fd={wfd}")
|
||
try:
|
||
dup_rfd = os.dup(rfd)
|
||
finally:
|
||
try:
|
||
os.close(rfd)
|
||
except OSError:
|
||
pass
|
||
return dbus.UnixFd(dup_rfd), dbus.UInt16(att_mtu)
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE, in_signature="", out_signature="")
|
||
def StartNotify(self):
|
||
self.notifying = True
|
||
log_info("[DATA] StartNotify (CCCD enabled)")
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE, in_signature="", out_signature="")
|
||
def StopNotify(self):
|
||
self.notifying = False
|
||
log_info("[DATA] StopNotify (CCCD disabled)")
|
||
|
||
def send_frame_broadcast(self, frame: bytes):
|
||
if not self.notifying:
|
||
return False
|
||
self._broadcast_sent += 1
|
||
self._update_value_and_notify(frame)
|
||
if self._broadcast_sent <= 20 or (self._broadcast_sent % 200) == 0:
|
||
log_debug(f"[DATA] notify broadcast sent={self._broadcast_sent} bytes={len(frame)} preview={_hex_preview(frame)}")
|
||
return False
|
||
|
||
|
||
class StatusCharacteristic(Characteristic):
|
||
def __init__(self, bus, index, service: Service, bridge: AisHubBridge):
|
||
Characteristic.__init__(self, bus, index, AIS_HUB_STATUS_UUID, ["read", "notify"], service)
|
||
self.bridge = bridge
|
||
self.notifying = False
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE, in_signature="a{sv}", out_signature="ay")
|
||
def ReadValue(self, options):
|
||
device_path = str(options.get("device", "unknown"))
|
||
sess = self.bridge.get_or_create_session(device_path)
|
||
status = {
|
||
"proto": PROTO_VERSION,
|
||
"server_time": _now_ts(),
|
||
"gps_fix": None,
|
||
"vessels_active": None,
|
||
"ws_source_alive": self.bridge.ws_alive,
|
||
"snapshot_in_progress": sess.snapshot_in_progress,
|
||
"tx_queue": sess.tx_queue_depth(),
|
||
"tx_dropped": sess.tx_dropped,
|
||
}
|
||
return _dbus_bytes(_safe_json_dumps(status).encode("utf-8"))
|
||
|
||
|
||
# ============ LE Advertisement ============
|
||
|
||
class TestAdvertisement(dbus.service.Object):
|
||
PATH_BASE = '/org/bluez/example/advertisement'
|
||
|
||
def __init__(self, bus, index):
|
||
self.path = self.PATH_BASE + str(index)
|
||
self.bus = bus
|
||
dbus.service.Object.__init__(self, bus, self.path)
|
||
|
||
def get_path(self):
|
||
return dbus.ObjectPath(self.path)
|
||
|
||
@dbus.service.method(DBUS_PROP_IFACE,
|
||
in_signature='s',
|
||
out_signature='a{sv}')
|
||
def GetAll(self, interface):
|
||
log_info('[Advertisement] GetAll вызван для интерфейса: ' + interface)
|
||
if interface != LE_ADVERTISEMENT_IFACE:
|
||
log_error('[Advertisement] Неверный интерфейс: ' + interface)
|
||
raise InvalidArgsException()
|
||
# Connectable реклама с явными флагами
|
||
# Важно: IncludeTxPower и другие параметры могут помочь с подключением
|
||
props = {
|
||
'Type': dbus.String('peripheral'),
|
||
'ServiceUUIDs': dbus.Array([
|
||
AIS_HUB_SERVICE_UUID,
|
||
], signature='s'),
|
||
'LocalName': dbus.String('AIS'),
|
||
}
|
||
log_info('[Advertisement] Возвращаем свойства рекламы:')
|
||
log_info(f' Type: {props["Type"]}')
|
||
log_info(f' LocalName: {props["LocalName"]}')
|
||
log_info(f' ServiceUUIDs: {len(props["ServiceUUIDs"])} сервисов')
|
||
return props
|
||
|
||
@dbus.service.method(LE_ADVERTISEMENT_IFACE,
|
||
in_signature='',
|
||
out_signature='')
|
||
def Release(self):
|
||
log_warn('[Advertisement] Release вызван - возможно, клиент отключился или реклама была отменена')
|
||
log_warn('[Advertisement] Это может означать, что реклама была остановлена BlueZ')
|
||
|
||
|
||
# ============ Вспомогательное: поиск адаптера с GATT Manager ============
|
||
|
||
def find_adapter(bus):
|
||
obj = bus.get_object(BLUEZ_SERVICE_NAME, '/')
|
||
om = dbus.Interface(obj, DBUS_OM_IFACE)
|
||
objects = om.GetManagedObjects()
|
||
for path, ifaces in objects.items():
|
||
if GATT_MANAGER_IFACE in ifaces:
|
||
return path
|
||
return None
|
||
|
||
|
||
# ============ main() ============
|
||
|
||
def main():
|
||
global MAIN_LOOP
|
||
|
||
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
||
bus = dbus.SystemBus()
|
||
|
||
adapter_path = find_adapter(bus)
|
||
if not adapter_path:
|
||
log_error('Не найден адаптер с GattManager1. '
|
||
'Проверь, что bluetoothd запущен с --experimental')
|
||
return 1
|
||
|
||
log_info(f'Используем адаптер: {adapter_path}')
|
||
|
||
service_manager = dbus.Interface(
|
||
bus.get_object(BLUEZ_SERVICE_NAME, adapter_path),
|
||
GATT_MANAGER_IFACE
|
||
)
|
||
|
||
advertising_manager = dbus.Interface(
|
||
bus.get_object(BLUEZ_SERVICE_NAME, adapter_path),
|
||
LE_ADVERTISING_MANAGER_IFACE
|
||
)
|
||
|
||
app = Application(bus)
|
||
|
||
bridge = AisHubBridge()
|
||
bridge.start()
|
||
|
||
def _on_bluez_properties_changed(interface, changed, invalidated, path=None):
|
||
try:
|
||
if interface != "org.bluez.Device1":
|
||
return
|
||
if "Connected" not in changed:
|
||
return
|
||
connected = bool(changed.get("Connected"))
|
||
dev_path = str(path or "")
|
||
if not dev_path:
|
||
return
|
||
if connected:
|
||
log_info(f"[BlueZ] Device connected: {dev_path} sessions={bridge.session_count()}")
|
||
bridge.get_or_create_session(dev_path)
|
||
else:
|
||
log_info(f"[BlueZ] Device disconnected: {dev_path} -> removing session")
|
||
bridge.remove_session(dev_path)
|
||
log_info(f"[BlueZ] sessions now: {bridge.session_count()}")
|
||
except Exception as e:
|
||
log_warn(f"[BlueZ] PropertiesChanged handler error: {e}")
|
||
|
||
# Track device connect/disconnect to avoid leaking sessions.
|
||
bus.add_signal_receiver(
|
||
_on_bluez_properties_changed,
|
||
dbus_interface=DBUS_PROP_IFACE,
|
||
signal_name="PropertiesChanged",
|
||
path_keyword="path",
|
||
)
|
||
|
||
ais_srv = AisHubService(bus, 0, bridge)
|
||
app.add_service(ais_srv)
|
||
|
||
adv = TestAdvertisement(bus, 0)
|
||
|
||
# Регистрируем GATT апп
|
||
log_info('Регистрируем GATT Application...')
|
||
service_manager.RegisterApplication(
|
||
app.get_path(),
|
||
{},
|
||
reply_handler=lambda: log_info('✅ GATT Application зарегистрирован успешно'),
|
||
error_handler=lambda e: log_error(f'❌ Ошибка RegisterApplication: {e}'),
|
||
)
|
||
|
||
# Регистрируем рекламу
|
||
log_info('Регистрируем рекламу...')
|
||
log_info(f'Путь рекламы: {adv.get_path()}')
|
||
advertising_manager.RegisterAdvertisement(
|
||
adv.get_path(),
|
||
{},
|
||
reply_handler=lambda: log_info('✅ Advertisement зарегистрирован успешно - устройство должно быть видно при сканировании'),
|
||
error_handler=lambda e: log_error(f'❌ Ошибка RegisterAdvertisement: {e}'),
|
||
)
|
||
|
||
log_info('=' * 60)
|
||
log_info('BLE GATT сервер запущен и готов к подключениям')
|
||
log_info(f'Имя устройства: AIS')
|
||
log_info(f'Сервис: AIS Hub ({AIS_HUB_SERVICE_UUID})')
|
||
log_info(f'ais_hub REST: {AIS_HUB_BASE_URL}')
|
||
log_info(f'ais_hub WS: {AIS_HUB_WS_URL}')
|
||
log_info('=' * 60)
|
||
|
||
MAIN_LOOP = GLib.MainLoop()
|
||
try:
|
||
MAIN_LOOP.run()
|
||
except KeyboardInterrupt:
|
||
log_info('Получен сигнал завершения (Ctrl+C)')
|
||
log_info('Останавливаем сервер...')
|
||
try:
|
||
bridge.stop()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
# Отменяем рекламу
|
||
advertising_manager.UnregisterAdvertisement(adv.get_path())
|
||
log_info('✅ Advertisement отменён')
|
||
except Exception as e:
|
||
log_error(f'❌ Ошибка при отмене advertisement: {e}')
|
||
try:
|
||
# Отменяем регистрацию GATT приложения
|
||
service_manager.UnregisterApplication(app.get_path())
|
||
log_info('✅ GATT Application отменён')
|
||
except Exception as e:
|
||
log_error(f'❌ Ошибка при отмене GATT application: {e}')
|
||
log_info('Сервер остановлен')
|
||
return 0
|
||
|
||
|
||
if __name__ == '__main__':
|
||
sys.exit(main() or 0)
|