Initial import: WebAisMap
Closes TG-4
This commit is contained in:
@@ -0,0 +1,150 @@
|
||||
import os
|
||||
import json
|
||||
import subprocess
|
||||
import threading
|
||||
|
||||
NETWORK_CONFIG_PATH = "/etc/aismap/network.json"
|
||||
NETWORK_SCRIPTS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "scripts")
|
||||
|
||||
NETWORK_DEFAULTS = {
|
||||
"mode": "ap",
|
||||
"wifi_ssid": "",
|
||||
"wifi_psk": "",
|
||||
"wifi_ip": "192.168.22.50/24",
|
||||
"wifi_gw": "192.168.22.1",
|
||||
"wifi_dns": "8.8.8.8",
|
||||
"ap_ip": "192.168.4.1/24",
|
||||
"ap_ssid": "",
|
||||
"ap_psk": "",
|
||||
"iface": "wlan0",
|
||||
}
|
||||
HOSTAPD_CONF = "/etc/hostapd/hostapd.conf"
|
||||
network_config_lock = threading.Lock()
|
||||
|
||||
|
||||
def _read_hostapd_conf() -> dict:
|
||||
"""Reads SSID, passphrase and interface from the real hostapd.conf."""
|
||||
result = {}
|
||||
try:
|
||||
with open(HOSTAPD_CONF, "r") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line.startswith("#") or "=" not in line:
|
||||
continue
|
||||
k, v = line.split("=", 1)
|
||||
k, v = k.strip(), v.strip()
|
||||
if k == "ssid":
|
||||
result["ap_ssid"] = v
|
||||
elif k == "wpa_passphrase":
|
||||
result["ap_psk"] = v
|
||||
elif k == "interface":
|
||||
result["iface"] = v
|
||||
except (FileNotFoundError, PermissionError):
|
||||
pass
|
||||
return result
|
||||
|
||||
|
||||
def load_network_config() -> dict:
|
||||
merged = dict(NETWORK_DEFAULTS)
|
||||
hostapd_vals = _read_hostapd_conf()
|
||||
merged.update(hostapd_vals)
|
||||
try:
|
||||
with open(NETWORK_CONFIG_PATH, "r") as f:
|
||||
cfg = json.load(f)
|
||||
merged.update(cfg)
|
||||
except (FileNotFoundError, json.JSONDecodeError, PermissionError):
|
||||
pass
|
||||
return merged
|
||||
|
||||
|
||||
def save_network_config(cfg: dict):
|
||||
os.makedirs(os.path.dirname(NETWORK_CONFIG_PATH), exist_ok=True)
|
||||
with open(NETWORK_CONFIG_PATH, "w") as f:
|
||||
json.dump(cfg, f, indent=2)
|
||||
|
||||
|
||||
def get_current_network_info() -> dict:
|
||||
"""Reads live network state from the OS (Linux only)."""
|
||||
info = {"ip": None, "ssid": None, "mode": None, "iface": None}
|
||||
cfg = load_network_config()
|
||||
iface = cfg.get("iface", "wlan0")
|
||||
info["iface"] = iface
|
||||
|
||||
try:
|
||||
out = subprocess.check_output(
|
||||
["ip", "-4", "addr", "show", iface], timeout=5, stderr=subprocess.DEVNULL
|
||||
).decode()
|
||||
for line in out.splitlines():
|
||||
line = line.strip()
|
||||
if line.startswith("inet "):
|
||||
info["ip"] = line.split()[1]
|
||||
break
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
out = subprocess.check_output(
|
||||
["iw", "dev", iface, "info"], timeout=5, stderr=subprocess.DEVNULL
|
||||
).decode()
|
||||
for line in out.splitlines():
|
||||
line = line.strip()
|
||||
if line.startswith("ssid"):
|
||||
info["ssid"] = line.split(None, 1)[1]
|
||||
if line.startswith("type"):
|
||||
tp = line.split(None, 1)[1].lower()
|
||||
if "ap" in tp:
|
||||
info["mode"] = "ap"
|
||||
elif "managed" in tp or "station" in tp:
|
||||
info["mode"] = "wifi"
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if info["mode"] is None:
|
||||
try:
|
||||
subprocess.check_output(
|
||||
["pgrep", "-x", "hostapd"], timeout=3, stderr=subprocess.DEVNULL
|
||||
)
|
||||
info["mode"] = "ap"
|
||||
except Exception:
|
||||
try:
|
||||
subprocess.check_output(
|
||||
["pgrep", "-x", "wpa_supplicant"], timeout=3, stderr=subprocess.DEVNULL
|
||||
)
|
||||
info["mode"] = "wifi"
|
||||
except Exception:
|
||||
info["mode"] = cfg.get("mode", "ap")
|
||||
|
||||
return info
|
||||
|
||||
|
||||
def switch_network_mode(target_mode: str) -> dict:
|
||||
"""Runs the appropriate script to switch AP<->WiFi. Returns result dict."""
|
||||
if target_mode not in ("ap", "wifi"):
|
||||
return {"ok": False, "error": "Invalid mode, must be 'ap' or 'wifi'"}
|
||||
|
||||
script_name = "to_ap.sh" if target_mode == "ap" else "to_wifi.sh"
|
||||
script_path = os.path.join(NETWORK_SCRIPTS_DIR, script_name)
|
||||
|
||||
if not os.path.isfile(script_path):
|
||||
return {"ok": False, "error": f"Script not found: {script_path}"}
|
||||
|
||||
try:
|
||||
proc = subprocess.run(
|
||||
["bash", script_path],
|
||||
capture_output=True, text=True, timeout=30,
|
||||
)
|
||||
if proc.returncode == 0:
|
||||
cfg = load_network_config()
|
||||
cfg["mode"] = target_mode
|
||||
save_network_config(cfg)
|
||||
return {"ok": True, "output": proc.stdout[-500:] if proc.stdout else ""}
|
||||
else:
|
||||
return {
|
||||
"ok": False,
|
||||
"error": f"Script exited with code {proc.returncode}",
|
||||
"output": (proc.stdout or "") + (proc.stderr or ""),
|
||||
}
|
||||
except subprocess.TimeoutExpired:
|
||||
return {"ok": False, "error": "Script timed out (30s)"}
|
||||
except Exception as e:
|
||||
return {"ok": False, "error": str(e)}
|
||||
Reference in New Issue
Block a user