225 lines
8.5 KiB
Python
225 lines
8.5 KiB
Python
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.orm import Session, selectinload
|
|
|
|
from app.db.models import ShoppingList, ShoppingListItem
|
|
|
|
|
|
class ShoppingService:
    """Manage one user's shopping lists and the items on them.

    Lists are always looked up together with ``user_id``, and items are
    accepted only when their parent list belongs to that user. Public methods
    return plain dictionaries built by ``_list_to_dict`` / ``_item_to_dict``
    and raise ``ValueError`` with a user-facing message when input is invalid
    or a requested row cannot be found.
    """

    def __init__(self, db: Session, user_id: int):
        """Store the session and the id of the user whose lists are managed."""
        self.db = db
        self.user_id = user_id

    def snapshot(self) -> dict[str, Any]:
        """Return every list with its items plus aggregate counters.

        Returns:
            A dict with ``lists`` (each including ``items``), ``list_count``,
            ``total_items`` and ``unchecked_items``.
        """
        lists = self.list_lists(include_items=True)
        all_items = [item for lst in lists for item in (lst.get("items") or [])]
        return {
            "lists": lists,
            "list_count": len(lists),
            "total_items": len(all_items),
            "unchecked_items": sum(1 for item in all_items if not item.get("checked")),
        }

    def list_lists(self, *, include_items: bool = False) -> list[dict[str, Any]]:
        """Return the user's lists ordered by ``sort_order`` and then ``name``.

        Args:
            include_items: When true, each dict also carries an ``items`` key.
        """
        stmt = (
            select(ShoppingList)
            .where(ShoppingList.user_id == self.user_id)
            .order_by(ShoppingList.sort_order, ShoppingList.name)
            # Eager-load unconditionally: _list_to_dict reads row.items for
            # its counters even when include_items is False, which would
            # otherwise cost one lazy load per list (assuming the relationship
            # uses the default lazy strategy).
            .options(selectinload(ShoppingList.items))
        )
        rows = self.db.scalars(stmt).all()
        return [self._list_to_dict(row, include_items=include_items) for row in rows]

    def get_list(
        self,
        list_id: int | None = None,
        *,
        name: str | None = None,
    ) -> dict[str, Any] | None:
        """Return one list with its items, looked up by id or by name.

        ``list_id`` takes precedence when both are given. Returns ``None``
        when nothing matches or when neither argument is supplied.
        """
        row = self._resolve_list(list_id=list_id, name=name)
        if row is None:
            return None
        return self._list_to_dict(row, include_items=True)

    def create_list(self, name: str) -> dict[str, Any]:
        """Create a list called ``name`` or return the existing one.

        Returns:
            ``{"ok": True, "list": ..., "created": bool}`` where ``created``
            is ``False`` if a list with that (stripped) name already existed.

        Raises:
            ValueError: If ``name`` is empty after stripping whitespace.
        """
        clean = self._clean_list_name(name)
        existing = self.db.scalar(
            select(ShoppingList).where(
                ShoppingList.user_id == self.user_id,
                ShoppingList.name == clean,
            )
        )
        if existing is not None:
            return {
                "ok": True,
                "list": self._list_to_dict(existing, include_items=True),
                "created": False,
            }

        # New lists go after the user's current last list.
        max_order = self.db.scalar(
            select(func.max(ShoppingList.sort_order)).where(ShoppingList.user_id == self.user_id)
        ) or 0
        row = ShoppingList(user_id=self.user_id, name=clean, sort_order=max_order + 1)
        self.db.add(row)
        self.db.commit()
        self.db.refresh(row)
        return {"ok": True, "list": self._list_to_dict(row, include_items=True), "created": True}

    def rename_list(self, list_id: int, name: str) -> dict[str, Any]:
        """Rename a list and bump its ``updated_at``.

        Raises:
            ValueError: If the list does not exist, the new name is empty, or
                another list of the same user already has that name.
        """
        # Existence is checked before the name so a missing list is reported
        # even when the new name is also invalid.
        row = self._require_list(list_id)
        clean = self._clean_list_name(name)
        conflict = self.db.scalar(
            select(ShoppingList).where(
                ShoppingList.user_id == self.user_id,
                ShoppingList.name == clean,
                ShoppingList.id != list_id,
            )
        )
        if conflict is not None:
            raise ValueError(f"Список «{clean}» уже существует")
        row.name = clean
        row.updated_at = datetime.now(timezone.utc)
        self.db.commit()
        self.db.refresh(row)
        return {"ok": True, "list": self._list_to_dict(row, include_items=True)}

    def delete_list(self, list_id: int) -> dict[str, Any]:
        """Delete a list and report its id and former name.

        Raises:
            ValueError: If the list does not exist for this user.
        """
        row = self._require_list(list_id)
        # Capture the name before the row is deleted and committed.
        name = row.name
        self.db.delete(row)
        self.db.commit()
        return {"ok": True, "deleted_list_id": list_id, "name": name}

    def add_items(
        self,
        items: list[dict[str, Any]],
        *,
        list_id: int | None = None,
        list_name: str | None = None,
        create_list: bool = True,
    ) -> dict[str, Any]:
        """Append items to a list, optionally creating the list by name.

        Args:
            items: Dicts read via ``text`` (required), ``quantity`` and
                ``unit``. Entries whose ``text`` is empty are skipped.
            list_id: Target list id; takes precedence over ``list_name``.
            list_name: Target list name, used when the id lookup finds nothing.
            create_list: Create ``list_name`` when no list was resolved.

        Returns:
            A dict with ``ok``, ``list_id``, ``list_name``, the ``added``
            items and the refreshed ``list``.

        Raises:
            ValueError: If ``items`` is empty, no target list can be
                determined or found, or no entry has usable text.
        """
        if not items:
            raise ValueError("Нужен хотя бы один товар")

        row = self._resolve_list(list_id=list_id, name=list_name)
        if row is None and list_name and create_list:
            created = self.create_list(list_name)
            row = self._require_list(created["list"]["id"])
        if row is None:
            # Distinguish "target not found" from "no target given" so the
            # caller is not told to supply something it already supplied.
            if list_id is not None:
                raise ValueError(f"Список #{list_id} не найден")
            if list_name and list_name.strip():
                raise ValueError(f"Список «{list_name.strip()}» не найден")
            raise ValueError("Укажи list_id или list_name")

        max_order = self.db.scalar(
            select(func.max(ShoppingListItem.sort_order)).where(ShoppingListItem.list_id == row.id)
        ) or 0
        added: list[ShoppingListItem] = []
        for idx, raw in enumerate(items, start=1):
            text = (raw.get("text") or "").strip()
            if not text:
                continue
            item = ShoppingListItem(
                list_id=row.id,
                text=text,
                quantity=raw.get("quantity"),
                unit=(raw.get("unit") or "").strip(),
                sort_order=max_order + idx,
            )
            self.db.add(item)
            added.append(item)

        if not added:
            raise ValueError("Нет валидных товаров для добавления")

        row.updated_at = datetime.now(timezone.utc)
        self.db.commit()
        for item in added:
            self.db.refresh(item)

        return {
            "ok": True,
            "list_id": row.id,
            "list_name": row.name,
            "added": [self._item_to_dict(item) for item in added],
            # Re-query so the returned list includes the newly added items.
            "list": self._list_to_dict(self._require_list(row.id), include_items=True),
        }

    def set_item_checked(self, item_id: int, checked: bool) -> dict[str, Any]:
        """Set an item's ``checked`` flag and bump its list's ``updated_at``.

        Raises:
            ValueError: If the item does not exist for this user.
        """
        item = self._require_item(item_id)
        item.checked = checked
        item.shopping_list.updated_at = datetime.now(timezone.utc)
        self.db.commit()
        return {"ok": True, "item": self._item_to_dict(item)}

    def remove_item(self, item_id: int) -> dict[str, Any]:
        """Delete one item and return its last known data.

        Raises:
            ValueError: If the item does not exist for this user.
        """
        item = self._require_item(item_id)
        # Serialize before deleting; the row is gone after commit.
        data = self._item_to_dict(item)
        list_id = item.list_id
        # Removing an item changes the list, so touch it like the other
        # item mutations (add_items, set_item_checked, clear_checked) do.
        item.shopping_list.updated_at = datetime.now(timezone.utc)
        self.db.delete(item)
        self.db.commit()
        return {"ok": True, "removed": data, "list_id": list_id}

    def clear_checked(self, list_id: int) -> dict[str, Any]:
        """Delete every checked item of a list.

        Returns:
            A dict with ``ok``, ``list_id``, ``removed_count`` and the
            refreshed ``list``.

        Raises:
            ValueError: If the list does not exist for this user.
        """
        row = self._require_list(list_id)
        # Build the list first so deletion never iterates the live collection.
        checked_items = [item for item in row.items if item.checked]
        for item in checked_items:
            self.db.delete(item)
        row.updated_at = datetime.now(timezone.utc)
        self.db.commit()
        return {
            "ok": True,
            "list_id": list_id,
            "removed_count": len(checked_items),
            "list": self._list_to_dict(self._require_list(list_id), include_items=True),
        }

    @staticmethod
    def _clean_list_name(name: str) -> str:
        """Strip a list name and reject empty (or ``None``) input."""
        clean = (name or "").strip()
        if not clean:
            raise ValueError("Название списка не может быть пустым")
        return clean

    def _resolve_list(
        self,
        *,
        list_id: int | None = None,
        name: str | None = None,
    ) -> ShoppingList | None:
        """Load one of the user's lists, with items, by id or by name.

        ``list_id`` wins when given: there is no fallback to ``name`` if the
        id lookup finds nothing. Returns ``None`` when nothing matches or
        when neither argument is usable.
        """
        if list_id is not None:
            criterion = ShoppingList.id == list_id
        elif name:
            criterion = ShoppingList.name == name.strip()
        else:
            return None
        return self.db.scalar(
            select(ShoppingList)
            .where(ShoppingList.user_id == self.user_id, criterion)
            .options(selectinload(ShoppingList.items))
        )

    def _require_list(self, list_id: int) -> ShoppingList:
        """Return the user's list with this id or raise ``ValueError``."""
        row = self._resolve_list(list_id=list_id)
        if row is None:
            raise ValueError(f"Список #{list_id} не найден")
        return row

    def _require_item(self, item_id: int) -> ShoppingListItem:
        """Return the item with this id if its list belongs to the user.

        An item owned by another user is reported exactly like a missing one.

        Raises:
            ValueError: If the item is missing or belongs to another user.
        """
        item = self.db.get(ShoppingListItem, item_id)
        if item is None or item.shopping_list.user_id != self.user_id:
            raise ValueError(f"Позиция #{item_id} не найдена")
        return item

    def _item_to_dict(self, item: ShoppingListItem) -> dict[str, Any]:
        """Serialize an item row to a plain dict."""
        return {
            "id": item.id,
            "list_id": item.list_id,
            "text": item.text,
            "quantity": item.quantity,
            "unit": item.unit,
            "checked": item.checked,
            "sort_order": item.sort_order,
        }

    def _list_to_dict(self, row: ShoppingList, *, include_items: bool) -> dict[str, Any]:
        """Serialize a list row, always with counters and optionally items.

        Note that ``row.items`` is read even when ``include_items`` is false,
        because ``item_count`` and ``unchecked_count`` are derived from it.
        """
        items = row.items or []
        data: dict[str, Any] = {
            "id": row.id,
            "name": row.name,
            "sort_order": row.sort_order,
            "item_count": len(items),
            "unchecked_count": sum(1 for item in items if not item.checked),
        }
        if include_items:
            data["items"] = [self._item_to_dict(item) for item in items]
        return data
|