Когда LLM-запрос идёт 12 секунд, а у вас 80 RPS на endpoint — синхронный паттерн «принял HTTP, вызвал OpenAI, ответил» убьёт сервис за 45 секунд. Все воркеры зависнут на ожидании, очередь TCP-коннектов забьётся, балансировщик начнёт отдавать 503. Правильный production-паттерн — async queue + worker + webhook callback. Через единый шлюз Promptra флагманы (Claude Opus 4.7 — 350/1790 ₽, GPT-5.5 — 350/2150 ₽, Gemini 3.1 Pro — 140/860 ₽, DeepSeek V4 Pro — 30/60 ₽) доступны через OpenAI-совместимый формат, что делает архитектуру переносимой между провайдерами одной заменой base_url.
Этот гайд — рабочий код FastAPI + Celery + Redis для production LLM-pipeline: HTTP-вход валидирует и кладёт задачу за 50 мс, worker выполняет долгий вызов, webhook callback возвращает результат с HMAC-подписью. Включены idempotency keys, retry с exponential backoff, dead-letter queue, мониторинг через Flower и точные настройки таймаутов. оплата в рублях по договору, полный пакет закрывающих документов.
TL;DR — async pipeline за 5 шагов
- HTTP-ручка
POST /v1/jobsпринимает запрос сIdempotency-Key, валидирует, кладёт задачу в Redis-очередь, отвечает202 Acceptedсjob_idза 50 мс. - Celery worker берёт задачу, вызывает LLM через Promptra с timeout 120 сек, сохраняет результат в Redis.
- После успеха worker шлёт POST на
callback_urlклиента с результатом и заголовкомX-Promptra-Signature(HMAC-SHA256). - На сбое callback — retry 5 раз с backoff (1с, 5с, 25с, 2 мин, 10 мин), после — dead-letter queue.
- Клиент проверяет подпись через
hmac.compare_digestи подтверждает приём200 OK. Эта статья — production-расширение нашего pillar-гида полный технический гид по LLM API на Python: токены, function calling, streaming, RAG, async/batch.
Почему синхронный вызов в HTTP-обработчике — это катастрофа
Стандартная FastAPI-ручка с прямым вызовом OpenAI выглядит «нормально» — но в production она ломается линейно с нагрузкой:
# АНТИПАТТЕРН — НЕ ДЕЛАЙТЕ ТАК В PRODUCTION
@app.post("/chat")
async def chat(req: ChatRequest):
response = await client.chat.completions.create(
model="claude-opus-4-7",
messages=req.messages,
)
return {"answer": response.choices[0].message.content}Что произойдёт под нагрузкой 50 RPS если средняя латентность Opus 4.7 — 12 секунд?
- Одновременно в обработке: 50 × 12 = 600 запросов.
- Воркеров (uvicorn workers) обычно 4–8 — каждый держит async event loop, но всё равно есть лимит на одновременные коннекты к OpenAI клиенту (по умолчанию 100).
- Через минуту коннекты исчерпаны, новые запросы зависают в pool wait, клиенты получают 504 Gateway Timeout.
- Браузер закрывает соединение по своему таймауту 30 сек — LLM продолжает генерить ответ, который никому не нужен, но за токены вы платите.
Правильно — разделить приём запроса (быстрый HTTP) и выполнение (долгий worker). HTTP-ручка работает 50 мс, worker — 12 секунд. Сервер обслуживает тысячи RPS, worker pool масштабируется отдельно. См. также гайд Streaming LLM-ответов через SSE — это альтернатива для интерактивных чатов, async webhook — для batch и интеграций.

