Files
2026-06-05 14:57:15 +03:00

423 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import logging
import os
import re
from services.llm import LLMError, send_message_with_model, send_message
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Model id handed to send_message_with_model; an unset or blank RPG_FACTS_MODEL
# falls back to the hard-coded default, so this value is never empty.
FACTS_MODEL = os.getenv("RPG_FACTS_MODEL", "").strip() or "deepseek/deepseek-chat-v3"
# Upper bound on facts kept after a successful LLM compression; also the size
# above which merge_facts_persist runs a second compression pass.
FACTS_STORE_LIMIT = int(os.getenv("FACTS_STORE_LIMIT", "100"))
# Default number of most-recent facts rendered by facts_to_prompt.
FACTS_PROMPT_MAX = int(os.getenv("FACTS_PROMPT_MAX", "40"))
# Fact count above which merge_facts_persist triggers LLM compression.
FACTS_DEDUP_THRESHOLD = int(os.getenv("FACTS_DEDUP_THRESHOLD", "30"))
# Default target size passed to compress_facts.
FACTS_COMPRESS_TARGET = int(os.getenv("FACTS_COMPRESS_TARGET", "22"))
# System prompt for extract_facts. It is sent verbatim (never passed through
# str.format), which is why the JSON example uses single braces.
FACTS_SYSTEM = """Extract NEW stable facts from the conversation.
Return ONLY valid JSON (no markdown), as an array of objects:
[{"text": "short durable fact", "rp_day": "when this became true in story time"}]
Rules:
- Return at most 5 NEW facts per turn. If nothing new, return [].
- Do NOT repeat or rephrase facts already listed under "Already known".
- Facts must be DURABLE world/character state only:
traits, relationships, inventory, locations, secrets revealed, lasting abilities/rules.
- NEVER store plot events or scene narration (no "they went", "they decided", "they hugged",
"they started a new arc", "they stepped through the portal").
- Skip momentary emotions unless they permanently change a relationship.
- text <= 120 chars each.
- rp_day: in-world time label (день 1, второй день, та же ночь, через год). Use RP time hint when unclear."""
# System prompt for compress_facts. It is rendered with
# str.format(target=...), so literal JSON braces are doubled ({{ }}) and
# {target} is the only placeholder.
FACTS_COMPRESS_SYSTEM = """You consolidate RPG session memory for a long-running chat.
Return ONLY valid JSON (no markdown): an array of {{"text": "...", "rp_day": "..."}}.
Goals:
- Use ONLY information from the input facts. NEVER invent or infer new facts.
- Aggressively MERGE near-duplicates (same topic in RU/EN, Rin/Рин, Grigo/Григорий).
- Keep ONE best fact per topic; combine rp_day if needed (e.g. "день 12").
- DROP redundant, trivial, superseded, and ALL one-off narrative/event facts.
- DROP facts that describe a single scene action (went, decided, hugged, called for help, stepped into portal).
- KEEP durable state only: names, nicknames, relationships, inventory, home items, locations,
lasting abilities, secrets/identity, unresolved mysteries.
- Target at most {target} facts (fewer is better). Each text <= 120 chars.
- rp_day = in-world labels only."""
# Case-insensitive pattern of Russian and English verb phrases that mark a
# one-off scene action. is_likely_narrative_event uses .search(), so a match
# anywhere in the fact text is enough to reject it.
_NARRATIVE_EVENT_RE = re.compile(
r"(?:"
r"отправил(?:ись|а|и)?|решили|начали|обнялись|шагнули|вызвали|стали ближе|"
r"передали|раскрыла|начали новую арку|вместе шагнули|тактическ(?:ое|и) отступлени|"
r"went to|decided to|hugged|stepped through|called for help|started a new arc"
r")",
re.IGNORECASE,
)
# (latin alias, cyrillic replacement) pairs used by _normalize_fact_text so
# that English and Russian spellings of the same name/topic compare equal.
#
# ORDER MATTERS: the pairs are applied one after another with str.replace
# (plain substring substitution). An alias that contains another alias must
# therefore come FIRST, otherwise the shorter one rewrites part of the word
# and the longer one can never match (e.g. "grigo" before "grigory" used to
# turn "grigory" into "григоry", and "glow" before "glowing" produced
# "светing"). Keep longer aliases ahead of their substrings when adding
# entries.
_NAME_ALIASES = (
    ("grigoriy", "григорий"),
    ("grigory", "григорий"),
    ("grigo", "григо"),
    ("rin", "рин"),
    ("player", "игрок"),
    ("user", "игрок"),
    ("glade", "полян"),
    ("flowers", "цвет"),
    ("flower", "цвет"),
    ("magical", "волшеб"),
    ("magic", "волшеб"),
    ("glowing", "свет"),
    ("glow", "свет"),
)
def parse_fact_entry(raw) -> dict | None:
if isinstance(raw, dict):
text = (raw.get("text") or raw.get("fact") or "").strip()
rp_day = (raw.get("rp_day") or raw.get("learned") or raw.get("day") or "").strip()
elif isinstance(raw, str):
text = raw.strip()
rp_day = ""
else:
return None
if not text:
return None
return {"text": text[:120], "rp_day": rp_day[:80]}
def parse_facts_list(facts_json: str | None) -> list[dict]:
try:
data = json.loads(facts_json or "[]")
except json.JSONDecodeError:
return []
if not isinstance(data, list):
return []
out: list[dict] = []
seen: set[str] = set()
for item in data:
entry = parse_fact_entry(item)
if not entry:
continue
key = entry["text"].lower()
if key in seen:
continue
seen.add(key)
out.append(entry)
return out
def facts_list_to_json(facts: list[dict]) -> str:
    """Serialize facts to a JSON array of ``{"text", "rp_day"}`` objects.

    Only those two keys are written; a missing ``rp_day`` becomes ``""``.
    Non-ASCII text (e.g. Cyrillic) is emitted as-is rather than escaped.
    """
    rows = []
    for fact in facts:
        rows.append({"text": fact["text"], "rp_day": fact.get("rp_day", "")})
    return json.dumps(rows, ensure_ascii=False)
