"""Binary decoder tests for AIS-catcher Mini UDP telemetry. The expected byte layouts come from ais-mini's UDP comment block; any layout drift (field order, endianness, sizes) must fail here before the ingest runs on real hardware. """ from __future__ import annotations import struct import pytest from ais_hub.parser.aiscatcher import ( decode_rssi_iq, decode_signal_events, decode_slot_bitmap, decode_slot_detail, ) # --------------------------------------------------------------------------- # Slot bitmap (315 bytes) # --------------------------------------------------------------------------- def _build_bitmap( *, channel: bytes = b"A", utc_minute: int = 28_279_310, slots_total: int = 2250, occupied: int = 437, noise: float = 0.0, threshold: float = 0.0, slot0_ms: int = 28_279_310 * 60_000, first_occ_ms: int = 28_279_310 * 60_000 + 123, bitmap: bytes | None = None, ) -> bytes: if bitmap is None: bitmap = bytes(282) assert len(channel) == 1 assert len(bitmap) == 282 head = struct.pack( ">BIHHffQQ", channel[0], utc_minute, slots_total, occupied, noise, threshold, slot0_ms, first_occ_ms, ) assert len(head) == 33 return head + bitmap def test_bitmap_decodes_roundtrip(): bm = bytes(range(256)) + bytes(282 - 256) data = _build_bitmap(channel=b"B", occupied=1234, bitmap=bm) snap = decode_slot_bitmap(data) assert snap is not None assert snap.channel == "B" assert snap.utc_minute == 28_279_310 assert snap.slots_total == 2250 assert snap.occupied_count == 1234 assert snap.slot0_unix_ms == 28_279_310 * 60_000 assert snap.first_occupied_unix_ms == 28_279_310 * 60_000 + 123 assert snap.bitmap == bm # occupied_fraction is derived assert snap.occupied_fraction() == pytest.approx(1234 / 2250) def test_bitmap_rejects_wrong_size(): data = _build_bitmap() assert decode_slot_bitmap(data[:-1]) is None assert decode_slot_bitmap(data + b"\x00") is None def test_bitmap_unknown_channel_byte_marked_questionmark(): data = _build_bitmap(channel=b"X") snap = decode_slot_bitmap(data) assert snap is not None assert snap.channel == "?" # --------------------------------------------------------------------------- # Slot detail # --------------------------------------------------------------------------- def test_slot_detail_roundtrip(): entries = [(0, -85.5), (100, -72.125), (2249, -99.0)] head = struct.pack(">BIQH", ord(b"A"), 28_279_310, 28_279_310 * 60_000, len(entries)) body = b"".join(struct.pack(">Hf", slot, lvl) for slot, lvl in entries) detail = decode_slot_detail(head + body) assert detail is not None assert detail.channel == "A" assert detail.utc_minute == 28_279_310 assert detail.slot0_unix_ms == 28_279_310 * 60_000 assert [(e.slot, round(e.level_db, 4)) for e in detail.entries] == [ (0, -85.5), (100, -72.125), (2249, -99.0), ] def test_slot_detail_rejects_short_body(): head = struct.pack(">BIQH", ord(b"A"), 1, 2, 3) # only 1 full entry provided where 3 claimed body = struct.pack(">Hf", 10, -70.0) assert decode_slot_detail(head + body) is None def test_slot_detail_empty_is_valid(): head = struct.pack(">BIQH", ord(b"B"), 1, 2, 0) detail = decode_slot_detail(head) assert detail is not None assert detail.entries == [] # --------------------------------------------------------------------------- # RSSI IQ (8 bytes) # --------------------------------------------------------------------------- def test_rssi_decodes_pair(): data = struct.pack(">ff", -42.25, -55.75) rssi = decode_rssi_iq(data) assert rssi is not None assert rssi.power_a_db == pytest.approx(-42.25) assert rssi.power_b_db == pytest.approx(-55.75) def test_rssi_wrong_size(): assert decode_rssi_iq(b"\x00" * 7) is None assert decode_rssi_iq(b"\x00" * 9) is None # --------------------------------------------------------------------------- # Signal events (decode markers) # --------------------------------------------------------------------------- def test_signal_events_roundtrip(): events = [ (1_700_000_000_000, 42, 257_123_456, -72.0), (1_700_000_000_100, 1500, 227_000_001, -88.5), ] head = struct.pack(">BH", ord(b"A"), len(events)) body = b"".join(struct.pack(">QHIf", ts, slot, mmsi, lvl) for ts, slot, mmsi, lvl in events) batch = decode_signal_events(head + body) assert batch is not None assert batch.channel == "A" assert len(batch.events) == 2 assert batch.events[0].unix_ms == 1_700_000_000_000 assert batch.events[0].slot == 42 assert batch.events[0].mmsi == 257_123_456 assert batch.events[0].level_db == pytest.approx(-72.0) assert batch.events[1].mmsi == 227_000_001 def test_signal_events_empty(): data = struct.pack(">BH", ord(b"A"), 0) batch = decode_signal_events(data) assert batch is not None assert batch.events == [] def test_signal_events_rejects_short_body(): head = struct.pack(">BH", ord(b"A"), 5) assert decode_signal_events(head + b"\x00" * 10) is None