promptra
← Все статьи
Гайды7 мин чтения

Webhook'и и async events с LLM в production: best practices 2026 для Python

Production-гайд 2026 для Python: webhook'и и асинхронные события с LLM API. FastAPI + Celery worker pattern, idempotency keys через Redis, retry queue с dead-letter, HMAC-подпись, обработка long-running запросов (Opus 4.7, GPT-5.5) без блокировки веб-сервера, мониторинг и точные числа.

Инфографика async webhook-архитектуры с LLM: HTTP-вход с подписью HMAC идёт в FastAPI, оттуда задача попадает в Celery-очередь Redis, worker дёргает Promptra API и возвращает результат в callback URL клиента; плоский векторный стиль в кремово-терракотовой палитре

Когда LLM-запрос идёт 12 секунд, а у вас 80 RPS на endpoint — синхронный паттерн «принял HTTP, вызвал OpenAI, ответил» убьёт сервис за 45 секунд. Все воркеры зависнут на ожидании, очередь TCP-коннектов забьётся, балансировщик начнёт отдавать 503. Правильный production-паттерн — async queue + worker + webhook callback. Через единый шлюз Promptra флагманы (Claude Opus 4.7 — 350/1790 ₽, GPT-5.5 — 350/2150 ₽, Gemini 3.1 Pro — 140/860 ₽, DeepSeek V4 Pro — 30/60 ₽) доступны через OpenAI-совместимый формат, что делает архитектуру переносимой между провайдерами одной заменой base_url.

Этот гайд — рабочий код FastAPI + Celery + Redis для production LLM-pipeline: HTTP-вход валидирует и кладёт задачу за 50 мс, worker выполняет долгий вызов, webhook callback возвращает результат с HMAC-подписью. Включены idempotency keys, retry с exponential backoff, dead-letter queue, мониторинг через Flower и точные настройки таймаутов. оплата в рублях по договору, полный пакет закрывающих документов.

TL;DR — async pipeline за 5 шагов

  1. HTTP-ручка POST /v1/jobs принимает запрос с Idempotency-Key, валидирует, кладёт задачу в Redis-очередь, отвечает 202 Accepted с job_id за 50 мс.
  2. Celery worker берёт задачу, вызывает LLM через Promptra с timeout 120 сек, сохраняет результат в Redis.
  3. После успеха worker шлёт POST на callback_url клиента с результатом и заголовком X-Promptra-Signature (HMAC-SHA256).
  4. На сбое callback — retry 5 раз с backoff (1с, 5с, 25с, 2 мин, 10 мин), после — dead-letter queue.
  5. Клиент проверяет подпись через hmac.compare_digest и подтверждает приём 200 OK. Эта статья — production-расширение нашего pillar-гида полный технический гид по LLM API на Python: токены, function calling, streaming, RAG, async/batch.

Почему синхронный вызов в HTTP-обработчике — это катастрофа

Стандартная FastAPI-ручка с прямым вызовом OpenAI выглядит «нормально» — но в production она ломается линейно с нагрузкой:

# АНТИПАТТЕРН — НЕ ДЕЛАЙТЕ ТАК В PRODUCTION
@app.post("/chat")
async def chat(req: ChatRequest):
    response = await client.chat.completions.create(
        model="claude-opus-4-7",
        messages=req.messages,
    )
    return {"answer": response.choices[0].message.content}

Что произойдёт под нагрузкой 50 RPS если средняя латентность Opus 4.7 — 12 секунд?

  • Одновременно в обработке: 50 × 12 = 600 запросов.
  • Воркеров (uvicorn workers) обычно 4–8 — каждый держит async event loop, но всё равно есть лимит на одновременные коннекты к OpenAI клиенту (по умолчанию 100).
  • Через минуту коннекты исчерпаны, новые запросы зависают в pool wait, клиенты получают 504 Gateway Timeout.
  • Браузер закрывает соединение по своему таймауту 30 сек — LLM продолжает генерить ответ, который никому не нужен, но за токены вы платите.

Правильно — разделить приём запроса (быстрый HTTP) и выполнение (долгий worker). HTTP-ручка работает 50 мс, worker — 12 секунд. Сервер обслуживает тысячи RPS, worker pool масштабируется отдельно. См. также гайд Streaming LLM-ответов через SSE — это альтернатива для интерактивных чатов, async webhook — для batch и интеграций.

Сравнительная схема: слева красный блок «синхронный паттерн» с очередью зависших воркеров и индикатором 504 Gateway Timeout, справа зелёный блок «async pipeline» с быстрым HTTP-ответом 202 и отдельным worker pool; стрелка времени снизу показывает «50 мс» против «12 с»; заголовок «Почему синхронный паттерн ломается»

