"""Thin async client for generating images through a ComfyUI server.

Configuration is read from environment variables (optionally loaded from a
``.env`` file) at import time. The public entry points are ``check_sd``,
``txt2img`` and ``generate_from_full_prompt``.
"""
import asyncio
import logging
import os
import uuid
from pathlib import Path

import httpx
from dotenv import load_dotenv

load_dotenv()

logger = logging.getLogger(__name__)

SD_BASE_URL = os.getenv("SD_BASE_URL", "http://127.0.0.1:8188").rstrip("/")
SD_STEPS = int(os.getenv("SD_STEPS", "28"))
SD_CFG = float(os.getenv("SD_CFG", "7"))
SD_SAMPLER = os.getenv("SD_SAMPLER", "euler")
SD_SCHEDULER = os.getenv("SD_SCHEDULER", "normal")
SD_CHECKPOINT = os.getenv("SD_CHECKPOINT", "NetaYumev35_pretrained_all_in_one.safetensors")
SD_DEFAULT_NEGATIVE = os.getenv(
    "SD_DEFAULT_NEGATIVE",
    "low quality, worst quality, blurry, bad anatomy, watermark, text",
)
IMAGES_DIR = Path(os.getenv("IMAGES_DIR", "static/images"))

# Separator between the positive and the negative part of a combined prompt.
_NEGATIVE_MARKER = "\n\nNegative prompt:"

# Polling budget for a queued generation: attempts * interval seconds.
_POLL_INTERVAL_SECONDS = 1
_POLL_MAX_ATTEMPTS = 300
# Per-request HTTP timeout used while talking to the generation endpoints.
_HTTP_TIMEOUT_SECONDS = 300


def split_prompt_and_negative(full_prompt: str) -> tuple[str, str]:
    """Split a combined prompt into ``(positive, negative)``.

    The text before the first ``"\\n\\nNegative prompt:"`` marker is the
    positive prompt and the text after it is the negative prompt; both are
    stripped. Without the marker the whole (stripped) text is the positive
    prompt and ``SD_DEFAULT_NEGATIVE`` is returned as the negative one.
    """
    positive, marker, negative = full_prompt.partition(_NEGATIVE_MARKER)
    if not marker:
        return full_prompt.strip(), SD_DEFAULT_NEGATIVE
    return positive.strip(), negative.strip()


def _build_workflow(positive: str, negative: str) -> dict:
    """Minimal KSampler workflow for ComfyUI API.

    Node ids are strings; a ``[node_id, index]`` pair references an output
    of another node. A fresh 32-bit seed is drawn on every call.
    """
    return {
        "4": {"class_type": "CheckpointLoaderSimple", "inputs": {"ckpt_name": SD_CHECKPOINT}},
        "5": {"class_type": "EmptyLatentImage", "inputs": {"width": 832, "height": 1216, "batch_size": 1}},
        "6": {"class_type": "CLIPTextEncode", "inputs": {"text": positive, "clip": ["4", 1]}},
        "7": {"class_type": "CLIPTextEncode", "inputs": {"text": negative, "clip": ["4", 1]}},
        "8": {"class_type": "VAEDecode", "inputs": {"samples": ["10", 0], "vae": ["4", 2]}},
        "9": {"class_type": "SaveImage", "inputs": {"filename_prefix": "chatbot", "images": ["8", 0]}},
        "10": {
            "class_type": "KSampler",
            "inputs": {
                "model": ["4", 0],
                "positive": ["6", 0],
                "negative": ["7", 0],
                "latent_image": ["5", 0],
                "seed": uuid.uuid4().int % 2**32,
                "steps": SD_STEPS,
                "cfg": SD_CFG,
                "sampler_name": SD_SAMPLER,
                "scheduler": SD_SCHEDULER,
                "denoise": 1.0,
            },
        },
    }


async def check_sd() -> bool:
    """Return ``True`` if ``GET {SD_BASE_URL}/system_stats`` answers 200.

    Any failure (connection error, timeout, bad URL, ...) yields ``False``;
    the cause is logged at debug level instead of being dropped silently.
    """
    try:
        async with httpx.AsyncClient(timeout=5) as client:
            r = await client.get(f"{SD_BASE_URL}/system_stats")
    except Exception:
        # Deliberately broad: a health probe must never raise.
        logger.debug("ComfyUI health check failed", exc_info=True)
        return False
    return r.status_code == 200


