"""Screenshot analysis through a vision LLM and rendering of its results as chat text."""

from __future__ import annotations

import base64
import json
import logging
from dataclasses import dataclass, field
from typing import Any

from openai import APIStatusError

from app.llm.client import LLMClient
from app.projects.structuring import strip_markdown_json
from app.vision.preprocess import PreparedImage, prepare_image
from app.vision.prompts import VISION_SYSTEM_PROMPT

logger = logging.getLogger(__name__)


class VisionUnavailableError(Exception):
    """Vision LLM endpoint missing or unreachable on OpenRouter."""

    def __init__(self, model: str, detail: str) -> None:
        # Keep the model name on the exception so handlers can report it.
        self.model = model
        super().__init__(detail)


@dataclass
class VisionResult:
    """Outcome of one vision call: parsed JSON plus metadata about the call."""

    # JSON object returned by the model, or a fallback dict when parsing failed.
    parsed: dict[str, Any] = field(default_factory=dict)
    # Stripped text of the model response.
    raw_content: str = ""
    # Model name reported by the response, else the configured vision model.
    model: str = ""
    # Copy of the ``usage`` mapping from the response (empty when absent).
    usage: dict[str, Any] = field(default_factory=dict)
    # Value of ``PreparedImage.to_meta()`` for the analyzed image.
    image_meta: dict[str, Any] = field(default_factory=dict)
    # Why the response could not be used as a JSON object; ``None`` on success.
    parse_error: str | None = None


class VisionService:
    """Sends prepared screenshots to the vision model and parses the JSON reply."""

    def __init__(self) -> None:
        self.llm = LLMClient()

    async def analyze(self, image_bytes: bytes, *, user_hint: str = "") -> VisionResult:
        """Prepare raw image bytes with ``prepare_image`` and analyze the result."""
        prepared = prepare_image(image_bytes)
        return await self.analyze_prepared(prepared, user_hint=user_hint)

    async def analyze_prepared(
        self, prepared: PreparedImage, *, user_hint: str = ""
    ) -> VisionResult:
        """Run the vision model on an already prepared image.

        Args:
            prepared: Image whose ``jpeg_bytes`` are sent as a base64 data URL.
            user_hint: Optional free text appended to the user prompt when it
                is not blank.

        Returns:
            A ``VisionResult``. When the reply is not valid JSON the raw text
            (first 2000 characters) is kept as ``description`` and
            ``parse_error`` is set; when it is valid JSON but not an object,
            ``parsed`` is empty and ``parse_error`` is set.

        Raises:
            VisionUnavailableError: The API answered with HTTP 404 for the
                configured vision model.
            APIStatusError: Any other API status error is re-raised unchanged.
        """
        b64 = base64.standard_b64encode(prepared.jpeg_bytes).decode("ascii")
        hint = f"\n\nПодсказка пользователя: {user_hint.strip()}" if user_hint.strip() else ""
        messages: list[dict[str, Any]] = [
            {"role": "system", "content": VISION_SYSTEM_PROMPT},
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": f"Извлеки данные со скриншота.{hint}"},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/jpeg;base64,{b64}"},
                    },
                ],
            },
        ]
        model = self.llm.vision_model
        try:
            response = await self.llm.complete_vision(messages)
        except APIStatusError as exc:
            # Only 404 is translated into a domain error with a user-facing message.
            if exc.status_code == 404:
                raise VisionUnavailableError(
                    model,
                    f"Vision-модель «{model}» недоступна на OpenRouter. "
                    "Укажите другую в Settings (например google/gemini-2.5-flash-lite).",
                ) from exc
            raise

        raw = (response.get("content") or "").strip()
        parsed: dict[str, Any] = {}
        parse_error: str | None = None
        try:
            parsed = json.loads(strip_markdown_json(raw))
            if not isinstance(parsed, dict):
                parse_error = "Vision response is not a JSON object"
                parsed = {}
        except json.JSONDecodeError as exc:
            parse_error = str(exc)
            # Keep the (truncated) raw answer so the caller still has something to show.
            parsed = {"description": raw[:2000], "document_type": "other", "raw_fallback": True}

        return VisionResult(
            parsed=parsed,
            raw_content=raw,
            model=str(response.get("model") or self.llm.vision_model),
            usage=dict(response.get("usage") or {}),
            image_meta=prepared.to_meta(),
            parse_error=parse_error,
        )


def _text_lines(value: Any) -> list[Any]:
    """Normalize the model's ``extracted_text`` value to a list of non-blank entries.

    The value comes from model output, so its shape is not guaranteed: a plain
    string is split into its lines instead of being iterated per character,
    and anything that is neither a string nor a list/tuple is ignored.
    """
    if isinstance(value, str):
        value = value.splitlines()
    elif not isinstance(value, (list, tuple)):
        return []
    return [item for item in value if str(item).strip()]


