148 lines
5.0 KiB
Python
148 lines
5.0 KiB
Python
"""Smoke-level integration test: aiohttp REST endpoints on loopback.
|
|
|
|
Only tests the HTTP surface — it does not start ingest or storage tasks.
|
|
Builds a minimal app with REST + WS routes and an in-memory ``Context``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
|
|
import pytest
|
|
from aiohttp import web
|
|
from aiohttp.test_utils import TestClient, TestServer
|
|
|
|
from ais_hub.app import Context
|
|
from ais_hub.config import Config
|
|
from ais_hub.core.bus import EventBus
|
|
from ais_hub.core.models import AisReport, TxMessage
|
|
from ais_hub.core.state import State
|
|
from ais_hub.core.stats import Stats
|
|
from ais_hub.ingest.base import RawTap
|
|
from ais_hub.publish import rest, ws as ws_module
|
|
|
|
|
|
def _build_app() -> tuple[web.Application, Context]:
    """Assemble a minimal aiohttp application around an in-memory ``Context``.

    The context is built with ``db=None`` and ``storage_sink=None``; only the
    REST and WebSocket route tables are registered on the application.

    Returns:
        An ``(app, ctx)`` pair, where ``ctx`` is the same object that is
        stored under ``app["ctx"]``.
    """
    settings = Config()
    counters = Stats()
    event_bus = EventBus(stats=counters, default_maxsize=16)
    # Keyword arguments are evaluated left to right, so the collaborators are
    # still created in the order State -> RawTap -> outbox queue.
    context = Context(
        cfg=settings,
        stats=counters,
        bus=event_bus,
        state=State(bus=event_bus, stats=counters),
        raw_tap=RawTap(stats=counters, ring_size=16),
        tx_outbox=asyncio.Queue(maxsize=4),
        db=None,
        storage_sink=None,
    )

    application = web.Application()
    application["ctx"] = context
    # REST routes first, then WebSocket routes.
    for route_module in (rest, ws_module):
        route_module.register_routes(application)
    return application, context
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_health_stats_ownship():
    """Health, stats and ownship endpoints all answer 200 on a freshly built app.

    Additionally checks that the health payload reports ``"status": "ok"`` and
    that the stats payload contains a ``"counters"`` key.
    """
    app, _ = _build_app()
    async with TestClient(TestServer(app)) as client:
        health = await client.get("/api/v1/health")
        assert health.status == 200
        health_payload = await health.json()
        assert health_payload["status"] == "ok"

        stats = await client.get("/api/v1/stats")
        assert stats.status == 200
        stats_payload = await stats.json()
        assert "counters" in stats_payload

        # Only the status code is checked for ownship.
        ownship = await client.get("/api/v1/ownship")
        assert ownship.status == 200
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_vessels_endpoint_reflects_state():
    """Check that AIS reports applied to ``State`` are served by the vessel endpoints.

    A dynamic (msg type 1) and a static (msg type 5) report are applied for
    MMSI 111 before the test server starts. The list endpoint must then return
    exactly one entry carrying the static name, and the detail endpoint must
    expose the dynamic latitude.
    """
    app, ctx = _build_app()
    ctx.state.apply_ais(AisReport(
        ts=100.0, source="test", kind="dynamic", mmsi=111, msg_type=1,
        data={"lat": 1.23, "lon": 4.56, "sog": 3.2, "cog": 90.0},
    ))
    ctx.state.apply_ais(AisReport(
        ts=101.0, source="test", kind="static", mmsi=111, msg_type=5,
        data={"name": "TEST SHIP", "callsign": "CALL"},
    ))
    async with TestClient(TestServer(app)) as client:
        r = await client.get("/api/v1/vessels")
        # Check the status code first so that an error response fails here
        # rather than as a confusing JSON-shape mismatch below.
        assert r.status == 200
        vessels = await r.json()
        assert len(vessels) == 1
        assert vessels[0]["mmsi"] == 111
        assert vessels[0]["name"] == "TEST SHIP"

        r = await client.get("/api/v1/vessels/111")
        assert r.status == 200
        detail = await r.json()
        assert detail["dynamic"]["lat"] == 1.23
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_vessels_not_found():
    """Requesting a vessel whose MMSI was never applied to the state yields 404."""
    app, _ctx = _build_app()
    server = TestServer(app)
    async with TestClient(server) as client:
        response = await client.get("/api/v1/vessels/999999999")
        assert response.status == 404
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_tx_post_queues_valid_nmea():
    """A sentence with a correct checksum is accepted and placed on ``tx_outbox``.

    The checksum is computed with the project's own ``compute_checksum`` so
    the posted line is valid by construction.
    """
    from ais_hub.parser.nmea_utils import compute_checksum

    app, ctx = _build_app()
    body = "AIVDM,1,1,,A,15M67FC000G?ufbE`FepT@3n00Sa,0"
    line = f"!{body}*{compute_checksum(body):02X}"
    async with TestClient(TestServer(app)) as client:
        response = await client.post("/api/v1/tx", json={"payload": line})
        assert response.status == 200
        reply = await response.json()
        assert reply["queued"] == 1
        assert reply["rejected"] == []
        # The message is in the outbox for the UDP publisher to consume.
        queued_msg = ctx.tx_outbox.get_nowait()
        assert queued_msg.line == line
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_tx_post_rejects_invalid_checksum():
    """A sentence with a bad checksum is either refused (400) or accepted with nothing queued.

    Both outcomes are tolerated: a 400 response, or a 200 response whose JSON
    payload reports ``"queued": 0``.
    """
    app, _ = _build_app()
    async with TestClient(TestServer(app)) as client:
        response = await client.post(
            "/api/v1/tx",
            json={"payload": "!AIVDM,1,1,,A,x,0*FF"},
        )
        if response.status != 400:
            # Not an outright rejection: it must be a 200 that queued nothing.
            assert response.status == 200
            reply = await response.json()
            assert reply["queued"] == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_nmea_tail_returns_ring_contents():
    """Frames published to the raw tap are returned by ``/api/v1/nmea/tail``.

    Three frames are published before the request; the response must contain
    all three, with the most recently published line last.
    """
    from ais_hub.core.models import RawFrame

    app, ctx = _build_app()
    frame_count = 3
    for i in range(frame_count):
        frame = RawFrame(ts=float(i), source="s", kind="ais", line=f"!X{i}")
        ctx.raw_tap.publish(frame)

    async with TestClient(TestServer(app)) as client:
        response = await client.get("/api/v1/nmea/tail?source=ais&limit=10")
        assert response.status == 200
        rows = await response.json()
        assert len(rows) == 3
        newest = rows[-1]
        assert newest["line"] == "!X2"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_logs_endpoint_is_independent_from_nmea():
    """/api/v1/logs must not return raw NMEA even if the tap has items."""
    from ais_hub.core.models import RawFrame

    app, ctx = _build_app()
    nmea_frame = RawFrame(ts=1.0, source="s", kind="ais", line="!AIVDM,1,1,,,x,0*00")
    ctx.raw_tap.publish(nmea_frame)

    async with TestClient(TestServer(app)) as client:
        response = await client.get("/api/v1/logs?limit=50")
        assert response.status == 200
        entries = await response.json()
        # Whatever content appears here, it must not be NMEA lines.
        # NOTE(review): these checks are vacuous when the endpoint returns
        # no entries at all.
        for entry in entries:
            assert "line" not in entry  # raw NMEA rows have "line"
            assert "level" in entry
|