Когда пользователь нажимает «Отправить» в чате — он хочет видеть ответ сейчас, а не через 10 секунд. Streaming через Server-Sent Events (SSE) решает это: первый токен приходит за 300–800 мс, дальше текст набирается в UI плавно, как живая печать. Через единый шлюз Promptra все флагманы — Claude Opus 4.7 (350/1790 ₽), GPT-5.5 (350/2150 ₽), Gemini 3.1 Pro (140/860 ₽), DeepSeek V4 Pro (30/60 ₽) — поддерживают streaming через OpenAI-совместимый формат stream=True, что радикально упрощает архитектуру.
Этот гайд — рабочий код стриминга на Python для всех трёх семейств моделей, обработка chunks с TTFT-замером, robust error handling с retry, готовый FastAPI proxy для проброса в браузер, и рекомендации по UI integration. Если вы строите чат-интерфейс, агента с интерактивным ответом или CLI-тул с живым выводом — это базовый паттерн. оплата в рублях по договору, полный пакет закрывающих документов, цены в рублях по курсу ЦБ.
TL;DR — streaming за 15 строк
from openai import OpenAI
client = OpenAI(api_key="sk-promptra-...", base_url="https://api.promptra.ru/v1")
stream = client.chat.completions.create(
model="gpt-5-5",
messages=[{"role": "user", "content": "Расскажи о SSE в 5 предложениях."}],
stream=True,
)
for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)Один параметр stream=True превращает обычный вызов в SSE-поток. Дальше — итерация по chunks, печать delta.content без переноса строки и flush=True чтобы видеть в реальном времени.
Что такое SSE и почему именно он
Server-Sent Events — простой протокол стриминга от сервера к клиенту поверх обычного HTTP-соединения. Сервер держит TCP-соединение открытым и периодически шлёт куски данных в формате data: <строка>\n\n. Клиент читает их по мере прихода. Это однонаправленный канал (только сервер → клиент), что делает SSE проще WebSocket: не нужен upgrade-handshake, работает через любой HTTP proxy и CDN, автоматически переподключается при разрыве. Эта статья — часть pillar-гида: полный технический гид по LLM API на Python — токены, function calling, streaming, RAG, batch.
Для LLM streaming SSE идеален:
- Модель генерирует токены последовательно — нужен только канал сервер→клиент.
- Если клиент закроет соединение — сервер может прервать генерацию (экономия токенов).
- SSE поддерживается всеми браузерами через нативный
EventSourceAPI. - HTTP/1.1 keep-alive держит соединение без накладных расходов.
Все три флагмана — OpenAI, Anthropic, Google — используют SSE для streaming endpoint'а. Через единый шлюз Promptra формат нормализован: даже Claude и Gemini выдают chunks в OpenAI-совместимом виде с delta.content — что упрощает фронтенд.
![Схема SSE-канала: слева блок «Python/Browser client», справа «LLM API», между ними длинная горизонтальная линия с поочередными метками chunk1, chunk2, chunk3 ... [DONE] идущими слева направо, справа поверх — счётчик «TTFT 470 ms»; заголовок «Server-Sent Events: однонаправленный поток»](/blog/streaming-sse-llm-api-python-realtime-otvety/img-1.webp)
Streaming на OpenAI SDK (GPT-5.5)
Минимальный паттерн с замером TTFT и подсчётом токенов:
import time
from openai import OpenAI
client = OpenAI(
api_key="sk-promptra-...",
base_url="https://api.promptra.ru/v1",
)
def stream_chat(prompt: str, model: str = "gpt-5-5") -> dict:
start = time.perf_counter
ttft = None
accumulated = []
usage = None
stream = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
stream=True,
stream_options={"include_usage": True},
)
for chunk in stream:
if not chunk.choices:
# последний chunk содержит usage
if chunk.usage:
usage = chunk.usage
continue
delta = chunk.choices[0].delta
if delta.content:
if ttft is None:
ttft = (time.perf_counter - start) * 1000 # мс
accumulated.append(delta.content)
print(delta.content, end="", flush=True)
total_ms = (time.perf_counter - start) * 1000
full_text = "".join(accumulated)
return {
"text": full_text,
"ttft_ms": round(ttft, 1) if ttft else None,
"total_ms": round(total_ms, 1),
"prompt_tokens": usage.prompt_tokens if usage else None,
"completion_tokens": usage.completion_tokens if usage else None,
"tokens_per_sec": round(
usage.completion_tokens / ((total_ms - ttft) / 1000), 1
) if usage and ttft else None,
}stream_options={"include_usage": True} — критичный параметр. Без него usage в стриме не возвращается, и вы не сможете посчитать стоимость. Подробнее про подсчёт токенов и цены — в материале «Как считать токены в LLM».
То же на Claude Opus 4.7 — смена одной строки
Через шлюз Promptra Claude доступен по OpenAI-совместимому формату:
result = stream_chat("Объясни асинхронность в Python", model="claude-opus-4-7")
print(f"\nTTFT: {result['ttft_ms']} мс, всего: {result['total_ms']} мс")
print(f"Throughput: {result['tokens_per_sec']} tok/s")В нативном Anthropic SDK chunks имеют другую структуру (event-based: message_start, content_block_delta, message_stop), но через OpenAI-совместимый слой это нормализуется. Это значит один и тот же код работает на всех моделях — вы меняете строку model и сравниваете результаты. Подробности нативного формата Anthropic — в их streaming documentation, параметры streaming для OpenAI описаны в OpenAI API reference.
Реальные TTFT и throughput (Promptra benchmark 2026-05)
Замеряли через единый шлюз на одинаковом промте «Объясни принцип работы X в 200 словах»:
| Модель | TTFT медиана | Throughput | TTFT p99 |
|---|---|---|---|
| GPT-5.5 | 510 мс | 78 tok/s | 980 мс |
| GPT-5.4 | 320 мс | 105 tok/s | 620 мс |
| Claude Opus 4.7 | 680 мс | 65 tok/s | 1280 мс |
| Claude Sonnet 4.6 | 410 мс | 92 tok/s | 780 мс |
| Gemini 3.1 Pro | 380 мс | 88 tok/s | 720 мс |
| Gemini 3.5 Flash | 240 мс | 140 tok/s | 470 мс |
| DeepSeek V4 Pro | 290 мс | 110 tok/s | 540 мс |
Для UX чата хорошим считается TTFT под 700 мс — пользователь видит реакцию почти мгновенно. Throughput важен для длинных ответов: 100 tok/s — это ~75 русских слов в секунду, читается комфортно.

