131 lines
4.3 KiB
Python
131 lines
4.3 KiB
Python
from typing import Any
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.character.service import CharacterService
|
|
from app.config import get_settings
|
|
from app.db.models import Message
|
|
from app.homelab.comfyui import ComfyUIClient
|
|
from app.integrations.rp_chat import RpChatClient
|
|
|
|
|
|
def _card_image_settings() -> dict[str, Any]:
    """Return the ``data`` section of the current character card.

    A new ``CharacterService`` is instantiated on every call. When the card
    has no ``data`` key an empty dict is returned.
    """
    card = CharacterService().get_card()
    return card.get("data", {})
|
|
|
|
|
|
def _session_messages(db: Session, session_id: int | None, limit: int = 8) -> list[dict[str, str]]:
|
|
if not session_id:
|
|
return []
|
|
rows = db.scalars(
|
|
select(Message)
|
|
.where(
|
|
Message.session_id == session_id,
|
|
Message.role.in_(("user", "assistant")),
|
|
)
|
|
.order_by(Message.created_at.desc())
|
|
.limit(limit)
|
|
).all()
|
|
rows = list(reversed(rows))
|
|
return [{"role": m.role, "content": (m.content or "").strip()} for m in rows if m.content.strip()]
|
|
|
|
|
|
def _append_lora(positive: str, lora_name: str, lora_weight: float) -> str:
|
|
if not lora_name or f"<lora:{lora_name}" in positive:
|
|
return positive
|
|
return f"{positive} <lora:{lora_name}:{lora_weight}>"
|
|
|
|
|
|
async def generate_image(
    db: Session,
    *,
    session_id: int | None = None,
    draw_self: bool = False,
    scene_description: str = "",
) -> dict[str, Any]:
    """Generate an image through RP-chat or, when it is disabled, the local ComfyUI.

    Args:
        db: SQLAlchemy session used to read recent chat history.
        session_id: Chat session whose recent messages give scene context.
        draw_self: Request an image of the character itself; requires
            ``appearance_tags`` on the character card.
        scene_description: Free-text scene to draw. Surrounding whitespace is
            ignored, so a blank string counts as "not provided".

    Returns:
        The backend's result dict, or ``{"ok": False, "error": ...}`` when
        generation is disabled on the card or the request is incomplete.
    """
    card = _card_image_settings()
    settings = get_settings()
    # Normalise once so every check and both backends see the same value.
    scene = (scene_description or "").strip()

    if not card.get("sd_enabled", True):
        return {"ok": False, "error": "Генерация изображений отключена в настройках персонажа"}

    if not draw_self and not scene:
        return {"ok": False, "error": "Нужен draw_self=true или scene_description"}

    appearance = (card.get("appearance_tags") or "").strip()
    if draw_self and not appearance:
        return {
            "ok": False,
            "error": "Заполни appearance_tags в настройках персонажа для «нарисуй себя»",
        }

    if not settings.rp_chat_enabled:
        # The local backend takes a bare prompt and never uses chat history,
        # so the history query is skipped on this path.
        return await _generate_via_local_comfy(scene or "anime character portrait")

    messages = _session_messages(db, session_id)
    if scene:
        messages = messages + [{"role": "user", "content": scene}]
    elif messages:
        # No scene text, so draw_self is necessarily set (validated above).
        messages = messages + [{"role": "user", "content": "Illustrate the current scene with the character."}]
    else:
        messages = [{"role": "user", "content": "Portrait of the character, looking at viewer, friendly expression."}]

    return await _generate_via_rp_chat(card, messages, appearance or None)
|
|
|
|
|
|
async def _generate_via_rp_chat(
    card: dict[str, Any],
    messages: list[dict[str, str]],
    appearance_override: str | None,
) -> dict[str, Any]:
    """Build a prompt with RP-chat, render it, and store the image locally.

    Args:
        card: Character card settings; reads ``rp_persona_id``,
            ``appearance_tags``, ``lora_name`` and ``lora_weight``.
        messages: Chat messages passed to the prompt builder.
        appearance_override: Appearance tags to use; when empty, the card's
            ``appearance_tags`` are used instead.

    Returns:
        On success a dict with ``ok``, ``url``, ``filename``, ``prompt``,
        ``backend`` and ``persona_id``. If any client step reports a non-ok
        result, that result dict is returned unchanged; an empty prompt yields
        an error dict carrying the raw prompt response.
    """
    client = RpChatClient()
    persona_id = (card.get("rp_persona_id") or "").strip() or "default"
    override = appearance_override or (card.get("appearance_tags") or "").strip() or None

    prompt_result = await client.sd_prompt(
        persona_id,
        messages,
        appearance_override=override,
    )
    if not prompt_result.get("ok"):
        return prompt_result

    # Prefer the hybrid prompt; fall back to the tag-only one.
    positive = (
        prompt_result.get("hybrid_positive")
        or prompt_result.get("tag_positive")
        or ""
    ).strip()
    negative = (prompt_result.get("negative") or "").strip()
    if not positive:
        return {"ok": False, "error": "RP-чат не вернул промпт", "raw": prompt_result}

    lora = (card.get("lora_name") or "").strip()
    if lora:
        try:
            weight = float(card.get("lora_weight") or 0.8)
        except (TypeError, ValueError):
            # A malformed weight on the card should not abort generation;
            # use the same default as for an unset weight.
            weight = 0.8
        positive = _append_lora(positive, lora, weight)

    gen_result = await client.generate(positive, negative)
    if not gen_result.get("ok"):
        return gen_result

    saved = await client.save_image_locally(gen_result["image_path"])
    if not saved.get("ok"):
        return saved

    return {
        "ok": True,
        "url": saved["url"],
        "filename": saved["filename"],
        "prompt": positive,
        "backend": "rp_chat",
        "persona_id": persona_id,
    }
|
|
|
|
|
|
async def _generate_via_local_comfy(prompt: str) -> dict[str, Any]:
    """Generate an image with a ``ComfyUIClient`` and tag successful results.

    The client's result dict is returned as-is; when it reports ``ok`` it is
    additionally marked with ``backend="comfyui_local"``.
    """
    outcome = await ComfyUIClient().generate_image(prompt)
    if not outcome.get("ok"):
        return outcome
    outcome["backend"] = "comfyui_local"
    return outcome
|