"""Plot-arc generation and beat/quest bookkeeping helpers for RPG chat sessions."""

import json
import logging
import os

from services.llm import LLMError, send_message, send_message_with_model

logger = logging.getLogger(__name__)

# The hard-coded fallback means PLOT_MODEL is never empty after import; the
# ``send_message`` path below only runs if something resets it to "" later.
PLOT_MODEL = os.getenv("RPG_PLOT_MODEL", "").strip() or "deepseek/deepseek-chat-v3"

GENRE_LABELS = {
    "adventure": "Adventure",
    "horror": "Horror",
    "romance": "Romance",
    "slice_of_life": "Slice of Life",
    "fantasy": "Fantasy",
    "sci_fi": "Sci-Fi",
}

ARC_SYSTEM = """You are a narrative designer for an RPG chat.
Given the opening scene (greeting), character info, current facts, and genre(s), produce ONE LINEAR STORY ARC.

Return ONLY valid JSON (no markdown):
{
  "title": "short arc title",
  "genre_blend": "e.g. Romance + Adventure",
  "global_story": "2-4 sentences: setup, through-line, planned finale",
  "ending": "how this arc resolves",
  "reward": "conditional reward/hook for player and character after finale",
  "boundaries": ["no teleporting", "stay in character", "..."],
  "current_step_index": 0,
  "status": "active",
  "steps": [
    {
      "id": "s1",
      "title": "short quest title (3-8 words)",
      "goal": "what must happen in this episode",
      "completion_criteria": "concrete signs the step is done (for narrator)",
      "character_guidance": "how the PC should behave toward the goal",
      "injection": "1-3 immersive sentences when this step begins",
      "choices": [{"id":"a","label":"..."},{"id":"b","label":"..."}]
    }
  ],
  "meta": {"arc_number": 1, "previous_arc_summary": ""}
}

Rules:
- 3-5 linear steps from opening to finale. ONE quest = ONE step at a time.
- Step 1 often matches what already happened in the greeting (shelter, meet, etc.).
- Steps must escalate naturally (trust, daily life, adventure, climax).
- No event triggers — progression is narrative completion only.
- Injections and titles in session language."""


def _genre_label(genre_key: str) -> str:
    """Return the display label for one genre token.

    The lookup is case-insensitive and treats spaces/hyphens like underscores,
    so "Sci-Fi", "sci_fi" and "slice of life" all hit ``GENRE_LABELS``.
    Unknown genres are title-cased with underscores turned into spaces.
    """
    normalized = genre_key.lower().replace("-", "_").replace(" ", "_")
    return GENRE_LABELS.get(normalized) or genre_key.replace("_", " ").title()


def format_genres(genre: str) -> str:
    """Render a genre spec ("horror", "romance+adventure", "a, b") for prompts.

    Tokens are separated by "+" or ",". An empty or missing spec yields
    "Adventure"; several tokens are joined with " + " and marked as a
    cross-genre blend.
    """
    parts = [g.strip() for g in (genre or "").replace("+", ",").split(",") if g.strip()]
    if not parts:
        return "Adventure"
    labels = [_genre_label(g) for g in parts]
    if len(labels) == 1:
        return labels[0]
    return " + ".join(labels) + " (cross-genre blend)"


async def _send_plot_messages(messages: list[dict]):
    """Send ``messages`` to the plot model, or to the default model if none is set.

    Returns whatever the ``services.llm`` call returns (callers treat it as the
    raw reply text). Exceptions propagate so each caller can log in its own words.
    """
    if PLOT_MODEL:
        return await send_message_with_model(messages, PLOT_MODEL)
    return await send_message(messages)


def _parse_llm_json(raw: str) -> dict | list | None:
    """Parse an LLM reply as JSON, tolerating a surrounding ``` code fence.

    Returns the decoded value, or None when ``raw`` is not a string or does
    not contain valid JSON. Never raises for bad input.
    """
    if not isinstance(raw, str):
        return None
    cleaned = raw.strip()
    if cleaned.startswith("```"):
        # Drop the opening fence line (e.g. "```json").
        cleaned = cleaned.split("\n", 1)[1] if "\n" in cleaned else cleaned
    if cleaned.endswith("```"):
        cleaned = cleaned.rsplit("```", 1)[0]
    cleaned = cleaned.strip()
    try:
        return json.loads(cleaned)
    except (json.JSONDecodeError, RecursionError):
        return None


