# Source: ChatAIBot/services/rpg_story.py
# Snapshot: 2026-06-05 14:57:15 +03:00 (568 lines, 19 KiB, Python)

"""Linear story arc: global plot, steps, one active quest."""
import json
import logging
logger = logging.getLogger(__name__)
def migrate_beats_to_steps(arc: dict) -> dict:
    """Convert a legacy ``beats`` list into the ``steps`` list, once.

    Returns ``{}`` for non-dict input. Returns ``arc`` itself, untouched, when
    it already has steps or has no usable ``beats`` list. Otherwise returns a
    shallow copy in which ``beats`` is replaced by ``steps`` and the arc-level
    defaults are filled in.
    """
    if not isinstance(arc, dict):
        return {}
    legacy = arc.get("beats") or []
    if arc.get("steps") or not isinstance(legacy, list) or not legacy:
        return arc

    def _beat_to_step(position: int, beat: dict) -> dict:
        # The beat title (or, failing that, its injection text) becomes both
        # the step title and the step goal.
        label = (
            beat.get("title") or beat.get("injection") or f"Step {position}"
        ).strip()[:120]
        return {
            "id": beat.get("id") or f"s_m{position}",
            "title": label,
            "goal": label,
            "completion_criteria": f"Player and character naturally complete: {label}",
            "character_guidance": "Stay in character; move toward the goal without teleporting.",
            "injection": (beat.get("injection") or "").strip(),
            "choices": beat.get("choices") or [],
        }

    # Positions are 1-based and count skipped non-dict entries as well.
    converted = [
        _beat_to_step(position, beat)
        for position, beat in enumerate(legacy, start=1)
        if isinstance(beat, dict)
    ]

    migrated = dict(arc)
    migrated["steps"] = converted
    migrated.pop("beats", None)
    fallbacks = (
        ("current_step_index", 0),
        ("status", "active"),
        ("global_story", migrated.get("next_beat_hint") or migrated.get("title") or ""),
        ("ending", ""),
        ("reward", ""),
    )
    for key, value in fallbacks:
        migrated.setdefault(key, value)
    logger.info("migrate_beats_to_steps: %d steps", len(converted))
    return migrated
def normalize_story_arc(raw: dict, genre: str = "adventure") -> dict:
    """Fill in the required fields of a story arc (LLM output or migrated data).

    Non-dict input yields ``{}``. When no beats-to-steps migration is needed
    the returned dict is the very object passed in, so the caller's dict is
    updated in place.
    """
    from services.rpg_plot import format_genres

    if not isinstance(raw, dict):
        return {}

    arc = migrate_beats_to_steps(raw)
    defaults = (
        ("title", "Story arc"),
        ("genre_blend", format_genres(genre)),
        ("global_story", ""),
        ("ending", ""),
        ("reward", ""),
        ("boundaries", []),
        ("current_step_index", 0),
        ("status", "active"),
    )
    for field, fallback in defaults:
        arc.setdefault(field, fallback)

    # Keep only dict steps; each is repaired with its index in the raw list.
    raw_steps = arc.get("steps")
    if not isinstance(raw_steps, list):
        raw_steps = []
    arc["steps"] = [
        normalize_step(entry, position)
        for position, entry in enumerate(raw_steps)
        if isinstance(entry, dict)
    ]

    meta = arc.get("meta")
    if isinstance(meta, dict):
        meta.setdefault("arc_number", 1)
        meta.setdefault("previous_arc_summary", "")
    else:
        arc["meta"] = {"arc_number": 1, "previous_arc_summary": ""}

    # A malformed index falls back to the first step; negatives clamp to 0.
    try:
        step_index = int(arc.get("current_step_index") or 0)
    except (TypeError, ValueError):
        step_index = 0
    arc["current_step_index"] = max(0, step_index)
    return arc
