423 lines
14 KiB
Python
423 lines
14 KiB
Python
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
|
||
from services.llm import LLMError, send_message_with_model, send_message
|
||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Model id passed to send_message_with_model(). A blank or unset
# RPG_FACTS_MODEL falls back to the built-in default, so this is never empty.
FACTS_MODEL = (
    os.environ.get("RPG_FACTS_MODEL", "").strip() or "deepseek/deepseek-chat-v3"
)
# Slice limit applied to a successful compress_facts() result; exceeding it
# also triggers a second compression pass in merge_facts_persist().
FACTS_STORE_LIMIT = int(os.environ.get("FACTS_STORE_LIMIT", "100"))
# Default number of facts rendered by facts_to_prompt().
FACTS_PROMPT_MAX = int(os.environ.get("FACTS_PROMPT_MAX", "40"))
# merge_facts_persist() runs LLM compression once the list is longer than this.
FACTS_DEDUP_THRESHOLD = int(os.environ.get("FACTS_DEDUP_THRESHOLD", "30"))
# Default fact count requested from the LLM by compress_facts().
FACTS_COMPRESS_TARGET = int(os.environ.get("FACTS_COMPRESS_TARGET", "22"))
|
||
|
||
FACTS_SYSTEM = """Extract NEW stable facts from the conversation.
|
||
Return ONLY valid JSON (no markdown), as an array of objects:
|
||
[{"text": "short durable fact", "rp_day": "when this became true in story time"}]
|
||
|
||
Rules:
|
||
- Return at most 5 NEW facts per turn. If nothing new, return [].
|
||
- Do NOT repeat or rephrase facts already listed under "Already known".
|
||
- Facts must be DURABLE world/character state only:
|
||
traits, relationships, inventory, locations, secrets revealed, lasting abilities/rules.
|
||
- NEVER store plot events or scene narration (no "they went", "they decided", "they hugged",
|
||
"they started a new arc", "they stepped through the portal").
|
||
- Skip momentary emotions unless they permanently change a relationship.
|
||
- text <= 120 chars each.
|
||
- rp_day: in-world time label (день 1, второй день, та же ночь, через год). Use RP time hint when unclear."""
|
||
|
||
FACTS_COMPRESS_SYSTEM = """You consolidate RPG session memory for a long-running chat.
|
||
Return ONLY valid JSON (no markdown): an array of {{"text": "...", "rp_day": "..."}}.
|
||
|
||
Goals:
|
||
- Use ONLY information from the input facts. NEVER invent or infer new facts.
|
||
- Aggressively MERGE near-duplicates (same topic in RU/EN, Rin/Рин, Grigo/Григорий).
|
||
- Keep ONE best fact per topic; combine rp_day if needed (e.g. "день 1–2").
|
||
- DROP redundant, trivial, superseded, and ALL one-off narrative/event facts.
|
||
- DROP facts that describe a single scene action (went, decided, hugged, called for help, stepped into portal).
|
||
- KEEP durable state only: names, nicknames, relationships, inventory, home items, locations,
|
||
lasting abilities, secrets/identity, unresolved mysteries.
|
||
- Target at most {target} facts (fewer is better). Each text <= 120 chars.
|
||
- rp_day = in-world labels only."""
|
||
|
||
# Russian and English phrases that mark a one-off scene action rather than a
# durable fact. Compiled case-insensitively; is_likely_narrative_event() uses
# .search(), so a hit anywhere in the text counts.
# NOTE(review): the alternatives carry no word boundaries, so they also match
# inside longer words — confirm that is intended.
_NARRATIVE_EVENT_RE = re.compile(
    r"(?:"
    r"отправил(?:ись|а|и)?|решили|начали|обнялись|шагнули|вызвали|стали ближе|"
    r"передали|раскрыла|начали новую арку|вместе шагнули|тактическ(?:ое|и) отступлени|"
    r"went to|decided to|hugged|stepped through|called for help|started a new arc"
    r")",
    re.IGNORECASE,
)
|
||
|
||
# (latin alias, Cyrillic form or stem) pairs that _normalize_fact_text() applies
# one after another with str.replace(), so RU/EN spellings of the same name or
# topic compare equal.
#
# ORDER MATTERS: replacement is sequential and substring-based, so an alias
# must come before any shorter alias it contains. Otherwise the short one
# rewrites part of the long one first ("grigory" -> "григоry",
# "glowing" -> "светing") and the longer pair can never match.
_NAME_ALIASES = (
    ("grigoriy", "григорий"),
    ("grigory", "григорий"),
    ("grigo", "григо"),
    ("rin", "рин"),
    ("player", "игрок"),
    ("user", "игрок"),
    ("glade", "полян"),
    ("flowers", "цвет"),
    ("flower", "цвет"),
    ("magical", "волшеб"),
    ("magic", "волшеб"),
    ("glowing", "свет"),
    ("glow", "свет"),
)
|
||
|
||
|
||
def parse_fact_entry(raw) -> dict | None:
|
||
if isinstance(raw, dict):
|
||
text = (raw.get("text") or raw.get("fact") or "").strip()
|
||
rp_day = (raw.get("rp_day") or raw.get("learned") or raw.get("day") or "").strip()
|
||
elif isinstance(raw, str):
|
||
text = raw.strip()
|
||
rp_day = ""
|
||
else:
|
||
return None
|
||
if not text:
|
||
return None
|
||
return {"text": text[:120], "rp_day": rp_day[:80]}
|
||
|
||
|
||
def parse_facts_list(facts_json: str | None) -> list[dict]:
|
||
try:
|
||
data = json.loads(facts_json or "[]")
|
||
except json.JSONDecodeError:
|
||
return []
|
||
if not isinstance(data, list):
|
||
return []
|
||
out: list[dict] = []
|
||
seen: set[str] = set()
|
||
for item in data:
|
||
entry = parse_fact_entry(item)
|
||
if not entry:
|
||
continue
|
||
key = entry["text"].lower()
|
||
if key in seen:
|
||
continue
|
||
seen.add(key)
|
||
out.append(entry)
|
||
return out
|
||
|
||
|
||
def facts_list_to_json(facts: list[dict]) -> str:
    """Serialize facts to a JSON array holding only ``text`` and ``rp_day``.

    A missing ``rp_day`` is written as an empty string; any other keys are
    dropped. Non-ASCII characters are emitted as-is rather than escaped.
    """
    rows = []
    for fact in facts:
        rows.append({"text": fact["text"], "rp_day": fact.get("rp_day", "")})
    return json.dumps(rows, ensure_ascii=False)
