200 lines
7.2 KiB
Python
200 lines
7.2 KiB
Python
from __future__ import annotations
|
||
|
||
import base64
|
||
import json
|
||
import logging
|
||
from dataclasses import dataclass, field
|
||
from typing import Any
|
||
|
||
from openai import APIStatusError
|
||
|
||
from app.llm.client import LLMClient
|
||
from app.projects.structuring import strip_markdown_json
|
||
from app.vision.preprocess import PreparedImage, prepare_image
|
||
from app.vision.prompts import VISION_SYSTEM_PROMPT
|
||
|
||
# Module-level logger, named after this module so log records can be filtered by origin.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class VisionUnavailableError(Exception):
    """Signals that the vision LLM endpoint is missing or unreachable on OpenRouter.

    Attributes:
        model: Identifier of the vision model that was requested.
    """

    def __init__(self, model: str, detail: str) -> None:
        """Use *detail* as the exception message and remember *model*."""
        super().__init__(detail)
        self.model = model
|
||
|
||
|
||
@dataclass
class VisionResult:
    """Container for the outcome of one vision-model call on a single image.

    Every field has a default, so an empty ``VisionResult()`` is valid; the
    mutable fields get a fresh ``dict`` per instance.
    """

    # Decoded JSON object from the model reply (may be empty or a fallback dict).
    parsed: dict[str, Any] = field(default_factory=dict)
    # Text of the model reply as received (whitespace-stripped by the producer).
    raw_content: str = ""
    # Name of the model that produced the reply.
    model: str = ""
    # Usage information reported alongside the reply.
    usage: dict[str, Any] = field(default_factory=dict)
    # Metadata describing the preprocessed image.
    image_meta: dict[str, Any] = field(default_factory=dict)
    # Human-readable reason the reply could not be used as a JSON object, if any.
    parse_error: str | None = None
|
||
|
||
|
||
class VisionService:
    """Sends screenshots to the vision LLM and wraps the reply in a ``VisionResult``."""

    def __init__(self) -> None:
        """Create the service with its own ``LLMClient`` instance."""
        self.llm = LLMClient()

    async def analyze(self, image_bytes: bytes, *, user_hint: str = "") -> VisionResult:
        """Preprocess raw image bytes with ``prepare_image`` and analyze the result.

        Args:
            image_bytes: Raw bytes of the image to analyze.
            user_hint: Optional free-text hint forwarded to the model.

        Returns:
            The ``VisionResult`` produced by ``analyze_prepared``.
        """
        return await self.analyze_prepared(prepare_image(image_bytes), user_hint=user_hint)

    async def analyze_prepared(self, prepared: PreparedImage, *, user_hint: str = "") -> VisionResult:
        """Ask the vision model to extract data from an already prepared image.

        The image is sent inline as a base64 ``data:`` URL. The reply text is
        decoded as JSON after ``strip_markdown_json``; decoding problems are
        recorded in ``VisionResult.parse_error`` instead of being raised.

        Args:
            prepared: Preprocessed image; its ``jpeg_bytes`` are sent to the
                model and ``to_meta()`` is stored on the result.
            user_hint: Optional hint appended to the user prompt when it is
                non-blank.

        Returns:
            A ``VisionResult`` with the parsed payload, raw reply, model name,
            usage and image metadata.

        Raises:
            VisionUnavailableError: If the API answers with HTTP 404.
            APIStatusError: Any other API status error is re-raised unchanged.
        """
        encoded = base64.standard_b64encode(prepared.jpeg_bytes).decode("ascii")
        trimmed_hint = user_hint.strip()
        hint_suffix = f"\n\nПодсказка пользователя: {trimmed_hint}" if trimmed_hint else ""

        user_parts = [
            {"type": "text", "text": f"Извлеки данные со скриншота.{hint_suffix}"},
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded}"}},
        ]
        messages: list[dict[str, Any]] = [
            {"role": "system", "content": VISION_SYSTEM_PROMPT},
            {"role": "user", "content": user_parts},
        ]

        model = self.llm.vision_model
        try:
            response = await self.llm.complete_vision(messages)
        except APIStatusError as exc:
            # Only a 404 is translated into the domain error; everything else propagates.
            if exc.status_code != 404:
                raise
            raise VisionUnavailableError(
                model,
                f"Vision-модель «{model}» недоступна на OpenRouter. "
                "Укажите другую в Settings (например google/gemini-2.5-flash-lite).",
            ) from exc

        raw = (response.get("content") or "").strip()
        parse_error: str | None = None
        parsed: dict[str, Any]
        try:
            decoded = json.loads(strip_markdown_json(raw))
        except json.JSONDecodeError as exc:
            # Not JSON at all: keep a truncated copy of the reply as the description.
            parse_error = str(exc)
            parsed = {"description": raw[:2000], "document_type": "other", "raw_fallback": True}
        else:
            if isinstance(decoded, dict):
                parsed = decoded
            else:
                # Valid JSON, but not an object (e.g. a list or a scalar).
                parse_error = "Vision response is not a JSON object"
                parsed = {}

        return VisionResult(
            parsed=parsed,
            raw_content=raw,
            model=str(response.get("model") or self.llm.vision_model),
            usage=dict(response.get("usage") or {}),
            image_meta=prepared.to_meta(),
            parse_error=parse_error,
        )