Архитектура: FastAPI + Celery + Redis

Базовый стек для async LLM-pipeline:

  • FastAPI — HTTP-вход, валидация, постановка задач. Документация — fastapi.tiangolo.com.
  • Celery — distributed task queue с retry, scheduling и мониторингом. Альтернативы: RQ (проще), Arq (async-first), Dramatiq.
  • Redis — backend для очереди и хранилище job state. Документация — redis.io.
  • Flower — веб-UI для мониторинга Celery, показывает live воркеры, очереди и фейлы.

Минимальная структура проекта:

project/
├── app/
│   ├── main.py           # FastAPI HTTP-вход
│   ├── tasks.py          # Celery tasks
│   ├── celery_app.py     # Celery конфиг
│   ├── llm_client.py     # обёртка над Promptra
│   ├── webhooks.py       # отправка callback
│   └── models.py         # Pydantic схемы
├── docker-compose.yml    # redis + flower + worker
└── requirements.txt

celery_app.py — конфигурация воркеров с правильными таймаутами:

from celery import Celery
import os

celery_app = Celery(
    "promptra_jobs",
    broker=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
    backend=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Europe/Moscow",
    enable_utc=True,
    task_acks_late=True,              # ack только после успеха
    task_reject_on_worker_lost=True,  # вернуть в очередь если worker упал
    task_time_limit=180,              # hard kill через 3 минуты
    task_soft_time_limit=170,         # SoftTimeLimitExceeded для cleanup
    worker_prefetch_multiplier=1,     # не забирать задачи в pool заранее
    worker_max_tasks_per_child=100,   # рестарт worker'а после 100 задач (борьба с утечками)
    broker_connection_retry_on_startup=True,
    result_expires=86400,             # результаты живут 24ч
)

celery_app.conf.task_routes = {
    "tasks.llm_completion": {"queue": "llm"},
    "tasks.send_callback": {"queue": "callbacks"},
}

Две очереди — llm и callbacks — позволяют масштабировать воркеры независимо. Если callback зависает, LLM-задачи продолжают идти.

HTTP-ручка с idempotency

main.py — приём запроса за 50 мс:

import uuid
from fastapi import FastAPI, Header, HTTPException
from pydantic import BaseModel
import redis.asyncio as redis
from app.tasks import llm_completion

app = FastAPI
r = redis.from_url("redis://localhost:6379/1")

class JobRequest(BaseModel):
    model: str
    messages: list[dict]
    callback_url: str
    max_tokens: int = 2000

@app.post("/v1/jobs", status_code=202)
async def create_job(
    req: JobRequest,
    idempotency_key: str = Header(..., alias="Idempotency-Key"),
):
    # Проверяем дедупликацию
    existing = await r.get(f"idempotency:{idempotency_key}")
    if existing:
        return {"job_id": existing.decode, "status": "duplicate"}

    job_id = str(uuid.uuid4)

    # Сохраняем привязку idempotency_key → job_id на 24 часа
    await r.setex(f"idempotency:{idempotency_key}", 86400, job_id)
    await r.hset(f"job:{job_id}", mapping={
        "status": "queued",
        "model": req.model,
        "callback_url": req.callback_url,
    })

    # Постановка задачи (не блокирует HTTP)
    llm_completion.apply_async(
        args=[job_id, req.model_dump],
        queue="llm",
    )

    return {"job_id": job_id, "status": "queued"}

@app.get("/v1/jobs/{job_id}")
async def get_job(job_id: str):
    data = await r.hgetall(f"job:{job_id}")
    if not data:
        raise HTTPException(404, "Job not found")
    return {k.decode: v.decode for k, v in data.items}

Ключевые моменты:

  • status_code=202 Accepted — стандарт REST для async. Клиент знает, что запрос принят, но не выполнен.
  • Idempotency-Key обязателен (Header(...)) — клиент генерит UUIDv4 для каждой логической операции. Если он повторит запрос — получит тот же job_id.
  • TTL 86400 (24 часа) — после этого ключ протухает, повторный запрос создаст новую задачу. Подбирайте под ваш use case.
  • GET /v1/jobs/{job_id} — клиент может опрашивать статус (long polling fallback если webhook callback не доступен).

Подробнее про idempotency и retry в B2B контексте — Чек-лист 12 вопросов поставщику LLM API.

Диаграмма последовательности idempotency-проверки: клиент отправляет POST с Idempotency-Key, FastAPI проверяет Redis-ключ, если есть — возвращает существующий job_id, если нет — создаёт новый и сохраняет привязку на 24 часа; заголовок «Дедупликация через Idempotency-Key»