def _arc_from_llm_reply(raw, *, genre: str, parse_failed_msg: str) -> dict:
    """Turn a raw LLM reply into a normalized story arc, or ``{}`` on any failure.

    ``parse_failed_msg`` is a logging format string with one ``%s``-style slot
    for the raw reply; it is used when the reply is not parseable JSON.
    """
    data = _parse_llm_json(raw)
    if data is None:
        logger.warning(parse_failed_msg, raw)
        return {}
    if not isinstance(data, dict):
        return {}
    try:
        from services.rpg_story import normalize_story_arc

        return normalize_story_arc(data, genre=genre)
    except Exception as e:
        # Keep the never-raise contract, but do not mislabel this as a parse error.
        logger.warning("normalize_story_arc failed: %s", e)
        return {}


async def generate_plot_arc(
    persona_name: str,
    persona_desc: str,
    persona_scenario: str,
    greeting: str,
    facts_block: str = "",
    genre: str = "adventure",
    *,
    lang: str = "ru",
    recent_context: str = "",
) -> dict:
    """Ask the LLM for the initial linear story arc of a session.

    Builds a prompt from the persona, greeting, facts and genre, appends the
    last 2000 characters of ``recent_context`` when present, and passes the
    parsed reply through ``normalize_story_arc``.

    Returns the normalized arc, or ``{}`` if the LLM call fails or the reply is
    not a JSON object. Does not raise for LLM or parsing errors.
    """
    from services.rpg_locale import locale_instruction, locale_label

    user = (
        f"Character: {persona_name}\n"
        f"Description: {persona_desc}\n"
        f"Scenario: {persona_scenario}\n"
        f"Greeting: {greeting}\n"
        f"Genre: {format_genres(genre)}\n"
        f"Session language: {locale_label(lang)}\n"
        f"Facts:\n{facts_block}\n"
    ).strip()
    recent = (recent_context or "").strip()
    if recent:
        user += f"\nRecent chat:\n{recent[-2000:]}\n"
    messages = [
        {"role": "system", "content": ARC_SYSTEM + "\n" + locale_instruction(lang)},
        {"role": "user", "content": user},
    ]
    try:
        raw = await _send_plot_messages(messages)
    except LLMError as e:
        logger.warning("generate_plot_arc LLM failed (model=%s): %s", PLOT_MODEL or "SYSTEM", e)
        return {}
    except Exception as e:
        logger.warning("generate_plot_arc unexpected error: %s", e)
        return {}
    return _arc_from_llm_reply(
        raw,
        genre=genre,
        parse_failed_msg="PlotArc JSON parse failed. Raw=%.500s",
    )


NEXT_ARC_SYSTEM = """You are a narrative designer for an RPG chat.
The previous story arc COMPLETED. Design the NEXT arc continuing the same characters and relationship.

Return ONLY valid JSON (same schema as initial arc):
{
  "title": "...",
  "genre_blend": "...",
  "global_story": "...",
  "ending": "...",
  "reward": "...",
  "boundaries": [],
  "current_step_index": 0,
  "status": "active",
  "steps": [ ... 3-5 steps ... ],
  "meta": {"arc_number": N, "previous_arc_summary": "..."}
}

Rules:
- Build on previous arc outcome and reward (e.g. tickets found → cruise trip).
- New arc must feel like a natural sequel, not a reset.
- Keep same cast; facts and affinity continue."""