def rp_day_from_scene(scene_json: str | None) -> str:
try:
scene = json.loads(scene_json or "{}")
if isinstance(scene, dict):
day = (scene.get("day") or "").strip()
if day:
return day[:80]
except (json.JSONDecodeError, TypeError):
pass
return ""
def _normalize_fact_text(text: str) -> str:
    """Canonicalize fact text for fuzzy comparison.

    Lower-cases the text, rewrites every alias from ``_NAME_ALIASES`` (in
    tuple order, as plain substring replacement), turns every character that
    is neither a word character nor whitespace into a space, and finally
    collapses whitespace runs to single spaces with no leading or trailing
    space. A falsy *text* is treated as ``""``.
    """
    lowered = (text or "").lower()
    for alias, replacement in _NAME_ALIASES:
        lowered = lowered.replace(alias, replacement)
    without_punctuation = re.sub(r"[^\w\s]", " ", lowered, flags=re.UNICODE)
    collapsed = re.sub(r"\s+", " ", without_punctuation)
    return collapsed.strip()
def _fact_tokens(text: str) -> set[str]:
    """Return the set of significant words in *text* for overlap scoring.

    The text is first normalized with ``_normalize_fact_text``. Words of two
    characters or fewer and words in the small English/Russian stop list are
    discarded.
    """
    stopwords = {"the", "and", "that", "with", "this", "have", "has", "was", "are", "для", "что", "как", "это", "на", "в", "и", "а"}
    tokens: set[str] = set()
    for word in re.findall(r"[\w]+", _normalize_fact_text(text), flags=re.UNICODE):
        if len(word) <= 2 or word in stopwords:
            continue
        tokens.add(word)
    return tokens
def facts_are_similar(a: str, b: str) -> bool:
    """Heuristically decide whether two fact texts describe the same thing.

    Checks, in order:
      1. equality after lower-casing and stripping;
      2. one text contained in the other, provided the shorter is at least
         35% of the longer's length;
      3. Jaccard overlap of ``_fact_tokens`` of at least 0.32.

    Returns ``False`` when either text has no significant tokens and the
    first two checks did not match.
    """
    left = a.lower().strip()
    right = b.lower().strip()
    if left == right:
        return True

    # Containment check; ties in length keep `a` as the "small" side.
    if len(left) <= len(right):
        small, big = left, right
    else:
        small, big = right, left
    if small in big and len(small) / max(len(big), 1) >= 0.35:
        return True

    tokens_a = _fact_tokens(a)
    tokens_b = _fact_tokens(b)
    if not (tokens_a and tokens_b):
        return False
    shared = tokens_a & tokens_b
    combined = tokens_a | tokens_b
    return len(shared) / len(combined) >= 0.32