def normalize_step(step: dict, index: int = 0) -> dict:
    """Repair legacy beat-shaped steps (status_quo / choices[].injection).

    Works on a shallow copy of ``step`` and guarantees the keys ``id``,
    ``title``, ``goal``, ``completion_criteria`` and ``character_guidance``.
    ``injection`` is set only when some injection text can be resolved.

    Args:
        step: Raw step dict.
        index: Zero-based position of the step, used for fallback id/title.

    Returns:
        The repaired copy; ``step`` itself is not modified.
    """
    s = dict(step)
    title = (s.get("title") or s.get("goal") or "").strip()
    if not title:
        sq = (s.get("status_quo") or "").strip()
        if sq:
            # First sentence of the status quo, or its head if that is empty.
            title = sq.split(".")[0].strip()[:120] or sq[:120]
        else:
            title = f"Шаг {index + 1}"
    # Ids may arrive as numbers (e.g. "id": 1) or as blank strings; coerce to
    # text and fall back to the positional id instead of raising or storing "".
    step_id = str(s.get("id") or "").strip()
    s["id"] = step_id or f"s{index + 1}"
    s["title"] = title[:120]
    s["goal"] = (s.get("goal") or title).strip()[:200]
    s.setdefault(
        "completion_criteria",
        f"Сцена естественно завершает эпизод: {title}",
    )
    s.setdefault(
        "character_guidance",
        "Оставайся в роли; веди сцену к цели шага без телепортов.",
    )
    inj = resolve_step_injection(s)
    if inj:
        s["injection"] = inj
    return s
def resolve_step_injection(step: dict | None) -> str:
if not isinstance(step, dict):
return ""
inj = (step.get("injection") or "").strip()
if inj:
return inj
for item in step.get("choices") or []:
if not isinstance(item, dict):
continue
c_inj = (item.get("injection") or "").strip()
if c_inj:
return c_inj
return (step.get("status_quo") or "").strip()
def format_new_arc_opening(arc: dict | None, step: dict | None, *, lang: str = "ru") -> str:
    """Build the announcement shown when a new arc starts.

    The header carries the arc title. The body is the step's resolved
    injection or, failing that, the first 400 characters of ``global_story``.
    With no body text, only the header is returned.

    Args:
        arc: Story arc dict (``None`` is treated as empty).
        step: Opening step of the arc, if any.
        lang: ``"ru"`` for Russian output; any other value gives English.
    """
    arc = arc or {}
    is_ru = lang == "ru"
    # Strip before falling back so a blank title does not render as «», and
    # localize the fallback so English output does not embed a Russian title.
    title = (arc.get("title") or "").strip() or ("Новая арка" if is_ru else "New arc")
    inj = resolve_step_injection(step)
    if not inj:
        inj = (arc.get("global_story") or "").strip()[:400]
    if is_ru:
        header = f"📖 Новая арка: «{title}»"
    else:
        header = f"📖 New arc: «{title}»"
    return f"{header}\n\n{inj}".strip() if inj else header
def get_current_step(arc: dict | None) -> dict | None:
arc = arc or {}
steps = arc.get("steps") or []
if not isinstance(steps, list) or not steps:
return None
idx = int(arc.get("current_step_index") or 0)
if idx < 0 or idx >= len(steps):
return None
step = steps[idx]
return step if isinstance(step, dict) else None
def step_progress(arc: dict | None) -> tuple[int, int]:
arc = arc or {}
steps = arc.get("steps") or []
total = len(steps) if isinstance(steps, list) else 0
idx = int(arc.get("current_step_index") or 0)
if total == 0:
return 0, 0
return min(idx + 1, total), total
def is_arc_completed(arc: dict | None) -> bool:
arc = arc or {}
if arc.get("status") == "completed":
return True
steps = arc.get("steps") or []
idx = int(arc.get("current_step_index") or 0)
return bool(steps) and idx >= len(steps)
def should_show_step_injection(arc: dict) -> bool:
    """Tell whether the current step's injection still has to be shown.

    Always ``False`` for a completed arc. Otherwise ``True`` unless
    ``meta["injection_shown_for_step"]`` already equals the current step
    index. A malformed index is treated as 0 instead of raising.
    """
    if is_arc_completed(arc):
        return False
    meta = arc.get("meta") if isinstance(arc.get("meta"), dict) else {}
    shown = meta.get("injection_shown_for_step")
    try:
        idx = int(arc.get("current_step_index") or 0)
    except (TypeError, ValueError, OverflowError):
        idx = 0
    return shown != idx
def mark_injection_shown(arc: dict) -> None:
    """Record in ``arc["meta"]`` that the current step's injection was shown.

    Mutates ``arc`` in place: a missing or non-dict ``meta`` is replaced by a
    fresh dict, then ``injection_shown_for_step`` is set to the current step
    index. A malformed index is stored as 0 instead of raising.
    """
    # Resolve the index first so a bad value cannot leave meta half-updated.
    try:
        idx = int(arc.get("current_step_index") or 0)
    except (TypeError, ValueError, OverflowError):
        idx = 0
    meta = arc.get("meta")
    if not isinstance(meta, dict):
        meta = {}
        arc["meta"] = meta
    meta["injection_shown_for_step"] = idx
