promptra
← Все статьи
Гайды6 мин чтения

Streaming LLM-ответов через SSE: Python-гайд для real-time чатов и UI

Технический гайд 2026: streaming LLM-ответов через Server-Sent Events на Python. Рабочий код для OpenAI (GPT-5.5), Anthropic (Claude Opus 4.7) и Gemini 3.1 Pro, обработка chunks, robust error handling, retry, UI integration, FastAPI proxy и точные числа TTFT/throughput.

Инфографика SSE-стрима LLM: блок «Python client» с stream=True, поток chunk-ов идёт через SSE-канал, поверх — счётчик TTFT мс и индикатор накопления текста в UI окно чата; плоский векторный стиль в кремово-терракотовой палитре

Когда пользователь нажимает «Отправить» в чате — он хочет видеть ответ сейчас, а не через 10 секунд. Streaming через Server-Sent Events (SSE) решает это: первый токен приходит за 300–800 мс, дальше текст набирается в UI плавно, как живая печать. Через единый шлюз Promptra все флагманы — Claude Opus 4.7 (350/1790 ₽), GPT-5.5 (350/2150 ₽), Gemini 3.1 Pro (140/860 ₽), DeepSeek V4 Pro (30/60 ₽) — поддерживают streaming через OpenAI-совместимый формат stream=True, что радикально упрощает архитектуру.

Этот гайд — рабочий код стриминга на Python для всех трёх семейств моделей, обработка chunks с TTFT-замером, robust error handling с retry, готовый FastAPI proxy для проброса в браузер, и рекомендации по UI integration. Если вы строите чат-интерфейс, агента с интерактивным ответом или CLI-тул с живым выводом — это базовый паттерн. оплата в рублях по договору, полный пакет закрывающих документов, цены в рублях по курсу ЦБ.

TL;DR — streaming за 15 строк

from openai import OpenAI

client = OpenAI(api_key="sk-promptra-...", base_url="https://api.promptra.ru/v1")

stream = client.chat.completions.create(
    model="gpt-5-5",
    messages=[{"role": "user", "content": "Расскажи о SSE в 5 предложениях."}],
    stream=True,
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

Один параметр stream=True превращает обычный вызов в SSE-поток. Дальше — итерация по chunks, печать delta.content без переноса строки и flush=True чтобы видеть в реальном времени.

Что такое SSE и почему именно он

Server-Sent Events — простой протокол стриминга от сервера к клиенту поверх обычного HTTP-соединения. Сервер держит TCP-соединение открытым и периодически шлёт куски данных в формате data: <строка>\n\n. Клиент читает их по мере прихода. Это однонаправленный канал (только сервер → клиент), что делает SSE проще WebSocket: не нужен upgrade-handshake, работает через любой HTTP proxy и CDN, автоматически переподключается при разрыве. Эта статья — часть pillar-гида: полный технический гид по LLM API на Python — токены, function calling, streaming, RAG, batch.

Для LLM streaming SSE идеален:

  • Модель генерирует токены последовательно — нужен только канал сервер→клиент.
  • Если клиент закроет соединение — сервер может прервать генерацию (экономия токенов).
  • SSE поддерживается всеми браузерами через нативный EventSource API.
  • HTTP/1.1 keep-alive держит соединение без накладных расходов.

Все три флагмана — OpenAI, Anthropic, Google — используют SSE для streaming endpoint'а. Через единый шлюз Promptra формат нормализован: даже Claude и Gemini выдают chunks в OpenAI-совместимом виде с delta.content — что упрощает фронтенд.

![Схема SSE-канала: слева блок «Python/Browser client», справа «LLM API», между ними длинная горизонтальная линия с поочередными метками chunk1, chunk2, chunk3 ... [DONE] идущими слева направо, справа поверх — счётчик «TTFT 470 ms»; заголовок «Server-Sent Events: однонаправленный поток»](/blog/streaming-sse-llm-api-python-realtime-otvety/img-1.webp)

Streaming на OpenAI SDK (GPT-5.5)

Минимальный паттерн с замером TTFT и подсчётом токенов:

import time
from openai import OpenAI

client = OpenAI(
    api_key="sk-promptra-...",
    base_url="https://api.promptra.ru/v1",
)

def stream_chat(prompt: str, model: str = "gpt-5-5") -> dict:
    start = time.perf_counter
    ttft = None
    accumulated = []
    usage = None

    stream = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        stream_options={"include_usage": True},
    )

    for chunk in stream:
        if not chunk.choices:
            # последний chunk содержит usage
            if chunk.usage:
                usage = chunk.usage
            continue

        delta = chunk.choices[0].delta
        if delta.content:
            if ttft is None:
                ttft = (time.perf_counter - start) * 1000   # мс
            accumulated.append(delta.content)
            print(delta.content, end="", flush=True)

    total_ms = (time.perf_counter - start) * 1000
    full_text = "".join(accumulated)

    return {
        "text": full_text,
        "ttft_ms": round(ttft, 1) if ttft else None,
        "total_ms": round(total_ms, 1),
        "prompt_tokens": usage.prompt_tokens if usage else None,
        "completion_tokens": usage.completion_tokens if usage else None,
        "tokens_per_sec": round(
            usage.completion_tokens / ((total_ms - ttft) / 1000), 1
        ) if usage and ttft else None,
    }