Архитектура: FastAPI + Celery + Redis
Базовый стек для async LLM-pipeline:
- FastAPI — HTTP-вход, валидация, постановка задач. Документация — fastapi.tiangolo.com.
- Celery — distributed task queue с retry, scheduling и мониторингом. Альтернативы: RQ (проще), Arq (async-first), Dramatiq.
- Redis — backend для очереди и хранилище job state. Документация — redis.io.
- Flower — веб-UI для мониторинга Celery, показывает live воркеры, очереди и фейлы.
Минимальная структура проекта:
project/
├── app/
│ ├── main.py # FastAPI HTTP-вход
│ ├── tasks.py # Celery tasks
│ ├── celery_app.py # Celery конфиг
│ ├── llm_client.py # обёртка над Promptra
│ ├── webhooks.py # отправка callback
│ └── models.py # Pydantic схемы
├── docker-compose.yml # redis + flower + worker
└── requirements.txtcelery_app.py — конфигурация воркеров с правильными таймаутами:
from celery import Celery
import os
celery_app = Celery(
"promptra_jobs",
broker=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
backend=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
)
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="Europe/Moscow",
enable_utc=True,
task_acks_late=True, # ack только после успеха
task_reject_on_worker_lost=True, # вернуть в очередь если worker упал
task_time_limit=180, # hard kill через 3 минуты
task_soft_time_limit=170, # SoftTimeLimitExceeded для cleanup
worker_prefetch_multiplier=1, # не забирать задачи в pool заранее
worker_max_tasks_per_child=100, # рестарт worker'а после 100 задач (борьба с утечками)
broker_connection_retry_on_startup=True,
result_expires=86400, # результаты живут 24ч
)
celery_app.conf.task_routes = {
"tasks.llm_completion": {"queue": "llm"},
"tasks.send_callback": {"queue": "callbacks"},
}Две очереди — llm и callbacks — позволяют масштабировать воркеры независимо. Если callback зависает, LLM-задачи продолжают идти.
HTTP-ручка с idempotency
main.py — приём запроса за 50 мс:
import uuid
from fastapi import FastAPI, Header, HTTPException
from pydantic import BaseModel
import redis.asyncio as redis
from app.tasks import llm_completion
app = FastAPI
r = redis.from_url("redis://localhost:6379/1")
class JobRequest(BaseModel):
model: str
messages: list[dict]
callback_url: str
max_tokens: int = 2000
@app.post("/v1/jobs", status_code=202)
async def create_job(
req: JobRequest,
idempotency_key: str = Header(..., alias="Idempotency-Key"),
):
# Проверяем дедупликацию
existing = await r.get(f"idempotency:{idempotency_key}")
if existing:
return {"job_id": existing.decode, "status": "duplicate"}
job_id = str(uuid.uuid4)
# Сохраняем привязку idempotency_key → job_id на 24 часа
await r.setex(f"idempotency:{idempotency_key}", 86400, job_id)
await r.hset(f"job:{job_id}", mapping={
"status": "queued",
"model": req.model,
"callback_url": req.callback_url,
})
# Постановка задачи (не блокирует HTTP)
llm_completion.apply_async(
args=[job_id, req.model_dump],
queue="llm",
)
return {"job_id": job_id, "status": "queued"}
@app.get("/v1/jobs/{job_id}")
async def get_job(job_id: str):
data = await r.hgetall(f"job:{job_id}")
if not data:
raise HTTPException(404, "Job not found")
return {k.decode: v.decode for k, v in data.items}Ключевые моменты:
- status_code=202 Accepted — стандарт REST для async. Клиент знает, что запрос принят, но не выполнен.
- Idempotency-Key обязателен (Header(...)) — клиент генерит UUIDv4 для каждой логической операции. Если он повторит запрос — получит тот же job_id.
- TTL 86400 (24 часа) — после этого ключ протухает, повторный запрос создаст новую задачу. Подбирайте под ваш use case.
- GET
/v1/jobs/{job_id}— клиент может опрашивать статус (long polling fallback если webhook callback не доступен).
Подробнее про idempotency и retry в B2B контексте — Чек-лист 12 вопросов поставщику LLM API.