def is_likely_narrative_event(text: str) -> bool:
    """Tell whether *text* looks like a one-off scene action, not durable memory.

    Empty or whitespace-only text counts as an event (so callers drop it).
    Otherwise the text is an event when ``_NARRATIVE_EVENT_RE`` matches
    anywhere in it, or when it mentions "новую арку" / "new arc" in any case.
    """
    candidate = (text or "").strip()
    if not candidate:
        return True
    lowered = candidate.lower()
    return (
        _NARRATIVE_EVENT_RE.search(candidate) is not None
        or "новую арку" in lowered
        or "new arc" in lowered
    )
def filter_durable_facts(facts: list[dict]) -> list[dict]:
    """Return the facts whose text is not flagged by ``is_likely_narrative_event``.

    Order is preserved and the fact dicts are passed through unchanged (not
    copied). A fact without a ``"text"`` key is evaluated as empty text.
    """
    durable: list[dict] = []
    for fact in facts:
        if is_likely_narrative_event(fact.get("text", "")):
            continue
        durable.append(fact)
    return durable
def validate_compressed_against_source(
    original: list[dict], compressed: list[dict]
) -> list[dict]:
    """Keep only compressed facts that are grounded in the original list.

    A compressed fact survives when its text is non-empty, is not flagged by
    ``is_likely_narrative_event``, and is ``facts_are_similar`` to at least
    one original fact. This rejects facts the LLM invented during
    compression. Order follows *compressed*; an empty or ``None``
    *compressed* yields ``[]``.
    """
    if not compressed:
        return []

    grounded: list[dict] = []
    for candidate in compressed:
        candidate_text = (candidate.get("text") or "").strip()
        if not candidate_text:
            continue
        if is_likely_narrative_event(candidate_text):
            continue
        # Stop at the first original fact that backs this candidate.
        for source in original:
            if facts_are_similar(candidate_text, source.get("text", "")):
                grounded.append(candidate)
                break
    return grounded
def dedupe_facts_fuzzy(facts: list[dict]) -> list[dict]:
    """Collapse facts that ``facts_are_similar`` considers duplicates.

    Facts are processed in order. A fact similar to one already kept is folded
    into the first such kept fact: the longer text wins (capped at 120
    characters) and a missing ``rp_day`` is filled in from the duplicate.
    Otherwise a shallow copy of the fact is kept, so the input dicts are never
    mutated.
    """
    kept: list[dict] = []
    for fact in facts:
        match = next(
            (prior for prior in kept if facts_are_similar(fact["text"], prior["text"])),
            None,
        )
        if match is None:
            kept.append(dict(fact))
            continue
        if len(fact["text"]) > len(match["text"]):
            match["text"] = fact["text"][:120]
        if fact.get("rp_day") and not match.get("rp_day"):
            match["rp_day"] = fact["rp_day"]
    return kept
def merge_facts(
    existing_json: str,
    new_facts: list,
    *,
    rp_day_default: str = "",
) -> str:
    """Merge newly extracted facts into the stored facts and return JSON.

    Args:
        existing_json: stored facts, as accepted by ``parse_facts_list``.
        new_facts: raw facts (dicts or strings) as accepted by
            ``parse_fact_entry``; ``None`` is treated as empty.
        rp_day_default: story-time label given to new facts that carry none
            (stripped, capped at 80 characters).

    For every new fact:
      * an exact (case-insensitive) text match only back-fills a missing
        ``rp_day`` on the stored fact;
      * a fuzzy match (``facts_are_similar``) against the first similar stored
        fact keeps the longer text and back-fills a missing ``rp_day``;
      * otherwise the fact is appended.

    Returns:
        The merged list serialized with ``facts_list_to_json``.
    """
    stored = parse_facts_list(existing_json)
    known_keys = {item["text"].lower() for item in stored}
    fallback_day = (rp_day_default or "").strip()[:80]

    for candidate in new_facts or []:
        fact = parse_fact_entry(candidate)
        if not fact:
            continue
        if fallback_day and not fact["rp_day"]:
            fact["rp_day"] = fallback_day
        lowered = fact["text"].lower()

        if lowered in known_keys:
            # Exact duplicate: the text stays, only the day may be filled in.
            twin = next(
                (item for item in stored if item["text"].lower() == lowered),
                None,
            )
            if twin is not None and fact["rp_day"] and not twin.get("rp_day"):
                twin["rp_day"] = fact["rp_day"]
            continue

        near = next(
            (item for item in stored if facts_are_similar(fact["text"], item["text"])),
            None,
        )
        if near is None:
            known_keys.add(lowered)
            stored.append(fact)
            continue

        # Near-duplicate: prefer the more detailed (longer) wording.
        if len(fact["text"]) > len(near["text"]):
            near["text"] = fact["text"]
        if fact["rp_day"] and not near.get("rp_day"):
            near["rp_day"] = fact["rp_day"]

    return facts_list_to_json(stored)
