223 lines
6.9 KiB
Python
223 lines
6.9 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import secrets
|
|
from pathlib import Path
|
|
|
|
from sqlalchemy import inspect, text
|
|
|
|
from app.auth.tokens import hash_token
|
|
from app.character.card import DEFAULT_CARD, normalize_card
|
|
from app.config import get_settings
|
|
from app.db.base import engine
|
|
from app.db.models import CharacterCard, User
|
|
|
|
logger = logging.getLogger(__name__)

# Tables that receive a ``user_id`` column and are backfilled with the default
# user by the helpers in this module. Tables missing from the database are
# skipped by those helpers.
TENANT_TABLES = (
    "chat_sessions", "user_profile", "memory_facts",
    "fitness_profiles", "body_metrics",
    "food_logs", "water_logs", "workout_logs", "step_logs",
    "fitness_reminders",
    "shopping_lists", "reminders", "documents",
    "pomodoro_cycles", "pomodoro_sessions",
    "project_bindings", "work_items",
)

# Location of the legacy single-user character card (JSON). The path is
# relative, so it resolves against the process working directory.
LEGACY_CARD_PATH = Path("./data/character.json")
|
|
|
|
|
|
def _table_exists(name: str) -> bool:
    """Report whether the database currently has a table called ``name``.

    A new inspector is requested from the module-level ``engine`` on every
    call and its table-name listing is searched for ``name``.
    """
    known_tables = inspect(engine).get_table_names()
    return name in known_tables
|
|
|
|
|
|
def _columns(table: str) -> set[str]:
    """Return the column names of ``table`` as a set.

    An empty set is returned when the table does not exist, so callers can
    use plain membership tests without checking for the table first.
    """
    if _table_exists(table):
        column_info = inspect(engine).get_columns(table)
        return {entry["name"] for entry in column_info}
    return set()
|
|
|
|
|
|
def _add_column_if_missing(table: str, column: str, ddl: str) -> None:
    """Execute ``ddl`` unless ``table`` already has a column named ``column``.

    Args:
        table: Table to look the column up in.
        column: Column name whose presence makes this call a no-op.
        ddl: SQL statement executed verbatim, in its own transaction, when
            the column is not found.
    """
    already_present = column in _columns(table)
    if not already_present:
        with engine.begin() as connection:
            connection.execute(text(ddl))
|
|
|
|
|
|
def _ensure_users_table() -> None:
    """Create the table mapped by ``User`` if the database lacks it."""
    users_table = User.__table__
    # checkfirst=True makes this a no-op when the table is already there.
    users_table.create(engine, checkfirst=True)
|
|
|
|
|
|
def _ensure_character_cards_table() -> None:
    """Create the table mapped by ``CharacterCard`` if the database lacks it."""
    cards_table = CharacterCard.__table__
    # checkfirst=True makes this a no-op when the table is already there.
    cards_table.create(engine, checkfirst=True)
|
|
|
|
|
|
def _add_user_id_columns() -> None:
    """Add an indexed ``user_id`` column to every existing tenant table.

    For each name in ``TENANT_TABLES`` that exists in the database, the
    ``user_id`` foreign key column is added when missing, and an index on it
    is created if it does not exist yet. Absent tables are skipped.
    """
    # filter() is lazy, so each existence check happens right before the
    # table is processed.
    for table in filter(_table_exists, TENANT_TABLES):
        alter_stmt = f"ALTER TABLE {table} ADD COLUMN user_id INTEGER REFERENCES users(id) ON DELETE CASCADE"
        _add_column_if_missing(table, "user_id", alter_stmt)

        index_stmt = f"CREATE INDEX IF NOT EXISTS ix_{table}_user_id ON {table} (user_id)"
        with engine.begin() as connection:
            connection.execute(text(index_stmt))