def _first_image_info(outputs: dict) -> dict | None:
    """Return the first image descriptor found in the node outputs, if any."""
    for node_output in outputs.values():
        if not isinstance(node_output, dict):
            continue
        images = node_output.get("images")
        if images:  # skips both a missing key and an empty list
            return images[0]
    return None


def _save_image(image_bytes: bytes) -> str:
    """Write the bytes to a new ``<uuid>.png`` in ``IMAGES_DIR``; return its name."""
    IMAGES_DIR.mkdir(parents=True, exist_ok=True)
    filename = f"{uuid.uuid4().hex}.png"
    (IMAGES_DIR / filename).write_bytes(image_bytes)
    return filename


async def txt2img(prompt: str, negative_prompt: str | None = None) -> tuple[bytes, str]:
    """Generate one image and store it under ``IMAGES_DIR``.

    Queues the workflow via ``POST /prompt``, polls ``/history/{prompt_id}``
    once per second (up to 300 attempts), downloads the first image via
    ``/view`` and writes it to disk.

    Args:
        prompt: Positive prompt text.
        negative_prompt: Negative prompt; ``None`` or ``""`` falls back to
            ``SD_DEFAULT_NEGATIVE``.

    Returns:
        ``(image_bytes, relative_path)`` where ``relative_path`` is
        ``"images/<uuid>.png"``.

    Raises:
        httpx.HTTPStatusError: If the server answers any request with an
            error status.
        RuntimeError: If the job finishes without an image or does not
            finish within the polling budget.
    """
    neg = negative_prompt or SD_DEFAULT_NEGATIVE
    workflow = _build_workflow(prompt, neg)
    client_id = uuid.uuid4().hex

    logger.info("ComfyUI request → %s prompt: %.120s", SD_BASE_URL, prompt)

    async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT_SECONDS) as client:
        # Queue the prompt.
        resp = await client.post(
            f"{SD_BASE_URL}/prompt",
            json={"prompt": workflow, "client_id": client_id},
        )
        resp.raise_for_status()
        prompt_id = resp.json()["prompt_id"]
        logger.info("ComfyUI queued prompt_id=%s", prompt_id)

        # Poll until the prompt id shows up in the history.
        for _ in range(_POLL_MAX_ATTEMPTS):
            await asyncio.sleep(_POLL_INTERVAL_SECONDS)
            hist = await client.get(f"{SD_BASE_URL}/history/{prompt_id}")
            # Fail on an HTTP error here rather than on a confusing
            # JSON/key error further down.
            hist.raise_for_status()
            data = hist.json()
            if prompt_id not in data:
                continue

            entry = data[prompt_id]
            img_info = _first_image_info(entry.get("outputs") or {})
            if img_info is None:
                # The job is in the history but yielded no image; polling
                # further cannot help, so report it distinctly from a timeout.
                # NOTE(review): "status"/"status_str" is read defensively;
                # confirm the history schema of the deployed ComfyUI version.
                status = entry.get("status")
                status_str = status.get("status_str", "unknown") if isinstance(status, dict) else "unknown"
                raise RuntimeError(
                    f"ComfyUI finished prompt_id={prompt_id} without an image output (status: {status_str})"
                )

            img_resp = await client.get(
                f"{SD_BASE_URL}/view",
                params={
                    "filename": img_info["filename"],
                    "subfolder": img_info.get("subfolder", ""),
                    "type": img_info.get("type", "output"),
                },
            )
            img_resp.raise_for_status()
            image_bytes = img_resp.content

            # Disk I/O is blocking; keep it off the event loop.
            filename = await asyncio.to_thread(_save_image, image_bytes)
            logger.info("ComfyUI done → saved %s", filename)
            # NOTE(review): the returned prefix is the literal "images/" even
            # when IMAGES_DIR is overridden — confirm callers expect that.
            return image_bytes, f"images/{filename}"

    raise RuntimeError("ComfyUI generation timed out or produced no output")


async def generate_from_full_prompt(full_prompt: str) -> tuple[str | None, str | None]:
    """Generate an image from a combined positive/negative prompt.

    Returns:
        ``(relative_path, None)`` on success, or ``(None, error_message)``
        if generation failed. Exceptions are logged, never propagated.
    """
    positive, negative = split_prompt_and_negative(full_prompt)
    try:
        _, rel_path = await txt2img(positive, negative)
    except Exception as e:
        # logger.exception keeps the traceback, which a plain error log loses.
        logger.exception("ComfyUI error: %s", e)
        return None, str(e)
    return rel_path, None