async def generate_next_arc(
    persona_name: str,
    persona_desc: str,
    persona_scenario: str,
    recent_context: str,
    *,
    previous_arc_summary: str = "",
    facts_block: str = "",
    genre: str = "adventure",
    lang: str = "ru",
) -> dict:
    """Ask the LLM for a sequel arc after the previous one completed.

    The prompt carries the previous arc summary, facts and the last 3000
    characters of ``recent_context``.

    Returns the normalized arc, or ``{}`` if the LLM call fails or the reply is
    not a JSON object. Does not raise for LLM or parsing errors.
    """
    from services.rpg_locale import locale_instruction, locale_label

    recent = (recent_context or "").strip()
    user = (
        f"Character: {persona_name}\n"
        f"Description: {persona_desc}\n"
        f"Scenario: {persona_scenario}\n"
        f"Genre: {format_genres(genre)}\n"
        f"Session language: {locale_label(lang)}\n"
        f"Previous arc summary:\n{previous_arc_summary}\n"
        f"Facts:\n{facts_block}\n"
        f"Recent chat:\n{recent[-3000:]}\n"
    )
    messages = [
        {"role": "system", "content": NEXT_ARC_SYSTEM + "\n" + locale_instruction(lang)},
        {"role": "user", "content": user},
    ]
    try:
        raw = await _send_plot_messages(messages)
    except LLMError as e:
        logger.warning("generate_next_arc failed: %s", e)
        return {}
    except Exception as e:
        logger.warning("generate_next_arc unexpected: %s", e)
        return {}
    return _arc_from_llm_reply(
        raw,
        genre=genre,
        parse_failed_msg="generate_next_arc JSON parse failed. Raw=%.400s",
    )


BEAT_MATCH_SYSTEM = """You decide whether the player's latest message should fire ONE scripted plot beat.

Return ONLY valid JSON (no markdown):
{"fire_beat_id": "id from list or null", "confidence": "high|low"}

Rules:
- Fire only if the message clearly matches that beat's narrative intent RIGHT NOW.
- event_driven:rest — stopping to rest, sleep, camp, sauna break, recuperate (not mere sitting still in scene).
- event_driven:travel — leaving, driving, journey, going to a new place, hitting the road.
- event_driven:help_request — explicit plea for help/rescue/assistance.
- event_driven:after_fail / after_success — follow-up to a recent failure/success beat.
- Casual talk, flirting, exploring the current place without leaving does NOT fire travel.
- If nothing fits well, return null.
- Pick at most ONE beat; prefer high confidence only."""


def dice_outcome_to_beat_trigger(outcome: str | None) -> str | None:
    """Map d20 outcome to event_driven beat trigger (after_fail / after_success).

    Matching ignores case and surrounding whitespace. Anything other than
    (critical) failure/success, including None, yields None.
    """
    o = (outcome or "").strip().lower()
    if o in ("failure", "critical failure"):
        return "event_driven:after_fail"
    if o in ("success", "critical success"):
        return "event_driven:after_success"
    return None


# Substring keywords for the legacy fallback below. They are matched anywhere in
# the lower-cased message, so short stems can also hit unrelated words.
_REST_KEYWORDS = (
    "отдыха",
    "ночлег",
    "спим",
    "сон",
    "разбить лагерь",
    "лагерь",
    "отдохн",
    "привала",
    "заправк",
    "саун",
)
_TRAVEL_KEYWORDS = (
    "идем дальше",
    "пойдем дальше",
    "пойдём дальше",
    "едем дальше",
    "едем",
    "поехали",
    "выезжаем",
    "выезжаю",
    "в путь",
    "продолжаем путь",
    "уходим",
    "возвращаемся",
    "переходим",
    "за рул",
    "машин",
    "автомоб",
    "дорог",
    "трас",
    "шосс",
    "приех",
    "прибыва",
    "стади",
    "на стадион",
    "отправляемся",
    "выдвигаемся",
    "в дорогу",
)
_HELP_KEYWORDS = ("помоги", "помочь", "нужна помощь", "спасите", "help")


