"""Persistent "facts" memory for long-running RPG chat sessions.

Facts are small dicts of the form ``{"text": str, "rp_day": str}`` that are
stored as a JSON array.  This module parses and merges them, removes fuzzy
duplicates, filters out one-off narrative events, asks an LLM to extract new
facts from a transcript or to compress an overgrown list, and renders the
result as a prompt block.
"""

import json
import logging
import os
import re

from services.llm import LLMError, send_message_with_model, send_message

logger = logging.getLogger(__name__)

# Model id passed to send_message_with_model; overridable via RPG_FACTS_MODEL.
FACTS_MODEL = os.getenv("RPG_FACTS_MODEL", "").strip() or "deepseek/deepseek-chat-v3"
# Hard cap on the number of stored facts.
FACTS_STORE_LIMIT = int(os.getenv("FACTS_STORE_LIMIT", "100"))
# Default number of facts rendered by facts_to_prompt.
FACTS_PROMPT_MAX = int(os.getenv("FACTS_PROMPT_MAX", "40"))
# merge_facts_persist triggers an LLM compression above this many facts.
FACTS_DEDUP_THRESHOLD = int(os.getenv("FACTS_DEDUP_THRESHOLD", "30"))
# Default size the compression prompt asks the LLM to aim for.
FACTS_COMPRESS_TARGET = int(os.getenv("FACTS_COMPRESS_TARGET", "22"))

# Maximum stored lengths of the two fact fields.
_TEXT_MAX = 120
_RP_DAY_MAX = 80

FACTS_SYSTEM = """Extract NEW stable facts from the conversation.
Return ONLY valid JSON (no markdown), as an array of objects:
[{"text": "short durable fact", "rp_day": "when this became true in story time"}]
Rules:
- Return at most 5 NEW facts per turn. If nothing new, return [].
- Do NOT repeat or rephrase facts already listed under "Already known".
- Facts must be DURABLE world/character state only: traits, relationships, inventory, locations, secrets revealed, lasting abilities/rules.
- NEVER store plot events or scene narration (no "they went", "they decided", "they hugged", "they started a new arc", "they stepped through the portal").
- Skip momentary emotions unless they permanently change a relationship.
- text <= 120 chars each.
- rp_day: in-world time label (день 1, второй день, та же ночь, через год). Use RP time hint when unclear."""

# NOTE: rendered with str.format(target=...), hence the doubled braces.
FACTS_COMPRESS_SYSTEM = """You consolidate RPG session memory for a long-running chat.
Return ONLY valid JSON (no markdown): an array of {{"text": "...", "rp_day": "..."}}.
Goals:
- Use ONLY information from the input facts. NEVER invent or infer new facts.
- Aggressively MERGE near-duplicates (same topic in RU/EN, Rin/Рин, Grigo/Григорий).
- Keep ONE best fact per topic; combine rp_day if needed (e.g. "день 1–2").
- DROP redundant, trivial, superseded, and ALL one-off narrative/event facts.
- DROP facts that describe a single scene action (went, decided, hugged, called for help, stepped into portal).
- KEEP durable state only: names, nicknames, relationships, inventory, home items, locations, lasting abilities, secrets/identity, unresolved mysteries.
- Target at most {target} facts (fewer is better). Each text <= 120 chars.
- rp_day = in-world labels only."""

# Verbs/phrases (RU + EN) that mark a one-off scene action rather than state.
_NARRATIVE_EVENT_RE = re.compile(
    r"(?:"
    r"отправил(?:ись|а|и)?|решили|начали|обнялись|шагнули|вызвали|стали ближе|"
    r"передали|раскрыла|начали новую арку|вместе шагнули|тактическ(?:ое|и) отступлени|"
    r"went to|decided to|hugged|stepped through|called for help|started a new arc"
    r")",
    re.IGNORECASE,
)

# English -> Russian stem substitutions used to compare facts across languages.
_NAME_ALIASES = (
    ("grigoriy", "григорий"),
    ("grigo", "григо"),
    ("grigory", "григорий"),
    ("rin", "рин"),
    ("player", "игрок"),
    ("user", "игрок"),
    ("glade", "полян"),
    ("flowers", "цвет"),
    ("flower", "цвет"),
    ("magical", "волшеб"),
    ("magic", "волшеб"),
    ("glow", "свет"),
    ("glowing", "свет"),
)

