Files
2026-06-16 04:38:23 +00:00

526 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
from typing import Any
import httpx
from app.config import get_settings
WEATHER_CODES: dict[int, str] = {
0: "ясно",
1: "преимущественно ясно",
2: "переменная облачность",
3: "пасмурно",
45: "туман",
48: "изморозь",
51: "морось",
53: "морось",
55: "морось",
61: "дождь",
63: "дождь",
65: "сильный дождь",
71: "снег",
73: "снег",
75: "сильный снег",
80: "ливень",
81: "ливень",
82: "сильный ливень",
95: "гроза",
96: "гроза с градом",
99: "гроза с градом",
}
WEATHER_QUERY_KEYWORDS = (
"погод", "дожд", "снег", "ветер", "температур", "градус", "мороз", "жар",
"на улице", "одеть", "зонт", "прогноз", "завтра", "послезавтра", "выходн",
"weather", "rain", "forecast", "umbrella", "outside",
)
_cache: dict[str, Any] = {
"data": None,
"fetched_at": 0.0,
"expires_at": 0.0,
"source": "local",
"local_coverage": {"current": [], "hourly": [], "daily": []},
"merged_fields": [],
}
CURRENT_FIELDS = (
"temperature_2m",
"apparent_temperature",
"relative_humidity_2m",
"precipitation",
"weather_code",
"wind_speed_10m",
)
HOURLY_FIELDS = (
"temperature_2m",
"precipitation_probability",
"precipitation",
"weather_code",
)
DAILY_FIELDS = (
"weather_code",
"temperature_2m_max",
"temperature_2m_min",
"precipitation_sum",
"precipitation_probability_max",
"wind_speed_10m_max",
)
RECOMMENDED_SYNC_DOMAINS = "dwd_icon,ncep_gfs013,ncep_gefs025"
RECOMMENDED_SYNC_VARIABLES = (
"temperature_2m,dew_point_2m,relative_humidity_2m,precipitation_probability,"
"precipitation,rain,cloud_cover,weather_code,wind_u_component_10m,wind_v_component_10m"
)
SYNC_HINT = (
"Локальный open-meteo-sync отдаёт неполные данные. "
f"SYNC_DOMAINS={RECOMMENDED_SYNC_DOMAINS} "
f"SYNC_VARIABLES={RECOMMENDED_SYNC_VARIABLES} (~12 GB). "
"Документация: github.com/open-meteo/open-data/tree/main/tutorial_weather_api"
)
PRECIP_PROB_HINT = (
"Для вероятности дождя добавь ncep_gefs025 в SYNC_DOMAINS "
"и precipitation_probability в SYNC_VARIABLES."
)
def weather_query_relevant(query: str) -> bool:
q = (query or "").lower()
return any(kw in q for kw in WEATHER_QUERY_KEYWORDS)
def _hourly_series(hourly: dict[str, Any], key: str) -> list[Any]:
values = hourly.get(key)
return values if isinstance(values, list) else []
def _daily_series(daily: dict[str, Any], key: str) -> list[Any]:
values = daily.get(key)
return values if isinstance(values, list) else []
def _hourly_start_index(times: list[str], anchor_time: str | None) -> int:
if not times:
return 0
if not anchor_time:
return 0
best = 0
for i, t in enumerate(times):
if t <= anchor_time:
best = i
else:
break
return best
def _field_coverage(raw: dict[str, Any]) -> dict[str, list[str]]:
current = raw.get("current") or {}
hourly = raw.get("hourly") or {}
daily = raw.get("daily") or {}
current_present = [key for key in CURRENT_FIELDS if current.get(key) is not None]
hourly_present = []
for key in HOURLY_FIELDS:
series = _hourly_series(hourly, key)
if any(v is not None for v in series):
hourly_present.append(key)
daily_present = []
for key in DAILY_FIELDS:
series = _daily_series(daily, key)
if any(v is not None for v in series):
daily_present.append(key)
return {"current": current_present, "hourly": hourly_present, "daily": daily_present}
def _coverage_sufficient(coverage: dict[str, list[str]]) -> bool:
current = set(coverage.get("current") or [])
hourly = set(coverage.get("hourly") or [])
if "weather_code" not in current:
return False
if len(current) < 3:
return False
if "weather_code" not in hourly and "temperature_2m" not in hourly:
return False
return True
def _local_needs_sync_hint(local_coverage: dict[str, list[str]]) -> bool:
current = set(local_coverage.get("current") or [])
hourly = set(local_coverage.get("hourly") or [])
if "temperature_2m" not in current:
return True
if "weather_code" not in current:
return True
if "temperature_2m" not in hourly:
return True
return False
def _missing_precip_probability(coverage: dict[str, list[str]]) -> bool:
return "precipitation_probability" not in set(coverage.get("hourly") or [])
def _fmt_num(value: Any, *, suffix: str = "") -> str:
if value is None:
return ""
if isinstance(value, float):
text = f"{value:.1f}".rstrip("0").rstrip(".")
else:
text = str(value)
return f"{text}{suffix}" if suffix else text
def _conditions(code: Any) -> str:
if code is None:
return "неизвестно"
return WEATHER_CODES.get(int(code), "неизвестно")
def _format_day_label(date_str: str, index: int) -> str:
if index == 0:
return "Сегодня"
if index == 1:
return "Завтра"
if not date_str:
return f"День {index + 1}"
parts = date_str.split("-")
if len(parts) == 3:
return f"{parts[2]}.{parts[1]}"
return date_str
def _merge_hourly_field(target: dict[str, Any], source: dict[str, Any], field: str) -> bool:
hourly_t = target.setdefault("hourly", {})
hourly_s = source.get("hourly") or {}
src = hourly_s.get(field)
if not isinstance(src, list) or not any(v is not None for v in src):
return False
dst = hourly_t.get(field)
if isinstance(dst, list) and len(dst) == len(src):
hourly_t[field] = [
dst[i] if dst[i] is not None else src[i]
for i in range(len(src))
]
else:
hourly_t[field] = src
return True
class OpenMeteoClient:
def __init__(self) -> None:
settings = get_settings()
self.base_url = settings.openmeteo_base_url.rstrip("/")
self.fallback_url = (settings.openmeteo_fallback_url or "").strip().rstrip("/")
self.fallback_on_partial = settings.openmeteo_fallback_on_partial
self.lat = settings.weather_lat
self.lon = settings.weather_lon
self.location_name = settings.weather_location_name
self.cache_ttl = settings.weather_cache_sec
self.forecast_days = max(2, int(settings.weather_forecast_days or 7))
def _request_params(self) -> dict[str, Any]:
return {
"latitude": self.lat,
"longitude": self.lon,
"current": ",".join(CURRENT_FIELDS),
"hourly": ",".join(HOURLY_FIELDS),
"daily": ",".join(DAILY_FIELDS),
"timezone": "auto",
"forecast_days": self.forecast_days,
}
def _fetch_from_url(self, base_url: str) -> dict[str, Any]:
with httpx.Client(timeout=20.0) as client:
response = client.get(f"{base_url.rstrip('/')}/v1/forecast", params=self._request_params())
response.raise_for_status()
return response.json()
def _fetch_raw(self) -> dict[str, Any]:
now = time.time()
if _cache["data"] and now < _cache["expires_at"]:
return _cache["data"]
local_raw = self._fetch_from_url(self.base_url)
local_coverage = _field_coverage(local_raw)
source = "local"
raw = local_raw
merged_fields: list[str] = []
need_fallback = (
self.fallback_on_partial
and self.fallback_url
and self.fallback_url.rstrip("/") != self.base_url
)
if need_fallback:
try:
fallback_raw = self._fetch_from_url(self.fallback_url)
fallback_coverage = _field_coverage(fallback_raw)
if not _coverage_sufficient(local_coverage) and _coverage_sufficient(fallback_coverage):
raw = fallback_raw
source = "fallback"
elif _missing_precip_probability(local_coverage) and not _missing_precip_probability(fallback_coverage):
if _merge_hourly_field(raw, fallback_raw, "precipitation_probability"):
merged_fields.append("precipitation_probability")
source = "merged"
except Exception:
pass
_cache["data"] = raw
_cache["fetched_at"] = now
_cache["expires_at"] = now + self.cache_ttl
_cache["source"] = source
_cache["local_coverage"] = local_coverage
_cache["merged_fields"] = merged_fields
return raw
def cache_status(self) -> dict[str, Any]:
now = time.time()
fetched_at = float(_cache.get("fetched_at") or 0)
expires_at = float(_cache.get("expires_at") or 0)
has_data = _cache.get("data") is not None
age_sec = int(now - fetched_at) if fetched_at else None
expires_in_sec = max(0, int(expires_at - now)) if expires_at else None
return {
"has_data": has_data,
"cached": bool(has_data and expires_at and now < expires_at),
"fetched_at": fetched_at or None,
"age_sec": age_sec,
"ttl_sec": self.cache_ttl,
"expires_in_sec": expires_in_sec,
"source": _cache.get("source") or "local",
"merged_fields": list(_cache.get("merged_fields") or []),
}
def _build_hourly_slice(self, raw: dict[str, Any], hours_ahead: int) -> list[dict[str, Any]]:
current = raw.get("current") or {}
hourly = raw.get("hourly") or {}
times = hourly.get("time") or []
start = _hourly_start_index(times, current.get("time"))
end = min(start + hours_ahead, len(times))
rows: list[dict[str, Any]] = []
for i in range(start, end):
code = _hourly_series(hourly, "weather_code")[i] if i < len(_hourly_series(hourly, "weather_code")) else None
temp_series = _hourly_series(hourly, "temperature_2m")
precip_series = _hourly_series(hourly, "precipitation")
prob_series = _hourly_series(hourly, "precipitation_probability")
rows.append({
"time": times[i],
"temperature_c": temp_series[i] if i < len(temp_series) else None,
"precipitation_mm": precip_series[i] if i < len(precip_series) else None,
"precipitation_probability": prob_series[i] if i < len(prob_series) else None,
"weather_code": code,
"conditions": _conditions(code),
})
return rows
def _build_daily_slice(self, raw: dict[str, Any], days_ahead: int) -> list[dict[str, Any]]:
daily = raw.get("daily") or {}
times = daily.get("time") or []
limit = min(days_ahead, len(times))
rows: list[dict[str, Any]] = []
for i in range(limit):
code = _daily_series(daily, "weather_code")[i] if i < len(_daily_series(daily, "weather_code")) else None
rows.append({
"date": times[i],
"label": _format_day_label(times[i], i),
"temperature_max_c": _daily_series(daily, "temperature_2m_max")[i] if i < len(_daily_series(daily, "temperature_2m_max")) else None,
"temperature_min_c": _daily_series(daily, "temperature_2m_min")[i] if i < len(_daily_series(daily, "temperature_2m_min")) else None,
"precipitation_sum_mm": _daily_series(daily, "precipitation_sum")[i] if i < len(_daily_series(daily, "precipitation_sum")) else None,
"precipitation_probability_max": _daily_series(daily, "precipitation_probability_max")[i] if i < len(_daily_series(daily, "precipitation_probability_max")) else None,
"wind_speed_max_kmh": _daily_series(daily, "wind_speed_10m_max")[i] if i < len(_daily_series(daily, "wind_speed_10m_max")) else None,
"weather_code": code,
"conditions": _conditions(code),
})
return rows
def fetch_forecast(self, hours_ahead: int = 12, days_ahead: int = 7) -> dict[str, Any]:
hours_ahead = max(1, min(int(hours_ahead), 168))
days_ahead = max(1, min(int(days_ahead), self.forecast_days))
try:
raw = self._fetch_raw()
except Exception as exc:
return {"ok": False, "error": str(exc), "location": self.location_name}
current = raw.get("current") or {}
code = current.get("weather_code")
coverage = _field_coverage(raw)
local_coverage = _cache.get("local_coverage") or coverage
sync_hint = ""
if _local_needs_sync_hint(local_coverage):
sync_hint = SYNC_HINT
elif _missing_precip_probability(local_coverage):
sync_hint = PRECIP_PROB_HINT
return {
"ok": True,
"location": self.location_name,
"data_source": _cache.get("source") or "local",
"merged_fields": list(_cache.get("merged_fields") or []),
"local_field_coverage": local_coverage,
"field_coverage": coverage,
"sync_hint": sync_hint,
"current": {
"time": current.get("time"),
"temperature_c": current.get("temperature_2m"),
"apparent_temperature_c": current.get("apparent_temperature"),
"humidity_pct": current.get("relative_humidity_2m"),
"precipitation_mm": current.get("precipitation"),
"wind_speed_kmh": current.get("wind_speed_10m"),
"weather_code": code,
"conditions": _conditions(code),
},
"hourly": self._build_hourly_slice(raw, hours_ahead),
"daily": self._build_daily_slice(raw, days_ahead),
}
def fetch_current_and_hourly(self, hours_ahead: int = 12) -> dict[str, Any]:
return self.fetch_forecast(hours_ahead=hours_ahead, days_ahead=min(7, self.forecast_days))
def rain_summary(self, hours_ahead: int = 12, daily: list[dict[str, Any]] | None = None) -> str:
data = self.fetch_forecast(hours_ahead=hours_ahead, days_ahead=2)
if not data.get("ok"):
return f"Погода недоступна: {data.get('error', 'ошибка')}"
rainy_hours = []
for hour in data.get("hourly") or []:
prob = hour.get("precipitation_probability")
precip = hour.get("precipitation_mm") or 0
if (prob is not None and prob >= 40) or precip > 0:
time_str = (hour.get("time") or "")[11:16]
prob_text = f"{prob}%" if prob is not None else ""
rainy_hours.append(f"{time_str} ({prob_text}, {precip} мм)")
lines: list[str] = []
if rainy_hours:
lines.append("Ожидаются осадки: " + ", ".join(rainy_hours[:6]))
else:
lines.append("Существенных осадков в ближайшие часы не ожидается.")
days = daily if daily is not None else data.get("daily") or []
if len(days) > 1:
tomorrow = days[1]
tmax = tomorrow.get("temperature_max_c")
tmin = tomorrow.get("temperature_min_c")
prob = tomorrow.get("precipitation_probability_max")
precip = tomorrow.get("precipitation_sum_mm") or 0
cond = tomorrow.get("conditions") or "неизвестно"
prob_part = f", дождь до {prob}%" if prob is not None and prob >= 30 else ""
precip_part = f", {precip} мм" if precip > 0 else ""
lines.append(
f"Завтра: {_fmt_num(tmin)}{_fmt_num(tmax, suffix='°C')}, {cond}{prob_part}{precip_part}."
)
return " ".join(lines)
def daily_summary(self, days_ahead: int = 7) -> str:
data = self.fetch_forecast(hours_ahead=1, days_ahead=days_ahead)
if not data.get("ok"):
return ""
parts = []
for day in data.get("daily") or []:
label = day.get("label") or day.get("date")
tmax = day.get("temperature_max_c")
tmin = day.get("temperature_min_c")
cond = day.get("conditions") or "неизвестно"
prob = day.get("precipitation_probability_max")
prob_part = f", дождь до {prob}%" if prob is not None and prob >= 30 else ""
parts.append(f"{label}: {_fmt_num(tmin)}{_fmt_num(tmax, suffix='°C')}, {cond}{prob_part}")
return "; ".join(parts)
def format_weather_snapshot(data: dict[str, Any] | None = None, *, include_daily: bool = True) -> str:
client = OpenMeteoClient()
snapshot = data if data is not None else client.fetch_forecast(hours_ahead=6, days_ahead=3)
lines = ["[Погода]"]
if not snapshot.get("ok"):
lines.append(f"Данные недоступны ({snapshot.get('error', 'ошибка')}).")
lines.append("Для точного ответа вызови get_weather.")
return "\n".join(lines)
cur = snapshot.get("current") or {}
apparent = cur.get("apparent_temperature_c")
wind = cur.get("wind_speed_kmh")
apparent_part = f", ощущается {_fmt_num(apparent, suffix='°C')}" if apparent is not None else ""
wind_part = f", ветер {_fmt_num(wind, suffix=' км/ч')}" if wind is not None else ""
lines.append(
f"{snapshot.get('location')}: {_fmt_num(cur.get('temperature_c'), suffix='°C')}"
f"{apparent_part}, {cur.get('conditions') or 'неизвестно'}{wind_part}."
)
rainy_hours = []
for hour in snapshot.get("hourly") or []:
prob = hour.get("precipitation_probability")
precip = hour.get("precipitation_mm") or 0
if (prob is not None and prob >= 40) or precip > 0:
time_str = (hour.get("time") or "")[11:16]
prob_text = f"{prob}%" if prob is not None else ""
rainy_hours.append(f"{time_str} ({prob_text}, {precip} мм)")
if rainy_hours:
lines.append("Ожидаются осадки: " + ", ".join(rainy_hours[:6]))
else:
lines.append("Существенных осадков в ближайшие часы не ожидается.")
if include_daily:
days = snapshot.get("daily") or []
if len(days) > 1:
tomorrow = days[1]
lines.append(
f"Завтра: {_fmt_num(tomorrow.get('temperature_min_c'))}"
f"{_fmt_num(tomorrow.get('temperature_max_c'), suffix='°C')}, "
f"{tomorrow.get('conditions') or 'неизвестно'}."
)
if len(days) > 2:
week_bits = []
for day in days[2:7]:
week_bits.append(
f"{day.get('label')}: {_fmt_num(day.get('temperature_min_c'))}"
f"{_fmt_num(day.get('temperature_max_c'), suffix='°C')}"
)
if week_bits:
lines.append("Далее: " + "; ".join(week_bits) + ".")
lines.append("Подробнее — get_weather (hours_ahead, days_ahead).")
return "\n".join(lines)
def build_weather_dashboard(hours_ahead: int = 12, days_ahead: int = 7) -> dict[str, Any]:
client = OpenMeteoClient()
weather = client.fetch_forecast(hours_ahead=hours_ahead, days_ahead=days_ahead)
return {
"weather": weather,
"rain_summary": client.rain_summary(hours_ahead=hours_ahead, daily=weather.get("daily")) if weather.get("ok") else "",
"daily_summary": client.daily_summary(days_ahead=days_ahead) if weather.get("ok") else "",
"assistant_context": format_weather_snapshot(weather),
"cache": client.cache_status(),
"config": {
"location": client.location_name,
"latitude": client.lat,
"longitude": client.lon,
"openmeteo_base_url": client.base_url,
"cache_ttl_sec": client.cache_ttl,
"forecast_days": client.forecast_days,
"timezone": "auto",
},
"available_fields": {
"current": list(CURRENT_FIELDS),
"hourly": list(HOURLY_FIELDS),
"daily": list(DAILY_FIELDS),
},
"field_coverage": weather.get("field_coverage") if weather.get("ok") else {"current": [], "hourly": [], "daily": []},
"local_field_coverage": weather.get("local_field_coverage") if weather.get("ok") else {"current": [], "hourly": [], "daily": []},
"data_source": weather.get("data_source", "local") if weather.get("ok") else "local",
"merged_fields": weather.get("merged_fields", []) if weather.get("ok") else [],
"sync_hint": weather.get("sync_hint", "") if weather.get("ok") else SYNC_HINT,
"recommended_sync": {
"domains": RECOMMENDED_SYNC_DOMAINS,
"variables": RECOMMENDED_SYNC_VARIABLES,
},
"assistant_tools": {
"get_weather": "Сейчас + почасово (hours_ahead до 168) + по дням (days_ahead до 16)",
"get_morning_briefing": "Погода + заголовки RSS-новостей",
},
"system_prompt": "Блок [Погода] в system prompt — только если запрос про погоду/одежду/прогноз.",
}