Celery worker: LLM-вызов с retry

tasks.py — основная задача:

from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
import httpx
import redis
from openai import OpenAI, APIError, RateLimitError
from app.webhooks import send_callback

r = redis.from_url("redis://localhost:6379/1")

llm_client = OpenAI(
    api_key=os.getenv("PROMPTRA_API_KEY"),
    base_url="https://api.promptra.ru/v1",
    timeout=httpx.Timeout(connect=5.0, read=120.0, write=10.0, pool=5.0),
    max_retries=0,  # ретраи делаем сами через Celery
)

@shared_task(
    bind=True,
    autoretry_for=(APIError, httpx.HTTPError),
    retry_backoff=True,
    retry_backoff_max=60,
    retry_jitter=True,
    max_retries=3,
)
def llm_completion(self, job_id: str, request: dict):
    r.hset(f"job:{job_id}", "status", "running")
    try:
        response = llm_client.chat.completions.create(
            model=request["model"],
            messages=request["messages"],
            max_tokens=request.get("max_tokens", 2000),
        )
        result = {
            "content": response.choices[0].message.content,
            "usage": {
                "input_tokens": response.usage.prompt_tokens,
                "output_tokens": response.usage.completion_tokens,
            },
            "model": response.model,
        }
        r.hset(f"job:{job_id}", mapping={
            "status": "completed",
            "result": json.dumps(result),
        })
        # Постановка задачи на отправку callback
        send_callback.apply_async(
            args=[job_id, request["callback_url"], result],
            queue="callbacks",
        )
        return result
    except RateLimitError as e:
        # Специальная обработка 429 — retry с увеличенным backoff
        raise self.retry(countdown=30, exc=e)
    except SoftTimeLimitExceeded:
        r.hset(f"job:{job_id}", "status", "timeout")
        raise
    except Exception as e:
        r.hset(f"job:{job_id}", mapping={
            "status": "failed",
            "error": str(e),
        })
        raise

Что важно:

  • autoretry_for + retry_backoff — Celery сам ретраит на API-ошибки. retry_backoff=True даёт 1, 2, 4, 8 секунд; retry_backoff_max=60 ограничивает; retry_jitter=True добавляет случайность ±25% против thundering herd.
  • max_retries=3 — четвёртая попытка не делается, задача уходит в failed.
  • RateLimitError обрабатывается отдельно с фиксированным countdown=30 — обычно провайдер даёт retry-after хедер, можно вычитать его и использовать вместо 30.
  • SoftTimeLimitExceeded — за 10 сек до hard kill worker получает это исключение. Можно успеть закрыть HTTP-коннект, записать состояние, дочитать stream.

Про exponential backoff и token bucket подробно — Rate limiting и retry стратегии для LLM API.

Webhook callback с HMAC-подписью

webhooks.py — отправка результата клиенту с подписью:

import hmac
import hashlib
import json
import time
import httpx
from celery import shared_task
from celery.exceptions import Retry

WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET")  # 32+ байта random

def sign_payload(payload: bytes, timestamp: int, secret: str) -> str:
    """Подписываем timestamp.payload — защита от replay."""
    message = f"{timestamp}.".encode + payload
    sig = hmac.new(secret.encode, message, hashlib.sha256).hexdigest
    return f"t={timestamp},v1={sig}"

@shared_task(
    bind=True,
    autoretry_for=(httpx.HTTPError,),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
    max_retries=5,
)
def send_callback(self, job_id: str, callback_url: str, result: dict):
    payload = json.dumps({"job_id": job_id, "result": result}).encode
    timestamp = int(time.time)
    signature = sign_payload(payload, timestamp, WEBHOOK_SECRET)

    with httpx.Client(timeout=10.0) as client:
        response = client.post(
            callback_url,
            content=payload,
            headers={
                "Content-Type": "application/json",
                "X-Promptra-Signature": signature,
                "X-Promptra-Job-Id": job_id,
                "User-Agent": "Promptra-Webhook/1.0",
            },
        )
        if response.status_code >= 500:
            # 5xx — retry
            raise httpx.HTTPStatusError(
                f"Server error: {response.status_code}",
                request=response.request,
                response=response,
            )
        if response.status_code >= 400:
            # 4xx — не retry, баг клиента
            r.hset(f"job:{job_id}", "callback_status", f"client_error_{response.status_code}")
            return
        r.hset(f"job:{job_id}", "callback_status", "delivered")

