122 lines
4.7 KiB
Python
122 lines
4.7 KiB
Python
import asyncio
|
|
import logging
|
|
import os
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
from dotenv import load_dotenv
|
|
|
|
# Run load_dotenv() first so that the os.getenv() lookups further down the
# module can also see values supplied through a .env file.
load_dotenv()

# Module-scoped logger; handlers and levels are left to the application.
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
# Runtime configuration. Every value can be overridden through an environment
# variable of the same name; the literal shown is the fallback.
# ---------------------------------------------------------------------------

# Base URL of the ComfyUI server. Trailing slashes are removed so request
# paths can be appended as "/prompt", "/history/...", etc.
SD_BASE_URL = os.environ.get("SD_BASE_URL", "http://127.0.0.1:8188").rstrip("/")

# Sampler settings passed to the KSampler node of the workflow.
SD_STEPS = int(os.environ.get("SD_STEPS", "28"))
SD_CFG = float(os.environ.get("SD_CFG", "7"))
SD_SAMPLER = os.environ.get("SD_SAMPLER", "euler")
SD_SCHEDULER = os.environ.get("SD_SCHEDULER", "normal")

# Checkpoint file name handed to the CheckpointLoaderSimple node.
SD_CHECKPOINT = os.environ.get(
    "SD_CHECKPOINT",
    "NetaYumev35_pretrained_all_in_one.safetensors",
)

# Negative prompt used whenever the caller does not supply one.
SD_DEFAULT_NEGATIVE = os.environ.get(
    "SD_DEFAULT_NEGATIVE",
    "low quality, worst quality, blurry, bad anatomy, watermark, text",
)

# Directory where generated images are written.
IMAGES_DIR = Path(os.environ.get("IMAGES_DIR", "static/images"))
|
|
|
|
|
|
def split_prompt_and_negative(full_prompt: str) -> tuple[str, str]:
    """Split a combined prompt into its positive and negative parts.

    The negative part is introduced by a blank line followed by the label
    ``Negative prompt:``. Only the first occurrence of that marker is used as
    the split point; both halves are returned with surrounding whitespace
    stripped.

    Args:
        full_prompt: Prompt text, optionally containing the marker.

    Returns:
        ``(positive, negative)``. When the marker is absent, the whole input
        (stripped) is the positive prompt and ``SD_DEFAULT_NEGATIVE`` is the
        negative prompt.
    """
    marker = "\n\nNegative prompt:"
    positive, found, negative = full_prompt.partition(marker)
    if not found:
        # No marker: partition() left the entire text in `positive`.
        return full_prompt.strip(), SD_DEFAULT_NEGATIVE
    return positive.strip(), negative.strip()
|
|
|
|
|
|
def _build_workflow(positive: str, negative: str) -> dict:
    """Assemble the minimal KSampler graph sent to the ComfyUI API.

    The graph is a mapping of node id to node description. Inputs written as
    ``[node_id, index]`` point at an output of another node in the same graph.
    A new random 32-bit seed is drawn on every call.

    Args:
        positive: Text for the positive conditioning node ("6").
        negative: Text for the negative conditioning node ("7").

    Returns:
        The workflow dict with node ids "4" through "10".
    """

    def text_encoder(text: str) -> dict:
        # Both prompts are encoded with the CLIP output of the checkpoint loader.
        return {"class_type": "CLIPTextEncode", "inputs": {"text": text, "clip": ["4", 1]}}

    checkpoint_loader = {
        "class_type": "CheckpointLoaderSimple",
        "inputs": {"ckpt_name": SD_CHECKPOINT},
    }
    empty_latent = {
        "class_type": "EmptyLatentImage",
        "inputs": {"width": 832, "height": 1216, "batch_size": 1},
    }
    vae_decode = {
        "class_type": "VAEDecode",
        "inputs": {"samples": ["10", 0], "vae": ["4", 2]},
    }
    save_image = {
        "class_type": "SaveImage",
        "inputs": {"filename_prefix": "chatbot", "images": ["8", 0]},
    }
    sampler = {
        "class_type": "KSampler",
        "inputs": {
            "model": ["4", 0],
            "positive": ["6", 0],
            "negative": ["7", 0],
            "latent_image": ["5", 0],
            "seed": uuid.uuid4().int % 2**32,
            "steps": SD_STEPS,
            "cfg": SD_CFG,
            "sampler_name": SD_SAMPLER,
            "scheduler": SD_SCHEDULER,
            "denoise": 1.0,
        },
    }

    return {
        "4": checkpoint_loader,
        "5": empty_latent,
        "6": text_encoder(positive),
        "7": text_encoder(negative),
        "8": vae_decode,
        "9": save_image,
        "10": sampler,
    }
