Taiga integration

This commit is contained in:
2026-06-09 12:47:13 +03:00
parent c8599b3d13
commit 1f83dcb574
30 changed files with 1543 additions and 115 deletions
+3 -1
View File
@@ -1,9 +1,11 @@
from fastapi import APIRouter
from app.api.routes import character, chat, health, pomodoro
from app.api.routes import character, chat, health, pomodoro, projects, webhooks
api_router = APIRouter(prefix="/api/v1")
api_router.include_router(health.router, tags=["health"])
api_router.include_router(chat.router, prefix="/chat", tags=["chat"])
api_router.include_router(pomodoro.router, prefix="/pomodoro", tags=["pomodoro"])
api_router.include_router(character.router, tags=["character"])
api_router.include_router(projects.router, tags=["projects"])
api_router.include_router(webhooks.router, tags=["webhooks"])
+76
View File
@@ -0,0 +1,76 @@
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from app.db.base import get_db
from app.projects.service import ProjectService
router = APIRouter()
class GiteaBinding(BaseModel):
gitea_owner: str = Field(min_length=1)
gitea_repo: str = Field(min_length=1)
default_branch: str = "main"
class WorkItemCreate(BaseModel):
text: str = Field(min_length=1)
project_slug: str | None = None
@router.get("/projects")
def list_projects(db: Session = Depends(get_db)) -> list[dict[str, Any]]:
return ProjectService(db).list_projects()
@router.post("/projects/sync-taiga")
def sync_taiga_projects(db: Session = Depends(get_db)) -> list[dict[str, Any]]:
try:
return ProjectService(db).sync_taiga_projects()
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
@router.put("/projects/{taiga_slug}/gitea")
def bind_gitea(
taiga_slug: str,
payload: GiteaBinding,
db: Session = Depends(get_db),
) -> dict[str, Any]:
try:
return ProjectService(db).bind_gitea(
taiga_slug,
payload.gitea_owner,
payload.gitea_repo,
payload.default_branch,
)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
@router.post("/work-items")
async def create_work_item(
payload: WorkItemCreate,
db: Session = Depends(get_db),
) -> dict[str, Any]:
try:
return await ProjectService(db).create_work_item(
payload.text,
project_slug=payload.project_slug,
)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc
except Exception as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
@router.get("/work-items")
def list_work_items(
limit: int = 30,
status: str | None = None,
db: Session = Depends(get_db),
) -> list[dict[str, Any]]:
return ProjectService(db).list_work_items(limit=limit, status=status)
+94
View File
@@ -0,0 +1,94 @@
import hashlib
import hmac
import json
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.config import get_settings
from app.db.base import SessionLocal, get_db
from app.db.models import ChatSession, Message, ProjectBinding
from app.projects.service import ProjectService
router = APIRouter()
def _verify_gitea_signature(body: bytes, signature: str | None, secret: str) -> bool:
if not secret:
return True
if not signature:
return False
if signature.startswith("sha256="):
signature = signature[7:]
expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, signature)
def _post_close_notice(results: list[dict[str, Any]], owner: str, repo: str) -> None:
if not results:
return
db = SessionLocal()
try:
session = db.scalar(
select(ChatSession).order_by(ChatSession.updated_at.desc()).limit(1)
)
if not session:
session = ChatSession(title="Git")
db.add(session)
db.commit()
db.refresh(session)
lines = [f"🔀 **Push** `{owner}/{repo}`"]
for item in results:
if "closed" in item:
lines.append(f"- `{item.get('commit', '?')}`: закрыто {item['closed']}")
elif "error" in item:
lines.append(f"- ошибка: {item['error']}")
db.add(Message(session_id=session.id, role="notice", content="\n".join(lines)))
db.commit()
finally:
db.close()
@router.post("/webhooks/gitea")
async def gitea_webhook(request: Request, db: Session = Depends(get_db)) -> dict[str, Any]:
body = await request.body()
settings = get_settings()
signature = request.headers.get("X-Gitea-Signature")
if not _verify_gitea_signature(body, signature, settings.gitea_webhook_secret):
raise HTTPException(status_code=401, detail="Invalid webhook signature")
payload = json.loads(body)
if payload.get("secret") and settings.gitea_webhook_secret:
if payload.get("secret") != settings.gitea_webhook_secret:
raise HTTPException(status_code=401, detail="Invalid webhook secret")
event = request.headers.get("X-Gitea-Event", "")
if event != "push":
return {"ok": True, "skipped": event}
repo = payload.get("repository", {})
owner = repo.get("owner", {}).get("login", "")
repo_name = repo.get("name", "")
if not owner or not repo_name:
raise HTTPException(status_code=400, detail="Missing repository info")
binding = db.scalar(
select(ProjectBinding).where(
ProjectBinding.gitea_owner == owner,
ProjectBinding.gitea_repo == repo_name,
)
)
if not binding:
return {"ok": True, "skipped": "unknown repo"}
commits = payload.get("commits") or []
service = ProjectService(db)
results = service.process_push(owner, repo_name, commits)
_post_close_notice(results, owner, repo_name)
return {"ok": True, "results": results}
+4 -2
View File
@@ -6,9 +6,11 @@ TOOLS_INSTRUCTIONS = """
- Любой вопрос о таймере, помидоро, задачах или истории — СНАЧАЛА вызывай соответствующий инструмент.
- Никогда не выдумывай статус таймера или список задач.
- После вызова инструмента кратко объясни результат пользователю по-человечески.
- Инструменты: get_pomodoro_status, start_pomodoro, start_short_break, start_long_break,
- Помидоро: get_pomodoro_status, start_pomodoro, start_short_break, start_long_break,
stop_pomodoro, skip_pomodoro_phase, reset_pomodoro_cycle, get_pomodoro_history.
- reset_pomodoro_cycle — только когда пользователь явно просит сбросить цикл.
- Задачи: sync_taiga_projects, list_taiga_projects, create_work_item, list_work_items.
- create_work_item — при «заведи баг/фичу», «добавь в таигу»; передай полный текст пользователя.
- Список проектов и открытых задач уже в контексте — не выдумывай, при необходимости уточни tool-вызовом.
""".strip()
DEFAULT_CARD: dict[str, Any] = {
+54
View File
@@ -67,9 +67,63 @@ def format_pomodoro_notice(tool_name: str, raw_result: str) -> str | None:
if tool_name == "get_pomodoro_history":
return _format_history_notice(data)
if tool_name == "create_work_item":
return _format_work_item_notice(data)
if tool_name == "list_work_items":
return _format_work_items_list_notice(data)
if tool_name == "sync_taiga_projects":
return f"📋 Синхронизировано проектов Taiga: **{len(data)}**"
if tool_name == "list_taiga_projects":
if not isinstance(data, list) or not data:
return "📋 Проекты Taiga не найдены. Вызовите sync_taiga_projects."
lines = ["📋 **Проекты:**"]
for p in data:
gitea = f"{p.get('gitea_owner')}/{p.get('gitea_repo')}" if p.get("gitea_configured") else ""
lines.append(f"- `{p.get('slug')}`: {p.get('name')} · Gitea: {gitea}")
return "\n".join(lines)
return None
def _format_work_item_notice(data: dict[str, Any]) -> str | None:
if data.get("error"):
return f"📋 {data['error']}"
if not data.get("ok"):
return None
taiga = data.get("taiga", {})
gitea = data.get("gitea", {})
lines = [
"📋 **Создано:**",
f"- Taiga: #{taiga.get('ref')}{taiga.get('subject')}",
f"- URL: {taiga.get('url')}",
]
if gitea.get("url"):
lines.append(f"- Gitea: {gitea.get('url')}")
if data.get("branch"):
lines.append(f"- Ветка: `{data['branch']}`")
subtasks = data.get("subtasks") or []
if subtasks:
lines.append("**Подзадачи:**")
for t in subtasks:
lines.append(f"- #{t.get('ref')} {t.get('subject')}")
return "\n".join(lines)
def _format_work_items_list_notice(data: Any) -> str | None:
if not isinstance(data, list) or not data:
return "📋 Work items не найдены."
lines = ["📋 **Work items:**"]
for item in data[:15]:
lines.append(
f"- [{item.get('status')}] #{item.get('taiga_ref')} {item.get('title')} "
f"({item.get('taiga_slug')})"
)
return "\n".join(lines)
def _format_status_notice(data: dict[str, Any]) -> str:
status = data.get("status", "idle")
phase = data.get("phase", PHASE_WORK)
+5 -2
View File
@@ -7,6 +7,7 @@ from sqlalchemy.orm import Session
from app.character.service import CharacterService
from app.chat.notices import format_pomodoro_context, format_pomodoro_notice
from app.projects.context import format_projects_context, get_projects_snapshot
from app.db.models import ChatSession, Message
from app.llm.client import LLMClient
from app.pomodoro.service import PomodoroService
@@ -45,9 +46,11 @@ class ChatService:
def _build_system_prompt(self) -> str:
status = PomodoroService(self.db).get_status()
projects_snapshot = get_projects_snapshot(self.db)
return (
f"{self.character.get_system_prompt()}\n\n"
f"{format_pomodoro_context(status)}"
f"{format_pomodoro_context(status)}\n\n"
f"{format_projects_context(projects_snapshot)}"
)
def _build_messages(self, session: ChatSession) -> list[dict[str, Any]]:
@@ -129,7 +132,7 @@ class ChatService:
for tool_call in tool_calls:
fn = tool_call["function"]
args = LLMClient.parse_tool_arguments(fn.get("arguments", ""))
result = execute_tool(self.db, fn["name"], args)
result = await execute_tool(self.db, fn["name"], args)
tool_message = {
"role": "tool",
"tool_call_id": tool_call["id"],
+21
View File
@@ -22,10 +22,31 @@ class Settings(BaseSettings):
cors_origins: str = "http://localhost:5173,http://localhost:8080,http://localhost:3000"
system_prompt_path: str = "./prompts/assistant.md"
# Taiga/Gitea on host (not in Docker) — use host.docker.internal from container
taiga_base_url: str = "http://host.docker.internal:9000"
taiga_username: str = ""
taiga_password: str = ""
taiga_public_url: str = "https://taiga.grigowashere.ru"
gitea_base_url: str = "http://host.docker.internal:3000"
gitea_token: str = ""
gitea_public_url: str = "https://git.grigowashere.ru"
gitea_webhook_secret: str = ""
repos_dir: str = "/data/repos"
@property
def cors_origins_list(self) -> list[str]:
return [origin.strip() for origin in self.cors_origins.split(",") if origin.strip()]
@property
def taiga_configured(self) -> bool:
return bool(self.taiga_username and self.taiga_password)
@property
def gitea_configured(self) -> bool:
return bool(self.gitea_token)
def load_system_prompt(self) -> str:
path = Path(self.system_prompt_path)
if path.is_file():
+42
View File
@@ -68,3 +68,45 @@ class PomodoroSession(Base):
elapsed_seconds: Mapped[int] = mapped_column(Integer, default=0)
finished_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
class TaigaProject(Base):
__tablename__ = "taiga_projects"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
taiga_id: Mapped[int] = mapped_column(Integer, unique=True, index=True)
name: Mapped[str] = mapped_column(String(255))
slug: Mapped[str] = mapped_column(String(255), unique=True, index=True)
synced_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
class ProjectBinding(Base):
__tablename__ = "project_bindings"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
taiga_slug: Mapped[str] = mapped_column(String(255), unique=True, index=True)
gitea_owner: Mapped[str] = mapped_column(String(255), default="")
gitea_repo: Mapped[str] = mapped_column(String(255), default="")
default_branch: Mapped[str] = mapped_column(String(64), default="main")
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
)
class WorkItem(Base):
__tablename__ = "work_items"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
taiga_slug: Mapped[str] = mapped_column(String(255), index=True)
taiga_project_id: Mapped[int] = mapped_column(Integer)
taiga_story_id: Mapped[int] = mapped_column(Integer)
taiga_story_ref: Mapped[int] = mapped_column(Integer, index=True)
gitea_owner: Mapped[str] = mapped_column(String(255), default="")
gitea_repo: Mapped[str] = mapped_column(String(255), default="")
gitea_issue_number: Mapped[int | None] = mapped_column(Integer, nullable=True, index=True)
suggested_branch: Mapped[str] = mapped_column(String(255), default="")
raw_text: Mapped[str] = mapped_column(Text, default="")
title: Mapped[str] = mapped_column(String(500), default="")
status: Mapped[str] = mapped_column(String(32), default="open")
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
closed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
+54
View File
@@ -0,0 +1,54 @@
from typing import Any
import httpx
from app.config import get_settings
class GiteaClient:
def __init__(self) -> None:
settings = get_settings()
self.base_url = settings.gitea_base_url.rstrip("/")
self.public_url = settings.gitea_public_url.rstrip("/")
self.token = settings.gitea_token
def _client(self) -> httpx.Client:
return httpx.Client(
base_url=self.base_url,
timeout=30.0,
headers={
"Authorization": f"token {self.token}",
"Content-Type": "application/json",
},
)
def create_issue(
self,
owner: str,
repo: str,
title: str,
body: str,
labels: list[str] | None = None,
) -> dict[str, Any]:
payload: dict[str, Any] = {"title": title, "body": body}
if labels:
payload["labels"] = labels
with self._client() as client:
response = client.post(
f"/api/v1/repos/{owner}/{repo}/issues",
json=payload,
)
response.raise_for_status()
return response.json()
def close_issue(self, owner: str, repo: str, issue_number: int) -> dict[str, Any]:
with self._client() as client:
response = client.patch(
f"/api/v1/repos/{owner}/{repo}/issues/{issue_number}",
json={"state": "closed"},
)
response.raise_for_status()
return response.json()
def issue_url(self, owner: str, repo: str, issue_number: int) -> str:
return f"{self.public_url}/{owner}/{repo}/issues/{issue_number}"
+190
View File
@@ -0,0 +1,190 @@
from typing import Any
import httpx
from app.config import get_settings
class TaigaClient:
def __init__(self) -> None:
settings = get_settings()
self.base_url = settings.taiga_base_url.rstrip("/")
self.public_url = settings.taiga_public_url.rstrip("/")
self.username = settings.taiga_username
self.password = settings.taiga_password
self._token: str | None = None
def _client(self) -> httpx.Client:
return httpx.Client(base_url=self.base_url, timeout=30.0)
def auth(self) -> str:
if self._token:
return self._token
with self._client() as client:
response = client.post(
"/api/v1/auth",
json={
"type": "normal",
"username": self.username,
"password": self.password,
},
)
response.raise_for_status()
self._token = response.json()["auth_token"]
return self._token
def _headers(self) -> dict[str, str]:
return {
"Authorization": f"Bearer {self.auth()}",
"Content-Type": "application/json",
}
def list_projects(self) -> list[dict[str, Any]]:
with self._client() as client:
response = client.get("/api/v1/projects", headers=self._headers())
response.raise_for_status()
return response.json()
def list_open_userstories(self, project_id: int, limit: int = 8) -> list[dict[str, Any]]:
with self._client() as client:
response = client.get(
"/api/v1/userstories",
params={"project": project_id},
headers=self._headers(),
)
response.raise_for_status()
open_stories = [s for s in response.json() if not s.get("is_closed")]
return open_stories[:limit]
def list_open_tasks(self, project_id: int, limit: int = 8) -> list[dict[str, Any]]:
with self._client() as client:
response = client.get(
"/api/v1/tasks",
params={"project": project_id},
headers=self._headers(),
)
response.raise_for_status()
open_tasks = [t for t in response.json() if not t.get("is_closed")]
return open_tasks[:limit]
def get_closed_status_id(self, project_id: int, *, for_task: bool = False) -> int | None:
endpoint = "/api/v1/task-statuses" if for_task else "/api/v1/userstory-statuses"
with self._client() as client:
response = client.get(
endpoint,
params={"project": project_id},
headers=self._headers(),
)
response.raise_for_status()
items = response.json()
for status in items:
if status.get("is_closed") or status.get("name", "").lower() in (
"done",
"closed",
"завершено",
"закрыто",
):
return status["id"]
return items[-1]["id"] if items else None
def create_userstory(
self,
project_id: int,
subject: str,
description: str,
tags: list[str] | None = None,
) -> dict[str, Any]:
payload: dict[str, Any] = {
"project": project_id,
"subject": subject[:500],
"description": description,
}
if tags:
payload["tags"] = tags
with self._client() as client:
response = client.post(
"/api/v1/userstories",
headers=self._headers(),
json=payload,
)
response.raise_for_status()
return response.json()
def create_task(
self,
project_id: int,
user_story_id: int,
subject: str,
description: str = "",
) -> dict[str, Any]:
with self._client() as client:
response = client.post(
"/api/v1/tasks",
headers=self._headers(),
json={
"project": project_id,
"user_story": user_story_id,
"subject": subject[:500],
"description": description,
},
)
response.raise_for_status()
return response.json()
def close_userstory(self, story_id: int, project_id: int) -> dict[str, Any]:
status_id = self.get_closed_status_id(project_id, for_task=False)
payload: dict[str, Any] = {"version": self._get_version("userstories", story_id)}
if status_id:
payload["status"] = status_id
else:
payload["is_closed"] = True
with self._client() as client:
response = client.patch(
f"/api/v1/userstories/{story_id}",
headers=self._headers(),
json=payload,
)
response.raise_for_status()
return response.json()
def close_task(self, task_id: int, project_id: int) -> dict[str, Any]:
status_id = self.get_closed_status_id(project_id, for_task=True)
payload: dict[str, Any] = {"version": self._get_version("tasks", task_id)}
if status_id:
payload["status"] = status_id
else:
payload["is_closed"] = True
with self._client() as client:
response = client.patch(
f"/api/v1/tasks/{task_id}",
headers=self._headers(),
json=payload,
)
response.raise_for_status()
return response.json()
def get_by_ref(
self, project_id: int, ref: int, *, kind: str = "userstory"
) -> dict[str, Any] | None:
endpoint = "/api/v1/userstories" if kind == "userstory" else "/api/v1/tasks"
with self._client() as client:
response = client.get(
endpoint,
params={"project": project_id, "ref": ref},
headers=self._headers(),
)
response.raise_for_status()
items = response.json()
return items[0] if items else None
def _get_version(self, resource: str, item_id: int) -> int:
with self._client() as client:
response = client.get(
f"/api/v1/{resource}/{item_id}",
headers=self._headers(),
)
response.raise_for_status()
return response.json().get("version", 1)
def story_url(self, project_id: int, ref: int) -> str:
return f"{self.public_url}/project/0/{project_id}/us/{ref}"
+3
View File
@@ -0,0 +1,3 @@
from app.projects.service import ProjectService
__all__ = ["ProjectService"]
+43
View File
@@ -0,0 +1,43 @@
import re
GITEA_PATTERNS = [
re.compile(r"gitea\s*#(\d+)", re.IGNORECASE),
re.compile(r"fixes\s+#(\d+)", re.IGNORECASE),
re.compile(r"closes\s+gitea\s*#(\d+)", re.IGNORECASE),
]
TAIGA_STORY_PATTERNS = [
re.compile(r"taiga\s*#(\d+)", re.IGNORECASE),
re.compile(r"TG-(\d+)", re.IGNORECASE),
re.compile(r"closes\s+taiga\s*#(\d+)", re.IGNORECASE),
]
TAIGA_TASK_PATTERNS = [
re.compile(r"taiga\s+task\s*#(\d+)", re.IGNORECASE),
]
def parse_commit_message(message: str) -> dict[str, list[int]]:
gitea_refs: set[int] = set()
taiga_story_refs: set[int] = set()
taiga_task_refs: set[int] = set()
for pattern in GITEA_PATTERNS:
for match in pattern.finditer(message):
gitea_refs.add(int(match.group(1)))
for pattern in TAIGA_TASK_PATTERNS:
for match in pattern.finditer(message):
taiga_task_refs.add(int(match.group(1)))
for pattern in TAIGA_STORY_PATTERNS:
for match in pattern.finditer(message):
ref = int(match.group(1))
if ref not in taiga_task_refs:
taiga_story_refs.add(ref)
return {
"gitea": sorted(gitea_refs),
"taiga_story": sorted(taiga_story_refs),
"taiga_task": sorted(taiga_task_refs),
}
+109
View File
@@ -0,0 +1,109 @@
from typing import Any
from sqlalchemy.orm import Session
from app.config import get_settings
from app.integrations.taiga import TaigaClient
from app.projects.service import ProjectService
MAX_PROJECTS_IN_CONTEXT = 8
MAX_OPEN_PER_PROJECT = 5
def get_projects_snapshot(db: Session) -> dict[str, Any]:
settings = get_settings()
service = ProjectService(db)
if not settings.taiga_configured:
return {"configured": False, "projects": [], "open_items": [], "taiga_open": []}
projects = service.list_projects()
if not projects:
try:
projects = service.sync_taiga_projects()
except Exception:
projects = []
open_items = service.list_work_items(limit=15, status="open")
taiga_open: list[dict[str, Any]] = []
try:
client = TaigaClient()
for proj in projects[:MAX_PROJECTS_IN_CONTEXT]:
stories = client.list_open_userstories(
proj["taiga_id"], limit=MAX_OPEN_PER_PROJECT
)
if not stories:
continue
taiga_open.append(
{
"slug": proj["slug"],
"name": proj["name"],
"stories": [
{
"ref": s.get("ref"),
"subject": s.get("subject", "")[:120],
}
for s in stories
],
}
)
except Exception:
pass
return {
"configured": True,
"projects": projects,
"open_items": open_items,
"taiga_open": taiga_open,
}
def format_projects_context(snapshot: dict[str, Any]) -> str:
if not snapshot.get("configured"):
return "[Taiga/Gitea]\nНе настроено (нет TAIGA_USERNAME/PASSWORD в .env)."
lines = ["[Проекты и задачи — актуальный снимок для контекста]"]
projects = snapshot.get("projects") or []
if not projects:
lines.append("Проекты Taiga: кэш пуст. Попроси sync_taiga_projects или проверь подключение.")
else:
lines.append("Проекты Taiga:")
for p in projects[:MAX_PROJECTS_IN_CONTEXT]:
gitea = (
f"{p.get('gitea_owner')}/{p.get('gitea_repo')}"
if p.get("gitea_configured")
else "Gitea не привязан"
)
lines.append(f"- `{p.get('slug')}`: {p.get('name')} · {gitea}")
open_items = snapshot.get("open_items") or []
if open_items:
lines.append("")
lines.append("Локальные work items (созданные ассистентом, открытые):")
for item in open_items[:10]:
gitea_part = f", gitea #{item.get('gitea_issue')}" if item.get("gitea_issue") else ""
lines.append(
f"- taiga #{item.get('taiga_ref')} {item.get('title')} "
f"({item.get('taiga_slug')}{gitea_part})"
)
taiga_open = snapshot.get("taiga_open") or []
if taiga_open:
lines.append("")
lines.append("Открытые user stories в Taiga:")
for block in taiga_open:
lines.append(f" [{block.get('slug')}]")
for story in block.get("stories", []):
lines.append(f" - #{story.get('ref')} {story.get('subject')}")
elif projects:
lines.append("")
lines.append("Открытые user stories в Taiga: нет или не удалось загрузить.")
lines.append("")
lines.append(
"Для создания фичи/бага из вольного текста используй create_work_item. "
"Не выдумывай номера задач — опирайся на список выше."
)
return "\n".join(lines)
+393
View File
@@ -0,0 +1,393 @@
from datetime import datetime, timezone
from typing import Any
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.config import get_settings
from app.db.models import ProjectBinding, TaigaProject, WorkItem
from app.integrations.gitea import GiteaClient
from app.integrations.taiga import TaigaClient
from app.projects.commit_parser import parse_commit_message
from app.projects.structuring import (
format_gitea_body,
format_story_description,
slugify_branch,
structure_work_item,
)
class ProjectService:
def __init__(self, db: Session):
self.db = db
self.settings = get_settings()
def sync_taiga_projects(self) -> list[dict[str, Any]]:
if not self.settings.taiga_configured:
raise ValueError("Taiga не настроена: задайте TAIGA_USERNAME и TAIGA_PASSWORD")
client = TaigaClient()
remote = client.list_projects()
now = datetime.now(timezone.utc)
for item in remote:
slug = item.get("slug") or ""
if not slug:
continue
existing = self.db.scalar(
select(TaigaProject).where(TaigaProject.slug == slug)
)
if existing:
existing.name = item.get("name", slug)
existing.taiga_id = item["id"]
existing.synced_at = now
else:
self.db.add(
TaigaProject(
taiga_id=item["id"],
name=item.get("name", slug),
slug=slug,
synced_at=now,
)
)
self.db.commit()
return self.list_projects()
def list_projects(self) -> list[dict[str, Any]]:
stmt = (
select(TaigaProject, ProjectBinding)
.outerjoin(ProjectBinding, ProjectBinding.taiga_slug == TaigaProject.slug)
.order_by(TaigaProject.name)
)
rows = self.db.execute(stmt).all()
result = []
for taiga_proj, binding in rows:
result.append(
{
"taiga_id": taiga_proj.taiga_id,
"name": taiga_proj.name,
"slug": taiga_proj.slug,
"gitea_owner": binding.gitea_owner if binding else "",
"gitea_repo": binding.gitea_repo if binding else "",
"default_branch": binding.default_branch if binding else "main",
"gitea_configured": bool(binding and binding.gitea_owner and binding.gitea_repo),
}
)
return result
def bind_gitea(
self,
taiga_slug: str,
gitea_owner: str,
gitea_repo: str,
default_branch: str = "main",
) -> dict[str, Any]:
if not self.db.scalar(select(TaigaProject).where(TaigaProject.slug == taiga_slug)):
raise ValueError(f"Проект Taiga '{taiga_slug}' не найден. Сначала sync-taiga.")
binding = self.db.scalar(
select(ProjectBinding).where(ProjectBinding.taiga_slug == taiga_slug)
)
if binding:
binding.gitea_owner = gitea_owner
binding.gitea_repo = gitea_repo
binding.default_branch = default_branch
else:
binding = ProjectBinding(
taiga_slug=taiga_slug,
gitea_owner=gitea_owner,
gitea_repo=gitea_repo,
default_branch=default_branch,
)
self.db.add(binding)
self.db.commit()
for proj in self.list_projects():
if proj["slug"] == taiga_slug:
return proj
raise ValueError("Binding failed")
def _resolve_project(self, slug: str | None) -> tuple[TaigaProject, ProjectBinding | None]:
projects = self.db.scalars(select(TaigaProject).order_by(TaigaProject.name)).all()
if not projects:
raise ValueError("Нет проектов Taiga. Вызовите sync_taiga_projects.")
taiga_proj: TaigaProject | None = None
if slug:
taiga_proj = self.db.scalar(
select(TaigaProject).where(TaigaProject.slug == slug)
)
if not taiga_proj:
raise ValueError(f"Проект '{slug}' не найден")
else:
taiga_proj = projects[0]
binding = self.db.scalar(
select(ProjectBinding).where(ProjectBinding.taiga_slug == taiga_proj.slug)
)
return taiga_proj, binding
async def create_work_item(
self, raw_text: str, project_slug: str | None = None
) -> dict[str, Any]:
if not self.settings.taiga_configured:
raise ValueError("Taiga не настроена")
project_list = self.list_projects()
if not project_list:
self.sync_taiga_projects()
project_list = self.list_projects()
structured = await structure_work_item(raw_text, project_list)
slug = project_slug or structured.get("project_slug")
taiga_proj, binding = self._resolve_project(slug)
if binding and not (binding.gitea_owner and binding.gitea_repo):
binding = None
taiga = TaigaClient()
title = (structured.get("title") or raw_text).strip()[:500]
description = format_story_description(structured, raw_text)
tags = structured.get("tags") or []
issue_type = structured.get("issue_type", "feature")
if issue_type == "bug" and "bug" not in [t.lower() for t in tags]:
tags.append("bug")
story = taiga.create_userstory(
taiga_proj.taiga_id,
title,
description,
tags=tags,
)
subtasks = []
for child in structured.get("children") or []:
if isinstance(child, dict):
subtasks.append(
taiga.create_task(
taiga_proj.taiga_id,
story["id"],
child.get("title", "Подзадача"),
child.get("description", ""),
)
)
branch = f"feature/{story['ref']}-{slugify_branch(title)}"
gitea_issue_number = None
gitea_url = ""
labels = [issue_type] if issue_type else []
if binding and self.settings.gitea_configured:
gitea = GiteaClient()
gitea_body = format_gitea_body(
structured,
raw_text,
story["ref"],
taiga.story_url(taiga_proj.taiga_id, story["ref"]),
branch,
)
issue = gitea.create_issue(
binding.gitea_owner,
binding.gitea_repo,
title,
gitea_body,
labels=labels,
)
gitea_issue_number = issue["number"]
gitea_url = gitea.issue_url(
binding.gitea_owner, binding.gitea_repo, gitea_issue_number
)
work_item = WorkItem(
taiga_slug=taiga_proj.slug,
taiga_project_id=taiga_proj.taiga_id,
taiga_story_id=story["id"],
taiga_story_ref=story["ref"],
gitea_owner=binding.gitea_owner if binding else "",
gitea_repo=binding.gitea_repo if binding else "",
gitea_issue_number=gitea_issue_number,
suggested_branch=branch,
raw_text=raw_text,
title=title,
status="open",
)
self.db.add(work_item)
self.db.commit()
self.db.refresh(work_item)
return {
"ok": True,
"work_item_id": work_item.id,
"taiga": {
"ref": story["ref"],
"id": story["id"],
"subject": story["subject"],
"url": taiga.story_url(taiga_proj.taiga_id, story["ref"]),
},
"gitea": {
"number": gitea_issue_number,
"url": gitea_url,
},
"branch": branch,
"labels": labels,
"subtasks": [{"ref": t.get("ref"), "subject": t.get("subject")} for t in subtasks],
"questions": structured.get("questions") or [],
"project_slug": taiga_proj.slug,
}
def process_push(
self, owner: str, repo: str, commits: list[dict[str, Any]]
) -> list[dict[str, Any]]:
if not self.settings.taiga_configured:
return []
taiga = TaigaClient()
gitea = GiteaClient() if self.settings.gitea_configured else None
results: list[dict[str, Any]] = []
for commit in commits:
message = commit.get("message", "")
parsed = parse_commit_message(message)
sha = commit.get("id", "")[:8]
gitea_refs = set(parsed["gitea"])
taiga_story_refs = set(parsed["taiga_story"])
taiga_task_refs = set(parsed["taiga_task"])
linked_items = self.db.scalars(
select(WorkItem).where(
WorkItem.gitea_owner == owner,
WorkItem.gitea_repo == repo,
WorkItem.status == "open",
)
).all()
for item in linked_items:
if item.gitea_issue_number and item.gitea_issue_number in gitea_refs:
taiga_story_refs.add(item.taiga_story_ref)
if item.taiga_story_ref in taiga_story_refs and item.gitea_issue_number:
gitea_refs.add(item.gitea_issue_number)
for gitea_num in gitea_refs:
if gitea:
try:
gitea.close_issue(owner, repo, gitea_num)
except Exception as exc:
results.append({"error": f"gitea #{gitea_num}: {exc}"})
continue
for item in linked_items:
if item.gitea_issue_number == gitea_num:
self._close_work_item(item, taiga)
results.append(
{
"commit": sha,
"closed": f"gitea #{gitea_num}, taiga #{item.taiga_story_ref}",
}
)
for ref in taiga_story_refs:
project_id = self._project_id_for_ref(owner, repo, ref, linked_items)
if not project_id:
continue
story = taiga.get_by_ref(project_id, ref, kind="userstory")
if story:
taiga.close_userstory(story["id"], project_id)
for item in linked_items:
if item.taiga_story_ref == ref:
self._close_work_item(item, taiga, close_gitea=bool(gitea))
results.append({"commit": sha, "closed": f"taiga #{ref}"})
for ref in taiga_task_refs:
binding = self.db.scalar(
select(ProjectBinding).where(
ProjectBinding.gitea_owner == owner,
ProjectBinding.gitea_repo == repo,
)
)
if not binding:
continue
taiga_proj = self.db.scalar(
select(TaigaProject).where(TaigaProject.slug == binding.taiga_slug)
)
if not taiga_proj:
continue
task = taiga.get_by_ref(taiga_proj.taiga_id, ref, kind="task")
if task:
taiga.close_task(task["id"], taiga_proj.taiga_id)
results.append({"commit": sha, "closed": f"taiga task #{ref}"})
self.db.commit()
return results
def _project_id_for_ref(
self,
owner: str,
repo: str,
ref: int,
items: list[WorkItem],
) -> int | None:
for item in items:
if item.taiga_story_ref == ref:
return item.taiga_project_id
binding = self.db.scalar(
select(ProjectBinding).where(
ProjectBinding.gitea_owner == owner,
ProjectBinding.gitea_repo == repo,
)
)
if binding:
taiga_proj = self.db.scalar(
select(TaigaProject).where(TaigaProject.slug == binding.taiga_slug)
)
return taiga_proj.taiga_id if taiga_proj else None
return None
def _close_work_item(
self,
item: WorkItem,
taiga: TaigaClient,
*,
close_gitea: bool = True,
) -> None:
if item.status == "closed":
return
story = taiga.get_by_ref(item.taiga_project_id, item.taiga_story_ref, kind="userstory")
if story:
taiga.close_userstory(story["id"], item.taiga_project_id)
if (
close_gitea
and item.gitea_issue_number
and self.settings.gitea_configured
):
GiteaClient().close_issue(
item.gitea_owner, item.gitea_repo, item.gitea_issue_number
)
item.status = "closed"
item.closed_at = datetime.now(timezone.utc)
def list_work_items(self, limit: int = 30, status: str | None = None) -> list[dict[str, Any]]:
stmt = select(WorkItem).order_by(WorkItem.created_at.desc()).limit(limit)
if status:
stmt = stmt.where(WorkItem.status == status)
items = self.db.scalars(stmt).all()
settings = get_settings()
return [
{
"id": i.id,
"title": i.title,
"status": i.status,
"taiga_slug": i.taiga_slug,
"taiga_ref": i.taiga_story_ref,
"gitea_issue": i.gitea_issue_number,
"branch": i.suggested_branch,
"taiga_url": f"{settings.taiga_public_url}/project/0/{i.taiga_project_id}/us/{i.taiga_story_ref}",
"gitea_url": (
f"{settings.gitea_public_url}/{i.gitea_owner}/{i.gitea_repo}/issues/{i.gitea_issue_number}"
if i.gitea_issue_number
else ""
),
"created_at": i.created_at.isoformat() if i.created_at else None,
}
for i in items
]
+104
View File
@@ -0,0 +1,104 @@
import json
import re
from typing import Any
from app.llm.client import LLMClient
def strip_markdown_json(text: str) -> str:
text = text.strip()
fenced = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL | re.IGNORECASE)
if fenced:
return fenced.group(1).strip()
return text
def slugify_branch(title: str, max_len: int = 40) -> str:
text = title.lower()
text = re.sub(r"[^a-z0-9а-яё]+", "-", text, flags=re.IGNORECASE)
text = re.sub(r"-+", "-", text).strip("-")
return text[:max_len] or "task"
async def structure_work_item(
raw_text: str,
projects: list[dict[str, Any]],
) -> dict[str, Any]:
project_lines = "\n".join(
f"- {p['slug']}: {p['name']} (id={p['taiga_id']})" for p in projects
)
system_prompt = f"""
Ты технический ассистент. Преобразуй сырое описание фичи или бага в строгий JSON.
Отвечай только JSON, без markdown.
Доступные проекты Taiga:
{project_lines}
Схема:
{{
"project_slug": "slug проекта из списка",
"title": "короткое название",
"description": "понятное описание",
"issue_type": "feature|bug",
"priority": "low|normal|high",
"tags": ["tag1"],
"acceptance_criteria": ["критерий 1"],
"children": [
{{"title": "подзадача", "description": "описание", "type": "Task"}}
],
"questions": ["уточняющий вопрос если данных мало"]
}}
Правила:
- Пиши на русском.
- project_slug выбери из списка; если неясно — первый подходящий или спроси в questions.
- acceptance_criteria проверяемые.
- children — технические подзадачи.
""".strip()
llm = LLMClient()
result = await llm.complete(
[
{"role": "system", "content": system_prompt},
{"role": "user", "content": raw_text},
]
)
content = strip_markdown_json(result.get("content") or "")
return json.loads(content)
def format_story_description(task: dict[str, Any], raw_text: str) -> str:
lines = [task.get("description") or raw_text]
acceptance = task.get("acceptance_criteria") or []
if acceptance:
lines.append("")
lines.append("## Acceptance criteria")
for item in acceptance:
lines.append(f"- {item}")
questions = task.get("questions") or []
if questions:
lines.append("")
lines.append("## Вопросы")
for item in questions:
lines.append(f"- {item}")
lines.append("")
lines.append("## Исходное описание")
lines.append(raw_text)
return "\n".join(lines).strip()
def format_gitea_body(
task: dict[str, Any],
raw_text: str,
taiga_ref: int,
taiga_url: str,
branch: str,
) -> str:
body = format_story_description(task, raw_text)
body += f"\n\n---\n**Taiga:** #{taiga_ref}{taiga_url}\n"
body += f"**Ветка:** `{branch}`\n"
body += "\nЗакрытие: `Closes gitea #N, taiga #REF` в коммите"
return body
+81 -17
View File
@@ -4,6 +4,7 @@ from typing import Any
from sqlalchemy.orm import Session
from app.pomodoro.service import PomodoroService
from app.projects.service import ProjectService
TOOL_DEFINITIONS: list[dict[str, Any]] = [
{
@@ -104,7 +105,7 @@ TOOL_DEFINITIONS: list[dict[str, Any]] = [
"type": "function",
"function": {
"name": "get_pomodoro_history",
"description": "ОБЯЗАТЕЛЬНО при вопросах о задачах, истории работы или что пользователь делал.",
"description": "История помидоро-сессий (таймер), не Taiga-задачи.",
"parameters": {
"type": "object",
"properties": {
@@ -114,44 +115,107 @@ TOOL_DEFINITIONS: list[dict[str, Any]] = [
},
},
},
{
"type": "function",
"function": {
"name": "sync_taiga_projects",
"description": "Синхронизировать список проектов из Taiga API. Вызывай если проекты неизвестны.",
"parameters": {"type": "object", "properties": {}, "required": []},
},
},
{
"type": "function",
"function": {
"name": "list_taiga_projects",
"description": "Список проектов Taiga с привязкой Gitea.",
"parameters": {"type": "object", "properties": {}, "required": []},
},
},
{
"type": "function",
"function": {
"name": "create_work_item",
"description": (
"Создать фичу/баг из вольного текста: структурировать через LLM, "
"создать Taiga story + Gitea issue. Вызывай при «заведи баг», «оформи фичу», «добавь в таигу»."
),
"parameters": {
"type": "object",
"properties": {
"text": {"type": "string", "description": "Полное описание от пользователя"},
"project_slug": {
"type": "string",
"description": "Slug проекта Taiga, если известен",
},
},
"required": ["text"],
},
},
},
{
"type": "function",
"function": {
"name": "list_work_items",
"description": "Список созданных work items (Taiga + Gitea связки).",
"parameters": {
"type": "object",
"properties": {
"status": {"type": "string", "description": "open или closed"},
"limit": {"type": "integer"},
},
"required": [],
},
},
},
]
def execute_tool(db: Session, name: str, arguments: dict[str, Any]) -> str:
service = PomodoroService(db)
async def execute_tool(db: Session, name: str, arguments: dict[str, Any]) -> str:
pomodoro = PomodoroService(db)
projects = ProjectService(db)
try:
if name == "get_pomodoro_status":
result = service.get_status()
result = pomodoro.get_status()
elif name == "start_pomodoro":
result = service.start_work(
result = pomodoro.start_work(
duration_min=arguments.get("duration_min"),
task_note=arguments.get("task_note", ""),
)
elif name == "start_short_break":
result = service.start_short_break(
duration_min=arguments.get("duration_min"),
)
result = pomodoro.start_short_break(duration_min=arguments.get("duration_min"))
elif name == "start_long_break":
result = service.start_long_break(
duration_min=arguments.get("duration_min"),
)
result = pomodoro.start_long_break(duration_min=arguments.get("duration_min"))
elif name == "stop_pomodoro":
result = service.stop(
result = pomodoro.stop(
result=arguments.get("result", ""),
completed=arguments.get("completed", False),
)
elif name == "skip_pomodoro_phase":
result = service.skip_phase()
result = pomodoro.skip_phase()
elif name == "reset_pomodoro_cycle":
result = service.reset_cycle(
clear_task=arguments.get("clear_task", False),
)
result = pomodoro.reset_cycle(clear_task=arguments.get("clear_task", False))
elif name == "get_pomodoro_history":
result = service.history(limit=arguments.get("limit", 10))
result = pomodoro.history(limit=arguments.get("limit", 10))
elif name == "sync_taiga_projects":
result = projects.sync_taiga_projects()
elif name == "list_taiga_projects":
result = projects.list_projects()
elif name == "create_work_item":
result = await projects.create_work_item(
arguments.get("text", ""),
project_slug=arguments.get("project_slug"),
)
elif name == "list_work_items":
result = projects.list_work_items(
limit=arguments.get("limit", 20),
status=arguments.get("status"),
)
else:
return json.dumps({"error": f"Unknown tool: {name}"}, ensure_ascii=False)
return json.dumps(result, ensure_ascii=False)
except ValueError as exc:
return json.dumps({"error": str(exc)}, ensure_ascii=False)
except Exception as exc:
return json.dumps({"error": str(exc)}, ensure_ascii=False)