Files
2026-06-16 04:38:23 +00:00

200 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import base64
import json
import logging
from dataclasses import dataclass, field
from typing import Any
from openai import APIStatusError
from app.llm.client import LLMClient
from app.projects.structuring import strip_markdown_json
from app.vision.preprocess import PreparedImage, prepare_image
from app.vision.prompts import VISION_SYSTEM_PROMPT
logger = logging.getLogger(__name__)
class VisionUnavailableError(Exception):
    """Signal that the vision LLM endpoint is missing or unreachable on OpenRouter.

    Attributes:
        model: Identifier of the vision model the failed request was made for.
    """

    def __init__(self, model: str, detail: str) -> None:
        # Only ``detail`` becomes the exception message; the model id is kept
        # as a separate attribute for callers that want to report it.
        super().__init__(detail)
        self.model = model
@dataclass
class VisionResult:
    """Outcome of one vision request: parsed payload plus request metadata.

    Every field has a default, so an empty ``VisionResult()`` is valid; the
    dict fields use ``default_factory`` so instances never share a mapping.
    """

    # JSON object decoded from the model reply (or a fallback mapping).
    parsed: dict[str, Any] = field(default_factory=dict)

    # Model reply text as stored by the producer of this result.
    raw_content: str = ""

    # Name of the model associated with the reply.
    model: str = ""

    # Usage information copied from the response.
    usage: dict[str, Any] = field(default_factory=dict)

    # Metadata describing the analysed image.
    image_meta: dict[str, Any] = field(default_factory=dict)

    # Reason the reply could not be used as a JSON object; None on success.
    parse_error: str | None = None
class VisionService:
    """Send screenshots to the vision LLM and wrap the reply in a ``VisionResult``."""

    def __init__(self) -> None:
        self.llm = LLMClient()

    async def analyze(self, image_bytes: bytes, *, user_hint: str = "") -> VisionResult:
        """Preprocess raw image bytes with ``prepare_image`` and analyse the result.

        Args:
            image_bytes: Raw bytes of the image to analyse.
            user_hint: Optional free-text hint appended to the user prompt.

        Returns:
            The ``VisionResult`` produced by ``analyze_prepared``.
        """
        return await self.analyze_prepared(
            prepare_image(image_bytes),
            user_hint=user_hint,
        )

    async def analyze_prepared(self, prepared: PreparedImage, *, user_hint: str = "") -> VisionResult:
        """Run an already prepared image through the vision model.

        The request consists of the system prompt and one user message holding
        a text instruction (plus the optional hint) and the JPEG bytes encoded
        as a base64 ``data:`` URL.

        Args:
            prepared: Prepared image; its ``jpeg_bytes`` are sent to the model.
            user_hint: Optional hint; ignored when blank after stripping.

        Returns:
            A ``VisionResult``. If the reply is not valid JSON, ``parsed`` holds
            a fallback mapping with the first 2000 characters of the reply as
            ``description``; if it is valid JSON but not an object, ``parsed``
            is empty. In both cases ``parse_error`` is set.

        Raises:
            VisionUnavailableError: The API answered with HTTP status 404.
            APIStatusError: Any other API status error is re-raised unchanged.
        """
        encoded = base64.standard_b64encode(prepared.jpeg_bytes).decode("ascii")

        hint_text = user_hint.strip()
        suffix = f"\n\nПодсказка пользователя: {hint_text}" if hint_text else ""

        text_part = {"type": "text", "text": f"Извлеки данные со скриншота.{suffix}"}
        image_part = {
            "type": "image_url",
            "image_url": {"url": f"data:image/jpeg;base64,{encoded}"},
        }
        messages: list[dict[str, Any]] = [
            {"role": "system", "content": VISION_SYSTEM_PROMPT},
            {"role": "user", "content": [text_part, image_part]},
        ]

        # Captured before the call so the error message names the requested model.
        requested_model = self.llm.vision_model
        try:
            response = await self.llm.complete_vision(messages)
        except APIStatusError as exc:
            if exc.status_code != 404:
                raise
            raise VisionUnavailableError(
                requested_model,
                f"Vision-модель «{requested_model}» недоступна на OpenRouter. "
                "Укажите другую в Settings (например google/gemini-2.5-flash-lite).",
            ) from exc

        raw = (response.get("content") or "").strip()
        parse_error: str | None = None
        parsed: dict[str, Any]
        try:
            decoded = json.loads(strip_markdown_json(raw))
        except json.JSONDecodeError as exc:
            # Not JSON at all: keep a truncated copy of the reply as description.
            parse_error = str(exc)
            parsed = {
                "description": raw[:2000],
                "document_type": "other",
                "raw_fallback": True,
            }
        else:
            if isinstance(decoded, dict):
                parsed = decoded
            else:
                parse_error = "Vision response is not a JSON object"
                parsed = {}

        reported_model = str(response.get("model") or self.llm.vision_model)
        usage = dict(response.get("usage") or {})
        return VisionResult(
            parsed=parsed,
            raw_content=raw,
            model=reported_model,
            usage=usage,
            image_meta=prepared.to_meta(),
            parse_error=parse_error,
        )
