#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CLI: AIS fields (type, MMSI, lat/lon, …) -> NRZI bit stream (and optional packed bytes).

Uses AIVDM_Encoder.py in the parent directory for the PDU, then phy.build_nrzi_frame
(same stages as gr-aistx Build_Frame: CRC, reverse, stuff, flags, NRZI).
"""
from __future__ import annotations

import argparse
import os
import sys

# Directory containing this script (and phy.py), plus the repository root
# (which may optionally contain AIVDM_Encoder.py).
_SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__))
_ROOT = os.path.abspath(os.path.join(_SCRIPT_DIR, ".."))
for _p in (_SCRIPT_DIR, _ROOT):
    if _p not in sys.path:
        sys.path.insert(0, _p)

from phy import build_nrzi_frame, nrzi_bits_to_bytes  # noqa: E402

try:
    import AIVDM_Encoder as enc  # noqa: E402
except ImportError:
    # The encoder is optional at import time; callers get an explicit error
    # only when they actually try to build a payload.
    enc = None  # type: ignore


def build_payload(options) -> str:
    """Build the AIS payload for ``options.type`` using ``AIVDM_Encoder``.

    Args:
        options: Object exposing the attributes produced by the argument
            parser in :func:`main` (``type``, ``mmsi``, ``lat``, ``lon``, ...).
            Only the attributes needed by the selected message type are read.

    Returns:
        Whatever the matching ``enc.encode_<type>`` function returns
        (annotated here as ``str``).

    Raises:
        SystemExit: If ``AIVDM_Encoder`` could not be imported, if the message
            type is not one of 1, 4, 14, 18, 20, 21, 22, 23, 24, or if a
            type 24 message is requested with a part other than "A" or "B".
    """
    if enc is None:
        raise SystemExit(
            "AIVDM_Encoder.py не найден в корне репозитория. "
            "Для веб-транспондера используйте pyais + опцию «Кодер: phy.py»."
        )

    t = options.type

    if t == 1:
        return enc.encode_1(
            int(options.mmsi),
            int(options.status),
            float(options.speed),
            float(options.lon),
            float(options.lat),
            float(options.course),
            int(options.ts),
        )

    if t == 4:
        return enc.encode_4(
            int(options.mmsi),
            float(options.speed),
            float(options.lon),
            float(options.lat),
            float(options.course),
            int(options.ts),
        )

    if t == 14:
        return enc.encode_14(int(options.mmsi), options.sart_msg)

    if t == 18:
        return enc.encode_18(
            int(options.mmsi),
            float(options.speed),
            float(options.lon),
            float(options.lat),
            float(options.course),
            int(options.ts),
        )

    if t == 20:
        return enc.encode_20(
            int(options.mmsi),
            int(options.fatdmaoffset),
            int(options.fatdmaslots),
            int(options.fatdmatimeout),
            int(options.fatdmaincrement),
        )

    if t == 21:
        # The encoder receives the virtual-AtoN flag as an integer 0/1.
        v = 1 if options.v_AtoN else 0
        return enc.encode_21(
            int(options.mmsi),
            int(options.aid_type),
            options.aid_name,
            float(options.lon),
            float(options.lat),
            options.vsize,
            v,
        )

    if t == 22:
        return enc.encode_22(
            int(options.mmsi),
            int(options.channel_a),
            int(options.channel_b),
            float(options.ne_lon),
            float(options.ne_lat),
            float(options.sw_lon),
            float(options.sw_lat),
        )

    if t == 23:
        return enc.encode_23(
            int(options.mmsi),
            float(options.ne_lon),
            float(options.ne_lat),
            float(options.sw_lon),
            float(options.sw_lat),
            int(options.interval),
            int(options.quiet),
        )

    if t == 24:
        # NOTE: the double-underscore keyword names are passed through as-is;
        # this works because the call is made from a module-level function
        # (no class-private name mangling applies here).
        part = options.part.upper()
        if part == "A":
            return enc.encode_24(int(options.mmsi), "A", __vname=options.vname.upper())
        if part == "B":
            return enc.encode_24(
                int(options.mmsi),
                "B",
                __callsign=options.callsign.upper(),
                __vsize=options.vsize,
                __vtype=int(options.vtype),
            )
        # Previously any value other than "A" was silently encoded as Part B;
        # reject it explicitly, like an unsupported message type.
        raise SystemExit(
            "Unsupported part %r for type 24 (expected 'A' or 'B')" % (options.part,)
        )

    raise SystemExit("Unsupported type %r" % (t,))


def main() -> None:
    """Parse command-line arguments and emit the framed bit stream.

    The payload from :func:`build_payload` is passed to ``build_nrzi_frame``
    (NRZI is skipped with ``--no-nrzi``). The result is written to stdout
    either as a line of bit characters or, with ``--bytes``, as the binary
    output of ``nrzi_bits_to_bytes``. With ``--print-payload`` the payload is
    printed to stderr first, so it never mixes with the frame on stdout.
    """
    p = argparse.ArgumentParser(
        description="AIS parameters -> NRZI bit frame (gr-aistx-compatible chain)."
    )
    p.add_argument("--type", type=int, required=True, help="AIS message type (1,4,14,18,20-24)")
    p.add_argument("--mmsi", type=int, default=247320162)
    p.add_argument(
        "--lat", type=float, default=45.6910166666667, help="Latitude (types 1,4,18,21,22,23)"
    )
    p.add_argument(
        "--lon", type=float, default=9.72357833333333, help="Longitude (alias for encoder --long)"
    )
    p.add_argument("--speed", type=float, default=0.1)
    p.add_argument("--course", type=float, default=83.4)
    p.add_argument("--ts", type=int, default=38)
    p.add_argument("--status", type=int, default=15)
    p.add_argument("--sart-msg", dest="sart_msg", default="SART ACTIVE")
    p.add_argument("--fatdmaoffset", type=int, default=0)
    p.add_argument("--fatdmaslots", type=int, default=0)
    p.add_argument("--fatdmatimeout", type=int, default=0)
    p.add_argument("--fatdmaincrement", type=int, default=0)
    p.add_argument("--v_AtoN", action="store_true")
    p.add_argument("--aid_type", type=int, default=0)
    p.add_argument("--aid_name", default="@@@@@@@@@@@@@@@@@@@@")
    p.add_argument("--vsize", default="90x14")
    p.add_argument("--channel_a", type=int, default=2087)
    p.add_argument("--channel_b", type=int, default=2088)
    p.add_argument("--ne_lon", type=float, default=9.9)
    p.add_argument("--ne_lat", type=float, default=45.8)
    p.add_argument("--sw_lon", type=float, default=9.5)
    p.add_argument("--sw_lat", type=float, default=45.5)
    p.add_argument("--interval", type=int, default=1)
    p.add_argument("--quiet", type=int, default=15)
    p.add_argument("--part", default="A")
    p.add_argument("--vname", default="NAN")
    p.add_argument("--callsign", default="KC9CAF")
    p.add_argument("--vtype", type=int, default=60)
    p.add_argument(
        "--no-nrzi", action="store_true", help="Output NRZ frame before NRZI (debug)"
    )
    p.add_argument(
        "--bytes",
        action="store_true",
        help="Write packed bytes (MSB-first) to stdout (binary)",
    )
    p.add_argument(
        "--print-payload", action="store_true", help="Print 0/1 PDU line before the frame"
    )
    args = p.parse_args()

    # Report a missing encoder through argparse so the usage text is shown.
    if enc is None:
        p.error("AIVDM_Encoder.py не найден; CLI encode_to_nrzi недоступен.")

    payload = build_payload(args)
    if args.print_payload:
        print(payload, file=sys.stderr)

    bits = build_nrzi_frame(payload, enable_nrzi=not args.no_nrzi)
    if args.bytes:
        # Binary output must bypass the text layer of stdout.
        sys.stdout.buffer.write(nrzi_bits_to_bytes(bits))
    else:
        print("".join(str(b) for b in bits))


if __name__ == "__main__":
    main()