def should_advance_arc_keywords(user_text: str) -> str | None:
    """Legacy keyword fallback when LLM match is unavailable.

    Returns the first matching trigger in priority order rest, travel,
    help_request, or None when no keyword occurs in the message.
    """
    t = (user_text or "").lower()
    if any(x in t for x in _REST_KEYWORDS):
        return "event_driven:rest"
    if any(x in t for x in _TRAVEL_KEYWORDS):
        return "event_driven:travel"
    if any(x in t for x in _HELP_KEYWORDS):
        return "event_driven:help_request"
    return None


async def classify_plot_beat(
    user_text: str,
    beats: list[dict],
    recent_context: str = "",
    last_dice_outcome: str | None = None,
    *,
    lang: str = "ru",
) -> str | None:
    """LLM: return beat id to fire, or None.

    Only dict beats with a truthy ``id`` are considered, and at most the first
    eight are shown to the model. The reply is accepted only if it names one of
    the pending ids and is not marked low confidence. LLM and parsing failures
    are logged and yield None.
    """
    pending = [b for b in beats if isinstance(b, dict) and b.get("id")]
    if not pending or not (user_text or "").strip():
        return None

    lines = [
        json.dumps(
            {
                "id": b.get("id"),
                "title": b.get("title", ""),
                "trigger": b.get("trigger", ""),
                "injection": (b.get("injection") or "")[:200],
            },
            ensure_ascii=False,
        )
        for b in pending[:8]
    ]
    user = (
        f"Player message:\n{user_text.strip()}\n\n"
        f"Pending beats:\n" + "\n".join(lines)
    )
    recent = (recent_context or "").strip()
    if recent:
        user += f"\n\nRecent chat:\n{recent[-2500:]}\n"
    if last_dice_outcome:
        user += f"\nLast dice outcome this turn: {last_dice_outcome}\n"

    from services.rpg_locale import locale_instruction

    messages = [
        {
            "role": "system",
            "content": BEAT_MATCH_SYSTEM + "\n" + locale_instruction(lang),
        },
        {"role": "user", "content": user},
    ]
    try:
        raw = await _send_plot_messages(messages)
    except LLMError as e:
        logger.warning("classify_plot_beat LLM failed: %s", e)
        return None
    except Exception as e:
        logger.warning("classify_plot_beat unexpected: %s", e)
        return None

    data = _parse_llm_json(raw)
    if not isinstance(data, dict):
        return None
    raw_id = data.get("fire_beat_id")
    if raw_id is None:
        return None
    bid = str(raw_id).strip()
    # Models sometimes spell "no beat" as a string, in any letter case.
    if bid.lower() in ("", "null", "none"):
        return None
    # Normalize so "Low" / " low " are rejected the same way as "low".
    if str(data.get("confidence") or "").strip().lower() == "low":
        return None
    valid_ids = {str(b.get("id")) for b in pending}
    if bid in valid_ids:
        logger.info("classify_plot_beat: fired %s", bid)
        return bid
    return None


def pop_beat_by_id(arc: dict, beat_id: str) -> tuple[dict, list[dict]]:
    """Remove the first beat whose id equals ``beat_id`` (compared as strings).

    Mutates ``arc["beats"]`` and returns ``(arc, matched)`` where ``matched``
    holds zero or one beat.
    """
    beats = arc.get("beats") or []
    matched, remaining = [], []
    for b in beats:
        if isinstance(b, dict) and str(b.get("id")) == str(beat_id) and not matched:
            matched.append(b)
        else:
            remaining.append(b)
    arc["beats"] = remaining
    return arc, matched


def beat_title(beat: dict) -> str:
    """Return the beat's title (or its injection if no title), cut to 120 chars and stripped."""
    return ((beat.get("title") or beat.get("injection") or "")[:120]).strip()


def format_beat_injection_for_character(injection: str, *, lang: str = "ru") -> str:
    """Soft plot hint for CHAT model — not a script to copy verbatim.

    Returns "" for an empty injection. Russian framing is used when ``lang`` is
    "ru"; every other language gets the English framing.
    """
    inj = (injection or "").strip()
    if not inj:
        return ""
    if lang == "ru":
        header = "--- Сюжетная подсказка (не цитируй дословно) ---"
        footer = (
            "Продолжи текущую сцену естественно по-русски; не цитируй подсказку дословно; "
            "не меняй локацию без согласия игрока."
        )
    else:
        header = "--- Plot hint (do not quote verbatim) ---"
        footer = (
            "Continue the scene naturally in English; do not quote the hint verbatim; "
            "do not change location without player consent."
        )
    return f"\n\n{header}\n{inj}\n{footer}\n---"