|
||
|
||
|
||
def _format_screenshot_block(
    result: VisionResult,
    *,
    index: int | None = None,
    total: int | None = None,
) -> str:
    """Render one vision result as a ``[Скриншот ...]`` text block.

    The block starts with a header line carrying the document type and the
    confidence, followed by whichever optional sections are present in
    ``result.parsed``: description, extracted text, tables, fitness hints,
    and finally the parse error, if any.

    The parsed payload comes from a model reply, so its shape is not trusted:
    ``extracted_text`` may be a list, a single string (split into lines) or
    another scalar (treated as one entry); ``tables`` may be a list or a
    single table object. Entries of an unexpected type are skipped.

    Args:
        result: Object exposing ``parsed`` (dict or ``None``) and
            ``parse_error``.
        index: 1-based position of this screenshot within the message.
        total: Number of screenshots in the message. The ``index/total``
            counter is shown only when both are given and ``total > 1``.

    Returns:
        The block as a newline-joined string.
    """
    parsed = result.parsed or {}
    doc_type = parsed.get("document_type") or "other"
    confidence = parsed.get("confidence") or "unknown"
    if index is not None and total is not None and total > 1:
        header = f"[Скриншот {index}/{total}: {doc_type}, confidence={confidence}]"
    else:
        header = f"[Скриншот: {doc_type}, confidence={confidence}]"
    lines = [header]

    if parsed.get("description"):
        lines.append(f"Описание: {parsed['description']}")

    extracted = parsed.get("extracted_text") or []
    if isinstance(extracted, str):
        # A bare string would otherwise be iterated character by character.
        extracted = extracted.splitlines()
    elif not isinstance(extracted, (list, tuple)):
        # Any other scalar becomes a single entry instead of raising TypeError.
        extracted = [extracted]
    text_lines = [f"- {item}" for item in extracted if str(item).strip()]
    if text_lines:
        # Emit the section header only when there is something to list under it.
        lines.append("Текст с экрана:")
        lines.extend(text_lines)

    tables = parsed.get("tables") or []
    if isinstance(tables, dict):
        # A single table object is treated as a one-element list.
        tables = [tables]
    if tables:
        lines.append("Таблицы:")
        for table in tables:
            if not isinstance(table, dict):
                continue
            title = table.get("title")
            if title:
                lines.append(f" [{title}]")
            rows = table.get("rows")
            if not isinstance(rows, list):
                continue
            lines.extend(
                " | " + " | ".join(str(cell) for cell in row)
                for row in rows
                if isinstance(row, list)
            )

    hints = parsed.get("fitness_hints")
    if hints:
        lines.append(f"Подсказки для фитнеса: {json.dumps(hints, ensure_ascii=False)}")

    if result.parse_error:
        lines.append(f"(parse_error: {result.parse_error})")

    return "\n".join(lines)
