smart tdee
This commit is contained in:
@@ -0,0 +1,10 @@
|
||||
# Re-export the vision analysis API from app.vision.analyze; __all__ lists
# exactly the names imported on the next line.
from app.vision.analyze import VisionResult, VisionService, format_user_message, format_user_messages, vision_debug_payload, vision_debug_payloads

__all__ = [
    "VisionResult",
    "VisionService",
    "format_user_message",
    "format_user_messages",
    "vision_debug_payload",
    "vision_debug_payloads",
]
|
||||
@@ -0,0 +1,199 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
from openai import APIStatusError
|
||||
|
||||
from app.llm.client import LLMClient
|
||||
from app.projects.structuring import strip_markdown_json
|
||||
from app.vision.preprocess import PreparedImage, prepare_image
|
||||
from app.vision.prompts import VISION_SYSTEM_PROMPT
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VisionUnavailableError(Exception):
    """Signal that the vision LLM endpoint is missing or unreachable on OpenRouter.

    Attributes:
        model: Identifier of the vision model that was requested.
    """

    def __init__(self, model: str, detail: str) -> None:
        # ``detail`` becomes the exception message, i.e. what ``str(exc)`` yields.
        super().__init__(detail)
        self.model = model
|
||||
|
||||
|
||||
@dataclass
class VisionResult:
    """Outcome of one vision-model call on a single prepared image.

    Every field has a default, so an empty ``VisionResult()`` is valid.
    """

    # JSON object decoded from the model reply (empty when nothing usable).
    parsed: dict[str, Any] = field(default_factory=dict)
    # Model reply text as received, after whitespace stripping.
    raw_content: str = ""
    # Model identifier associated with the reply.
    model: str = ""
    # Usage mapping copied from the LLM response.
    usage: dict[str, Any] = field(default_factory=dict)
    # Metadata describing the image that was sent.
    image_meta: dict[str, Any] = field(default_factory=dict)
    # Explanation of why the reply could not be used as a JSON object, if any.
    parse_error: str | None = None
|
||||
|
||||
|
||||
class VisionService:
    """Run screenshots through the vision LLM and decode its JSON reply."""

    def __init__(self) -> None:
        self.llm = LLMClient()

    async def analyze(self, image_bytes: bytes, *, user_hint: str = "") -> VisionResult:
        """Preprocess raw image bytes, then analyze the prepared image.

        Args:
            image_bytes: Encoded image exactly as uploaded.
            user_hint: Optional free-text hint appended to the user prompt.
        """
        ready = prepare_image(image_bytes)
        return await self.analyze_prepared(ready, user_hint=user_hint)

    async def analyze_prepared(self, prepared: PreparedImage, *, user_hint: str = "") -> VisionResult:
        """Send an already prepared JPEG to the vision model and parse the answer.

        Args:
            prepared: Image whose ``jpeg_bytes`` are embedded as a base64 data URL.
            user_hint: Optional hint; ignored when blank after stripping.

        Returns:
            A ``VisionResult``. When the reply is not valid JSON, ``parsed``
            carries a fallback object built from the raw text and
            ``parse_error`` holds the decoder message; when it is valid JSON
            but not an object, ``parsed`` is empty and ``parse_error`` says so.

        Raises:
            VisionUnavailableError: The API answered with HTTP 404.
            APIStatusError: Any other API status error, re-raised unchanged.
        """
        encoded = base64.standard_b64encode(prepared.jpeg_bytes).decode("ascii")
        trimmed_hint = user_hint.strip()
        hint = f"\n\nПодсказка пользователя: {trimmed_hint}" if trimmed_hint else ""

        text_part: dict[str, Any] = {"type": "text", "text": f"Извлеки данные со скриншота.{hint}"}
        image_part: dict[str, Any] = {
            "type": "image_url",
            "image_url": {"url": f"data:image/jpeg;base64,{encoded}"},
        }
        messages: list[dict[str, Any]] = [
            {"role": "system", "content": VISION_SYSTEM_PROMPT},
            {"role": "user", "content": [text_part, image_part]},
        ]

        model = self.llm.vision_model
        try:
            response = await self.llm.complete_vision(messages)
        except APIStatusError as exc:
            # Only a 404 is translated; every other status propagates as is.
            if exc.status_code != 404:
                raise
            raise VisionUnavailableError(
                model,
                f"Vision-модель «{model}» недоступна на OpenRouter. "
                "Укажите другую в Settings (например google/gemini-2.5-flash-lite).",
            ) from exc

        raw = (response.get("content") or "").strip()
        parse_error: str | None = None
        parsed: dict[str, Any]
        try:
            decoded = json.loads(strip_markdown_json(raw))
        except json.JSONDecodeError as exc:
            # Keep the unparseable reply usable downstream as a plain description.
            parse_error = str(exc)
            parsed = {"description": raw[:2000], "document_type": "other", "raw_fallback": True}
        else:
            if isinstance(decoded, dict):
                parsed = decoded
            else:
                parse_error = "Vision response is not a JSON object"
                parsed = {}

        return VisionResult(
            parsed=parsed,
            raw_content=raw,
            model=str(response.get("model") or self.llm.vision_model),
            usage=dict(response.get("usage") or {}),
            image_meta=prepared.to_meta(),
            parse_error=parse_error,
        )
