import json import logging import re from typing import Any from sqlalchemy.orm import Session from app.config import get_settings from app.llm.client import LLMClient from app.memory.service import MemoryService from app.projects.structuring import strip_markdown_json logger = logging.getLogger(__name__) SKIP_USER_PATTERN = re.compile( r"^(ок|ok|да|нет|спасибо|thanks|\.{1,3}|👍|\+1)$", re.IGNORECASE, ) EXTRACTION_PROMPT = """ Ты извлекаешь долгосрочные факты о пользователе из фрагмента диалога. Ответь ТОЛЬКО JSON без markdown. Схема: { "facts": [ {"content": "текст факта", "category": "preference|person|habit|project|fact", "importance": 1} ], "profile": {"name": "", "age": "", "timezone": "", "notes": ""} } Правила: - Сохраняй устойчивое: имя, возраст, предпочтения, привычки, проекты, семья, работа. - НЕ сохраняй: статус помидоро, погоду, разовые команды, ролевую игру, выдумки ассистента. - profile — только поля с новыми значениями (пустые строки не включай). - facts — короткие утверждения от первого лица пользователя («люблю кофе», «меня зовут …»). - Если нечего сохранять — {"facts": [], "profile": {}}. - Не дублируй уже известное (см. текущий профиль и факты ниже). - importance: 5 критично (имя), 4 важно, 3 обычно, 2 мелочь. """.strip() def _should_skip_extraction(user_text: str) -> bool: text = user_text.strip() if len(text) < 4: return True if SKIP_USER_PATTERN.match(text): return True return False async def _call_extractor( user_text: str, assistant_text: str, snapshot: dict[str, Any], ) -> dict[str, Any]: profile = snapshot.get("profile") or {} facts = snapshot.get("facts") or [] known = [ f"Профиль: {json.dumps(profile, ensure_ascii=False)}", "Факты:", *[f"- {f.get('content')}" for f in facts[:30]], ] settings = get_settings() extract_model = settings.memory_extract_model.strip() or None llm = LLMClient() result = await llm.complete( [ {"role": "system", "content": EXTRACTION_PROMPT}, { "role": "user", "content": ( "\n".join(known) + "\n\n---\nДиалог:\nПользователь: " + user_text + "\nАссистент: " + assistant_text[:1500] ), }, ], temperature=0.2, model=extract_model, ) raw = strip_markdown_json(result.get("content") or "") if not raw: return {"facts": [], "profile": {}} parsed = json.loads(raw) if not isinstance(parsed, dict): return {"facts": [], "profile": {}} return parsed async def extract_after_turn( db: Session, session_id: int, user_text: str, assistant_text: str, *, force: bool = False, ) -> dict[str, Any]: if not force and _should_skip_extraction(user_text): return {"ok": True, "skipped": "short_message", "saved": []} if not (assistant_text or "").strip(): return {"ok": True, "skipped": "no_assistant_reply", "saved": []} memory = MemoryService(db) snapshot = memory.snapshot(session_id) try: parsed = await _call_extractor(user_text, assistant_text, snapshot) except (json.JSONDecodeError, Exception) as exc: logger.warning("Memory extraction failed: %s", exc) return {"ok": False, "error": str(exc), "saved": []} saved: list[dict[str, Any]] = [] profile_updates = parsed.get("profile") or {} if isinstance(profile_updates, dict): filtered = { k: str(v).strip() for k, v in profile_updates.items() if v and str(v).strip() } if filtered: memory.update_profile(filtered) saved.append({"type": "profile", "updates": filtered}) facts = parsed.get("facts") or [] if isinstance(facts, list): for item in facts: if not isinstance(item, dict): continue content = (item.get("content") or "").strip() if not content or len(content) < 3: continue try: result = memory.remember_fact( content, category=str(item.get("category") or "fact")[:64], importance=int(item.get("importance") or 3), session_id=session_id, source="auto", ) saved.append({"type": "fact", **result}) except ValueError: continue return {"ok": True, "saved": saved, "count": len(saved)}