|
||
|
||
|
||
def format_user_message(caption: str, result: VisionResult) -> str:
    """Format a single vision result together with the user's caption.

    Thin wrapper that passes a one-element list to ``format_user_messages``.
    """
    single = [result]
    return format_user_messages(caption, single)
|
||
|
||
|
||
def format_user_messages(caption: str, results: list[VisionResult]) -> str:
    """Build the user-message text for a caption plus any number of screenshots.

    Args:
        caption: Text the user attached to the images; surrounding whitespace
            is stripped.
        results: Vision results, one per screenshot, in display order.

    Returns:
        The stripped caption alone when there are no results. Otherwise the
        screenshot blocks separated by blank lines, followed by a
        ``Подпись: ...`` line when the caption is non-blank.
    """
    if not results:
        return caption.strip()

    count = len(results)
    rendered = "\n\n".join(
        _format_screenshot_block(item, index=position, total=count)
        for position, item in enumerate(results, start=1)
    )

    note = caption.strip()
    if not note:
        return rendered
    return f"{rendered}\n\nПодпись: {note}"
|
||
|
||
|
||
# Instruction text returned by format_vision_turn_hint() for messages that
# contain screenshot blocks. It tells the reader of the prompt that the image
# data has already been extracted into the [Скриншот] blocks.
VISION_TURN_HINT = (
    "[Скриншоты в этом сообщении]: vision уже извлекла данные с каждой картинки в блоки [Скриншот] ниже. "
    "Отвечай по Описанию и извлечённому тексту как по увиденному. "
    "Не утверждай, что не видишь изображения, и не предлагай настроить vision API."
)
|
||
|
||
|
||
def format_vision_turn_hint(user_text: str) -> str:
    """Return ``VISION_TURN_HINT`` if *user_text* contains a screenshot block.

    A screenshot block is detected by the substring ``[Скриншот``. Empty or
    ``None`` input, and text without the marker, yield an empty string.
    """
    has_marker = "[Скриншот" in (user_text or "")
    return VISION_TURN_HINT if has_marker else ""
|
||
|
||
|
||
def vision_debug_payload(result: VisionResult) -> dict[str, Any]:
    """Build a debug dictionary describing one vision result.

    The payload always contains ``model``, ``parsed``, ``image_meta``,
    ``usage`` and ``parse_error``. The raw model reply is added under
    ``raw_content`` only when ``get_settings().vision_debug_enabled`` is
    truthy.
    """
    # Imported inside the function, as in the original code.
    from app.config import get_settings

    always_included = ("model", "parsed", "image_meta", "usage", "parse_error")
    payload: dict[str, Any] = {name: getattr(result, name) for name in always_included}

    settings = get_settings()
    if settings.vision_debug_enabled:
        payload["raw_content"] = result.raw_content
    return payload
|
||
|
||
|
||
def vision_debug_payloads(results: list[VisionResult]) -> dict[str, Any] | None:
    """Combine the debug payloads of several vision results.

    Returns:
        ``None`` for an empty list; the single payload unchanged when there is
        exactly one result; otherwise a dict with ``count``, ``images`` (the
        per-image payloads) and ``model``, where ``model`` is a string if all
        images share one model name and a sorted list of names if they differ.
    """
    if not results:
        return None

    items = [vision_debug_payload(entry) for entry in results]
    if len(items) == 1:
        return items[0]

    # Deduplicate model names; a missing or empty model counts as "".
    distinct_models = sorted({str(entry.get("model") or "") for entry in items})
    model_field = distinct_models[0] if len(distinct_models) == 1 else distinct_models
    return {
        "count": len(items),
        "images": items,
        "model": model_field,
    }
|