generated from Grigo/AndroidTemplate
557 lines
21 KiB
Python
557 lines
21 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import dbus
|
||
import dbus.exceptions
|
||
import dbus.mainloop.glib
|
||
import dbus.service
|
||
import threading
|
||
import socket
|
||
import struct
|
||
import sys
|
||
import time
|
||
from datetime import datetime
|
||
from gi.repository import GLib
|
||
|
||
BLUEZ_SERVICE_NAME = 'org.bluez'
|
||
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
|
||
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
|
||
LE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
|
||
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
|
||
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
|
||
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
|
||
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
|
||
|
||
MAIN_LOOP = None
|
||
|
||
# ============ Логирование ============
|
||
|
||
def log(level, message):
|
||
"""Логирование с временной меткой"""
|
||
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
|
||
print(f'[{timestamp}] [{level}] {message}', flush=True)
|
||
|
||
def log_info(message):
|
||
log('INFO', message)
|
||
|
||
def log_warn(message):
|
||
log('WARN', message)
|
||
|
||
def log_error(message):
|
||
log('ERROR', message)
|
||
|
||
def log_debug(message):
|
||
log('DEBUG', message)
|
||
|
||
# ============ UUID (вариант A: короткие) ============
|
||
|
||
# Battery Service / Level – стандартные
|
||
BAT_SERVICE_UUID = '0000180f-0000-1000-8000-00805f9b34fb'
|
||
BAT_LEVEL_UUID = '00002a19-0000-1000-8000-00805f9b34fb'
|
||
|
||
# P2P – 16-битки в SIG-диапазоне
|
||
P2P_SERVICE_UUID = '0000fe40-0000-1000-8000-00805f9b34fb'
|
||
P2P_CHAR_UUID = '0000fe42-0000-1000-8000-00805f9b34fb'
|
||
|
||
# RW – кастом 16-битный диапазон
|
||
RW_SERVICE_UUID = '0000ab10-0000-1000-8000-00805f9b34fb'
|
||
RW_CHAR_UUID = '0000ab11-0000-1000-8000-00805f9b34fb'
|
||
|
||
|
||
# ============ D-Bus Exceptions ============
|
||
|
||
class InvalidArgsException(dbus.exceptions.DBusException):
|
||
_dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
|
||
|
||
|
||
class NotSupportedException(dbus.exceptions.DBusException):
|
||
_dbus_error_name = 'org.bluez.Error.NotSupported'
|
||
|
||
|
||
# ============ Application (ObjectManager) ============
|
||
|
||
class Application(dbus.service.Object):
|
||
"""
|
||
Корневой объект, который отдаёт BlueZ все наши сервисы/характеристики через GetManagedObjects.
|
||
"""
|
||
def __init__(self, bus):
|
||
self.path = '/org/bluez/example/app'
|
||
self.services = []
|
||
dbus.service.Object.__init__(self, bus, self.path)
|
||
|
||
def get_path(self):
|
||
return dbus.ObjectPath(self.path)
|
||
|
||
def add_service(self, service):
|
||
self.services.append(service)
|
||
|
||
def get_services(self):
|
||
return self.services
|
||
|
||
@dbus.service.method(DBUS_OM_IFACE,
|
||
out_signature='a{oa{sa{sv}}}')
|
||
def GetManagedObjects(self):
|
||
log_info('🔵 GetManagedObjects вызван - клиент запрашивает список сервисов')
|
||
log_info(' Это означает, что подключение установлено и начинается дискаверинг')
|
||
managed_objects = {}
|
||
for service in self.services:
|
||
managed_objects[service.get_path()] = service.get_properties()
|
||
for chrc in service.get_characteristics():
|
||
managed_objects[chrc.get_path()] = chrc.get_properties()
|
||
log_info(f' Возвращаем {len(managed_objects)} объектов (сервисы + характеристики)')
|
||
return managed_objects
|
||
|
||
|
||
# ============ GATT Service ============
|
||
|
||
class Service(dbus.service.Object):
|
||
def __init__(self, bus, index, uuid, primary):
|
||
self.path = f'/org/bluez/example/service{index}'
|
||
self.bus = bus
|
||
self.uuid = uuid
|
||
self.primary = primary
|
||
self.characteristics = []
|
||
dbus.service.Object.__init__(self, bus, self.path)
|
||
|
||
def get_path(self):
|
||
return dbus.ObjectPath(self.path)
|
||
|
||
def add_characteristic(self, chrc):
|
||
self.characteristics.append(chrc)
|
||
|
||
def get_characteristics(self):
|
||
return self.characteristics
|
||
|
||
def get_properties(self):
|
||
return {
|
||
GATT_SERVICE_IFACE: {
|
||
'UUID': self.uuid,
|
||
'Primary': dbus.Boolean(self.primary),
|
||
'Includes': dbus.Array([], signature='o'),
|
||
}
|
||
}
|
||
|
||
|
||
# ============ GATT Characteristic (base) ============
|
||
|
||
class Characteristic(dbus.service.Object):
|
||
def __init__(self, bus, index, uuid, flags, service):
|
||
self.path = service.path + f'/char{index}'
|
||
self.bus = bus
|
||
self.uuid = uuid
|
||
self.flags = flags
|
||
self.service = service
|
||
self.notifying = False
|
||
self.value_bytes = bytes([0x00]) # дефолт
|
||
dbus.service.Object.__init__(self, bus, self.path)
|
||
|
||
def get_path(self):
|
||
return dbus.ObjectPath(self.path)
|
||
|
||
def get_properties(self):
|
||
return {
|
||
GATT_CHRC_IFACE: {
|
||
'Service': self.service.get_path(),
|
||
'UUID': self.uuid,
|
||
'Flags': dbus.Array(self.flags, signature='s'),
|
||
# Стартовое Value, чтобы клиенты видели, что характеристика живая
|
||
'Value': dbus.Array([dbus.Byte(b) for b in self.value_bytes], signature='y'),
|
||
}
|
||
}
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='a{sv}',
|
||
out_signature='ay')
|
||
def ReadValue(self, options):
|
||
print(f'{self.uuid}: ReadValue (base stub)')
|
||
raise NotSupportedException('Read not implemented')
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='aya{sv}',
|
||
out_signature='')
|
||
def WriteValue(self, value, options):
|
||
print(f'{self.uuid}: WriteValue (base stub)')
|
||
raise NotSupportedException('Write not implemented')
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='',
|
||
out_signature='')
|
||
def StartNotify(self):
|
||
print(f'{self.uuid}: StartNotify (base stub)')
|
||
raise NotSupportedException('Notifications not supported')
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='',
|
||
out_signature='')
|
||
def StopNotify(self):
|
||
print(f'{self.uuid}: StopNotify (base stub)')
|
||
raise NotSupportedException('Notifications not supported')
|
||
|
||
# helper для рассылки notify
|
||
def _update_value_and_notify(self, new_bytes: bytes):
|
||
self.value_bytes = new_bytes
|
||
self.PropertiesChanged(
|
||
GATT_CHRC_IFACE,
|
||
{'Value': dbus.Array([dbus.Byte(b) for b in self.value_bytes], signature='y')},
|
||
[]
|
||
)
|
||
|
||
@dbus.service.signal(DBUS_PROP_IFACE,
|
||
signature='sa{sv}as')
|
||
def PropertiesChanged(self, interface, changed, invalidated):
|
||
pass
|
||
|
||
|
||
# ============ Battery Service ============
|
||
|
||
class BatteryService(Service):
|
||
def __init__(self, bus, index):
|
||
Service.__init__(self, bus, index, BAT_SERVICE_UUID, True)
|
||
self.level_characteristic = BatteryLevelCharacteristic(bus, 0, self)
|
||
self.add_characteristic(self.level_characteristic)
|
||
|
||
|
||
class BatteryLevelCharacteristic(Characteristic):
|
||
def __init__(self, bus, index, service):
|
||
Characteristic.__init__(self, bus, index, BAT_LEVEL_UUID,
|
||
['read', 'notify'], service)
|
||
self.level = 77 # стартовый % батарейки
|
||
self.value_bytes = bytes([self.level])
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='a{sv}',
|
||
out_signature='ay')
|
||
def ReadValue(self, options):
|
||
log_info(f'[Battery] ReadValue -> {self.level}%')
|
||
return dbus.Array([dbus.Byte(self.level)], signature='y')
|
||
|
||
def set_level(self, value):
|
||
value = max(0, min(100, int(value)))
|
||
self.level = value
|
||
if self.notifying:
|
||
self._update_value_and_notify(bytes([self.level]))
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='',
|
||
out_signature='')
|
||
def StartNotify(self):
|
||
log_info('[Battery] StartNotify - клиент подписался на уведомления')
|
||
self.notifying = True
|
||
self._update_value_and_notify(bytes([self.level]))
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='',
|
||
out_signature='')
|
||
def StopNotify(self):
|
||
log_info('[Battery] StopNotify - клиент отписался от уведомлений')
|
||
self.notifying = False
|
||
|
||
|
||
# ============ P2P Service (write+notify, UDP→notify) ============
|
||
|
||
class P2PService(Service):
|
||
def __init__(self, bus, index):
|
||
Service.__init__(self, bus, index, P2P_SERVICE_UUID, True)
|
||
self.p2p_char = P2PCharacteristic(bus, 0, self)
|
||
self.add_characteristic(self.p2p_char)
|
||
|
||
|
||
class P2PCharacteristic(Characteristic):
|
||
"""
|
||
P2P:
|
||
- BLE: read + write-without-response + notify
|
||
- UDP listener (порт 5005) → режем датаграмму на чанки по 20 байт и шлём notify
|
||
"""
|
||
def __init__(self, bus, index, service):
|
||
Characteristic.__init__(self, bus, index, P2P_CHAR_UUID,
|
||
['read', 'write-without-response', 'notify'], service)
|
||
self.data = bytearray(b'\x00' * 20)
|
||
self.value_bytes = bytes(self.data)
|
||
self.notifying = False
|
||
|
||
# UDP listener
|
||
self.udp_port = 5005
|
||
self._udp_thread = threading.Thread(target=self._udp_listener, daemon=True)
|
||
self._udp_thread.start()
|
||
log_info(f'[P2P] UDP listener запущен на порту {self.udp_port}')
|
||
|
||
# BLE read
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='a{sv}',
|
||
out_signature='ay')
|
||
def ReadValue(self, options):
|
||
data_hex = self.data.hex()
|
||
log_info(f'[P2P] ReadValue -> {len(self.data)} байт: {data_hex}')
|
||
return dbus.Array([dbus.Byte(b) for b in self.data], signature='y')
|
||
|
||
# BLE write
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='aya{sv}',
|
||
out_signature='')
|
||
def WriteValue(self, value, options):
|
||
raw = bytes(value)[:20]
|
||
self.data[:] = raw.ljust(20, b'\x00')
|
||
data_hex = raw.hex()
|
||
log_info(f'[P2P] WriteValue <- {len(raw)} байт: {data_hex}')
|
||
if self.notifying:
|
||
self._update_value_and_notify(bytes(self.data))
|
||
log_debug('[P2P] Отправлено уведомление после записи')
|
||
|
||
# BLE start notifications
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='',
|
||
out_signature='')
|
||
def StartNotify(self):
|
||
log_info('[P2P] StartNotify - клиент подписался на уведомления')
|
||
self.notifying = True
|
||
self._update_value_and_notify(bytes(self.data))
|
||
|
||
# BLE stop notifications
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='',
|
||
out_signature='')
|
||
def StopNotify(self):
|
||
log_info('[P2P] StopNotify - клиент отписался от уведомлений')
|
||
self.notifying = False
|
||
|
||
# ======================
|
||
# UDP Listener Thread
|
||
# ======================
|
||
|
||
def _udp_listener(self):
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
sock.bind(("0.0.0.0", self.udp_port))
|
||
log_info(f'[UDP] Слушаем на 0.0.0.0:{self.udp_port}')
|
||
|
||
while True:
|
||
try:
|
||
data, addr = sock.recvfrom(2048)
|
||
log_debug(f'[UDP] Получено {len(data)} байт от {addr[0]}:{addr[1]}')
|
||
|
||
# режем на чанки по 20 байт
|
||
chunks_count = (len(data) + 19) // 20
|
||
for i in range(0, len(data), 20):
|
||
chunk = data[i:i + 20]
|
||
chunk_hex = chunk.hex()
|
||
log_debug(f'[UDP] → BLE chunk {i//20 + 1}/{chunks_count} ({len(chunk)} байт): {chunk_hex}')
|
||
|
||
# отправляем notify из потока GLib (потокобезопасно)
|
||
GLib.idle_add(self._send_notify_safe, chunk)
|
||
except Exception as e:
|
||
log_error(f'[UDP] Ошибка при получении данных: {e}')
|
||
|
||
# вызывается внутри GLib main loop
|
||
def _send_notify_safe(self, chunk: bytes):
|
||
if self.notifying:
|
||
self.data[:] = chunk.ljust(20, b'\x00')
|
||
self._update_value_and_notify(bytes(self.data))
|
||
log_debug(f'[P2P] Отправлено notify из UDP: {chunk.hex()}')
|
||
else:
|
||
log_debug('[P2P] Пропущено notify из UDP (клиент не подписан)')
|
||
return False # чтобы idle_add не повторял вызов
|
||
|
||
|
||
# ============ RW Int32 Service (read/write/notify) ============
|
||
|
||
class RWService(Service):
|
||
def __init__(self, bus, index):
|
||
Service.__init__(self, bus, index, RW_SERVICE_UUID, True)
|
||
self.rw_char = RWIntCharacteristic(bus, 0, self)
|
||
self.add_characteristic(self.rw_char)
|
||
|
||
|
||
class RWIntCharacteristic(Characteristic):
|
||
"""
|
||
RW Int32 (LE signed, 4 байта)
|
||
"""
|
||
def __init__(self, bus, index, service):
|
||
Characteristic.__init__(self, bus, index, RW_CHAR_UUID,
|
||
['read', 'write', 'notify'], service)
|
||
self.value = 1234
|
||
self.value_bytes = struct.pack('<i', self.value)
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='a{sv}',
|
||
out_signature='ay')
|
||
def ReadValue(self, options):
|
||
log_info(f'[RWInt] ReadValue -> {self.value}')
|
||
data = struct.pack('<i', int(self.value))
|
||
return dbus.Array([dbus.Byte(b) for b in data], signature='y')
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='aya{sv}',
|
||
out_signature='')
|
||
def WriteValue(self, value, options):
|
||
raw = bytes(value)
|
||
if len(raw) < 4:
|
||
log_warn(f'[RWInt] WriteValue: слишком короткие данные ({len(raw)} байт), игнорируем')
|
||
return
|
||
self.value = struct.unpack('<i', raw[:4])[0]
|
||
log_info(f'[RWInt] WriteValue <- {self.value}')
|
||
if self.notifying:
|
||
self._update_value_and_notify(struct.pack('<i', self.value))
|
||
log_debug('[RWInt] Отправлено уведомление после записи')
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='',
|
||
out_signature='')
|
||
def StartNotify(self):
|
||
log_info('[RWInt] StartNotify - клиент подписался на уведомления')
|
||
self.notifying = True
|
||
self._update_value_and_notify(struct.pack('<i', self.value))
|
||
|
||
@dbus.service.method(GATT_CHRC_IFACE,
|
||
in_signature='',
|
||
out_signature='')
|
||
def StopNotify(self):
|
||
log_info('[RWInt] StopNotify - клиент отписался от уведомлений')
|
||
self.notifying = False
|
||
|
||
|
||
# ============ LE Advertisement ============
|
||
|
||
class TestAdvertisement(dbus.service.Object):
|
||
PATH_BASE = '/org/bluez/example/advertisement'
|
||
|
||
def __init__(self, bus, index):
|
||
self.path = self.PATH_BASE + str(index)
|
||
self.bus = bus
|
||
dbus.service.Object.__init__(self, bus, self.path)
|
||
|
||
def get_path(self):
|
||
return dbus.ObjectPath(self.path)
|
||
|
||
@dbus.service.method(DBUS_PROP_IFACE,
|
||
in_signature='s',
|
||
out_signature='a{sv}')
|
||
def GetAll(self, interface):
|
||
log_info('[Advertisement] GetAll вызван для интерфейса: ' + interface)
|
||
if interface != LE_ADVERTISEMENT_IFACE:
|
||
log_error('[Advertisement] Неверный интерфейс: ' + interface)
|
||
raise InvalidArgsException()
|
||
# Connectable реклама с явными флагами
|
||
# Важно: IncludeTxPower и другие параметры могут помочь с подключением
|
||
props = {
|
||
'Type': dbus.String('peripheral'),
|
||
'ServiceUUIDs': dbus.Array([
|
||
BAT_SERVICE_UUID,
|
||
P2P_SERVICE_UUID,
|
||
RW_SERVICE_UUID,
|
||
], signature='s'),
|
||
'LocalName': dbus.String('AIS'),
|
||
}
|
||
log_info('[Advertisement] Возвращаем свойства рекламы:')
|
||
log_info(f' Type: {props["Type"]}')
|
||
log_info(f' LocalName: {props["LocalName"]}')
|
||
log_info(f' ServiceUUIDs: {len(props["ServiceUUIDs"])} сервисов')
|
||
return props
|
||
|
||
@dbus.service.method(LE_ADVERTISEMENT_IFACE,
|
||
in_signature='',
|
||
out_signature='')
|
||
def Release(self):
|
||
log_warn('[Advertisement] Release вызван - возможно, клиент отключился или реклама была отменена')
|
||
log_warn('[Advertisement] Это может означать, что реклама была остановлена BlueZ')
|
||
|
||
|
||
# ============ Вспомогательное: поиск адаптера с GATT Manager ============
|
||
|
||
def find_adapter(bus):
|
||
obj = bus.get_object(BLUEZ_SERVICE_NAME, '/')
|
||
om = dbus.Interface(obj, DBUS_OM_IFACE)
|
||
objects = om.GetManagedObjects()
|
||
for path, ifaces in objects.items():
|
||
if GATT_MANAGER_IFACE in ifaces:
|
||
return path
|
||
return None
|
||
|
||
|
||
# ============ main() ============
|
||
|
||
def main():
|
||
global MAIN_LOOP
|
||
|
||
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
||
bus = dbus.SystemBus()
|
||
|
||
adapter_path = find_adapter(bus)
|
||
if not adapter_path:
|
||
log_error('Не найден адаптер с GattManager1. '
|
||
'Проверь, что bluetoothd запущен с --experimental')
|
||
return 1
|
||
|
||
log_info(f'Используем адаптер: {adapter_path}')
|
||
|
||
service_manager = dbus.Interface(
|
||
bus.get_object(BLUEZ_SERVICE_NAME, adapter_path),
|
||
GATT_MANAGER_IFACE
|
||
)
|
||
|
||
advertising_manager = dbus.Interface(
|
||
bus.get_object(BLUEZ_SERVICE_NAME, adapter_path),
|
||
LE_ADVERTISING_MANAGER_IFACE
|
||
)
|
||
|
||
app = Application(bus)
|
||
|
||
# Добавляем сервисы
|
||
bat_srv = BatteryService(bus, 0)
|
||
p2p_srv = P2PService(bus, 1)
|
||
rw_srv = RWService(bus, 2)
|
||
|
||
app.add_service(bat_srv)
|
||
app.add_service(p2p_srv)
|
||
app.add_service(rw_srv)
|
||
|
||
adv = TestAdvertisement(bus, 0)
|
||
|
||
# Регистрируем GATT апп
|
||
log_info('Регистрируем GATT Application...')
|
||
service_manager.RegisterApplication(
|
||
app.get_path(),
|
||
{},
|
||
reply_handler=lambda: log_info('✅ GATT Application зарегистрирован успешно'),
|
||
error_handler=lambda e: log_error(f'❌ Ошибка RegisterApplication: {e}'),
|
||
)
|
||
|
||
# Регистрируем рекламу
|
||
log_info('Регистрируем рекламу...')
|
||
log_info(f'Путь рекламы: {adv.get_path()}')
|
||
advertising_manager.RegisterAdvertisement(
|
||
adv.get_path(),
|
||
{},
|
||
reply_handler=lambda: log_info('✅ Advertisement зарегистрирован успешно - устройство должно быть видно при сканировании'),
|
||
error_handler=lambda e: log_error(f'❌ Ошибка RegisterAdvertisement: {e}'),
|
||
)
|
||
|
||
log_info('=' * 60)
|
||
log_info('BLE GATT сервер запущен и готов к подключениям')
|
||
log_info(f'Имя устройства: AIS')
|
||
log_info(f'Сервисы: Battery ({BAT_SERVICE_UUID}), P2P ({P2P_SERVICE_UUID}), RW ({RW_SERVICE_UUID})')
|
||
log_info('=' * 60)
|
||
|
||
MAIN_LOOP = GLib.MainLoop()
|
||
try:
|
||
MAIN_LOOP.run()
|
||
except KeyboardInterrupt:
|
||
log_info('Получен сигнал завершения (Ctrl+C)')
|
||
log_info('Останавливаем сервер...')
|
||
try:
|
||
# Отменяем рекламу
|
||
advertising_manager.UnregisterAdvertisement(adv.get_path())
|
||
log_info('✅ Advertisement отменён')
|
||
except Exception as e:
|
||
log_error(f'❌ Ошибка при отмене advertisement: {e}')
|
||
try:
|
||
# Отменяем регистрацию GATT приложения
|
||
service_manager.UnregisterApplication(app.get_path())
|
||
log_info('✅ GATT Application отменён')
|
||
except Exception as e:
|
||
log_error(f'❌ Ошибка при отмене GATT application: {e}')
|
||
log_info('Сервер остановлен')
|
||
return 0
|
||
|
||
|
||
if __name__ == '__main__':
|
||
sys.exit(main() or 0)
|