
# Server-Sent Events (SSE)

Ember has first-class SSE support designed for LLM token streaming.


## SSEResponse

Wrap an async generator of pre-formatted SSE frames in `SSEResponse` and return it from any route:

```python
from ember import SSEResponse
import asyncio

@app.get("/stream")
async def stream_data():
    async def events():
        for i in range(10):
            await asyncio.sleep(0.1)
            yield f"data: event {i}\n\n"  # a complete, pre-formatted SSE frame
    return SSEResponse(events())
```
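
For quick local testing, the stream can be consumed with any SSE-capable HTTP client. A minimal sketch using `httpx` (an assumption here: the app is served at `localhost:8000`; swap in your actual host and port):

```python
import asyncio
import httpx

async def consume():
    async with httpx.AsyncClient() as client:
        # SSE frames arrive as newline-delimited lines; each event
        # is a "data: ..." line followed by a blank line.
        async with client.stream("GET", "http://localhost:8000/stream") as resp:
            async for line in resp.aiter_lines():
                if line.startswith("data: "):
                    print(line[len("data: "):])

asyncio.run(consume())
```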

## TokenStreamResponse

Wraps a generator of string tokens in SSE frames automatically:

```python
from ember import TokenStreamResponse
import asyncio

@app.post("/generate")
async def generate(request):
    async def tokens():
        for word in "The quick brown fox".split():
            await asyncio.sleep(0.05)
            yield word + " "
    return TokenStreamResponse(tokens())
```

Each yielded string is sent as `data: <token>\n\n`.
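
In other words, `TokenStreamResponse` is shorthand for doing the framing yourself. A hedged sketch of the manual equivalent using `SSEResponse` (illustrative only; the `/generate-manual` route is hypothetical and mirrors the example above):

```python
from ember import SSEResponse
import asyncio

@app.post("/generate-manual")
async def generate_manual(request):
    async def framed():
        for word in "The quick brown fox".split():
            await asyncio.sleep(0.05)
            yield f"data: {word} \n\n"  # manual SSE framing per token
    return SSEResponse(framed())
```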


## `sse_stream()` Helper

Combines a token generator with an optional token bucket for back-pressure:

```python
from ember import sse_stream, GlobalTokenBucket

bucket = GlobalTokenBucket(capacity=100_000, refill_rate=1_666.7)

@app.ai_route("/v1/chat", methods=["POST"], streaming=True)
async def chat(request, context):
    async def tokens():
        async for token in my_model_stream(request):  # your model's async token source
            yield token
    return sse_stream(tokens(), token_bucket=bucket)
```
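
If the token-bucket pattern is new to you: a bucket holds up to `capacity` tokens and refills at `refill_rate` tokens per second, so producers can burst briefly but are throttled to the refill rate on average. A minimal illustrative sketch of the idea (not Ember's `GlobalTokenBucket` implementation):

```python
import asyncio
import time

class SimpleTokenBucket:
    """Illustrative only: bursts up to `capacity`, sustained `refill_rate`/sec."""

    def __init__(self, capacity: float, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        self.last = time.monotonic()

    async def acquire(self, n: float = 1.0) -> None:
        while True:
            now = time.monotonic()
            # Refill in proportion to elapsed time, capped at capacity.
            self.tokens = min(self.capacity,
                              self.tokens + (now - self.last) * self.refill_rate)
            self.last = now
            if self.tokens >= n:
                self.tokens -= n
                return
            # Sleep just long enough for the deficit to refill.
            await asyncio.sleep((n - self.tokens) / self.refill_rate)
```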

## SSEWriter (Cython)

The `SSEWriter` Cython class encodes frames with zero Python string allocation. It is used internally by `SSEResponse` and `TokenStreamResponse`. You can use it directly for custom frame formats:

```python
from ember.ai.sse import SSEWriter

writer = SSEWriter()
frame = writer.encode_token("Hello ")              # → b"data: Hello \n\n"
frame = writer.encode_event("update", '{"id":1}')  # → b"event: update\ndata: {\"id\":1}\n\n"
frame = writer.encode_done()                       # → b"data: [DONE]\n\n"
```
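
One possible direct use is yielding the encoded byte frames from your own generator. A sketch, assuming `SSEResponse` accepts `bytes` frames as well as strings (verify against your Ember version):

```python
from ember import SSEResponse
from ember.ai.sse import SSEWriter

@app.get("/updates")
async def updates():
    writer = SSEWriter()

    async def frames():
        # Named events let EventSource clients dispatch by event type.
        yield writer.encode_event("update", '{"id": 1}')
        yield writer.encode_event("update", '{"id": 2}')
        yield writer.encode_done()

    return SSEResponse(frames())
```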

## OpenAI-Compatible Streaming

To interoperate with OpenAI SDK clients, emit `chat.completion.chunk` objects and terminate the stream with `data: [DONE]`:

```python
from ember import SSEResponse
import json, uuid, time

@app.ai_route("/v1/chat/completions", methods=["POST"], streaming=True)
async def completions(request, context):
    body  = await request.ai_body()
    model = body.model or "ember-default"

    async def openai_stream():
        chunk_id = f"chatcmpl-{uuid.uuid4().hex[:8]}"
        created  = int(time.time())
        async for token in my_llm(body.messages):  # my_llm: your async token source
            chunk = {
                "id": chunk_id,
                "object": "chat.completion.chunk",
                "created": created,
                "model": model,
                "choices": [{"delta": {"content": token}, "index": 0, "finish_reason": None}],
            }
            yield f"data: {json.dumps(chunk)}\n\n"
        yield "data: [DONE]\n\n"

    return SSEResponse(openai_stream())
```
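
Because the frames follow the OpenAI wire format, the endpoint can be exercised with the official `openai` Python SDK. A sketch assuming the app listens on `localhost:8000` (the `api_key` value is arbitrary if your route does not check it):

```python
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="unused")

stream = client.chat.completions.create(
    model="ember-default",
    messages=[{"role": "user", "content": "Say hello"}],
    stream=True,
)
for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)
```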