# Substitutions are plain substring replacements, so a short alias that is a
# prefix of a longer one ("grigo" / "grigory", "glow" / "glowing") must be
# applied after it; otherwise the longer alias can never match.
_ALIASES_LONGEST_FIRST = tuple(
    sorted(_NAME_ALIASES, key=lambda pair: len(pair[0]), reverse=True)
)

# Words ignored when comparing facts by token overlap.
_STOP_WORDS = frozenset(
    {
        "the", "and", "that", "with", "this", "have", "has", "was", "are",
        "для", "что", "как", "это", "на", "в", "и", "а",
    }
)


def _as_clean_str(value) -> str:
    """Return *value* as stripped text, tolerating non-string JSON scalars.

    Numbers are stringified (an LLM may answer ``"rp_day": 2``); any other
    non-string value yields ``""`` instead of raising.
    """
    if isinstance(value, str):
        return value.strip()
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return str(value)
    return ""


def _first_text(mapping: dict, *keys: str) -> str:
    """Return the cleaned value of the first key in *keys* holding a truthy value."""
    for key in keys:
        value = mapping.get(key)
        if value:
            return _as_clean_str(value)
    return ""


def parse_fact_entry(raw) -> dict | None:
    """Normalize one raw fact into ``{"text", "rp_day"}``.

    Accepts a dict (text under ``text`` or ``fact``; day under ``rp_day``,
    ``learned`` or ``day``) or a bare string.  Returns ``None`` for any other
    type or when no text is left after stripping.  ``text`` is truncated to
    120 characters and ``rp_day`` to 80.
    """
    if isinstance(raw, dict):
        text = _first_text(raw, "text", "fact")
        rp_day = _first_text(raw, "rp_day", "learned", "day")
    elif isinstance(raw, str):
        text, rp_day = raw.strip(), ""
    else:
        return None
    if not text:
        return None
    return {"text": text[:_TEXT_MAX], "rp_day": rp_day[:_RP_DAY_MAX]}


def parse_facts_list(facts_json: str | None) -> list[dict]:
    """Parse a stored JSON array of facts into normalized entries.

    Invalid JSON, a non-list payload or an argument ``json.loads`` cannot
    accept all yield ``[]``.  Unparseable items are skipped and entries whose
    text matches an earlier one case-insensitively are dropped.
    """
    try:
        data = json.loads(facts_json or "[]")
    except (json.JSONDecodeError, TypeError):
        return []
    if not isinstance(data, list):
        return []
    out: list[dict] = []
    seen: set[str] = set()
    for item in data:
        entry = parse_fact_entry(item)
        if not entry:
            continue
        key = entry["text"].lower()
        if key in seen:
            continue
        seen.add(key)
        out.append(entry)
    return out


def facts_list_to_json(facts: list[dict]) -> str:
    """Serialize facts to a JSON array keeping only ``text`` and ``rp_day``."""
    return json.dumps(
        [{"text": f["text"], "rp_day": f.get("rp_day", "")} for f in facts],
        ensure_ascii=False,
    )


def rp_day_from_scene(scene_json: str | None) -> str:
    """Return the ``day`` field of a scene JSON object, or ``""``.

    The value is stripped and truncated to 80 characters.  Invalid JSON, a
    non-object payload or a missing/empty ``day`` all yield ``""``.
    """
    try:
        scene = json.loads(scene_json or "{}")
    except (json.JSONDecodeError, TypeError):
        return ""
    if not isinstance(scene, dict):
        return ""
    return _first_text(scene, "day")[:_RP_DAY_MAX]


def _normalize_fact_text(text: str) -> str:
    """Lowercase *text*, apply the alias table and collapse punctuation/spaces."""
    t = (text or "").lower()
    for source, replacement in _ALIASES_LONGEST_FIRST:
        t = t.replace(source, replacement)
    t = re.sub(r"[^\w\s]", " ", t, flags=re.UNICODE)
    return re.sub(r"\s+", " ", t).strip()