def arc_user_turn_count(history: list | None) -> int:
    """Count the dict entries in ``history`` whose ``role`` is "user"."""
    return sum(1 for m in (history or []) if isinstance(m, dict) and m.get("role") == "user")


def can_fire_beat(arc: dict, user_turn: int, *, min_gap: int = 2) -> bool:
    """Return True when at least ``min_gap`` user turns passed since the last fired beat.

    A missing or non-integer ``meta.last_beat_fired_at_user_turn`` means no
    cooldown applies.
    """
    meta = arc.get("meta") if isinstance(arc.get("meta"), dict) else {}
    last = meta.get("last_beat_fired_at_user_turn")
    if last is None:
        return True
    try:
        last_i = int(last)
    except (TypeError, ValueError):
        return True
    return user_turn - last_i >= min_gap


def record_beat_fired(arc: dict, beat: dict, user_turn: int) -> None:
    """Store the firing turn (and the beat id, if any) in ``arc["meta"]``.

    Creates ``arc["meta"]`` when it is missing or not a dict.
    """
    meta = arc.get("meta")
    if not isinstance(meta, dict):
        meta = {}
        arc["meta"] = meta
    meta["last_beat_fired_at_user_turn"] = user_turn
    bid = beat.get("id")
    if bid:
        meta["last_beat_id"] = str(bid)


async def complete_quest_for_fired_beat(session_id: str, beat: dict) -> None:
    """Mark the fired beat's quest done so reconcile does not orphan it next turn."""
    from services.memory import upsert_quest

    title = beat_title(beat)
    if title:
        await upsert_quest(session_id, title, "done")


def count_active_quests(quests: list | None) -> int:
    """Count quests whose ``status`` is "active"; non-dict entries are ignored."""
    return sum(
        1 for q in (quests or []) if isinstance(q, dict) and q.get("status") == "active"
    )


def active_quest_titles_to_close(arc: dict, quests: list | None) -> list[str]:
    """Active quests whose title does not match any pending beat in arc.

    Titles are compared stripped and lower-cased against ``beat_title`` of each
    pending beat. Non-dict quest entries are ignored.
    """
    pending = {
        beat_title(b).lower()
        for b in (arc.get("beats") or [])
        if isinstance(b, dict)
    }
    to_close: list[str] = []
    for q in quests or []:
        if not isinstance(q, dict) or q.get("status") != "active":
            continue
        title = (q.get("title") or "").strip()
        if title and title.lower() not in pending:
            to_close.append(title)
    return to_close


async def reconcile_active_quests_to_arc(session_id: str, arc: dict) -> int:
    """Mark active quests done when their beat is no longer in arc (desync after fire/replenish).

    Returns the number of quests closed.
    """
    from services.memory import get_quests, upsert_quest

    quests = await get_quests(session_id)
    titles = active_quest_titles_to_close(arc, quests)
    for title in titles:
        await upsert_quest(session_id, title, "done")
    if titles:
        logger.info(
            "reconcile_active_quests_to_arc: closed %d orphan(s) %s",
            len(titles),
            titles[:5],
        )
    return len(titles)


def prune_beats_for_done_quests(arc: dict, quests: list | None) -> tuple[dict, list[dict]]:
    """Drop beats whose title already matches a done/failed quest (manual quest close desync).

    Returns ``(arc, removed)``. ``arc["beats"]`` is only rewritten when at least
    one done/failed quest has a non-empty title.
    """
    done_titles = {
        (q.get("title") or "").strip().lower()
        for q in (quests or [])
        if isinstance(q, dict) and q.get("status") in ("done", "failed")
    }
    # An untitled done quest must not match (and prune) every untitled beat.
    done_titles.discard("")
    if not done_titles:
        return arc, []
    removed, kept = [], []
    for b in arc.get("beats") or []:
        if isinstance(b, dict) and beat_title(b).lower() in done_titles:
            removed.append(b)
        else:
            kept.append(b)
    arc["beats"] = kept
    return arc, removed