def format_step_guidance_for_character(
step: dict, arc: dict | None = None, *, lang: str = "ru"
) -> str:
if not step:
return ""
goal = (step.get("goal") or step.get("title") or "").strip()
guidance = (step.get("character_guidance") or "").strip()
title = (step.get("title") or "").strip()
cur, total = step_progress(arc or {})
global_story = ((arc or {}).get("global_story") or "").strip()
ending = ((arc or {}).get("ending") or "").strip()
if lang == "ru":
lines = [
"--- Текущий шаг сюжета (ОБЯЗАТЕЛЬНОЕ направление) ---",
f"Шаг {cur}/{total}: {title}" if total else f"Шаг: {title}",
f"Цель эпизода: {goal}",
]
if guidance:
lines.append(f"Поведение персонажа: {guidance}")
if global_story:
lines.append(f"Глобальный конвой: {global_story[:400]}")
if ending:
lines.append(f"Финал арки (не раскрывать досрочно): {ending[:300]}")
lines.append(
"Веди сцену к цели шага естественно, в роли, без телепортов и смены локации без игрока."
)
lines.append("---")
else:
lines = [
"--- Current story step (MANDATORY direction) ---",
f"Step {cur}/{total}: {title}" if total else f"Step: {title}",
f"Episode goal: {goal}",
]
if guidance:
lines.append(f"Character behavior: {guidance}")
if global_story:
lines.append(f"Global arc: {global_story[:400]}")
if ending:
lines.append(f"Arc ending (do not spoil early): {ending[:300]}")
lines.append(
"Move the scene toward the step goal naturally, in character, no teleporting."
)
lines.append("---")
return "\n\n" + "\n".join(lines) + "\n"
def format_step_hint_for_character(injection: str, *, lang: str = "ru") -> str:
    """Wrap a plot injection as a hint block for the character prompt.

    Returns ``""`` when the injection is empty or whitespace-only. Otherwise
    the hint is framed by a header line, a footer instruction and a closing
    ``---`` rule, in Russian for ``lang == "ru"`` and in English for any
    other value.
    """
    hint = (injection or "").strip()
    if not hint:
        return ""

    russian = lang == "ru"
    header = (
        "--- Сюжетная подсказка (не цитируй дословно) ---"
        if russian
        else "--- Plot hint (do not quote verbatim) ---"
    )
    footer = (
        "Продолжи текущую сцену естественно по-русски; не цитируй подсказку дословно; "
        "не меняй локацию без согласия игрока."
        if russian
        else "Continue naturally in English; do not quote verbatim; "
        "do not change location without player consent."
    )
    return "\n\n" + "\n".join((header, hint, footer, "---"))
def format_step_for_narrator(
    arc: dict | None,
    quests: list | None = None,
    status_quo: str = "",
) -> str:
    """Summarise the arc, current step and quest log as narrator prompt text.

    A truthy ``arc`` is normalized first; a falsy one is treated as an empty
    arc. The summary is a newline-joined list: arc title and status, the
    truncated global story / ending / reward when set, a line describing the
    arc state (completed, current step, or no active step), the quest log
    when quests are given, and the truncated status quo when non-blank.
    """
    arc = normalize_story_arc(arc or {}) if arc else {}
    summary: list[str] = [
        f"Story arc: {arc.get('title', '')} [{arc.get('status', 'active')}]"
    ]

    # (arc key, prompt label, character limit) for the optional arc fields.
    optional_fields = (
        ("global_story", "Global story", 500),
        ("ending", "Planned ending", 300),
        ("reward", "Reward hook", 200),
    )
    for key, label, limit in optional_fields:
        if arc.get(key):
            summary.append(f"{label}: {arc[key][:limit]}")

    cur, total = step_progress(arc)
    step = get_current_step(arc)
    if is_arc_completed(arc):
        summary.append(
            "IMPORTANT: Arc COMPLETED. Offer 2-4 choices including starting a NEW story arc "
            "(e.g. label about a new adventure/chapter) while keeping character continuity."
        )
    elif step:
        summary.extend(
            [
                f"Current step: {cur}/{total} — «{step.get('title', '')}»",
                f"Step goal: {step.get('goal', '')}",
                f"Completion criteria: {step.get('completion_criteria', '')}",
                "Set step_complete:true ONLY when completion_criteria are clearly met in recent chat. "
                "Do NOT use quest_updates for step progression — the engine handles quests.",
            ]
        )
    else:
        summary.append("No active step — arc may need rollover.")

    if quests:
        summary.append("Quest log:")
        summary.extend(
            f" [{quest.get('status', 'active')}] {quest.get('title', '')}"
            for quest in quests
        )

    trimmed_status_quo = (status_quo or "").strip()
    if trimmed_status_quo:
        summary.append(f"Status quo: {trimmed_status_quo[:400]}")
    return "\n".join(summary)