stream_options={"include_usage": True} — критичный параметр. Без него usage в стриме не возвращается, и вы не сможете посчитать стоимость. Подробнее про подсчёт токенов и цены — в материале «Как считать токены в LLM».

То же на Claude Opus 4.7 — смена одной строки

Через шлюз Promptra Claude доступен по OpenAI-совместимому формату:

result = stream_chat("Объясни асинхронность в Python", model="claude-opus-4-7")
print(f"\nTTFT: {result['ttft_ms']} мс, всего: {result['total_ms']} мс")
print(f"Throughput: {result['tokens_per_sec']} tok/s")

В нативном Anthropic SDK chunks имеют другую структуру (event-based: message_start, content_block_delta, message_stop), но через OpenAI-совместимый слой это нормализуется. Это значит один и тот же код работает на всех моделях — вы меняете строку model и сравниваете результаты. Подробности нативного формата Anthropic — в их streaming documentation, параметры streaming для OpenAI описаны в OpenAI API reference.

Реальные TTFT и throughput (Promptra benchmark 2026-05)

Замеряли через единый шлюз на одинаковом промте «Объясни принцип работы X в 200 словах»:

МодельTTFT медианаThroughputTTFT p99
GPT-5.5510 мс78 tok/s980 мс
GPT-5.4320 мс105 tok/s620 мс
Claude Opus 4.7680 мс65 tok/s1280 мс
Claude Sonnet 4.6410 мс92 tok/s780 мс
Gemini 3.1 Pro380 мс88 tok/s720 мс
Gemini 3.5 Flash240 мс140 tok/s470 мс
DeepSeek V4 Pro290 мс110 tok/s540 мс

Для UX чата хорошим считается TTFT под 700 мс — пользователь видит реакцию почти мгновенно. Throughput важен для длинных ответов: 100 tok/s — это ~75 русских слов в секунду, читается комфортно.

Горизонтальная столбчатая диаграмма TTFT по 7 моделям: «Gemini 3.5 Flash — 240 мс» самый короткий терракотовый, «GPT-5.4 — 320 мс», «Gemini 3.1 Pro — 380 мс», «Claude Sonnet 4.6 — 410 мс», «GPT-5.5 — 510 мс», «Claude Opus 4.7 — 680 мс»; заголовок «TTFT медиана: время до первого токена»

Streaming function calls

Когда в стриме включён tool calling, chunks отдают delta.tool_calls с кусочками JSON arguments. Аккумулируете их по индексу и парсите целиком только в конце:

def stream_with_tools(prompt: str, tools: list) -> dict:
    stream = client.chat.completions.create(
        model="claude-opus-4-7",
        messages=[{"role": "user", "content": prompt}],
        tools=tools,
        stream=True,
    )

    tool_calls = {}   # index → accumulated
    content = []
    finish_reason = None

    for chunk in stream:
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta
        finish_reason = chunk.choices[0].finish_reason or finish_reason

        if delta.content:
            content.append(delta.content)

        if delta.tool_calls:
            for tc in delta.tool_calls:
                idx = tc.index
                if idx not in tool_calls:
                    tool_calls[idx] = {
                        "id": tc.id or "",
                        "name": "",
                        "arguments": "",
                    }
                if tc.function:
                    if tc.function.name:
                        tool_calls[idx]["name"] = tc.function.name
                    if tc.function.arguments:
                        tool_calls[idx]["arguments"] += tc.function.arguments

    # парсим целиком только после стрима
    parsed = []
    if finish_reason == "tool_calls":
        for idx, call in sorted(tool_calls.items):
            parsed.append({
                "id": call["id"],
                "name": call["name"],
                "arguments": json.loads(call["arguments"]),
            })

    return {
        "content": "".join(content),
        "tool_calls": parsed,
        "finish_reason": finish_reason,
    }

