This commit is contained in:
2026-06-15 03:15:08 +00:00
parent 0c8ab6018a
commit f2e98942ff
18 changed files with 1484 additions and 261 deletions
+46 -26
View File
@@ -1,13 +1,17 @@
"""Сборка Anima-промптов без LLM (теги, без POV/hybrid)."""
"""Сборка Anima-промптов: appearance из карточки + action/outfit из контекста."""
from __future__ import annotations
import re
from dataclasses import dataclass
ANIMA_QUALITY = "masterpiece, best quality, score_7, anime"
ANIMA_NEGATIVE = "worst quality, low quality, score_1, score_2, score_3, blurry, jpeg artifacts, sepia"
_INVALID_TAGS = frozenset({
"pumped_up", "pumped", "looking_at_each_other", "couple",
"2girls", "2boys", "multiple_girls", "multiple_boys",
})
_JUNK_STANDALONE_TAGS = frozenset({
"white", "black", "skin", "ear", "ears", "girl", "boy", "fox", "wolf", "cat",
"short", "tall", "golden", "silver", "red", "blue", "green", "purple",
@@ -33,6 +37,8 @@ def _sanitize_tags(tag_str: str) -> str:
key = t.lower().replace(" ", "_")
if key in seen or len(key) <= 2:
continue
if key in _INVALID_TAGS:
continue
if "_" not in key and key in _JUNK_STANDALONE_TAGS:
continue
seen.add(key)
@@ -48,20 +54,26 @@ def _append_lora(parts: list[str], lora_name: str, lora_weight: float) -> None:
parts.append(f"<lora:{lora}:{weight}>")
def build_draw_self_prompt(
def build_character_image_prompt(
appearance_tags: str,
*,
action_tags: str = "",
outfit_tags: str = "",
environment_tags: str = "",
lora_name: str = "",
lora_weight: float = 0.8,
) -> AnimaPromptBundle:
"""Портрет «нарисуй себя» — только booru-теги, без POV и prose."""
"""Appearance (карточка) + action/outfit/env (контекст), только теги."""
appearance = _sanitize_tags(appearance_tags)
action = "looking_at_viewer, smile, upper_body, portrait"
environment = "simple_background, soft_lighting"
outfit = _sanitize_tags(outfit_tags)
action = _sanitize_tags(action_tags) or "looking_at_viewer, smile"
environment = _sanitize_tags(environment_tags) or "simple_background, soft_lighting"
parts = [ANIMA_QUALITY]
if appearance:
parts.append(appearance)
if outfit:
parts.append(outfit)
parts.append(action)
parts.append(environment)
_append_lora(parts, lora_name, lora_weight)
@@ -70,6 +82,25 @@ def build_draw_self_prompt(
return AnimaPromptBundle(positive=positive, negative=ANIMA_NEGATIVE)
def build_draw_self_prompt(
appearance_tags: str,
*,
action_tags: str = "",
outfit_tags: str = "",
environment_tags: str = "",
lora_name: str = "",
lora_weight: float = 0.8,
) -> AnimaPromptBundle:
return build_character_image_prompt(
appearance_tags,
action_tags=action_tags,
outfit_tags=outfit_tags,
environment_tags=environment_tags,
lora_name=lora_name,
lora_weight=lora_weight,
)
def build_scene_tags_prompt(
scene_tags: str,
appearance_tags: str,
@@ -77,24 +108,13 @@ def build_scene_tags_prompt(
lora_name: str = "",
lora_weight: float = 0.8,
) -> AnimaPromptBundle:
"""Прямая сцена из booru-тегов (без LLM)."""
appearance = _sanitize_tags(appearance_tags)
"""Готовые booru-теги сцены + appearance."""
scene = _sanitize_tags(scene_tags)
parts = [ANIMA_QUALITY]
if appearance:
parts.append(appearance)
if scene:
parts.append(scene)
_append_lora(parts, lora_name, lora_weight)
positive = ", ".join(p.strip() for p in parts if p.strip())
return AnimaPromptBundle(positive=positive, negative=ANIMA_NEGATIVE)
def looks_like_booru_tags(text: str) -> bool:
"""Грубая эвристика: строка похожа на теги, а не на прозу."""
raw = (text or "").strip()
if not raw or len(raw) > 400:
return False
if raw.count(",") >= 2:
return True
return bool(re.search(r"\b\d+(girl|boy)s?\b", raw, re.I))
return build_character_image_prompt(
appearance_tags,
action_tags=scene,
outfit_tags="",
environment_tags="simple_background, soft_lighting",
lora_name=lora_name,
lora_weight=lora_weight,
)
+251 -190
View File
@@ -1,190 +1,251 @@
from typing import Any
from sqlalchemy.orm import Session
from app.character.service import CharacterService
from app.config import get_settings
from app.homelab.anima_prompt import AnimaPromptBundle, build_draw_self_prompt, build_scene_tags_prompt, looks_like_booru_tags
from app.homelab.comfyui import ComfyUIClient
from app.integrations.rp_chat import RpChatClient
def _card_image_settings(db: Session, user_id: int) -> dict[str, Any]:
return CharacterService(db, user_id).get_card().get("data", {})
def _session_messages(db: Session, session_id: int | None, limit: int = 8) -> list[dict[str, str]]:
if not session_id:
return []
from sqlalchemy import select
from app.db.models import Message
rows = db.scalars(
select(Message)
.where(
Message.session_id == session_id,
Message.role.in_(("user", "assistant")),
)
.order_by(Message.created_at.desc())
.limit(limit)
).all()
rows = list(reversed(rows))
return [{"role": m.role, "content": (m.content or "").strip()} for m in rows if m.content.strip()]
def _append_lora(positive: str, lora_name: str, lora_weight: float) -> str:
if not lora_name or f"<lora:{lora_name}" in positive:
return positive
return f"{positive} <lora:{lora_name}:{lora_weight}>"
async def _generate_from_bundle(
bundle: AnimaPromptBundle,
*,
backend: str,
persona_id: str = "",
) -> dict[str, Any]:
settings = get_settings()
if backend == "rp_chat":
client = RpChatClient()
gen_result = await client.generate(bundle.positive, bundle.negative)
if not gen_result.get("ok"):
return gen_result
saved = await client.save_image_locally(gen_result["image_path"])
if not saved.get("ok"):
return saved
return {
"ok": True,
"url": saved["url"],
"filename": saved["filename"],
"prompt": bundle.positive,
"negative_prompt": bundle.negative,
"backend": "rp_chat",
"persona_id": persona_id,
"prompt_mode": "direct",
}
result = await ComfyUIClient().generate_image(
bundle.positive,
negative_prompt=bundle.negative,
)
if result.get("ok"):
result["backend"] = "comfyui_local"
result["prompt_mode"] = "direct"
result["negative_prompt"] = bundle.negative
return result
async def generate_image(
db: Session,
*,
user_id: int,
session_id: int | None = None,
draw_self: bool = False,
scene_description: str = "",
) -> dict[str, Any]:
card = _card_image_settings(db, user_id)
settings = get_settings()
if not card.get("sd_enabled", True):
return {"ok": False, "error": "Генерация изображений отключена в настройках персонажа"}
if not draw_self and not scene_description.strip():
return {"ok": False, "error": "Нужен draw_self=true или scene_description"}
appearance = (card.get("appearance_tags") or "").strip()
lora_name = (card.get("lora_name") or "").strip()
lora_weight = float(card.get("lora_weight") or 0.8)
persona_id = (card.get("rp_persona_id") or "").strip() or "default"
backend = "rp_chat" if settings.rp_chat_enabled else "comfyui_local"
if draw_self:
if not appearance:
return {
"ok": False,
"error": "Заполни appearance_tags в настройках персонажа для «нарисуй себя»",
}
bundle = build_draw_self_prompt(
appearance,
lora_name=lora_name,
lora_weight=lora_weight,
)
return await _generate_from_bundle(bundle, backend=backend, persona_id=persona_id)
scene = scene_description.strip()
if looks_like_booru_tags(scene):
if not appearance:
bundle = build_scene_tags_prompt(scene, "", lora_name=lora_name, lora_weight=lora_weight)
else:
bundle = build_scene_tags_prompt(
scene,
appearance,
lora_name=lora_name,
lora_weight=lora_weight,
)
return await _generate_from_bundle(bundle, backend=backend, persona_id=persona_id)
messages = _session_messages(db, session_id)
messages = messages + [{"role": "user", "content": scene}]
if settings.rp_chat_enabled:
return await _generate_via_rp_chat(
card,
messages,
appearance_override=appearance or None,
)
fallback = f"{appearance}, {scene}" if appearance else scene
return await ComfyUIClient().generate_image(fallback)
async def _generate_via_rp_chat(
card: dict[str, Any],
messages: list[dict[str, str]],
appearance_override: str | None,
) -> dict[str, Any]:
client = RpChatClient()
persona_id = (card.get("rp_persona_id") or "").strip() or "default"
override = appearance_override or (card.get("appearance_tags") or "").strip() or None
prompt_result = await client.sd_prompt(
persona_id,
messages,
appearance_override=override,
)
if not prompt_result.get("ok"):
return prompt_result
positive = (
prompt_result.get("hybrid_positive")
or prompt_result.get("tag_positive")
or ""
).strip()
negative = (prompt_result.get("negative") or "").strip()
if not positive:
return {"ok": False, "error": "RP-чат не вернул промпт", "raw": prompt_result}
lora = (card.get("lora_name") or "").strip()
if lora:
weight = float(card.get("lora_weight") or 0.8)
positive = _append_lora(positive, lora, weight)
gen_result = await client.generate(positive, negative)
if not gen_result.get("ok"):
return gen_result
saved = await client.save_image_locally(gen_result["image_path"])
if not saved.get("ok"):
return saved
return {
"ok": True,
"url": saved["url"],
"filename": saved["filename"],
"prompt": positive,
"negative_prompt": negative,
"backend": "rp_chat",
"persona_id": persona_id,
"prompt_mode": "llm",
}
from typing import Any
from sqlalchemy.orm import Session
from app.character.service import CharacterService
from app.config import get_settings
from app.homelab.anima_prompt import AnimaPromptBundle, build_character_image_prompt, build_scene_tags_prompt
from app.homelab.comfyui import ComfyUIClient
from app.homelab.scene_tags import extract_scene_tags, looks_like_booru_tags
from app.integrations.rp_chat import RpChatClient
def _card_image_settings(db: Session, user_id: int) -> dict[str, Any]:
return CharacterService(db, user_id).get_card().get("data", {})
def _session_messages(db: Session, session_id: int | None, limit: int = 8) -> list[dict[str, str]]:
if not session_id:
return []
from sqlalchemy import select
from app.db.models import Message
rows = db.scalars(
select(Message)
.where(
Message.session_id == session_id,
Message.role.in_(("user", "assistant")),
)
.order_by(Message.created_at.desc())
.limit(limit)
).all()
rows = list(reversed(rows))
return [{"role": m.role, "content": (m.content or "").strip()} for m in rows if m.content.strip()]
def _last_user_message(messages: list[dict[str, str]]) -> str:
for msg in reversed(messages):
if msg.get("role") == "user" and (msg.get("content") or "").strip():
return str(msg["content"]).strip()
return ""
def _append_lora(positive: str, lora_name: str, lora_weight: float) -> str:
if not lora_name or f"<lora:{lora_name}" in positive:
return positive
return f"{positive} <lora:{lora_name}:{lora_weight}>"
async def _generate_from_bundle(
bundle: AnimaPromptBundle,
*,
backend: str,
persona_id: str = "",
prompt_mode: str = "direct",
tag_source: str = "",
) -> dict[str, Any]:
if backend == "rp_chat":
client = RpChatClient()
gen_result = await client.generate(bundle.positive, bundle.negative)
if not gen_result.get("ok"):
return gen_result
saved = await client.save_image_locally(gen_result["image_path"])
if not saved.get("ok"):
return saved
return {
"ok": True,
"url": saved["url"],
"filename": saved["filename"],
"prompt": bundle.positive,
"negative_prompt": bundle.negative,
"backend": "rp_chat",
"persona_id": persona_id,
"prompt_mode": prompt_mode,
"tag_source": tag_source,
}
result = await ComfyUIClient().generate_image(
bundle.positive,
negative_prompt=bundle.negative,
)
if result.get("ok"):
result["backend"] = "comfyui_local"
result["prompt_mode"] = prompt_mode
result["negative_prompt"] = bundle.negative
result["tag_source"] = tag_source
return result
async def _build_contextual_bundle(
appearance: str,
*,
request: str,
messages: list[dict[str, str]],
lora_name: str,
lora_weight: float,
) -> tuple[AnimaPromptBundle, str]:
tags = await extract_scene_tags(request, messages, appearance_tags=appearance)
bundle = build_character_image_prompt(
appearance,
action_tags=tags.get("action_tags", ""),
outfit_tags=tags.get("outfit_tags", ""),
environment_tags=tags.get("environment_tags", ""),
lora_name=lora_name,
lora_weight=lora_weight,
)
return bundle, str(tags.get("source") or "")
async def generate_image(
db: Session,
*,
user_id: int,
session_id: int | None = None,
draw_self: bool = False,
scene_description: str = "",
) -> dict[str, Any]:
card = _card_image_settings(db, user_id)
settings = get_settings()
if not card.get("sd_enabled", True):
return {"ok": False, "error": "Генерация изображений отключена в настройках персонажа"}
if not draw_self and not scene_description.strip():
return {"ok": False, "error": "Нужен draw_self=true или scene_description"}
+210 -27
View File
@@ -29,42 +29,172 @@ WEATHER_CODES: dict[int, str] = {
99: "гроза с градом",
}
_cache: dict[str, Any] = {"data": None, "expires_at": 0.0}
_cache: dict[str, Any] = {
"data": None,
"fetched_at": 0.0,
"expires_at": 0.0,
"source": "local",
"local_coverage": {"current": [], "hourly": []},
}
CURRENT_FIELDS = (
"temperature_2m",
"apparent_temperature",
"relative_humidity_2m",
"precipitation",
"weather_code",
"wind_speed_10m",
)
HOURLY_FIELDS = (
"temperature_2m",
"precipitation_probability",
"precipitation",
"weather_code",
)
RECOMMENDED_SYNC_DOMAINS = "dwd_icon,ncep_gfs013,ncep_gefs025"
RECOMMENDED_SYNC_VARIABLES = (
"temperature_2m,dew_point_2m,relative_humidity_2m,precipitation_probability,"
"precipitation,rain,cloud_cover,weather_code,wind_u_component_10m,wind_v_component_10m"
)
SYNC_HINT = (
"Контейнер open-meteo-sync, скорее всего, качает только temperature_2m. "
f"Задай SYNC_DOMAINS={RECOMMENDED_SYNC_DOMAINS} и "
f"SYNC_VARIABLES={RECOMMENDED_SYNC_VARIABLES} (~12 GB). "
"Документация: github.com/open-meteo/open-data/tree/main/tutorial_weather_api"
)
def _hourly_series(hourly: dict[str, Any], key: str) -> list[Any]:
values = hourly.get(key)
return values if isinstance(values, list) else []
def _hourly_start_index(times: list[str], anchor_time: str | None) -> int:
if not times:
return 0
if not anchor_time:
return 0
best = 0
for i, t in enumerate(times):
if t <= anchor_time:
best = i
else:
break
return best
def _field_coverage(raw: dict[str, Any]) -> dict[str, list[str]]:
"""Какие поля реально пришли от OpenMeteo (не null)."""
current = raw.get("current") or {}
hourly = raw.get("hourly") or {}
current_present = [
key for key in CURRENT_FIELDS if current.get(key) is not None
]
hourly_present = []
for key in HOURLY_FIELDS:
series = _hourly_series(hourly, key)
if any(v is not None for v in series):
hourly_present.append(key)
return {"current": current_present, "hourly": hourly_present}
def _coverage_sufficient(coverage: dict[str, list[str]]) -> bool:
current = set(coverage.get("current") or [])
hourly = set(coverage.get("hourly") or [])
if "weather_code" not in current:
return False
if len(current) < 3:
return False
if "precipitation_probability" not in hourly and "weather_code" not in hourly:
return False
return True
def _fmt_num(value: Any, *, suffix: str = "") -> str:
if value is None:
return ""
if isinstance(value, float):
text = f"{value:.1f}".rstrip("0").rstrip(".")
else:
text = str(value)
return f"{text}{suffix}" if suffix else text
class OpenMeteoClient:
def __init__(self) -> None:
settings = get_settings()
self.base_url = settings.openmeteo_base_url.rstrip("/")
self.fallback_url = (settings.openmeteo_fallback_url or "").strip().rstrip("/")
self.fallback_on_partial = settings.openmeteo_fallback_on_partial
self.lat = settings.weather_lat
self.lon = settings.weather_lon
self.location_name = settings.weather_location_name
self.cache_ttl = settings.weather_cache_sec
def _request_params(self) -> dict[str, Any]:
return {
"latitude": self.lat,
"longitude": self.lon,
"current": ",".join(CURRENT_FIELDS),
"hourly": ",".join(HOURLY_FIELDS),
"timezone": "auto",
"forecast_days": 2,
}
def _fetch_from_url(self, base_url: str) -> dict[str, Any]:
with httpx.Client(timeout=20.0) as client:
response = client.get(f"{base_url.rstrip('/')}/v1/forecast", params=self._request_params())
response.raise_for_status()
return response.json()
def _fetch_raw(self) -> dict[str, Any]:
now = time.time()
if _cache["data"] and now < _cache["expires_at"]:
return _cache["data"]
params = {
"latitude": self.lat,
"longitude": self.lon,
"current": (
"temperature_2m,apparent_temperature,relative_humidity_2m,"
"precipitation,weather_code,wind_speed_10m"
),
"hourly": "temperature_2m,precipitation_probability,precipitation,weather_code",
"timezone": "auto",
"forecast_days": 2,
}
with httpx.Client(timeout=15.0) as client:
response = client.get(f"{self.base_url}/v1/forecast", params=params)
response.raise_for_status()
data = response.json()
local_raw = self._fetch_from_url(self.base_url)
local_coverage = _field_coverage(local_raw)
source = "local"
raw = local_raw
_cache["data"] = data
if (
self.fallback_on_partial
and self.fallback_url
and self.fallback_url.rstrip("/") != self.base_url
and not _coverage_sufficient(local_coverage)
):
try:
fallback_raw = self._fetch_from_url(self.fallback_url)
if _coverage_sufficient(_field_coverage(fallback_raw)):
raw = fallback_raw
source = "fallback"
except Exception:
pass
_cache["data"] = raw
_cache["fetched_at"] = now
_cache["expires_at"] = now + self.cache_ttl
return data
_cache["source"] = source
_cache["local_coverage"] = local_coverage
return raw
def cache_status(self) -> dict[str, Any]:
now = time.time()
fetched_at = float(_cache.get("fetched_at") or 0)
expires_at = float(_cache.get("expires_at") or 0)
has_data = _cache.get("data") is not None
age_sec = int(now - fetched_at) if fetched_at else None
expires_in_sec = max(0, int(expires_at - now)) if expires_at else None
return {
"has_data": has_data,
"cached": bool(has_data and expires_at and now < expires_at),
"fetched_at": fetched_at or None,
"age_sec": age_sec,
"ttl_sec": self.cache_ttl,
"expires_in_sec": expires_in_sec,
"source": _cache.get("source") or "local",
}
def fetch_current_and_hourly(self, hours_ahead: int = 12) -> dict[str, Any]:
try:
@@ -75,21 +205,32 @@ class OpenMeteoClient:
current = raw.get("current") or {}
hourly = raw.get("hourly") or {}
times = hourly.get("time") or []
limit = min(hours_ahead, len(times))
start = _hourly_start_index(times, current.get("time"))
end = min(start + hours_ahead, len(times))
hourly_slice = []
for i in range(limit):
for i in range(start, end):
code = _hourly_series(hourly, "weather_code")[i] if i < len(_hourly_series(hourly, "weather_code")) else None
temp_series = _hourly_series(hourly, "temperature_2m")
precip_series = _hourly_series(hourly, "precipitation")
prob_series = _hourly_series(hourly, "precipitation_probability")
hourly_slice.append({
"time": times[i],
"temperature_c": hourly.get("temperature_2m", [None])[i],
"precipitation_mm": hourly.get("precipitation", [None])[i],
"precipitation_probability": hourly.get("precipitation_probability", [None])[i],
"weather_code": hourly.get("weather_code", [None])[i],
"temperature_c": temp_series[i] if i < len(temp_series) else None,
"precipitation_mm": precip_series[i] if i < len(precip_series) else None,
"precipitation_probability": prob_series[i] if i < len(prob_series) else None,
"weather_code": code,
"conditions": WEATHER_CODES.get(code, "неизвестно") if code is not None else "неизвестно",
})
code = current.get("weather_code")
coverage = _field_coverage(raw)
return {
"ok": True,
"location": self.location_name,
"data_source": _cache.get("source") or "local",
"local_field_coverage": _cache.get("local_coverage") or coverage,
"field_coverage": coverage,
"sync_hint": SYNC_HINT if not _coverage_sufficient(_cache.get("local_coverage") or coverage) else "",
"current": {
"time": current.get("time"),
"temperature_c": current.get("temperature_2m"),
@@ -132,10 +273,13 @@ def format_weather_snapshot(data: dict[str, Any] | None = None) -> str:
return "\n".join(lines)
cur = snapshot.get("current") or {}
apparent = cur.get("apparent_temperature_c")
wind = cur.get("wind_speed_kmh")
apparent_part = f", ощущается {_fmt_num(apparent, suffix='°C')}" if apparent is not None else ""
wind_part = f", ветер {_fmt_num(wind, suffix=' км/ч')}" if wind is not None else ""
lines.append(
f"{snapshot.get('location')}: {cur.get('temperature_c')}°C "
f"(ощущается {cur.get('apparent_temperature_c')}°C), "
f"{cur.get('conditions')}, ветер {cur.get('wind_speed_kmh')} км/ч."
f"{snapshot.get('location')}: {_fmt_num(cur.get('temperature_c'), suffix='°C')}"
f"{apparent_part}, {cur.get('conditions') or 'неизвестно'}{wind_part}."
)
hourly = snapshot.get("hourly") or []
rainy_hours = []
@@ -151,3 +295,42 @@ def format_weather_snapshot(data: dict[str, Any] | None = None) -> str:
lines.append("Существенных осадков в ближайшие часы не ожидается.")
lines.append("Вопросы «что на улице» / «будет ли дождь» — get_weather.")
return "\n".join(lines)
def build_weather_dashboard(hours_ahead: int = 12) -> dict[str, Any]:
"""Полный снимок для UI: данные OpenMeteo + контекст ассистента."""
client = OpenMeteoClient()
weather = client.fetch_current_and_hourly(hours_ahead=hours_ahead)
settings = get_settings()
return {
"weather": weather,
"rain_summary": client.rain_summary(hours_ahead=hours_ahead) if weather.get("ok") else "",
"assistant_context": format_weather_snapshot(weather),
"cache": client.cache_status(),
"config": {
"location": client.location_name,
"latitude": client.lat,
"longitude": client.lon,
"openmeteo_base_url": client.base_url,
"cache_ttl_sec": client.cache_ttl,
"forecast_days": 2,
"timezone": "auto",
},
"available_fields": {
"current": list(CURRENT_FIELDS),
"hourly": list(HOURLY_FIELDS),
},
"field_coverage": weather.get("field_coverage") if weather.get("ok") else {"current": [], "hourly": []},
"local_field_coverage": weather.get("local_field_coverage") if weather.get("ok") else {"current": [], "hourly": []},
"data_source": weather.get("data_source", "local") if weather.get("ok") else "local",
"sync_hint": weather.get("sync_hint", "") if weather.get("ok") else SYNC_HINT,
"recommended_sync": {
"domains": RECOMMENDED_SYNC_DOMAINS,
"variables": RECOMMENDED_SYNC_VARIABLES,
},
"assistant_tools": {
"get_weather": "Текущая погода и почасовой прогнос (hours_ahead до 48)",
"get_morning_briefing": "Погода + заголовки RSS-новостей",
},
"system_prompt": "Краткий блок [Погода] в system prompt каждого сообщения (6 ч почасово).",
}
+189
View File
@@ -0,0 +1,189 @@
"""Извлечение action/outfit/environment в danbooru-теги из запроса и чата."""
from __future__ import annotations
import json
import logging
import re
from typing import Any
from app.config import get_settings
from app.llm.client import LLMClient
from app.projects.structuring import strip_markdown_json
logger = logging.getLogger(__name__)
SCENE_TAGS_PROMPT = """
Ты переводишь запрос на иллюстрацию персонажа в теги Stable Diffusion (danbooru/e621).
Ответь ТОЛЬКО JSON без markdown:
{
"action_tags": "pose, framing, expression, activity — 3-10 тегов через запятую",
"outfit_tags": "одежда и аксессуары или пустая строка",
"environment_tags": "локация, освещение, время суток — 2-6 тегов или пустая строка"
}
Правила:
- Только настоящие booru-теги. Пробелы в тегах → underscore (full_body, looking_at_viewer).
- НЕ дублируй внешность персонажа (волосы, глаза, уши, хвост, телосложение) — они уже в appearance_tags.
- НЕ включай quality-теги, 1girl, имена моделей.
- «полный рост» / full body → full_body, standing (НЕ upper_body, НЕ portrait).
- «портрет» / крупный план → upper_body, portrait или close-up.
- Одежду бери из запроса и контекста чата (фартук, платье, домашняя одежда → соответствующие теги).
- Если фон не указан — simple_background, soft_lighting.
- Запрещённые теги: pumped_up, looking_at_each_other, couple, 2girls.
""".strip()
def _chat_excerpt(messages: list[dict[str, str]], limit: int = 6) -> str:
lines: list[str] = []
for msg in messages[-limit:]:
role = msg.get("role", "user")
content = (msg.get("content") or "").strip()
if not content or role not in ("user", "assistant"):
continue
label = "Пользователь" if role == "user" else "Персонаж"
if len(content) > 600:
content = content[:597] + "..."
lines.append(f"{label}: {content}")
return "\n".join(lines)
def rule_based_scene_tags(request: str, messages: list[dict[str, str]] | None = None) -> dict[str, str]:
"""Быстрый fallback без LLM."""
blob = " ".join(
[
request or "",
_chat_excerpt(messages or [], limit=4),
]
).lower()
action: list[str] = []
if re.search(r"полный\s+рост|full[\s_-]?body|в\s+полный\s+рост|целиком|head\s+to\s+toe", blob):
action.extend(["full_body", "standing", "looking_at_viewer"])
elif re.search(r"портрет|portrait|крупн|upper[\s_-]?body|бust|бюст", blob):
action.extend(["upper_body", "portrait", "looking_at_viewer"])
elif re.search(r"сидит|sitting|на стуле", blob):
action.extend(["sitting", "looking_at_viewer"])
elif re.search(r"лежит|lying|на кровати", blob):
action.extend(["lying", "on_bed", "looking_at_viewer"])
else:
action.extend(["looking_at_viewer", "smile"])
if re.search(r"смущ|embarrass|blush|стесн", blob):
action.append("blush")
if re.search(r"улыб|smile|happy", blob):
action.append("smile")
outfit: list[str] = []
outfit_map = (
(r"фартук|apron", "apron"),
(r"плать|dress", "dress"),
(r"халат|robe|bathrobe", "robe"),
(r"купальник|swimsuit|bikini", "swimsuit"),
(r"школьн|school uniform|serafuku", "school_uniform"),
(r"обнаж|nude|голая|topless", "nude"),
(r"джинс|jeans", "jeans"),
(r"свитер|sweater", "sweater"),
)
for pattern, tag in outfit_map:
if re.search(pattern, blob):
outfit.append(tag)
env: list[str] = []
if re.search(r"комнат|bedroom|дома|indoors|room", blob):
env.extend(["indoors", "soft_lighting"])
elif re.search(r"улиц|outdoors|street|парк|park", blob):
env.extend(["outdoors", "daylight"])
else:
env.extend(["simple_background", "soft_lighting"])
return {
"action_tags": ", ".join(dict.fromkeys(action)),
"outfit_tags": ", ".join(dict.fromkeys(outfit)),
"environment_tags": ", ".join(dict.fromkeys(env)),
}
def _parse_tags_json(raw: str) -> dict[str, str] | None:
try:
data = json.loads(strip_markdown_json(raw))
except json.JSONDecodeError:
return None
if not isinstance(data, dict):
return None
return {
"action_tags": str(data.get("action_tags") or "").strip(),
"outfit_tags": str(data.get("outfit_tags") or "").strip(),
"environment_tags": str(data.get("environment_tags") or "").strip(),
}
async def extract_scene_tags(
request: str,
messages: list[dict[str, str]] | None = None,
*,
appearance_tags: str = "",
) -> dict[str, Any]:
"""
action/outfit/environment в booru-тегах.
Возвращает dict с полями action_tags, outfit_tags, environment_tags, source.
"""
req = (request or "").strip()
if not req and messages:
for msg in reversed(messages):
if msg.get("role") == "user" and (msg.get("content") or "").strip():
req = str(msg["content"]).strip()
break
if looks_like_booru_tags(req):
parts = [p.strip() for p in req.split(",") if p.strip()]
return {
"action_tags": ", ".join(parts),
"outfit_tags": "",
"environment_tags": "simple_background, soft_lighting",
"source": "booru_literal",
}
fallback = rule_based_scene_tags(req, messages)
settings = get_settings()
extract_model = settings.memory_extract_model.strip() or None
excerpt = _chat_excerpt(messages or [])
user_block = f"Запрос на иллюстрацию:\n{req or '(не указан — выведи нейтральную позу)'}"
if appearance_tags.strip():
user_block += f"\n\nAppearance (НЕ повторять в action/outfit): {appearance_tags.strip()}"
if excerpt:
user_block += f"\n\nКонтекст чата:\n{excerpt}"
try:
llm = LLMClient()
result = await llm.complete(
[
{"role": "system", "content": SCENE_TAGS_PROMPT},
{"role": "user", "content": user_block},
],
temperature=0.2,
model=extract_model,
for_extraction=True,
)
parsed = _parse_tags_json(result.get("content") or "")
if parsed and parsed.get("action_tags"):
parsed["source"] = "llm"
if not parsed.get("environment_tags"):
parsed["environment_tags"] = fallback["environment_tags"]
return parsed
except Exception:
logger.exception("scene tag LLM extraction failed")
fallback["source"] = "rules"
return fallback
def looks_like_booru_tags(text: str) -> bool:
raw = (text or "").strip()
if not raw or len(raw) > 400:
return False
if raw.count(",") >= 2:
return True
return bool(re.search(r"\b\d+(girl|boy)s?\b", raw, re.I))