async def compress_facts(
    facts: list[dict],
    *,
    scene_context: str = "",
    status_quo: str = "",
    target: int = FACTS_COMPRESS_TARGET,
) -> list[dict]:
    """Ask the LLM to consolidate *facts* down to roughly *target* entries.

    Args:
        facts: current fact dicts; sent to the model as indented JSON.
        scene_context: optional scene text, stripped and capped at 1500 chars.
        status_quo: optional status text, stripped and capped at 1500 chars.
        target: desired maximum number of facts, quoted in both prompts.

    Returns:
        On success: the model's facts, filtered through
        ``validate_compressed_against_source``, fuzzy-deduped and capped at
        ``FACTS_STORE_LIMIT``.
        On any failure (LLM error, unparseable reply, non-list JSON, nothing
        left after validation): the last *target* items of the fuzzy-deduped
        input. NOTE: that fallback slices ``[-target:]``, so a *target* of 0
        returns the whole deduped list.
    """

    def fallback() -> list[dict]:
        # Local-only degradation path shared by every failure mode.
        return dedupe_facts_fuzzy(facts)[-target:]

    prompt_parts = [
        f"Current fact count: {len(facts)}. Target after merge: <= {target}.\n\n",
        f"Facts JSON:\n{json.dumps(facts, ensure_ascii=False, indent=2)}\n",
    ]
    scene = scene_context.strip()
    if scene:
        prompt_parts.append(f"\nCurrent scene:\n{scene[:1500]}\n")
    quo = status_quo.strip()
    if quo:
        prompt_parts.append(f"\nStatus quo:\n{quo[:1500]}\n")

    messages = [
        {"role": "system", "content": FACTS_COMPRESS_SYSTEM.format(target=target)},
        {"role": "user", "content": "".join(prompt_parts)},
    ]

    try:
        if FACTS_MODEL:
            pending = send_message_with_model(messages, FACTS_MODEL)
        else:
            pending = send_message(messages)
        reply = await pending
    except LLMError as exc:
        logger.warning("compress_facts LLM failed: %s", exc)
        return fallback()
    except Exception as exc:
        # Compression is best-effort: never let an LLM failure lose the facts.
        logger.warning("compress_facts unexpected: %s", exc)
        return fallback()

    # Strip a surrounding markdown code fence (```json ... ```), if present.
    body = reply.strip()
    if body.startswith("```") and "\n" in body:
        body = body.split("\n", 1)[1]
    if body.endswith("```"):
        body = body.rsplit("```", 1)[0]
    body = body.strip()

    try:
        decoded = json.loads(body)
    except json.JSONDecodeError:
        logger.warning("compress_facts JSON parse failed. Raw=%.400s", reply)
        return fallback()

    if isinstance(decoded, list):
        parsed = []
        for item in decoded:
            entry = parse_fact_entry(item)
            if entry:
                parsed.append(entry)
        grounded = validate_compressed_against_source(facts, parsed) if parsed else parsed
        if grounded:
            logger.info("compress_facts: %d -> %d", len(facts), len(grounded))
            return dedupe_facts_fuzzy(grounded)[:FACTS_STORE_LIMIT]
    return fallback()
async def merge_facts_persist(
    existing_json: str,
    new_facts: list,
    *,
    rp_day_default: str = "",
    scene_context: str = "",
    status_quo: str = "",
) -> str:
    """Merge new facts into the stored ones and return the JSON to persist.

    Pipeline: ``merge_facts`` -> fuzzy dedupe -> drop narrative events. When
    more than ``FACTS_DEDUP_THRESHOLD`` facts remain, they are LLM-compressed
    towards ``FACTS_COMPRESS_TARGET`` and deduped again. If the list is still
    larger than ``FACTS_STORE_LIMIT``, a second compression targets that limit.

    *scene_context* and *status_quo* are forwarded to ``compress_facts``.
    """
    combined = parse_facts_list(
        merge_facts(existing_json, new_facts, rp_day_default=rp_day_default)
    )
    facts = filter_durable_facts(dedupe_facts_fuzzy(combined))

    # Both compression passes receive the same contextual hints.
    llm_context = {"scene_context": scene_context, "status_quo": status_quo}

    if len(facts) > FACTS_DEDUP_THRESHOLD:
        compressed = await compress_facts(
            facts, target=FACTS_COMPRESS_TARGET, **llm_context
        )
        facts = dedupe_facts_fuzzy(compressed)

    if len(facts) > FACTS_STORE_LIMIT:
        facts = await compress_facts(
            facts, target=FACTS_STORE_LIMIT, **llm_context
        )

    return facts_list_to_json(facts)