Celery worker: LLM-вызов с retry
tasks.py — основная задача:
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
import httpx
import redis
from openai import OpenAI, APIError, RateLimitError
from app.webhooks import send_callback
r = redis.from_url("redis://localhost:6379/1")
llm_client = OpenAI(
api_key=os.getenv("PROMPTRA_API_KEY"),
base_url="https://api.promptra.ru/v1",
timeout=httpx.Timeout(connect=5.0, read=120.0, write=10.0, pool=5.0),
max_retries=0, # ретраи делаем сами через Celery
)
@shared_task(
bind=True,
autoretry_for=(APIError, httpx.HTTPError),
retry_backoff=True,
retry_backoff_max=60,
retry_jitter=True,
max_retries=3,
)
def llm_completion(self, job_id: str, request: dict):
r.hset(f"job:{job_id}", "status", "running")
try:
response = llm_client.chat.completions.create(
model=request["model"],
messages=request["messages"],
max_tokens=request.get("max_tokens", 2000),
)
result = {
"content": response.choices[0].message.content,
"usage": {
"input_tokens": response.usage.prompt_tokens,
"output_tokens": response.usage.completion_tokens,
},
"model": response.model,
}
r.hset(f"job:{job_id}", mapping={
"status": "completed",
"result": json.dumps(result),
})
# Постановка задачи на отправку callback
send_callback.apply_async(
args=[job_id, request["callback_url"], result],
queue="callbacks",
)
return result
except RateLimitError as e:
# Специальная обработка 429 — retry с увеличенным backoff
raise self.retry(countdown=30, exc=e)
except SoftTimeLimitExceeded:
r.hset(f"job:{job_id}", "status", "timeout")
raise
except Exception as e:
r.hset(f"job:{job_id}", mapping={
"status": "failed",
"error": str(e),
})
raiseЧто важно:
- autoretry_for + retry_backoff — Celery сам ретраит на API-ошибки. retry_backoff=True даёт 1, 2, 4, 8 секунд; retry_backoff_max=60 ограничивает; retry_jitter=True добавляет случайность ±25% против thundering herd.
- max_retries=3 — четвёртая попытка не делается, задача уходит в failed.
- RateLimitError обрабатывается отдельно с фиксированным countdown=30 — обычно провайдер даёт retry-after хедер, можно вычитать его и использовать вместо 30.
- SoftTimeLimitExceeded — за 10 сек до hard kill worker получает это исключение. Можно успеть закрыть HTTP-коннект, записать состояние, дочитать stream.
Про exponential backoff и token bucket подробно — Rate limiting и retry стратегии для LLM API.
Webhook callback с HMAC-подписью
webhooks.py — отправка результата клиенту с подписью:
import hmac
import hashlib
import json
import time
import httpx
from celery import shared_task
from celery.exceptions import Retry
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET") # 32+ байта random
def sign_payload(payload: bytes, timestamp: int, secret: str) -> str:
"""Подписываем timestamp.payload — защита от replay."""
message = f"{timestamp}.".encode + payload
sig = hmac.new(secret.encode, message, hashlib.sha256).hexdigest
return f"t={timestamp},v1={sig}"
@shared_task(
bind=True,
autoretry_for=(httpx.HTTPError,),
retry_backoff=True,
retry_backoff_max=600,
retry_jitter=True,
max_retries=5,
)
def send_callback(self, job_id: str, callback_url: str, result: dict):
payload = json.dumps({"job_id": job_id, "result": result}).encode
timestamp = int(time.time)
signature = sign_payload(payload, timestamp, WEBHOOK_SECRET)
with httpx.Client(timeout=10.0) as client:
response = client.post(
callback_url,
content=payload,
headers={
"Content-Type": "application/json",
"X-Promptra-Signature": signature,
"X-Promptra-Job-Id": job_id,
"User-Agent": "Promptra-Webhook/1.0",
},
)
if response.status_code >= 500:
# 5xx — retry
raise httpx.HTTPStatusError(
f"Server error: {response.status_code}",
request=response.request,
response=response,
)
if response.status_code >= 400:
# 4xx — не retry, баг клиента
r.hset(f"job:{job_id}", "callback_status", f"client_error_{response.status_code}")
return
r.hset(f"job:{job_id}", "callback_status", "delivered")Параметры retry:
- max_retries=5 + retry_backoff_max=600 → попытки на 1с, 5с, 25с, 2 мин, 10 мин (с jitter).
- После 5 неудач задача автоматически уходит в Celery dead-letter (если настроен) или просто marks as failed.
- 4xx ошибки не retry'им — это значит у клиента баг (например, не проверяет подпись), retry не поможет.
Клиентская проверка подписи:
def verify_signature(payload: bytes, signature_header: str, secret: str, max_age: int = 300) -> bool:
"""Возвращает True если подпись валидна и timestamp в окне 5 минут."""
try:
parts = dict(p.split("=") for p in signature_header.split(","))
timestamp = int(parts["t"])
received_sig = parts["v1"]
except (KeyError, ValueError):
return False
# Проверка свежести (защита от replay)
if abs(time.time - timestamp) > max_age:
return False
# Считаем ожидаемую подпись
message = f"{timestamp}.".encode + payload
expected = hmac.new(secret.encode, message, hashlib.sha256).hexdigest
# Константно-временное сравнение
return hmac.compare_digest(expected, received_sig)Критически важно — hmac.compare_digest, а не ==. Обычное сравнение строк может leak'нуть timing attack: первый отличающийся байт прерывает сравнение, и атакующий по разнице во времени может восстановить подпись.