Streaming function calls
Когда в стриме включён tool calling, chunks отдают delta.tool_calls с кусочками JSON arguments. Аккумулируете их по индексу и парсите целиком только в конце:
def stream_with_tools(prompt: str, tools: list) -> dict:
stream = client.chat.completions.create(
model="claude-opus-4-7",
messages=[{"role": "user", "content": prompt}],
tools=tools,
stream=True,
)
tool_calls = {} # index → accumulated
content = []
finish_reason = None
for chunk in stream:
if not chunk.choices:
continue
delta = chunk.choices[0].delta
finish_reason = chunk.choices[0].finish_reason or finish_reason
if delta.content:
content.append(delta.content)
if delta.tool_calls:
for tc in delta.tool_calls:
idx = tc.index
if idx not in tool_calls:
tool_calls[idx] = {
"id": tc.id or "",
"name": "",
"arguments": "",
}
if tc.function:
if tc.function.name:
tool_calls[idx]["name"] = tc.function.name
if tc.function.arguments:
tool_calls[idx]["arguments"] += tc.function.arguments
# парсим целиком только после стрима
parsed = []
if finish_reason == "tool_calls":
for idx, call in sorted(tool_calls.items):
parsed.append({
"id": call["id"],
"name": call["name"],
"arguments": json.loads(call["arguments"]),
})
return {
"content": "".join(content),
"tool_calls": parsed,
"finish_reason": finish_reason,
}Не пытайтесь делать json.loads на каждом chunk — он будет невалидным. Подробности про function calling — в материале «Function calling и tool use на Python».
Robust error handling и retry
Стрим может оборваться в трёх местах: до первого chunk'а (TTFT timeout), в середине (network drop), на финальном chunk'е (content_filter). Стратегия:
import time
from openai import OpenAI, APIConnectionError, APIStatusError
def stream_with_retry(prompt: str, max_retries: int = 3) -> str:
for attempt in range(max_retries):
try:
accumulated = []
stream = client.chat.completions.create(
model="claude-opus-4-7",
messages=[{"role": "user", "content": prompt}],
stream=True,
timeout=300, # глобальный timeout
)
for chunk in stream:
if not chunk.choices:
continue
delta = chunk.choices[0].delta
if delta.content:
accumulated.append(delta.content)
yield delta.content # отдаём наружу
return # успешно дошли до конца
except (APIConnectionError, TimeoutError) as e:
partial = "".join(accumulated)
if partial and attempt == max_retries - 1:
# последняя попытка — возвращаем что есть
yield f"\n[Прервано после {len(partial)} символов: {e}]"
return
# exponential backoff
time.sleep(2 ** attempt)
continue
except APIStatusError as e:
# 4xx ошибка — retry не поможет
yield f"\n[Ошибка API: {e.message}]"
returnЧто важно:
- Сохраняйте partial output — токены, которые модель уже отдала, оплачены. Их разумно показать пользователю с пометкой «прервано».
- Exponential backoff — 1с, 2с, 4с между попытками. Без него вы заDDoSите шлюз при массовом сбое сети.
- Не retry'те 4xx — это ваши ошибки (невалидный запрос, лимит, нет ключа), повторение не поможет.
- Глобальный timeout 300 секунд — длинные reasoning-ответы (Opus 4.7) могут идти долго, но не бесконечно.

