03075f1ef1
Closes TG-4 Co-authored-by: Cursor <cursoragent@cursor.com>
179 lines
5.8 KiB
Python
179 lines
5.8 KiB
Python
"""
|
|
Формирование кадра VDL AIS (флаги HDLC, bit stuffing, CRC-16-CCITT) и NRZI.
|
|
Опционально: выравнивание payload до октета и добор NRZ до N бит перед NRZI (аналог padd_frame в GNU Radio).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from typing import Optional
|
|
|
|
from bitarray import bitarray
|
|
|
|
FLAG = bitarray("01111110")
|
|
|
|
|
|
def _bits_to_bytes_msb(bits: bitarray) -> bytes:
|
|
out = bytearray()
|
|
for i in range(0, len(bits), 8):
|
|
chunk = bits[i : i + 8]
|
|
if len(chunk) < 8:
|
|
pad = bitarray(8 - len(chunk))
|
|
pad.setall(0)
|
|
chunk = chunk + pad
|
|
v = 0
|
|
for b in chunk:
|
|
v = (v << 1) | int(b)
|
|
out.append(v)
|
|
return bytes(out)
|
|
|
|
|
|
def crc16_ccitt_fcs(data_bits: bitarray) -> int:
|
|
"""FCS по данным payload (без FCS), длина кратна 8 бит."""
|
|
b = _bits_to_bytes_msb(data_bits)
|
|
crc = 0xFFFF
|
|
poly = 0x1021
|
|
for byte in b:
|
|
crc ^= byte << 8
|
|
for _ in range(8):
|
|
if crc & 0x8000:
|
|
crc = ((crc << 1) ^ poly) & 0xFFFF
|
|
else:
|
|
crc = (crc << 1) & 0xFFFF
|
|
return crc ^ 0xFFFF
|
|
|
|
|
|
def _int_to_bits(val: int, n: int) -> bitarray:
|
|
out = bitarray()
|
|
for i in range(n - 1, -1, -1):
|
|
out.append((val >> i) & 1)
|
|
return out
|
|
|
|
|
|
def bit_stuff(bits: bitarray) -> bitarray:
|
|
out = bitarray()
|
|
count = 0
|
|
for b in bits:
|
|
out.append(b)
|
|
if b:
|
|
count += 1
|
|
if count == 5:
|
|
out.append(0)
|
|
count = 0
|
|
else:
|
|
count = 0
|
|
return out
|
|
|
|
|
|
def _pad_payload_to_octet_boundary(payload_bits: bitarray) -> bitarray:
|
|
r = len(payload_bits) % 8
|
|
if r == 0:
|
|
return payload_bits
|
|
z = bitarray(8 - r)
|
|
z.setall(0)
|
|
return payload_bits + z
|
|
|
|
|
|
def _pad_nrz_to_min_bits(nrz_bits: bitarray, min_len: int) -> bitarray:
|
|
if min_len <= 0 or len(nrz_bits) >= min_len:
|
|
return nrz_bits
|
|
tail = bitarray(min_len - len(nrz_bits))
|
|
tail.setall(0)
|
|
return nrz_bits + tail
|
|
|
|
|
|
def build_hdlc_frame(
|
|
payload_bits: bitarray, *, pad_payload_to_octet: bool = False
|
|
) -> bitarray:
|
|
"""payload_bits — двоичное тело AIS (как pyais Payload.to_bitarray())."""
|
|
pl = payload_bits
|
|
if len(pl) % 8 != 0:
|
|
if pad_payload_to_octet:
|
|
pl = _pad_payload_to_octet_boundary(pl)
|
|
else:
|
|
raise ValueError("AIS payload length must be multiple of 8 bits for this CRC path")
|
|
fcs = crc16_ccitt_fcs(pl)
|
|
fcs_bits = _int_to_bits(fcs, 16)
|
|
data = pl + fcs_bits
|
|
stuffed = bit_stuff(data)
|
|
return FLAG + stuffed + FLAG
|
|
|
|
|
|
def nrzi_encode(bits: bitarray) -> bitarray:
|
|
"""NRZI: 1 — уровень без изменения, 0 — переключение."""
|
|
level = 0
|
|
out = bitarray()
|
|
for b in bits:
|
|
if not int(b):
|
|
level = 1 - level
|
|
out.append(level)
|
|
return out
|
|
|
|
|
|
def preamble_alternating(num_bits: int = 24) -> bitarray:
|
|
"""
|
|
ITU-style dotting: чередование 1/0 (101010…), как gr-aistx LEN_PREAMBLE и phy._preamble_bits.
|
|
Вариант 010101… дал бы после NRZI (packed, старт уровня 0) байты 0xCC… вместо 0x66… — та же частота,
|
|
другая фаза; к стаффингу это не относится.
|
|
"""
|
|
out = bitarray()
|
|
for i in range(num_bits):
|
|
out.append(1 - (i & 1))
|
|
return out
|
|
|
|
|
|
def hdlc_nrzi_bytes_no_preamble(
|
|
payload_bits: bitarray,
|
|
*,
|
|
nrzi_byte_mode: str = "packed",
|
|
pad_payload_to_octet: bool = False,
|
|
pad_nrz_total_bits: Optional[int] = None,
|
|
) -> bytes:
|
|
"""Только HDLC+NRZI без преамбулы. pad_nrz_total_bits — добор нулей в NRZ до длины (как GNU Radio padd_frame)."""
|
|
frame = build_hdlc_frame(payload_bits, pad_payload_to_octet=pad_payload_to_octet)
|
|
frame = _pad_nrz_to_min_bits(frame, pad_nrz_total_bits or 0)
|
|
nrz = nrzi_encode(frame)
|
|
if nrzi_byte_mode == "expanded":
|
|
return bytes(0xFF if int(b) else 0x00 for b in nrz)
|
|
return _bits_to_bytes_msb(nrz)
|
|
|
|
|
|
def phy_frame_bit_counts(
|
|
payload_bits: bitarray, *, pad_payload_to_octet: bool = False
|
|
) -> dict:
|
|
"""Длины для отладки: payload, поле между флагами (со стаффингом), полный HDLC."""
|
|
pl_in = len(payload_bits)
|
|
pl = payload_bits
|
|
if len(pl) % 8 != 0 and pad_payload_to_octet:
|
|
pl = _pad_payload_to_octet_boundary(pl)
|
|
frame = build_hdlc_frame(pl, pad_payload_to_octet=pad_payload_to_octet)
|
|
inner = frame[8:-8]
|
|
return {
|
|
"payload_bits_input": pl_in,
|
|
"payload_bits_after_pad": len(pl),
|
|
"bits_between_flags_stuffed": len(inner),
|
|
"hdlc_frame_bits": len(frame),
|
|
}
|
|
|
|
|
|
def ais_channel_to_nrzi_bytes(
|
|
payload_bits: bitarray,
|
|
*,
|
|
preamble_bits: int = 24,
|
|
nrzi_byte_mode: str = "packed",
|
|
pad_payload_to_octet: bool = False,
|
|
pad_nrz_total_bits: Optional[int] = None,
|
|
) -> bytes:
|
|
"""
|
|
nrzi_byte_mode:
|
|
'packed' — 8 NRZI-сэмплов в байт (MSB первый);
|
|
'expanded' — один байт на NRZI-бит: 0x00 / 0xFF.
|
|
pad_nrz_total_bits — после преамбулы+кадра дописать нули в NRZ до этой длины (типично 200 для GNU Radio).
|
|
"""
|
|
frame = build_hdlc_frame(payload_bits, pad_payload_to_octet=pad_payload_to_octet)
|
|
pre = preamble_alternating(preamble_bits) if preamble_bits > 0 else bitarray()
|
|
combined = pre + frame
|
|
combined = _pad_nrz_to_min_bits(combined, pad_nrz_total_bits or 0)
|
|
nrzi = nrzi_encode(combined)
|
|
if nrzi_byte_mode == "expanded":
|
|
return bytes(0xFF if int(b) else 0x00 for b in nrzi)
|
|
return _bits_to_bytes_msb(nrzi)
|