|
|
|
|
|
|
def _ensure_default_user() -> tuple[int, str | None]:
    """Make sure at least one user exists and report which user to use.

    If the ``users`` table already has rows, the lowest id is returned and
    nothing is written. Otherwise a user with id 1 is inserted from the
    configured default username, display name and API token. When no token is
    configured, a random one is generated and logged at WARNING level.

    Returns:
        ``(user_id, token)``. ``token`` is the plaintext API token only when
        it was generated by this call; otherwise it is ``None``.
    """
    settings = get_settings()
    with engine.begin() as connection:
        first_user = connection.execute(text("SELECT id FROM users ORDER BY id LIMIT 1")).fetchone()
        if first_user:
            return int(first_user[0]), None

        username = settings.default_user_username or "owner"
        display_name = settings.default_user_display_name or username

        # A blank or whitespace-only configured token counts as "not set".
        configured_token = (settings.default_api_token or "").strip()
        generated = not configured_token
        plain_token = secrets.token_urlsafe(32) if generated else configured_token

        # Only the hash of the token is stored.
        token_hash = hash_token(plain_token)
        insert_params = {
            "username": username,
            "display_name": display_name,
            "token_hash": token_hash,
        }
        connection.execute(
            text(
                "INSERT INTO users (id, username, display_name, api_token_hash, is_active) "
                "VALUES (1, :username, :display_name, :token_hash, 1)"
            ),
            insert_params,
        )

        if not generated:
            return 1, None

        # The plaintext is logged and returned because only its hash is
        # written to the database.
        logger.warning(
            "DEFAULT_API_TOKEN not set — generated token for user '%s': %s",
            username,
            plain_token,
        )
        return 1, plain_token
|
|
|
|
|
|
def _backfill_user_id(default_user_id: int = 1) -> None:
    """Assign ``default_user_id`` to every tenant row that has no owner.

    All existing tables from ``TENANT_TABLES`` are updated inside a single
    transaction; only rows whose ``user_id`` is NULL are touched.

    Args:
        default_user_id: User id written into unowned rows.
    """
    params = {"uid": default_user_id}
    with engine.begin() as connection:
        for table in filter(_table_exists, TENANT_TABLES):
            statement = text(f"UPDATE {table} SET user_id = :uid WHERE user_id IS NULL")
            connection.execute(statement, params)
|
|
|
|
|
|
def _rebuild_shopping_unique() -> None:
    """Ensure shopping list names are unique per user.

    Creates the ``uq_shopping_lists_user_name`` unique index on
    ``(user_id, name)`` if it is not there yet. Does nothing when the
    ``shopping_lists`` table does not exist.
    """
    if _table_exists("shopping_lists"):
        ddl = text("CREATE UNIQUE INDEX IF NOT EXISTS uq_shopping_lists_user_name ON shopping_lists (user_id, name)")
        with engine.begin() as connection:
            connection.execute(ddl)
|
|
|
|
|
|
def _rebuild_project_bindings_unique() -> None:
    """Ensure project binding slugs are unique per user.

    Creates the ``uq_project_bindings_user_slug`` unique index on
    ``(user_id, taiga_slug)`` if it is not there yet. Does nothing when the
    ``project_bindings`` table does not exist.
    """
    if _table_exists("project_bindings"):
        ddl = text(
            "CREATE UNIQUE INDEX IF NOT EXISTS uq_project_bindings_user_slug "
            "ON project_bindings (user_id, taiga_slug)"
        )
        with engine.begin() as connection:
            connection.execute(ddl)
|
|
|
|
|
|
def _import_character_card(user_id: int) -> None:
    """Give ``user_id`` a character card row if they do not have one yet.

    The card is taken from the legacy JSON file at ``LEGACY_CARD_PATH`` when
    that file exists and can be read, parsed and normalized. Otherwise the
    normalized ``DEFAULT_CARD`` is stored. The existence check and the insert
    share one transaction.

    Args:
        user_id: Owner of the card row to create.
    """
    with engine.begin() as conn:
        existing = conn.execute(
            text("SELECT id FROM character_cards WHERE user_id = :uid"),
            {"uid": user_id},
        ).fetchone()
        if existing:
            return

        card = normalize_card(DEFAULT_CARD)
        if LEGACY_CARD_PATH.is_file():
            try:
                raw = json.loads(LEGACY_CARD_PATH.read_text(encoding="utf-8"))
                card = normalize_card(raw)
            except (OSError, ValueError) as exc:
                # ValueError covers both json.JSONDecodeError and the
                # UnicodeDecodeError raised for a file that is not valid
                # UTF-8. The import is best-effort: keep the default card,
                # but leave a trace instead of failing silently.
                logger.warning(
                    "Could not import legacy character card from %s, using the default card: %s",
                    LEGACY_CARD_PATH,
                    exc,
                )

        conn.execute(
            text("INSERT INTO character_cards (user_id, card_json) VALUES (:uid, :json)"),
            {"uid": user_id, "json": json.dumps(card, ensure_ascii=False)},
        )
