#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations import asyncio import dbus import dbus.exceptions import dbus.mainloop.glib import dbus.service import json import os import queue import random import threading import struct import sys import time from collections import OrderedDict try: import msgpack # type: ignore _HAS_MSGPACK = True except Exception: msgpack = None # type: ignore _HAS_MSGPACK = False from datetime import datetime from gi.repository import GLib from typing import Any, Optional BLUEZ_SERVICE_NAME = 'org.bluez' GATT_MANAGER_IFACE = 'org.bluez.GattManager1' LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1' LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1' GATT_SERVICE_IFACE = 'org.bluez.GattService1' GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1' DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' MAIN_LOOP = None # ============ Логирование ============ def log(level, message): """Логирование с временной меткой""" timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] print(f'[{timestamp}] [{level}] {message}', flush=True) def log_info(message): log('INFO', message) def log_warn(message): log('WARN', message) def log_error(message): log('ERROR', message) def log_debug(message): log('DEBUG', message) def _hex_preview(b: bytes, max_len: int = 32) -> str: if len(b) <= max_len: return b.hex() return b[:max_len].hex() + f"...(+{len(b)-max_len}B)" # ============ AIS Hub BLE protocol v2 UUIDs ============ # NOTE: UUIDs are placeholders; adjust if you already have assigned ones. AIS_HUB_SERVICE_UUID = '34b5f2a0-5b23-4c5a-9b2a-3c4c1a9c0001' AIS_HUB_CONTROL_UUID = '34b5f2a0-5b23-4c5a-9b2a-3c4c1a9c0002' AIS_HUB_DATA_UUID = '34b5f2a0-5b23-4c5a-9b2a-3c4c1a9c0003' AIS_HUB_STATUS_UUID = '34b5f2a0-5b23-4c5a-9b2a-3c4c1a9c0004' # ============ Protocol constants ============ # Version byte semantics: # 0x01 = protocol v1, payload = JSON (UTF-8) # 0x02 = protocol v1, payload = MessagePack (opt-in; requires client support) PROTO_VERSION_JSON = 0x01 PROTO_VERSION_MSGPACK = 0x02 PROTO_VERSION = PROTO_VERSION_JSON # kept for backward compatibility with existing code paths MSG_HELLO_ACK = 0x01 MSG_SNAPSHOT_BEGIN = 0x02 MSG_SNAPSHOT_CHUNK = 0x03 MSG_SNAPSHOT_END = 0x04 MSG_EVENT = 0x05 MSG_STATUS = 0x06 MSG_ERROR = 0x07 MSG_PONG = 0x08 DEFAULT_MAX_PAYLOAD_BYTES = 120 DEFAULT_TX_QUEUE_MAX = 256 # Pacing for broadcast (CCCD / PropertiesChanged) path. # BlueZ serializes PropertiesChanged over D-Bus -> hci0 and coalesces or drops # notifications if we flood it. 6-8 ms between frames keeps Android happy # without hurting snapshot throughput noticeably (~125-160 frames/sec). BROADCAST_FRAME_GAP_SEC = float(os.environ.get("AIS_BLE_BROADCAST_GAP_SEC", "0.008")) BROADCAST_QUEUE_MAX = int(os.environ.get("AIS_BLE_BROADCAST_QUEUE_MAX", "4096")) # Broadcast payload size (bytes inside a single BLE notification payload). # Android negotiates ATT MTU up to 247 -> payload up to ~244B. Default 180 is # safe for most Android devices and cuts frame count ~1.5x vs 120. If all your # clients negotiate high MTU, bump to 200-220 via env. BROADCAST_MAX_PAYLOAD_BYTES = int(os.environ.get("AIS_BLE_BROADCAST_PAYLOAD", "180")) # Encoding for broadcast path. Must match what Android client can decode. # "json" - safe default, works with legacy clients (PROTO_VERSION=0x01) # "msgpack" - ~30-40% smaller; requires client to recognize PROTO_VERSION=0x02 # and have a MessagePack decoder (org.msgpack:msgpack-core on Android). BROADCAST_ENCODING = os.environ.get("AIS_BLE_BROADCAST_ENCODING", "json").lower() if BROADCAST_ENCODING == "msgpack" and not _HAS_MSGPACK: # Fail loud at import; better than silently sending JSON while client expects msgpack. raise RuntimeError( "AIS_BLE_BROADCAST_ENCODING=msgpack but 'msgpack' Python package is not installed. " "Run: pip install msgpack" ) # Snapshot producer throttling: pause scheduling new chunks while the broadcast # queue is above this fraction of its capacity. Prevents silent frame drop # (old behavior cut snapshots at ~699 vessels because _bcast_queue overflowed). SNAPSHOT_DRAIN_THRESHOLD_FRAC = float(os.environ.get("AIS_BLE_SNAPSHOT_DRAIN_FRAC", "0.25")) SNAPSHOT_DRAIN_POLL_SEC = float(os.environ.get("AIS_BLE_SNAPSHOT_DRAIN_POLL_SEC", "0.05")) # Vessels per SNAPSHOT_CHUNK. Larger batches amortize header overhead; with # msgpack + MTU ~185 a single chunk still fits many frames. 40 is a good # middle ground for JSON; msgpack can go higher without hurting latency. SNAPSHOT_VESSELS_PER_CHUNK = int(os.environ.get( "AIS_BLE_SNAPSHOT_BATCH", "60" if BROADCAST_ENCODING == "msgpack" else "40", )) DEFAULT_SNAPSHOT_MAX_VESSELS = int(os.environ.get("AIS_BLE_SNAPSHOT_MAX_VESSELS", "5000")) SNAPSHOT_ENQUEUE_TIMEOUT_SEC = float(os.environ.get("AIS_BLE_SNAPSHOT_ENQUEUE_TIMEOUT_SEC", "30.0")) # Live target updates can arrive as large bursts. Do not drop by aggregate time # throttle; keep the latest event per MMSI and drain them at a BLE-safe pace. TARGET_UPDATE_FLUSH_INTERVAL_SEC = float(os.environ.get("AIS_BLE_TARGET_UPDATE_GAP_SEC", "0.02")) TARGET_UPDATE_BACKPRESSURE_FRAC = float(os.environ.get("AIS_BLE_TARGET_UPDATE_DRAIN_FRAC", "0.25")) DEFAULT_SUBSCRIPTIONS = { "ownship.update", "target.update", "base_station.update", "aton.update", "stats.update", } AIS_HUB_BASE_URL = os.environ.get("AIS_HUB_BASE_URL", "http://127.0.0.1:8081") AIS_HUB_WS_URL = os.environ.get("AIS_HUB_WS_URL", "ws://127.0.0.1:8081/ws") # ============ D-Bus Exceptions ============ class InvalidArgsException(dbus.exceptions.DBusException): _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' class NotSupportedException(dbus.exceptions.DBusException): _dbus_error_name = 'org.bluez.Error.NotSupported' class FailedException(dbus.exceptions.DBusException): _dbus_error_name = 'org.bluez.Error.Failed' # ============ Application (ObjectManager) ============ class Application(dbus.service.Object): """ Корневой объект, который отдаёт BlueZ все наши сервисы/характеристики через GetManagedObjects. """ def __init__(self, bus): self.path = '/org/bluez/example/app' self.services = [] dbus.service.Object.__init__(self, bus, self.path) def get_path(self): return dbus.ObjectPath(self.path) def add_service(self, service): self.services.append(service) def get_services(self): return self.services @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') def GetManagedObjects(self): log_info('🔵 GetManagedObjects вызван - клиент запрашивает список сервисов') log_info(' Это означает, что подключение установлено и начинается дискаверинг') managed_objects = {} for service in self.services: managed_objects[service.get_path()] = service.get_properties() for chrc in service.get_characteristics(): managed_objects[chrc.get_path()] = chrc.get_properties() log_info(f' Возвращаем {len(managed_objects)} объектов (сервисы + характеристики)') return managed_objects # ============ GATT Service ============ class Service(dbus.service.Object): def __init__(self, bus, index, uuid, primary): self.path = f'/org/bluez/example/service{index}' self.bus = bus self.uuid = uuid self.primary = primary self.characteristics = [] dbus.service.Object.__init__(self, bus, self.path) def get_path(self): return dbus.ObjectPath(self.path) def add_characteristic(self, chrc): self.characteristics.append(chrc) def get_characteristics(self): return self.characteristics def get_properties(self): return { GATT_SERVICE_IFACE: { 'UUID': self.uuid, 'Primary': dbus.Boolean(self.primary), 'Includes': dbus.Array([], signature='o'), } } # ============ GATT Characteristic (base) ============ class Characteristic(dbus.service.Object): def __init__(self, bus, index, uuid, flags, service): self.path = service.path + f'/char{index}' self.bus = bus self.uuid = uuid self.flags = flags self.service = service self.notifying = False self.value_bytes = bytes([0x00]) # дефолт dbus.service.Object.__init__(self, bus, self.path) def get_path(self): return dbus.ObjectPath(self.path) def get_properties(self): return { GATT_CHRC_IFACE: { 'Service': self.service.get_path(), 'UUID': self.uuid, 'Flags': dbus.Array(self.flags, signature='s'), # Стартовое Value, чтобы клиенты видели, что характеристика живая 'Value': dbus.Array([dbus.Byte(b) for b in self.value_bytes], signature='y'), } } @dbus.service.method(GATT_CHRC_IFACE, in_signature='a{sv}', out_signature='ay') def ReadValue(self, options): print(f'{self.uuid}: ReadValue (base stub)') raise NotSupportedException('Read not implemented') @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}', out_signature='') def WriteValue(self, value, options): print(f'{self.uuid}: WriteValue (base stub)') raise NotSupportedException('Write not implemented') @dbus.service.method(GATT_CHRC_IFACE, in_signature='', out_signature='') def StartNotify(self): print(f'{self.uuid}: StartNotify (base stub)') raise NotSupportedException('Notifications not supported') @dbus.service.method(GATT_CHRC_IFACE, in_signature='', out_signature='') def StopNotify(self): print(f'{self.uuid}: StopNotify (base stub)') raise NotSupportedException('Notifications not supported') # helper для рассылки notify def _update_value_and_notify(self, new_bytes: bytes): self.value_bytes = new_bytes self.PropertiesChanged( GATT_CHRC_IFACE, {'Value': dbus.Array([dbus.Byte(b) for b in self.value_bytes], signature='y')}, [] ) @dbus.service.signal(DBUS_PROP_IFACE, signature='sa{sv}as') def PropertiesChanged(self, interface, changed, invalidated): pass # ============ AIS Hub v2 server core ============ def _dbus_bytes(data: bytes) -> dbus.Array: return dbus.Array([dbus.Byte(b) for b in data], signature='y') def _now_ts() -> float: return time.time() def _safe_json_dumps(obj: Any) -> str: return json.dumps(obj, ensure_ascii=False, separators=(",", ":")) def _encode_payload(obj: Any, encoding: str) -> tuple[bytes, int]: """ Returns (payload_bytes, proto_version_byte). encoding: "json" | "msgpack" """ if encoding == "msgpack": if not _HAS_MSGPACK: # Should not happen: validated at import time. Fallback to JSON to avoid crash. return _safe_json_dumps(obj).encode("utf-8"), PROTO_VERSION_JSON return msgpack.packb(obj, use_bin_type=True), PROTO_VERSION_MSGPACK return _safe_json_dumps(obj).encode("utf-8"), PROTO_VERSION_JSON def pack_frames( *, msg_type: int, session_msg_id: int, payload_obj: Any, max_payload_bytes: int, encoding: str = "json", ) -> list[bytes]: """ Frames a single logical message into BLE-sized chunks. Header layout (10 bytes, little-endian): """ payload_bytes, proto_byte = _encode_payload(payload_obj, encoding) if max_payload_bytes <= 0: max_payload_bytes = DEFAULT_MAX_PAYLOAD_BYTES chunks = [payload_bytes[i:i + max_payload_bytes] for i in range(0, len(payload_bytes), max_payload_bytes)] if not chunks: chunks = [b""] chunk_count = len(chunks) frames: list[bytes] = [] for idx, chunk in enumerate(chunks): header = struct.pack( " list[bytes]: return pack_frames( msg_type=msg_type, session_msg_id=session_msg_id, payload_obj=payload_obj, max_payload_bytes=max_payload_bytes, encoding="json", ) class ClientSession: def __init__(self, device_path: str): self.device_path = device_path self.connected_at = _now_ts() self.proto = PROTO_VERSION self.subscriptions: set[str] = set(DEFAULT_SUBSCRIPTIONS) self.snapshot_in_progress = False self.snapshot_id = 0 self.filters: dict[str, Any] = {} self.last_ping_ts: float | None = None self._next_msg_id = 1 self._tx_queue: "queue.Queue[bytes]" = queue.Queue(maxsize=DEFAULT_TX_QUEUE_MAX) self.tx_dropped = 0 self.max_payload_bytes = DEFAULT_MAX_PAYLOAD_BYTES self.att_mtu = 23 # Per-session encoding for the AcquireNotify path. Negotiated via "hello" cmd. # Broadcast (CCCD) path uses BROADCAST_ENCODING globally. self.encoding: str = "json" self._notify_fd: int | None = None self._notify_thread: threading.Thread | None = None self._notify_stop = threading.Event() def next_msg_id(self) -> int: mid = self._next_msg_id self._next_msg_id = (self._next_msg_id + 1) & 0xFFFF if self._next_msg_id == 0: self._next_msg_id = 1 return mid def set_notify_fd(self, fd: int, att_mtu: int): old_fd = self._notify_fd self._notify_fd = fd if old_fd is not None and old_fd != fd: try: os.close(old_fd) except OSError: pass self.att_mtu = att_mtu max_att_value = max(20, att_mtu - 3) self.max_payload_bytes = max(1, min(DEFAULT_MAX_PAYLOAD_BYTES, max_att_value - 10)) if self._notify_thread and self._notify_thread.is_alive(): return self._notify_stop.clear() self._notify_thread = threading.Thread(target=self._notify_writer_loop, daemon=True) self._notify_thread.start() def close_notify(self): self._notify_stop.set() fd = self._notify_fd self._notify_fd = None if fd is not None: try: os.close(fd) except OSError: pass def enqueue_frames(self, frames: list[bytes]): for frame in frames: try: self._tx_queue.put_nowait(frame) except queue.Full: try: _ = self._tx_queue.get_nowait() except queue.Empty: pass try: self._tx_queue.put_nowait(frame) except queue.Full: self.tx_dropped += 1 continue self.tx_dropped += 1 if frames: log_debug( f"[TXQ] device={self.device_path} +{len(frames)} frames " f"depth={self.tx_queue_depth()} dropped={self.tx_dropped}" ) async def enqueue_frames_lossless(self, frames: list[bytes], timeout_sec: float) -> bool: for frame in frames: deadline = time.monotonic() + timeout_sec while True: if self._notify_fd is None or self._notify_stop.is_set(): log_warn(f"[TXQ] snapshot enqueue aborted device={self.device_path}; notify fd closed") return False try: self._tx_queue.put_nowait(frame) break except queue.Full: if time.monotonic() > deadline: log_warn( f"[TXQ] snapshot enqueue timeout device={self.device_path} " f"depth={self.tx_queue_depth()}" ) return False await asyncio.sleep(SNAPSHOT_DRAIN_POLL_SEC) if frames: log_debug( f"[TXQ] snapshot device={self.device_path} +{len(frames)} frames " f"depth={self.tx_queue_depth()} dropped={self.tx_dropped}" ) return True def tx_queue_depth(self) -> int: return self._tx_queue.qsize() def _notify_writer_loop(self): while not self._notify_stop.is_set(): fd = self._notify_fd if fd is None: time.sleep(0.05) continue try: frame = self._tx_queue.get(timeout=0.25) except queue.Empty: continue try: os.write(fd, frame) except OSError: log_warn(f"[DATA] os.write failed device={self.device_path}; closing notify fd") self.close_notify() return class AisHubBridge: """ Owns an asyncio loop in a background thread. - REST fetch for snapshot sections - One WS client to ais_hub /ws, fan-out events to sessions """ def __init__(self): self._loop: asyncio.AbstractEventLoop | None = None self._thread: threading.Thread | None = None self._stop = threading.Event() self.ws_alive = False self.last_ws_msg_ts: float | None = None self._sessions: dict[str, ClientSession] = {} self._sessions_lock = threading.Lock() self.data_char: DataCharacteristic | None = None # Global snapshot serialization lock for the broadcast (CCCD) channel. # Android clients share a single notify stream; we MUST NOT interleave # frames from two simultaneous snapshots or each client will receive a # mashup. Per-fd (AcquireNotify) sessions don't need this lock - they # have dedicated pipes. Implemented as an asyncio.Lock used by do_snapshot. self._broadcast_snapshot_lock: asyncio.Lock | None = None # Simple throttling to avoid overwhelming BLE notify path (esp. Android). # Values are minimum interval (seconds) between forwarded events of that type. self._event_min_interval_sec: dict[str, float] = { "ownship.update": 0.33, # ~3 Hz "stats.update": 1.0, # <= 1 Hz (source is ~0.2 Hz anyway) } self._event_last_sent_ts: dict[str, float] = {} self._pending_target_events: "OrderedDict[str, dict[str, Any]]" = OrderedDict() self._target_flush_sent = 0 # Paced broadcast queue: draining via a dedicated thread that schedules # PropertiesChanged emissions on the GLib main loop with BROADCAST_FRAME_GAP_SEC # gap. This prevents BlueZ from coalescing/delaying notifications when snapshot # or bursty events get pushed at once. self._bcast_queue: "queue.Queue[bytes]" = queue.Queue(maxsize=BROADCAST_QUEUE_MAX) self._bcast_dropped = 0 self._bcast_sent = 0 self._bcast_stop = threading.Event() self._bcast_thread: threading.Thread | None = None self._bcast_msg_id_lock = threading.Lock() self._bcast_next_msg_id = 1 def attach_data_characteristic(self, data_char: "DataCharacteristic"): self.data_char = data_char def next_broadcast_msg_id(self) -> int: with self._bcast_msg_id_lock: msg_id = self._bcast_next_msg_id self._bcast_next_msg_id += 1 if self._bcast_next_msg_id > 0xFFFF: self._bcast_next_msg_id = 1 return msg_id def send_broadcast_frame(self, frame: bytes): """ Fallback for clients that only use standard CCCD notifications (Android). Enqueues the frame into the paced broadcast queue; a dedicated thread will schedule it on the GLib main loop with BROADCAST_FRAME_GAP_SEC gap. """ if not self.data_char: return if not self.data_char.notifying: return try: self._bcast_queue.put_nowait(frame) except queue.Full: try: _ = self._bcast_queue.get_nowait() except queue.Empty: pass self._bcast_dropped += 1 try: self._bcast_queue.put_nowait(frame) except queue.Full: self._bcast_dropped += 1 async def enqueue_broadcast_frame_lossless(self, frame: bytes, timeout_sec: float) -> bool: deadline = time.monotonic() + timeout_sec while True: if not self.data_char or not self.data_char.notifying: log_warn("[BCAST] snapshot enqueue aborted: no active DATA notifier") return False try: self._bcast_queue.put_nowait(frame) return True except queue.Full: if time.monotonic() > deadline: log_warn( f"[BCAST] snapshot enqueue timeout: qsize={self._bcast_queue.qsize()} " f"capacity={BROADCAST_QUEUE_MAX}" ) return False await asyncio.sleep(SNAPSHOT_DRAIN_POLL_SEC) async def wait_broadcast_drain(self, threshold: int, timeout_sec: float = 10.0): """ Async backpressure for snapshot producer: block until broadcast queue depth drops below `threshold`. Prevents silent drops when the snapshot would otherwise flood the queue faster than BROADCAST_FRAME_GAP_SEC drains it. """ deadline = time.monotonic() + timeout_sec while self._bcast_queue.qsize() > threshold: if time.monotonic() > deadline: log_warn( f"[BCAST] wait_broadcast_drain timeout: qsize={self._bcast_queue.qsize()} " f"threshold={threshold}" ) return await asyncio.sleep(SNAPSHOT_DRAIN_POLL_SEC) def _broadcast_writer_loop(self): gap = max(0.0, BROADCAST_FRAME_GAP_SEC) while not self._bcast_stop.is_set(): try: frame = self._bcast_queue.get(timeout=0.25) except queue.Empty: continue if not self.data_char or not self.data_char.notifying: # Drop silently if no subscribers (avoid growing backlog when idle). continue GLib.idle_add(self.data_char.send_frame_broadcast, frame) self._bcast_sent += 1 if self._bcast_sent <= 10 or (self._bcast_sent % 500) == 0: log_debug( f"[BCAST] paced emit #{self._bcast_sent} qdepth={self._bcast_queue.qsize()} " f"dropped={self._bcast_dropped} gap_ms={gap * 1000:.1f}" ) if gap > 0: time.sleep(gap) def start(self): if self._thread and self._thread.is_alive(): return self._stop.clear() self._thread = threading.Thread(target=self._run_loop_thread, daemon=True) self._thread.start() self._bcast_stop.clear() if not self._bcast_thread or not self._bcast_thread.is_alive(): self._bcast_thread = threading.Thread( target=self._broadcast_writer_loop, name="ais_ble_bcast_writer", daemon=True, ) self._bcast_thread.start() def stop(self): self._stop.set() self._bcast_stop.set() loop = self._loop if loop: loop.call_soon_threadsafe(loop.stop) def get_or_create_session(self, device_path: str) -> ClientSession: with self._sessions_lock: sess = self._sessions.get(device_path) if sess is None: sess = ClientSession(device_path) self._sessions[device_path] = sess return sess def remove_session(self, device_path: str): with self._sessions_lock: sess = self._sessions.pop(device_path, None) if sess: sess.close_notify() def sessions_snapshot(self) -> list[ClientSession]: with self._sessions_lock: return list(self._sessions.values()) def session_count(self) -> int: with self._sessions_lock: return len(self._sessions) def submit_coroutine(self, coro: "asyncio.Future[Any]"): loop = self._loop if not loop: raise RuntimeError("AisHubBridge loop not started") return asyncio.run_coroutine_threadsafe(coro, loop) def _run_loop_thread(self): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) self._loop = loop async def runner(): # asyncio primitives must be created inside the loop they belong to. self._broadcast_snapshot_lock = asyncio.Lock() tasks = [ asyncio.create_task(self._ws_loop(), name="ais_hub_ws_loop"), asyncio.create_task(self._target_update_flush_loop(), name="ais_hub_target_flush_loop"), ] try: while not self._stop.is_set(): await asyncio.sleep(0.25) finally: for t in tasks: t.cancel() await asyncio.gather(*tasks, return_exceptions=True) try: loop.run_until_complete(runner()) finally: try: loop.close() except Exception: pass async def _ws_loop(self): try: import aiohttp except Exception as e: log_error(f"[ais_hub] aiohttp not installed: {e}") self.ws_alive = False return backoff = 1.0 while not self._stop.is_set(): try: async with aiohttp.ClientSession() as session: log_info(f"[ais_hub] WS connecting: {AIS_HUB_WS_URL}") async with session.ws_connect(AIS_HUB_WS_URL, heartbeat=30) as ws: self.ws_alive = True log_info("[ais_hub] WS connected") backoff = 1.0 async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: self.last_ws_msg_ts = _now_ts() try: ev = json.loads(msg.data) except Exception: continue ev_type = ev.get("type") if not ev_type or ev_type == "state.snapshot": continue self._fanout_event(ev) elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR): break except Exception: self.ws_alive = False log_warn(f"[ais_hub] WS disconnected; retry in {backoff:.1f}s") await asyncio.sleep(backoff) backoff = min(15.0, backoff * 1.5) async def _target_update_flush_loop(self): gap = max(0.0, TARGET_UPDATE_FLUSH_INTERVAL_SEC) drain_threshold = max(64, int(BROADCAST_QUEUE_MAX * TARGET_UPDATE_BACKPRESSURE_FRAC)) while not self._stop.is_set(): if not self._pending_target_events: await asyncio.sleep(0.05) continue sessions = self.sessions_snapshot() if any(s.snapshot_in_progress for s in sessions): await asyncio.sleep(0.10) continue if self._bcast_queue.qsize() > drain_threshold: await asyncio.sleep(SNAPSHOT_DRAIN_POLL_SEC) continue _, ev = self._pending_target_events.popitem(last=False) self._fanout_event_direct(ev) self._target_flush_sent += 1 if self._target_flush_sent <= 10 or (self._target_flush_sent % 500) == 0: log_debug( f"[FANOUT] target flush sent={self._target_flush_sent} " f"pending={len(self._pending_target_events)} bcast_q={self._bcast_queue.qsize()}" ) if gap > 0: await asyncio.sleep(gap) def _fanout_event(self, ev: dict[str, Any]): ev_type = ev.get("type") if ev_type == "target.update": data = ev.get("data") mmsi = None if isinstance(data, dict): mmsi = data.get("mmsi") if mmsi is not None: key = str(mmsi) if key in self._pending_target_events: del self._pending_target_events[key] self._pending_target_events[key] = ev return self._fanout_event_direct(ev) def _fanout_event_direct(self, ev: dict[str, Any]): ev_type = ev.get("type") if not ev_type: return now = _now_ts() min_iv = self._event_min_interval_sec.get(ev_type) if min_iv is not None: last = self._event_last_sent_ts.get(ev_type) if last is not None and (now - last) < min_iv: return self._event_last_sent_ts[ev_type] = now sessions = self.sessions_snapshot() # Suppress low-priority live events (ownship.update, stats.update) while any # session has a snapshot transfer in progress. It's OK for the client to rely # on the snapshot data; the next live update will catch up once snapshot ends. # This prevents interleaving small frames into the snapshot stream and # overwhelming BlueZ / Android's BLE stack (which then drops the link). snapshot_active = any(s.snapshot_in_progress for s in sessions) suppress_on_snapshot = ev_type in ("ownship.update", "stats.update") if snapshot_active and suppress_on_snapshot: log_debug( f"[FANOUT] suppress ev_type={ev_type} during active snapshot " f"sessions={len(sessions)}" ) return # Broadcast path (CCCD notify): do it ONCE per event if at least one CCCD-client wants it. # Skip broadcast entirely if all CCCD-clients are currently in snapshot. wants_broadcast = any( (s._notify_fd is None) and (ev_type in s.subscriptions) and not s.snapshot_in_progress for s in sessions ) if wants_broadcast: msg_id = self.next_broadcast_msg_id() frames = pack_frames( msg_type=MSG_EVENT, session_msg_id=msg_id, payload_obj=ev, max_payload_bytes=BROADCAST_MAX_PAYLOAD_BYTES, encoding=BROADCAST_ENCODING, ) for fr in frames: self.send_broadcast_frame(fr) log_debug( f"[DATA] broadcast frames={len(frames)} msg_type=0x{MSG_EVENT:02X} " f"msg_id={msg_id} ev_type={ev_type} sessions={len(sessions)} enc={BROADCAST_ENCODING}" ) # Per-client fd path (AcquireNotify): send only to those sessions. for sess in sessions: if sess._notify_fd is None: continue if ev_type not in sess.subscriptions: continue if sess.snapshot_in_progress: continue msg_id = sess.next_msg_id() frames = pack_frames( msg_type=MSG_EVENT, session_msg_id=msg_id, payload_obj=ev, max_payload_bytes=sess.max_payload_bytes, encoding=sess.encoding, ) sess.enqueue_frames(frames) async def fetch_snapshot_sections(self, include: list[str], max_vessels: int) -> dict[str, Any]: try: import aiohttp except Exception as e: raise RuntimeError(f"aiohttp not installed: {e}") sections: dict[str, Any] = {} async with aiohttp.ClientSession() as s: if "ownship" in include: async with s.get(f"{AIS_HUB_BASE_URL}/api/v1/ownship") as r: r.raise_for_status() sections["ownship"] = await r.json() if "vessels" in include: async with s.get(f"{AIS_HUB_BASE_URL}/api/v1/vessels", params={"limit": max_vessels}) as r: r.raise_for_status() sections["vessels"] = await r.json() if "base_stations" in include: async with s.get(f"{AIS_HUB_BASE_URL}/api/v1/base_stations") as r: r.raise_for_status() sections["base_stations"] = await r.json() if "atons" in include: async with s.get(f"{AIS_HUB_BASE_URL}/api/v1/atons") as r: r.raise_for_status() sections["atons"] = await r.json() if "stats" in include: async with s.get(f"{AIS_HUB_BASE_URL}/api/v1/stats") as r: r.raise_for_status() sections["stats"] = await r.json() return sections class AisHubService(Service): def __init__(self, bus, index, bridge: AisHubBridge): Service.__init__(self, bus, index, AIS_HUB_SERVICE_UUID, True) self.bridge = bridge self.control = ControlCharacteristic(bus, 0, self, bridge) self.data = DataCharacteristic(bus, 1, self, bridge) self.status = StatusCharacteristic(bus, 2, self, bridge) self.bridge.attach_data_characteristic(self.data) self.add_characteristic(self.control) self.add_characteristic(self.data) self.add_characteristic(self.status) class ControlCharacteristic(Characteristic): def __init__(self, bus, index, service: Service, bridge: AisHubBridge): Characteristic.__init__(self, bus, index, AIS_HUB_CONTROL_UUID, ["write", "write-without-response"], service) self.bridge = bridge @dbus.service.method(GATT_CHRC_IFACE, in_signature="aya{sv}", out_signature="") def WriteValue(self, value, options): raw = bytes(value) device_path = str(options.get("device", "unknown")) sess = self.bridge.get_or_create_session(device_path) log_info( f"[CONTROL] WriteValue device={device_path} bytes={len(raw)} " f"preview={_hex_preview(raw)}" ) try: cmd_obj = json.loads(raw.decode("utf-8")) except Exception: self._send_error(sess, "bad_command", "invalid JSON") return cmd = cmd_obj.get("cmd") log_info(f"[CONTROL] cmd={cmd} device={device_path} json={cmd_obj}") if cmd == "hello": # Optional encoding negotiation. Client can pass {"encodings": ["msgpack","json"]}. # We pick the first one we support. Default: "json" for backward compat. client_encodings = cmd_obj.get("encodings") chosen = "json" if isinstance(client_encodings, list): for enc in client_encodings: enc_s = str(enc).lower() if enc_s == "msgpack" and _HAS_MSGPACK: chosen = "msgpack" break if enc_s == "json": chosen = "json" break sess.encoding = chosen payload = { "ok": True, "proto": PROTO_VERSION, "server": "ais_ble", "server_time": _now_ts(), "encoding": chosen, "features": { "snapshot": True, "live_events": True, "filters": True, "compression": False, "msgpack": _HAS_MSGPACK, "multi_client": True, }, } # HELLO_ACK is always JSON so a fresh client can decode it before it # knows which encoding the server picked. msg_id = sess.next_msg_id() frames = pack_frames( msg_type=MSG_HELLO_ACK, session_msg_id=msg_id, payload_obj=payload, max_payload_bytes=sess.max_payload_bytes, encoding="json", ) if sess._notify_fd is not None: sess.enqueue_frames(frames) else: for fr in frames: self.bridge.send_broadcast_frame(fr) return if cmd == "ping": payload = {"id": cmd_obj.get("id"), "server_time": _now_ts()} self._send(sess, MSG_PONG, payload) sess.last_ping_ts = _now_ts() return if cmd == "subscribe": events = cmd_obj.get("events") if isinstance(events, list) and events: sess.subscriptions = set(str(e) for e in events) else: sess.subscriptions = set(DEFAULT_SUBSCRIPTIONS) self._send(sess, MSG_STATUS, {"ok": True, "subscriptions": sorted(sess.subscriptions)}) return if cmd == "unsubscribe": events = cmd_obj.get("events") if isinstance(events, list): for e in events: sess.subscriptions.discard(str(e)) self._send(sess, MSG_STATUS, {"ok": True, "subscriptions": sorted(sess.subscriptions)}) return if cmd == "set_filters": filt = cmd_obj.get("targets") if isinstance(filt, dict): sess.filters["targets"] = filt self._send(sess, MSG_STATUS, {"ok": True, "filters": sess.filters}) return if cmd == "get_snapshot": include = cmd_obj.get("include") or ["ownship", "vessels", "base_stations", "atons", "stats"] if not isinstance(include, list): include = ["ownship", "vessels", "base_stations", "atons", "stats"] include = [str(x) for x in include] max_vessels = cmd_obj.get("max_vessels") if max_vessels is None: max_vessels = DEFAULT_SNAPSHOT_MAX_VESSELS try: max_vessels = int(max_vessels) except Exception: max_vessels = DEFAULT_SNAPSHOT_MAX_VESSELS if max_vessels <= 0: max_vessels = DEFAULT_SNAPSHOT_MAX_VESSELS if sess.snapshot_in_progress: self._send_error(sess, "snapshot_busy", "snapshot already in progress") return sess.snapshot_in_progress = True sess.snapshot_id = (sess.snapshot_id + 1) & 0xFFFF or random.randint(1, 0xFFFF) async def do_snapshot(): # Serialize broadcast snapshots: only one CCCD-snapshot at a time # across ALL Android clients, otherwise two clients requesting # snapshots simultaneously would receive interleaved frames on # the shared notify stream and decode garbage. Per-fd sessions # have private pipes and don't need this lock. if sess._notify_fd is None and self.bridge._broadcast_snapshot_lock is not None: log_debug( f"[SNAPSHOT] waiting broadcast lock sess={sess.device_path} " f"snapshot_id={sess.snapshot_id}" ) await self.bridge._broadcast_snapshot_lock.acquire() log_debug(f"[SNAPSHOT] acquired broadcast lock sess={sess.device_path}") try: await _do_snapshot_inner() finally: if sess._notify_fd is None and self.bridge._broadcast_snapshot_lock is not None: try: self.bridge._broadcast_snapshot_lock.release() except Exception: pass async def _do_snapshot_inner(): try: sections = await self.bridge.fetch_snapshot_sections(include, max_vessels) except Exception as e: GLib.idle_add(self._send_error, sess, "snapshot_failed", str(e)) GLib.idle_add(self._snapshot_done, sess) return total_objects = {} if "ownship" in sections: total_objects["ownship"] = 1 if "stats" in sections: total_objects["stats"] = 1 if "vessels" in sections: total_objects["vessels"] = len(sections["vessels"]) if isinstance(sections["vessels"], list) else 0 if "base_stations" in sections: total_objects["base_stations"] = ( len(sections["base_stations"]) if isinstance(sections["base_stations"], list) else 0 ) if "atons" in sections: total_objects["atons"] = len(sections["atons"]) if isinstance(sections["atons"], list) else 0 # Backpressure threshold: pause producer once broadcast queue # exceeds this depth. Keeps memory bounded AND prevents the # silent-drop-at-1024 bug that capped snapshots at ~699 vessels. drain_threshold = max(64, int(BROADCAST_QUEUE_MAX * SNAPSHOT_DRAIN_THRESHOLD_FRAC)) def abort_snapshot(reason: str) -> bool: log_warn( f"[SNAPSHOT] abort sess={sess.device_path} " f"snapshot_id={sess.snapshot_id}: {reason}" ) GLib.idle_add(self._send_error, sess, "snapshot_failed", reason) GLib.idle_add(self._snapshot_done, sess) return False async def gated_send(msg_type: int, payload_obj: Any) -> bool: # Snapshot frames must be in the downstream queue before we # wait on it. Deferring _send through GLib.idle_add made the # producer outrun the queue and declare the snapshot done # while its tail was still only scheduled. msg_id = sess.next_msg_id() if sess._notify_fd is None: await self.bridge.wait_broadcast_drain(drain_threshold) frames = pack_frames( msg_type=msg_type, session_msg_id=msg_id, payload_obj=payload_obj, max_payload_bytes=BROADCAST_MAX_PAYLOAD_BYTES, encoding=BROADCAST_ENCODING, ) for fr in frames: ok = await self.bridge.enqueue_broadcast_frame_lossless( fr, SNAPSHOT_ENQUEUE_TIMEOUT_SEC, ) if not ok: return abort_snapshot("broadcast DATA queue unavailable") log_debug( f"[DATA] snapshot broadcast frames={len(frames)} msg_type=0x{msg_type:02X} " f"msg_id={msg_id} device={sess.device_path} enc={BROADCAST_ENCODING}" ) return True # Per-fd queue is small (256). Wait if it's mostly full and # then enqueue without dropping old snapshot frames. per_fd_threshold = max(32, DEFAULT_TX_QUEUE_MAX // 4) deadline = time.monotonic() + 10.0 while sess.tx_queue_depth() > per_fd_threshold: if time.monotonic() > deadline: log_warn( f"[SNAPSHOT] per-fd drain timeout sess={sess.device_path} " f"depth={sess.tx_queue_depth()}" ) break await asyncio.sleep(SNAPSHOT_DRAIN_POLL_SEC) frames = pack_frames( msg_type=msg_type, session_msg_id=msg_id, payload_obj=payload_obj, max_payload_bytes=sess.max_payload_bytes, encoding=sess.encoding, ) ok = await sess.enqueue_frames_lossless(frames, SNAPSHOT_ENQUEUE_TIMEOUT_SEC) if not ok: return abort_snapshot("per-client DATA queue unavailable") log_debug( f"[DATA] snapshot fd frames={len(frames)} msg_type=0x{msg_type:02X} " f"msg_id={msg_id} device={sess.device_path} enc={sess.encoding}" ) return True if not await gated_send( MSG_SNAPSHOT_BEGIN, { "snapshot_id": sess.snapshot_id, "sections": include, "total_objects": total_objects, }, ): return if "ownship" in sections: if not await gated_send( MSG_SNAPSHOT_CHUNK, { "snapshot_id": sess.snapshot_id, "section": "ownship", "seq": 1, "more": False, "item": sections["ownship"], }, ): return if "stats" in sections: if not await gated_send( MSG_SNAPSHOT_CHUNK, { "snapshot_id": sess.snapshot_id, "section": "stats", "seq": 1, "more": False, "item": sections["stats"], }, ): return async def send_list_section(section_name: str) -> bool: if section_name not in sections or not isinstance(sections[section_name], list): return True items_all: list[Any] = sections[section_name] batch_size = SNAPSHOT_VESSELS_PER_CHUNK seq = 1 sent = 0 for i in range(0, len(items_all), batch_size): items = items_all[i:i + batch_size] more = (i + batch_size) < len(items_all) if not await gated_send( MSG_SNAPSHOT_CHUNK, { "snapshot_id": sess.snapshot_id, "section": section_name, "seq": seq, "more": more, "items": items, }, ): return False seq += 1 sent += len(items) if (seq % 10) == 0: log_debug( f"[SNAPSHOT] {section_name} progress sess={sess.device_path} " f"sent={sent}/{len(items_all)} seq={seq} " f"bcast_q={self.bridge._bcast_queue.qsize()}" ) return True for section_name in ("vessels", "base_stations", "atons"): if not await send_list_section(section_name): return if not await gated_send(MSG_SNAPSHOT_END, {"snapshot_id": sess.snapshot_id, "ok": True}): return # Wait for the broadcast queue to actually drain before clearing # snapshot_in_progress. Otherwise stats.update / ownship.update # interleave into the still-emitting snapshot tail and the next # CCCD client's snapshot can start while frames are still flying. if sess._notify_fd is None: await self.bridge.wait_broadcast_drain(0, timeout_sec=30.0) else: deadline = time.monotonic() + 30.0 while sess.tx_queue_depth() > 0: if time.monotonic() > deadline: log_warn( f"[SNAPSHOT] per-fd final drain timeout sess={sess.device_path} " f"depth={sess.tx_queue_depth()}" ) break await asyncio.sleep(SNAPSHOT_DRAIN_POLL_SEC) GLib.idle_add(self._snapshot_done, sess) log_info( f"[SNAPSHOT] done sess={sess.device_path} snapshot_id={sess.snapshot_id} " f"vessels={total_objects.get('vessels', 0)} " f"base_stations={total_objects.get('base_stations', 0)} " f"atons={total_objects.get('atons', 0)} " f"bcast_dropped={self.bridge._bcast_dropped}" ) try: self.bridge.submit_coroutine(do_snapshot()) except Exception as e: self._send_error(sess, "snapshot_failed", str(e)) self._snapshot_done(sess) return self._send_error(sess, "bad_command", "unknown cmd") def _snapshot_done(self, sess: ClientSession): sess.snapshot_in_progress = False return False def _send(self, sess: ClientSession, msg_type: int, payload_obj: Any): msg_id = sess.next_msg_id() # Per-fd path uses negotiated session encoding; broadcast path uses the # global broadcast encoding (must match what ALL CCCD clients can decode). if sess._notify_fd is not None: frames = pack_frames( msg_type=msg_type, session_msg_id=msg_id, payload_obj=payload_obj, max_payload_bytes=sess.max_payload_bytes, encoding=sess.encoding, ) sess.enqueue_frames(frames) else: frames = pack_frames( msg_type=msg_type, session_msg_id=msg_id, payload_obj=payload_obj, max_payload_bytes=BROADCAST_MAX_PAYLOAD_BYTES, encoding=BROADCAST_ENCODING, ) for fr in frames: self.bridge.send_broadcast_frame(fr) log_debug( f"[DATA] broadcast frames={len(frames)} msg_type=0x{msg_type:02X} " f"msg_id={msg_id} device={sess.device_path} enc={BROADCAST_ENCODING}" ) return False def _send_error(self, sess: ClientSession, code: str, message: str): return self._send(sess, MSG_ERROR, {"code": code, "message": message}) class DataCharacteristic(Characteristic): """ Per-client notifications via BlueZ AcquireNotify(). """ def __init__(self, bus, index, service: Service, bridge: AisHubBridge): Characteristic.__init__(self, bus, index, AIS_HUB_DATA_UUID, ["notify"], service) self.bridge = bridge self._broadcast_sent = 0 @dbus.service.method(GATT_CHRC_IFACE, in_signature="a{sv}", out_signature="hq") def AcquireNotify(self, options): device_path = str(options.get("device", "unknown")) sess = self.bridge.get_or_create_session(device_path) # BlueZ expects us to provide a writable fd (unix). rfd, wfd = os.pipe() att_mtu = int(options.get("mtu", 23)) sess.set_notify_fd(wfd, att_mtu) log_info(f"[DATA] AcquireNotify device={device_path} mtu={att_mtu} fd={wfd}") try: dup_rfd = os.dup(rfd) finally: try: os.close(rfd) except OSError: pass return dbus.UnixFd(dup_rfd), dbus.UInt16(att_mtu) @dbus.service.method(GATT_CHRC_IFACE, in_signature="", out_signature="") def StartNotify(self): self.notifying = True log_info("[DATA] StartNotify (CCCD enabled)") @dbus.service.method(GATT_CHRC_IFACE, in_signature="", out_signature="") def StopNotify(self): self.notifying = False log_info("[DATA] StopNotify (CCCD disabled)") def send_frame_broadcast(self, frame: bytes): if not self.notifying: return False self._broadcast_sent += 1 self._update_value_and_notify(frame) if self._broadcast_sent <= 20 or (self._broadcast_sent % 200) == 0: log_debug(f"[DATA] notify broadcast sent={self._broadcast_sent} bytes={len(frame)} preview={_hex_preview(frame)}") return False class StatusCharacteristic(Characteristic): def __init__(self, bus, index, service: Service, bridge: AisHubBridge): Characteristic.__init__(self, bus, index, AIS_HUB_STATUS_UUID, ["read", "notify"], service) self.bridge = bridge self.notifying = False @dbus.service.method(GATT_CHRC_IFACE, in_signature="a{sv}", out_signature="ay") def ReadValue(self, options): device_path = str(options.get("device", "unknown")) sess = self.bridge.get_or_create_session(device_path) status = { "proto": PROTO_VERSION, "server_time": _now_ts(), "gps_fix": None, "vessels_active": None, "ws_source_alive": self.bridge.ws_alive, "snapshot_in_progress": sess.snapshot_in_progress, "tx_queue": sess.tx_queue_depth(), "tx_dropped": sess.tx_dropped, } return _dbus_bytes(_safe_json_dumps(status).encode("utf-8")) # ============ LE Advertisement ============ class TestAdvertisement(dbus.service.Object): PATH_BASE = '/org/bluez/example/advertisement' def __init__(self, bus, index): self.path = self.PATH_BASE + str(index) self.bus = bus dbus.service.Object.__init__(self, bus, self.path) def get_path(self): return dbus.ObjectPath(self.path) @dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}') def GetAll(self, interface): log_info('[Advertisement] GetAll вызван для интерфейса: ' + interface) if interface != LE_ADVERTISEMENT_IFACE: log_error('[Advertisement] Неверный интерфейс: ' + interface) raise InvalidArgsException() # Connectable реклама с явными флагами # Важно: IncludeTxPower и другие параметры могут помочь с подключением props = { 'Type': dbus.String('peripheral'), 'ServiceUUIDs': dbus.Array([ AIS_HUB_SERVICE_UUID, ], signature='s'), 'LocalName': dbus.String('AIS'), } log_info('[Advertisement] Возвращаем свойства рекламы:') log_info(f' Type: {props["Type"]}') log_info(f' LocalName: {props["LocalName"]}') log_info(f' ServiceUUIDs: {len(props["ServiceUUIDs"])} сервисов') return props @dbus.service.method(LE_ADVERTISEMENT_IFACE, in_signature='', out_signature='') def Release(self): log_warn('[Advertisement] Release вызван - возможно, клиент отключился или реклама была отменена') log_warn('[Advertisement] Это может означать, что реклама была остановлена BlueZ') # ============ Вспомогательное: поиск адаптера с GATT Manager ============ def find_adapter(bus): obj = bus.get_object(BLUEZ_SERVICE_NAME, '/') om = dbus.Interface(obj, DBUS_OM_IFACE) objects = om.GetManagedObjects() for path, ifaces in objects.items(): if GATT_MANAGER_IFACE in ifaces: return path return None # ============ main() ============ def main(): global MAIN_LOOP dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SystemBus() adapter_path = find_adapter(bus) if not adapter_path: log_error('Не найден адаптер с GattManager1. ' 'Проверь, что bluetoothd запущен с --experimental') return 1 log_info(f'Используем адаптер: {adapter_path}') service_manager = dbus.Interface( bus.get_object(BLUEZ_SERVICE_NAME, adapter_path), GATT_MANAGER_IFACE ) advertising_manager = dbus.Interface( bus.get_object(BLUEZ_SERVICE_NAME, adapter_path), LE_ADVERTISING_MANAGER_IFACE ) app = Application(bus) bridge = AisHubBridge() bridge.start() def _on_bluez_properties_changed(interface, changed, invalidated, path=None): try: if interface != "org.bluez.Device1": return if "Connected" not in changed: return connected = bool(changed.get("Connected")) dev_path = str(path or "") if not dev_path: return if connected: log_info(f"[BlueZ] Device connected: {dev_path} sessions={bridge.session_count()}") bridge.get_or_create_session(dev_path) else: log_info(f"[BlueZ] Device disconnected: {dev_path} -> removing session") bridge.remove_session(dev_path) log_info(f"[BlueZ] sessions now: {bridge.session_count()}") except Exception as e: log_warn(f"[BlueZ] PropertiesChanged handler error: {e}") # Track device connect/disconnect to avoid leaking sessions. bus.add_signal_receiver( _on_bluez_properties_changed, dbus_interface=DBUS_PROP_IFACE, signal_name="PropertiesChanged", path_keyword="path", ) ais_srv = AisHubService(bus, 0, bridge) app.add_service(ais_srv) adv = TestAdvertisement(bus, 0) # Регистрируем GATT апп log_info('Регистрируем GATT Application...') service_manager.RegisterApplication( app.get_path(), {}, reply_handler=lambda: log_info('✅ GATT Application зарегистрирован успешно'), error_handler=lambda e: log_error(f'❌ Ошибка RegisterApplication: {e}'), ) # Регистрируем рекламу log_info('Регистрируем рекламу...') log_info(f'Путь рекламы: {adv.get_path()}') advertising_manager.RegisterAdvertisement( adv.get_path(), {}, reply_handler=lambda: log_info('✅ Advertisement зарегистрирован успешно - устройство должно быть видно при сканировании'), error_handler=lambda e: log_error(f'❌ Ошибка RegisterAdvertisement: {e}'), ) log_info('=' * 60) log_info('BLE GATT сервер запущен и готов к подключениям') log_info(f'Имя устройства: AIS') log_info(f'Сервис: AIS Hub ({AIS_HUB_SERVICE_UUID})') log_info(f'ais_hub REST: {AIS_HUB_BASE_URL}') log_info(f'ais_hub WS: {AIS_HUB_WS_URL}') log_info('=' * 60) MAIN_LOOP = GLib.MainLoop() try: MAIN_LOOP.run() except KeyboardInterrupt: log_info('Получен сигнал завершения (Ctrl+C)') log_info('Останавливаем сервер...') try: bridge.stop() except Exception: pass try: # Отменяем рекламу advertising_manager.UnregisterAdvertisement(adv.get_path()) log_info('✅ Advertisement отменён') except Exception as e: log_error(f'❌ Ошибка при отмене advertisement: {e}') try: # Отменяем регистрацию GATT приложения service_manager.UnregisterApplication(app.get_path()) log_info('✅ GATT Application отменён') except Exception as e: log_error(f'❌ Ошибка при отмене GATT application: {e}') log_info('Сервер остановлен') return 0 if __name__ == '__main__': sys.exit(main() or 0)