added RAG, Multiuser, TG bot
This commit is contained in:
@@ -118,6 +118,17 @@ def get_history(
|
||||
return FitnessService(db, user.id).get_history(days=days, end_day=end_day)
|
||||
|
||||
|
||||
@router.get("/fitness/charts")
|
||||
def get_charts(
|
||||
weeks: int = 52,
|
||||
trend: bool = True,
|
||||
end: str | None = None,
|
||||
db: Session = Depends(get_db), user: User = Depends(get_current_user),
|
||||
) -> dict[str, Any]:
|
||||
end_day = date.fromisoformat(end) if end else None
|
||||
return FitnessService(db, user.id).get_charts(weeks=weeks, trend=trend, end_day=end_day)
|
||||
|
||||
|
||||
@router.get("/fitness/profile")
|
||||
def get_profile(db: Session = Depends(get_db), user: User = Depends(get_current_user)) -> dict[str, Any]:
|
||||
profile = FitnessService(db, user.id).get_profile()
|
||||
|
||||
@@ -23,7 +23,7 @@ TOOLS_INSTRUCTIONS = """
|
||||
- В текстовых ответах пользователю не используй эмодзи.
|
||||
- Погода: get_weather или блок [Погода] в контексте; «что на улице» / «будет ли дождь» — не выдумывай.
|
||||
- Утренний брифинг (погода + новости) → get_morning_briefing.
|
||||
- Картинки: generate_image — «нарисуй себя» → draw_self=true; иначе scene_description на английском (booru-теги). Внешность из карточки персонажа. Не злоупотребляй.
|
||||
- Картинки: generate_image — «нарисуй себя» → draw_self=true (портрет по appearance_tags, без LLM); иначе scene_description на английском (booru-теги). Внешность из карточки персонажа. Не злоупотребляй.
|
||||
- Покупки: list_shopping_lists, create_shopping_list, add_shopping_items, check_shopping_item, remove_shopping_item, delete_shopping_list.
|
||||
- «Добавь в список покупок» → add_shopping_items (list_name + товары). «Что купить» → list_shopping_lists. Не выдумывай списки.
|
||||
- Напоминания: list_reminders, create_reminder, update_reminder, delete_reminder, complete_reminder.
|
||||
|
||||
@@ -16,6 +16,18 @@ def _format_time(seconds: int) -> str:
|
||||
return f"{minutes:02d}:{secs:02d}"
|
||||
|
||||
|
||||
def _format_image_generation_notice(data: dict[str, Any]) -> str:
|
||||
url = data.get("url", "")
|
||||
positive = (data.get("prompt") or "").strip()
|
||||
negative = (data.get("negative_prompt") or "").strip()
|
||||
lines = ["🎨 **Картинка готова**", "", f""]
|
||||
if positive:
|
||||
lines.extend(["", "**Comfy (+):**", f"```\n{positive}\n```"])
|
||||
if negative:
|
||||
lines.extend(["", "**Comfy (−):**", f"```\n{negative}\n```"])
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def format_phase_completed_notice(
|
||||
session: PomodoroSession,
|
||||
next_phase: str | None,
|
||||
@@ -253,8 +265,7 @@ def format_tool_notice(tool_name: str, raw_result: str) -> str | None:
|
||||
return f"💪 **Напоминание {r.get('kind')}** · {state}"
|
||||
|
||||
if tool_name == "generate_image" and data.get("ok"):
|
||||
url = data.get("url", "")
|
||||
return f"🎨 **Картинка готова**\n\n"
|
||||
return _format_image_generation_notice(data)
|
||||
|
||||
if tool_name == "create_shopping_list" and data.get("ok"):
|
||||
lst = data.get("list") or {}
|
||||
|
||||
@@ -0,0 +1,355 @@
|
||||
"""Weekly fitness chart data and least-squares trend lines."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import defaultdict
|
||||
from datetime import date, datetime, timedelta, timezone
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.db.models import BodyMetric, FoodLog, StepLog, WaterLog
|
||||
|
||||
METRIC_DEFS: dict[str, dict[str, str]] = {
|
||||
"weight_kg": {"label": "Вес", "unit": "кг"},
|
||||
"body_fat_pct": {"label": "Жир", "unit": "%"},
|
||||
"calories": {"label": "Калории", "unit": "ккал/день"},
|
||||
"protein_g": {"label": "Белок", "unit": "г/день"},
|
||||
"water_l": {"label": "Вода", "unit": "л/день"},
|
||||
"steps": {"label": "Шаги", "unit": "шаг/день"},
|
||||
}
|
||||
|
||||
|
||||
def week_start(day: date) -> date:
|
||||
return day - timedelta(days=day.weekday())
|
||||
|
||||
|
||||
def linear_regression(points: list[tuple[float, float]]) -> dict[str, float] | None:
|
||||
"""Ordinary least squares y = slope * x + intercept."""
|
||||
n = len(points)
|
||||
if n < 2:
|
||||
return None
|
||||
sum_x = sum(x for x, _ in points)
|
||||
sum_y = sum(y for _, y in points)
|
||||
sum_xx = sum(x * x for x, _ in points)
|
||||
sum_xy = sum(x * y for x, y in points)
|
||||
denom = n * sum_xx - sum_x * sum_x
|
||||
if abs(denom) < 1e-12:
|
||||
return None
|
||||
slope = (n * sum_xy - sum_x * sum_y) / denom
|
||||
intercept = (sum_y - slope * sum_x) / n
|
||||
return {"slope": slope, "intercept": intercept}
|
||||
|
||||
|
||||
def _avg(values: list[float]) -> float | None:
|
||||
if not values:
|
||||
return None
|
||||
return sum(values) / len(values)
|
||||
|
||||
|
||||
def _last(values: list[tuple[date, float]]) -> float | None:
|
||||
if not values:
|
||||
return None
|
||||
values.sort(key=lambda item: item[0])
|
||||
return values[-1][1]
|
||||
|
||||
|
||||
def build_fitness_charts(
|
||||
db: Session,
|
||||
user_id: int,
|
||||
*,
|
||||
weeks: int = 52,
|
||||
trend: bool = True,
|
||||
end_day: date | None = None,
|
||||
) -> dict[str, Any]:
|
||||
weeks = max(4, min(int(weeks), 52))
|
||||
end = end_day or datetime.now(timezone.utc).date()
|
||||
last_week_start = week_start(end)
|
||||
first_week_start = last_week_start - timedelta(weeks=weeks - 1)
|
||||
|
||||
range_start = datetime.combine(first_week_start, datetime.min.time(), tzinfo=timezone.utc)
|
||||
range_end = datetime.combine(end, datetime.max.time(), tzinfo=timezone.utc)
|
||||
|
||||
daily: dict[date, dict[str, float]] = defaultdict(lambda: {
|
||||
"calories": 0.0,
|
||||
"protein_g": 0.0,
|
||||
"fat_g": 0.0,
|
||||
"carbs_g": 0.0,
|
||||
"water_ml": 0.0,
|
||||
"steps": 0.0,
|
||||
})
|
||||
daily_flags: dict[date, set[str]] = defaultdict(set)
|
||||
|
||||
foods = db.scalars(
|
||||
select(FoodLog).where(
|
||||
FoodLog.user_id == user_id,
|
||||
FoodLog.logged_at >= range_start,
|
||||
FoodLog.logged_at <= range_end,
|
||||
)
|
||||
).all()
|
||||
for row in foods:
|
||||
d = row.logged_at.date()
|
||||
daily[d]["calories"] += row.calories
|
||||
daily[d]["protein_g"] += row.protein_g
|
||||
daily[d]["fat_g"] += row.fat_g
|
||||
daily[d]["carbs_g"] += row.carbs_g
|
||||
daily_flags[d].add("nutrition")
|
||||
|
||||
waters = db.scalars(
|
||||
select(WaterLog).where(
|
||||
WaterLog.user_id == user_id,
|
||||
WaterLog.logged_at >= range_start,
|
||||
WaterLog.logged_at <= range_end,
|
||||
)
|
||||
).all()
|
||||
for row in waters:
|
||||
d = row.logged_at.date()
|
||||
daily[d]["water_ml"] += float(row.amount_ml)
|
||||
daily_flags[d].add("water")
|
||||
|
||||
steps_rows = db.scalars(
|
||||
select(StepLog).where(
|
||||
StepLog.user_id == user_id,
|
||||
StepLog.logged_at >= range_start,
|
||||
StepLog.logged_at <= range_end,
|
||||
)
|
||||
).all()
|
||||
for row in steps_rows:
|
||||
d = row.logged_at.date()
|
||||
daily[d]["steps"] += float(row.steps)
|
||||
daily_flags[d].add("steps")
|
||||
|
||||
body_rows = db.scalars(
|
||||
select(BodyMetric).where(
|
||||
BodyMetric.user_id == user_id,
|
||||
BodyMetric.recorded_at >= range_start,
|
||||
BodyMetric.recorded_at <= range_end,
|
||||
)
|
||||
).all()
|
||||
body_by_day: dict[date, list[tuple[date, float, float | None]]] = defaultdict(list)
|
||||
for row in body_rows:
|
||||
d = row.recorded_at.date()
|
||||
body_by_day[d].append((d, row.weight_kg, row.body_fat_pct))
|
||||
daily_flags[d].add("body")
|
||||
|
||||
week_slots: list[dict[str, Any]] = []
|
||||
cursor = first_week_start
|
||||
while cursor <= last_week_start:
|
||||
week_slots.append(
|
||||
{
|
||||
"week_start": cursor.isoformat(),
|
||||
"week_end": (cursor + timedelta(days=6)).isoformat(),
|
||||
}
|
||||
)
|
||||
cursor += timedelta(weeks=1)
|
||||
|
||||
days_with_data = len(daily_flags)
|
||||
weeks_with_data = 0
|
||||
|
||||
def rollup_week(metric: str) -> list[dict[str, Any]]:
|
||||
nonlocal weeks_with_data
|
||||
points: list[dict[str, Any]] = []
|
||||
local_weeks_with_data = 0
|
||||
|
||||
for idx, slot in enumerate(week_slots):
|
||||
ws = date.fromisoformat(slot["week_start"])
|
||||
we = date.fromisoformat(slot["week_end"])
|
||||
day_cursor = ws
|
||||
week_daily_values: list[float] = []
|
||||
week_body_weight: list[tuple[date, float]] = []
|
||||
week_body_fat: list[tuple[date, float]] = []
|
||||
|
||||
while day_cursor <= we:
|
||||
if day_cursor > end:
|
||||
break
|
||||
flags = daily_flags.get(day_cursor, set())
|
||||
totals = daily.get(day_cursor)
|
||||
if metric == "weight_kg":
|
||||
for _, w, _ in body_by_day.get(day_cursor, []):
|
||||
week_body_weight.append((day_cursor, w))
|
||||
elif metric == "body_fat_pct":
|
||||
for _, _, bf in body_by_day.get(day_cursor, []):
|
||||
if bf is not None:
|
||||
week_body_fat.append((day_cursor, bf))
|
||||
elif metric == "calories" and totals and "nutrition" in flags:
|
||||
week_daily_values.append(totals["calories"])
|
||||
elif metric == "protein_g" and totals and "nutrition" in flags:
|
||||
week_daily_values.append(totals["protein_g"])
|
||||
elif metric == "water_l" and totals and "water" in flags:
|
||||
week_daily_values.append(totals["water_ml"] / 1000.0)
|
||||
elif metric == "steps" and totals and "steps" in flags:
|
||||
week_daily_values.append(totals["steps"])
|
||||
day_cursor += timedelta(days=1)
|
||||
|
||||
value: float | None
|
||||
days_in_week = 0
|
||||
if metric == "weight_kg":
|
||||
value = _last(week_body_weight)
|
||||
days_in_week = len(week_body_weight)
|
||||
elif metric == "body_fat_pct":
|
||||
value = _last(week_body_fat)
|
||||
days_in_week = len(week_body_fat)
|
||||
else:
|
||||
value = _avg(week_daily_values)
|
||||
days_in_week = len(week_daily_values)
|
||||
|
||||
has_data = value is not None
|
||||
if has_data:
|
||||
local_weeks_with_data += 1
|
||||
|
||||
points.append(
|
||||
{
|
||||
"index": idx,
|
||||
"week_start": slot["week_start"],
|
||||
"week_end": slot["week_end"],
|
||||
"value": round(value, 2) if value is not None else None,
|
||||
"days_with_data": days_in_week,
|
||||
"has_data": has_data,
|
||||
}
|
||||
)
|
||||
|
||||
weeks_with_data = max(weeks_with_data, local_weeks_with_data)
|
||||
return points
|
||||
|
||||
series: dict[str, Any] = {}
|
||||
for key, meta in METRIC_DEFS.items():
|
||||
points = rollup_week(key)
|
||||
reg_points = [(float(p["index"]), float(p["value"])) for p in points if p["has_data"] and p["value"] is not None]
|
||||
trend_payload: dict[str, Any] | None = None
|
||||
if trend and len(reg_points) >= 2:
|
||||
fit = linear_regression(reg_points)
|
||||
if fit:
|
||||
line = [
|
||||
{
|
||||
"index": p["index"],
|
||||
"week_start": p["week_start"],
|
||||
"value": round(fit["slope"] * p["index"] + fit["intercept"], 2),
|
||||
}
|
||||
for p in points
|
||||
]
|
||||
trend_payload = {
|
||||
"slope_per_week": round(fit["slope"], 4),
|
||||
"intercept": round(fit["intercept"], 2),
|
||||
"points_with_data": len(reg_points),
|
||||
"line": line,
|
||||
}
|
||||
series[key] = {
|
||||
"key": key,
|
||||
"label": meta["label"],
|
||||
"unit": meta["unit"],
|
||||
"points": points,
|
||||
"trend": trend_payload,
|
||||
"data_points": sum(1 for p in points if p["has_data"]),
|
||||
}
|
||||
|
||||
use_daily = days_with_data > 0 and days_with_data <= 14 and weeks_with_data <= 2
|
||||
daily_series: dict[str, Any] | None = None
|
||||
if use_daily:
|
||||
daily_series = _build_daily_series(
|
||||
daily,
|
||||
daily_flags,
|
||||
body_by_day,
|
||||
end,
|
||||
trend=trend,
|
||||
lookback_days=min(30, max(days_with_data, 7)),
|
||||
)
|
||||
|
||||
return {
|
||||
"end_date": end.isoformat(),
|
||||
"weeks": weeks,
|
||||
"granularity": "day" if use_daily else "week",
|
||||
"first_week_start": first_week_start.isoformat(),
|
||||
"last_week_start": last_week_start.isoformat(),
|
||||
"days_with_data": days_with_data,
|
||||
"weeks_with_data": weeks_with_data,
|
||||
"series": series,
|
||||
"daily_series": daily_series,
|
||||
}
|
||||
|
||||
|
||||
def _build_daily_series(
|
||||
daily: dict[date, dict[str, float]],
|
||||
daily_flags: dict[date, set[str]],
|
||||
body_by_day: dict[date, list[tuple[date, float, float | None]]],
|
||||
end: date,
|
||||
*,
|
||||
trend: bool,
|
||||
lookback_days: int,
|
||||
) -> dict[str, Any]:
|
||||
start = end - timedelta(days=lookback_days - 1)
|
||||
day_points: list[date] = []
|
||||
cursor = start
|
||||
while cursor <= end:
|
||||
day_points.append(cursor)
|
||||
cursor += timedelta(days=1)
|
||||
|
||||
result: dict[str, Any] = {}
|
||||
for key, meta in METRIC_DEFS.items():
|
||||
points: list[dict[str, Any]] = []
|
||||
for idx, d in enumerate(day_points):
|
||||
value: float | None = None
|
||||
has_data = False
|
||||
if key == "weight_kg":
|
||||
body = body_by_day.get(d, [])
|
||||
pairs = [(x, w) for x, w, _ in body]
|
||||
value = _last(pairs) if pairs else None
|
||||
has_data = value is not None
|
||||
elif key == "body_fat_pct":
|
||||
fat_vals = [(x, bf) for x, _, bf in body_by_day.get(d, []) if bf is not None]
|
||||
value = _last(fat_vals) if fat_vals else None
|
||||
has_data = value is not None
|
||||
else:
|
||||
flags = daily_flags.get(d, set())
|
||||
totals = daily.get(d)
|
||||
if key == "calories" and totals and "nutrition" in flags:
|
||||
value = totals["calories"]
|
||||
has_data = True
|
||||
elif key == "protein_g" and totals and "nutrition" in flags:
|
||||
value = totals["protein_g"]
|
||||
has_data = True
|
||||
elif key == "water_l" and totals and "water" in flags:
|
||||
value = totals["water_ml"] / 1000.0
|
||||
has_data = True
|
||||
elif key == "steps" and totals and "steps" in flags:
|
||||
value = totals["steps"]
|
||||
has_data = True
|
||||
|
||||
points.append(
|
||||
{
|
||||
"index": idx,
|
||||
"date": d.isoformat(),
|
||||
"value": round(value, 2) if value is not None else None,
|
||||
"has_data": has_data,
|
||||
}
|
||||
)
|
||||
|
||||
reg_points = [(float(p["index"]), float(p["value"])) for p in points if p["has_data"] and p["value"] is not None]
|
||||
trend_payload: dict[str, Any] | None = None
|
||||
if trend and len(reg_points) >= 2:
|
||||
fit = linear_regression(reg_points)
|
||||
if fit:
|
||||
trend_payload = {
|
||||
"slope_per_day": round(fit["slope"], 4),
|
||||
"intercept": round(fit["intercept"], 2),
|
||||
"points_with_data": len(reg_points),
|
||||
"line": [
|
||||
{
|
||||
"index": p["index"],
|
||||
"date": p["date"],
|
||||
"value": round(fit["slope"] * p["index"] + fit["intercept"], 2),
|
||||
}
|
||||
for p in points
|
||||
],
|
||||
}
|
||||
|
||||
result[key] = {
|
||||
"key": key,
|
||||
"label": meta["label"],
|
||||
"unit": meta["unit"],
|
||||
"points": points,
|
||||
"trend": trend_payload,
|
||||
"data_points": sum(1 for p in points if p["has_data"]),
|
||||
}
|
||||
|
||||
return result
|
||||
@@ -625,6 +625,23 @@ class FitnessService:
|
||||
"reminders": self.list_reminders(),
|
||||
}
|
||||
|
||||
def get_charts(
|
||||
self,
|
||||
*,
|
||||
weeks: int = 52,
|
||||
trend: bool = True,
|
||||
end_day: date | None = None,
|
||||
) -> dict[str, Any]:
|
||||
from app.fitness.charts import build_fitness_charts
|
||||
|
||||
return build_fitness_charts(
|
||||
self.db,
|
||||
self.user_id,
|
||||
weeks=weeks,
|
||||
trend=trend,
|
||||
end_day=end_day,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _food_to_dict(row: FoodLog) -> dict[str, Any]:
|
||||
return {
|
||||
|
||||
@@ -0,0 +1,100 @@
|
||||
"""Сборка Anima-промптов без LLM (теги, без POV/hybrid)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
|
||||
ANIMA_QUALITY = "masterpiece, best quality, score_7, anime"
|
||||
ANIMA_NEGATIVE = "worst quality, low quality, score_1, score_2, score_3, blurry, jpeg artifacts, sepia"
|
||||
|
||||
_JUNK_STANDALONE_TAGS = frozenset({
|
||||
"white", "black", "skin", "ear", "ears", "girl", "boy", "fox", "wolf", "cat",
|
||||
"short", "tall", "golden", "silver", "red", "blue", "green", "purple",
|
||||
"pink", "brown", "eye", "eyes", "hair",
|
||||
})
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class AnimaPromptBundle:
|
||||
positive: str
|
||||
negative: str
|
||||
|
||||
|
||||
def _sanitize_tags(tag_str: str) -> str:
|
||||
if not tag_str:
|
||||
return ""
|
||||
out: list[str] = []
|
||||
seen: set[str] = set()
|
||||
for raw in tag_str.split(","):
|
||||
t = raw.strip()
|
||||
if not t:
|
||||
continue
|
||||
key = t.lower().replace(" ", "_")
|
||||
if key in seen or len(key) <= 2:
|
||||
continue
|
||||
if "_" not in key and key in _JUNK_STANDALONE_TAGS:
|
||||
continue
|
||||
seen.add(key)
|
||||
out.append(t if "_" in t else key)
|
||||
return ", ".join(out)
|
||||
|
||||
|
||||
def _append_lora(parts: list[str], lora_name: str, lora_weight: float) -> None:
|
||||
lora = (lora_name or "").strip()
|
||||
if not lora:
|
||||
return
|
||||
weight = lora_weight if lora_weight > 0 else 0.8
|
||||
parts.append(f"<lora:{lora}:{weight}>")
|
||||
|
||||
|
||||
def build_draw_self_prompt(
|
||||
appearance_tags: str,
|
||||
*,
|
||||
lora_name: str = "",
|
||||
lora_weight: float = 0.8,
|
||||
) -> AnimaPromptBundle:
|
||||
"""Портрет «нарисуй себя» — только booru-теги, без POV и prose."""
|
||||
appearance = _sanitize_tags(appearance_tags)
|
||||
action = "looking_at_viewer, smile, upper_body, portrait"
|
||||
environment = "simple_background, soft_lighting"
|
||||
|
||||
parts = [ANIMA_QUALITY]
|
||||
if appearance:
|
||||
parts.append(appearance)
|
||||
parts.append(action)
|
||||
parts.append(environment)
|
||||
_append_lora(parts, lora_name, lora_weight)
|
||||
|
||||
positive = ", ".join(p.strip() for p in parts if p.strip())
|
||||
return AnimaPromptBundle(positive=positive, negative=ANIMA_NEGATIVE)
|
||||
|
||||
|
||||
def build_scene_tags_prompt(
|
||||
scene_tags: str,
|
||||
appearance_tags: str,
|
||||
*,
|
||||
lora_name: str = "",
|
||||
lora_weight: float = 0.8,
|
||||
) -> AnimaPromptBundle:
|
||||
"""Прямая сцена из booru-тегов (без LLM)."""
|
||||
appearance = _sanitize_tags(appearance_tags)
|
||||
scene = _sanitize_tags(scene_tags)
|
||||
parts = [ANIMA_QUALITY]
|
||||
if appearance:
|
||||
parts.append(appearance)
|
||||
if scene:
|
||||
parts.append(scene)
|
||||
_append_lora(parts, lora_name, lora_weight)
|
||||
positive = ", ".join(p.strip() for p in parts if p.strip())
|
||||
return AnimaPromptBundle(positive=positive, negative=ANIMA_NEGATIVE)
|
||||
|
||||
|
||||
def looks_like_booru_tags(text: str) -> bool:
|
||||
"""Грубая эвристика: строка похожа на теги, а не на прозу."""
|
||||
raw = (text or "").strip()
|
||||
if not raw or len(raw) > 400:
|
||||
return False
|
||||
if raw.count(",") >= 2:
|
||||
return True
|
||||
return bool(re.search(r"\b\d+(girl|boy)s?\b", raw, re.I))
|
||||
@@ -267,6 +267,7 @@ class ComfyUIClient:
|
||||
"filename": filename,
|
||||
"url": f"/api/v1/media/generated/{filename}",
|
||||
"prompt": positive,
|
||||
"negative_prompt": negative,
|
||||
"backend": "anima" if _use_anima(self.settings) else "checkpoint",
|
||||
}
|
||||
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.character.service import CharacterService
|
||||
from app.config import get_settings
|
||||
from app.db.models import Message
|
||||
from app.homelab.anima_prompt import AnimaPromptBundle, build_draw_self_prompt, build_scene_tags_prompt, looks_like_booru_tags
|
||||
from app.homelab.comfyui import ComfyUIClient
|
||||
from app.integrations.rp_chat import RpChatClient
|
||||
|
||||
@@ -17,6 +16,10 @@ def _card_image_settings(db: Session, user_id: int) -> dict[str, Any]:
|
||||
def _session_messages(db: Session, session_id: int | None, limit: int = 8) -> list[dict[str, str]]:
|
||||
if not session_id:
|
||||
return []
|
||||
from sqlalchemy import select
|
||||
|
||||
from app.db.models import Message
|
||||
|
||||
rows = db.scalars(
|
||||
select(Message)
|
||||
.where(
|
||||
@@ -36,6 +39,43 @@ def _append_lora(positive: str, lora_name: str, lora_weight: float) -> str:
|
||||
return f"{positive} <lora:{lora_name}:{lora_weight}>"
|
||||
|
||||
|
||||
async def _generate_from_bundle(
|
||||
bundle: AnimaPromptBundle,
|
||||
*,
|
||||
backend: str,
|
||||
persona_id: str = "",
|
||||
) -> dict[str, Any]:
|
||||
settings = get_settings()
|
||||
if backend == "rp_chat":
|
||||
client = RpChatClient()
|
||||
gen_result = await client.generate(bundle.positive, bundle.negative)
|
||||
if not gen_result.get("ok"):
|
||||
return gen_result
|
||||
saved = await client.save_image_locally(gen_result["image_path"])
|
||||
if not saved.get("ok"):
|
||||
return saved
|
||||
return {
|
||||
"ok": True,
|
||||
"url": saved["url"],
|
||||
"filename": saved["filename"],
|
||||
"prompt": bundle.positive,
|
||||
"negative_prompt": bundle.negative,
|
||||
"backend": "rp_chat",
|
||||
"persona_id": persona_id,
|
||||
"prompt_mode": "direct",
|
||||
}
|
||||
|
||||
result = await ComfyUIClient().generate_image(
|
||||
bundle.positive,
|
||||
negative_prompt=bundle.negative,
|
||||
)
|
||||
if result.get("ok"):
|
||||
result["backend"] = "comfyui_local"
|
||||
result["prompt_mode"] = "direct"
|
||||
result["negative_prompt"] = bundle.negative
|
||||
return result
|
||||
|
||||
|
||||
async def generate_image(
|
||||
db: Session,
|
||||
*,
|
||||
@@ -54,25 +94,49 @@ async def generate_image(
|
||||
return {"ok": False, "error": "Нужен draw_self=true или scene_description"}
|
||||
|
||||
appearance = (card.get("appearance_tags") or "").strip()
|
||||
if draw_self and not appearance:
|
||||
return {
|
||||
"ok": False,
|
||||
"error": "Заполни appearance_tags в настройках персонажа для «нарисуй себя»",
|
||||
}
|
||||
lora_name = (card.get("lora_name") or "").strip()
|
||||
lora_weight = float(card.get("lora_weight") or 0.8)
|
||||
persona_id = (card.get("rp_persona_id") or "").strip() or "default"
|
||||
backend = "rp_chat" if settings.rp_chat_enabled else "comfyui_local"
|
||||
|
||||
if draw_self:
|
||||
if not appearance:
|
||||
return {
|
||||
"ok": False,
|
||||
"error": "Заполни appearance_tags в настройках персонажа для «нарисуй себя»",
|
||||
}
|
||||
bundle = build_draw_self_prompt(
|
||||
appearance,
|
||||
lora_name=lora_name,
|
||||
lora_weight=lora_weight,
|
||||
)
|
||||
return await _generate_from_bundle(bundle, backend=backend, persona_id=persona_id)
|
||||
|
||||
scene = scene_description.strip()
|
||||
if looks_like_booru_tags(scene):
|
||||
if not appearance:
|
||||
bundle = build_scene_tags_prompt(scene, "", lora_name=lora_name, lora_weight=lora_weight)
|
||||
else:
|
||||
bundle = build_scene_tags_prompt(
|
||||
scene,
|
||||
appearance,
|
||||
lora_name=lora_name,
|
||||
lora_weight=lora_weight,
|
||||
)
|
||||
return await _generate_from_bundle(bundle, backend=backend, persona_id=persona_id)
|
||||
|
||||
messages = _session_messages(db, session_id)
|
||||
if scene_description.strip():
|
||||
messages = messages + [{"role": "user", "content": scene_description.strip()}]
|
||||
elif draw_self and messages:
|
||||
messages = messages + [{"role": "user", "content": "Illustrate the current scene with the character."}]
|
||||
elif draw_self:
|
||||
messages = [{"role": "user", "content": "Portrait of the character, looking at viewer, friendly expression."}]
|
||||
messages = messages + [{"role": "user", "content": scene}]
|
||||
|
||||
if settings.rp_chat_enabled:
|
||||
appearance_override = (card.get("appearance_tags") or "").strip() or None
|
||||
return await _generate_via_rp_chat(card, messages, appearance_override)
|
||||
return await _generate_via_rp_chat(
|
||||
card,
|
||||
messages,
|
||||
appearance_override=appearance or None,
|
||||
)
|
||||
|
||||
return await _generate_via_local_comfy(scene_description or "anime character portrait")
|
||||
fallback = f"{appearance}, {scene}" if appearance else scene
|
||||
return await ComfyUIClient().generate_image(fallback)
|
||||
|
||||
|
||||
async def _generate_via_rp_chat(
|
||||
@@ -119,13 +183,8 @@ async def _generate_via_rp_chat(
|
||||
"url": saved["url"],
|
||||
"filename": saved["filename"],
|
||||
"prompt": positive,
|
||||
"negative_prompt": negative,
|
||||
"backend": "rp_chat",
|
||||
"persona_id": persona_id,
|
||||
"prompt_mode": "llm",
|
||||
}
|
||||
|
||||
|
||||
async def _generate_via_local_comfy(prompt: str) -> dict[str, Any]:
|
||||
result = await ComfyUIClient().generate_image(prompt)
|
||||
if result.get("ok"):
|
||||
result["backend"] = "comfyui_local"
|
||||
return result
|
||||
|
||||
@@ -30,6 +30,6 @@
|
||||
- add_shopping_items, list_shopping_lists, check_shopping_item
|
||||
|
||||
Картинки:
|
||||
- «Нарисуй себя» → generate_image с draw_self=true
|
||||
- Другая сцена → generate_image с scene_description на английском (booru-теги)
|
||||
- «Нарисуй себя» → generate_image с draw_self=true (портрет по appearance_tags, LLM sd-prompt не нужен)
|
||||
- Другая сцена → generate_image с scene_description на английском (booru-теги; если теги — тоже без LLM)
|
||||
- Внешность персонажа задаётся в настройках карточки, не выдумывай теги
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
from app.homelab.anima_prompt import (
|
||||
build_draw_self_prompt,
|
||||
build_scene_tags_prompt,
|
||||
looks_like_booru_tags,
|
||||
)
|
||||
|
||||
|
||||
def test_build_draw_self_prompt_includes_appearance():
|
||||
bundle = build_draw_self_prompt("silver_hair, wolf_ears, blue_eyes")
|
||||
assert "silver_hair" in bundle.positive
|
||||
assert "wolf_ears" in bundle.positive
|
||||
assert "looking_at_viewer" in bundle.positive
|
||||
assert "POV:" not in bundle.positive
|
||||
assert ". " not in bundle.positive
|
||||
assert "worst quality" in bundle.negative
|
||||
|
||||
|
||||
def test_build_draw_self_prompt_lora():
|
||||
bundle = build_draw_self_prompt("1girl", lora_name="rin_lora", lora_weight=0.75)
|
||||
assert "<lora:rin_lora:0.75>" in bundle.positive
|
||||
|
||||
|
||||
def test_looks_like_booru_tags():
|
||||
assert looks_like_booru_tags("1girl, smile, indoors")
|
||||
assert not looks_like_booru_tags("draw a picture of a cat on the moon")
|
||||
@@ -0,0 +1,20 @@
|
||||
from datetime import date
|
||||
|
||||
from app.fitness.charts import linear_regression, week_start
|
||||
|
||||
|
||||
def test_week_start_monday():
|
||||
assert week_start(date(2026, 6, 13)) == date(2026, 6, 8)
|
||||
|
||||
|
||||
def test_linear_regression_increasing():
|
||||
points = [(0.0, 1.0), (1.0, 2.0), (2.0, 3.0)]
|
||||
fit = linear_regression(points)
|
||||
assert fit is not None
|
||||
assert abs(fit["slope"] - 1.0) < 1e-9
|
||||
assert abs(fit["intercept"] - 1.0) < 1e-9
|
||||
|
||||
|
||||
def test_linear_regression_requires_two_points():
|
||||
assert linear_regression([(0.0, 5.0)]) is None
|
||||
assert linear_regression([]) is None
|
||||
Reference in New Issue
Block a user