|
||||
|
||||
|
||||
def _format_screenshot_block(
    result: VisionResult,
    *,
    index: int | None = None,
    total: int | None = None,
) -> str:
    """Render one vision result as a plain-text ``[Скриншот ...]`` block.

    The parsed payload comes from an LLM and is not schema-validated, so the
    list-like fields are normalised before use: a bare string is treated as a
    one-element list, any other non-list value is ignored, and a section
    header is only emitted when at least one line follows it.

    Args:
        result: Vision result whose ``parsed`` mapping and ``parse_error``
            are rendered.
        index: 1-based position of this screenshot within the message.
        total: Number of screenshots in the message; the ``index/total``
            header form is used only when both are given and ``total > 1``.

    Returns:
        The block as newline-joined text.
    """
    parsed = result.parsed or {}
    doc_type = parsed.get("document_type") or "other"
    confidence = parsed.get("confidence") or "unknown"
    if index is not None and total is not None and total > 1:
        header = f"[Скриншот {index}/{total}: {doc_type}, confidence={confidence}]"
    else:
        header = f"[Скриншот: {doc_type}, confidence={confidence}]"
    lines = [header]

    if parsed.get("description"):
        lines.append(f"Описание: {parsed['description']}")

    extracted = parsed.get("extracted_text") or []
    if isinstance(extracted, str):
        # Iterating a string would yield one bullet per character.
        extracted = [extracted]
    elif not isinstance(extracted, (list, tuple)):
        extracted = []
    text_lines = [f"- {line}" for line in extracted if str(line).strip()]
    if text_lines:
        lines.append("Текст с экрана:")
        lines.extend(text_lines)

    tables = parsed.get("tables") or []
    if isinstance(tables, dict):
        # A single table object instead of a list of tables.
        tables = [tables]
    elif not isinstance(tables, (list, tuple)):
        tables = []
    table_lines: list[str] = []
    for table in tables:
        if not isinstance(table, dict):
            continue
        title = table.get("title")
        if title:
            table_lines.append(f" [{title}]")
        rows = table.get("rows")
        if isinstance(rows, list):
            table_lines.extend(
                " | " + " | ".join(str(cell) for cell in row)
                for row in rows
                if isinstance(row, list)
            )
    if table_lines:
        lines.append("Таблицы:")
        lines.extend(table_lines)

    hints = parsed.get("fitness_hints")
    if hints:
        lines.append(f"Подсказки для фитнеса: {json.dumps(hints, ensure_ascii=False)}")

    if result.parse_error:
        lines.append(f"(parse_error: {result.parse_error})")

    return "\n".join(lines)
|
||||
|
||||
|
||||
def format_user_message(caption: str, result: VisionResult) -> str:
    """Format a single vision result plus caption as the user message text.

    Convenience wrapper over ``format_user_messages`` for the one-image case.
    """
    single = [result]
    return format_user_messages(caption, single)
|
||||
|
||||
|
||||
def format_user_messages(caption: str, results: list[VisionResult]) -> str:
    """Build the user message text from vision results and an optional caption.

    Args:
        caption: Text the user sent with the images; surrounding whitespace
            is dropped.
        results: Vision results in the order the images were sent.

    Returns:
        The stripped caption alone when ``results`` is empty. Otherwise one
        screenshot block per result separated by blank lines, followed by a
        ``Подпись:`` paragraph when the caption is non-blank.
    """
    note = caption.strip()
    if not results:
        return note

    count = len(results)
    parts: list[str] = []
    for position, item in enumerate(results, start=1):
        parts.append(_format_screenshot_block(item, index=position, total=count))
    if note:
        parts.append(f"Подпись: {note}")
    return "\n\n".join(parts)
