""" Настройки «нашего судна» для Class B / B+ и сборка AIS-сообщений 18, 19, 24. """ from __future__ import annotations import json import os import re import socket import struct import subprocess import sys import threading import time from typing import Any, Dict, List, Optional, Tuple from bitarray import bitarray try: from pyais.messages import MSG_CLASS except ImportError: MSG_CLASS = {} try: from pyais.constants import EpfdType except ImportError: EpfdType = None # type: ignore from ais_phy import ( ais_channel_to_nrzi_bytes, hdlc_nrzi_bytes_no_preamble, phy_frame_bit_counts, ) try: from ais_nrzi_pipeline import phy as _aistx_phy # type: ignore except ImportError: _aistx_phy = None def is_aistx_phy_available() -> bool: return _aistx_phy is not None def _expand_nrzi_packed_msb(packed: bytes) -> bytes: """Один байт NRZI → 8 байт 0x00/0xFF (MSB первый).""" out = bytearray() for byte in packed: for i in range(7, -1, -1): out.append(0xFF if (byte >> i) & 1 else 0x00) return bytes(out) CONFIG_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "transponder_config.json") _config_lock = threading.Lock() # Фиксированная доставка NRZI (как тестовый слот / приёмник на устройстве). NRZI_UDP_HOST = "127.0.0.1" NRZI_UDP_PORT = 6010 DEFAULT_TRANSPONDER: Dict[str, Any] = { "mmsi": 123456789, "shipname": "MYSHIP", "callsign": "CALLSG", "ship_type": 37, "to_bow": 10, "to_stern": 10, "to_port": 3, "to_starboard": 3, "vendorid": "1HZZZZ", "model": 1, "serial": 1, "nrzi_mode": "packed", # По умолчанию gr-aistx-цепочка; ais_phy — альтернатива (pyais CRC без reverse-byte). "nrzi_encoder": "aistx", "nrzi_preamble_bits": 24, "include_nrzi_preamble": True, # Как в GNU Radio: выравнивание payload до октета нулями; добор NRZ до N бит перед NRZI. "nrzi_pad_payload_to_octet": True, # GNU Radio часто 200; с преамбулой 24+HDLC NRZ обычно >200 бит — добор до 200 не меняет поток. "nrzi_pad_nrz_bits": 256, "use_gps_motion": True, # Заголовок UDP как у «Тест слота»: 1 байт канал + uint16 LE слот + NRZI "nrzi_slot_channel": "A", "nrzi_slot": 0, # После UDP: пауза 50–100 ms, затем запуск скрипта (напр. pulse_once.py на Orange Pi). "tx_gpio_pulse_auto": False, "tx_gpio_pulse_script": "", "tx_gpio_pulse_delay_ms": 75, } def build_slot_udp_payload(channel: str, slot: int, nrzi_body: bytes) -> bytes: """Тот же формат, что /api/send_test_slot: ch(0=A,1=B) + slot + данные.""" ch = str(channel).strip().upper() ch_byte = b"\x00" if ch == "A" else b"\x01" try: s = int(slot) except (TypeError, ValueError): s = 0 s = max(0, min(2249, s)) return ch_byte + struct.pack(" Dict[str, Any]: with _config_lock: if not os.path.exists(CONFIG_PATH): return dict(DEFAULT_TRANSPONDER) try: with open(CONFIG_PATH, "r", encoding="utf-8") as f: data = json.load(f) out = dict(DEFAULT_TRANSPONDER) for k, v in data.items(): if k in DEFAULT_TRANSPONDER: out[k] = v return out except (OSError, json.JSONDecodeError): return dict(DEFAULT_TRANSPONDER) def save_transponder_config(cfg: Dict[str, Any]) -> Dict[str, Any]: merged = normalize_transponder_config(cfg) with _config_lock: with open(CONFIG_PATH, "w", encoding="utf-8") as f: json.dump(merged, f, indent=2, ensure_ascii=False) return merged def normalize_transponder_config(cfg: Dict[str, Any]) -> Dict[str, Any]: out = dict(DEFAULT_TRANSPONDER) for k in DEFAULT_TRANSPONDER: if k not in cfg: continue v = cfg[k] if k in ( "mmsi", "ship_type", "to_bow", "to_stern", "to_port", "to_starboard", "model", "serial", ): try: out[k] = int(v) except (TypeError, ValueError): out[k] = DEFAULT_TRANSPONDER[k] elif k in ("use_gps_motion", "include_nrzi_preamble"): if isinstance(v, str): out[k] = v.strip().lower() in ("1", "true", "yes", "on") else: out[k] = bool(v) elif k == "nrzi_mode": s = str(v).lower() out[k] = s if s in ("packed", "expanded") else "packed" elif k == "nrzi_encoder": s = str(v).strip().lower() if s == "aistx" and _aistx_phy is None: out[k] = "ais_phy" else: out[k] = s if s in ("ais_phy", "aistx") else DEFAULT_TRANSPONDER[k] elif k == "nrzi_slot_channel": s = str(v).strip().upper() out[k] = s if s in ("A", "B") else DEFAULT_TRANSPONDER[k] elif k == "nrzi_slot": try: si = int(v) out[k] = max(0, min(2249, si)) except (TypeError, ValueError): out[k] = DEFAULT_TRANSPONDER[k] elif k == "nrzi_preamble_bits": try: n = int(v) out[k] = max(0, min(128, n)) except (TypeError, ValueError): out[k] = DEFAULT_TRANSPONDER[k] elif k == "nrzi_pad_nrz_bits": try: n = int(v) out[k] = max(0, min(4096, n)) except (TypeError, ValueError): out[k] = DEFAULT_TRANSPONDER[k] elif k == "nrzi_pad_payload_to_octet": if isinstance(v, str): out[k] = v.strip().lower() in ("1", "true", "yes", "on") else: out[k] = bool(v) elif k in ("shipname", "callsign", "vendorid", "tx_gpio_pulse_script"): out[k] = str(v) if v is not None else DEFAULT_TRANSPONDER[k] elif k == "tx_gpio_pulse_auto": if isinstance(v, str): out[k] = v.strip().lower() in ("1", "true", "yes", "on") else: out[k] = bool(v) elif k == "tx_gpio_pulse_delay_ms": try: n = int(v) out[k] = max(50, min(100, n)) except (TypeError, ValueError): out[k] = DEFAULT_TRANSPONDER[k] else: out[k] = v return out def merge_transponder_request( base: Dict[str, Any], body: Optional[Dict[str, Any]] ) -> Dict[str, Any]: merged = dict(base) if body: for k in DEFAULT_TRANSPONDER: if k in body: merged[k] = body[k] return normalize_transponder_config(merged) def _second_field() -> int: return int(time.gmtime(time.time()).tm_sec) % 60 def _motion_from_ownship(cfg: Dict[str, Any], own: Dict[str, Any]) -> Tuple[float, float, float, float, int]: lat = float(own.get("lat") or 0.0) lon = float(own.get("lon") or 0.0) speed = float(own.get("speed") or 0.0) if cfg.get("use_gps_motion", True) else 0.0 course = float(own.get("course") or 0.0) if cfg.get("use_gps_motion", True) else 0.0 heading = int(own.get("heading") or own.get("course") or 0) if cfg.get("use_gps_motion", True) else 0 heading = max(0, min(511, heading)) return lat, lon, speed, course, heading def build_type18_dict(cfg: Dict[str, Any], own: Dict[str, Any]) -> Dict[str, Any]: lat, lon, speed, course, heading = _motion_from_ownship(cfg, own) return { "msg_type": 18, "repeat": 0, "mmsi": int(cfg["mmsi"]), "lat": lat, "lon": lon, "speed": speed, "course": course, "heading": heading, "second": _second_field(), } def build_type19_dict(cfg: Dict[str, Any], own: Dict[str, Any]) -> Dict[str, Any]: lat, lon, speed, course, heading = _motion_from_ownship(cfg, own) epfd = EpfdType.GPS if EpfdType is not None else 1 return { "msg_type": 19, "repeat": 0, "mmsi": int(cfg["mmsi"]), "lat": lat, "lon": lon, "speed": speed, "course": course, "heading": heading, "second": _second_field(), "shipname": str(cfg.get("shipname") or "")[:20], "ship_type": int(cfg.get("ship_type") or 0), "to_bow": int(cfg.get("to_bow") or 0), "to_stern": int(cfg.get("to_stern") or 0), "to_port": int(cfg.get("to_port") or 0), "to_starboard": int(cfg.get("to_starboard") or 0), "epfd": epfd, "raim": False, "dte": False, "assigned": False, } def build_type24a_dict(cfg: Dict[str, Any]) -> Dict[str, Any]: return { "msg_type": 24, "repeat": 0, "partno": 0, "mmsi": int(cfg["mmsi"]), "shipname": str(cfg.get("shipname") or "")[:20], } def build_type24b_dict(cfg: Dict[str, Any]) -> Dict[str, Any]: vid = str(cfg.get("vendorid") or "")[:6] if len(vid) < 6: vid = (vid + "@@@@@@")[:6] return { "msg_type": 24, "repeat": 0, "partno": 1, "mmsi": int(cfg["mmsi"]), "ship_type": int(cfg.get("ship_type") or 0), "vendorid": vid, "model": int(cfg.get("model") or 0), "serial": int(cfg.get("serial") or 0), "callsign": str(cfg.get("callsign") or "")[:7], "to_bow": int(cfg.get("to_bow") or 0), "to_stern": int(cfg.get("to_stern") or 0), "to_port": int(cfg.get("to_port") or 0), "to_starboard": int(cfg.get("to_starboard") or 0), } def payload_bits_from_dict( data: Dict[str, Any], *, pad_payload_to_octet: bool = False ) -> bitarray: if not MSG_CLASS: raise RuntimeError("pyais не установлен") raw_t = data.get("msg_type", data.get("type")) if raw_t is None: raise ValueError("В данных AIS нужен msg_type (или устаревшее type)") ais_type = int(raw_t) # pyais Payload.create игнорирует неизвестные ключи; type/msg_type не являются полями attrs. create_kw = {k: v for k, v in data.items() if k not in ("type", "msg_type")} payload = MSG_CLASS[ais_type].create(**create_kw) bits = payload.to_bitarray() if len(bits) % 8 != 0 and not pad_payload_to_octet: raise ValueError(f"Payload type {ais_type} has length {len(bits)} not multiple of 8") return bits def nrzi_bytes_for_payload( data: Dict[str, Any], mode: str, *, preamble_bits: int = 24, pad_payload_to_octet: bool = False, pad_nrz_total_bits: Optional[int] = None, ) -> bytes: bits = payload_bits_from_dict(data, pad_payload_to_octet=pad_payload_to_octet) return ais_channel_to_nrzi_bytes( bits, preamble_bits=preamble_bits, nrzi_byte_mode=mode, pad_payload_to_octet=pad_payload_to_octet, pad_nrz_total_bits=pad_nrz_total_bits, ) def nrzi_bytes_hdlc_only( data: Dict[str, Any], mode: str, *, pad_payload_to_octet: bool = False, pad_nrz_total_bits: Optional[int] = None, ) -> bytes: bits = payload_bits_from_dict(data, pad_payload_to_octet=pad_payload_to_octet) return hdlc_nrzi_bytes_no_preamble( bits, nrzi_byte_mode=mode, pad_payload_to_octet=pad_payload_to_octet, pad_nrz_total_bits=pad_nrz_total_bits, ) def _encode_nrzi_for_dict( data: Dict[str, Any], cfg: Dict[str, Any], *, include_preamble: bool, mode: str, pre_bits: int, pad_oct: bool, pad_nrz: Optional[int], ) -> bytes: enc = str(cfg.get("nrzi_encoder") or DEFAULT_TRANSPONDER["nrzi_encoder"]).strip().lower() if enc == "aistx" and _aistx_phy is not None: bits = payload_bits_from_dict(data, pad_payload_to_octet=pad_oct) if include_preamble: fb = _aistx_phy.build_nrzi_frame(bits.to01(), enable_nrzi=True) raw = _aistx_phy.nrzi_bits_to_bytes(fb) if mode == "expanded": raw = _expand_nrzi_packed_msb(raw) return raw return nrzi_bytes_hdlc_only( data, mode, pad_payload_to_octet=pad_oct, pad_nrz_total_bits=pad_nrz, ) if include_preamble: return nrzi_bytes_for_payload( data, mode, preamble_bits=pre_bits, pad_payload_to_octet=pad_oct, pad_nrz_total_bits=pad_nrz, ) return nrzi_bytes_hdlc_only( data, mode, pad_payload_to_octet=pad_oct, pad_nrz_total_bits=pad_nrz, ) def parse_nrzi_hex(hex_text: str) -> bytes: s = re.sub(r"[\s:,\n\r\t]+", "", (hex_text or "").strip()) if not s: raise ValueError("Пустая hex-строка") if len(s) % 2: raise ValueError("Hex должен содержать чётное число символов") try: return bytes.fromhex(s) except ValueError as e: raise ValueError(f"Некорректный hex: {e}") from e def _nrzi_pad_opts(cfg: Dict[str, Any]) -> Tuple[bool, Optional[int]]: pad_oct = bool(cfg.get("nrzi_pad_payload_to_octet", True)) try: n = int(cfg.get("nrzi_pad_nrz_bits", 256)) except (TypeError, ValueError): n = 256 pad_nrz = None if n <= 0 else max(1, min(4096, n)) return pad_oct, pad_nrz def _send_udp(host: str, port: int, data: bytes) -> None: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: sock.sendto(data, (host, port)) finally: sock.close() def tx_gpio_pulse( cfg: Dict[str, Any], *, after_udp: bool ) -> Dict[str, Any]: """ after_udp=True: только если tx_gpio_pulse_auto; пауза 50–100 ms после UDP, затем скрипт. after_udp=False: ручной импульс (без паузы, без проверки auto). """ if after_udp and not bool(cfg.get("tx_gpio_pulse_auto")): return {"skipped": True} script = str(cfg.get("tx_gpio_pulse_script") or "").strip() if not script: if after_udp: return {"skipped": True, "reason": "no_script"} return {"ok": False, "error": "Не задан путь к скрипту импульса TX"} if not os.path.isfile(script): err = f"Файл не найден: {script}" if after_udp: return {"ok": False, "error": err} return {"ok": False, "error": err} if after_udp: try: ms = int(cfg.get("tx_gpio_pulse_delay_ms", 75)) except (TypeError, ValueError): ms = 75 ms = max(50, min(100, ms)) time.sleep(ms / 1000.0) try: r = subprocess.run( [sys.executable, script], capture_output=True, text=True, timeout=15, ) out: Dict[str, Any] = { "ok": r.returncode == 0, "returncode": r.returncode, } if r.stderr and r.stderr.strip(): out["stderr"] = r.stderr.strip()[:500] if r.stdout and r.stdout.strip(): out["stdout"] = r.stdout.strip()[:300] if r.returncode != 0 and not out.get("stderr"): out["error"] = f"exit {r.returncode}" return out except subprocess.TimeoutExpired: return {"ok": False, "error": "GPIO script timeout"} except OSError as e: return {"ok": False, "error": str(e)} def _append_gpio_result(sent: Dict[str, Any], pulse: Dict[str, Any]) -> None: if pulse.get("skipped"): return lst: List[Dict[str, Any]] = sent.setdefault("gpio_pulse", []) # type: ignore[assignment] lst.append(pulse) if not pulse.get("ok"): err = pulse.get("error") or pulse.get("stderr") or f"code {pulse.get('returncode')}" sent["errors"].append(f"GPIO TX: {err}") def send_raw_nrzi_packet( channel: str, slot: int, nrzi_body: bytes, *, cfg: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """UDP: канал + слот + сырой NRZI (как тест слота).""" ch = str(channel).strip().upper() if ch not in ("A", "B"): raise ValueError("channel must be A or B") try: slot_n = int(slot) except (TypeError, ValueError): slot_n = 0 slot_n = max(0, min(2249, slot_n)) packet = build_slot_udp_payload(ch, slot_n, nrzi_body) _send_udp(NRZI_UDP_HOST, NRZI_UDP_PORT, packet) out: Dict[str, Any] = { "dest": f"{NRZI_UDP_HOST}:{NRZI_UDP_PORT}", "slot_channel": ch, "slot": slot_n, "payload_bytes": len(nrzi_body), "udp_bytes": len(packet), "errors": [], } if cfg is not None: _append_gpio_result(out, tx_gpio_pulse(cfg, after_udp=True)) return out def send_transmission(cfg: Dict[str, Any], own: Dict[str, Any], which: str) -> Dict[str, Any]: """ Только NRZI по UDP на NRZI_UDP_HOST:NRZI_UDP_PORT. which: '18', '19', '24A', '24B', 'broadcast' """ mode = cfg.get("nrzi_mode") or "packed" ch = str(cfg.get("nrzi_slot_channel") or "A").strip().upper() if ch not in ("A", "B"): ch = "A" try: slot_n = int(cfg.get("nrzi_slot", 0)) except (TypeError, ValueError): slot_n = 0 slot_n = max(0, min(2249, slot_n)) sent: Dict[str, Any] = { "nrzi_bytes": 0, "errors": [], "dest": f"{NRZI_UDP_HOST}:{NRZI_UDP_PORT}", "slot_channel": ch, "slot": slot_n, } include_preamble = bool(cfg.get("include_nrzi_preamble", True)) try: pre_bits = int(cfg.get("nrzi_preamble_bits", 24)) except (TypeError, ValueError): pre_bits = 24 pre_bits = max(0, min(128, pre_bits)) pad_oct, pad_nrz = _nrzi_pad_opts(cfg) def one(data: Dict[str, Any], label: str) -> None: try: raw = _encode_nrzi_for_dict( data, cfg, include_preamble=include_preamble, mode=mode, pre_bits=pre_bits, pad_oct=pad_oct, pad_nrz=pad_nrz, ) packet = build_slot_udp_payload(ch, slot_n, raw) _send_udp(NRZI_UDP_HOST, NRZI_UDP_PORT, packet) sent["nrzi_bytes"] += len(packet) _append_gpio_result(sent, tx_gpio_pulse(cfg, after_udp=True)) except Exception as e: sent["errors"].append(f"NRZI {label}: {e}") if which == "broadcast": one(build_type18_dict(cfg, own), "18") one(build_type19_dict(cfg, own), "19") one(build_type24a_dict(cfg), "24A") one(build_type24b_dict(cfg), "24B") elif which == "18": one(build_type18_dict(cfg, own), "18") elif which == "19": one(build_type19_dict(cfg, own), "19") elif which == "24A": one(build_type24a_dict(cfg), "24A") elif which == "24B": one(build_type24b_dict(cfg), "24B") else: raise ValueError("which must be 18, 19, 24A, 24B, broadcast") return sent def build_preview(cfg: Dict[str, Any], own: Dict[str, Any]) -> Dict[str, Any]: """NRZI (hex) для типов 18, 19, 24A, 24B; отправка — на 127.0.0.1:6010.""" if not MSG_CLASS: raise RuntimeError("pyais не установлен") mode = cfg.get("nrzi_mode") or "packed" include_preamble = bool(cfg.get("include_nrzi_preamble", True)) try: pre_bits = int(cfg.get("nrzi_preamble_bits", 24)) except (TypeError, ValueError): pre_bits = 24 pre_bits = max(0, min(128, pre_bits)) pad_oct, pad_nrz = _nrzi_pad_opts(cfg) d18 = build_type18_dict(cfg, own) d19 = build_type19_dict(cfg, own) d24a = build_type24a_dict(cfg) d24b = build_type24b_dict(cfg) ch = str(cfg.get("nrzi_slot_channel") or "A").strip().upper() if ch not in ("A", "B"): ch = "A" try: slot_n = int(cfg.get("nrzi_slot", 0)) except (TypeError, ValueError): slot_n = 0 slot_n = max(0, min(2249, slot_n)) out: Dict[str, Any] = { "nrzi_hex": {}, "udp_frame_hex": {}, "dest": f"{NRZI_UDP_HOST}:{NRZI_UDP_PORT}", "slot_channel": ch, "slot": slot_n, "phy": { "preamble_bits": pre_bits if include_preamble else 0, "include_preamble": include_preamble, "nrzi_mode": mode, "nrzi_encoder": str(cfg.get("nrzi_encoder") or DEFAULT_TRANSPONDER["nrzi_encoder"]), "aistx_phy_note": ( "Кодер aistx: ais_nrzi_pipeline/phy.py (reverse bits, CRC как gr-aistx, кадр до 256 бит)." if str(cfg.get("nrzi_encoder") or "").lower() == "aistx" else None ), "fcs": "CRC-16-CCITT poly 0x1021, init 0xFFFF, XOROUT 0xFFFF; 16 бит FCS в потоке MSB первым", "hdlc": "флаги 0x7E, bit stuffing между флагами", "nrzi": "0 — переключение уровня, 1 — без изменения", "packed": "8 NRZI-сэмплов на байт, первый бит — MSB байта (для UDP)", "test_slot_note": ( "«Тест слота» шлёт фиксированный TEST_AIS_NRZI (32 B, проверен на передатчике). " "NRZI ниже — по умолчанию ais_nrzi_pipeline/phy.py (gr-aistx); при выборе ais_phy — другая CRC/битовый порядок." ), "pad_payload_to_octet": pad_oct, "pad_nrz_total_bits": pad_nrz, "per_message": {}, }, } enc_ui = str(cfg.get("nrzi_encoder") or DEFAULT_TRANSPONDER["nrzi_encoder"]).strip().lower() for key, d in [("18", d18), ("19", d19), ("24A", d24a), ("24B", d24b)]: bits = payload_bits_from_dict(d, pad_payload_to_octet=pad_oct) if enc_ui == "aistx" and _aistx_phy is not None: plen = len(bits) padb = (8 - (plen % 8)) % 8 out["phy"]["per_message"][key] = { "payload_bits_input": plen, "payload_bits_after_pad": plen + padb, "encoder": "aistx", "hdlc_note": "статы HDLC/stuff — см. phy.py (gr-aistx)", } else: out["phy"]["per_message"][key] = phy_frame_bit_counts( bits, pad_payload_to_octet=pad_oct ) raw = _encode_nrzi_for_dict( d, cfg, include_preamble=include_preamble, mode=mode, pre_bits=pre_bits, pad_oct=pad_oct, pad_nrz=pad_nrz, ) out["nrzi_hex"][key] = raw.hex() out["udp_frame_hex"][key] = build_slot_udp_payload(ch, slot_n, raw).hex() out["phy"]["per_message"][key]["nrzi_packed_bytes"] = len(raw) out["phy"]["per_message"][key]["udp_total_bytes"] = 3 + len(raw) return out