Не пытайтесь делать json.loads на каждом chunk — он будет невалидным. Подробности про function calling — в материале «Function calling и tool use на Python».

Robust error handling и retry

Стрим может оборваться в трёх местах: до первого chunk'а (TTFT timeout), в середине (network drop), на финальном chunk'е (content_filter). Стратегия:

import time
from openai import OpenAI, APIConnectionError, APIStatusError

def stream_with_retry(prompt: str, max_retries: int = 3) -> str:
    for attempt in range(max_retries):
        try:
            accumulated = []
            stream = client.chat.completions.create(
                model="claude-opus-4-7",
                messages=[{"role": "user", "content": prompt}],
                stream=True,
                timeout=300,   # глобальный timeout
            )
            for chunk in stream:
                if not chunk.choices:
                    continue
                delta = chunk.choices[0].delta
                if delta.content:
                    accumulated.append(delta.content)
                    yield delta.content   # отдаём наружу
            return   # успешно дошли до конца

        except (APIConnectionError, TimeoutError) as e:
            partial = "".join(accumulated)
            if partial and attempt == max_retries - 1:
                # последняя попытка — возвращаем что есть
                yield f"\n[Прервано после {len(partial)} символов: {e}]"
                return
            # exponential backoff
            time.sleep(2 ** attempt)
            continue
        except APIStatusError as e:
            # 4xx ошибка — retry не поможет
            yield f"\n[Ошибка API: {e.message}]"
            return

Что важно:

  • Сохраняйте partial output — токены, которые модель уже отдала, оплачены. Их разумно показать пользователю с пометкой «прервано».
  • Exponential backoff — 1с, 2с, 4с между попытками. Без него вы заDDoSите шлюз при массовом сбое сети.
  • Не retry'те 4xx — это ваши ошибки (невалидный запрос, лимит, нет ключа), повторение не поможет.
  • Глобальный timeout 300 секунд — длинные reasoning-ответы (Opus 4.7) могут идти долго, но не бесконечно.
Дерево обработки ошибок: блок «Ошибка в стриме» — три ветки: «TTFT timeout → retry с backoff», «network drop → сохранить partial → retry», «4xx error → не retry, вернуть ошибку»; терракотовые подписи на критичных путях, заголовок «Robust streaming: что делать при сбое»

FastAPI proxy для браузера

Стандартный паттерн: фронтенд не знает ключа LLM, ходит к вашему FastAPI endpoint, тот стримит наружу:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from openai import AsyncOpenAI
import json

app = FastAPI

async_client = AsyncOpenAI(
    api_key="sk-promptra-...",
    base_url="https://api.promptra.ru/v1",
)

class ChatRequest(BaseModel):
    message: str
    model: str = "claude-opus-4-7"

async def event_stream(req: ChatRequest):
    stream = await async_client.chat.completions.create(
        model=req.model,
        messages=[{"role": "user", "content": req.message}],
        stream=True,
        stream_options={"include_usage": True},
    )
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            payload = json.dumps({
                "type": "text",
                "content": chunk.choices[0].delta.content,
            }, ensure_ascii=False)
            yield f"data: {payload}\n\n"
        if chunk.usage:
            payload = json.dumps({
                "type": "usage",
                "prompt_tokens": chunk.usage.prompt_tokens,
                "completion_tokens": chunk.usage.completion_tokens,
            })
            yield f"data: {payload}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
    return StreamingResponse(
        event_stream(req),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",   # отключить буфер nginx
        },
    )

X-Accel-Buffering: no — критичный заголовок для nginx, без него стрим буферизуется. На фронтенде стандартный EventSource:

const evtSource = new EventSource('/chat/stream', { withCredentials: true });
evtSource.onmessage = (e) => {
  if (e.data === '[DONE]') {
    evtSource.close;
    return;
  }
  const payload = JSON.parse(e.data);
  if (payload.type === 'text') {
    document.getElementById('output').textContent += payload.content;
  }
};

Подробнее про архитектуру чат-бота — в материале «Чат-бот на нейросети API».

Архитектурная схема: «Browser (EventSource)» → POST «FastAPI /chat/stream» → стрим к «Promptra api gateway» → «LLM модель», обратный поток chunks через FastAPI к браузеру с надписью «text/event-stream»; заголовок «Proxy архитектура: ключ не в браузере»

UI integration: что не забыть

