168 lines
5.1 KiB
Python
168 lines
5.1 KiB
Python
"""Binary decoder tests for AIS-catcher Mini UDP telemetry.
|
|
|
|
The expected byte layouts come from ais-mini's UDP comment block; any
|
|
layout drift (field order, endianness, sizes) must fail here before the
|
|
ingest runs on real hardware.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import struct
|
|
|
|
import pytest
|
|
|
|
from ais_hub.parser.aiscatcher import (
|
|
decode_rssi_iq,
|
|
decode_signal_events,
|
|
decode_slot_bitmap,
|
|
decode_slot_detail,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Slot bitmap (315 bytes)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _build_bitmap(
|
|
*,
|
|
channel: bytes = b"A",
|
|
utc_minute: int = 28_279_310,
|
|
slots_total: int = 2250,
|
|
occupied: int = 437,
|
|
noise: float = 0.0,
|
|
threshold: float = 0.0,
|
|
slot0_ms: int = 28_279_310 * 60_000,
|
|
first_occ_ms: int = 28_279_310 * 60_000 + 123,
|
|
bitmap: bytes | None = None,
|
|
) -> bytes:
|
|
if bitmap is None:
|
|
bitmap = bytes(282)
|
|
assert len(channel) == 1
|
|
assert len(bitmap) == 282
|
|
head = struct.pack(
|
|
">BIHHffQQ",
|
|
channel[0], utc_minute, slots_total, occupied,
|
|
noise, threshold, slot0_ms, first_occ_ms,
|
|
)
|
|
assert len(head) == 33
|
|
return head + bitmap
|
|
|
|
|
|
def test_bitmap_decodes_roundtrip():
|
|
bm = bytes(range(256)) + bytes(282 - 256)
|
|
data = _build_bitmap(channel=b"B", occupied=1234, bitmap=bm)
|
|
snap = decode_slot_bitmap(data)
|
|
assert snap is not None
|
|
assert snap.channel == "B"
|
|
assert snap.utc_minute == 28_279_310
|
|
assert snap.slots_total == 2250
|
|
assert snap.occupied_count == 1234
|
|
assert snap.slot0_unix_ms == 28_279_310 * 60_000
|
|
assert snap.first_occupied_unix_ms == 28_279_310 * 60_000 + 123
|
|
assert snap.bitmap == bm
|
|
# occupied_fraction is derived
|
|
assert snap.occupied_fraction() == pytest.approx(1234 / 2250)
|
|
|
|
|
|
def test_bitmap_rejects_wrong_size():
|
|
data = _build_bitmap()
|
|
assert decode_slot_bitmap(data[:-1]) is None
|
|
assert decode_slot_bitmap(data + b"\x00") is None
|
|
|
|
|
|
def test_bitmap_unknown_channel_byte_marked_questionmark():
|
|
data = _build_bitmap(channel=b"X")
|
|
snap = decode_slot_bitmap(data)
|
|
assert snap is not None
|
|
assert snap.channel == "?"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Slot detail
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_slot_detail_roundtrip():
|
|
entries = [(0, -85.5), (100, -72.125), (2249, -99.0)]
|
|
head = struct.pack(">BIQH", ord(b"A"), 28_279_310, 28_279_310 * 60_000, len(entries))
|
|
body = b"".join(struct.pack(">Hf", slot, lvl) for slot, lvl in entries)
|
|
detail = decode_slot_detail(head + body)
|
|
assert detail is not None
|
|
assert detail.channel == "A"
|
|
assert detail.utc_minute == 28_279_310
|
|
assert detail.slot0_unix_ms == 28_279_310 * 60_000
|
|
assert [(e.slot, round(e.level_db, 4)) for e in detail.entries] == [
|
|
(0, -85.5),
|
|
(100, -72.125),
|
|
(2249, -99.0),
|
|
]
|
|
|
|
|
|
def test_slot_detail_rejects_short_body():
|
|
head = struct.pack(">BIQH", ord(b"A"), 1, 2, 3)
|
|
# only 1 full entry provided where 3 claimed
|
|
body = struct.pack(">Hf", 10, -70.0)
|
|
assert decode_slot_detail(head + body) is None
|
|
|
|
|
|
def test_slot_detail_empty_is_valid():
|
|
head = struct.pack(">BIQH", ord(b"B"), 1, 2, 0)
|
|
detail = decode_slot_detail(head)
|
|
assert detail is not None
|
|
assert detail.entries == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# RSSI IQ (8 bytes)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_rssi_decodes_pair():
|
|
data = struct.pack(">ff", -42.25, -55.75)
|
|
rssi = decode_rssi_iq(data)
|
|
assert rssi is not None
|
|
assert rssi.power_a_db == pytest.approx(-42.25)
|
|
assert rssi.power_b_db == pytest.approx(-55.75)
|
|
|
|
|
|
def test_rssi_wrong_size():
|
|
assert decode_rssi_iq(b"\x00" * 7) is None
|
|
assert decode_rssi_iq(b"\x00" * 9) is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Signal events (decode markers)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_signal_events_roundtrip():
|
|
events = [
|
|
(1_700_000_000_000, 42, 257_123_456, -72.0),
|
|
(1_700_000_000_100, 1500, 227_000_001, -88.5),
|
|
]
|
|
head = struct.pack(">BH", ord(b"A"), len(events))
|
|
body = b"".join(struct.pack(">QHIf", ts, slot, mmsi, lvl)
|
|
for ts, slot, mmsi, lvl in events)
|
|
batch = decode_signal_events(head + body)
|
|
assert batch is not None
|
|
assert batch.channel == "A"
|
|
assert len(batch.events) == 2
|
|
assert batch.events[0].unix_ms == 1_700_000_000_000
|
|
assert batch.events[0].slot == 42
|
|
assert batch.events[0].mmsi == 257_123_456
|
|
assert batch.events[0].level_db == pytest.approx(-72.0)
|
|
assert batch.events[1].mmsi == 227_000_001
|
|
|
|
|
|
def test_signal_events_empty():
|
|
data = struct.pack(">BH", ord(b"A"), 0)
|
|
batch = decode_signal_events(data)
|
|
assert batch is not None
|
|
assert batch.events == []
|
|
|
|
|
|
def test_signal_events_rejects_short_body():
|
|
head = struct.pack(">BH", ord(b"A"), 5)
|
|
assert decode_signal_events(head + b"\x00" * 10) is None
|