async def sync_quest_to_current_step(session_id: str, arc: dict) -> int:
    """Align the session's quest log with the arc's current step.

    Every active quest whose title differs from the current step title is
    marked ``"done"``. When the arc is completed or has no current step, all
    active quests are closed and nothing else happens. Otherwise the quest
    named after the current step is created, or re-activated if it exists in
    a non-active state.

    Returns:
        The number of quests that were marked done.
    """
    from services.memory import get_quests, upsert_quest

    arc = normalize_story_arc(arc)
    quests = await get_quests(session_id)
    current = get_current_step(arc)

    # ``None`` means "no quest should stay open".
    if is_arc_completed(arc) or not current:
        keep_title = None
    else:
        keep_title = (current.get("title") or "Current step").strip()[:120]

    closed_count = 0
    for quest in quests:
        if quest.get("status") != "active":
            continue
        if keep_title is not None and (quest.get("title") or "").strip() == keep_title:
            continue
        await upsert_quest(session_id, quest["title"], "done")
        closed_count += 1
    if keep_title is None:
        return closed_count

    same_title = [
        quest for quest in quests if (quest.get("title") or "").strip() == keep_title
    ]
    if not same_title:
        await upsert_quest(session_id, keep_title, "active")
        return closed_count
    for quest in same_title:
        if quest.get("status") != "active":
            await upsert_quest(session_id, keep_title, "active")
    return closed_count
async def apply_step_advance(session_id: str, arc: dict) -> dict:
    """Move the arc to its next step, sync quests and persist the arc.

    The finished step's quest is marked done and the step index is
    incremented. If that passes the last step, the arc status becomes
    ``"completed"``. The ``injection_shown_for_step`` marker is cleared and
    the arc is saved as JSON. An arc that is already complete is only marked,
    synced and saved.

    Returns:
        A dict with ``advanced``, ``arc_completed``, ``new_step``,
        ``injection`` and ``step_index``.
    """
    from services.memory import update_session_plot_arc, upsert_quest

    arc = normalize_story_arc(arc)
    steps = arc.get("steps") or []
    position = int(arc.get("current_step_index") or 0)
    outcome = {
        "advanced": False,
        "arc_completed": False,
        "new_step": None,
        "injection": "",
        "step_index": position,
    }

    # Nothing left to advance: just make the completed state durable.
    if is_arc_completed(arc) or position >= len(steps):
        arc["status"] = "completed"
        outcome["arc_completed"] = True
        await sync_quest_to_current_step(session_id, arc)
        await update_session_plot_arc(session_id, json.dumps(arc, ensure_ascii=False))
        return outcome

    finished = steps[position] if position < len(steps) else None
    if finished and isinstance(finished, dict):
        finished_title = (finished.get("title") or "").strip()
        if finished_title:
            await upsert_quest(session_id, finished_title, "done")

    position += 1
    arc["current_step_index"] = position
    if position < len(steps):
        upcoming = steps[position]
        outcome["advanced"] = True
        outcome["new_step"] = upcoming
        outcome["step_index"] = position
        if isinstance(upcoming, dict):
            outcome["injection"] = (upcoming.get("injection") or "").strip()
    else:
        arc["status"] = "completed"
        outcome["arc_completed"] = True
    await sync_quest_to_current_step(session_id, arc)

    # Forget which step's injection was shown so the new step can show its own.
    meta = arc.get("meta")
    if not isinstance(meta, dict):
        meta = arc["meta"] = {}
    meta.pop("injection_shown_for_step", None)

    await update_session_plot_arc(session_id, json.dumps(arc, ensure_ascii=False))
    logger.info("apply_step_advance: idx=%s completed=%s", position, outcome["arc_completed"])
    return outcome