def pop_next_beats(arc: dict, max_beats: int = 1) -> tuple[dict, list[dict]]:
    """Take up to ``max_beats`` beats from the front of ``arc["beats"]``.

    Returns ``(arc, popped)``. Non-dict entries among the taken items are
    removed from the arc but not returned. A non-positive ``max_beats`` takes
    nothing.
    """
    beats = arc.get("beats") or []
    if not isinstance(beats, list) or not beats:
        return arc, []
    # Clamp at zero: a negative count would otherwise slice from the wrong end.
    n = max(0, min(max_beats, len(beats)))
    matched = [b for b in beats[:n] if isinstance(b, dict)]
    arc["beats"] = beats[n:]
    return arc, matched


async def process_arc_beats(
    arc: dict,
    quests: list | None,
    user_text: str,
    *,
    recent_context: str = "",
    last_dice_outcome: str | None = None,
    needs_check: bool = False,
    user_turn: int = 0,
    allow_stuck_recovery: bool = True,
    reconcile_closed_count: int = 0,
    lang: str = "ru",
) -> tuple[dict, list[dict], list[dict], str, dict]:
    """
    Prune completed beats, then fire by dice outcome, LLM match, keywords, or stuck recovery.

    Returns (arc, fired_beats, pruned_beats, mode, extras).
    mode: '' | 'after_dice' | 'llm' | 'trigger' | 'stuck_recovery' | 'pruned'

    extras["cooldown_skipped"] is True when the beat cooldown prevented any
    firing this turn. At most one beat is fired per call.
    """
    extras: dict = {"cooldown_skipped": False}
    if not arc:
        return arc, [], [], "", extras

    arc, pruned = prune_beats_for_done_quests(arc, quests)
    beats_pending = arc.get("beats") or []

    if not can_fire_beat(arc, user_turn):
        extras["cooldown_skipped"] = True
        if pruned:
            return arc, [], pruned, "pruned", extras
        return arc, [], [], "", extras

    # 1) A dice check this turn takes priority over any text matching.
    dice_trig = dice_outcome_to_beat_trigger(last_dice_outcome)
    if needs_check and dice_trig and beats_pending:
        arc, fired = pop_matching_beats(arc, dice_trig, max_beats=1)
        if fired:
            record_beat_fired(arc, fired[0], user_turn)
            logger.info(
                "process_arc_beats: after_dice %s -> %s",
                last_dice_outcome,
                fired[0].get("id"),
            )
            return arc, fired, pruned, "after_dice", extras

    if beats_pending:
        # 2) LLM classification of the player's message.
        beat_id = await classify_plot_beat(
            user_text,
            beats_pending,
            recent_context,
            last_dice_outcome,
            lang=lang,
        )
        if beat_id:
            arc, fired = pop_beat_by_id(arc, beat_id)
            if fired:
                record_beat_fired(arc, fired[0], user_turn)
                return arc, fired, pruned, "llm", extras

        # 3) Keyword fallback.
        trig = should_advance_arc_keywords(user_text)
        if trig:
            arc, fired = pop_matching_beats(arc, trig, max_beats=1)
            if fired:
                record_beat_fired(arc, fired[0], user_turn)
                return arc, fired, pruned, "trigger", extras

    # 4) Stuck recovery: beats remain but no quest is active. The cooldown was
    # already enforced above, so it is not re-checked here.
    stuck_ok = bool(
        allow_stuck_recovery
        and reconcile_closed_count == 0
        and arc.get("beats")
        and count_active_quests(quests) == 0
    )
    if stuck_ok:
        arc, fired = pop_next_beats(arc, 1)
        if fired:
            record_beat_fired(arc, fired[0], user_turn)
            return arc, fired, pruned, "stuck_recovery", extras

    if pruned:
        return arc, [], pruned, "pruned", extras
    return arc, [], [], "", extras