async def extract_facts(
    context_messages: list[dict],
    *,
    rp_day_hint: str = "",
    existing_json: str = "",
) -> list[dict]:
    """Ask the LLM for new durable facts found in recent conversation.

    Args:
        context_messages: chat messages; only ``user`` and ``assistant`` roles
            are included, and only the last 6000 characters of the joined
            transcript are sent.
        rp_day_hint: story-time label passed to the model and used as the
            default ``rp_day`` (capped at 80 chars) for facts that lack one.
        existing_json: stored facts; the last 40 are listed to the model as
            "already known", and all of them are used to filter the reply.

    Returns:
        Up to 8 parsed facts from the reply, excluding those similar to a
        known fact and those flagged by ``is_likely_narrative_event``.
        Returns ``[]`` on LLM failure, unparseable JSON, or a non-list reply.
    """
    spoken = []
    for message in context_messages:
        if message.get("role") not in ("user", "assistant"):
            continue
        spoken.append(f"{message.get('role')}: {message.get('content', '')}".strip())
    # Keep only the tail: the newest turns matter most.
    transcript = "\n".join(spoken)[-6000:]

    hint = (rp_day_hint or "").strip()
    known = parse_facts_list(existing_json)

    sections = []
    if known:
        known_lines = "\n".join(
            f"- [{fact.get('rp_day') or '?'}] {fact['text']}" for fact in known[-40:]
        )
        sections.append(f"Already known facts (do NOT repeat):\n{known_lines}\n")
    if hint:
        sections.append(f"RP time hint: {hint}\n")
    sections.append(f"New transcript:\n{transcript}")

    messages = [
        {"role": "system", "content": FACTS_SYSTEM},
        {"role": "user", "content": "\n".join(sections)},
    ]

    try:
        if FACTS_MODEL:
            pending = send_message_with_model(messages, FACTS_MODEL)
        else:
            pending = send_message(messages)
        reply = await pending
    except LLMError as exc:
        logger.warning("extract_facts LLM failed (model=%s): %s", FACTS_MODEL or "SYSTEM", exc)
        return []
    except Exception as exc:
        # Extraction is best-effort; a failed call just yields no new facts.
        logger.warning("extract_facts unexpected: %s", exc)
        return []

    # Strip a surrounding markdown code fence (```json ... ```), if present.
    body = reply.strip()
    if body.startswith("```") and "\n" in body:
        body = body.split("\n", 1)[1]
    if body.endswith("```"):
        body = body.rsplit("```", 1)[0]
    body = body.strip()

    try:
        decoded = json.loads(body)
    except json.JSONDecodeError:
        logger.warning("extract_facts JSON parse failed. Raw=%.400s", reply)
        return []
    if not isinstance(decoded, list):
        return []

    fresh: list[dict] = []
    for item in decoded[:8]:
        entry = parse_fact_entry(item)
        if not entry:
            continue
        text = entry["text"]
        already_known = any(facts_are_similar(text, fact["text"]) for fact in known)
        if already_known or is_likely_narrative_event(text):
            continue
        if hint and not entry["rp_day"]:
            entry["rp_day"] = hint[:80]
        fresh.append(entry)
    return fresh
def facts_to_prompt(facts_json: str, max_items: int = FACTS_PROMPT_MAX) -> str:
    """Render stored facts as a text block for inclusion in an LLM prompt.

    The stored JSON is parsed, fuzzy-deduped and stripped of narrative events;
    the last *max_items* facts are listed one per line, prefixed with
    ``[rp_day]`` when a day label exists. The header reports
    "showing N of TOTAL" when facts were cut. Returns ``""`` when no facts
    remain after filtering.

    NOTE: the cut is ``facts[-max_items:]``, so ``max_items=0`` lists every
    fact rather than none.
    """
    facts = filter_durable_facts(dedupe_facts_fuzzy(parse_facts_list(facts_json)))
    if not facts:
        return ""

    shown = facts[-max_items:]
    bullets = []
    for fact in shown:
        label = (fact.get("rp_day") or "").strip()
        bullets.append(f"- [{label}] {fact['text']}" if label else f"- {fact['text']}")

    header_parts = ["--- Facts (persistent memory"]
    if len(facts) > len(shown):
        header_parts.append(f", showing {len(shown)} of {len(facts)}")
    header_parts.append(") ---")
    header = "".join(header_parts)

    block = "\n".join(bullets)
    return f"{header}\n{block}\n---"