def _format_screenshot_block(
    result: VisionResult,
    *,
    index: int | None = None,
    total: int | None = None,
) -> str:
    """Render one vision result as a bracketed, human-readable text block.

    Args:
        result: Vision result to render.
        index: 1-based position of the screenshot; used in the header only
            together with ``total``.
        total: Number of screenshots in the message; the "index/total" header
            is used only when it is greater than 1.

    Returns:
        Newline-joined text: header, then description, extracted text, tables,
        fitness hints and parse error, each only when present.
    """
    parsed = result.parsed or {}
    doc_type = parsed.get("document_type") or "other"
    confidence = parsed.get("confidence") or "unknown"
    if index is not None and total is not None and total > 1:
        header = f"[Скриншот {index}/{total}: {doc_type}, confidence={confidence}]"
    else:
        header = f"[Скриншот: {doc_type}, confidence={confidence}]"
    lines = [header]

    if parsed.get("description"):
        lines.append(f"Описание: {parsed['description']}")

    # Emit the section header only when there is at least one non-blank entry.
    extracted = _text_lines(parsed.get("extracted_text"))
    if extracted:
        lines.append("Текст с экрана:")
        lines.extend(f"- {line}" for line in extracted)

    # Model output is not validated, so anything that is not a list is skipped.
    tables = parsed.get("tables")
    if isinstance(tables, list) and tables:
        lines.append("Таблицы:")
        for table in tables:
            if not isinstance(table, dict):
                continue
            title = table.get("title")
            if title:
                lines.append(f" [{title}]")
            rows = table.get("rows")
            if isinstance(rows, list):
                for row in rows:
                    if isinstance(row, list):
                        lines.append(" | " + " | ".join(str(cell) for cell in row))

    hints = parsed.get("fitness_hints")
    if hints:
        lines.append(f"Подсказки для фитнеса: {json.dumps(hints, ensure_ascii=False)}")
    if result.parse_error:
        lines.append(f"(parse_error: {result.parse_error})")
    return "\n".join(lines)


def format_user_message(caption: str, result: VisionResult) -> str:
    """Format a single vision result plus caption; see ``format_user_messages``."""
    return format_user_messages(caption, [result])


def format_user_messages(caption: str, results: list[VisionResult]) -> str:
    """Join the screenshot blocks of all results and append the caption, if any.

    With no results the stripped caption is returned on its own.
    """
    if not results:
        return caption.strip()
    total = len(results)
    blocks = [
        _format_screenshot_block(result, index=index, total=total)
        for index, result in enumerate(results, start=1)
    ]
    text = "\n\n".join(blocks)
    if caption.strip():
        text = f"{text}\n\nПодпись: {caption.strip()}"
    return text


VISION_TURN_HINT = (
    "[Скриншоты в этом сообщении]: vision уже извлекла данные с каждой картинки в блоки [Скриншот] ниже. "
    "Отвечай по Описанию и извлечённому тексту как по увиденному. "
    "Не утверждай, что не видишь изображения, и не предлагай настроить vision API."
)


def format_vision_turn_hint(user_text: str) -> str:
    """Return ``VISION_TURN_HINT`` when the text contains a screenshot block, else ``""``."""
    if "[Скриншот" not in (user_text or ""):
        return ""
    return VISION_TURN_HINT


def vision_debug_payload(result: VisionResult) -> dict[str, Any]:
    """Build a debug dict for one result.

    ``raw_content`` is included only when the ``vision_debug_enabled`` setting
    is on.
    """
    # Imported at call time rather than at module import.
    from app.config import get_settings

    payload: dict[str, Any] = {
        "model": result.model,
        "parsed": result.parsed,
        "image_meta": result.image_meta,
        "usage": result.usage,
        "parse_error": result.parse_error,
    }
    if get_settings().vision_debug_enabled:
        payload["raw_content"] = result.raw_content
    return payload


def vision_debug_payloads(results: list[VisionResult]) -> dict[str, Any] | None:
    """Build a debug dict for a batch of results.

    Returns ``None`` for an empty batch and the single payload unchanged for a
    batch of one. Otherwise returns ``count``, ``images`` and ``model``, where
    ``model`` is one name when all results agree and a sorted list of names
    when they differ.
    """
    if not results:
        return None
    items = [vision_debug_payload(result) for result in results]
    if len(items) == 1:
        return items[0]
    models = {str(item.get("model") or "") for item in items}
    payload: dict[str, Any] = {
        "count": len(items),
        "images": items,
        "model": next(iter(models)) if len(models) == 1 else sorted(models),
    }
    return payload