Files
2026-06-05 14:57:15 +03:00

1005 lines
37 KiB
Python

import json
import aiosqlite
from database.db import DB_PATH
async def get_or_create_session(session_id: str, persona_id: str | None = None) -> dict:
"""Existing sessions keep their persona_id; persona_id applies only on INSERT."""
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT * FROM sessions WHERE session_id = ?", (session_id,)
) as cursor:
row = await cursor.fetchone()
if row:
return dict(row)
pid = (persona_id or "default").strip() or "default"
await db.execute(
"INSERT INTO sessions (session_id, persona_id) VALUES (?, ?)",
(session_id, pid),
)
await db.commit()
async with db.execute(
"SELECT * FROM sessions WHERE session_id = ?", (session_id,)
) as cursor:
row = await cursor.fetchone()
return dict(row)
async def get_all_sessions() -> list:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT * FROM sessions ORDER BY updated_at DESC"
) as cursor:
rows = await cursor.fetchall()
return [dict(r) for r in rows]
async def get_session(session_id: str) -> dict | None:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT * FROM sessions WHERE session_id = ?",
(session_id,),
) as cursor:
row = await cursor.fetchone()
return dict(row) if row else None
async def update_session_title(session_id: str, title: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE sessions SET title = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(title, session_id),
)
await db.commit()
async def update_session_persona(session_id: str, persona_id: str):
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT persona_id FROM sessions WHERE session_id = ?",
(session_id,),
) as cur:
row = await cur.fetchone()
prev = row["persona_id"] if row else None
await db.execute(
"UPDATE sessions SET persona_id = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(persona_id, session_id),
)
if prev is not None and prev != persona_id:
await _reset_persona_bound_state(db, session_id)
await db.commit()
async def _reset_persona_bound_state(db: aiosqlite.Connection, session_id: str) -> None:
from services.rpg_state import DEFAULT_NARRATIVE_STATS
stats_default = json.dumps(DEFAULT_NARRATIVE_STATS, ensure_ascii=False)
await db.execute(
"""UPDATE sessions
SET facts_json = '[]',
global_plot = '',
status_quo = '',
plot_arc_json = '{}',
outfit_json = '[]',
affinity = 0,
scene_json = '{}',
narrative_stats_json = ?
WHERE session_id = ?""",
(stats_default, session_id),
)
await db.execute("DELETE FROM action_resolutions WHERE session_id = ?", (session_id,))
await db.execute("DELETE FROM rpg_quests WHERE session_id = ?", (session_id,))
async def upsert_static_system_message(
session_id: str, static_prompt: str, history: list | None = None
) -> bool:
"""Store only static persona prompt in messages. Returns True if written."""
hist = history if history is not None else await get_history(session_id)
async with aiosqlite.connect(DB_PATH) as db:
if not hist:
await db.execute(
"""INSERT INTO messages (session_id, role, content, image_prompt, image_path)
VALUES (?, 'system', ?, NULL, NULL)""",
(session_id, static_prompt),
)
await db.execute(
"UPDATE sessions SET updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(session_id,),
)
await db.commit()
return True
if hist[0]["role"] == "system":
if hist[0]["content"] == static_prompt:
return False
await db.execute(
"""UPDATE messages SET content = ?
WHERE session_id = ? AND role = 'system'
AND id = (SELECT MIN(id) FROM messages WHERE session_id = ?)""",
(static_prompt, session_id, session_id),
)
await db.commit()
return True
await db.execute(
"""INSERT INTO messages (session_id, role, content, image_prompt, image_path)
VALUES (?, 'system', ?, NULL, NULL)""",
(session_id, static_prompt),
)
await db.commit()
return True
async def delete_dialog_messages(session_id: str) -> None:
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"DELETE FROM messages WHERE session_id = ? AND role IN ('user', 'assistant')",
(session_id,),
)
await db.commit()
async def rebind_session_persona(
session_id: str,
persona_id: str,
*,
clear_history: bool = False,
static_prompt: str,
first_mes: str | None = None,
) -> None:
session = await get_session(session_id)
if not session:
raise ValueError("Session not found")
await update_session_persona(session_id, persona_id)
if clear_history:
await delete_dialog_messages(session_id)
history = await get_history(session_id)
await upsert_static_system_message(session_id, static_prompt, history)
if clear_history and first_mes and first_mes.strip():
await add_message(session_id, "assistant", first_mes.strip())
async def update_session_rpg(session_id: str, rpg_enabled: bool):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE sessions SET rpg_enabled = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(1 if rpg_enabled else 0, session_id),
)
await db.commit()
async def update_session_facts(session_id: str, facts_json: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE sessions SET facts_json = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(facts_json, session_id),
)
await db.commit()
async def update_session_global_plot(session_id: str, global_plot: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE sessions SET global_plot = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(global_plot, session_id),
)
await db.commit()
async def update_session_status_quo(session_id: str, status_quo: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE sessions SET status_quo = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(status_quo, session_id),
)
await db.commit()
async def update_session_plot_arc(session_id: str, plot_arc_json: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE sessions SET plot_arc_json = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(plot_arc_json, session_id),
)
await db.commit()
async def add_action_resolution(
session_id: str,
intent_text: str,
roll: int,
outcome: str,
resolution_text: str,
message_id: int | None = None,
):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"""INSERT INTO action_resolutions
(session_id, message_id, intent_text, roll, outcome, resolution_text)
VALUES (?, ?, ?, ?, ?, ?)""",
(session_id, message_id, intent_text, roll, outcome, resolution_text),
)
await db.commit()
async def get_last_action_resolution(session_id: str) -> dict | None:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"""SELECT * FROM action_resolutions
WHERE session_id = ?
ORDER BY id DESC LIMIT 1""",
(session_id,),
) as cur:
row = await cur.fetchone()
return dict(row) if row else None
async def delete_session(session_id: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("DELETE FROM messages WHERE session_id = ?", (session_id,))
await db.execute("DELETE FROM rpg_quests WHERE session_id = ?", (session_id,))
await db.execute("DELETE FROM action_resolutions WHERE session_id = ?", (session_id,))
await db.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
await db.commit()
async def get_action_resolutions_map(session_id: str) -> dict[int, dict]:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"""SELECT message_id, intent_text, roll, outcome, resolution_text
FROM action_resolutions
WHERE session_id = ? AND message_id IS NOT NULL
ORDER BY id""",
(session_id,),
) as cur:
rows = await cur.fetchall()
out: dict[int, dict] = {}
for r in rows:
mid = r["message_id"]
if mid is not None:
out[int(mid)] = {
"intent_text": r["intent_text"],
"roll": r["roll"],
"outcome": r["outcome"],
"resolution_text": r["resolution_text"],
}
return out
def narrator_message_content(narrator: dict) -> str:
return json.dumps(
{
"roll": narrator.get("roll"),
"outcome": narrator.get("outcome"),
"text": narrator.get("text", ""),
"original_intent": narrator.get("original_intent"),
},
ensure_ascii=False,
)
def parse_narrator_message(content: str) -> dict | None:
try:
data = json.loads(content or "{}")
except (json.JSONDecodeError, TypeError):
return None
if not isinstance(data, dict) or not (data.get("text") or "").strip():
return None
return data
async def seed_quests_from_arc(session_id: str, arc: dict) -> int:
"""Create active quests for arc beats that are not already in rpg_quests."""
if not arc:
return 0
existing = {q["title"] for q in await get_quests(session_id)}
added = 0
for beat in arc.get("beats", []):
title = (beat.get("title") or beat.get("injection", "")).strip()[:120]
if title and title not in existing:
await upsert_quest(session_id, title, "active")
existing.add(title)
added += 1
return added
async def get_history(session_id: str) -> list:
resolutions = await get_action_resolutions_map(session_id)
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"""SELECT id, role, content, image_prompt, image_path,
image_prompt_alt, image_path_alt, choices_json
FROM messages WHERE session_id = ? ORDER BY id""",
(session_id,),
) as cursor:
rows = await cursor.fetchall()
result = []
for idx, r in enumerate(rows):
item = {
"id": r["id"],
"role": r["role"],
"content": r["content"],
"image_prompt": r["image_prompt"],
"image_path": r["image_path"],
"image_prompt_alt": r["image_prompt_alt"],
"image_path_alt": r["image_path_alt"],
"choices_json": r["choices_json"],
}
if r["role"] == "user" and r["id"] in resolutions:
item["action_resolution"] = resolutions[r["id"]]
result.append(item)
if r["role"] == "user" and r["id"] in resolutions:
nxt = rows[idx + 1] if idx + 1 < len(rows) else None
if not nxt or nxt["role"] != "narrator":
res = resolutions[r["id"]]
result.append(
{
"id": -int(r["id"]),
"role": "narrator",
"content": narrator_message_content(
{
"roll": res.get("roll"),
"outcome": res.get("outcome"),
"text": res.get("resolution_text", ""),
}
),
"image_prompt": None,
"image_path": None,
"image_prompt_alt": None,
"image_path_alt": None,
"choices_json": None,
}
)
return result
async def get_message(message_id: int) -> dict | None:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT id, session_id, role, content FROM messages WHERE id = ?",
(message_id,),
) as cursor:
row = await cursor.fetchone()
return dict(row) if row else None
async def update_message_content(message_id: int, content: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE messages SET content = ? WHERE id = ?",
(content, message_id),
)
await db.commit()
async def delete_messages_after(session_id: str, message_id: int):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"DELETE FROM messages WHERE session_id = ? AND id > ?",
(session_id, message_id),
)
await db.execute(
"UPDATE sessions SET updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(session_id,),
)
await db.commit()
await restore_session_to_message(session_id, message_id)
async def delete_message(message_id: int):
msg = await get_message(message_id)
if not msg:
return
session_id = msg["session_id"]
anchor = await get_max_message_id_before(session_id, message_id)
async with aiosqlite.connect(DB_PATH) as db:
await db.execute("DELETE FROM messages WHERE id = ?", (message_id,))
await db.execute(
"UPDATE sessions SET updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(session_id,),
)
await db.commit()
await restore_session_to_message(session_id, anchor)
async def delete_message_and_following(session_id: str, message_id: int) -> bool:
anchor = await get_max_message_id_before(session_id, message_id)
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"DELETE FROM messages WHERE session_id = ? AND id >= ?",
(session_id, message_id),
)
await db.execute(
"UPDATE sessions SET updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(session_id,),
)
await db.commit()
await restore_session_to_message(session_id, anchor)
return True
async def update_message_choices(message_id: int, choices_json: str | None):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE messages SET choices_json = ? WHERE id = ?",
(choices_json, message_id),
)
await db.commit()
async def clear_choices_for_session(session_id: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE messages SET choices_json = NULL WHERE session_id = ?",
(session_id,),
)
await db.commit()
async def get_last_message_preview(session_id: str, max_len: int = 80) -> str:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"""SELECT content, role FROM messages
WHERE session_id = ? AND role IN ('user', 'assistant')
ORDER BY id DESC LIMIT 1""",
(session_id,),
) as cursor:
row = await cursor.fetchone()
if not row:
return ""
prefix = "Вы: " if row["role"] == "user" else "AI: "
text = (row["content"] or "").replace("\n", " ").strip()
if len(text) > max_len:
text = text[:max_len] + ""
return prefix + text
async def get_max_message_id_before(session_id: str, message_id: int) -> int | None:
async with aiosqlite.connect(DB_PATH) as db:
async with db.execute(
"SELECT MAX(id) FROM messages WHERE session_id = ? AND id < ?",
(session_id, message_id),
) as cur:
row = await cur.fetchone()
if not row or row[0] is None:
return None
return int(row[0])
async def _collect_action_resolutions(session_id: str) -> list[dict]:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"""SELECT message_id, intent_text, roll, outcome, resolution_text
FROM action_resolutions
WHERE session_id = ?
ORDER BY id""",
(session_id,),
) as cur:
rows = await cur.fetchall()
return [
{
"message_id": r["message_id"],
"intent_text": r["intent_text"],
"roll": r["roll"],
"outcome": r["outcome"],
"resolution_text": r["resolution_text"],
}
for r in rows
]
async def save_state_snapshot(session_id: str, message_id: int) -> None:
"""Persist RPG session state as it exists right after this message."""
session = await get_session(session_id)
if not session:
return
quests = await get_quests(session_id)
resolutions = await _collect_action_resolutions(session_id)
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"""INSERT INTO session_state_snapshots
(message_id, session_id, facts_json, global_plot, status_quo,
plot_arc_json, affinity, outfit_json, scene_json,
narrative_stats_json, quests_json, action_resolutions_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(message_id) DO UPDATE SET
session_id = excluded.session_id,
facts_json = excluded.facts_json,
global_plot = excluded.global_plot,
status_quo = excluded.status_quo,
plot_arc_json = excluded.plot_arc_json,
affinity = excluded.affinity,
outfit_json = excluded.outfit_json,
scene_json = excluded.scene_json,
narrative_stats_json = excluded.narrative_stats_json,
quests_json = excluded.quests_json,
action_resolutions_json = excluded.action_resolutions_json""",
(
message_id,
session_id,
session.get("facts_json", "[]"),
session.get("global_plot", ""),
session.get("status_quo", ""),
session.get("plot_arc_json", "{}"),
int(session.get("affinity") or 0),
session.get("outfit_json", "[]"),
session.get("scene_json", "{}"),
session.get("narrative_stats_json", '{"lust":0,"stamina":10,"tension":0}'),
json.dumps(quests, ensure_ascii=False),
json.dumps(resolutions, ensure_ascii=False),
),
)
await db.commit()
async def get_snapshot_at_or_before(session_id: str, message_id: int) -> dict | None:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"""SELECT * FROM session_state_snapshots
WHERE session_id = ? AND message_id <= ?
ORDER BY message_id DESC LIMIT 1""",
(session_id, message_id),
) as cur:
row = await cur.fetchone()
return dict(row) if row else None
async def restore_session_from_snapshot(session_id: str, snapshot: dict) -> None:
from services.rpg_state import DEFAULT_NARRATIVE_STATS
stats_default = json.dumps(DEFAULT_NARRATIVE_STATS, ensure_ascii=False)
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"""UPDATE sessions SET
facts_json = ?,
global_plot = ?,
status_quo = ?,
plot_arc_json = ?,
affinity = ?,
outfit_json = ?,
scene_json = ?,
narrative_stats_json = ?,
updated_at = CURRENT_TIMESTAMP
WHERE session_id = ?""",
(
snapshot.get("facts_json", "[]"),
snapshot.get("global_plot", ""),
snapshot.get("status_quo", ""),
snapshot.get("plot_arc_json", "{}"),
int(snapshot.get("affinity") or 0),
snapshot.get("outfit_json", "[]"),
snapshot.get("scene_json", "{}"),
snapshot.get("narrative_stats_json") or stats_default,
session_id,
),
)
await db.execute("DELETE FROM rpg_quests WHERE session_id = ?", (session_id,))
try:
quests = json.loads(snapshot.get("quests_json") or "[]")
except (json.JSONDecodeError, TypeError):
quests = []
if isinstance(quests, list):
for q in quests:
if not isinstance(q, dict):
continue
title = (q.get("title") or "").strip()
if title:
await db.execute(
"INSERT INTO rpg_quests (session_id, title, status) VALUES (?, ?, ?)",
(session_id, title[:120], q.get("status", "active")),
)
await db.execute("DELETE FROM action_resolutions WHERE session_id = ?", (session_id,))
try:
resolutions = json.loads(snapshot.get("action_resolutions_json") or "[]")
except (json.JSONDecodeError, TypeError):
resolutions = []
if isinstance(resolutions, list):
for r in resolutions:
if not isinstance(r, dict):
continue
await db.execute(
"""INSERT INTO action_resolutions
(session_id, message_id, intent_text, roll, outcome, resolution_text)
VALUES (?, ?, ?, ?, ?, ?)""",
(
session_id,
r.get("message_id"),
r.get("intent_text", ""),
int(r.get("roll") or 0),
r.get("outcome", ""),
r.get("resolution_text", ""),
),
)
await db.commit()
async def restore_session_to_message(session_id: str, anchor_message_id: int | None) -> bool:
"""Restore RPG state to snapshot at anchor (or nearest earlier message)."""
if anchor_message_id is None:
await _reset_persona_bound_state_only(session_id)
return False
snap = await get_snapshot_at_or_before(session_id, anchor_message_id)
if not snap:
return False
await restore_session_from_snapshot(session_id, snap)
return True
async def _reset_persona_bound_state_only(session_id: str) -> None:
async with aiosqlite.connect(DB_PATH) as db:
await _reset_persona_bound_state(db, session_id)
await db.commit()
async def fork_session(source_session_id: str, until_message_id: int) -> str | None:
source = await get_session(source_session_id)
if not source:
return None
import uuid
new_id = "sess_" + uuid.uuid4().hex[:8]
snap = await get_snapshot_at_or_before(source_session_id, until_message_id)
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"""INSERT INTO sessions
(session_id, persona_id, title, rpg_enabled, facts_json, global_plot,
status_quo, plot_arc_json, genre, rpg_settings_json, affinity,
outfit_json, scene_json, narrative_stats_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
new_id,
source["persona_id"],
(source.get("title") or "Новый чат") + " (ветка)",
source.get("rpg_enabled", 0),
(snap or source).get("facts_json", "[]"),
(snap or source).get("global_plot", ""),
(snap or source).get("status_quo", ""),
(snap or source).get("plot_arc_json", "{}"),
source.get("genre", "adventure"),
source.get("rpg_settings_json", "{}"),
int((snap or source).get("affinity") or 0),
(snap or source).get("outfit_json", "[]"),
(snap or source).get("scene_json", "{}"),
(snap or source).get(
"narrative_stats_json", '{"lust":0,"stamina":10,"tension":0}'
),
),
)
async with db.execute(
"""SELECT id, role, content, image_prompt, image_path,
image_prompt_alt, image_path_alt, choices_json
FROM messages
WHERE session_id = ? AND id <= ? ORDER BY id""",
(source_session_id, until_message_id),
) as cur:
rows = await cur.fetchall()
id_map: dict[int, int] = {}
for r in rows:
cur_ins = await db.execute(
"""INSERT INTO messages
(session_id, role, content, image_prompt, image_path,
image_prompt_alt, image_path_alt, choices_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(new_id, r[1], r[2], r[3], r[4], r[5], r[6], r[7]),
)
id_map[int(r[0])] = int(cur_ins.lastrowid)
if snap:
try:
quests = json.loads(snap.get("quests_json") or "[]")
except (json.JSONDecodeError, TypeError):
quests = []
if isinstance(quests, list):
for q in quests:
if isinstance(q, dict) and (q.get("title") or "").strip():
await db.execute(
"INSERT INTO rpg_quests (session_id, title, status) VALUES (?, ?, ?)",
(new_id, q["title"][:120], q.get("status", "active")),
)
try:
resolutions = json.loads(snap.get("action_resolutions_json") or "[]")
except (json.JSONDecodeError, TypeError):
resolutions = []
if isinstance(resolutions, list):
for res in resolutions:
if not isinstance(res, dict):
continue
old_mid = res.get("message_id")
new_mid = id_map.get(int(old_mid)) if old_mid is not None else None
if new_mid is None:
continue
await db.execute(
"""INSERT INTO action_resolutions
(session_id, message_id, intent_text, roll, outcome, resolution_text)
VALUES (?, ?, ?, ?, ?, ?)""",
(
new_id,
new_mid,
res.get("intent_text", ""),
int(res.get("roll") or 0),
res.get("outcome", ""),
res.get("resolution_text", ""),
),
)
async with db.execute(
"""SELECT message_id, facts_json, global_plot, status_quo, plot_arc_json,
affinity, outfit_json, scene_json, narrative_stats_json,
quests_json, action_resolutions_json
FROM session_state_snapshots
WHERE session_id = ? AND message_id <= ?""",
(source_session_id, until_message_id),
) as snap_cur:
snap_rows = await snap_cur.fetchall()
for srow in snap_rows:
new_mid = id_map.get(int(srow[0]))
if new_mid is None:
continue
res_json = srow[10]
try:
res_list = json.loads(res_json or "[]")
except (json.JSONDecodeError, TypeError):
res_list = []
if isinstance(res_list, list):
remapped = []
for res in res_list:
if not isinstance(res, dict):
continue
old_mid = res.get("message_id")
nm = id_map.get(int(old_mid)) if old_mid is not None else None
if nm is not None:
remapped.append({**res, "message_id": nm})
res_json = json.dumps(remapped, ensure_ascii=False)
await db.execute(
"""INSERT INTO session_state_snapshots
(message_id, session_id, facts_json, global_plot, status_quo,
plot_arc_json, affinity, outfit_json, scene_json,
narrative_stats_json, quests_json, action_resolutions_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(new_mid, new_id, srow[1], srow[2], srow[3], srow[4], srow[5],
srow[6], srow[7], srow[8], srow[9], res_json),
)
await db.commit()
return new_id
async def add_message(
session_id: str,
role: str,
content: str,
image_prompt: str | None = None,
image_path: str | None = None,
) -> int:
async with aiosqlite.connect(DB_PATH) as db:
cur = await db.execute(
"""INSERT INTO messages (session_id, role, content, image_prompt, image_path)
VALUES (?, ?, ?, ?, ?)""",
(session_id, role, content, image_prompt, image_path),
)
msg_id = cur.lastrowid
await db.execute(
"UPDATE sessions SET updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(session_id,),
)
await db.commit()
return msg_id
async def update_message_image(message_id: int, image_path: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE messages SET image_path = ? WHERE id = ?",
(image_path, message_id),
)
await db.commit()
async def update_message_prompt(message_id: int, image_prompt: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE messages SET image_prompt = ? WHERE id = ?",
(image_prompt, message_id),
)
await db.commit()
async def update_message_prompt_alt(message_id: int, image_prompt_alt: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE messages SET image_prompt_alt = ? WHERE id = ?",
(image_prompt_alt, message_id),
)
await db.commit()
async def update_message_image_alt(message_id: int, image_path_alt: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE messages SET image_path_alt = ? WHERE id = ?",
(image_path_alt, message_id),
)
await db.commit()
async def get_last_message_id(session_id: str) -> int | None:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT id FROM messages WHERE session_id = ? ORDER BY id DESC LIMIT 1",
(session_id,),
) as cursor:
row = await cursor.fetchone()
return int(row["id"]) if row else None
async def get_last_assistant_message_id(session_id: str) -> int | None:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"""SELECT id FROM messages
WHERE session_id = ? AND role = 'assistant'
ORDER BY id DESC LIMIT 1""",
(session_id,),
) as cursor:
row = await cursor.fetchone()
return row["id"] if row else None
async def clear_history(session_id: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"DELETE FROM messages WHERE session_id = ?", (session_id,)
)
await db.commit()
await _reset_persona_bound_state_only(session_id)
async def update_session_affinity(session_id: str, delta: int):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE sessions SET affinity = affinity + ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(delta, session_id),
)
await db.commit()
async def set_session_affinity(session_id: str, value: int):
"""Debug / admin: set absolute affinity (-30..30)."""
clamped = max(-30, min(30, int(value)))
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE sessions SET affinity = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(clamped, session_id),
)
await db.commit()
return clamped
async def update_session_genre(session_id: str, genre: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE sessions SET genre = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(genre, session_id),
)
await db.commit()
async def update_session_rpg_settings(session_id: str, settings_json: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE sessions SET rpg_settings_json = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(settings_json, session_id),
)
await db.commit()
async def update_session_outfit(session_id: str, outfit_json: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE sessions SET outfit_json = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(outfit_json, session_id),
)
await db.commit()
async def update_session_scene(session_id: str, scene_json: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE sessions SET scene_json = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(scene_json, session_id),
)
await db.commit()
async def update_session_narrative_stats(session_id: str, stats_json: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE sessions SET narrative_stats_json = ?, updated_at = CURRENT_TIMESTAMP WHERE session_id = ?",
(stats_json, session_id),
)
await db.commit()
async def upsert_quest(session_id: str, title: str, status: str = "active"):
async with aiosqlite.connect(DB_PATH) as db:
async with db.execute(
"SELECT id FROM rpg_quests WHERE session_id = ? AND title = ?",
(session_id, title),
) as cur:
row = await cur.fetchone()
if row:
await db.execute(
"UPDATE rpg_quests SET status = ? WHERE id = ?",
(status, row[0]),
)
else:
await db.execute(
"INSERT INTO rpg_quests (session_id, title, status) VALUES (?, ?, ?)",
(session_id, title, status),
)
await db.commit()
async def get_quests(session_id: str) -> list:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT id, title, status FROM rpg_quests WHERE session_id = ? ORDER BY id",
(session_id,),
) as cur:
rows = await cur.fetchall()
return [dict(r) for r in rows]
async def update_quest_status(session_id: str, title: str, status: str):
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"UPDATE rpg_quests SET status = ? WHERE session_id = ? AND title = ?",
(status, session_id, title),
)
await db.commit()
async def update_quest_by_id(quest_id: int, session_id: str, status: str) -> bool:
if status not in ("active", "done", "failed"):
return False
async with aiosqlite.connect(DB_PATH) as db:
cur = await db.execute(
"UPDATE rpg_quests SET status = ? WHERE id = ? AND session_id = ?",
(status, quest_id, session_id),
)
await db.commit()
return cur.rowcount > 0
async def get_message_count(session_id: str) -> int:
async with aiosqlite.connect(DB_PATH) as db:
db.row_factory = aiosqlite.Row
async with db.execute(
"SELECT COUNT(*) as cnt FROM messages WHERE session_id = ? AND role != 'system'",
(session_id,),
) as cursor:
row = await cursor.fetchone()
return row["cnt"]