Параметры retry:

  • max_retries=5 + retry_backoff_max=600 → попытки на 1с, 5с, 25с, 2 мин, 10 мин (с jitter).
  • После 5 неудач задача автоматически уходит в Celery dead-letter (если настроен) или просто marks as failed.
  • 4xx ошибки не retry'им — это значит у клиента баг (например, не проверяет подпись), retry не поможет.

Клиентская проверка подписи:

def verify_signature(payload: bytes, signature_header: str, secret: str, max_age: int = 300) -> bool:
    """Возвращает True если подпись валидна и timestamp в окне 5 минут."""
    try:
        parts = dict(p.split("=") for p in signature_header.split(","))
        timestamp = int(parts["t"])
        received_sig = parts["v1"]
    except (KeyError, ValueError):
        return False

    # Проверка свежести (защита от replay)
    if abs(time.time - timestamp) > max_age:
        return False

    # Считаем ожидаемую подпись
    message = f"{timestamp}.".encode + payload
    expected = hmac.new(secret.encode, message, hashlib.sha256).hexdigest

    # Константно-временное сравнение
    return hmac.compare_digest(expected, received_sig)

Критически важно — hmac.compare_digest, а не ==. Обычное сравнение строк может leak'нуть timing attack: первый отличающийся байт прерывает сравнение, и атакующий по разнице во времени может восстановить подпись.

Архитектурная диаграмма webhook callback с HMAC: worker формирует payload, подписывает HMAC-SHA256 с timestamp, отправляет POST с заголовком X-Promptra-Signature; клиент проверяет timestamp в окне 5 минут и константно-временное сравнение через hmac.compare_digest; заголовок «HMAC-подпись webhook»

Dead-letter queue и мониторинг

После 5 неудач callback и 3 неудач LLM-вызова задача должна попасть в DLQ для ручного разбора. Celery не имеет встроенной DLQ — реализуем через дополнительную очередь:

from celery.signals import task_failure

@task_failure.connect
def handle_task_failure(sender=None, task_id=None, exception=None, args=None, kwargs=None, einfo=None, **kw):
    """Перехватываем фейлы и кладём в DLQ для ручного разбора."""
    if sender.request.retries >= sender.max_retries:
        r.lpush("dlq", json.dumps({
            "task": sender.name,
            "task_id": task_id,
            "args": args,
            "kwargs": kwargs,
            "exception": str(exception),
            "timestamp": int(time.time),
        }))
        # Алерт в Slack/Telegram
        notify_oncall(task_id, exception)

DLQ-задачи хранятся в Redis-list dlq 72 часа, потом архивируются в S3/Postgres. Раз в день оператор разбирает: чинит баг → переотправляет задачу. Подробнее про мониторинг latency, error rate и cost — в гайде Логирование и observability LLM-приложений.

Метрики, которые нужно собирать (Prometheus + Grafana):

МетрикаЧто показываетАлерт
llm_jobs_queuedРазмер очереди>500 — масштабировать воркеры
llm_jobs_processingАктивные задачиболее 100% worker — bottleneck
llm_jobs_failed_totalСчётчик фейловerror rate более 5% 5 мин
llm_job_duration_secondsГистограмма времениp99 более 60с — медленный провайдер
callback_delivery_totalДоставки callbacksuccess rate менее 95%
callback_retry_totalСумма retryвнезапный рост → проблема у клиента
dlq_sizeРазмер DLQ>0 — оператор должен разобрать

Flower UI на http://localhost:5555 показывает live состояние воркеров, очередей и недавние фейлы. В production — за nginx с basic auth.

Production-чеклист перед запуском

Минимальный сетап для production async LLM-pipeline:

  • [ ] Idempotency-Key обязателен на всех мутирующих ручках; TTL 24ч в Redis.
  • [ ] HMAC-подпись на callback с timestamp и проверкой окна 5 минут; secret 32+ байта.
  • [ ] Constant-time сравнение подписей через hmac.compare_digest.
  • [ ] timeout на HTTP-клиент — connect 5с, read 120с (для reasoning), write 10с, pool 5с.
  • [ ] task_time_limit 180с + soft_time_limit 170с в Celery — против зависших задач.
  • [ ] task_acks_late + task_reject_on_worker_lost — задача не теряется при падении worker'а.
  • [ ] worker_prefetch_multiplier=1 — равномерное распределение задач между воркерами.
  • [ ] worker_max_tasks_per_child=100 — рестарт против утечек памяти в SDK.
  • [ ] Отдельные очереди для llm и callbacks — независимое масштабирование.
  • [ ] Retry policy — 3 для LLM (autoretry_for APIError), 5 для callback (только 5xx).
  • [ ] DLQ через task_failure signal + алерт в Slack/Telegram.
  • [ ] Prometheus метрики: queued, processing, failed, duration p50/p95/p99, callback success rate.
  • [ ] Flower за nginx basic auth для мониторинга.
  • [ ] Health check Redis: при недоступности — HTTP /healthz должен вернуть 503.
  • [ ] Webhook secret rotation раз в 90 дней (поддержка двух активных secret одновременно).
  • [ ] Лимит размера body на HTTP-ручке (например, 100 KB) — против abuse.