|
||
|
||
|
||
def rp_day_from_scene(scene_json: str | None) -> str:
|
||
try:
|
||
scene = json.loads(scene_json or "{}")
|
||
if isinstance(scene, dict):
|
||
day = (scene.get("day") or "").strip()
|
||
if day:
|
||
return day[:80]
|
||
except (json.JSONDecodeError, TypeError):
|
||
pass
|
||
return ""
|
||
|
||
|
||
def _normalize_fact_text(text: str) -> str:
    """Return a lowercase, alias-folded, punctuation-free form of ``text``.

    Each ``_NAME_ALIASES`` pair is applied in tuple order as a plain substring
    replacement. Every character that is neither a word character nor
    whitespace then becomes a space, and whitespace runs collapse to one space.
    """
    folded = (text or "").lower()
    for alias, replacement in _NAME_ALIASES:
        folded = folded.replace(alias, replacement)
    without_punct = re.sub(r"[^\w\s]", " ", folded, flags=re.UNICODE)
    collapsed = re.sub(r"\s+", " ", without_punct)
    return collapsed.strip()
|
||
|
||
|
||
def _fact_tokens(text: str) -> set[str]:
    """Return the set of significant words in the normalized form of ``text``.

    Words of one or two characters and the stop words listed below are dropped.
    """
    stop = {"the", "and", "that", "with", "this", "have", "has", "was", "are", "для", "что", "как", "это", "на", "в", "и", "а"}
    tokens: set[str] = set()
    for word in re.findall(r"[\w]+", _normalize_fact_text(text), flags=re.UNICODE):
        if len(word) <= 2 or word in stop:
            continue
        tokens.add(word)
    return tokens
|
||
|
||
|
||
def facts_are_similar(a: str, b: str) -> bool:
    """Heuristically decide whether two fact texts describe the same thing.

    Checks run in this order:
      1. equal after lowercasing and stripping;
      2. one text contains the other and is at least 35% of its length;
      3. Jaccard overlap of their ``_fact_tokens`` sets is at least 0.32
         (never a match when either token set is empty).
    """
    left = a.lower().strip()
    right = b.lower().strip()
    if left == right:
        return True

    # sorted() is stable, so on equal lengths ``left`` stays the shorter one.
    shorter, longer = sorted((left, right), key=len)
    if shorter in longer and len(shorter) / max(len(longer), 1) >= 0.35:
        return True

    tokens_a = _fact_tokens(a)
    tokens_b = _fact_tokens(b)
    if not (tokens_a and tokens_b):
        return False
    jaccard = len(tokens_a & tokens_b) / len(tokens_a | tokens_b)
    return jaccard >= 0.32