Dead-letter queue и мониторинг
После 5 неудач callback и 3 неудач LLM-вызова задача должна попасть в DLQ для ручного разбора. Celery не имеет встроенной DLQ — реализуем через дополнительную очередь:
from celery.signals import task_failure
@task_failure.connect
def handle_task_failure(sender=None, task_id=None, exception=None, args=None, kwargs=None, einfo=None, **kw):
"""Перехватываем фейлы и кладём в DLQ для ручного разбора."""
if sender.request.retries >= sender.max_retries:
r.lpush("dlq", json.dumps({
"task": sender.name,
"task_id": task_id,
"args": args,
"kwargs": kwargs,
"exception": str(exception),
"timestamp": int(time.time),
}))
# Алерт в Slack/Telegram
notify_oncall(task_id, exception)DLQ-задачи хранятся в Redis-list dlq 72 часа, потом архивируются в S3/Postgres. Раз в день оператор разбирает: чинит баг → переотправляет задачу. Подробнее про мониторинг latency, error rate и cost — в гайде Логирование и observability LLM-приложений.
Метрики, которые нужно собирать (Prometheus + Grafana):
| Метрика | Что показывает | Алерт |
|---|---|---|
llm_jobs_queued | Размер очереди | >500 — масштабировать воркеры |
llm_jobs_processing | Активные задачи | более 100% worker — bottleneck |
llm_jobs_failed_total | Счётчик фейлов | error rate более 5% 5 мин |
llm_job_duration_seconds | Гистограмма времени | p99 более 60с — медленный провайдер |
callback_delivery_total | Доставки callback | success rate менее 95% |
callback_retry_total | Сумма retry | внезапный рост → проблема у клиента |
dlq_size | Размер DLQ | >0 — оператор должен разобрать |
Flower UI на http://localhost:5555 показывает live состояние воркеров, очередей и недавние фейлы. В production — за nginx с basic auth.
Production-чеклист перед запуском
Минимальный сетап для production async LLM-pipeline:
- [ ] Idempotency-Key обязателен на всех мутирующих ручках; TTL 24ч в Redis.
- [ ] HMAC-подпись на callback с timestamp и проверкой окна 5 минут; secret 32+ байта.
- [ ] Constant-time сравнение подписей через
hmac.compare_digest. - [ ] timeout на HTTP-клиент — connect 5с, read 120с (для reasoning), write 10с, pool 5с.
- [ ] task_time_limit 180с + soft_time_limit 170с в Celery — против зависших задач.
- [ ] task_acks_late + task_reject_on_worker_lost — задача не теряется при падении worker'а.
- [ ] worker_prefetch_multiplier=1 — равномерное распределение задач между воркерами.
- [ ] worker_max_tasks_per_child=100 — рестарт против утечек памяти в SDK.
- [ ] Отдельные очереди для
llmиcallbacks— независимое масштабирование. - [ ] Retry policy — 3 для LLM (autoretry_for APIError), 5 для callback (только 5xx).
- [ ] DLQ через task_failure signal + алерт в Slack/Telegram.
- [ ] Prometheus метрики: queued, processing, failed, duration p50/p95/p99, callback success rate.
- [ ] Flower за nginx basic auth для мониторинга.
- [ ] Health check Redis: при недоступности — HTTP /healthz должен вернуть 503.
- [ ] Webhook secret rotation раз в 90 дней (поддержка двух активных secret одновременно).
- [ ] Лимит размера body на HTTP-ручке (например, 100 KB) — против abuse.
Через шлюз Promptra все модели работают через base_url="https://api.promptra.ru/v1" без переписывания кода — миграция с прямого OpenAI описана в гайде Миграция с OpenAI на Promptra за 10 минут. Цены прозрачны 1-в-1 с провайдером по курсу ЦБ — для batch-сценариев на 1М запросов разница экономии vs российских наценок описана в Сравнении цен LLM 2026.

