fixed rp api
This commit is contained in:
@@ -0,0 +1,47 @@
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.shopping.service import ShoppingService
|
||||
|
||||
MAX_LISTS_IN_CONTEXT = 8
|
||||
MAX_ITEMS_PER_LIST = 12
|
||||
|
||||
|
||||
def get_shopping_snapshot(db: Session) -> dict[str, Any]:
|
||||
return ShoppingService(db).snapshot()
|
||||
|
||||
|
||||
def format_shopping_context(snapshot: dict[str, Any]) -> str:
|
||||
lines = ["[Списки покупок]"]
|
||||
lists = snapshot.get("lists") or []
|
||||
|
||||
if not lists:
|
||||
lines.append("Списков пока нет. create_shopping_list или add_shopping_items.")
|
||||
return "\n".join(lines)
|
||||
|
||||
lines.append(
|
||||
f"Всего списков: {snapshot.get('list_count', len(lists))}, "
|
||||
f"неотмеченных позиций: {snapshot.get('unchecked_items', 0)}."
|
||||
)
|
||||
lines.append("Для изменений вызывай tools: list_shopping_lists, add_shopping_items, check_shopping_item.")
|
||||
|
||||
for lst in lists[:MAX_LISTS_IN_CONTEXT]:
|
||||
items = lst.get("items") or []
|
||||
unchecked = [i for i in items if not i.get("checked")]
|
||||
preview = unchecked[:MAX_ITEMS_PER_LIST]
|
||||
parts = []
|
||||
for item in preview:
|
||||
qty = item.get("quantity")
|
||||
unit = (item.get("unit") or "").strip()
|
||||
label = item["text"]
|
||||
if qty is not None:
|
||||
label = f"{label} ({qty}{' ' + unit if unit else ''})"
|
||||
parts.append(f"#{item['id']} {label}")
|
||||
tail = f" +{len(unchecked) - len(preview)} ещё" if len(unchecked) > len(preview) else ""
|
||||
if parts:
|
||||
lines.append(f"- «{lst['name']}» (#{lst['id']}): {', '.join(parts)}{tail}")
|
||||
else:
|
||||
lines.append(f"- «{lst['name']}» (#{lst['id']}): всё отмечено или пусто")
|
||||
|
||||
return "\n".join(lines)
|
||||
@@ -0,0 +1,223 @@
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import func, select
|
||||
from sqlalchemy.orm import Session, selectinload
|
||||
|
||||
from app.db.models import ShoppingList, ShoppingListItem
|
||||
|
||||
|
||||
class ShoppingService:
|
||||
def __init__(self, db: Session):
|
||||
self.db = db
|
||||
|
||||
def snapshot(self) -> dict[str, Any]:
|
||||
lists = self.list_lists(include_items=True)
|
||||
total_items = sum(len(lst.get("items") or []) for lst in lists)
|
||||
unchecked = sum(
|
||||
1
|
||||
for lst in lists
|
||||
for item in (lst.get("items") or [])
|
||||
if not item.get("checked")
|
||||
)
|
||||
return {
|
||||
"lists": lists,
|
||||
"list_count": len(lists),
|
||||
"total_items": total_items,
|
||||
"unchecked_items": unchecked,
|
||||
}
|
||||
|
||||
def list_lists(self, *, include_items: bool = False) -> list[dict[str, Any]]:
|
||||
stmt = select(ShoppingList).order_by(ShoppingList.sort_order, ShoppingList.name)
|
||||
if include_items:
|
||||
stmt = stmt.options(selectinload(ShoppingList.items))
|
||||
rows = list(self.db.scalars(stmt).all())
|
||||
return [self._list_to_dict(row, include_items=include_items) for row in rows]
|
||||
|
||||
def get_list(
|
||||
self,
|
||||
list_id: int | None = None,
|
||||
*,
|
||||
name: str | None = None,
|
||||
) -> dict[str, Any] | None:
|
||||
row = self._resolve_list(list_id=list_id, name=name)
|
||||
if not row:
|
||||
return None
|
||||
return self._list_to_dict(row, include_items=True)
|
||||
|
||||
def create_list(self, name: str) -> dict[str, Any]:
|
||||
clean = name.strip()
|
||||
if not clean:
|
||||
raise ValueError("Название списка не может быть пустым")
|
||||
existing = self.db.scalar(select(ShoppingList).where(ShoppingList.name == clean))
|
||||
if existing:
|
||||
return {"ok": True, "list": self._list_to_dict(existing, include_items=True), "created": False}
|
||||
|
||||
max_order = self.db.scalar(select(func.max(ShoppingList.sort_order))) or 0
|
||||
row = ShoppingList(name=clean, sort_order=max_order + 1)
|
||||
self.db.add(row)
|
||||
self.db.commit()
|
||||
self.db.refresh(row)
|
||||
return {"ok": True, "list": self._list_to_dict(row, include_items=True), "created": True}
|
||||
|
||||
def rename_list(self, list_id: int, name: str) -> dict[str, Any]:
|
||||
row = self._require_list(list_id)
|
||||
clean = name.strip()
|
||||
if not clean:
|
||||
raise ValueError("Название списка не может быть пустым")
|
||||
conflict = self.db.scalar(
|
||||
select(ShoppingList).where(ShoppingList.name == clean, ShoppingList.id != list_id)
|
||||
)
|
||||
if conflict:
|
||||
raise ValueError(f"Список «{clean}» уже существует")
|
||||
row.name = clean
|
||||
row.updated_at = datetime.now(timezone.utc)
|
||||
self.db.commit()
|
||||
self.db.refresh(row)
|
||||
return {"ok": True, "list": self._list_to_dict(row, include_items=True)}
|
||||
|
||||
def delete_list(self, list_id: int) -> dict[str, Any]:
|
||||
row = self._require_list(list_id)
|
||||
name = row.name
|
||||
self.db.delete(row)
|
||||
self.db.commit()
|
||||
return {"ok": True, "deleted_list_id": list_id, "name": name}
|
||||
|
||||
def add_items(
|
||||
self,
|
||||
items: list[dict[str, Any]],
|
||||
*,
|
||||
list_id: int | None = None,
|
||||
list_name: str | None = None,
|
||||
create_list: bool = True,
|
||||
) -> dict[str, Any]:
|
||||
if not items:
|
||||
raise ValueError("Нужен хотя бы один товар")
|
||||
|
||||
row = self._resolve_list(list_id=list_id, name=list_name)
|
||||
if not row and list_name and create_list:
|
||||
created = self.create_list(list_name)
|
||||
row = self._require_list(created["list"]["id"])
|
||||
if not row:
|
||||
raise ValueError("Укажи list_id или list_name")
|
||||
|
||||
max_order = self.db.scalar(
|
||||
select(func.max(ShoppingListItem.sort_order)).where(ShoppingListItem.list_id == row.id)
|
||||
) or 0
|
||||
added: list[dict[str, Any]] = []
|
||||
for idx, raw in enumerate(items, start=1):
|
||||
text = (raw.get("text") or "").strip()
|
||||
if not text:
|
||||
continue
|
||||
item = ShoppingListItem(
|
||||
list_id=row.id,
|
||||
text=text,
|
||||
quantity=raw.get("quantity"),
|
||||
unit=(raw.get("unit") or "").strip(),
|
||||
sort_order=max_order + idx,
|
||||
)
|
||||
self.db.add(item)
|
||||
added.append(item)
|
||||
|
||||
if not added:
|
||||
raise ValueError("Нет валидных товаров для добавления")
|
||||
|
||||
row.updated_at = datetime.now(timezone.utc)
|
||||
self.db.commit()
|
||||
for item in added:
|
||||
self.db.refresh(item)
|
||||
|
||||
return {
|
||||
"ok": True,
|
||||
"list_id": row.id,
|
||||
"list_name": row.name,
|
||||
"added": [self._item_to_dict(i) for i in added],
|
||||
"list": self._list_to_dict(self._require_list(row.id), include_items=True),
|
||||
}
|
||||
|
||||
def set_item_checked(self, item_id: int, checked: bool) -> dict[str, Any]:
|
||||
item = self._require_item(item_id)
|
||||
item.checked = checked
|
||||
item.shopping_list.updated_at = datetime.now(timezone.utc)
|
||||
self.db.commit()
|
||||
return {"ok": True, "item": self._item_to_dict(item)}
|
||||
|
||||
def remove_item(self, item_id: int) -> dict[str, Any]:
|
||||
item = self._require_item(item_id)
|
||||
data = self._item_to_dict(item)
|
||||
list_id = item.list_id
|
||||
self.db.delete(item)
|
||||
self.db.commit()
|
||||
return {"ok": True, "removed": data, "list_id": list_id}
|
||||
|
||||
def clear_checked(self, list_id: int) -> dict[str, Any]:
|
||||
row = self._require_list(list_id)
|
||||
removed = 0
|
||||
for item in list(row.items):
|
||||
if item.checked:
|
||||
self.db.delete(item)
|
||||
removed += 1
|
||||
row.updated_at = datetime.now(timezone.utc)
|
||||
self.db.commit()
|
||||
return {
|
||||
"ok": True,
|
||||
"list_id": list_id,
|
||||
"removed_count": removed,
|
||||
"list": self._list_to_dict(self._require_list(list_id), include_items=True),
|
||||
}
|
||||
|
||||
def _resolve_list(
|
||||
self,
|
||||
*,
|
||||
list_id: int | None = None,
|
||||
name: str | None = None,
|
||||
) -> ShoppingList | None:
|
||||
if list_id is not None:
|
||||
return self.db.scalar(
|
||||
select(ShoppingList)
|
||||
.where(ShoppingList.id == list_id)
|
||||
.options(selectinload(ShoppingList.items))
|
||||
)
|
||||
if name:
|
||||
clean = name.strip()
|
||||
return self.db.scalar(
|
||||
select(ShoppingList)
|
||||
.where(ShoppingList.name == clean)
|
||||
.options(selectinload(ShoppingList.items))
|
||||
)
|
||||
return None
|
||||
|
||||
def _require_list(self, list_id: int) -> ShoppingList:
|
||||
row = self._resolve_list(list_id=list_id)
|
||||
if not row:
|
||||
raise ValueError(f"Список #{list_id} не найден")
|
||||
return row
|
||||
|
||||
def _require_item(self, item_id: int) -> ShoppingListItem:
|
||||
item = self.db.get(ShoppingListItem, item_id)
|
||||
if not item:
|
||||
raise ValueError(f"Позиция #{item_id} не найдена")
|
||||
return item
|
||||
|
||||
def _item_to_dict(self, item: ShoppingListItem) -> dict[str, Any]:
|
||||
return {
|
||||
"id": item.id,
|
||||
"list_id": item.list_id,
|
||||
"text": item.text,
|
||||
"quantity": item.quantity,
|
||||
"unit": item.unit,
|
||||
"checked": item.checked,
|
||||
"sort_order": item.sort_order,
|
||||
}
|
||||
|
||||
def _list_to_dict(self, row: ShoppingList, *, include_items: bool) -> dict[str, Any]:
|
||||
data: dict[str, Any] = {
|
||||
"id": row.id,
|
||||
"name": row.name,
|
||||
"sort_order": row.sort_order,
|
||||
"item_count": len(row.items) if row.items is not None else 0,
|
||||
"unchecked_count": sum(1 for i in (row.items or []) if not i.checked),
|
||||
}
|
||||
if include_items:
|
||||
data["items"] = [self._item_to_dict(i) for i in (row.items or [])]
|
||||
return data
|
||||
Reference in New Issue
Block a user