async def reconcile_story_arc(
    session_id: str,
    *,
    persona_name: str = "Character",
    genre: str = "adventure",
) -> tuple[dict, bool]:
    """Load, normalize and re-sync the stored arc of an RPG session.

    Returns ``({}, False)`` when the session is missing or RPG mode is off.
    Otherwise the stored arc JSON is parsed (unparseable or non-dict data
    counts as an empty arc), normalized and synced with the quest log. The
    arc is written back only if normalization changed it or any quest was
    closed.

    Returns:
        ``(arc, changed)``, where ``changed`` says whether it was saved.
    """
    from services.memory import get_session, update_session_plot_arc

    session = await get_session(session_id)
    if not (session and session.get("rpg_enabled")):
        return {}, False

    try:
        stored = json.loads(session.get("plot_arc_json") or "{}")
    except (json.JSONDecodeError, TypeError):
        stored = {}
    if not isinstance(stored, dict):
        stored = {}

    # Snapshot before normalizing: normalization may edit the dict in place.
    snapshot = json.dumps(stored, sort_keys=True)
    arc = normalize_story_arc(stored, genre=session.get("genre") or genre)
    changed = snapshot != json.dumps(arc, sort_keys=True)

    closed = await sync_quest_to_current_step(session_id, arc)
    changed = True if closed else changed
    if changed:
        await update_session_plot_arc(session_id, json.dumps(arc, ensure_ascii=False))
    return arc, changed
async def apply_story_post(
    session_id: str,
    post: dict,
    arc: dict,
    rpg_settings: dict,
) -> dict:
    """Apply the narrator's post-turn verdict to the story arc.

    Does nothing (beyond normalizing ``arc``) when quests are disabled in
    ``rpg_settings`` or the arc is already complete. The step advances when
    ``post["step_complete"]`` is truthy, or when it is falsy but a non-empty
    ``step_completion_note`` is present. After advancing, the arc is re-read
    from the session store.

    Returns:
        A dict with ``step_advanced``, ``arc_completed``, ``step_injection``,
        ``new_step_title`` and ``arc``.
    """
    from services.memory import get_session

    arc = normalize_story_arc(arc)
    summary = {
        "step_advanced": False,
        "arc_completed": False,
        "step_injection": "",
        "new_step_title": "",
        "arc": arc,
    }
    if is_arc_completed(arc) if rpg_settings.get("quests", True) else True:
        return summary

    note = (post.get("step_completion_note") or "").strip()
    if not bool(post.get("step_complete")):
        # The LLM sometimes writes a completion note but omits the boolean;
        # a non-empty note alone is accepted as the signal to advance.
        if not note:
            return summary
        logger.info("step_complete fallback via note: %s", note[:200])
    if note:
        logger.info("step_complete: %s", note[:200])

    advance = await apply_step_advance(session_id, arc)
    summary["step_advanced"] = advance.get("advanced", False)
    summary["arc_completed"] = advance.get("arc_completed", False)
    summary["step_injection"] = advance.get("injection") or ""
    upcoming = advance.get("new_step")
    if isinstance(upcoming, dict):
        summary["new_step_title"] = (upcoming.get("title") or "").strip()

    # Prefer the persisted arc; keep the in-memory one if it cannot be parsed.
    session = await get_session(session_id) or {}
    try:
        summary["arc"] = json.loads(session.get("plot_arc_json") or "{}")
    except (json.JSONDecodeError, TypeError):
        pass
    return summary
# Lower-case substrings that mark a message or choice label as a request to
# start the next story arc. is_new_arc_request() looks for any of them inside
# the lower-cased text, so entries here must stay lower-case.
NEW_ARC_CHOICE_MARKERS = (
# Explicit "new arc" / "new chapter" / "next arc" / "start new" phrasings.
"новая арка",
"новую арку",
"new arc",
"new chapter",
"следующая арка",
"начать новую",
# The LLM/UI sometimes labels next-arc choices as "new quest" instead of
# "new arc". The router still keys off is_new_arc_request(), so both
# wordings are recognized.
"новый квест",
"новую квест",
"начать новый квест",
"new quest",
"start new quest",
)
def is_new_arc_request(message: str) -> bool:
    """Tell whether ``message`` contains any new-arc marker (case-insensitive).

    A falsy message is treated as empty text and never matches.
    """
    lowered = (message or "").lower()
    for marker in NEW_ARC_CHOICE_MARKERS:
        if marker in lowered:
            return True
    return False
def normalize_new_arc_first(value: str | None) -> str | None:
v = (value or "").strip().lower()
return v if v in ("user", "character") else None
def new_arc_roll_choice(*, lang: str = "ru") -> dict:
    """Build the structured "start a new arc" choice.

    The label is Russian for ``lang == "ru"`` and English for any other
    value; all other fields are fixed.
    """
    if lang == "ru":
        label = "Начать новую арку"
    else:
        label = "Start new arc"
    return dict(
        id="new_arc",
        type="new_arc_roll",
        label=label,
        source="story_engine",
    )