def _fact_tokens(text: str) -> set[str]:
    """Return the significant words of *text*: longer than 2 chars, not stop words."""
    words = re.findall(r"[\w]+", _normalize_fact_text(text), flags=re.UNICODE)
    return {w for w in words if len(w) > 2 and w not in _STOP_WORDS}


def facts_are_similar(a: str, b: str) -> bool:
    """Heuristically decide whether two fact texts describe the same thing.

    True when the texts are equal ignoring case, when the shorter one is
    contained in the longer one and covers at least 35% of its length, or
    when the Jaccard overlap of their normalized tokens is at least 0.32.
    """
    al, bl = a.lower().strip(), b.lower().strip()
    if al == bl:
        return True
    shorter, longer = (al, bl) if len(al) <= len(bl) else (bl, al)
    if shorter in longer and len(shorter) / max(len(longer), 1) >= 0.35:
        return True
    ta, tb = _fact_tokens(a), _fact_tokens(b)
    if not ta or not tb:
        return False
    overlap = len(ta & tb) / len(ta | tb)
    return overlap >= 0.32


def is_likely_narrative_event(text: str) -> bool:
    """One-off scene actions — not durable memory.

    Empty text is also reported as an event so that callers drop it.
    """
    t = (text or "").strip()
    if not t:
        return True
    if _NARRATIVE_EVENT_RE.search(t):
        return True
    lowered = t.lower()
    return "новую арку" in lowered or "new arc" in lowered


def filter_durable_facts(facts: list[dict]) -> list[dict]:
    """Return the facts whose text does not look like a narrative event."""
    return [f for f in facts if not is_likely_narrative_event(f.get("text", ""))]


def validate_compressed_against_source(
    original: list[dict], compressed: list[dict]
) -> list[dict]:
    """Reject LLM-hallucinated facts not grounded in the input list.

    A compressed fact is kept only if it has text, is not a narrative event
    and is similar to at least one fact in *original*.
    """
    if not compressed:
        return []
    out: list[dict] = []
    for c in compressed:
        text = (c.get("text") or "").strip()
        if not text or is_likely_narrative_event(text):
            continue
        if any(facts_are_similar(text, o.get("text", "")) for o in original):
            out.append(c)
    return out


def dedupe_facts_fuzzy(facts: list[dict]) -> list[dict]:
    """Collapse similar facts, keeping the first occurrence's position.

    When a later fact matches an earlier one, the longer text wins and a
    missing ``rp_day`` is filled in.  Input dicts are copied, not mutated.
    """
    out: list[dict] = []
    for f in facts:
        for kept in out:
            if facts_are_similar(f["text"], kept["text"]):
                if len(f["text"]) > len(kept["text"]):
                    kept["text"] = f["text"][:_TEXT_MAX]
                if f.get("rp_day") and not kept.get("rp_day"):
                    kept["rp_day"] = f["rp_day"]
                break
        else:
            out.append(dict(f))
    return out


def merge_facts(
    existing_json: str,
    new_facts: list,
    *,
    rp_day_default: str = "",
) -> str:
    """Merge *new_facts* into the stored JSON list and return the new JSON.

    New entries without an ``rp_day`` receive *rp_day_default*.  An entry that
    duplicates a stored fact (exactly, ignoring case, or by similarity) is not
    appended; it may only fill the stored fact's empty ``rp_day`` and, for a
    fuzzy match, replace its text when the new text is longer.
    """
    merged = parse_facts_list(existing_json)
    seen = {f["text"].lower() for f in merged}
    default_day = (rp_day_default or "").strip()[:_RP_DAY_MAX]
    for raw in new_facts or []:
        entry = parse_fact_entry(raw)
        if not entry:
            continue
        if not entry["rp_day"] and default_day:
            entry["rp_day"] = default_day
        key = entry["text"].lower()
        if key in seen:
            # Exact duplicate: at most backfill the day on the stored fact.
            for existing in merged:
                if existing["text"].lower() == key:
                    if entry["rp_day"] and not existing.get("rp_day"):
                        existing["rp_day"] = entry["rp_day"]
                    break
            continue
        for existing in merged:
            if facts_are_similar(entry["text"], existing["text"]):
                if len(entry["text"]) > len(existing["text"]):
                    existing["text"] = entry["text"]
                if entry["rp_day"] and not existing.get("rp_day"):
                    existing["rp_day"] = entry["rp_day"]
                break
        else:
            seen.add(key)
            merged.append(entry)
    return facts_list_to_json(merged)


