"""Multi-fragment AIS assembler: strict-order and strengthened-key tests. These tests exercise the assembler logic only up to fragment validation. Actual decoding with pyais is covered by a separate integration check if pyais is importable. """ from __future__ import annotations import pytest from ais_hub.core.stats import Stats from ais_hub.parser.ais import AisAssembler SRC_A = "ais_udp:10.0.0.1:5000" SRC_B = "ais_udp:10.0.0.2:5000" def _frag(total: int, n: int, seq: str, channel: str = "A", payload: str = "abcd", fill: str = "0") -> str: # Compute checksum so parse_sentence accepts the line. body = f"AIVDM,{total},{n},{seq},{channel},{payload},{fill}" cs = 0 for ch in body.encode("ascii"): cs ^= ch return f"!{body}*{cs:02X}" def test_single_fragment_fast_path_attempts_decode(): """A single-fragment sentence is passed to pyais; decode may fail, but the assembler must not keep any buffers for it.""" stats = Stats() a = AisAssembler(stats=stats) line = _frag(1, 1, "") a.feed(SRC_A, line) # No multi-fragment buffers. assert len(a._buffers) == 0 def test_multi_fragment_happy_path_clears_buffer(): stats = Stats() a = AisAssembler(stats=stats) a.feed(SRC_A, _frag(2, 1, "3")) assert len(a._buffers) == 1 a.feed(SRC_A, _frag(2, 2, "3")) # After the final fragment arrives, the buffer is removed (regardless # of whether pyais decodes successfully). assert len(a._buffers) == 0 def test_out_of_order_fragment_triggers_error_and_reset(): stats = Stats() a = AisAssembler(stats=stats) a.feed(SRC_A, _frag(3, 1, "7")) a.feed(SRC_A, _frag(3, 3, "7")) # skipped fragment 2 assert stats.counters["ais_fragment_errors"] >= 1 def test_duplicate_fragment_is_error(): stats = Stats() a = AisAssembler(stats=stats) a.feed(SRC_A, _frag(2, 1, "9")) a.feed(SRC_A, _frag(2, 1, "9")) # duplicate assert stats.counters["ais_fragment_errors"] >= 1 def test_mid_stream_fragment_without_leading_is_error(): stats = Stats() a = AisAssembler(stats=stats) a.feed(SRC_A, _frag(2, 2, "4")) assert stats.counters["ais_fragment_errors"] >= 1 assert len(a._buffers) == 0 def test_strengthened_key_isolates_sources(): """Two sources reusing the same seq_id must not cross-contaminate.""" stats = Stats() a = AisAssembler(stats=stats) a.feed(SRC_A, _frag(2, 1, "5")) a.feed(SRC_B, _frag(2, 1, "5")) assert len(a._buffers) == 2 # two independent buffers assert stats.counters.get("ais_fragment_errors", 0) == 0 def test_multi_fragment_without_seq_id_is_error(): stats = Stats() a = AisAssembler(stats=stats) a.feed(SRC_A, _frag(2, 1, "")) # empty seq_id on a multi-fragment assert stats.counters["ais_fragment_errors"] >= 1 def test_total_mismatch_on_followup_is_error(): stats = Stats() a = AisAssembler(stats=stats) a.feed(SRC_A, _frag(2, 1, "2")) # Fragment claims total=3 now; must be treated as a fresh start or error. a.feed(SRC_A, _frag(3, 2, "2")) assert stats.counters["ais_fragment_errors"] >= 1 def test_ttl_gc_evicts_stale_buffers(): stats = Stats() a = AisAssembler(stats=stats, ttl_sec=1.0) a.feed(SRC_A, _frag(2, 1, "8"), ts=1000.0) assert len(a._buffers) == 1 a.gc(now=1002.0) assert len(a._buffers) == 0 assert stats.counters["ais_fragment_timeouts"] >= 1 def test_bad_checksum_ignored_with_counter(): stats = Stats() a = AisAssembler(stats=stats) a.feed(SRC_A, "!AIVDM,1,1,,A,x,0*00") # deliberately wrong checksum assert stats.counters["parser_checksum_errors"] >= 1