|
|
|
|
|
|
def _backfill_qdrant_user_id(default_user_id: int = 1) -> None:
    """Best-effort: stamp ``default_user_id`` on Qdrant points lacking a user.

    Does nothing when RAG is disabled in the settings. Every Qdrant problem
    (store module not importable, client not creatable, a collection that
    cannot be checked or scanned) is logged and skipped, so it never aborts
    the surrounding migration. Re-running is safe because only points without
    a ``user_id`` payload value are updated.

    Args:
        default_user_id: User id written into points that have none.
    """
    settings = get_settings()
    if not settings.rag_enabled:
        return

    try:
        # Imported here so that any import failure only skips this step.
        from app.rag.store import COLLECTION_DOC_CHUNKS, COLLECTION_FACTS, COLLECTION_SUMMARIES, _client
    except Exception:
        logger.exception("Qdrant backfill skipped")
        return

    try:
        client = _client()
    except Exception:
        logger.warning('Qdrant unavailable, skipping user_id backfill')
        return

    skipped = []
    for collection in (COLLECTION_FACTS, COLLECTION_SUMMARIES, COLLECTION_DOC_CHUNKS):
        try:
            if not client.collection_exists(collection):
                continue
        except Exception:
            logger.warning('Qdrant unavailable for collection %s', collection)
            skipped.append(collection)
            continue

        try:
            _backfill_collection_user_id(client, collection, default_user_id)
        except Exception:
            # A failure while scanning or updating gets the same best-effort
            # treatment as the availability checks above. Points already
            # updated stay updated; the rest are picked up on the next run.
            logger.warning(
                "Qdrant user_id backfill failed for collection %s",
                collection,
                exc_info=True,
            )
            skipped.append(collection)

    if skipped:
        # Do not claim completion when part of the data was not processed.
        logger.warning(
            "Qdrant user_id backfill incomplete for user_id=%s; skipped collections: %s",
            default_user_id,
            ", ".join(str(name) for name in skipped),
        )
        return
    logger.info("Qdrant user_id backfill completed for user_id=%s", default_user_id)


def _backfill_collection_user_id(client, collection, default_user_id: int) -> None:
    """Set ``user_id`` on every point of ``collection`` whose payload lacks one.

    Pages through the collection 100 points at a time without loading
    vectors. Errors from the client propagate to the caller.
    """
    offset = None
    while True:
        points, offset = client.scroll(
            collection_name=collection,
            limit=100,
            offset=offset,
            with_payload=True,
            with_vectors=False,
        )
        if not points:
            break
        # A point counts as unowned when its payload is absent/empty or its
        # "user_id" entry is missing or None.
        missing = [point.id for point in points if (point.payload or {}).get("user_id") is None]
        if missing:
            client.set_payload(
                collection_name=collection,
                payload={"user_id": default_user_id},
                points=missing,
            )
        # scroll() hands back None as the next offset after the last page.
        if offset is None:
            break
|
|
|
|
|
|
def run_multi_user_migrations() -> str | None:
    """Run all multi-user migration steps in order.

    Returns:
        The plaintext API token if one was generated for a newly created
        default user, otherwise ``None``.
    """
    # Schema first: the default user and the backfill need the tables and
    # the ``user_id`` columns to exist.
    for schema_step in (_ensure_users_table, _ensure_character_cards_table, _add_user_id_columns):
        schema_step()

    owner_id, generated_token = _ensure_default_user()

    # Assign unowned rows to the default user before the per-user unique
    # indexes are created.
    _backfill_user_id(owner_id)
    for index_step in (_rebuild_shopping_unique, _rebuild_project_bindings_unique):
        index_step()

    for per_user_step in (_import_character_card, _backfill_qdrant_user_id):
        per_user_step(owner_id)

    return generated_token
|