def _strip_code_fence(raw) -> str:
    """Return the LLM reply without a surrounding Markdown code fence.

    Non-string replies are treated as empty so that the caller's JSON parsing
    fails cleanly instead of raising ``AttributeError``.
    """
    cleaned = raw.strip() if isinstance(raw, str) else ""
    if cleaned.startswith("```"):
        if "\n" in cleaned:
            # Drop the opening fence line (it may carry a language tag).
            cleaned = cleaned.split("\n", 1)[1]
        if cleaned.endswith("```"):
            cleaned = cleaned.rsplit("```", 1)[0]
        cleaned = cleaned.strip()
    return cleaned


async def _ask_facts_model(messages: list[dict]):
    """Send *messages* to FACTS_MODEL, or to the default model if it is empty.

    FACTS_MODEL always gets a non-empty default at import time, so the
    ``send_message`` branch only runs if the constant is cleared at runtime.
    """
    if FACTS_MODEL:
        return await send_message_with_model(messages, FACTS_MODEL)
    return await send_message(messages)


def _fallback_compress(facts: list[dict], target: int) -> list[dict]:
    """Non-LLM fallback: fuzzy-dedupe and keep the last *target* facts.

    NOTE: a target of 0 keeps the whole list (slice ``[-0:]``).
    """
    return dedupe_facts_fuzzy(facts)[-target:]


async def compress_facts(
    facts: list[dict],
    *,
    scene_context: str = "",
    status_quo: str = "",
    target: int = FACTS_COMPRESS_TARGET,
) -> list[dict]:
    """Ask the LLM to consolidate *facts* down to roughly *target* entries.

    The LLM answer is parsed, checked against the input with
    ``validate_compressed_against_source``, fuzzy-deduplicated and capped at
    FACTS_STORE_LIMIT.  If the call fails, the answer is not valid JSON, or
    nothing survives validation, the result is the fuzzy-deduplicated input
    trimmed to its last *target* entries.  Never raises on LLM errors.
    """
    payload = json.dumps(facts, ensure_ascii=False, indent=2)
    user = (
        f"Current fact count: {len(facts)}. Target after merge: <= {target}.\n\n"
        f"Facts JSON:\n{payload}\n"
    )
    if scene_context.strip():
        user += f"\nCurrent scene:\n{scene_context.strip()[:1500]}\n"
    if status_quo.strip():
        user += f"\nStatus quo:\n{status_quo.strip()[:1500]}\n"

    system = FACTS_COMPRESS_SYSTEM.format(target=target)
    messages = [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]
    try:
        raw = await _ask_facts_model(messages)
    except LLMError as e:
        logger.warning("compress_facts LLM failed: %s", e)
        return _fallback_compress(facts, target)
    except Exception as e:
        # Best effort: compression must never break the caller's save path.
        logger.warning("compress_facts unexpected: %s", e)
        return _fallback_compress(facts, target)

    try:
        data = json.loads(_strip_code_fence(raw))
    except json.JSONDecodeError:
        logger.warning("compress_facts JSON parse failed. Raw=%.400s", raw)
        return _fallback_compress(facts, target)

    if isinstance(data, list):
        out = [entry for entry in map(parse_fact_entry, data) if entry]
        if out:
            out = validate_compressed_against_source(facts, out)
        if out:
            logger.info("compress_facts: %d -> %d", len(facts), len(out))
            return dedupe_facts_fuzzy(out)[:FACTS_STORE_LIMIT]
    return _fallback_compress(facts, target)