PHASE_ORDER = ["opening", "hook", "complication", "reveal", "climax", "aftermath"]


def advance_phase(arc: dict) -> bool:
    """Advance arc to next phase if beats are exhausted. Returns True if phase changed.

    Nothing changes while beats remain, when the current phase is not in
    ``PHASE_ORDER``, or when the arc is already in the last phase.
    """
    current = arc.get("phase", "opening")
    if arc.get("beats"):
        return False
    try:
        idx = PHASE_ORDER.index(current)
    except ValueError:
        return False
    if idx + 1 >= len(PHASE_ORDER):
        return False
    arc["phase"] = PHASE_ORDER[idx + 1]
    return True


BEATS_APPEND_SYSTEM = """You are a narrative designer for an RPG chat.
The plot arc has NO remaining scripted beats. Generate 2-3 NEW beats to continue play.

Return ONLY valid JSON (no markdown):
{
  "beats": [
    {"id":"b_new_1","title":"short quest title","trigger":"event_driven:rest|event_driven:travel|event_driven:help_request|event_driven:after_fail|event_driven:after_success",
     "injection":"1-3 sentences in-world",
     "choices":[{"id":"a","label":"..."},{"id":"b","label":"..."}]}
  ],
  "next_beat_hint": "what to push next",
  "phase": "hook|complication|reveal|climax|aftermath"
}

Match the current scene and completed quests. Do not restart finished storylines."""


async def replenish_arc_beats(
    arc: dict,
    persona_name: str,
    recent_context: str,
    quests: list,
    genre: str = "adventure",
    *,
    lang: str = "ru",
) -> dict:
    """Append new beats when arc.beats is empty so plot/quest engine can continue.

    Returns ``arc`` unchanged if it still has beats, if the LLM call fails, or
    if the reply is not a JSON object. Otherwise sets ``beats`` (dict entries
    only), ``next_beat_hint`` and ``phase`` (only values in ``PHASE_ORDER``)
    from the reply.
    """
    if arc.get("beats"):
        return arc

    quest_lines = "\n".join(
        f" [{q.get('status')}] {q.get('title')}"
        for q in (quests or [])
        if isinstance(q, dict)
    ) or " (none)"

    from services.rpg_locale import locale_instruction, locale_label

    user = (
        f"Character: {persona_name}\n"
        f"Genre: {format_genres(genre)}\n"
        f"Session language: {locale_label(lang)}\n"
        f"Current arc title: {arc.get('title', '')}\n"
        f"Phase: {arc.get('phase', 'aftermath')}\n"
        f"Boundaries: {json.dumps(arc.get('boundaries', []), ensure_ascii=False)}\n"
        f"Quests:\n{quest_lines}\n\n"
        f"Recent chat:\n{(recent_context or '')[-4000:]}\n"
    )
    messages = [
        {"role": "system", "content": BEATS_APPEND_SYSTEM + "\n" + locale_instruction(lang)},
        {"role": "user", "content": user},
    ]
    try:
        raw = await _send_plot_messages(messages)
    except LLMError as e:
        logger.warning("replenish_arc_beats failed: %s", e)
        return arc
    except Exception as e:
        logger.warning("replenish_arc_beats unexpected: %s", e)
        return arc

    data = _parse_llm_json(raw)
    if data is None:
        logger.warning("replenish_arc_beats JSON parse failed. Raw=%.400s", raw)
        return arc
    if not isinstance(data, dict):
        return arc

    new_beats = data.get("beats")
    if isinstance(new_beats, list):
        # Every beat consumer in this module skips non-dict entries; drop them here.
        new_beats = [b for b in new_beats if isinstance(b, dict)]
        if new_beats:
            arc["beats"] = new_beats
            logger.info("replenish_arc_beats: added %d beats", len(new_beats))
    if data.get("next_beat_hint"):
        arc["next_beat_hint"] = data["next_beat_hint"]
    phase = data.get("phase")
    if phase:
        # An unknown phase would make advance_phase() return False forever.
        if isinstance(phase, str) and phase in PHASE_ORDER:
            arc["phase"] = phase
        else:
            logger.warning("replenish_arc_beats: ignoring unknown phase %r", phase)
    return arc