FastAPI proxy для браузера
Стандартный паттерн: фронтенд не знает ключа LLM, ходит к вашему FastAPI endpoint, тот стримит наружу:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from openai import AsyncOpenAI
import json
app = FastAPI
async_client = AsyncOpenAI(
api_key="sk-promptra-...",
base_url="https://api.promptra.ru/v1",
)
class ChatRequest(BaseModel):
message: str
model: str = "claude-opus-4-7"
async def event_stream(req: ChatRequest):
stream = await async_client.chat.completions.create(
model=req.model,
messages=[{"role": "user", "content": req.message}],
stream=True,
stream_options={"include_usage": True},
)
async for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
payload = json.dumps({
"type": "text",
"content": chunk.choices[0].delta.content,
}, ensure_ascii=False)
yield f"data: {payload}\n\n"
if chunk.usage:
payload = json.dumps({
"type": "usage",
"prompt_tokens": chunk.usage.prompt_tokens,
"completion_tokens": chunk.usage.completion_tokens,
})
yield f"data: {payload}\n\n"
yield "data: [DONE]\n\n"
@app.post("/chat/stream")
async def chat_stream(req: ChatRequest):
return StreamingResponse(
event_stream(req),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # отключить буфер nginx
},
)X-Accel-Buffering: no — критичный заголовок для nginx, без него стрим буферизуется. На фронтенде стандартный EventSource:
const evtSource = new EventSource('/chat/stream', { withCredentials: true });
evtSource.onmessage = (e) => {
if (e.data === '[DONE]') {
evtSource.close;
return;
}
const payload = JSON.parse(e.data);
if (payload.type === 'text') {
document.getElementById('output').textContent += payload.content;
}
};Подробнее про архитектуру чат-бота — в материале «Чат-бот на нейросети API».

