Files
Home_assistant/backend/app/memory/extract.py
T
2026-06-10 13:06:44 +03:00

152 lines
5.1 KiB
Python

import json
import logging
import re
from typing import Any
from sqlalchemy.orm import Session
from app.config import get_settings
from app.llm.client import LLMClient
from app.memory.service import MemoryService
from app.projects.structuring import strip_markdown_json
logger = logging.getLogger(__name__)
SKIP_USER_PATTERN = re.compile(
r"^(ок|ok|да|нет|спасибо|thanks|\.{1,3}|👍|\+1)$",
re.IGNORECASE,
)
EXTRACTION_PROMPT = """
Ты извлекаешь долгосрочные факты о пользователе из фрагмента диалога.
Ответь ТОЛЬКО JSON без markdown.
Схема:
{
"facts": [
{"content": "текст факта", "category": "preference|person|habit|project|fact", "importance": 1}
],
"profile": {"name": "", "age": "", "timezone": "", "notes": ""}
}
Правила:
- Сохраняй устойчивое: имя, возраст, предпочтения, привычки, проекты, семья, работа.
- НЕ сохраняй: статус помидоро, погоду, разовые команды, ролевую игру, выдумки ассистента.
- profile — только поля с новыми значениями (пустые строки не включай).
- facts — короткие утверждения от первого лица пользователя («люблю кофе», «меня зовут …»).
- Если нечего сохранять — {"facts": [], "profile": {}}.
- Не дублируй уже известное (см. текущий профиль и факты ниже).
- importance: 5 критично (имя), 4 важно, 3 обычно, 2 мелочь.
""".strip()
def _should_skip_extraction(user_text: str) -> bool:
text = user_text.strip()
if len(text) < 4:
return True
if SKIP_USER_PATTERN.match(text):
return True
return False
async def _call_extractor(
user_text: str,
assistant_text: str,
snapshot: dict[str, Any],
) -> dict[str, Any]:
profile = snapshot.get("profile") or {}
facts = snapshot.get("facts") or []
known = [
f"Профиль: {json.dumps(profile, ensure_ascii=False)}",
"Факты:",
*[f"- {f.get('content')}" for f in facts[:30]],
]
settings = get_settings()
extract_model = settings.memory_extract_model.strip() or None
llm = LLMClient()
result = await llm.complete(
[
{"role": "system", "content": EXTRACTION_PROMPT},
{
"role": "user",
"content": (
"\n".join(known)
+ "\n\n---\nДиалог:\nПользователь: "
+ user_text
+ "\nАссистент: "
+ assistant_text[:1500]
),
},
],
temperature=0.2,
model=extract_model,
)
raw = strip_markdown_json(result.get("content") or "")
if not raw:
return {"facts": [], "profile": {}}
parsed = json.loads(raw)
if not isinstance(parsed, dict):
return {"facts": [], "profile": {}}
return parsed
async def extract_after_turn(
db: Session,
session_id: int,
user_text: str,
assistant_text: str,
*,
force: bool = False,
) -> dict[str, Any]:
if not force and _should_skip_extraction(user_text):
return {"ok": True, "skipped": "short_message", "saved": []}
if not (assistant_text or "").strip():
return {"ok": True, "skipped": "no_assistant_reply", "saved": []}
memory = MemoryService(db)
snapshot = memory.snapshot(session_id)
try:
parsed = await _call_extractor(user_text, assistant_text, snapshot)
except (json.JSONDecodeError, Exception) as exc:
logger.warning("Memory extraction failed: %s", exc)
return {"ok": False, "error": str(exc), "saved": []}
saved: list[dict[str, Any]] = []
profile_updates = parsed.get("profile") or {}
if isinstance(profile_updates, dict):
filtered = {
k: str(v).strip()
for k, v in profile_updates.items()
if v and str(v).strip()
}
if filtered:
memory.update_profile(filtered)
saved.append({"type": "profile", "updates": filtered})
facts = parsed.get("facts") or []
if isinstance(facts, list):
for item in facts:
if not isinstance(item, dict):
continue
content = (item.get("content") or "").strip()
if not content or len(content) < 3:
continue
try:
result = memory.remember_fact(
content,
category=str(item.get("category") or "fact")[:64],
importance=int(item.get("importance") or 3),
session_id=session_id,
source="auto",
)
saved.append({"type": "fact", **result})
except ValueError:
continue
return {"ok": True, "saved": saved, "count": len(saved)}