|
||
|
||
|
||
def is_likely_narrative_event(text: str) -> bool:
    """Tell whether ``text`` looks like a one-off scene action, not durable memory.

    Empty or blank text counts as an event. Otherwise the text is flagged when
    ``_NARRATIVE_EVENT_RE`` matches anywhere in it or it mentions a "new arc"
    in Russian or English.
    """
    stripped = (text or "").strip()
    if not stripped:
        return True
    if _NARRATIVE_EVENT_RE.search(stripped) is not None:
        return True
    lowered = stripped.lower()
    return "новую арку" in lowered or "new arc" in lowered
|
||
|
||
|
||
def filter_durable_facts(facts: list[dict]) -> list[dict]:
    """Return the facts whose text is not flagged by ``is_likely_narrative_event``.

    Order is preserved; a fact without a ``"text"`` key is treated as empty
    text and therefore dropped.
    """
    durable: list[dict] = []
    for fact in facts:
        if is_likely_narrative_event(fact.get("text", "")):
            continue
        durable.append(fact)
    return durable
|
||
|
||
|
||
def validate_compressed_against_source(
    original: list[dict], compressed: list[dict]
) -> list[dict]:
    """Keep only compressed facts that resemble at least one original fact.

    Guards against facts the LLM invented: a compressed entry survives only if
    its text is non-empty, is not a narrative event, and is
    ``facts_are_similar`` to the text of some entry in ``original``.
    """
    if not compressed:
        return []

    grounded: list[dict] = []
    for candidate in compressed:
        text = (candidate.get("text") or "").strip()
        if not text:
            continue
        if is_likely_narrative_event(text):
            continue
        for source in original:
            if facts_are_similar(text, source.get("text", "")):
                grounded.append(candidate)
                break
    return grounded
|
||
|
||
|
||
def dedupe_facts_fuzzy(facts: list[dict]) -> list[dict]:
    """Collapse facts that ``facts_are_similar`` considers duplicates.

    Facts are processed in order. A fact similar to one already kept is folded
    into it: the longer text wins (cut to 120 characters) and a missing
    ``rp_day`` is filled in. Otherwise a shallow copy of the fact is kept, so
    the input dicts are never mutated.
    """
    unique: list[dict] = []
    for fact in facts:
        for kept in unique:
            if not facts_are_similar(fact["text"], kept["text"]):
                continue
            if len(fact["text"]) > len(kept["text"]):
                kept["text"] = fact["text"][:120]
            if fact.get("rp_day") and not kept.get("rp_day"):
                kept["rp_day"] = fact["rp_day"]
            break
        else:
            # No similar fact kept so far.
            unique.append(dict(fact))
    return unique
|
||
|
||
|
||
def merge_facts(
    existing_json: str,
    new_facts: list,
    *,
    rp_day_default: str = "",
) -> str:
    """Merge newly extracted facts into the stored JSON list.

    Args:
        existing_json: Stored facts as JSON text.
        new_facts: Raw fact entries (dicts or strings); ``None`` is allowed.
        rp_day_default: Day label given to new facts that carry none
            (stripped, cut to 80 characters).

    Returns:
        The merged list as JSON text. A new fact whose text equals a known one
        (case-insensitively) or is ``facts_are_similar`` to a stored one is not
        appended; it can only fill a missing ``rp_day`` and, in the fuzzy case,
        replace a shorter text.
    """
    facts = parse_facts_list(existing_json)
    exact_keys = {fact["text"].lower() for fact in facts}
    fallback_day = (rp_day_default or "").strip()[:80]

    for candidate in new_facts or []:
        fact = parse_fact_entry(candidate)
        if not fact:
            continue
        if fallback_day and not fact["rp_day"]:
            fact["rp_day"] = fallback_day
        lowered = fact["text"].lower()

        if lowered in exact_keys:
            # Exact repeat: at most back-fill the day on the stored twin.
            twin = next((f for f in facts if f["text"].lower() == lowered), None)
            if twin is not None and fact["rp_day"] and not twin.get("rp_day"):
                twin["rp_day"] = fact["rp_day"]
            continue

        near = next(
            (f for f in facts if facts_are_similar(fact["text"], f["text"])),
            None,
        )
        if near is not None:
            if len(fact["text"]) > len(near["text"]):
                near["text"] = fact["text"]
            if fact["rp_day"] and not near.get("rp_day"):
                near["rp_day"] = fact["rp_day"]
            continue

        exact_keys.add(lowered)
        facts.append(fact)

    return facts_list_to_json(facts)
