Files
Home_assistant/backend/app/shopping/service.py
T
2026-06-10 12:03:05 +03:00

224 lines
8.0 KiB
Python

from datetime import datetime, timezone
from typing import Any
from sqlalchemy import func, select
from sqlalchemy.orm import Session, selectinload
from app.db.models import ShoppingList, ShoppingListItem
class ShoppingService:
def __init__(self, db: Session):
self.db = db
def snapshot(self) -> dict[str, Any]:
lists = self.list_lists(include_items=True)
total_items = sum(len(lst.get("items") or []) for lst in lists)
unchecked = sum(
1
for lst in lists
for item in (lst.get("items") or [])
if not item.get("checked")
)
return {
"lists": lists,
"list_count": len(lists),
"total_items": total_items,
"unchecked_items": unchecked,
}
def list_lists(self, *, include_items: bool = False) -> list[dict[str, Any]]:
stmt = select(ShoppingList).order_by(ShoppingList.sort_order, ShoppingList.name)
if include_items:
stmt = stmt.options(selectinload(ShoppingList.items))
rows = list(self.db.scalars(stmt).all())
return [self._list_to_dict(row, include_items=include_items) for row in rows]
def get_list(
self,
list_id: int | None = None,
*,
name: str | None = None,
) -> dict[str, Any] | None:
row = self._resolve_list(list_id=list_id, name=name)
if not row:
return None
return self._list_to_dict(row, include_items=True)
def create_list(self, name: str) -> dict[str, Any]:
clean = name.strip()
if not clean:
raise ValueError("Название списка не может быть пустым")
existing = self.db.scalar(select(ShoppingList).where(ShoppingList.name == clean))
if existing:
return {"ok": True, "list": self._list_to_dict(existing, include_items=True), "created": False}
max_order = self.db.scalar(select(func.max(ShoppingList.sort_order))) or 0
row = ShoppingList(name=clean, sort_order=max_order + 1)
self.db.add(row)
self.db.commit()
self.db.refresh(row)
return {"ok": True, "list": self._list_to_dict(row, include_items=True), "created": True}
def rename_list(self, list_id: int, name: str) -> dict[str, Any]:
row = self._require_list(list_id)
clean = name.strip()
if not clean:
raise ValueError("Название списка не может быть пустым")
conflict = self.db.scalar(
select(ShoppingList).where(ShoppingList.name == clean, ShoppingList.id != list_id)
)
if conflict:
raise ValueError(f"Список «{clean}» уже существует")
row.name = clean
row.updated_at = datetime.now(timezone.utc)
self.db.commit()
self.db.refresh(row)
return {"ok": True, "list": self._list_to_dict(row, include_items=True)}
def delete_list(self, list_id: int) -> dict[str, Any]:
row = self._require_list(list_id)
name = row.name
self.db.delete(row)
self.db.commit()
return {"ok": True, "deleted_list_id": list_id, "name": name}
def add_items(
self,
items: list[dict[str, Any]],
*,
list_id: int | None = None,
list_name: str | None = None,
create_list: bool = True,
) -> dict[str, Any]:
if not items:
raise ValueError("Нужен хотя бы один товар")
row = self._resolve_list(list_id=list_id, name=list_name)
if not row and list_name and create_list:
created = self.create_list(list_name)
row = self._require_list(created["list"]["id"])
if not row:
raise ValueError("Укажи list_id или list_name")
max_order = self.db.scalar(
select(func.max(ShoppingListItem.sort_order)).where(ShoppingListItem.list_id == row.id)
) or 0
added: list[dict[str, Any]] = []
for idx, raw in enumerate(items, start=1):
text = (raw.get("text") or "").strip()
if not text:
continue
item = ShoppingListItem(
list_id=row.id,
text=text,
quantity=raw.get("quantity"),
unit=(raw.get("unit") or "").strip(),
sort_order=max_order + idx,
)
self.db.add(item)
added.append(item)
if not added:
raise ValueError("Нет валидных товаров для добавления")
row.updated_at = datetime.now(timezone.utc)
self.db.commit()
for item in added:
self.db.refresh(item)
return {
"ok": True,
"list_id": row.id,
"list_name": row.name,
"added": [self._item_to_dict(i) for i in added],
"list": self._list_to_dict(self._require_list(row.id), include_items=True),
}
def set_item_checked(self, item_id: int, checked: bool) -> dict[str, Any]:
item = self._require_item(item_id)
item.checked = checked
item.shopping_list.updated_at = datetime.now(timezone.utc)
self.db.commit()
return {"ok": True, "item": self._item_to_dict(item)}
def remove_item(self, item_id: int) -> dict[str, Any]:
item = self._require_item(item_id)
data = self._item_to_dict(item)
list_id = item.list_id
self.db.delete(item)
self.db.commit()
return {"ok": True, "removed": data, "list_id": list_id}
def clear_checked(self, list_id: int) -> dict[str, Any]:
row = self._require_list(list_id)
removed = 0
for item in list(row.items):
if item.checked:
self.db.delete(item)
removed += 1
row.updated_at = datetime.now(timezone.utc)
self.db.commit()
return {
"ok": True,
"list_id": list_id,
"removed_count": removed,
"list": self._list_to_dict(self._require_list(list_id), include_items=True),
}
def _resolve_list(
self,
*,
list_id: int | None = None,
name: str | None = None,
) -> ShoppingList | None:
if list_id is not None:
return self.db.scalar(
select(ShoppingList)
.where(ShoppingList.id == list_id)
.options(selectinload(ShoppingList.items))
)
if name:
clean = name.strip()
return self.db.scalar(
select(ShoppingList)
.where(ShoppingList.name == clean)
.options(selectinload(ShoppingList.items))
)
return None
def _require_list(self, list_id: int) -> ShoppingList:
row = self._resolve_list(list_id=list_id)
if not row:
raise ValueError(f"Список #{list_id} не найден")
return row
def _require_item(self, item_id: int) -> ShoppingListItem:
item = self.db.get(ShoppingListItem, item_id)
if not item:
raise ValueError(f"Позиция #{item_id} не найдена")
return item
def _item_to_dict(self, item: ShoppingListItem) -> dict[str, Any]:
return {
"id": item.id,
"list_id": item.list_id,
"text": item.text,
"quantity": item.quantity,
"unit": item.unit,
"checked": item.checked,
"sort_order": item.sort_order,
}
def _list_to_dict(self, row: ShoppingList, *, include_items: bool) -> dict[str, Any]:
data: dict[str, Any] = {
"id": row.id,
"name": row.name,
"sort_order": row.sort_order,
"item_count": len(row.items) if row.items is not None else 0,
"unchecked_count": sum(1 for i in (row.items or []) if not i.checked),
}
if include_items:
data["items"] = [self._item_to_dict(i) for i in (row.items or [])]
return data