|
|
|
|
|
|
async def check_sd() -> bool:
    """Report whether the ComfyUI server answers its ``/system_stats`` endpoint.

    Returns:
        True when the GET request completes with HTTP 200; False for any other
        status code or when the request raises (connection refused, timeout,
        ...). This probe never raises.
    """
    stats_url = f"{SD_BASE_URL}/system_stats"
    try:
        async with httpx.AsyncClient(timeout=5) as http:
            response = await http.get(stats_url)
    except Exception:
        # Best-effort health check: any failure simply means "not available".
        return False
    return response.status_code == 200
|
|
|
|
|
|
def _first_image_info(outputs: dict) -> dict | None:
    """Return the first image descriptor in a history ``outputs`` mapping, or None.

    Nodes without an ``images`` entry, or with an empty one, are skipped.
    """
    for node_output in outputs.values():
        images = node_output.get("images")
        if images:
            return images[0]
    return None


def _save_image(image_bytes: bytes) -> str:
    """Write *image_bytes* to a randomly named PNG in IMAGES_DIR; return the file name."""
    IMAGES_DIR.mkdir(parents=True, exist_ok=True)
    filename = f"{uuid.uuid4().hex}.png"
    (IMAGES_DIR / filename).write_bytes(image_bytes)
    return filename


async def txt2img(prompt: str, negative_prompt: str | None = None) -> tuple[bytes, str]:
    """Generate one image through ComfyUI and store it under ``IMAGES_DIR``.

    The workflow is queued via ``POST /prompt``. ``GET /history/{prompt_id}`` is
    then polled once per second, at most 300 times, until the response contains
    an entry for the prompt. The first image listed in that entry is downloaded
    from ``/view`` and written to disk.

    Args:
        prompt: Positive prompt text.
        negative_prompt: Negative prompt text. ``None`` or an empty string
            falls back to ``SD_DEFAULT_NEGATIVE``.

    Returns:
        ``(image_bytes, relative_path)`` where ``relative_path`` has the form
        ``"images/<file name>"``.

    Raises:
        httpx.HTTPStatusError: If any of the ComfyUI requests returns an
            error status.
        RuntimeError: If no history entry appears within the polling window,
            or the entry lists no images.
    """
    neg = negative_prompt or SD_DEFAULT_NEGATIVE
    workflow = _build_workflow(prompt, neg)
    client_id = uuid.uuid4().hex

    logger.info("ComfyUI request → %s prompt: %.120s", SD_BASE_URL, prompt)
    async with httpx.AsyncClient(timeout=300) as client:
        # Queue the prompt.
        resp = await client.post(
            f"{SD_BASE_URL}/prompt",
            json={"prompt": workflow, "client_id": client_id},
        )
        resp.raise_for_status()
        prompt_id = resp.json()["prompt_id"]
        logger.info("ComfyUI queued prompt_id=%s", prompt_id)

        # Poll until the history contains this prompt.
        for _ in range(300):
            await asyncio.sleep(1)
            hist = await client.get(f"{SD_BASE_URL}/history/{prompt_id}")
            # Checked like the other requests, so an HTTP error is reported as
            # such instead of as a JSON decoding failure on the error body.
            hist.raise_for_status()
            data = hist.json()
            if prompt_id not in data:
                continue

            img_info = _first_image_info(data[prompt_id]["outputs"])
            if img_info is None:
                # The history entry exists but holds no image: stop polling.
                break

            img_resp = await client.get(
                f"{SD_BASE_URL}/view",
                params={
                    "filename": img_info["filename"],
                    "subfolder": img_info.get("subfolder", ""),
                    "type": img_info.get("type", "output"),
                },
            )
            img_resp.raise_for_status()
            image_bytes = img_resp.content

            # Disk I/O is blocking; run it in a worker thread so the event
            # loop stays responsive while the file is written.
            filename = await asyncio.to_thread(_save_image, image_bytes)
            logger.info("ComfyUI done → saved %s", filename)
            return image_bytes, f"images/{filename}"

    raise RuntimeError("ComfyUI generation timed out or produced no output")
|
|
|
|
|
|
async def generate_from_full_prompt(full_prompt: str) -> tuple[str | None, str | None]:
    """Generate an image from a combined prompt, reporting errors instead of raising.

    The prompt is split with ``split_prompt_and_negative`` and passed to
    ``txt2img``.

    Args:
        full_prompt: Positive prompt, optionally followed by a negative-prompt
            section.

    Returns:
        ``(relative_path, None)`` on success, or ``(None, error_message)`` when
        generation fails. ``error_message`` is never empty.
    """
    positive, negative = split_prompt_and_negative(full_prompt)
    try:
        _, rel_path = await txt2img(positive, negative)
    except Exception as e:
        # logger.exception logs at ERROR level and keeps the traceback.
        logger.exception("ComfyUI error: %s", e)
        # Exceptions raised without a message stringify to "", which a caller
        # testing the error slot for truthiness would read as "no error".
        # Fall back to the exception class name in that case.
        return None, str(e) or type(e).__name__
    return rel_path, None
|