|
||
|
||
|
||
async def compress_facts(
    facts: list[dict],
    *,
    scene_context: str = "",
    status_quo: str = "",
    target: int = FACTS_COMPRESS_TARGET,
) -> list[dict]:
    """Ask the LLM to consolidate ``facts`` down to roughly ``target`` entries.

    Args:
        facts: Facts to consolidate.
        scene_context: Optional scene text appended to the request (first
            1500 characters after stripping).
        status_quo: Optional status text appended the same way.
        target: Fact count requested from the model.

    Returns:
        On success, the model's facts that pass
        ``validate_compressed_against_source``, fuzzy-deduplicated and cut to
        ``FACTS_STORE_LIMIT``. On any LLM error, unparsable reply, or empty
        validated result, the last ``target`` entries of the fuzzy-deduplicated
        input.
    """

    def fallback() -> list[dict]:
        return dedupe_facts_fuzzy(facts)[-target:]

    sections = [
        f"Current fact count: {len(facts)}. Target after merge: <= {target}.\n\n"
        f"Facts JSON:\n{json.dumps(facts, ensure_ascii=False, indent=2)}\n"
    ]
    scene = scene_context.strip()
    if scene:
        sections.append(f"\nCurrent scene:\n{scene[:1500]}\n")
    quo = status_quo.strip()
    if quo:
        sections.append(f"\nStatus quo:\n{quo[:1500]}\n")

    messages = [
        {"role": "system", "content": FACTS_COMPRESS_SYSTEM.format(target=target)},
        {"role": "user", "content": "".join(sections)},
    ]

    try:
        if FACTS_MODEL:
            raw = await send_message_with_model(messages, FACTS_MODEL)
        else:
            raw = await send_message(messages)
    except LLMError as e:
        logger.warning("compress_facts LLM failed: %s", e)
        return fallback()
    except Exception as e:
        logger.warning("compress_facts unexpected: %s", e)
        return fallback()

    # Strip an optional Markdown code fence around the JSON payload.
    cleaned = raw.strip()
    if cleaned.startswith("```"):
        _, newline, rest = cleaned.partition("\n")
        if newline:
            cleaned = rest
    cleaned = cleaned.removesuffix("```").strip()

    try:
        data = json.loads(cleaned)
    except json.JSONDecodeError:
        logger.warning("compress_facts JSON parse failed. Raw=%.400s", raw)
        return fallback()

    if isinstance(data, list):
        parsed = [entry for item in data if (entry := parse_fact_entry(item))]
        grounded = validate_compressed_against_source(facts, parsed) if parsed else parsed
        if grounded:
            logger.info("compress_facts: %d -> %d", len(facts), len(grounded))
            return dedupe_facts_fuzzy(grounded)[:FACTS_STORE_LIMIT]
    return fallback()
|
||
|
||
|
||
async def merge_facts_persist(
    existing_json: str,
    new_facts: list,
    *,
    rp_day_default: str = "",
    scene_context: str = "",
    status_quo: str = "",
) -> str:
    """Merge new facts into the stored list and return the JSON to persist.

    Steps: ``merge_facts``, fuzzy de-duplication, removal of narrative events,
    LLM compression (plus another de-duplication) when more than
    ``FACTS_DEDUP_THRESHOLD`` facts remain, and a second compression pass when
    more than ``FACTS_STORE_LIMIT`` remain.
    """
    merged = merge_facts(existing_json, new_facts, rp_day_default=rp_day_default)
    durable = filter_durable_facts(dedupe_facts_fuzzy(parse_facts_list(merged)))
    context = {"scene_context": scene_context, "status_quo": status_quo}

    if len(durable) > FACTS_DEDUP_THRESHOLD:
        compressed = await compress_facts(
            durable, target=FACTS_COMPRESS_TARGET, **context
        )
        durable = dedupe_facts_fuzzy(compressed)
    if len(durable) > FACTS_STORE_LIMIT:
        durable = await compress_facts(durable, target=FACTS_STORE_LIMIT, **context)
    return facts_list_to_json(durable)