async def reconcile_plot_arc(
    session_id: str,
    *,
    replenish_if_empty: bool = True,
    recent_context: str = "",
    persona_name: str = "Character",
    genre: str = "adventure",
) -> tuple[dict, bool]:
    """Sync linear story arc and single active quest. replenish_if_empty ignored (legacy).

    Thin wrapper over ``services.rpg_story.reconcile_story_arc``;
    ``recent_context`` is accepted but not forwarded either.
    """
    from services.rpg_story import reconcile_story_arc

    return await reconcile_story_arc(
        session_id,
        persona_name=persona_name,
        genre=genre,
    )


def pop_matching_beats(arc: dict, trigger: str, max_beats: int = 1) -> tuple[dict, list[dict]]:
    """Remove up to ``max_beats`` beats whose ``trigger`` equals ``trigger``.

    Beats are taken in list order. Mutates ``arc["beats"]`` and returns
    ``(arc, matched)``; a non-list ``beats`` value is left untouched.
    """
    beats = arc.get("beats", [])
    if not isinstance(beats, list):
        return arc, []
    matched, remaining = [], []
    for b in beats:
        if len(matched) < max_beats and isinstance(b, dict) and b.get("trigger") == trigger:
            matched.append(b)
        else:
            remaining.append(b)
    arc["beats"] = remaining
    return arc, matched


def normalize_choice(
    raw: dict,
    *,
    source: str = "narrator",
    beat: dict | None = None,
) -> dict | None:
    """Normalize a choice dict for storage/UI. Adds source and optional beat metadata.

    Returns None when ``raw`` is not a dict or has no label. ``id`` and
    ``label`` are coerced to strings (LLM JSON may carry numbers); a missing or
    blank id falls back to the lower-cased first character of the label. Beat
    metadata is attached only when ``source`` is "plot_beat".
    """
    if not isinstance(raw, dict):
        return None
    label = str(raw.get("label") or "").strip()
    if not label:
        return None
    # Strip before falling back so a whitespace-only id cannot become "".
    cid = str(raw.get("id") or "").strip() or label[:1].lower() or "a"
    out = {"id": cid, "label": label, "source": source}
    if beat and source == "plot_beat":
        if beat.get("id"):
            out["beat_id"] = beat["id"]
        title = (beat.get("title") or "").strip()
        if title:
            out["beat_title"] = title
        injection = (beat.get("injection") or "").strip()
        if injection:
            out["beat_injection"] = injection
    return out


def choices_from_beat(beat: dict) -> list[dict]:
    """Alias of ``choices_from_step`` for beat dicts."""
    return choices_from_step(beat)


def choices_from_step(step: dict) -> list[dict]:
    """Build normalized choices (source "plot_step") from a step's ``choices`` list.

    Each choice carries the step's id, title and injection under both the
    ``step_*`` and ``beat_*`` keys where those values are present. Returns []
    for a non-dict step.
    """
    if not isinstance(step, dict):
        return []
    out = []
    for item in step.get("choices") or []:
        c = normalize_choice(item, source="plot_step", beat=step)
        if not c:
            continue
        if step.get("id"):
            c["step_id"] = step["id"]
            c["beat_id"] = step["id"]
        title = (step.get("title") or "").strip()
        if title:
            c["beat_title"] = title
            c["step_title"] = title
        inj = (step.get("injection") or "").strip()
        if inj:
            c["beat_injection"] = inj
        out.append(c)
    return out


def choices_from_narrator(raw_choices: list) -> list[dict]:
    """Normalize narrator-provided choices, dropping invalid entries.

    Returns [] when ``raw_choices`` is not a list.
    """
    if not isinstance(raw_choices, list):
        return []
    return [
        c
        for c in (normalize_choice(item, source="narrator") for item in raw_choices)
        if c
    ]