Запасные варианты: serverless и managed
Если не хочется поднимать Celery + Redis — есть managed-альтернативы:
- AWS SQS + Lambda — очередь и worker полностью managed. Минусы: cold start 200–800 мс, vendor lock.
- Yandex Cloud Functions + Message Queue — российский вариант, SQS-совместимый API.
- OpenAI Batch API — для офлайн-обработки >1000 запросов с 50% скидкой. Через Promptra тот же эндпоинт работает для всех провайдеров — см. Async и Batch API LLM: 50% скидка.
- Inngest / Temporal — managed durable execution engines с retry, scheduling и UI.
Для русского B2B на собственной VM (Yandex Cloud / VK Cloud) — Redis + Celery остаётся самым простым и контролируемым стеком. Один docker-compose, никаких внешних зависимостей кроме шлюза.

FAQ
Почему нельзя просто вызвать OpenAI напрямую в HTTP-обработчике?
LLM-запросы идут 8–25 секунд для флагманов, до 120 секунд для reasoning. Под нагрузкой 50 RPS воркеры исчерпаются за минуту, клиенты получат 504. HTTP-ручка должна работать 50 мс — принять запрос, поставить в очередь, ответить 202. Worker асинхронно выполняет вызов.
Что такое idempotency key и зачем он нужен?
Уникальный ID запроса (UUIDv4) от клиента. Если он повторит POST из-за таймаута, сервер дедуплицирует по ключу в Redis и не создаёт дубль. Без этого вы заплатите за два вызова Opus 4.7 (350/1790 ₽). TTL 24 часа. Стандарт Stripe и OpenAI Batch.
Как защитить webhook callback от подделки?
HMAC-SHA256 от timestamp.payload, заголовок X-Promptra-Signature: t=<ts>,v1=<sig>. Проверка timestamp в окне 5 минут (anti-replay). Сравнение через hmac.compare_digest — не через ==, иначе timing attack. Secret 32+ байта, ротация раз в 90 дней.
Сколько ретраев на сбойный callback?
5 попыток с exponential backoff: 1с, 5с, 25с, 2 мин, 10 мин (jitter ±20%). Retry только на 5xx и сетевые ошибки. 4xx — баг клиента, не помогает. После 5 фейлов — dead-letter queue для ручного разбора + алерт оператору.
FastAPI background_tasks или Celery?
Background_tasks — только для коротких fire-and-forget (отправить email). Нет ретраев, нет персистентности при рестарте. Для LLM jobs нужен Celery с Redis: персистентная очередь, retry policy, мониторинг через Flower. Альтернативы — RQ (проще), Arq (async-first), Dramatiq.
Как выставить timeout на LLM-запрос в worker?
Два уровня. HTTP-клиент: client.with_options(timeout=httpx.Timeout(connect=5.0, read=120.0, write=10.0, pool=5.0)). Celery task: time_limit=180, soft_time_limit=170. SoftTimeLimitExceeded даёт worker'у 10с на cleanup перед hard kill.