def _format_screenshot_block(
    result: VisionResult,
    *,
    index: int | None = None,
    total: int | None = None,
) -> str:
    """Render one vision result as a plain-text ``[Скриншот ...]`` block.

    The block starts with a header line (document type and confidence) and is
    followed by the optional description, extracted text, tables, fitness
    hints and parse error, each only when present.

    The parsed payload comes from a model reply, so its shape is not trusted:
    ``extracted_text`` may be a list, a single string (split into lines) or a
    scalar; ``tables`` may be a list of table dicts or a single table dict.
    Anything else is ignored instead of raising. A section header is emitted
    only when the section has at least one line to show.

    Args:
        result: Vision result whose ``parsed`` and ``parse_error`` are rendered.
        index: 1-based position of this screenshot within the message.
        total: Number of screenshots in the message. The ``index/total``
            numbering appears in the header only when both values are given
            and ``total`` is greater than 1.

    Returns:
        The block as a newline-joined string.
    """
    parsed = result.parsed or {}
    doc_type = parsed.get("document_type") or "other"
    confidence = parsed.get("confidence") or "unknown"

    if index is not None and total is not None and total > 1:
        header = f"[Скриншот {index}/{total}: {doc_type}, confidence={confidence}]"
    else:
        header = f"[Скриншот: {doc_type}, confidence={confidence}]"
    lines = [header]

    if parsed.get("description"):
        lines.append(f"Описание: {parsed['description']}")

    extracted = parsed.get("extracted_text") or []
    if isinstance(extracted, str):
        # A bare string would otherwise be iterated character by character.
        extracted = extracted.splitlines()
    elif not isinstance(extracted, (list, tuple)):
        # A scalar (e.g. a number) is not iterable; show it as a single item.
        extracted = [extracted]
    text_items = [f"- {line}" for line in extracted if str(line).strip()]
    if text_items:
        lines.append("Текст с экрана:")
        lines.extend(text_items)

    tables = parsed.get("tables") or []
    if isinstance(tables, dict):
        # A single table object instead of a list of tables.
        tables = [tables]
    elif not isinstance(tables, (list, tuple)):
        tables = []
    table_lines: list[str] = []
    for table in tables:
        if not isinstance(table, dict):
            continue
        title = table.get("title")
        if title:
            table_lines.append(f" [{title}]")
        rows = table.get("rows")
        if not isinstance(rows, list):
            continue
        table_lines.extend(
            " | " + " | ".join(str(cell) for cell in row)
            for row in rows
            if isinstance(row, list)
        )
    if table_lines:
        lines.append("Таблицы:")
        lines.extend(table_lines)

    hints = parsed.get("fitness_hints")
    if hints:
        lines.append(f"Подсказки для фитнеса: {json.dumps(hints, ensure_ascii=False)}")

    if result.parse_error:
        lines.append(f"(parse_error: {result.parse_error})")

    return "\n".join(lines)
