""" Формирование кадра VDL AIS (флаги HDLC, bit stuffing, CRC-16-CCITT) и NRZI. Опционально: выравнивание payload до октета и добор NRZ до N бит перед NRZI (аналог padd_frame в GNU Radio). """ from __future__ import annotations from typing import Optional from bitarray import bitarray FLAG = bitarray("01111110") def _bits_to_bytes_msb(bits: bitarray) -> bytes: out = bytearray() for i in range(0, len(bits), 8): chunk = bits[i : i + 8] if len(chunk) < 8: pad = bitarray(8 - len(chunk)) pad.setall(0) chunk = chunk + pad v = 0 for b in chunk: v = (v << 1) | int(b) out.append(v) return bytes(out) def crc16_ccitt_fcs(data_bits: bitarray) -> int: """FCS по данным payload (без FCS), длина кратна 8 бит.""" b = _bits_to_bytes_msb(data_bits) crc = 0xFFFF poly = 0x1021 for byte in b: crc ^= byte << 8 for _ in range(8): if crc & 0x8000: crc = ((crc << 1) ^ poly) & 0xFFFF else: crc = (crc << 1) & 0xFFFF return crc ^ 0xFFFF def _int_to_bits(val: int, n: int) -> bitarray: out = bitarray() for i in range(n - 1, -1, -1): out.append((val >> i) & 1) return out def bit_stuff(bits: bitarray) -> bitarray: out = bitarray() count = 0 for b in bits: out.append(b) if b: count += 1 if count == 5: out.append(0) count = 0 else: count = 0 return out def _pad_payload_to_octet_boundary(payload_bits: bitarray) -> bitarray: r = len(payload_bits) % 8 if r == 0: return payload_bits z = bitarray(8 - r) z.setall(0) return payload_bits + z def _pad_nrz_to_min_bits(nrz_bits: bitarray, min_len: int) -> bitarray: if min_len <= 0 or len(nrz_bits) >= min_len: return nrz_bits tail = bitarray(min_len - len(nrz_bits)) tail.setall(0) return nrz_bits + tail def build_hdlc_frame( payload_bits: bitarray, *, pad_payload_to_octet: bool = False ) -> bitarray: """payload_bits — двоичное тело AIS (как pyais Payload.to_bitarray()).""" pl = payload_bits if len(pl) % 8 != 0: if pad_payload_to_octet: pl = _pad_payload_to_octet_boundary(pl) else: raise ValueError("AIS payload length must be multiple of 8 bits for this CRC path") fcs = crc16_ccitt_fcs(pl) fcs_bits = _int_to_bits(fcs, 16) data = pl + fcs_bits stuffed = bit_stuff(data) return FLAG + stuffed + FLAG def nrzi_encode(bits: bitarray) -> bitarray: """NRZI: 1 — уровень без изменения, 0 — переключение.""" level = 0 out = bitarray() for b in bits: if not int(b): level = 1 - level out.append(level) return out def preamble_alternating(num_bits: int = 24) -> bitarray: """ ITU-style dotting: чередование 1/0 (101010…), как gr-aistx LEN_PREAMBLE и phy._preamble_bits. Вариант 010101… дал бы после NRZI (packed, старт уровня 0) байты 0xCC… вместо 0x66… — та же частота, другая фаза; к стаффингу это не относится. """ out = bitarray() for i in range(num_bits): out.append(1 - (i & 1)) return out def hdlc_nrzi_bytes_no_preamble( payload_bits: bitarray, *, nrzi_byte_mode: str = "packed", pad_payload_to_octet: bool = False, pad_nrz_total_bits: Optional[int] = None, ) -> bytes: """Только HDLC+NRZI без преамбулы. pad_nrz_total_bits — добор нулей в NRZ до длины (как GNU Radio padd_frame).""" frame = build_hdlc_frame(payload_bits, pad_payload_to_octet=pad_payload_to_octet) frame = _pad_nrz_to_min_bits(frame, pad_nrz_total_bits or 0) nrz = nrzi_encode(frame) if nrzi_byte_mode == "expanded": return bytes(0xFF if int(b) else 0x00 for b in nrz) return _bits_to_bytes_msb(nrz) def phy_frame_bit_counts( payload_bits: bitarray, *, pad_payload_to_octet: bool = False ) -> dict: """Длины для отладки: payload, поле между флагами (со стаффингом), полный HDLC.""" pl_in = len(payload_bits) pl = payload_bits if len(pl) % 8 != 0 and pad_payload_to_octet: pl = _pad_payload_to_octet_boundary(pl) frame = build_hdlc_frame(pl, pad_payload_to_octet=pad_payload_to_octet) inner = frame[8:-8] return { "payload_bits_input": pl_in, "payload_bits_after_pad": len(pl), "bits_between_flags_stuffed": len(inner), "hdlc_frame_bits": len(frame), } def ais_channel_to_nrzi_bytes( payload_bits: bitarray, *, preamble_bits: int = 24, nrzi_byte_mode: str = "packed", pad_payload_to_octet: bool = False, pad_nrz_total_bits: Optional[int] = None, ) -> bytes: """ nrzi_byte_mode: 'packed' — 8 NRZI-сэмплов в байт (MSB первый); 'expanded' — один байт на NRZI-бит: 0x00 / 0xFF. pad_nrz_total_bits — после преамбулы+кадра дописать нули в NRZ до этой длины (типично 200 для GNU Radio). """ frame = build_hdlc_frame(payload_bits, pad_payload_to_octet=pad_payload_to_octet) pre = preamble_alternating(preamble_bits) if preamble_bits > 0 else bitarray() combined = pre + frame combined = _pad_nrz_to_min_bits(combined, pad_nrz_total_bits or 0) nrzi = nrzi_encode(combined) if nrzi_byte_mode == "expanded": return bytes(0xFF if int(b) else 0x00 for b in nrzi) return _bits_to_bytes_msb(nrzi)