Files
AndroidAisMap/ble_mini2.py
T

557 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service
import threading
import socket
import struct
import sys
import time
from datetime import datetime
from gi.repository import GLib
BLUEZ_SERVICE_NAME = 'org.bluez'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
MAIN_LOOP = None
# ============ Логирование ============
def log(level, message):
"""Логирование с временной меткой"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
print(f'[{timestamp}] [{level}] {message}', flush=True)
def log_info(message):
log('INFO', message)
def log_warn(message):
log('WARN', message)
def log_error(message):
log('ERROR', message)
def log_debug(message):
log('DEBUG', message)
# ============ UUID (вариант A: короткие) ============
# Battery Service / Level стандартные
BAT_SERVICE_UUID = '0000180f-0000-1000-8000-00805f9b34fb'
BAT_LEVEL_UUID = '00002a19-0000-1000-8000-00805f9b34fb'
# P2P 16-битки в SIG-диапазоне
P2P_SERVICE_UUID = '0000fe40-0000-1000-8000-00805f9b34fb'
P2P_CHAR_UUID = '0000fe42-0000-1000-8000-00805f9b34fb'
# RW – кастом 16-битный диапазон
RW_SERVICE_UUID = '0000ab10-0000-1000-8000-00805f9b34fb'
RW_CHAR_UUID = '0000ab11-0000-1000-8000-00805f9b34fb'
# ============ D-Bus Exceptions ============
class InvalidArgsException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
class NotSupportedException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.bluez.Error.NotSupported'
# ============ Application (ObjectManager) ============
class Application(dbus.service.Object):
"""
Корневой объект, который отдаёт BlueZ все наши сервисы/характеристики через GetManagedObjects.
"""
def __init__(self, bus):
self.path = '/org/bluez/example/app'
self.services = []
dbus.service.Object.__init__(self, bus, self.path)
def get_path(self):
return dbus.ObjectPath(self.path)
def add_service(self, service):
self.services.append(service)
def get_services(self):
return self.services
@dbus.service.method(DBUS_OM_IFACE,
out_signature='a{oa{sa{sv}}}')
def GetManagedObjects(self):
log_info('🔵 GetManagedObjects вызван - клиент запрашивает список сервисов')
log_info(' Это означает, что подключение установлено и начинается дискаверинг')
managed_objects = {}
for service in self.services:
managed_objects[service.get_path()] = service.get_properties()
for chrc in service.get_characteristics():
managed_objects[chrc.get_path()] = chrc.get_properties()
log_info(f' Возвращаем {len(managed_objects)} объектов (сервисы + характеристики)')
return managed_objects
# ============ GATT Service ============
class Service(dbus.service.Object):
def __init__(self, bus, index, uuid, primary):
self.path = f'/org/bluez/example/service{index}'
self.bus = bus
self.uuid = uuid
self.primary = primary
self.characteristics = []
dbus.service.Object.__init__(self, bus, self.path)
def get_path(self):
return dbus.ObjectPath(self.path)
def add_characteristic(self, chrc):
self.characteristics.append(chrc)
def get_characteristics(self):
return self.characteristics
def get_properties(self):
return {
GATT_SERVICE_IFACE: {
'UUID': self.uuid,
'Primary': dbus.Boolean(self.primary),
'Includes': dbus.Array([], signature='o'),
}
}
# ============ GATT Characteristic (base) ============
class Characteristic(dbus.service.Object):
def __init__(self, bus, index, uuid, flags, service):
self.path = service.path + f'/char{index}'
self.bus = bus
self.uuid = uuid
self.flags = flags
self.service = service
self.notifying = False
self.value_bytes = bytes([0x00]) # дефолт
dbus.service.Object.__init__(self, bus, self.path)
def get_path(self):
return dbus.ObjectPath(self.path)
def get_properties(self):
return {
GATT_CHRC_IFACE: {
'Service': self.service.get_path(),
'UUID': self.uuid,
'Flags': dbus.Array(self.flags, signature='s'),
# Стартовое Value, чтобы клиенты видели, что характеристика живая
'Value': dbus.Array([dbus.Byte(b) for b in self.value_bytes], signature='y'),
}
}
@dbus.service.method(GATT_CHRC_IFACE,
in_signature='a{sv}',
out_signature='ay')
def ReadValue(self, options):
print(f'{self.uuid}: ReadValue (base stub)')
raise NotSupportedException('Read not implemented')
@dbus.service.method(GATT_CHRC_IFACE,
in_signature='aya{sv}',
out_signature='')
def WriteValue(self, value, options):
print(f'{self.uuid}: WriteValue (base stub)')
raise NotSupportedException('Write not implemented')
@dbus.service.method(GATT_CHRC_IFACE,
in_signature='',
out_signature='')
def StartNotify(self):
print(f'{self.uuid}: StartNotify (base stub)')
raise NotSupportedException('Notifications not supported')
@dbus.service.method(GATT_CHRC_IFACE,
in_signature='',
out_signature='')
def StopNotify(self):
print(f'{self.uuid}: StopNotify (base stub)')
raise NotSupportedException('Notifications not supported')
# helper для рассылки notify
def _update_value_and_notify(self, new_bytes: bytes):
self.value_bytes = new_bytes
self.PropertiesChanged(
GATT_CHRC_IFACE,
{'Value': dbus.Array([dbus.Byte(b) for b in self.value_bytes], signature='y')},
[]
)
@dbus.service.signal(DBUS_PROP_IFACE,
signature='sa{sv}as')
def PropertiesChanged(self, interface, changed, invalidated):
pass
# ============ Battery Service ============
class BatteryService(Service):
def __init__(self, bus, index):
Service.__init__(self, bus, index, BAT_SERVICE_UUID, True)
self.level_characteristic = BatteryLevelCharacteristic(bus, 0, self)
self.add_characteristic(self.level_characteristic)
class BatteryLevelCharacteristic(Characteristic):
def __init__(self, bus, index, service):
Characteristic.__init__(self, bus, index, BAT_LEVEL_UUID,
['read', 'notify'], service)
self.level = 77 # стартовый % батарейки
self.value_bytes = bytes([self.level])
@dbus.service.method(GATT_CHRC_IFACE,
in_signature='a{sv}',
out_signature='ay')
def ReadValue(self, options):
log_info(f'[Battery] ReadValue -> {self.level}%')
return dbus.Array([dbus.Byte(self.level)], signature='y')
def set_level(self, value):
value = max(0, min(100, int(value)))
self.level = value
if self.notifying:
self._update_value_and_notify(bytes([self.level]))
@dbus.service.method(GATT_CHRC_IFACE,
in_signature='',
out_signature='')
def StartNotify(self):
log_info('[Battery] StartNotify - клиент подписался на уведомления')
self.notifying = True
self._update_value_and_notify(bytes([self.level]))
@dbus.service.method(GATT_CHRC_IFACE,
in_signature='',
out_signature='')
def StopNotify(self):
log_info('[Battery] StopNotify - клиент отписался от уведомлений')
self.notifying = False
# ============ P2P Service (write+notify, UDP→notify) ============
class P2PService(Service):
def __init__(self, bus, index):
Service.__init__(self, bus, index, P2P_SERVICE_UUID, True)
self.p2p_char = P2PCharacteristic(bus, 0, self)
self.add_characteristic(self.p2p_char)
class P2PCharacteristic(Characteristic):
"""
P2P:
- BLE: read + write-without-response + notify
- UDP listener (порт 5005) → режем датаграмму на чанки по 20 байт и шлём notify
"""
def __init__(self, bus, index, service):
Characteristic.__init__(self, bus, index, P2P_CHAR_UUID,
['read', 'write-without-response', 'notify'], service)
self.data = bytearray(b'\x00' * 20)
self.value_bytes = bytes(self.data)
self.notifying = False
# UDP listener
self.udp_port = 5005
self._udp_thread = threading.Thread(target=self._udp_listener, daemon=True)
self._udp_thread.start()
log_info(f'[P2P] UDP listener запущен на порту {self.udp_port}')
# BLE read
@dbus.service.method(GATT_CHRC_IFACE,
in_signature='a{sv}',
out_signature='ay')
def ReadValue(self, options):
data_hex = self.data.hex()
log_info(f'[P2P] ReadValue -> {len(self.data)} байт: {data_hex}')
return dbus.Array([dbus.Byte(b) for b in self.data], signature='y')
# BLE write
@dbus.service.method(GATT_CHRC_IFACE,
in_signature='aya{sv}',
out_signature='')
def WriteValue(self, value, options):
raw = bytes(value)[:20]
self.data[:] = raw.ljust(20, b'\x00')
data_hex = raw.hex()
log_info(f'[P2P] WriteValue <- {len(raw)} байт: {data_hex}')
if self.notifying:
self._update_value_and_notify(bytes(self.data))
log_debug('[P2P] Отправлено уведомление после записи')
# BLE start notifications
@dbus.service.method(GATT_CHRC_IFACE,
in_signature='',
out_signature='')
def StartNotify(self):
log_info('[P2P] StartNotify - клиент подписался на уведомления')
self.notifying = True
self._update_value_and_notify(bytes(self.data))
# BLE stop notifications
@dbus.service.method(GATT_CHRC_IFACE,
in_signature='',
out_signature='')
def StopNotify(self):
log_info('[P2P] StopNotify - клиент отписался от уведомлений')
self.notifying = False
# ======================
# UDP Listener Thread
# ======================
def _udp_listener(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("0.0.0.0", self.udp_port))
log_info(f'[UDP] Слушаем на 0.0.0.0:{self.udp_port}')
while True:
try:
data, addr = sock.recvfrom(2048)
log_debug(f'[UDP] Получено {len(data)} байт от {addr[0]}:{addr[1]}')
# режем на чанки по 20 байт
chunks_count = (len(data) + 19) // 20
for i in range(0, len(data), 20):
chunk = data[i:i + 20]
chunk_hex = chunk.hex()
log_debug(f'[UDP] → BLE chunk {i//20 + 1}/{chunks_count} ({len(chunk)} байт): {chunk_hex}')
# отправляем notify из потока GLib (потокобезопасно)
GLib.idle_add(self._send_notify_safe, chunk)
except Exception as e:
log_error(f'[UDP] Ошибка при получении данных: {e}')
# вызывается внутри GLib main loop
def _send_notify_safe(self, chunk: bytes):
if self.notifying:
self.data[:] = chunk.ljust(20, b'\x00')
self._update_value_and_notify(bytes(self.data))
log_debug(f'[P2P] Отправлено notify из UDP: {chunk.hex()}')
else:
log_debug('[P2P] Пропущено notify из UDP (клиент не подписан)')
return False # чтобы idle_add не повторял вызов
# ============ RW Int32 Service (read/write/notify) ============
class RWService(Service):
def __init__(self, bus, index):
Service.__init__(self, bus, index, RW_SERVICE_UUID, True)
self.rw_char = RWIntCharacteristic(bus, 0, self)
self.add_characteristic(self.rw_char)
class RWIntCharacteristic(Characteristic):
"""
RW Int32 (LE signed, 4 байта)
"""
def __init__(self, bus, index, service):
Characteristic.__init__(self, bus, index, RW_CHAR_UUID,
['read', 'write', 'notify'], service)
self.value = 1234
self.value_bytes = struct.pack('<i', self.value)
@dbus.service.method(GATT_CHRC_IFACE,
in_signature='a{sv}',
out_signature='ay')
def ReadValue(self, options):
log_info(f'[RWInt] ReadValue -> {self.value}')
data = struct.pack('<i', int(self.value))
return dbus.Array([dbus.Byte(b) for b in data], signature='y')
@dbus.service.method(GATT_CHRC_IFACE,
in_signature='aya{sv}',
out_signature='')
def WriteValue(self, value, options):
raw = bytes(value)
if len(raw) < 4:
log_warn(f'[RWInt] WriteValue: слишком короткие данные ({len(raw)} байт), игнорируем')
return
self.value = struct.unpack('<i', raw[:4])[0]
log_info(f'[RWInt] WriteValue <- {self.value}')
if self.notifying:
self._update_value_and_notify(struct.pack('<i', self.value))
log_debug('[RWInt] Отправлено уведомление после записи')
@dbus.service.method(GATT_CHRC_IFACE,
in_signature='',
out_signature='')
def StartNotify(self):
log_info('[RWInt] StartNotify - клиент подписался на уведомления')
self.notifying = True
self._update_value_and_notify(struct.pack('<i', self.value))
@dbus.service.method(GATT_CHRC_IFACE,
in_signature='',
out_signature='')
def StopNotify(self):
log_info('[RWInt] StopNotify - клиент отписался от уведомлений')
self.notifying = False
# ============ LE Advertisement ============
class TestAdvertisement(dbus.service.Object):
PATH_BASE = '/org/bluez/example/advertisement'
def __init__(self, bus, index):
self.path = self.PATH_BASE + str(index)
self.bus = bus
dbus.service.Object.__init__(self, bus, self.path)
def get_path(self):
return dbus.ObjectPath(self.path)
@dbus.service.method(DBUS_PROP_IFACE,
in_signature='s',
out_signature='a{sv}')
def GetAll(self, interface):
log_info('[Advertisement] GetAll вызван для интерфейса: ' + interface)
if interface != LE_ADVERTISEMENT_IFACE:
log_error('[Advertisement] Неверный интерфейс: ' + interface)
raise InvalidArgsException()
# Connectable реклама с явными флагами
# Важно: IncludeTxPower и другие параметры могут помочь с подключением
props = {
'Type': dbus.String('peripheral'),
'ServiceUUIDs': dbus.Array([
BAT_SERVICE_UUID,
P2P_SERVICE_UUID,
RW_SERVICE_UUID,
], signature='s'),
'LocalName': dbus.String('AIS'),
}
log_info('[Advertisement] Возвращаем свойства рекламы:')
log_info(f' Type: {props["Type"]}')
log_info(f' LocalName: {props["LocalName"]}')
log_info(f' ServiceUUIDs: {len(props["ServiceUUIDs"])} сервисов')
return props
@dbus.service.method(LE_ADVERTISEMENT_IFACE,
in_signature='',
out_signature='')
def Release(self):
log_warn('[Advertisement] Release вызван - возможно, клиент отключился или реклама была отменена')
log_warn('[Advertisement] Это может означать, что реклама была остановлена BlueZ')
# ============ Вспомогательное: поиск адаптера с GATT Manager ============
def find_adapter(bus):
obj = bus.get_object(BLUEZ_SERVICE_NAME, '/')
om = dbus.Interface(obj, DBUS_OM_IFACE)
objects = om.GetManagedObjects()
for path, ifaces in objects.items():
if GATT_MANAGER_IFACE in ifaces:
return path
return None
# ============ main() ============
def main():
global MAIN_LOOP
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
adapter_path = find_adapter(bus)
if not adapter_path:
log_error('Не найден адаптер с GattManager1. '
'Проверь, что bluetoothd запущен с --experimental')
return 1
log_info(f'Используем адаптер: {adapter_path}')
service_manager = dbus.Interface(
bus.get_object(BLUEZ_SERVICE_NAME, adapter_path),
GATT_MANAGER_IFACE
)
advertising_manager = dbus.Interface(
bus.get_object(BLUEZ_SERVICE_NAME, adapter_path),
LE_ADVERTISING_MANAGER_IFACE
)
app = Application(bus)
# Добавляем сервисы
bat_srv = BatteryService(bus, 0)
p2p_srv = P2PService(bus, 1)
rw_srv = RWService(bus, 2)
app.add_service(bat_srv)
app.add_service(p2p_srv)
app.add_service(rw_srv)
adv = TestAdvertisement(bus, 0)
# Регистрируем GATT апп
log_info('Регистрируем GATT Application...')
service_manager.RegisterApplication(
app.get_path(),
{},
reply_handler=lambda: log_info('✅ GATT Application зарегистрирован успешно'),
error_handler=lambda e: log_error(f'❌ Ошибка RegisterApplication: {e}'),
)
# Регистрируем рекламу
log_info('Регистрируем рекламу...')
log_info(f'Путь рекламы: {adv.get_path()}')
advertising_manager.RegisterAdvertisement(
adv.get_path(),
{},
reply_handler=lambda: log_info('✅ Advertisement зарегистрирован успешно - устройство должно быть видно при сканировании'),
error_handler=lambda e: log_error(f'❌ Ошибка RegisterAdvertisement: {e}'),
)
log_info('=' * 60)
log_info('BLE GATT сервер запущен и готов к подключениям')
log_info(f'Имя устройства: AIS')
log_info(f'Сервисы: Battery ({BAT_SERVICE_UUID}), P2P ({P2P_SERVICE_UUID}), RW ({RW_SERVICE_UUID})')
log_info('=' * 60)
MAIN_LOOP = GLib.MainLoop()
try:
MAIN_LOOP.run()
except KeyboardInterrupt:
log_info('Получен сигнал завершения (Ctrl+C)')
log_info('Останавливаем сервер...')
try:
# Отменяем рекламу
advertising_manager.UnregisterAdvertisement(adv.get_path())
log_info('✅ Advertisement отменён')
except Exception as e:
log_error(f'❌ Ошибка при отмене advertisement: {e}')
try:
# Отменяем регистрацию GATT приложения
service_manager.UnregisterApplication(app.get_path())
log_info('✅ GATT Application отменён')
except Exception as e:
log_error(f'❌ Ошибка при отмене GATT application: {e}')
log_info('Сервер остановлен')
return 0
if __name__ == '__main__':
sys.exit(main() or 0)