"""Linear story arc: global plot, steps, one active quest.""" import json import logging logger = logging.getLogger(__name__) def migrate_beats_to_steps(arc: dict) -> dict: """One-time: collapse legacy beats[] into steps[].""" if not isinstance(arc, dict): return {} if arc.get("steps"): return arc beats = arc.get("beats") or [] if not isinstance(beats, list) or not beats: return arc steps = [] for i, b in enumerate(beats): if not isinstance(b, dict): continue title = (b.get("title") or b.get("injection") or f"Step {i + 1}").strip()[:120] steps.append({ "id": b.get("id") or f"s_m{i + 1}", "title": title, "goal": title, "completion_criteria": f"Player and character naturally complete: {title}", "character_guidance": "Stay in character; move toward the goal without teleporting.", "injection": (b.get("injection") or "").strip(), "choices": b.get("choices") or [], }) arc = dict(arc) arc["steps"] = steps arc.pop("beats", None) arc.setdefault("current_step_index", 0) arc.setdefault("status", "active") arc.setdefault("global_story", arc.get("next_beat_hint") or arc.get("title") or "") arc.setdefault("ending", "") arc.setdefault("reward", "") logger.info("migrate_beats_to_steps: %d steps", len(steps)) return arc def normalize_story_arc(raw: dict, genre: str = "adventure") -> dict: """Ensure required fields on a story arc from LLM or migration.""" from services.rpg_plot import format_genres if not isinstance(raw, dict): return {} arc = migrate_beats_to_steps(raw) arc.setdefault("title", "Story arc") arc.setdefault("genre_blend", format_genres(genre)) arc.setdefault("global_story", "") arc.setdefault("ending", "") arc.setdefault("reward", "") arc.setdefault("boundaries", []) arc.setdefault("current_step_index", 0) arc.setdefault("status", "active") if not isinstance(arc.get("steps"), list): arc["steps"] = [] arc["steps"] = [ normalize_step(s, i) for i, s in enumerate(arc.get("steps") or []) if isinstance(s, dict) ] meta = arc.get("meta") if not isinstance(meta, dict): arc["meta"] = {"arc_number": 1, "previous_arc_summary": ""} else: meta.setdefault("arc_number", 1) meta.setdefault("previous_arc_summary", "") try: arc["current_step_index"] = max(0, int(arc.get("current_step_index") or 0)) except (TypeError, ValueError): arc["current_step_index"] = 0 return arc def normalize_step(step: dict, index: int = 0) -> dict: """Repair legacy beat-shaped steps (status_quo / choices[].injection).""" s = dict(step) title = (s.get("title") or s.get("goal") or "").strip() if not title: sq = (s.get("status_quo") or "").strip() if sq: title = sq.split(".")[0].strip()[:120] or sq[:120] else: title = f"Шаг {index + 1}" s["id"] = (s.get("id") or f"s{index + 1}").strip() s["title"] = title[:120] s["goal"] = (s.get("goal") or title).strip()[:200] s.setdefault( "completion_criteria", f"Сцена естественно завершает эпизод: {title}", ) s.setdefault( "character_guidance", "Оставайся в роли; веди сцену к цели шага без телепортов.", ) inj = resolve_step_injection(s) if inj: s["injection"] = inj return s def resolve_step_injection(step: dict | None) -> str: if not isinstance(step, dict): return "" inj = (step.get("injection") or "").strip() if inj: return inj for item in step.get("choices") or []: if not isinstance(item, dict): continue c_inj = (item.get("injection") or "").strip() if c_inj: return c_inj return (step.get("status_quo") or "").strip() def format_new_arc_opening(arc: dict | None, step: dict | None, *, lang: str = "ru") -> str: arc = arc or {} title = (arc.get("title") or "Новая арка").strip() inj = resolve_step_injection(step) if not inj: inj = (arc.get("global_story") or "").strip()[:400] if lang == "ru": header = f"📖 Новая арка: «{title}»" else: header = f"📖 New arc: «{title}»" return f"{header}\n\n{inj}".strip() if inj else header def get_current_step(arc: dict | None) -> dict | None: arc = arc or {} steps = arc.get("steps") or [] if not isinstance(steps, list) or not steps: return None idx = int(arc.get("current_step_index") or 0) if idx < 0 or idx >= len(steps): return None step = steps[idx] return step if isinstance(step, dict) else None def step_progress(arc: dict | None) -> tuple[int, int]: arc = arc or {} steps = arc.get("steps") or [] total = len(steps) if isinstance(steps, list) else 0 idx = int(arc.get("current_step_index") or 0) if total == 0: return 0, 0 return min(idx + 1, total), total def is_arc_completed(arc: dict | None) -> bool: arc = arc or {} if arc.get("status") == "completed": return True steps = arc.get("steps") or [] idx = int(arc.get("current_step_index") or 0) return bool(steps) and idx >= len(steps) def should_show_step_injection(arc: dict) -> bool: if is_arc_completed(arc): return False meta = arc.get("meta") if isinstance(arc.get("meta"), dict) else {} shown = meta.get("injection_shown_for_step") idx = int(arc.get("current_step_index") or 0) return shown != idx def mark_injection_shown(arc: dict) -> None: meta = arc.get("meta") if not isinstance(meta, dict): meta = {} arc["meta"] = meta meta["injection_shown_for_step"] = int(arc.get("current_step_index") or 0) def format_step_guidance_for_character( step: dict, arc: dict | None = None, *, lang: str = "ru" ) -> str: if not step: return "" goal = (step.get("goal") or step.get("title") or "").strip() guidance = (step.get("character_guidance") or "").strip() title = (step.get("title") or "").strip() cur, total = step_progress(arc or {}) global_story = ((arc or {}).get("global_story") or "").strip() ending = ((arc or {}).get("ending") or "").strip() if lang == "ru": lines = [ "--- Текущий шаг сюжета (ОБЯЗАТЕЛЬНОЕ направление) ---", f"Шаг {cur}/{total}: {title}" if total else f"Шаг: {title}", f"Цель эпизода: {goal}", ] if guidance: lines.append(f"Поведение персонажа: {guidance}") if global_story: lines.append(f"Глобальный конвой: {global_story[:400]}") if ending: lines.append(f"Финал арки (не раскрывать досрочно): {ending[:300]}") lines.append( "Веди сцену к цели шага естественно, в роли, без телепортов и смены локации без игрока." ) lines.append("---") else: lines = [ "--- Current story step (MANDATORY direction) ---", f"Step {cur}/{total}: {title}" if total else f"Step: {title}", f"Episode goal: {goal}", ] if guidance: lines.append(f"Character behavior: {guidance}") if global_story: lines.append(f"Global arc: {global_story[:400]}") if ending: lines.append(f"Arc ending (do not spoil early): {ending[:300]}") lines.append( "Move the scene toward the step goal naturally, in character, no teleporting." ) lines.append("---") return "\n\n" + "\n".join(lines) + "\n" def format_step_hint_for_character(injection: str, *, lang: str = "ru") -> str: inj = (injection or "").strip() if not inj: return "" if lang == "ru": header = "--- Сюжетная подсказка (не цитируй дословно) ---" footer = ( "Продолжи текущую сцену естественно по-русски; не цитируй подсказку дословно; " "не меняй локацию без согласия игрока." ) else: header = "--- Plot hint (do not quote verbatim) ---" footer = ( "Continue naturally in English; do not quote verbatim; " "do not change location without player consent." ) return f"\n\n{header}\n{inj}\n{footer}\n---" def format_step_for_narrator( arc: dict | None, quests: list | None = None, status_quo: str = "", ) -> str: arc = normalize_story_arc(arc or {}) if arc else {} parts: list[str] = [] parts.append(f"Story arc: {arc.get('title', '')} [{arc.get('status', 'active')}]") if arc.get("global_story"): parts.append(f"Global story: {arc['global_story'][:500]}") if arc.get("ending"): parts.append(f"Planned ending: {arc['ending'][:300]}") if arc.get("reward"): parts.append(f"Reward hook: {arc['reward'][:200]}") cur, total = step_progress(arc) step = get_current_step(arc) if is_arc_completed(arc): parts.append( "IMPORTANT: Arc COMPLETED. Offer 2-4 choices including starting a NEW story arc " "(e.g. label about a new adventure/chapter) while keeping character continuity." ) elif step: parts.append(f"Current step: {cur}/{total} — «{step.get('title', '')}»") parts.append(f"Step goal: {step.get('goal', '')}") parts.append(f"Completion criteria: {step.get('completion_criteria', '')}") parts.append( "Set step_complete:true ONLY when completion_criteria are clearly met in recent chat. " "Do NOT use quest_updates for step progression — the engine handles quests." ) else: parts.append("No active step — arc may need rollover.") if quests: parts.append("Quest log:") for q in quests: parts.append(f" [{q.get('status', 'active')}] {q.get('title', '')}") sq = (status_quo or "").strip() if sq: parts.append(f"Status quo: {sq[:400]}") return "\n".join(parts) async def sync_quest_to_current_step(session_id: str, arc: dict) -> int: from services.memory import get_quests, upsert_quest arc = normalize_story_arc(arc) quests = await get_quests(session_id) step = get_current_step(arc) closed = 0 if is_arc_completed(arc) or not step: for q in quests: if q.get("status") == "active": await upsert_quest(session_id, q["title"], "done") closed += 1 return closed target_title = (step.get("title") or "Current step").strip()[:120] for q in quests: if q.get("status") == "active" and (q.get("title") or "").strip() != target_title: await upsert_quest(session_id, q["title"], "done") closed += 1 existing_titles = {(q.get("title") or "").strip() for q in quests} if target_title not in existing_titles: await upsert_quest(session_id, target_title, "active") else: for q in quests: if (q.get("title") or "").strip() == target_title and q.get("status") != "active": await upsert_quest(session_id, target_title, "active") return closed async def apply_step_advance(session_id: str, arc: dict) -> dict: from services.memory import update_session_plot_arc, upsert_quest arc = normalize_story_arc(arc) steps = arc.get("steps") or [] idx = int(arc.get("current_step_index") or 0) result = { "advanced": False, "arc_completed": False, "new_step": None, "injection": "", "step_index": idx, } if is_arc_completed(arc) or idx >= len(steps): arc["status"] = "completed" result["arc_completed"] = True await sync_quest_to_current_step(session_id, arc) await update_session_plot_arc(session_id, json.dumps(arc, ensure_ascii=False)) return result old_step = steps[idx] if idx < len(steps) else None if old_step and isinstance(old_step, dict): title = (old_step.get("title") or "").strip() if title: await upsert_quest(session_id, title, "done") idx += 1 arc["current_step_index"] = idx if idx >= len(steps): arc["status"] = "completed" result["arc_completed"] = True await sync_quest_to_current_step(session_id, arc) else: new_step = steps[idx] result["advanced"] = True result["new_step"] = new_step result["step_index"] = idx if isinstance(new_step, dict): result["injection"] = (new_step.get("injection") or "").strip() await sync_quest_to_current_step(session_id, arc) meta = arc.get("meta") if not isinstance(meta, dict): meta = {} arc["meta"] = meta meta.pop("injection_shown_for_step", None) await update_session_plot_arc(session_id, json.dumps(arc, ensure_ascii=False)) logger.info("apply_step_advance: idx=%s completed=%s", idx, result["arc_completed"]) return result async def reconcile_story_arc( session_id: str, *, persona_name: str = "Character", genre: str = "adventure", ) -> tuple[dict, bool]: from services.memory import get_session, update_session_plot_arc session = await get_session(session_id) if not session or not session.get("rpg_enabled"): return {}, False try: arc = json.loads(session.get("plot_arc_json") or "{}") except (json.JSONDecodeError, TypeError): arc = {} if not isinstance(arc, dict): arc = {} before = json.dumps(arc, sort_keys=True) arc = normalize_story_arc(arc, genre=session.get("genre") or genre) changed = before != json.dumps(arc, sort_keys=True) closed = await sync_quest_to_current_step(session_id, arc) if closed: changed = True if changed: await update_session_plot_arc(session_id, json.dumps(arc, ensure_ascii=False)) return arc, changed async def apply_story_post( session_id: str, post: dict, arc: dict, rpg_settings: dict, ) -> dict: from services.memory import get_session arc = normalize_story_arc(arc) out = { "step_advanced": False, "arc_completed": False, "step_injection": "", "new_step_title": "", "arc": arc, } if not rpg_settings.get("quests", True) or is_arc_completed(arc): return out note = (post.get("step_completion_note") or "").strip() step_complete = bool(post.get("step_complete")) # LLM sometimes provides a completion note but misses the boolean. # Treat a non-empty note as a conservative signal to advance. if not step_complete and note: step_complete = True logger.info("step_complete fallback via note: %s", note[:200]) if not step_complete: return out if note: logger.info("step_complete: %s", note[:200]) advance = await apply_step_advance(session_id, arc) out["step_advanced"] = advance.get("advanced", False) out["arc_completed"] = advance.get("arc_completed", False) out["step_injection"] = advance.get("injection") or "" new_step = advance.get("new_step") if isinstance(new_step, dict): out["new_step_title"] = (new_step.get("title") or "").strip() session = await get_session(session_id) or {} try: out["arc"] = json.loads(session.get("plot_arc_json") or "{}") except (json.JSONDecodeError, TypeError): pass return out NEW_ARC_CHOICE_MARKERS = ( "новая арка", "новую арку", "new arc", "new chapter", "следующая арка", "начать новую", # LLM/UI sometimes label next-arc choices as "new quest" instead of "new arc". # The router still keys off this function, so we recognize both. "новый квест", "новую квест", "начать новый квест", "new quest", "start new quest", ) def is_new_arc_request(message: str) -> bool: t = (message or "").lower() return any(m in t for m in NEW_ARC_CHOICE_MARKERS) def normalize_new_arc_first(value: str | None) -> str | None: v = (value or "").strip().lower() return v if v in ("user", "character") else None def new_arc_roll_choice(*, lang: str = "ru") -> dict: label = "Начать новую арку" if lang == "ru" else "Start new arc" return { "id": "new_arc", "type": "new_arc_roll", "label": label, "source": "story_engine", } def filter_new_arc_noise_choices(choices: list) -> list: """Drop LLM 'start new quest' duplicates when we show the structured roll button.""" out = [] for c in choices or []: if not isinstance(c, dict): continue if c.get("type") == "new_arc_roll": out.append(c) continue if is_new_arc_request(c.get("label", "")): continue out.append(c) return out def append_new_arc_roll_choice(choices: list, *, lang: str = "ru") -> list: choices = filter_new_arc_noise_choices(choices) if any(c.get("type") == "new_arc_roll" for c in choices): return choices return [*choices, new_arc_roll_choice(lang=lang)] async def roll_next_arc( session_id: str, persona: dict, greeting_or_context: str, genre: str, *, lang: str = "ru", recent_context: str = "", facts_block: str = "", ) -> dict: from services.memory import get_session, update_session_plot_arc from services.rpg_plot import generate_next_arc session = await get_session(session_id) or {} try: old_arc = json.loads(session.get("plot_arc_json") or "{}") except (json.JSONDecodeError, TypeError): old_arc = {} summary = ( f"Title: {old_arc.get('title', '')}. " f"Story: {(old_arc.get('global_story') or '')[:500]}. " f"Ending: {(old_arc.get('ending') or '')[:300]}. " f"Reward: {(old_arc.get('reward') or '')[:200]}." ) arc_num = int((old_arc.get("meta") or {}).get("arc_number") or 1) + 1 ctx_parts = [recent_context.strip(), greeting_or_context.strip()] combined_context = "\n".join(p for p in ctx_parts if p) new_arc = await generate_next_arc( persona.get("name", "Character"), persona.get("description", ""), persona.get("scenario", ""), combined_context, previous_arc_summary=summary, facts_block=facts_block, genre=genre, lang=lang, ) if not new_arc: return old_arc new_arc = normalize_story_arc(new_arc, genre=genre) meta = new_arc.get("meta") or {} meta["arc_number"] = arc_num meta["previous_arc_summary"] = summary[:800] new_arc["meta"] = meta new_arc["current_step_index"] = 0 new_arc["status"] = "active" await update_session_plot_arc(session_id, json.dumps(new_arc, ensure_ascii=False)) await sync_quest_to_current_step(session_id, new_arc) logger.info("roll_next_arc: arc #%s «%s»", arc_num, new_arc.get("title")) return new_arc