|
||||
|
||||
|
||||
# Instruction text (in Russian) telling the chat model that screenshot data has
# already been extracted into the [Скриншот] blocks and should be answered
# from; format_vision_turn_hint returns it when a message contains such a block.
VISION_TURN_HINT = (
    "[Скриншоты в этом сообщении]: vision уже извлекла данные с каждой картинки в блоки [Скриншот] ниже. "
    "Отвечай по Описанию и извлечённому тексту как по увиденному. "
    "Не утверждай, что не видишь изображения, и не предлагай настроить vision API."
)
|
||||
|
||||
|
||||
def format_vision_turn_hint(user_text: str) -> str:
    """Return ``VISION_TURN_HINT`` when the text contains a screenshot block.

    A message counts as carrying screenshots when the ``[Скриншот`` marker
    appears anywhere in it; a falsy ``user_text`` is treated as empty and
    yields ``""``.
    """
    text = user_text or ""
    return VISION_TURN_HINT if "[Скриншот" in text else ""
|
||||
|
||||
|
||||
def vision_debug_payload(result: VisionResult) -> dict[str, Any]:
    """Build a debug dictionary describing one vision result.

    The model's raw reply text is included under ``raw_content`` only when the
    ``vision_debug_enabled`` setting is truthy.
    """
    # Imported here rather than at module top, as in the original code.
    from app.config import get_settings

    always_included = ("model", "parsed", "image_meta", "usage", "parse_error")
    payload: dict[str, Any] = {name: getattr(result, name) for name in always_included}
    if get_settings().vision_debug_enabled:
        payload["raw_content"] = result.raw_content
    return payload
|
||||
|
||||
|
||||
def vision_debug_payloads(results: list[VisionResult]) -> dict[str, Any] | None:
    """Combine per-image debug payloads into one debug object.

    Returns:
        ``None`` for no results; the single payload itself for exactly one
        result; otherwise a dict with ``count``, ``images`` (the individual
        payloads) and ``model``, which is the shared model name when all
        images report the same one, else a sorted list of distinct names.
    """
    if not results:
        return None

    items = list(map(vision_debug_payload, results))
    if len(items) == 1:
        return items[0]

    models = {str(item.get("model") or "") for item in items}
    if len(models) == 1:
        (model_field,) = models
    else:
        model_field = sorted(models)
    return {"count": len(items), "images": items, "model": model_field}
|
||||
@@ -0,0 +1,53 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
from dataclasses import dataclass
|
||||
|
||||
from PIL import Image, ImageOps
|
||||
|
||||
from app.config import get_settings
|
||||
|
||||
|
||||
@dataclass
class PreparedImage:
    """JPEG-encoded image together with its size bookkeeping."""

    # Encoded JPEG payload.
    jpeg_bytes: bytes
    # Pixel dimensions of the encoded image.
    width: int
    height: int
    # Byte length of the input before processing and of the JPEG after it.
    original_bytes: int
    compressed_bytes: int
    mime: str = "image/jpeg"

    def to_meta(self) -> dict[str, int | str]:
        """Return every field except the image bytes as a plain dict."""
        return dict(
            mime=self.mime,
            width=self.width,
            height=self.height,
            original_bytes=self.original_bytes,
            compressed_bytes=self.compressed_bytes,
        )
|
||||
|
||||
|
||||
def prepare_image(raw_bytes: bytes) -> PreparedImage:
    """Normalise an uploaded image into a size-bounded RGB JPEG.

    The image is re-oriented according to its EXIF data, converted to RGB,
    shrunk in place when its longer edge exceeds the configured limit, and
    re-encoded as JPEG.

    Args:
        raw_bytes: Encoded source image in any format Pillow can open.

    Returns:
        A ``PreparedImage`` holding the JPEG bytes, the final pixel size and
        the byte counts before and after processing.
    """
    settings = get_settings()
    # Clamp configured values: edge limit is at least 256 px, quality in 40..95.
    edge_limit = max(256, int(settings.vision_max_edge_px))
    jpeg_quality = min(95, int(settings.vision_jpeg_quality))
    jpeg_quality = max(40, jpeg_quality)

    with Image.open(io.BytesIO(raw_bytes)) as source:
        image = ImageOps.exif_transpose(source).convert("RGB")
        if max(image.size) > edge_limit:
            image.thumbnail((edge_limit, edge_limit), Image.Resampling.LANCZOS)
        width, height = image.size

        out = io.BytesIO()
        image.save(out, format="JPEG", quality=jpeg_quality, optimize=True)
        encoded = out.getvalue()

    return PreparedImage(
        jpeg_bytes=encoded,
        width=width,
        height=height,
        original_bytes=len(raw_bytes),
        compressed_bytes=len(encoded),
    )