UI integration: что не забыть
Когда стрим работает, надо корректно показать его пользователю. Чек-лист:
- Курсор «печатающего» — мигающая «|» в конце аккумулированного текста. Уберите после
[DONE]. - Auto-scroll к низу при каждом новом chunk'е, но отменяйте если пользователь сам прокрутил вверх (читает прошлый ответ).
- Кнопка «Stop» — посылает abort на ваш FastAPI, тот закрывает стрим с LLM. Сэкономит токены пользователя.
- Кнопка «Regenerate» — повтор запроса с тем же контекстом. Полезно при content_filter или пустом ответе.
- Indicator расхода — после
[DONE]показать «Потрачено: 1240 токенов = 1.85 ₽». Прозрачность повышает доверие. - Markdown rendering — стримите plain text в буфер, а рендерите Markdown только по окончании chunk'а или по фразам. Иначе таблицы и code-блоки будут «прыгать».
- Code blocks с syntax highlighting — рендерите финально, не пытайтесь highlight'ить inline.
Для production-чатов добавьте rate limit на endpoint (запросов на пользователя в минуту), pre-flight бюджет-чек («Как считать токены в LLM»), и логирование всех завершённых стримов с TTFT, throughput, finish_reason.
Оплата и закрывающие документы
Юрлицо-исполнитель — российское юр.лицо , резидент РФ. Сервисная комиссия 5% берётся только при пополнении баланса, на токены наценки нет. Полный пакет закрывающих документов (договор-оферта, счёт на оплату, акт оказанных услуг, счёт-фактура, УПД) приходит через ЭДО — Диадок, СБИС, Контур. Подробнее — на странице «Тарифы».
Что дальше
Streaming через SSE — это один параметр stream=True и цикл for chunk in stream. Дальше — TTFT-замер, обработка ошибок с partial output, FastAPI-proxy для скрытия ключа, и аккуратный UI с auto-scroll и кнопкой Stop. С этим набором ваш чат становится отзывчивым на уровне ChatGPT.app или Claude.ai, а под капотом — один шлюз ко всем флагманам. Полезные следующие шаги: «Function calling на Python» для streaming tool calls, «Embeddings и RAG» для контекстуальных чатов, и «Async-вызовы и Batch API» для batch-сценариев без UI. Если нужно подобрать модель под ваш чат или подключить ключ через юрлицо — напишите команде Promptra в Telegram.
> 📚 Главный гайд по теме: Лучшая нейросеть 2026: какую LLM выбрать под задачу — связанные материалы и обзор всей категории.
FAQ
Что такое SSE и почему именно его используют LLM API?
Server-Sent Events — протокол однонаправленного стриминга от сервера к клиенту поверх HTTP. Проще WebSocket (нет upgrade-handshake), работает через все proxy и CDN. Для LLM идеально: модель генерирует токены последовательно, нужен только канал сервер→клиент. OpenAI, Anthropic и Google используют SSE.
Что такое TTFT и почему он важнее total latency?
Time To First Token — время от отправки запроса до первого токена. Пользователь начинает видеть ответ через 200–800 мс вместо 8–15 секунд. На современных моделях TTFT: GPT-5.5 — 400–700 мс, Claude Opus 4.7 — 500–900 мс, Gemini 3.1 Pro — 350–600 мс, DeepSeek V4 Pro — 200–500 мс.
Какой формат у chunk'ов в OpenAI/Anthropic streaming?
Каждый chunk — JSON в строке data: {...}\n\n. У OpenAI это delta.content или delta.tool_calls. У Anthropic нативно — events: message_start, content_block_delta. Через шлюз Promptra Claude нормализован под OpenAI-совместимый формат delta.content.
Как обрабатывать ошибки в середине стрима?
Try/except вокруг итерации stream, сохранять partial output (он уже оплачен), на retry — exponential backoff с 1с, 2с, 4с. 4xx ошибки не retry'ите. Глобальный timeout 300 секунд для длинных reasoning-ответов.
Можно ли стримить function calling?
Да. Chunks отдают delta.tool_calls с кусочками JSON arguments. Аккумулируете их по index и парсите целиком только после finish_reason='tool_calls'. Не делайте json.loads на каждом chunk — он невалидный.
Как сделать SSE proxy между фронтендом и LLM API?
FastAPI endpoint с StreamingResponse и async generator. Ключ LLM не светится в браузере, можно добавить custom-логику (логирование, биллинг, фильтрация). Заголовок X-Accel-Buffering: no критичен для nginx — иначе стрим буферизуется.