|
||
|
||
|
||
async def extract_facts(
    context_messages: list[dict],
    *,
    rp_day_hint: str = "",
    existing_json: str = "",
) -> list[dict]:
    """Ask the LLM for new durable facts found in recent chat messages.

    Args:
        context_messages: Chat messages; only ``user`` and ``assistant`` roles
            are used, and only the last 6000 characters of the joined text.
        rp_day_hint: In-world time label sent to the model and used as the
            ``rp_day`` of facts returned without one.
        existing_json: Stored facts as JSON text; the last 40 are listed to the
            model as already known.

    Returns:
        Up to eight parsed facts that are neither similar to a known fact nor
        flagged as narrative events. Returns ``[]`` on any LLM error or when
        the reply is not a JSON array.
    """
    lines = []
    for message in context_messages:
        if message.get("role") not in ("user", "assistant"):
            continue
        lines.append(f"{message.get('role')}: {message.get('content', '')}".strip())
    transcript = "\n".join(lines)[-6000:]

    hint = (rp_day_hint or "").strip()
    known = parse_facts_list(existing_json)

    user_parts = []
    if known:
        known_lines = "\n".join(
            f"- [{fact.get('rp_day') or '?'}] {fact['text']}" for fact in known[-40:]
        )
        user_parts.append(f"Already known facts (do NOT repeat):\n{known_lines}\n")
    if hint:
        user_parts.append(f"RP time hint: {hint}\n")
    user_parts.append(f"New transcript:\n{transcript}")

    messages = [
        {"role": "system", "content": FACTS_SYSTEM},
        {"role": "user", "content": "\n".join(user_parts)},
    ]

    try:
        if FACTS_MODEL:
            raw = await send_message_with_model(messages, FACTS_MODEL)
        else:
            raw = await send_message(messages)
    except LLMError as e:
        logger.warning("extract_facts LLM failed (model=%s): %s", FACTS_MODEL or "SYSTEM", e)
        return []
    except Exception as e:
        logger.warning("extract_facts unexpected: %s", e)
        return []

    # Strip an optional Markdown code fence around the JSON payload.
    cleaned = raw.strip()
    if cleaned.startswith("```"):
        _, newline, rest = cleaned.partition("\n")
        if newline:
            cleaned = rest
    cleaned = cleaned.removesuffix("```").strip()

    try:
        data = json.loads(cleaned)
    except json.JSONDecodeError:
        logger.warning("extract_facts JSON parse failed. Raw=%.400s", raw)
        return []
    if not isinstance(data, list):
        return []

    fresh: list[dict] = []
    for item in data[:8]:
        entry = parse_fact_entry(item)
        if not entry:
            continue
        text = entry["text"]
        if any(facts_are_similar(text, k["text"]) for k in known):
            continue
        if is_likely_narrative_event(text):
            continue
        if hint and not entry["rp_day"]:
            entry["rp_day"] = hint[:80]
        fresh.append(entry)
    return fresh
|
||
|
||
|
||
def facts_to_prompt(facts_json: str, max_items: int = FACTS_PROMPT_MAX) -> str:
    """Render stored facts as a delimited text block for inclusion in a prompt.

    Args:
        facts_json: Stored facts as JSON text.
        max_items: Maximum number of facts to show; the most recent (last)
            ones are kept. Zero or a negative value shows nothing.

    Returns:
        A block with a header line, one ``- [rp_day] text`` (or ``- text``)
        line per fact and a closing ``---`` line. The header notes
        "showing N of M" when facts were cut. Returns ``""`` when there is
        nothing to show.
    """
    if max_items <= 0:
        # ``facts[-0:]`` is the whole list and a negative bound would skip the
        # leading facts, so a non-positive limit must be handled before slicing.
        return ""

    facts = filter_durable_facts(dedupe_facts_fuzzy(parse_facts_list(facts_json)))
    if not facts:
        return ""

    recent = facts[-max_items:]
    lines = []
    for fact in recent:
        day = (fact.get("rp_day") or "").strip()
        lines.append(f"- [{day}] {fact['text']}" if day else f"- {fact['text']}")

    header = "--- Facts (persistent memory"
    if len(facts) > len(recent):
        header += f", showing {len(recent)} of {len(facts)}"
    header += ") ---"
    block = "\n".join(lines)
    return f"{header}\n{block}\n---"
|