|
||||
@@ -0,0 +1,30 @@
|
||||
VISION_SYSTEM_PROMPT = """
|
||||
Ты OCR-ассистент для скриншотов приложений здоровья и фитнеса (Mi Fitness, Xiaomi, Zepp Life и аналоги).
|
||||
Извлеки ВСЕ видимые тексты, числа и таблицы. Приоритет — измеримые данные: длительность, калории, пульс, шаги, дистанция, дата, название активности.
|
||||
Ответ — ТОЛЬКО JSON без markdown и комментариев.
|
||||
Схема:
|
||||
{
|
||||
"description": "краткое описание экрана",
|
||||
"document_type": "fitness_workout|fitness_steps|fitness_summary|other",
|
||||
"extracted_text": ["строка1"],
|
||||
"tables": [{"title": "заголовок или null", "rows": [["ячейка1", "ячейка2"]]}],
|
||||
"fitness_hints": {
|
||||
"title": null,
|
||||
"activity_type": null,
|
||||
"duration_min": null,
|
||||
"active_calories": null,
|
||||
"total_calories": null,
|
||||
"steps": null,
|
||||
"avg_heart_rate": null,
|
||||
"date": null
|
||||
},
|
||||
"confidence": "high|medium|low",
|
||||
"notes": ""
|
||||
}
|
||||
Правила:
|
||||
- extracted_text — все значимые строки с экрана по порядку сверху вниз.
|
||||
- tables — любые табличные блоки (заголовок + строки).
|
||||
- fitness_hints — только если данные явно видны; иначе null.
|
||||
- duration_min — целые минуты; steps — целое число; калории и пульс — числа.
|
||||
- confidence=low если текст размыт или часть обрезана.
|
||||
""".strip()
|
||||
@@ -0,0 +1,32 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
|
||||
from app.config import get_settings
|
||||
from app.vision.preprocess import PreparedImage
|
||||
|
||||
|
||||
def save_upload(prepared: PreparedImage, *, user_id: int) -> str:
    """Write a prepared JPEG into the user's upload directory.

    The directory ``<uploads_dir>/<user_id>`` is created when missing and the
    file gets a random hex name with a ``.jpg`` suffix.

    Returns:
        The bare file name (not the full path) of the stored image.
    """
    uploads_root = Path(get_settings().uploads_dir)
    target_dir = uploads_root / str(user_id)
    target_dir.mkdir(parents=True, exist_ok=True)

    filename = f"{uuid.uuid4().hex}.jpg"
    (target_dir / filename).write_bytes(prepared.jpeg_bytes)
    return filename
|
||||
|
||||
|
||||
def upload_media_path(user_id: int, filename: str) -> str:
    """Return the media API path under which a stored upload is addressed."""
    user_prefix = f"/api/v1/media/uploads/{user_id}"
    return f"{user_prefix}/{filename}"
|
||||
|
||||
|
||||
def format_upload_images_markdown(user_id: int, filenames: list[str]) -> str:
    """Render stored uploads as Markdown image tags, one per line.

    Args:
        user_id: Owner of the uploads; used to build each media path.
        filenames: File names of the stored uploads, in display order.

    Returns:
        An empty string when ``filenames`` is empty. Otherwise one
        ``![alt](path)`` line per file, where the alt text carries an
        ``index/total`` counter only when there is more than one image.
    """
    if not filenames:
        return ""

    total = len(filenames)
    lines: list[str] = []
    for index, name in enumerate(filenames, start=1):
        alt = f"скриншот {index}/{total}" if total > 1 else "скриншот"
        url = upload_media_path(user_id, name)
        # Markdown image syntax: ![alt text](url)
        lines.append(f"![{alt}]({url})")
    return "\n".join(lines)
|
||||
Reference in New Issue
Block a user