"""Pomodoro timer service backed by a SQLAlchemy session."""

from datetime import datetime, timezone

from sqlalchemy import select
from sqlalchemy.orm import Session

from app.db.models import PomodoroSession

# Statuses of sessions that still occupy the timer slot.
_ACTIVE_STATUSES = ("running", "paused")
# Statuses of sessions that have ended and belong to the history.
_FINISHED_STATUSES = ("completed", "cancelled")


def _utcnow() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


class PomodoroService:
    """Start, pause, resume, stop and inspect Pomodoro sessions.

    Every mutating operation commits on the ``Session`` passed to the
    constructor. State-transition errors are reported as ``ValueError``.
    """

    def __init__(self, db: Session):
        """Store the SQLAlchemy session used for all queries and commits."""
        self.db = db

    def _get_active(self) -> PomodoroSession | None:
        """Return the newest running or paused session, or ``None``."""
        stmt = (
            select(PomodoroSession)
            .where(PomodoroSession.status.in_(_ACTIVE_STATUSES))
            .order_by(PomodoroSession.id.desc())
            .limit(1)
        )
        return self.db.scalar(stmt)

    def _elapsed(self, session: PomodoroSession) -> int:
        """Return the seconds worked so far in ``session``.

        This is the stored ``elapsed_seconds`` plus, for a running session,
        the whole seconds passed since ``started_at``.
        """
        # ``or 0`` guards against an unset column value (None).
        elapsed = session.elapsed_seconds or 0
        if session.status == "running" and session.started_at:
            started = session.started_at
            # A naive timestamp is interpreted as UTC so it can be
            # subtracted from the aware value returned by _utcnow().
            if started.tzinfo is None:
                started = started.replace(tzinfo=timezone.utc)
            elapsed += int((_utcnow() - started).total_seconds())
        return elapsed

    def _remaining(self, session: PomodoroSession) -> int:
        """Return the seconds left in ``session``, never below zero."""
        total = session.duration_min * 60
        return max(0, total - self._elapsed(session))

    def _mark_completed(self, session: PomodoroSession) -> None:
        """Persist ``session`` as completed after its time has run out.

        The elapsed time is stored as the full planned duration so that
        ``history()`` does not report the stale value from the last pause.
        """
        session.elapsed_seconds = max(0, session.duration_min * 60)
        session.status = "completed"
        session.finished_at = _utcnow()
        session.completed = True
        self.db.commit()
        self.db.refresh(session)

    def _to_status_dict(self, session: PomodoroSession | None) -> dict:
        """Build the status payload for ``session``.

        With no session an "idle" payload is returned. A running session
        with no time left is completed and committed as a side effect
        before the payload is built.
        """
        if not session:
            return {
                "status": "idle",
                "duration_min": 25,
                "task_note": "",
                "elapsed_seconds": 0,
                "remaining_seconds": 0,
                "session_id": None,
            }

        elapsed = self._elapsed(session)
        total = session.duration_min * 60
        remaining = max(0, total - elapsed)

        if session.status == "running" and remaining == 0:
            self._mark_completed(session)
            # Report the capped value that was just persisted rather than
            # the raw wall-clock overshoot.
            elapsed = session.elapsed_seconds

        started_at = session.started_at
        finished_at = session.finished_at
        return {
            "status": session.status,
            "duration_min": session.duration_min,
            "task_note": session.task_note,
            "elapsed_seconds": elapsed,
            "remaining_seconds": remaining,
            "session_id": session.id,
            "started_at": started_at.isoformat() if started_at else None,
            "finished_at": finished_at.isoformat() if finished_at else None,
        }

    def get_status(self) -> dict:
        """Return the status payload of the active session, or "idle"."""
        return self._to_status_dict(self._get_active())

    def start(self, duration_min: int = 25, task_note: str = "") -> dict:
        """Create and start a new session.

        Args:
            duration_min: Planned length of the session in minutes.
            task_note: Free-form note stored with the session.

        Returns:
            The status payload of the new session.

        Raises:
            ValueError: If a running or paused session already exists.
        """
        active = self._get_active()
        # A running session whose time is already up is finished in all but
        # name; finalize it instead of letting it block a new timer.
        if (
            active
            and active.status == "running"
            and self._remaining(active) == 0
        ):
            self._mark_completed(active)
            active = self._get_active()
        if active:
            raise ValueError("Таймер уже запущен. Сначала остановите текущую сессию.")

        session = PomodoroSession(
            status="running",
            duration_min=duration_min,
            task_note=task_note,
            started_at=_utcnow(),
        )
        self.db.add(session)
        self.db.commit()
        self.db.refresh(session)
        return self._to_status_dict(session)

    def pause(self) -> dict:
        """Pause the running session and return its status payload.

        Raises:
            ValueError: If there is no running session.
        """
        session = self._get_active()
        if not session or session.status != "running":
            raise ValueError("Нет активного запущенного таймера.")

        # Fold the current run into the stored total before clearing
        # started_at, otherwise that time would be lost.
        session.elapsed_seconds = self._elapsed(session)
        session.status = "paused"
        session.paused_at = _utcnow()
        session.started_at = None
        self.db.commit()
        self.db.refresh(session)
        return self._to_status_dict(session)

    def resume(self) -> dict:
        """Resume the paused session and return its status payload.

        Raises:
            ValueError: If there is no paused session.
        """
        session = self._get_active()
        if not session or session.status != "paused":
            raise ValueError("Нет таймера на паузе.")

        session.status = "running"
        session.started_at = _utcnow()
        session.paused_at = None
        self.db.commit()
        self.db.refresh(session)
        return self._to_status_dict(session)

    def stop(self, result: str = "", completed: bool = False) -> dict:
        """End the active session and return its status payload.

        Args:
            result: Free-form outcome text stored with the session.
            completed: ``True`` stores the session as "completed",
                ``False`` as "cancelled".

        Raises:
            ValueError: If there is no running or paused session.
        """
        session = self._get_active()
        if not session:
            raise ValueError("Нет активного таймера.")

        # Capture elapsed time while status/started_at still describe the
        # current run.
        session.elapsed_seconds = self._elapsed(session)
        session.status = "completed" if completed else "cancelled"
        session.result = result
        session.completed = completed
        session.finished_at = _utcnow()
        session.started_at = None
        self.db.commit()
        self.db.refresh(session)
        return self._to_status_dict(session)

    def history(self, limit: int = 20) -> list[dict]:
        """Return finished sessions, most recently finished first.

        Args:
            limit: Maximum number of sessions to return.

        Returns:
            One dict per completed or cancelled session.
        """
        stmt = (
            select(PomodoroSession)
            .where(PomodoroSession.status.in_(_FINISHED_STATUSES))
            .order_by(PomodoroSession.finished_at.desc())
            .limit(limit)
        )
        sessions = self.db.scalars(stmt).all()
        return [
            {
                "id": s.id,
                "status": s.status,
                "duration_min": s.duration_min,
                "task_note": s.task_note,
                "result": s.result,
                "completed": s.completed,
                "elapsed_seconds": s.elapsed_seconds,
                "finished_at": s.finished_at.isoformat() if s.finished_at else None,
            }
            for s in sessions
        ]