def format_user_message(caption: str, result: VisionResult) -> str:
    """Format a caption together with a single vision result.

    Convenience wrapper: delegates to ``format_user_messages`` with a
    one-element list.
    """
    single_result = [result]
    return format_user_messages(caption, single_result)
def format_user_messages(caption: str, results: list[VisionResult]) -> str:
    """Build the user message text from screenshot results and a caption.

    Each result is rendered with ``_format_screenshot_block`` (numbered from 1)
    and the blocks are separated by a blank line. A non-blank caption is
    appended after the blocks as a ``Подпись:`` line.

    Args:
        caption: Caption text supplied with the screenshots. ``None`` is
            tolerated and treated like an empty caption.
        results: Vision results to render, in display order.

    Returns:
        The stripped caption alone when ``results`` is empty; otherwise the
        joined blocks, followed by the caption when it is not blank.
    """
    # Normalise once so a missing (None) caption cannot raise AttributeError.
    caption_text = (caption or "").strip()
    if not results:
        return caption_text

    total = len(results)
    text = "\n\n".join(
        _format_screenshot_block(result, index=index, total=total)
        for index, result in enumerate(results, start=1)
    )
    if caption_text:
        text = f"{text}\n\nПодпись: {caption_text}"
    return text
# Instruction text returned by format_vision_turn_hint for turns whose text
# contains screenshot blocks: it tells the model that the image data was
# already extracted and must be treated as seen.
VISION_TURN_HINT = "".join(
    (
        "[Скриншоты в этом сообщении]: ",
        "vision уже извлекла данные с каждой картинки в блоки [Скриншот] ниже. ",
        "Отвечай по Описанию и извлечённому тексту как по увиденному. ",
        "Не утверждай, что не видишь изображения, ",
        "и не предлагай настроить vision API.",
    )
)
def format_vision_turn_hint(user_text: str) -> str:
    """Return ``VISION_TURN_HINT`` when the text contains a screenshot block.

    Args:
        user_text: User message text; a falsy value is treated as empty.

    Returns:
        The hint constant if the ``[Скриншот`` marker occurs in the text,
        otherwise an empty string.
    """
    has_screenshot_block = "[Скриншот" in (user_text or "")
    return VISION_TURN_HINT if has_screenshot_block else ""
def vision_debug_payload(result: VisionResult) -> dict[str, Any]:
    """Build a debug dictionary describing one vision result.

    The dictionary always carries ``model``, ``parsed``, ``image_meta``,
    ``usage`` and ``parse_error``. ``raw_content`` is added only when the
    ``vision_debug_enabled`` setting is truthy.
    """
    # Imported at call time, as in the rest of this function's history.
    from app.config import get_settings

    payload: dict[str, Any] = dict(
        model=result.model,
        parsed=result.parsed,
        image_meta=result.image_meta,
        usage=result.usage,
        parse_error=result.parse_error,
    )
    if not get_settings().vision_debug_enabled:
        return payload
    payload["raw_content"] = result.raw_content
    return payload
def vision_debug_payloads(results: list[VisionResult]) -> dict[str, Any] | None:
    """Combine the debug payloads of several vision results.

    Returns:
        ``None`` for an empty list; the single payload itself when there is
        exactly one result; otherwise a dictionary with ``count``, ``images``
        (the individual payloads) and ``model``. ``model`` is a string when
        all payloads share one model name and a sorted list of names
        otherwise.
    """
    if not results:
        return None

    items = list(map(vision_debug_payload, results))
    if len(items) == 1:
        return items[0]

    distinct_models = {str(item.get("model") or "") for item in items}
    if len(distinct_models) == 1:
        (model,) = distinct_models
    else:
        model = sorted(distinct_models)

    return {"count": len(items), "images": items, "model": model}