async def merge_facts_persist(
    existing_json: str,
    new_facts: list,
    *,
    rp_day_default: str = "",
    scene_context: str = "",
    status_quo: str = "",
) -> str:
    """Merge, fuzzy-dedupe, LLM-compress when list grows too large.

    Returns the resulting facts as a JSON array string.  Compression runs
    when more than FACTS_DEDUP_THRESHOLD facts remain after filtering, and
    once more if the list still exceeds FACTS_STORE_LIMIT.
    """
    merged_json = merge_facts(
        existing_json, new_facts, rp_day_default=rp_day_default
    )
    facts = dedupe_facts_fuzzy(parse_facts_list(merged_json))
    facts = filter_durable_facts(facts)
    if len(facts) > FACTS_DEDUP_THRESHOLD:
        facts = await compress_facts(
            facts,
            scene_context=scene_context,
            status_quo=status_quo,
            target=FACTS_COMPRESS_TARGET,
        )
        facts = dedupe_facts_fuzzy(facts)
    if len(facts) > FACTS_STORE_LIMIT:
        facts = await compress_facts(
            facts,
            scene_context=scene_context,
            status_quo=status_quo,
            target=FACTS_STORE_LIMIT,
        )
    return facts_list_to_json(facts)


async def extract_facts(
    context_messages: list[dict],
    *,
    rp_day_hint: str = "",
    existing_json: str = "",
) -> list[dict]:
    """Ask the LLM for new durable facts found in recent messages.

    Only ``user``/``assistant`` messages are sent, limited to the last 6000
    characters of the joined transcript, together with up to 40 of the most
    recent known facts.  At most the first 8 returned items are considered;
    items similar to a known fact or looking like a narrative event are
    dropped, and a missing ``rp_day`` is filled from *rp_day_hint*.  Returns
    ``[]`` on any LLM or parsing failure.
    """
    transcript = "\n".join(
        f"{m.get('role')}: {m.get('content', '')}".strip()
        for m in context_messages
        if m.get("role") in ("user", "assistant")
    )[-6000:]
    hint = (rp_day_hint or "").strip()
    known = parse_facts_list(existing_json)

    user_parts = []
    if known:
        known_lines = "\n".join(
            f"- [{f.get('rp_day') or '?'}] {f['text']}" for f in known[-40:]
        )
        user_parts.append(f"Already known facts (do NOT repeat):\n{known_lines}\n")
    if hint:
        user_parts.append(f"RP time hint: {hint}\n")
    user_parts.append(f"New transcript:\n{transcript}")
    user = "\n".join(user_parts)

    messages = [
        {"role": "system", "content": FACTS_SYSTEM},
        {"role": "user", "content": user},
    ]
    try:
        raw = await _ask_facts_model(messages)
    except LLMError as e:
        logger.warning(
            "extract_facts LLM failed (model=%s): %s", FACTS_MODEL or "SYSTEM", e
        )
        return []
    except Exception as e:
        # Best effort: fact extraction must never break the chat turn.
        logger.warning("extract_facts unexpected: %s", e)
        return []

    try:
        data = json.loads(_strip_code_fence(raw))
    except json.JSONDecodeError:
        logger.warning("extract_facts JSON parse failed. Raw=%.400s", raw)
        return []
    if not isinstance(data, list):
        return []

    out: list[dict] = []
    for item in data[:8]:
        entry = parse_fact_entry(item)
        if not entry:
            continue
        if any(facts_are_similar(entry["text"], k["text"]) for k in known):
            continue
        if is_likely_narrative_event(entry["text"]):
            continue
        if not entry["rp_day"] and hint:
            entry["rp_day"] = hint[:_RP_DAY_MAX]
        out.append(entry)
    return out


def facts_to_prompt(facts_json: str, max_items: int = FACTS_PROMPT_MAX) -> str:
    """Render stored facts as a prompt block, most recent *max_items* only.

    Facts are fuzzy-deduplicated and filtered for durability first.  Returns
    ``""`` when nothing is left or when *max_items* is not positive.  The
    header mentions the total when some facts are not shown.
    """
    facts = filter_durable_facts(dedupe_facts_fuzzy(parse_facts_list(facts_json)))
    # Guard max_items <= 0: facts[-0:] would otherwise return every fact.
    if not facts or max_items <= 0:
        return ""
    recent = facts[-max_items:]
    lines = []
    for f in recent:
        day = (f.get("rp_day") or "").strip()
        lines.append(f"- [{day}] {f['text']}" if day else f"- {f['text']}")
    block = "\n".join(lines)
    total = len(facts)
    header = "--- Facts (persistent memory"
    if total > len(recent):
        header += f", showing {len(recent)} of {total}"
    header += ") ---"
    return f"{header}\n{block}\n---"