Files
AndroidAisMap/ble_gatt.py
T

1539 lines
61 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import asyncio
import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service
import json
import os
import queue
import random
import threading
import struct
import sys
import time
from collections import OrderedDict
try:
import msgpack # type: ignore
_HAS_MSGPACK = True
except Exception:
msgpack = None # type: ignore
_HAS_MSGPACK = False
from datetime import datetime
from gi.repository import GLib
from typing import Any, Optional
# BlueZ well-known bus name and the BlueZ D-Bus interface names used by this module.
BLUEZ_SERVICE_NAME = 'org.bluez'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
# Standard freedesktop interfaces: ObjectManager is served by Application,
# Properties carries the PropertiesChanged signal emitted by characteristics.
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
# NOTE(review): presumably set to the GLib main loop at startup; the
# assignment is not visible in this part of the file - confirm.
MAIN_LOOP = None
# ============ Logging ============
def log(level, message):
    """Print ``message`` to stdout, prefixed with a millisecond timestamp and ``level``.

    Output format: ``[YYYY-MM-DD HH:MM:SS.mmm] [LEVEL] message``. The stream is
    flushed on every call.
    """
    now = datetime.now()
    # Milliseconds are truncated (not rounded) from the microsecond field.
    stamp = now.strftime('%Y-%m-%d %H:%M:%S') + '.%03d' % (now.microsecond // 1000)
    print('[{}] [{}] {}'.format(stamp, level, message), flush=True)


def log_info(message):
    """Log ``message`` at INFO level."""
    log(level='INFO', message=message)


def log_warn(message):
    """Log ``message`` at WARN level."""
    log(level='WARN', message=message)


def log_error(message):
    """Log ``message`` at ERROR level."""
    log(level='ERROR', message=message)


def log_debug(message):
    """Log ``message`` at DEBUG level."""
    log(level='DEBUG', message=message)
def _hex_preview(b: bytes, max_len: int = 32) -> str:
if len(b) <= max_len:
return b.hex()
return b[:max_len].hex() + f"...(+{len(b)-max_len}B)"
# ============ AIS Hub BLE protocol v2 UUIDs ============
# NOTE: UUIDs are placeholders; adjust if you already have assigned ones.
# One primary service exposing three characteristics: CONTROL, DATA and STATUS.
AIS_HUB_SERVICE_UUID = '34b5f2a0-5b23-4c5a-9b2a-3c4c1a9c0001'
AIS_HUB_CONTROL_UUID = '34b5f2a0-5b23-4c5a-9b2a-3c4c1a9c0002'
AIS_HUB_DATA_UUID = '34b5f2a0-5b23-4c5a-9b2a-3c4c1a9c0003'
AIS_HUB_STATUS_UUID = '34b5f2a0-5b23-4c5a-9b2a-3c4c1a9c0004'
# ============ Protocol constants ============
# Version byte semantics:
# 0x01 = protocol v1, payload = JSON (UTF-8)
# 0x02 = protocol v1, payload = MessagePack (opt-in; requires client support)
PROTO_VERSION_JSON = 0x01
PROTO_VERSION_MSGPACK = 0x02
PROTO_VERSION = PROTO_VERSION_JSON # kept for backward compatibility with existing code paths
# Message type codes; pack_frames() writes one of these as the second header byte.
MSG_HELLO_ACK = 0x01
MSG_SNAPSHOT_BEGIN = 0x02
MSG_SNAPSHOT_CHUNK = 0x03
MSG_SNAPSHOT_END = 0x04
MSG_EVENT = 0x05
MSG_STATUS = 0x06
MSG_ERROR = 0x07
MSG_PONG = 0x08
# Per-frame payload size: the fallback used by pack_frames() when the caller
# passes a non-positive limit, and the initial/upper value for each ClientSession.
DEFAULT_MAX_PAYLOAD_BYTES = 120
# Capacity (in frames) of each ClientSession TX queue.
DEFAULT_TX_QUEUE_MAX = 256
# Pacing for broadcast (CCCD / PropertiesChanged) path.
# BlueZ serializes PropertiesChanged over D-Bus -> hci0 and coalesces or drops
# notifications if we flood it. 6-8 ms between frames keeps Android happy
# without hurting snapshot throughput noticeably (~125-160 frames/sec).
BROADCAST_FRAME_GAP_SEC = float(os.environ.get("AIS_BLE_BROADCAST_GAP_SEC", "0.008"))
# Capacity (in frames) of the shared broadcast queue.
BROADCAST_QUEUE_MAX = int(os.environ.get("AIS_BLE_BROADCAST_QUEUE_MAX", "4096"))
# Broadcast payload size (bytes inside a single BLE notification payload).
# Android negotiates ATT MTU up to 247 -> payload up to ~244B. Default 180 is
# safe for most Android devices and cuts frame count ~1.5x vs 120. If all your
# clients negotiate high MTU, bump to 200-220 via env.
BROADCAST_MAX_PAYLOAD_BYTES = int(os.environ.get("AIS_BLE_BROADCAST_PAYLOAD", "180"))
# Encoding for broadcast path. Must match what Android client can decode.
# "json" - safe default, works with legacy clients (PROTO_VERSION=0x01)
# "msgpack" - ~30-40% smaller; requires client to recognize PROTO_VERSION=0x02
# and have a MessagePack decoder (org.msgpack:msgpack-core on Android).
BROADCAST_ENCODING = os.environ.get("AIS_BLE_BROADCAST_ENCODING", "json").lower()
# Import-time guard: refuse to start when msgpack broadcast is requested but
# the package is missing. Failing loudly here is better than silently sending
# JSON while the client expects msgpack.
if not _HAS_MSGPACK and BROADCAST_ENCODING == "msgpack":
    raise RuntimeError(
        "AIS_BLE_BROADCAST_ENCODING=msgpack but 'msgpack' Python package is not installed. Run: pip install msgpack"
    )
# Snapshot producer throttling: pause scheduling new chunks while the broadcast
# queue is above this fraction of its capacity. Prevents silent frame drop
# (old behavior cut snapshots at ~699 vessels because _bcast_queue overflowed).
SNAPSHOT_DRAIN_THRESHOLD_FRAC = float(os.environ.get("AIS_BLE_SNAPSHOT_DRAIN_FRAC", "0.25"))
# Poll interval used by the async "wait for queue room / drain" loops.
SNAPSHOT_DRAIN_POLL_SEC = float(os.environ.get("AIS_BLE_SNAPSHOT_DRAIN_POLL_SEC", "0.05"))
# Vessels per SNAPSHOT_CHUNK. Larger batches amortize header overhead; with
# msgpack + MTU ~185 a single chunk still fits many frames. 40 is a good
# middle ground for JSON; msgpack can go higher without hurting latency.
SNAPSHOT_VESSELS_PER_CHUNK = int(os.environ.get(
"AIS_BLE_SNAPSHOT_BATCH",
"60" if BROADCAST_ENCODING == "msgpack" else "40",
))
# Vessel limit applied to get_snapshot when the client omits max_vessels or
# sends a value that is not a positive integer.
DEFAULT_SNAPSHOT_MAX_VESSELS = int(os.environ.get("AIS_BLE_SNAPSHOT_MAX_VESSELS", "5000"))
# Longest a snapshot producer waits for room in a full queue before the
# enqueue is reported as failed.
SNAPSHOT_ENQUEUE_TIMEOUT_SEC = float(os.environ.get("AIS_BLE_SNAPSHOT_ENQUEUE_TIMEOUT_SEC", "30.0"))
# Live target updates can arrive as large bursts. Do not drop by aggregate time
# throttle; keep the latest event per MMSI and drain them at a BLE-safe pace.
TARGET_UPDATE_FLUSH_INTERVAL_SEC = float(os.environ.get("AIS_BLE_TARGET_UPDATE_GAP_SEC", "0.02"))
# Target flushing pauses while the broadcast queue is above this fraction of its capacity.
TARGET_UPDATE_BACKPRESSURE_FRAC = float(os.environ.get("AIS_BLE_TARGET_UPDATE_DRAIN_FRAC", "0.25"))
# Event types every new session is subscribed to until the client changes
# them with the subscribe/unsubscribe commands.
DEFAULT_SUBSCRIPTIONS = {
"ownship.update",
"target.update",
"base_station.update",
"aton.update",
"stats.update",
}
# ais_hub endpoints: REST base URL (snapshot sections) and WebSocket URL (live events).
AIS_HUB_BASE_URL = os.environ.get("AIS_HUB_BASE_URL", "http://127.0.0.1:8081")
AIS_HUB_WS_URL = os.environ.get("AIS_HUB_WS_URL", "ws://127.0.0.1:8081/ws")
# ============ D-Bus Exceptions ============
class InvalidArgsException(dbus.exceptions.DBusException):
    """D-Bus error reported under the name ``org.freedesktop.DBus.Error.InvalidArgs``."""
    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'


class NotSupportedException(dbus.exceptions.DBusException):
    """D-Bus error reported under the name ``org.bluez.Error.NotSupported``."""
    _dbus_error_name = 'org.bluez.Error.NotSupported'


class FailedException(dbus.exceptions.DBusException):
    """D-Bus error reported under the name ``org.bluez.Error.Failed``."""
    _dbus_error_name = 'org.bluez.Error.Failed'
# ============ Application (ObjectManager) ============
class Application(dbus.service.Object):
    """
    Root object that hands all of our services/characteristics to BlueZ
    through GetManagedObjects.
    """

    def __init__(self, bus):
        self.path = '/org/bluez/example/app'
        self.services = []
        super().__init__(bus, self.path)

    def get_path(self):
        """Return the application's D-Bus object path."""
        return dbus.ObjectPath(self.path)

    def add_service(self, service):
        """Register ``service`` so it is reported by GetManagedObjects."""
        self.services.append(service)

    def get_services(self):
        """Return the list of registered services (the live list, not a copy)."""
        return self.services

    @dbus.service.method(DBUS_OM_IFACE,
                         out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """Return ``{object_path: properties}`` for every service and characteristic."""
        log_info('🔵 GetManagedObjects вызван - клиент запрашивает список сервисов')
        log_info(' Это означает, что подключение установлено и начинается дискаверинг')
        managed_objects = {}
        for service in self.services:
            managed_objects[service.get_path()] = service.get_properties()
            # Characteristics follow their owning service in the mapping.
            managed_objects.update(
                (chrc.get_path(), chrc.get_properties())
                for chrc in service.get_characteristics()
            )
        log_info(f' Возвращаем {len(managed_objects)} объектов (сервисы + характеристики)')
        return managed_objects
# ============ GATT Service ============
class Service(dbus.service.Object):
    """GATT service exported on D-Bus; owns an ordered list of characteristics."""

    def __init__(self, bus, index, uuid, primary):
        self.path = '/org/bluez/example/service{}'.format(index)
        self.bus = bus
        self.uuid = uuid
        self.primary = primary
        self.characteristics = []
        super().__init__(bus, self.path)

    def get_path(self):
        """Return the service's D-Bus object path."""
        return dbus.ObjectPath(self.path)

    def add_characteristic(self, chrc):
        """Attach ``chrc`` to this service."""
        self.characteristics.append(chrc)

    def get_characteristics(self):
        """Return the list of attached characteristics (the live list, not a copy)."""
        return self.characteristics

    def get_properties(self):
        """Return the GattService1 property map reported through ObjectManager."""
        service_props = {
            'UUID': self.uuid,
            'Primary': dbus.Boolean(self.primary),
            'Includes': dbus.Array([], signature='o'),
        }
        return {GATT_SERVICE_IFACE: service_props}
# ============ GATT Characteristic (base) ============
class Characteristic(dbus.service.Object):
    """Base GATT characteristic.

    Every GATT method is a stub that prints a trace line and raises
    NotSupportedException; subclasses override the operations they support.
    """

    def __init__(self, bus, index, uuid, flags, service):
        self.path = f'{service.path}/char{index}'
        self.bus = bus
        self.uuid = uuid
        self.flags = flags
        self.service = service
        self.notifying = False
        self.value_bytes = b'\x00'  # default value
        super().__init__(bus, self.path)

    def get_path(self):
        """Return the characteristic's D-Bus object path."""
        return dbus.ObjectPath(self.path)

    def _value_as_dbus_array(self):
        """Return the current ``value_bytes`` as a D-Bus byte array."""
        return dbus.Array([dbus.Byte(octet) for octet in self.value_bytes], signature='y')

    def _reject(self, method_name, reason):
        """Print the base-stub trace line for ``method_name`` and raise NotSupportedException."""
        print(f'{self.uuid}: {method_name} (base stub)')
        raise NotSupportedException(reason)

    def get_properties(self):
        """Return the GattCharacteristic1 property map reported through ObjectManager."""
        chrc_props = {
            'Service': self.service.get_path(),
            'UUID': self.uuid,
            'Flags': dbus.Array(self.flags, signature='s'),
            # Initial Value so that clients can see the characteristic is alive.
            'Value': self._value_as_dbus_array(),
        }
        return {GATT_CHRC_IFACE: chrc_props}

    @dbus.service.method(GATT_CHRC_IFACE,
                         in_signature='a{sv}',
                         out_signature='ay')
    def ReadValue(self, options):
        """Base stub: always raises NotSupportedException."""
        self._reject('ReadValue', 'Read not implemented')

    @dbus.service.method(GATT_CHRC_IFACE,
                         in_signature='aya{sv}',
                         out_signature='')
    def WriteValue(self, value, options):
        """Base stub: always raises NotSupportedException."""
        self._reject('WriteValue', 'Write not implemented')

    @dbus.service.method(GATT_CHRC_IFACE,
                         in_signature='',
                         out_signature='')
    def StartNotify(self):
        """Base stub: always raises NotSupportedException."""
        self._reject('StartNotify', 'Notifications not supported')

    @dbus.service.method(GATT_CHRC_IFACE,
                         in_signature='',
                         out_signature='')
    def StopNotify(self):
        """Base stub: always raises NotSupportedException."""
        self._reject('StopNotify', 'Notifications not supported')

    # Helper for sending a notification.
    def _update_value_and_notify(self, new_bytes: bytes):
        """Store ``new_bytes`` as the value and emit PropertiesChanged for ``Value``."""
        self.value_bytes = new_bytes
        changed = {'Value': self._value_as_dbus_array()}
        self.PropertiesChanged(GATT_CHRC_IFACE, changed, [])

    @dbus.service.signal(DBUS_PROP_IFACE,
                         signature='sa{sv}as')
    def PropertiesChanged(self, interface, changed, invalidated):
        """D-Bus signal; the body is intentionally empty (the decorator emits it)."""
        pass
# ============ AIS Hub v2 server core ============
def _dbus_bytes(data: bytes) -> dbus.Array:
    """Convert ``data`` into a D-Bus byte array (signature ``ay``)."""
    octets = [dbus.Byte(octet) for octet in data]
    return dbus.Array(octets, signature='y')
def _now_ts() -> float:
return time.time()
def _safe_json_dumps(obj: Any) -> str:
return json.dumps(obj, ensure_ascii=False, separators=(",", ":"))
def _encode_payload(obj: Any, encoding: str) -> tuple[bytes, int]:
    """
    Serialize ``obj`` and report which protocol version byte describes it.

    encoding: "json" | "msgpack" (anything else is treated as "json").
    Returns (payload_bytes, proto_version_byte).
    """
    # msgpack is validated at import time for the broadcast path; if it is
    # nevertheless unavailable, fall back to JSON instead of crashing.
    use_msgpack = encoding == "msgpack" and _HAS_MSGPACK
    if use_msgpack:
        return msgpack.packb(obj, use_bin_type=True), PROTO_VERSION_MSGPACK
    encoded = _safe_json_dumps(obj).encode("utf-8")
    return encoded, PROTO_VERSION_JSON
def pack_frames(
    *,
    msg_type: int,
    session_msg_id: int,
    payload_obj: Any,
    max_payload_bytes: int,
    encoding: str = "json",
) -> list[bytes]:
    """
    Frame a single logical message into BLE-sized chunks.

    Header layout (10 bytes, little-endian):
    <B proto_version (0x01=JSON, 0x02=MessagePack)
    B msg_type
    H session_msg_id
    H chunk_idx
    H chunk_count
    H chunk_len>

    Args:
        msg_type: Message type code; only the low 8 bits are written.
        session_msg_id: Message id; only the low 16 bits are written.
        payload_obj: Object serialized with ``encoding``.
        max_payload_bytes: Maximum payload bytes per frame. Non-positive values
            fall back to DEFAULT_MAX_PAYLOAD_BYTES; values above 0xFFFF are
            clamped because ``chunk_len`` is a 16-bit field.
        encoding: "json" or "msgpack".

    Returns:
        One ``header + chunk`` bytes object per frame; an empty payload still
        produces a single frame with ``chunk_len == 0``.

    Raises:
        ValueError: If the payload needs more than 0xFFFF frames, which the
            16-bit ``chunk_idx``/``chunk_count`` fields cannot describe.
    """
    field_max = 0xFFFF
    payload_bytes, proto_byte = _encode_payload(payload_obj, encoding)
    if max_payload_bytes <= 0:
        max_payload_bytes = DEFAULT_MAX_PAYLOAD_BYTES
    # A larger chunk would wrap the 16-bit chunk_len field and make the header
    # disagree with the bytes that follow it.
    max_payload_bytes = min(max_payload_bytes, field_max)
    chunks = [
        payload_bytes[start:start + max_payload_bytes]
        for start in range(0, len(payload_bytes), max_payload_bytes)
    ] or [b""]
    chunk_count = len(chunks)
    if chunk_count > field_max:
        # Masking the count would emit a stream the receiver cannot reassemble.
        raise ValueError(
            f"payload of {len(payload_bytes)} bytes needs {chunk_count} frames; "
            f"the frame header supports at most {field_max}"
        )
    msg_type_byte = msg_type & 0xFF
    msg_id_field = session_msg_id & field_max
    return [
        struct.pack(
            "<BBHHHH",
            proto_byte,
            msg_type_byte,
            msg_id_field,
            idx,
            chunk_count,
            len(chunk),
        ) + chunk
        for idx, chunk in enumerate(chunks)
    ]
# Backward-compat alias (legacy callers).
def pack_frames_json(
    *,
    msg_type: int,
    session_msg_id: int,
    payload_obj: Any,
    max_payload_bytes: int,
) -> list[bytes]:
    """Backward-compat alias for legacy callers: ``pack_frames`` with JSON encoding."""
    framing_args = {
        "msg_type": msg_type,
        "session_msg_id": session_msg_id,
        "payload_obj": payload_obj,
        "max_payload_bytes": max_payload_bytes,
    }
    return pack_frames(encoding="json", **framing_args)
class ClientSession:
    """Per-device protocol state plus the TX queue and writer thread for the fd path.

    Frames put on the TX queue are written to the notify fd by a daemon thread.
    The fd and the writer's lifecycle are guarded by ``_notify_lock`` so a quick
    close -> re-acquire sequence cannot leave the new fd without a writer.
    """

    def __init__(self, device_path: str):
        self.device_path = device_path
        self.connected_at = _now_ts()
        self.proto = PROTO_VERSION
        self.subscriptions: set[str] = set(DEFAULT_SUBSCRIPTIONS)
        self.snapshot_in_progress = False
        self.snapshot_id = 0
        self.filters: dict[str, Any] = {}
        self.last_ping_ts: float | None = None
        self._next_msg_id = 1
        self._tx_queue: "queue.Queue[bytes]" = queue.Queue(maxsize=DEFAULT_TX_QUEUE_MAX)
        self.tx_dropped = 0
        self.max_payload_bytes = DEFAULT_MAX_PAYLOAD_BYTES
        self.att_mtu = 23
        # Per-session encoding for the AcquireNotify path. Negotiated via "hello" cmd.
        # Broadcast (CCCD) path uses BROADCAST_ENCODING globally.
        self.encoding: str = "json"
        self._notify_fd: int | None = None
        self._notify_thread: threading.Thread | None = None
        self._notify_stop = threading.Event()
        # Guards _notify_fd, _notify_thread and transitions of _notify_stop.
        self._notify_lock = threading.Lock()

    def next_msg_id(self) -> int:
        """Return the next message id; ids cycle through 1..0xFFFF and never yield 0."""
        mid = self._next_msg_id
        self._next_msg_id = ((mid + 1) & 0xFFFF) or 1
        return mid

    def set_notify_fd(self, fd: int, att_mtu: int):
        """Install ``fd`` as the notify fd and make sure a writer thread serves it.

        A previously installed, different fd is closed. ``max_payload_bytes`` is
        derived from ``att_mtu`` and capped at DEFAULT_MAX_PAYLOAD_BYTES.
        """
        with self._notify_lock:
            old_fd = self._notify_fd
            self._notify_fd = fd
            if old_fd is not None and old_fd != fd:
                try:
                    os.close(old_fd)
                except OSError:
                    pass
            self.att_mtu = att_mtu
            # NOTE(review): "- 3" presumably accounts for the ATT notification
            # header and "- 10" for the 10-byte frame header - confirm.
            max_att_value = max(20, att_mtu - 3)
            self.max_payload_bytes = max(1, min(DEFAULT_MAX_PAYLOAD_BYTES, max_att_value - 10))
            # Always clear the stop flag: a writer that is still alive after
            # close_notify() would otherwise exit and leave the new fd unserved.
            self._notify_stop.clear()
            writer = self._notify_thread
            if writer is not None and writer.is_alive():
                return
            self._notify_thread = threading.Thread(target=self._notify_writer_loop, daemon=True)
            self._notify_thread.start()

    def _close_fd_locked(self):
        """Forget and close the current notify fd. Caller must hold ``_notify_lock``."""
        fd, self._notify_fd = self._notify_fd, None
        if fd is not None:
            try:
                os.close(fd)
            except OSError:
                pass

    def close_notify(self):
        """Ask the writer thread to stop and close the notify fd (idempotent)."""
        with self._notify_lock:
            self._notify_stop.set()
            self._close_fd_locked()

    def enqueue_frames(self, frames: list[bytes]):
        """Queue ``frames`` for the writer; on overflow evict the oldest queued frame.

        ``tx_dropped`` is incremented once per overflow, whether the evicted
        frame or the new frame ends up being the one lost.
        """
        for frame in frames:
            try:
                self._tx_queue.put_nowait(frame)
                continue
            except queue.Full:
                self.tx_dropped += 1
            # Make room by discarding the oldest frame, then retry once.
            try:
                self._tx_queue.get_nowait()
            except queue.Empty:
                pass
            try:
                self._tx_queue.put_nowait(frame)
            except queue.Full:
                pass
        if frames:
            log_debug(
                f"[TXQ] device={self.device_path} +{len(frames)} frames "
                f"depth={self.tx_queue_depth()} dropped={self.tx_dropped}"
            )

    async def enqueue_frames_lossless(self, frames: list[bytes], timeout_sec: float) -> bool:
        """Queue ``frames`` without dropping, waiting for room when the queue is full.

        ``timeout_sec`` applies to each frame separately. Returns False as soon
        as the notify fd is gone or a frame cannot be queued in time; frames
        queued before that point stay queued.
        """
        for frame in frames:
            deadline = time.monotonic() + timeout_sec
            while True:
                if self._notify_fd is None or self._notify_stop.is_set():
                    log_warn(f"[TXQ] snapshot enqueue aborted device={self.device_path}; notify fd closed")
                    return False
                try:
                    self._tx_queue.put_nowait(frame)
                    break
                except queue.Full:
                    if time.monotonic() > deadline:
                        log_warn(
                            f"[TXQ] snapshot enqueue timeout device={self.device_path} "
                            f"depth={self.tx_queue_depth()}"
                        )
                        return False
                    await asyncio.sleep(SNAPSHOT_DRAIN_POLL_SEC)
        if frames:
            log_debug(
                f"[TXQ] snapshot device={self.device_path} +{len(frames)} frames "
                f"depth={self.tx_queue_depth()} dropped={self.tx_dropped}"
            )
        return True

    def tx_queue_depth(self) -> int:
        """Return the approximate number of frames waiting in the TX queue."""
        return self._tx_queue.qsize()

    def _notify_writer_loop(self):
        """Writer thread body: move frames from the TX queue to the notify fd.

        Exits when the stop flag is set. On exit it deregisters itself under
        the lock so set_notify_fd() never mistakes an exiting writer for a
        live one.
        """
        me = threading.current_thread()
        while True:
            with self._notify_lock:
                if self._notify_stop.is_set():
                    if self._notify_thread is me:
                        self._notify_thread = None
                    return
                fd = self._notify_fd
            if fd is None:
                time.sleep(0.05)
                continue
            try:
                frame = self._tx_queue.get(timeout=0.25)
            except queue.Empty:
                continue
            # Re-read the fd: it may have been closed or replaced while this
            # thread was blocked on the queue.
            with self._notify_lock:
                fd = self._notify_fd
            if fd is None:
                # Connection went away while waiting; the frame has no destination.
                self.tx_dropped += 1
                continue
            try:
                os.write(fd, frame)
            except OSError:
                with self._notify_lock:
                    if self._notify_fd != fd:
                        # The failing fd was already replaced; keep the new one.
                        continue
                    log_warn(f"[DATA] os.write failed device={self.device_path}; closing notify fd")
                    self._notify_stop.set()
                    self._close_fd_locked()
class AisHubBridge:
    """
    Owns an asyncio loop in a background thread.
    - REST fetch for snapshot sections
    - One WS client to ais_hub /ws, fan-out events to sessions
    - Paced broadcast queue drained by a second thread onto the GLib main loop
    """

    def __init__(self):
        self._loop: asyncio.AbstractEventLoop | None = None
        self._thread: threading.Thread | None = None
        self._stop = threading.Event()
        self.ws_alive = False
        self.last_ws_msg_ts: float | None = None
        self._sessions: dict[str, ClientSession] = {}
        self._sessions_lock = threading.Lock()
        self.data_char: DataCharacteristic | None = None
        # Global snapshot serialization lock for the broadcast (CCCD) channel.
        # Android clients share a single notify stream; we MUST NOT interleave
        # frames from two simultaneous snapshots or each client will receive a
        # mashup. Per-fd (AcquireNotify) sessions don't need this lock - they
        # have dedicated pipes. Implemented as an asyncio.Lock used by do_snapshot.
        self._broadcast_snapshot_lock: asyncio.Lock | None = None
        # Simple throttling to avoid overwhelming BLE notify path (esp. Android).
        # Values are minimum interval (seconds) between forwarded events of that type.
        self._event_min_interval_sec: dict[str, float] = {
            "ownship.update": 0.33,  # ~3 Hz
            "stats.update": 1.0,  # <= 1 Hz (source is ~0.2 Hz anyway)
        }
        self._event_last_sent_ts: dict[str, float] = {}
        # Latest pending target.update per MMSI, oldest arrival first.
        self._pending_target_events: "OrderedDict[str, dict[str, Any]]" = OrderedDict()
        self._target_flush_sent = 0
        # Paced broadcast queue: draining via a dedicated thread that schedules
        # PropertiesChanged emissions on the GLib main loop with BROADCAST_FRAME_GAP_SEC
        # gap. This prevents BlueZ from coalescing/delaying notifications when snapshot
        # or bursty events get pushed at once.
        self._bcast_queue: "queue.Queue[bytes]" = queue.Queue(maxsize=BROADCAST_QUEUE_MAX)
        self._bcast_dropped = 0
        self._bcast_sent = 0
        self._bcast_stop = threading.Event()
        self._bcast_thread: threading.Thread | None = None
        self._bcast_msg_id_lock = threading.Lock()
        self._bcast_next_msg_id = 1

    def attach_data_characteristic(self, data_char: "DataCharacteristic"):
        """Register the DATA characteristic used by the broadcast path."""
        self.data_char = data_char

    def next_broadcast_msg_id(self) -> int:
        """Return the next broadcast message id (thread-safe; cycles 1..0xFFFF)."""
        with self._bcast_msg_id_lock:
            msg_id = self._bcast_next_msg_id
            self._bcast_next_msg_id += 1
            if self._bcast_next_msg_id > 0xFFFF:
                self._bcast_next_msg_id = 1
            return msg_id

    def send_broadcast_frame(self, frame: bytes):
        """
        Fallback for clients that only use standard CCCD notifications (Android).
        Enqueues the frame into the paced broadcast queue; a dedicated thread
        will schedule it on the GLib main loop with BROADCAST_FRAME_GAP_SEC gap.

        Does nothing when no DATA characteristic is attached or it is not
        notifying. When the queue is full the oldest frame is evicted.
        """
        if not self.data_char:
            return
        if not self.data_char.notifying:
            return
        try:
            self._bcast_queue.put_nowait(frame)
        except queue.Full:
            try:
                _ = self._bcast_queue.get_nowait()
            except queue.Empty:
                pass
            self._bcast_dropped += 1
            try:
                self._bcast_queue.put_nowait(frame)
            except queue.Full:
                self._bcast_dropped += 1

    async def enqueue_broadcast_frame_lossless(self, frame: bytes, timeout_sec: float) -> bool:
        """Queue ``frame`` for broadcast without evicting anything.

        Polls for room while the queue is full. Returns False when the DATA
        notifier is missing/inactive or ``timeout_sec`` elapses.
        """
        deadline = time.monotonic() + timeout_sec
        while True:
            if not self.data_char or not self.data_char.notifying:
                log_warn("[BCAST] snapshot enqueue aborted: no active DATA notifier")
                return False
            try:
                self._bcast_queue.put_nowait(frame)
                return True
            except queue.Full:
                if time.monotonic() > deadline:
                    log_warn(
                        f"[BCAST] snapshot enqueue timeout: qsize={self._bcast_queue.qsize()} "
                        f"capacity={BROADCAST_QUEUE_MAX}"
                    )
                    return False
                await asyncio.sleep(SNAPSHOT_DRAIN_POLL_SEC)

    async def wait_broadcast_drain(self, threshold: int, timeout_sec: float = 10.0):
        """
        Async backpressure for snapshot producer: block until broadcast queue
        depth is at or below `threshold`. Prevents silent drops when the snapshot
        would otherwise flood the queue faster than BROADCAST_FRAME_GAP_SEC drains it.
        On timeout a warning is logged and the call returns normally.
        """
        deadline = time.monotonic() + timeout_sec
        while self._bcast_queue.qsize() > threshold:
            if time.monotonic() > deadline:
                log_warn(
                    f"[BCAST] wait_broadcast_drain timeout: qsize={self._bcast_queue.qsize()} "
                    f"threshold={threshold}"
                )
                return
            await asyncio.sleep(SNAPSHOT_DRAIN_POLL_SEC)

    def _broadcast_writer_loop(self):
        """Thread body: pop broadcast frames and hand them to the GLib main loop, paced."""
        gap = max(0.0, BROADCAST_FRAME_GAP_SEC)
        while not self._bcast_stop.is_set():
            try:
                frame = self._bcast_queue.get(timeout=0.25)
            except queue.Empty:
                continue
            if not self.data_char or not self.data_char.notifying:
                # Drop silently if no subscribers (avoid growing backlog when idle).
                continue
            GLib.idle_add(self.data_char.send_frame_broadcast, frame)
            self._bcast_sent += 1
            if self._bcast_sent <= 10 or (self._bcast_sent % 500) == 0:
                log_debug(
                    f"[BCAST] paced emit #{self._bcast_sent} qdepth={self._bcast_queue.qsize()} "
                    f"dropped={self._bcast_dropped} gap_ms={gap * 1000:.1f}"
                )
            if gap > 0:
                time.sleep(gap)

    def start(self):
        """Start the asyncio thread and the broadcast writer thread (no-op if running)."""
        if self._thread and self._thread.is_alive():
            return
        self._stop.clear()
        self._thread = threading.Thread(target=self._run_loop_thread, daemon=True)
        self._thread.start()
        self._bcast_stop.clear()
        if not self._bcast_thread or not self._bcast_thread.is_alive():
            self._bcast_thread = threading.Thread(
                target=self._broadcast_writer_loop,
                name="ais_ble_bcast_writer",
                daemon=True,
            )
            self._bcast_thread.start()

    def stop(self):
        """Signal both background threads to stop; returns without waiting.

        The asyncio runner polls ``_stop`` every 0.25 s, then cancels its tasks
        and lets the loop finish. The loop is deliberately NOT stopped from
        here: stopping a loop that is inside ``run_until_complete`` raises
        RuntimeError in the loop thread and skips the task cleanup, and
        scheduling onto an already closed loop raises in the caller.
        """
        self._stop.set()
        self._bcast_stop.set()

    def get_or_create_session(self, device_path: str) -> "ClientSession":
        """Return the session for ``device_path``, creating it on first use."""
        with self._sessions_lock:
            sess = self._sessions.get(device_path)
            if sess is None:
                sess = ClientSession(device_path)
                self._sessions[device_path] = sess
            return sess

    def remove_session(self, device_path: str):
        """Forget the session for ``device_path`` (if any) and close its notify fd."""
        with self._sessions_lock:
            sess = self._sessions.pop(device_path, None)
        if sess:
            sess.close_notify()

    def sessions_snapshot(self) -> "list[ClientSession]":
        """Return a copy of the current session list, safe to iterate without the lock."""
        with self._sessions_lock:
            return list(self._sessions.values())

    def session_count(self) -> int:
        """Return the number of known sessions."""
        with self._sessions_lock:
            return len(self._sessions)

    def submit_coroutine(self, coro: "asyncio.Future[Any]"):
        """Schedule ``coro`` on the bridge loop from another thread.

        Returns the concurrent future from ``asyncio.run_coroutine_threadsafe``.

        Raises:
            RuntimeError: If the bridge loop is not running.
        """
        loop = self._loop
        if not loop:
            raise RuntimeError("AisHubBridge loop not started")
        return asyncio.run_coroutine_threadsafe(coro, loop)

    def _run_loop_thread(self):
        """Thread body: run the asyncio loop until ``_stop`` is set, then close it."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self._loop = loop

        async def runner():
            # asyncio primitives must be created inside the loop they belong to.
            self._broadcast_snapshot_lock = asyncio.Lock()
            tasks = [
                asyncio.create_task(self._ws_loop(), name="ais_hub_ws_loop"),
                asyncio.create_task(self._target_update_flush_loop(), name="ais_hub_target_flush_loop"),
            ]
            try:
                while not self._stop.is_set():
                    await asyncio.sleep(0.25)
            finally:
                for t in tasks:
                    t.cancel()
                await asyncio.gather(*tasks, return_exceptions=True)

        try:
            loop.run_until_complete(runner())
        finally:
            # Do not leave a closed loop behind for submit_coroutine(); only
            # clear the reference if a restart has not already replaced it.
            if self._loop is loop:
                self._loop = None
            try:
                loop.close()
            except Exception:
                pass

    async def _ws_loop(self):
        """Keep one WebSocket connection to ais_hub alive and fan out its events.

        Reconnects with exponential backoff (1 s growing x1.5 up to 15 s) after
        an error or after the server closes the connection. ``ws_alive`` is True
        only while a connection is open.
        """
        try:
            import aiohttp
        except Exception as e:
            log_error(f"[ais_hub] aiohttp not installed: {e}")
            self.ws_alive = False
            return
        backoff = 1.0
        while not self._stop.is_set():
            try:
                async with aiohttp.ClientSession() as session:
                    log_info(f"[ais_hub] WS connecting: {AIS_HUB_WS_URL}")
                    async with session.ws_connect(AIS_HUB_WS_URL, heartbeat=30) as ws:
                        self.ws_alive = True
                        log_info("[ais_hub] WS connected")
                        backoff = 1.0
                        async for msg in ws:
                            if msg.type == aiohttp.WSMsgType.TEXT:
                                self.last_ws_msg_ts = _now_ts()
                                try:
                                    ev = json.loads(msg.data)
                                except Exception:
                                    continue
                                if not isinstance(ev, dict):
                                    # Valid JSON that is not an event object; ignore
                                    # it instead of tearing the connection down.
                                    continue
                                ev_type = ev.get("type")
                                if not ev_type or ev_type == "state.snapshot":
                                    continue
                                self._fanout_event(ev)
                            elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                                break
            except Exception as e:
                log_warn(f"[ais_hub] WS error: {e!r}")
            finally:
                # Also covers a clean close and task cancellation, so the flag
                # never stays True while disconnected.
                self.ws_alive = False
            if self._stop.is_set():
                break
            log_warn(f"[ais_hub] WS disconnected; retry in {backoff:.1f}s")
            await asyncio.sleep(backoff)
            backoff = min(15.0, backoff * 1.5)

    async def _target_update_flush_loop(self):
        """Drain pending target.update events one at a time at a BLE-safe pace.

        Flushing pauses while any session has a snapshot in progress or while
        the broadcast queue is above the backpressure threshold.
        """
        gap = max(0.0, TARGET_UPDATE_FLUSH_INTERVAL_SEC)
        drain_threshold = max(64, int(BROADCAST_QUEUE_MAX * TARGET_UPDATE_BACKPRESSURE_FRAC))
        while not self._stop.is_set():
            if not self._pending_target_events:
                await asyncio.sleep(0.05)
                continue
            sessions = self.sessions_snapshot()
            if any(s.snapshot_in_progress for s in sessions):
                await asyncio.sleep(0.10)
                continue
            if self._bcast_queue.qsize() > drain_threshold:
                await asyncio.sleep(SNAPSHOT_DRAIN_POLL_SEC)
                continue
            _, ev = self._pending_target_events.popitem(last=False)
            self._fanout_event_direct(ev)
            self._target_flush_sent += 1
            if self._target_flush_sent <= 10 or (self._target_flush_sent % 500) == 0:
                log_debug(
                    f"[FANOUT] target flush sent={self._target_flush_sent} "
                    f"pending={len(self._pending_target_events)} bcast_q={self._bcast_queue.qsize()}"
                )
            # Always yield, even with a zero gap, so a large backlog cannot
            # monopolize the event loop (sleep(0) just reschedules).
            await asyncio.sleep(gap)

    def _fanout_event(self, ev: dict[str, Any]):
        """Route one hub event.

        ``target.update`` events carrying an MMSI are parked (latest per MMSI,
        re-queued at the tail) for the flush loop; everything else is
        forwarded immediately.
        """
        ev_type = ev.get("type")
        if ev_type == "target.update":
            data = ev.get("data")
            mmsi = None
            if isinstance(data, dict):
                mmsi = data.get("mmsi")
            if mmsi is not None:
                key = str(mmsi)
                # Remove first so the fresh event moves to the end of the order.
                self._pending_target_events.pop(key, None)
                self._pending_target_events[key] = ev
                return
        self._fanout_event_direct(ev)

    def _fanout_event_direct(self, ev: dict[str, Any]):
        """Throttle ``ev`` and forward it to the broadcast path and per-fd sessions."""
        ev_type = ev.get("type")
        if not ev_type:
            return
        now = _now_ts()
        min_iv = self._event_min_interval_sec.get(ev_type)
        if min_iv is not None:
            last = self._event_last_sent_ts.get(ev_type)
            if last is not None and (now - last) < min_iv:
                return
            self._event_last_sent_ts[ev_type] = now
        sessions = self.sessions_snapshot()
        # Suppress low-priority live events (ownship.update, stats.update) while any
        # session has a snapshot transfer in progress. It's OK for the client to rely
        # on the snapshot data; the next live update will catch up once snapshot ends.
        # This prevents interleaving small frames into the snapshot stream and
        # overwhelming BlueZ / Android's BLE stack (which then drops the link).
        snapshot_active = any(s.snapshot_in_progress for s in sessions)
        suppress_on_snapshot = ev_type in ("ownship.update", "stats.update")
        if snapshot_active and suppress_on_snapshot:
            log_debug(
                f"[FANOUT] suppress ev_type={ev_type} during active snapshot "
                f"sessions={len(sessions)}"
            )
            return
        # Broadcast path (CCCD notify): do it ONCE per event if at least one CCCD-client wants it.
        # Skip broadcast entirely if all CCCD-clients are currently in snapshot.
        wants_broadcast = any(
            (s._notify_fd is None)
            and (ev_type in s.subscriptions)
            and not s.snapshot_in_progress
            for s in sessions
        )
        if wants_broadcast:
            msg_id = self.next_broadcast_msg_id()
            frames = pack_frames(
                msg_type=MSG_EVENT,
                session_msg_id=msg_id,
                payload_obj=ev,
                max_payload_bytes=BROADCAST_MAX_PAYLOAD_BYTES,
                encoding=BROADCAST_ENCODING,
            )
            for fr in frames:
                self.send_broadcast_frame(fr)
            log_debug(
                f"[DATA] broadcast frames={len(frames)} msg_type=0x{MSG_EVENT:02X} "
                f"msg_id={msg_id} ev_type={ev_type} sessions={len(sessions)} enc={BROADCAST_ENCODING}"
            )
        # Per-client fd path (AcquireNotify): send only to those sessions.
        for sess in sessions:
            if sess._notify_fd is None:
                continue
            if ev_type not in sess.subscriptions:
                continue
            if sess.snapshot_in_progress:
                continue
            msg_id = sess.next_msg_id()
            frames = pack_frames(
                msg_type=MSG_EVENT,
                session_msg_id=msg_id,
                payload_obj=ev,
                max_payload_bytes=sess.max_payload_bytes,
                encoding=sess.encoding,
            )
            sess.enqueue_frames(frames)

    async def fetch_snapshot_sections(self, include: list[str], max_vessels: int) -> dict[str, Any]:
        """Fetch the requested snapshot sections from the ais_hub REST API.

        Args:
            include: Section names to fetch; recognized names are "ownship",
                "vessels", "base_stations", "atons" and "stats". Others are ignored.
            max_vessels: Sent as the ``limit`` query parameter of the vessels request.

        Returns:
            Mapping of section name to the decoded JSON body of its endpoint.

        Raises:
            RuntimeError: If aiohttp is not installed.
            Exception: Any error raised by the HTTP requests (including
                ``raise_for_status`` on a non-success response) propagates.
        """
        try:
            import aiohttp
        except Exception as e:
            raise RuntimeError(f"aiohttp not installed: {e}") from e
        # (section name, endpoint path, query params), in request order.
        endpoints = (
            ("ownship", "/api/v1/ownship", None),
            ("vessels", "/api/v1/vessels", {"limit": max_vessels}),
            ("base_stations", "/api/v1/base_stations", None),
            ("atons", "/api/v1/atons", None),
            ("stats", "/api/v1/stats", None),
        )
        sections: dict[str, Any] = {}
        async with aiohttp.ClientSession() as http:
            for name, path, params in endpoints:
                if name not in include:
                    continue
                async with http.get(f"{AIS_HUB_BASE_URL}{path}", params=params) as resp:
                    resp.raise_for_status()
                    sections[name] = await resp.json()
        return sections
class AisHubService(Service):
    """Primary AIS Hub GATT service with its CONTROL, DATA and STATUS characteristics."""

    def __init__(self, bus, index, bridge: "AisHubBridge"):
        super().__init__(bus, index, AIS_HUB_SERVICE_UUID, True)
        self.bridge = bridge
        # Characteristic indices 0..2 determine the object paths char0..char2.
        self.control = ControlCharacteristic(bus, 0, self, bridge)
        self.data = DataCharacteristic(bus, 1, self, bridge)
        self.status = StatusCharacteristic(bus, 2, self, bridge)
        # The bridge needs the DATA characteristic for its broadcast path.
        self.bridge.attach_data_characteristic(self.data)
        for chrc in (self.control, self.data, self.status):
            self.add_characteristic(chrc)
class ControlCharacteristic(Characteristic):
def __init__(self, bus, index, service: Service, bridge: AisHubBridge):
Characteristic.__init__(self, bus, index, AIS_HUB_CONTROL_UUID, ["write", "write-without-response"], service)
self.bridge = bridge
@dbus.service.method(GATT_CHRC_IFACE, in_signature="aya{sv}", out_signature="")
def WriteValue(self, value, options):
raw = bytes(value)
device_path = str(options.get("device", "unknown"))
sess = self.bridge.get_or_create_session(device_path)
log_info(
f"[CONTROL] WriteValue device={device_path} bytes={len(raw)} "
f"preview={_hex_preview(raw)}"
)
try:
cmd_obj = json.loads(raw.decode("utf-8"))
except Exception:
self._send_error(sess, "bad_command", "invalid JSON")
return
cmd = cmd_obj.get("cmd")
log_info(f"[CONTROL] cmd={cmd} device={device_path} json={cmd_obj}")
if cmd == "hello":
# Optional encoding negotiation. Client can pass {"encodings": ["msgpack","json"]}.
# We pick the first one we support. Default: "json" for backward compat.
client_encodings = cmd_obj.get("encodings")
chosen = "json"
if isinstance(client_encodings, list):
for enc in client_encodings:
enc_s = str(enc).lower()
if enc_s == "msgpack" and _HAS_MSGPACK:
chosen = "msgpack"
break
if enc_s == "json":
chosen = "json"
break
sess.encoding = chosen
payload = {
"ok": True,
"proto": PROTO_VERSION,
"server": "ais_ble",
"server_time": _now_ts(),
"encoding": chosen,
"features": {
"snapshot": True,
"live_events": True,
"filters": True,
"compression": False,
"msgpack": _HAS_MSGPACK,
"multi_client": True,
},
}
# HELLO_ACK is always JSON so a fresh client can decode it before it
# knows which encoding the server picked.
msg_id = sess.next_msg_id()
frames = pack_frames(
msg_type=MSG_HELLO_ACK,
session_msg_id=msg_id,
payload_obj=payload,
max_payload_bytes=sess.max_payload_bytes,
encoding="json",
)
if sess._notify_fd is not None:
sess.enqueue_frames(frames)
else:
for fr in frames:
self.bridge.send_broadcast_frame(fr)
return
if cmd == "ping":
payload = {"id": cmd_obj.get("id"), "server_time": _now_ts()}
self._send(sess, MSG_PONG, payload)
sess.last_ping_ts = _now_ts()
return
if cmd == "subscribe":
events = cmd_obj.get("events")
if isinstance(events, list) and events:
sess.subscriptions = set(str(e) for e in events)
else:
sess.subscriptions = set(DEFAULT_SUBSCRIPTIONS)
self._send(sess, MSG_STATUS, {"ok": True, "subscriptions": sorted(sess.subscriptions)})
return
if cmd == "unsubscribe":
events = cmd_obj.get("events")
if isinstance(events, list):
for e in events:
sess.subscriptions.discard(str(e))
self._send(sess, MSG_STATUS, {"ok": True, "subscriptions": sorted(sess.subscriptions)})
return
if cmd == "set_filters":
filt = cmd_obj.get("targets")
if isinstance(filt, dict):
sess.filters["targets"] = filt
self._send(sess, MSG_STATUS, {"ok": True, "filters": sess.filters})
return
if cmd == "get_snapshot":
include = cmd_obj.get("include") or ["ownship", "vessels", "base_stations", "atons", "stats"]
if not isinstance(include, list):
include = ["ownship", "vessels", "base_stations", "atons", "stats"]
include = [str(x) for x in include]
max_vessels = cmd_obj.get("max_vessels")
if max_vessels is None:
max_vessels = DEFAULT_SNAPSHOT_MAX_VESSELS
try:
max_vessels = int(max_vessels)
except Exception:
max_vessels = DEFAULT_SNAPSHOT_MAX_VESSELS
if max_vessels <= 0:
max_vessels = DEFAULT_SNAPSHOT_MAX_VESSELS
if sess.snapshot_in_progress:
self._send_error(sess, "snapshot_busy", "snapshot already in progress")
return
sess.snapshot_in_progress = True
sess.snapshot_id = (sess.snapshot_id + 1) & 0xFFFF or random.randint(1, 0xFFFF)
async def do_snapshot():
# Serialize broadcast snapshots: only one CCCD-snapshot at a time
# across ALL Android clients, otherwise two clients requesting
# snapshots simultaneously would receive interleaved frames on
# the shared notify stream and decode garbage. Per-fd sessions
# have private pipes and don't need this lock.
if sess._notify_fd is None and self.bridge._broadcast_snapshot_lock is not None:
log_debug(
f"[SNAPSHOT] waiting broadcast lock sess={sess.device_path} "
f"snapshot_id={sess.snapshot_id}"
)
await self.bridge._broadcast_snapshot_lock.acquire()
log_debug(f"[SNAPSHOT] acquired broadcast lock sess={sess.device_path}")
try:
await _do_snapshot_inner()
finally:
if sess._notify_fd is None and self.bridge._broadcast_snapshot_lock is not None:
try:
self.bridge._broadcast_snapshot_lock.release()
except Exception:
pass
async def _do_snapshot_inner():
try:
sections = await self.bridge.fetch_snapshot_sections(include, max_vessels)
except Exception as e:
GLib.idle_add(self._send_error, sess, "snapshot_failed", str(e))
GLib.idle_add(self._snapshot_done, sess)
return
total_objects = {}
if "ownship" in sections:
total_objects["ownship"] = 1
if "stats" in sections:
total_objects["stats"] = 1
if "vessels" in sections:
total_objects["vessels"] = len(sections["vessels"]) if isinstance(sections["vessels"], list) else 0
if "base_stations" in sections:
total_objects["base_stations"] = (
len(sections["base_stations"]) if isinstance(sections["base_stations"], list) else 0
)
if "atons" in sections:
total_objects["atons"] = len(sections["atons"]) if isinstance(sections["atons"], list) else 0
# Backpressure threshold: pause producer once broadcast queue
# exceeds this depth. Keeps memory bounded AND prevents the
# silent-drop-at-1024 bug that capped snapshots at ~699 vessels.
drain_threshold = max(64, int(BROADCAST_QUEUE_MAX * SNAPSHOT_DRAIN_THRESHOLD_FRAC))
def abort_snapshot(reason: str) -> bool:
log_warn(
f"[SNAPSHOT] abort sess={sess.device_path} "
f"snapshot_id={sess.snapshot_id}: {reason}"
)
GLib.idle_add(self._send_error, sess, "snapshot_failed", reason)
GLib.idle_add(self._snapshot_done, sess)
return False
async def gated_send(msg_type: int, payload_obj: Any) -> bool:
# Snapshot frames must be in the downstream queue before we
# wait on it. Deferring _send through GLib.idle_add made the
# producer outrun the queue and declare the snapshot done
# while its tail was still only scheduled.
msg_id = sess.next_msg_id()
if sess._notify_fd is None:
await self.bridge.wait_broadcast_drain(drain_threshold)
frames = pack_frames(
msg_type=msg_type,
session_msg_id=msg_id,
payload_obj=payload_obj,
max_payload_bytes=BROADCAST_MAX_PAYLOAD_BYTES,
encoding=BROADCAST_ENCODING,
)
for fr in frames:
ok = await self.bridge.enqueue_broadcast_frame_lossless(
fr,
SNAPSHOT_ENQUEUE_TIMEOUT_SEC,
)
if not ok:
return abort_snapshot("broadcast DATA queue unavailable")
log_debug(
f"[DATA] snapshot broadcast frames={len(frames)} msg_type=0x{msg_type:02X} "
f"msg_id={msg_id} device={sess.device_path} enc={BROADCAST_ENCODING}"
)
return True
# Per-fd queue is small (256). Wait if it's mostly full and
# then enqueue without dropping old snapshot frames.
per_fd_threshold = max(32, DEFAULT_TX_QUEUE_MAX // 4)
deadline = time.monotonic() + 10.0
while sess.tx_queue_depth() > per_fd_threshold:
if time.monotonic() > deadline:
log_warn(
f"[SNAPSHOT] per-fd drain timeout sess={sess.device_path} "
f"depth={sess.tx_queue_depth()}"
)
break
await asyncio.sleep(SNAPSHOT_DRAIN_POLL_SEC)
frames = pack_frames(
msg_type=msg_type,
session_msg_id=msg_id,
payload_obj=payload_obj,
max_payload_bytes=sess.max_payload_bytes,
encoding=sess.encoding,
)
ok = await sess.enqueue_frames_lossless(frames, SNAPSHOT_ENQUEUE_TIMEOUT_SEC)
if not ok:
return abort_snapshot("per-client DATA queue unavailable")
log_debug(
f"[DATA] snapshot fd frames={len(frames)} msg_type=0x{msg_type:02X} "
f"msg_id={msg_id} device={sess.device_path} enc={sess.encoding}"
)
return True
if not await gated_send(
MSG_SNAPSHOT_BEGIN,
{
"snapshot_id": sess.snapshot_id,
"sections": include,
"total_objects": total_objects,
},
):
return
if "ownship" in sections:
if not await gated_send(
MSG_SNAPSHOT_CHUNK,
{
"snapshot_id": sess.snapshot_id,
"section": "ownship",
"seq": 1,
"more": False,
"item": sections["ownship"],
},
):
return
if "stats" in sections:
if not await gated_send(
MSG_SNAPSHOT_CHUNK,
{
"snapshot_id": sess.snapshot_id,
"section": "stats",
"seq": 1,
"more": False,
"item": sections["stats"],
},
):
return
async def send_list_section(section_name: str) -> bool:
if section_name not in sections or not isinstance(sections[section_name], list):
return True
items_all: list[Any] = sections[section_name]
batch_size = SNAPSHOT_VESSELS_PER_CHUNK
seq = 1
sent = 0
for i in range(0, len(items_all), batch_size):
items = items_all[i:i + batch_size]
more = (i + batch_size) < len(items_all)
if not await gated_send(
MSG_SNAPSHOT_CHUNK,
{
"snapshot_id": sess.snapshot_id,
"section": section_name,
"seq": seq,
"more": more,
"items": items,
},
):
return False
seq += 1
sent += len(items)
if (seq % 10) == 0:
log_debug(
f"[SNAPSHOT] {section_name} progress sess={sess.device_path} "
f"sent={sent}/{len(items_all)} seq={seq} "
f"bcast_q={self.bridge._bcast_queue.qsize()}"
)
return True
for section_name in ("vessels", "base_stations", "atons"):
if not await send_list_section(section_name):
return
if not await gated_send(MSG_SNAPSHOT_END, {"snapshot_id": sess.snapshot_id, "ok": True}):
return
# Wait for the broadcast queue to actually drain before clearing
# snapshot_in_progress. Otherwise stats.update / ownship.update
# interleave into the still-emitting snapshot tail and the next
# CCCD client's snapshot can start while frames are still flying.
if sess._notify_fd is None:
await self.bridge.wait_broadcast_drain(0, timeout_sec=30.0)
else:
deadline = time.monotonic() + 30.0
while sess.tx_queue_depth() > 0:
if time.monotonic() > deadline:
log_warn(
f"[SNAPSHOT] per-fd final drain timeout sess={sess.device_path} "
f"depth={sess.tx_queue_depth()}"
)
break
await asyncio.sleep(SNAPSHOT_DRAIN_POLL_SEC)
GLib.idle_add(self._snapshot_done, sess)
log_info(
f"[SNAPSHOT] done sess={sess.device_path} snapshot_id={sess.snapshot_id} "
f"vessels={total_objects.get('vessels', 0)} "
f"base_stations={total_objects.get('base_stations', 0)} "
f"atons={total_objects.get('atons', 0)} "
f"bcast_dropped={self.bridge._bcast_dropped}"
)
try:
self.bridge.submit_coroutine(do_snapshot())
except Exception as e:
self._send_error(sess, "snapshot_failed", str(e))
self._snapshot_done(sess)
return
self._send_error(sess, "bad_command", "unknown cmd")
def _snapshot_done(self, sess: ClientSession):
    """Clear the session's "snapshot in progress" flag.

    Args:
        sess: Session whose snapshot has finished (successfully or not).

    Returns:
        Always False. Elsewhere in this file the method is scheduled with
        ``GLib.idle_add``; a False return keeps such a callback one-shot.
    """
    sess.snapshot_in_progress = False
    return False
def _send(self, sess: ClientSession, msg_type: int, payload_obj: Any):
    """Pack one message into frames and hand them to the session's transport.

    A session that holds a notify fd gets the frames on its own queue,
    packed with the session's negotiated payload size and encoding. Any
    other session is served through the shared broadcast path, which uses
    the global broadcast encoding (it must match what ALL CCCD clients can
    decode).

    Args:
        sess: Target client session; also supplies the message id.
        msg_type: Protocol message type code.
        payload_obj: Payload object passed to ``pack_frames``.

    Returns:
        Always False.
    """
    msg_id = sess.next_msg_id()
    per_fd = sess._notify_fd is not None

    # Pick the packing parameters for the transport this session uses.
    if per_fd:
        payload_limit = sess.max_payload_bytes
        codec = sess.encoding
    else:
        payload_limit = BROADCAST_MAX_PAYLOAD_BYTES
        codec = BROADCAST_ENCODING

    frames = pack_frames(
        msg_type=msg_type,
        session_msg_id=msg_id,
        payload_obj=payload_obj,
        max_payload_bytes=payload_limit,
        encoding=codec,
    )

    if per_fd:
        sess.enqueue_frames(frames)
        return False

    for frame in frames:
        self.bridge.send_broadcast_frame(frame)
    log_debug(
        f"[DATA] broadcast frames={len(frames)} msg_type=0x{msg_type:02X} "
        f"msg_id={msg_id} device={sess.device_path} enc={BROADCAST_ENCODING}"
    )
    return False
def _send_error(self, sess: ClientSession, code: str, message: str):
    """Send an ``MSG_ERROR`` message carrying ``code`` and ``message``.

    Args:
        sess: Session the error is reported to.
        code: Short machine-readable error identifier.
        message: Human-readable description.

    Returns:
        Whatever ``_send`` returns.
    """
    error_payload = {"code": code, "message": message}
    return self._send(sess, MSG_ERROR, error_payload)
class DataCharacteristic(Characteristic):
    """
    DATA characteristic (notify only).

    Per-client notifications via BlueZ AcquireNotify(); in addition,
    send_frame_broadcast() pushes a frame through the regular notify path
    while notifications are enabled via StartNotify().
    """

    def __init__(self, bus, index, service: Service, bridge: AisHubBridge):
        Characteristic.__init__(self, bus, index, AIS_HUB_DATA_UUID, ["notify"], service)
        self.bridge = bridge
        # Initialised explicitly (StatusCharacteristic does the same) so that
        # send_frame_broadcast() can be called before the first StartNotify().
        self.notifying = False
        # Number of frames pushed through send_frame_broadcast(); drives log sampling.
        self._broadcast_sent = 0

    @dbus.service.method(GATT_CHRC_IFACE, in_signature="a{sv}", out_signature="hq")
    def AcquireNotify(self, options):
        """Create a pipe for one client and return its read end to the caller.

        The write end is stored on the client's session through
        ``set_notify_fd``; the read end travels back in the D-Bus reply.

        Args:
            options: D-Bus option dict; ``device`` and ``mtu`` are read
                (defaults: ``"unknown"`` and 23).

        Returns:
            Tuple ``(dbus.UnixFd, dbus.UInt16)``: read end of the pipe and
            the MTU taken from ``options``.
        """
        device_path = str(options.get("device", "unknown"))
        sess = self.bridge.get_or_create_session(device_path)
        # Parse the MTU before creating the pipe: a malformed value must not
        # leave two freshly opened descriptors behind.
        att_mtu = int(options.get("mtu", 23))
        rfd, wfd = os.pipe()
        try:
            sess.set_notify_fd(wfd, att_mtu)
            log_info(f"[DATA] AcquireNotify device={device_path} mtu={att_mtu} fd={wfd}")
            # dbus.UnixFd keeps its own dup() of the descriptor it is given and
            # leaves the original to the caller. Wrapping rfd directly (instead
            # of a manual os.dup() copy that nobody closes) avoids leaking one
            # descriptor per AcquireNotify call.
            reply_fd = dbus.UnixFd(rfd)
        finally:
            # Our copy of the read end is no longer needed in either outcome.
            try:
                os.close(rfd)
            except OSError:
                pass
        return reply_fd, dbus.UInt16(att_mtu)

    @dbus.service.method(GATT_CHRC_IFACE, in_signature="", out_signature="")
    def StartNotify(self):
        """Enable the broadcast notify path."""
        self.notifying = True
        log_info("[DATA] StartNotify (CCCD enabled)")

    @dbus.service.method(GATT_CHRC_IFACE, in_signature="", out_signature="")
    def StopNotify(self):
        """Disable the broadcast notify path."""
        self.notifying = False
        log_info("[DATA] StopNotify (CCCD disabled)")

    def send_frame_broadcast(self, frame: bytes):
        """Notify ``frame`` to subscribers if notifications are enabled.

        The frame is dropped when ``notifying`` is False. The first 20 frames
        and every 200th one afterwards are logged with a hex preview.

        Returns:
            Always False.
            NOTE(review): presumably so the method can serve as a one-shot
            GLib idle callback — confirm against the callers.
        """
        if not self.notifying:
            return False
        self._broadcast_sent += 1
        self._update_value_and_notify(frame)
        if self._broadcast_sent <= 20 or (self._broadcast_sent % 200) == 0:
            log_debug(f"[DATA] notify broadcast sent={self._broadcast_sent} bytes={len(frame)} preview={_hex_preview(frame)}")
        return False
class StatusCharacteristic(Characteristic):
    """STATUS characteristic: a readable JSON document describing server and session state."""

    def __init__(self, bus, index, service: Service, bridge: AisHubBridge):
        flags = ["read", "notify"]
        Characteristic.__init__(self, bus, index, AIS_HUB_STATUS_UUID, flags, service)
        self.notifying = False
        self.bridge = bridge

    @dbus.service.method(GATT_CHRC_IFACE, in_signature="a{sv}", out_signature="ay")
    def ReadValue(self, options):
        """Return the status document for the requesting device.

        Args:
            options: D-Bus option dict; ``device`` selects (or creates) the
                session whose counters are reported (default ``"unknown"``).

        Returns:
            The UTF-8 encoded JSON document, converted with ``_dbus_bytes``.
        """
        requester = str(options.get("device", "unknown"))
        session = self.bridge.get_or_create_session(requester)

        # Keys are inserted in the order they appear on the wire.
        report = {}
        report["proto"] = PROTO_VERSION
        report["server_time"] = _now_ts()
        report["gps_fix"] = None
        report["vessels_active"] = None
        report["ws_source_alive"] = self.bridge.ws_alive
        report["snapshot_in_progress"] = session.snapshot_in_progress
        report["tx_queue"] = session.tx_queue_depth()
        report["tx_dropped"] = session.tx_dropped

        encoded = _safe_json_dumps(report).encode("utf-8")
        return _dbus_bytes(encoded)
# ============ LE Advertisement ============
class TestAdvertisement(dbus.service.Object):
    """LE advertisement object exported on D-Bus for registration with BlueZ."""

    PATH_BASE = '/org/bluez/example/advertisement'

    def __init__(self, bus, index):
        self.path = f'{self.PATH_BASE}{index}'
        self.bus = bus
        dbus.service.Object.__init__(self, bus, self.path)

    def get_path(self):
        """Return this advertisement's D-Bus object path."""
        return dbus.ObjectPath(self.path)

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """Return the advertisement properties for ``LE_ADVERTISEMENT_IFACE``.

        Raises:
            InvalidArgsException: if any other interface is requested.
        """
        log_info(f'[Advertisement] GetAll вызван для интерфейса: {interface}')
        if interface != LE_ADVERTISEMENT_IFACE:
            log_error(f'[Advertisement] Неверный интерфейс: {interface}')
            raise InvalidArgsException()

        # Connectable advertisement with explicit flags.
        # Important: IncludeTxPower and other parameters may help with connecting.
        adv_type = dbus.String('peripheral')
        service_uuids = dbus.Array([AIS_HUB_SERVICE_UUID], signature='s')
        local_name = dbus.String('AIS')
        props = {
            'Type': adv_type,
            'ServiceUUIDs': service_uuids,
            'LocalName': local_name,
        }

        log_info('[Advertisement] Возвращаем свойства рекламы:')
        log_info(f' Type: {adv_type}')
        log_info(f' LocalName: {local_name}')
        log_info(f' ServiceUUIDs: {len(service_uuids)} сервисов')
        return props

    @dbus.service.method(LE_ADVERTISEMENT_IFACE,
                         in_signature='',
                         out_signature='')
    def Release(self):
        """Log that the advertisement was released; no other action is taken."""
        for note in (
            '[Advertisement] Release вызван - возможно, клиент отключился или реклама была отменена',
            '[Advertisement] Это может означать, что реклама была остановлена BlueZ',
        ):
            log_warn(note)
# ============ Вспомогательное: поиск адаптера с GATT Manager ============
def find_adapter(bus):
    """Return the path of the first BlueZ object exposing GattManager1.

    Args:
        bus: D-Bus connection used to query the BlueZ ObjectManager at ``/``.

    Returns:
        The object path, or None when no managed object has the interface.
    """
    root = bus.get_object(BLUEZ_SERVICE_NAME, '/')
    managed = dbus.Interface(root, DBUS_OM_IFACE).GetManagedObjects()
    return next(
        (obj_path for obj_path, ifaces in managed.items() if GATT_MANAGER_IFACE in ifaces),
        None,
    )
# ============ main() ============
def main():
    """Run the BLE GATT server until the GLib main loop exits.

    Steps: locate an adapter exposing GattManager1, start the AisHubBridge,
    track device connect/disconnect through ``PropertiesChanged`` signals,
    register the GATT application and the LE advertisement, then run the
    main loop. On exit the bridge is stopped and both registrations are
    removed; each shutdown step is best-effort and failures are logged.

    Returns:
        1 when no suitable adapter is found, otherwise 0.
    """
    global MAIN_LOOP
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()

    adapter_path = find_adapter(bus)
    if not adapter_path:
        log_error('Не найден адаптер с GattManager1. '
                  'Проверь, что bluetoothd запущен с --experimental')
        return 1
    log_info(f'Используем адаптер: {adapter_path}')

    service_manager = dbus.Interface(
        bus.get_object(BLUEZ_SERVICE_NAME, adapter_path),
        GATT_MANAGER_IFACE
    )
    advertising_manager = dbus.Interface(
        bus.get_object(BLUEZ_SERVICE_NAME, adapter_path),
        LE_ADVERTISING_MANAGER_IFACE
    )

    app = Application(bus)
    bridge = AisHubBridge()
    bridge.start()

    def _on_bluez_properties_changed(interface, changed, invalidated, path=None):
        # Only Device1 "Connected" transitions matter; everything else is ignored.
        try:
            if interface != "org.bluez.Device1":
                return
            if "Connected" not in changed:
                return
            connected = bool(changed.get("Connected"))
            dev_path = str(path or "")
            if not dev_path:
                return
            if connected:
                log_info(f"[BlueZ] Device connected: {dev_path} sessions={bridge.session_count()}")
                bridge.get_or_create_session(dev_path)
            else:
                log_info(f"[BlueZ] Device disconnected: {dev_path} -> removing session")
                bridge.remove_session(dev_path)
                log_info(f"[BlueZ] sessions now: {bridge.session_count()}")
        except Exception as e:
            # A signal handler must never raise into the main loop.
            log_warn(f"[BlueZ] PropertiesChanged handler error: {e}")

    # Track device connect/disconnect to avoid leaking sessions.
    bus.add_signal_receiver(
        _on_bluez_properties_changed,
        dbus_interface=DBUS_PROP_IFACE,
        signal_name="PropertiesChanged",
        path_keyword="path",
    )

    ais_srv = AisHubService(bus, 0, bridge)
    app.add_service(ais_srv)
    adv = TestAdvertisement(bus, 0)

    # Register the GATT application (asynchronously; the result is only logged).
    log_info('Регистрируем GATT Application...')
    service_manager.RegisterApplication(
        app.get_path(),
        {},
        reply_handler=lambda: log_info('✅ GATT Application зарегистрирован успешно'),
        error_handler=lambda e: log_error(f'❌ Ошибка RegisterApplication: {e}'),
    )

    # Register the advertisement (asynchronously; the result is only logged).
    log_info('Регистрируем рекламу...')
    log_info(f'Путь рекламы: {adv.get_path()}')
    advertising_manager.RegisterAdvertisement(
        adv.get_path(),
        {},
        reply_handler=lambda: log_info('✅ Advertisement зарегистрирован успешно - устройство должно быть видно при сканировании'),
        error_handler=lambda e: log_error(f'❌ Ошибка RegisterAdvertisement: {e}'),
    )

    log_info('=' * 60)
    log_info('BLE GATT сервер запущен и готов к подключениям')
    log_info('Имя устройства: AIS')
    log_info(f'Сервис: AIS Hub ({AIS_HUB_SERVICE_UUID})')
    log_info(f'ais_hub REST: {AIS_HUB_BASE_URL}')
    log_info(f'ais_hub WS: {AIS_HUB_WS_URL}')
    log_info('=' * 60)

    MAIN_LOOP = GLib.MainLoop()
    try:
        MAIN_LOOP.run()
    except KeyboardInterrupt:
        log_info('Получен сигнал завершения (Ctrl+C)')
    finally:
        # Shutdown runs however the loop ended, so the bridge and the BlueZ
        # registrations are released even when the loop exits with an error.
        log_info('Останавливаем сервер...')
        try:
            bridge.stop()
        except Exception as e:
            # Best-effort: keep shutting down, but do not hide the failure.
            log_warn(f'[Bridge] stop failed: {e}')
        try:
            # Withdraw the advertisement.
            advertising_manager.UnregisterAdvertisement(adv.get_path())
            log_info('✅ Advertisement отменён')
        except Exception as e:
            log_error(f'❌ Ошибка при отмене advertisement: {e}')
        try:
            # Unregister the GATT application.
            service_manager.UnregisterApplication(app.get_path())
            log_info('✅ GATT Application отменён')
        except Exception as e:
            log_error(f'❌ Ошибка при отмене GATT application: {e}')
        log_info('Сервер остановлен')
    return 0
# Script entry point: the process exit status is main()'s return value
# (a falsy result is reported as 0).
if __name__ == '__main__':
    exit_status = main() or 0
    sys.exit(exit_status)