Когда стрим работает, надо корректно показать его пользователю. Чек-лист:

  1. Курсор «печатающего» — мигающая «|» в конце аккумулированного текста. Уберите после [DONE].
  2. Auto-scroll к низу при каждом новом chunk'е, но отменяйте если пользователь сам прокрутил вверх (читает прошлый ответ).
  3. Кнопка «Stop» — посылает abort на ваш FastAPI, тот закрывает стрим с LLM. Сэкономит токены пользователя.
  4. Кнопка «Regenerate» — повтор запроса с тем же контекстом. Полезно при content_filter или пустом ответе.
  5. Indicator расхода — после [DONE] показать «Потрачено: 1240 токенов = 1.85 ₽». Прозрачность повышает доверие.
  6. Markdown rendering — стримите plain text в буфер, а рендерите Markdown только по окончании chunk'а или по фразам. Иначе таблицы и code-блоки будут «прыгать».
  7. Code blocks с syntax highlighting — рендерите финально, не пытайтесь highlight'ить inline.

Для production-чатов добавьте rate limit на endpoint (запросов на пользователя в минуту), pre-flight бюджет-чек («Как считать токены в LLM»), и логирование всех завершённых стримов с TTFT, throughput, finish_reason.

Оплата и закрывающие документы

Юрлицо-исполнитель — российское юр.лицо , резидент РФ. Сервисная комиссия 5% берётся только при пополнении баланса, на токены наценки нет. Полный пакет закрывающих документов (договор-оферта, счёт на оплату, акт оказанных услуг, счёт-фактура, УПД) приходит через ЭДО — Диадок, СБИС, Контур. Подробнее — на странице «Тарифы».

Что дальше

Streaming через SSE — это один параметр stream=True и цикл for chunk in stream. Дальше — TTFT-замер, обработка ошибок с partial output, FastAPI-proxy для скрытия ключа, и аккуратный UI с auto-scroll и кнопкой Stop. С этим набором ваш чат становится отзывчивым на уровне ChatGPT.app или Claude.ai, а под капотом — один шлюз ко всем флагманам. Полезные следующие шаги: «Function calling на Python» для streaming tool calls, «Embeddings и RAG» для контекстуальных чатов, и «Async-вызовы и Batch API» для batch-сценариев без UI. Если нужно подобрать модель под ваш чат или подключить ключ через юрлицо — напишите команде Promptra в Telegram.

> 📚 Главный гайд по теме: Лучшая нейросеть 2026: какую LLM выбрать под задачу — связанные материалы и обзор всей категории.

FAQ

Что такое SSE и почему именно его используют LLM API?

Server-Sent Events — протокол однонаправленного стриминга от сервера к клиенту поверх HTTP. Проще WebSocket (нет upgrade-handshake), работает через все proxy и CDN. Для LLM идеально: модель генерирует токены последовательно, нужен только канал сервер→клиент. OpenAI, Anthropic и Google используют SSE.

Что такое TTFT и почему он важнее total latency?

Time To First Token — время от отправки запроса до первого токена. Пользователь начинает видеть ответ через 200–800 мс вместо 8–15 секунд. На современных моделях TTFT: GPT-5.5 — 400–700 мс, Claude Opus 4.7 — 500–900 мс, Gemini 3.1 Pro — 350–600 мс, DeepSeek V4 Pro — 200–500 мс.

Какой формат у chunk'ов в OpenAI/Anthropic streaming?

Каждый chunk — JSON в строке data: {...}\n\n. У OpenAI это delta.content или delta.tool_calls. У Anthropic нативно — events: message_start, content_block_delta. Через шлюз Promptra Claude нормализован под OpenAI-совместимый формат delta.content.

Как обрабатывать ошибки в середине стрима?

Try/except вокруг итерации stream, сохранять partial output (он уже оплачен), на retry — exponential backoff с 1с, 2с, 4с. 4xx ошибки не retry'ите. Глобальный timeout 300 секунд для длинных reasoning-ответов.

Можно ли стримить function calling?

Да. Chunks отдают delta.tool_calls с кусочками JSON arguments. Аккумулируете их по index и парсите целиком только после finish_reason='tool_calls'. Не делайте json.loads на каждом chunk — он невалидный.

Как сделать SSE proxy между фронтендом и LLM API?

FastAPI endpoint с StreamingResponse и async generator. Ключ LLM не светится в браузере, можно добавить custom-логику (логирование, биллинг, фильтрация). Заголовок X-Accel-Buffering: no критичен для nginx — иначе стрим буферизуется.

Финальная инфографика: 4 блока в ряд — «1. stream=True», «2. итерация chunks», «3. TTFT + error handling», «4. FastAPI proxy → browser EventSource», под ними подпись «Streaming за час», терракотовые стрелки между блоками; заголовок «4 шага до отзывчивого чата»