def filter_new_arc_noise_choices(choices: list) -> list:
    """Drop LLM-written "start new quest/arc" choices, keeping the roll button.

    Non-dict entries are discarded. Choices of type ``"new_arc_roll"`` are
    always kept. Any other choice is kept only if its label does not read as
    a new-arc request.
    """
    return [
        choice
        for choice in choices or []
        if isinstance(choice, dict)
        and (
            choice.get("type") == "new_arc_roll"
            or not is_new_arc_request(choice.get("label", ""))
        )
    ]
def append_new_arc_roll_choice(choices: list, *, lang: str = "ru") -> list:
    """Return the filtered choices with exactly one structured roll button.

    LLM-written new-arc duplicates are removed first. The roll choice is
    appended only if the filtered list does not already contain one.
    """
    cleaned = filter_new_arc_noise_choices(choices)
    for entry in cleaned:
        if entry.get("type") == "new_arc_roll":
            return cleaned
    return cleaned + [new_arc_roll_choice(lang=lang)]
async def roll_next_arc(
    session_id: str,
    persona: dict,
    greeting_or_context: str,
    genre: str,
    *,
    lang: str = "ru",
    recent_context: str = "",
    facts_block: str = "",
) -> dict:
    """Generate, persist and activate the next story arc for a session.

    The stored arc is summarised (title, story, ending, reward) and passed to
    ``generate_next_arc`` together with the persona fields and the combined
    context. The new arc is normalized, numbered one above the previous arc,
    reset to its first step, saved, and synced with the quest log.

    Args:
        session_id: Session whose arc is rolled over.
        persona: Persona dict; ``name``, ``description`` and ``scenario`` are read.
        greeting_or_context: Extra context, appended after ``recent_context``.
        genre: Genre passed to generation and normalization.
        lang: Language passed to generation.
        recent_context: Recent chat context, placed first.
        facts_block: Facts text forwarded to generation.

    Returns:
        The new arc, or the previous arc dict if generation returned nothing.
    """
    from services.memory import get_session, update_session_plot_arc
    from services.rpg_plot import generate_next_arc

    session = await get_session(session_id) or {}
    try:
        old_arc = json.loads(session.get("plot_arc_json") or "{}")
    except (json.JSONDecodeError, TypeError):
        old_arc = {}
    # Valid JSON is not necessarily an object (e.g. "[]" or "null"); treat
    # anything else as an empty arc, as reconcile_story_arc does.
    if not isinstance(old_arc, dict):
        old_arc = {}

    summary = (
        f"Title: {old_arc.get('title', '')}. "
        f"Story: {(old_arc.get('global_story') or '')[:500]}. "
        f"Ending: {(old_arc.get('ending') or '')[:300]}. "
        f"Reward: {(old_arc.get('reward') or '')[:200]}."
    )

    # A malformed meta block or arc number counts as "previous arc was #1".
    old_meta = old_arc.get("meta")
    if not isinstance(old_meta, dict):
        old_meta = {}
    try:
        arc_num = int(old_meta.get("arc_number") or 1) + 1
    except (TypeError, ValueError, OverflowError):
        arc_num = 2

    ctx_parts = [(recent_context or "").strip(), (greeting_or_context or "").strip()]
    combined_context = "\n".join(p for p in ctx_parts if p)

    new_arc = await generate_next_arc(
        persona.get("name", "Character"),
        persona.get("description", ""),
        persona.get("scenario", ""),
        combined_context,
        previous_arc_summary=summary,
        facts_block=facts_block,
        genre=genre,
        lang=lang,
    )
    if not new_arc:
        return old_arc

    new_arc = normalize_story_arc(new_arc, genre=genre)
    meta = new_arc.get("meta") or {}
    meta["arc_number"] = arc_num
    meta["previous_arc_summary"] = summary[:800]
    new_arc["meta"] = meta
    new_arc["current_step_index"] = 0
    new_arc["status"] = "active"
    await update_session_plot_arc(session_id, json.dumps(new_arc, ensure_ascii=False))
    await sync_quest_to_current_step(session_id, new_arc)
    logger.info("roll_next_arc: arc #%s «%s»", arc_num, new_arc.get("title"))
    return new_arc