# 7 — AI Routes & Streaming
Ember's AI layer provides first-class primitives for LLM-backed APIs — streaming output, conversation memory, tool calling, and model routing.
## Basic SSE Streaming
Any async generator can stream Server-Sent Events:
```python
from ember import Ember, SSEResponse
import asyncio

app = Ember()

@app.get("/stream")
async def stream_numbers(request):
    async def events():
        for i in range(10):
            await asyncio.sleep(0.1)
            yield f"data: {i}\n\n"
    return SSEResponse(events())
```

```bash
curl -N http://127.0.0.1:8000/stream
# data: 0
# data: 1
# ...
```
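On the client side, any SSE-capable consumer works. As a sketch, here is one way to read the stream from Python using the third-party httpx library (httpx is an illustrative choice here, not an Ember dependency):

```python
import asyncio
import httpx

async def consume():
    async with httpx.AsyncClient() as client:
        # stream() yields the response incrementally instead of buffering it
        async with client.stream("GET", "http://127.0.0.1:8000/stream") as resp:
            async for line in resp.aiter_lines():
                if line.startswith("data: "):
                    print(line[len("data: "):])

asyncio.run(consume())
```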
## Token Streaming (LLM Output)

TokenStreamResponse wraps a string token generator in SSE frames automatically:
```python
from ember import TokenStreamResponse
import asyncio

@app.post("/generate")
async def generate(request):
    data = await request.json()
    prompt = data.get("prompt", "")

    async def tokens():
        words = f"You asked: {prompt}. Here is my answer.".split()
        for word in words:
            await asyncio.sleep(0.04)
            yield word + " "

    return TokenStreamResponse(tokens())
```

```bash
curl -N -X POST http://127.0.0.1:8000/generate \
-H "Content-Type: application/json" \
-d '{"prompt":"Hello"}'
# data: You
# data: asked:
# ...
```
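Conceptually, TokenStreamResponse just saves you the manual framing from the previous section. A rough equivalence (the exact frame format Ember emits is an assumption):

```python
async def framed(tokens):
    # Wrap each token in an SSE data frame by hand.
    async for token in tokens:
        yield f"data: {token}\n\n"

# TokenStreamResponse(tokens()) behaves roughly like:
# SSEResponse(framed(tokens()))
```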
## ai_route() — Full AI Endpoint

ai_route() enables auto-injection of ConversationContext and ModelRouter into the handler:
python
from ember import (
Ember, Request, SSEResponse,
ConversationContext, MessageRole,
sse_stream, TokenLimits,
)
import asyncio
app = Ember()
async def fake_llm(prompt: str):
"""Simulate a streaming LLM response."""
for word in f"Sure! You asked about: {prompt}".split():
await asyncio.sleep(0.05)
yield word + " "
@app.ai_route(
"/v1/chat",
methods=["POST"],
streaming=True,
token_limits=TokenLimits(tokens_per_minute=60_000),
)
async def chat(
request: Request,
context: ConversationContext, # auto-injected — per-session history
) -> SSEResponse:
body = await request.json()
user_msg = body.get("message", "")
context.add_message(MessageRole.USER, user_msg)
stream = fake_llm(user_msg)
return sse_stream(stream)bash
curl -N -X POST http://127.0.0.1:8000/v1/chat \
-H "Content-Type: application/json" \
-d '{"message":"What is Python?"}'ConversationContext
## ConversationContext

Manages per-session message history. Auto-injected into ai_route() handlers.
```python
from ember import ConversationContext, MessageRole

# `context` here is the instance injected into your ai_route() handler
context.add_message(MessageRole.USER, "Hello")
context.add_message(MessageRole.ASSISTANT, "Hi there!")
context.add_message(MessageRole.SYSTEM, "You are a helpful assistant.")

messages = context.get_messages()  # list[Message]
context.clear()                    # reset history
```
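When calling a provider yourself, the stored history maps naturally onto chat-message dicts. A minimal sketch, assuming each Message exposes role and content attributes and that MessageRole is a string-valued enum (both assumptions; check the Message API for the actual attribute names):

```python
def to_chat_payload(context: ConversationContext) -> list[dict]:
    # Convert stored history into OpenAI-style {"role", "content"} dicts.
    # Assumes Message has .role (a MessageRole) and .content (str).
    return [
        {"role": message.role.value, "content": message.content}
        for message in context.get_messages()
    ]
```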
## ToolRegistry

Register Python functions as tools callable by LLMs:
```python
from ember import ToolRegistry
from ember.ai.tools import ToolCall, ParameterSchema

tools = ToolRegistry()

@tools.register(description="Get current weather for a city")
async def get_weather(city: str, units: str = "celsius") -> dict:
    return {"city": city, "temperature": 22, "units": units}

@tools.register(
    name="search",
    description="Search the web for information",
    parameters=[
        ParameterSchema("query", "string", "Search query"),
        ParameterSchema("limit", "integer", "Max results", required=False),
    ],
)
async def search_web(query: str, limit: int = 5) -> list:
    return [{"title": f"Result for {query}"}]
```

Execute a tool (from an LLM tool-use response):
```python
result = await tools.execute(ToolCall(
    id="call_01",
    name="get_weather",
    arguments={"city": "Paris", "units": "celsius"},
))

print(result.content)   # '{"city": "Paris", "temperature": 22, "units": "celsius"}'
print(result.is_error)  # False
```

Generate specs:
```python
tools.to_openai_specs()     # for OpenAI function calling
tools.to_anthropic_specs()  # for Anthropic tools
```
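Putting the pieces together, a sketch of one round trip of OpenAI-style tool use. The tool_calls shape follows OpenAI's function-calling response format; only ToolCall and tools.execute() (both shown above) are Ember APIs here:

```python
import json

async def run_tool_calls(assistant_message: dict) -> list[dict]:
    # Execute each requested tool and build "tool" role messages
    # to send back to the model on the next turn.
    replies = []
    for call in assistant_message.get("tool_calls", []):
        result = await tools.execute(ToolCall(
            id=call["id"],
            name=call["function"]["name"],
            arguments=json.loads(call["function"]["arguments"]),
        ))
        replies.append({
            "role": "tool",
            "tool_call_id": call["id"],
            "content": result.content,
        })
    return replies
```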
Use in a route:

```python
@app.ai_route("/v1/chat", methods=["POST"], streaming=True, tool_registry=tools)
async def chat(request, context):
    ...
```

## ModelRouter
Route requests to different LLM providers based on strategy:
```python
from ember import ModelRouter, ModelSpec, RoutingStrategy

router = ModelRouter(
    models=[
        ModelSpec(
            name="gpt-4o-mini",
            provider="openai",
            endpoint="https://api.openai.com/v1/chat/completions",
            api_key_env="OPENAI_API_KEY",
            max_tokens=4096,
            context_window=128_000,
            cost_per_input_token=0.00015,
            cost_per_output_token=0.0006,
            supports_tools=True,
        ),
        ModelSpec(
            name="claude-haiku",
            provider="anthropic",
            endpoint="https://api.anthropic.com/v1/messages",
            api_key_env="ANTHROPIC_API_KEY",
            max_tokens=4096,
            context_window=200_000,
            cost_per_input_token=0.0008,
            cost_per_output_token=0.004,
        ),
    ],
    strategy=RoutingStrategy.FALLBACK,  # try first, fall back on error
)

app = Ember(model_router=router)

@app.ai_route("/v1/chat", methods=["POST"])
async def chat(request, context, model_router: ModelRouter):
    model = await model_router.select(request, context)
    # model.name → "gpt-4o-mini" or "claude-haiku"
    ...
```

Strategies: FALLBACK, CHEAPEST, FASTEST, ROUND_ROBIN.
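For intuition on CHEAPEST: the per-token cost fields on ModelSpec are enough to rank models by estimated request cost. A sketch, assuming the cost_per_*_token fields are per-token prices and that the strategy ranks models roughly this way (both assumptions):

```python
def estimated_cost(spec: ModelSpec, prompt_tokens: int, completion_tokens: int) -> float:
    # Illustrative ranking only; not ModelRouter's actual implementation.
    return (prompt_tokens * spec.cost_per_input_token
            + completion_tokens * spec.cost_per_output_token)
```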
## Token Rate Limiting
Protect your LLM budget:
```python
from ember import TokenLimits, GlobalTokenBucket, RateLimitMiddleware

# Global bucket: 100k tokens/minute across all users
bucket = GlobalTokenBucket(
    capacity=100_000.0,
    refill_rate=100_000.0 / 60,  # per second
)
app.add_middleware(RateLimitMiddleware(
    global_bucket=bucket,
    estimate_from_body=True,  # deducts estimated tokens from request body
))

# Per-route limits
@app.ai_route(
    "/v1/premium",
    methods=["POST"],
    token_limits=TokenLimits(
        tokens_per_minute=10_000,
        max_prompt_tokens=4_096,
        max_completion_tokens=2_048,
    ),
)
async def premium_chat(request, context): ...
```
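The mechanism behind GlobalTokenBucket is the classic token-bucket algorithm: a counter refilled at a steady rate and debited per request. A generic sketch of the idea (illustrative; not Ember's implementation):

```python
import time

class SimpleTokenBucket:
    def __init__(self, capacity: float, refill_rate: float):
        self.capacity = capacity        # maximum tokens held
        self.refill_rate = refill_rate  # tokens added per second
        self.tokens = capacity
        self.last = time.monotonic()

    def try_consume(self, n: float) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity.
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.refill_rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False  # caller should reject, e.g. with HTTP 429
```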
## OpenAI-Compatible Endpoint

A complete endpoint that speaks the OpenAI chat-completions wire format, handling both streaming and non-streaming requests:
```python
import json, uuid, asyncio
from ember import Ember, Request, SSEResponse

app = Ember()

@app.post("/v1/chat/completions")
async def completions(request: Request) -> SSEResponse | dict:
    body = await request.json()
    stream = body.get("stream", False)
    model = body.get("model", "ember-default")

    async def fake_tokens():
        for word in "Hello! I am Ember, your AI assistant.".split():
            await asyncio.sleep(0.05)
            yield word + " "

    if stream:
        async def openai_stream():
            chunk_id = f"chatcmpl-{uuid.uuid4().hex[:8]}"
            async for token in fake_tokens():
                chunk = {
                    "id": chunk_id,
                    "object": "chat.completion.chunk",
                    "model": model,
                    "choices": [{"delta": {"content": token}, "index": 0, "finish_reason": None}],
                }
                yield f"data: {json.dumps(chunk)}\n\n"
            yield "data: [DONE]\n\n"
        return SSEResponse(openai_stream())
    else:
        tokens = []
        async for t in fake_tokens():
            tokens.append(t)
        content = "".join(tokens)
        return {
            "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",
            "object": "chat.completion",
            "model": model,
            "choices": [{"message": {"role": "assistant", "content": content}, "index": 0}],
            "usage": {"prompt_tokens": 20, "completion_tokens": len(tokens), "total_tokens": 20 + len(tokens)},
        }

if __name__ == "__main__":
    app.run(port=8000)
```

```bash
# Non-streaming
curl -X POST http://127.0.0.1:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{"model":"ember-default","messages":[{"role":"user","content":"Hello"}]}'
# Streaming
curl -N -X POST http://127.0.0.1:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{"model":"ember-default","stream":true,"messages":[{"role":"user","content":"Hello"}]}'