Через шлюз Promptra все модели работают через base_url="https://api.promptra.ru/v1" без переписывания кода — миграция с прямого OpenAI описана в гайде Миграция с OpenAI на Promptra за 10 минут. Цены прозрачны 1-в-1 с провайдером по курсу ЦБ — для batch-сценариев на 1М запросов разница экономии vs российских наценок описана в Сравнении цен LLM 2026.

Финальная инфографика production pipeline: 5 пронумерованных блоков в ряд — «1. HTTP /jobs» с иконкой запроса, «2. Redis queue» с цилиндром, «3. Celery worker → LLM» с шестерёнкой, «4. Webhook callback HMAC» с замком, «5. DLQ + monitoring» с щитом; под ними подпись «Async LLM-pipeline за день»; терракотовые стрелки между блоками; заголовок «5 шагов до stable production»

Запасные варианты: serverless и managed

Если не хочется поднимать Celery + Redis — есть managed-альтернативы:

  • AWS SQS + Lambda — очередь и worker полностью managed. Минусы: cold start 200–800 мс, vendor lock.
  • Yandex Cloud Functions + Message Queue — российский вариант, SQS-совместимый API.
  • OpenAI Batch API — для офлайн-обработки >1000 запросов с 50% скидкой. Через Promptra тот же эндпоинт работает для всех провайдеров — см. Async и Batch API LLM: 50% скидка.
  • Inngest / Temporal — managed durable execution engines с retry, scheduling и UI.

Для русского B2B на собственной VM (Yandex Cloud / VK Cloud) — Redis + Celery остаётся самым простым и контролируемым стеком. Один docker-compose, никаких внешних зависимостей кроме шлюза.

Сравнительная таблица 4 подходов async-обработки: Celery+Redis (self-hosted, контроль), SQS+Lambda (managed, cold start), Inngest (managed, retry+UI), Batch API (офлайн, 50% скидка); каждый столбец с плюсами/минусами и иконкой; заголовок «4 подхода к async LLM-jobs»

FAQ

Почему нельзя просто вызвать OpenAI напрямую в HTTP-обработчике?

LLM-запросы идут 8–25 секунд для флагманов, до 120 секунд для reasoning. Под нагрузкой 50 RPS воркеры исчерпаются за минуту, клиенты получат 504. HTTP-ручка должна работать 50 мс — принять запрос, поставить в очередь, ответить 202. Worker асинхронно выполняет вызов.

Что такое idempotency key и зачем он нужен?

Уникальный ID запроса (UUIDv4) от клиента. Если он повторит POST из-за таймаута, сервер дедуплицирует по ключу в Redis и не создаёт дубль. Без этого вы заплатите за два вызова Opus 4.7 (350/1790 ₽). TTL 24 часа. Стандарт Stripe и OpenAI Batch.

Как защитить webhook callback от подделки?

HMAC-SHA256 от timestamp.payload, заголовок X-Promptra-Signature: t=<ts>,v1=<sig>. Проверка timestamp в окне 5 минут (anti-replay). Сравнение через hmac.compare_digest — не через ==, иначе timing attack. Secret 32+ байта, ротация раз в 90 дней.

Сколько ретраев на сбойный callback?

5 попыток с exponential backoff: 1с, 5с, 25с, 2 мин, 10 мин (jitter ±20%). Retry только на 5xx и сетевые ошибки. 4xx — баг клиента, не помогает. После 5 фейлов — dead-letter queue для ручного разбора + алерт оператору.

FastAPI background_tasks или Celery?

Background_tasks — только для коротких fire-and-forget (отправить email). Нет ретраев, нет персистентности при рестарте. Для LLM jobs нужен Celery с Redis: персистентная очередь, retry policy, мониторинг через Flower. Альтернативы — RQ (проще), Arq (async-first), Dramatiq.

Как выставить timeout на LLM-запрос в worker?

Два уровня. HTTP-клиент: client.with_options(timeout=httpx.Timeout(connect=5.0, read=120.0, write=10.0, pool=5.0)). Celery task: time_limit=180, soft_time_limit=170. SoftTimeLimitExceeded даёт worker'у 10с на cleanup перед hard kill.