Sse
Server-Sent Events (SSE) Channels¶
Real-time applications often need to push updates from the server to connected clients, for example, live dashboards, notification systems, or background job progress.
SSE Channels in Lilya provide a clean, async-native abstraction for this using the standard
Server-Sent Events protocol (text/event-stream
).
What Are SSE Channels?¶
SSEChannel
lets you create a named, in-memory channel where multiple subscribers can listen for new events while
the server broadcasts messages asynchronously.
This pattern is lightweight, built on anyio
, and integrates seamlessly
with Lilya's EventStreamResponse.
Why Use SSE Channels?¶
Problem | SSE Channels Solution |
---|---|
Need to send real-time updates without polling | Persistent event stream over HTTP |
Need to broadcast to many clients | Built-in fan-out via shared channel |
Want async support without WebSockets complexity | Simple async generators + HTTP |
Need automatic cleanup of disconnected clients | Graceful unsubscribe and memory management |
Basic Example¶
from lilya.apps import Lilya
from lilya.routing import Path
from lilya.contrib.security.signed_urls import SignedURLGenerator
from lilya.contrib.sse.channels import SSEChannel, sse_manager
from lilya.responses import EventStreamResponse
# Create or get a named SSE channel
notifications = SSEChannel("notifications")
async def events():
# Stream messages to connected clients
async def stream():
async for event in notifications.listen(heartbeat_interval=10):
yield event
return EventStreamResponse(stream())
async def send():
# Broadcast to all connected clients
await notifications.broadcast({"event": "notice", "data": "Hello, world!"})
return {"sent": True}
app = Lilya(routes=[
Path("/events", events),
Path("/send", send),
])
✅ Open /events
in your browser and then call /send
in another tab, you will instantly see the message arrive.
Understanding the Pieces¶
1. SSEChannel
¶
The core class for managing subscribers and messages.
from lilya.contrib.sse.channels import SSEChannel
ch = SSEChannel("demo")
await ch.broadcast({"event": "message", "data": "New update!"})
async for event in ch.listen():
yield event
Each SSEChannel
instance:
- Keeps a list of subscribers (
asyncio.Queue
s). - Allows multiple concurrent listeners.
- Cleans up disconnected subscribers automatically.
- Optionally sends heartbeat events to keep the connection alive.
2. EventStreamResponse
¶
A specialized response for streaming events in SSE format.
from lilya.responses import EventStreamResponse
async def stream():
for i in range(3):
yield {"event": "tick", "data": i}
return EventStreamResponse(stream())
- Converts Python dicts into properly formatted SSE frames.
- Supports optional
retry
,id
, andevent
fields. - Encodes JSON automatically for dict/list
data
.
3. sse_manager
¶
A global helper that manages multiple channels by name.
from lilya.contrib.sse.channels import sse_manager
# Get or create a channel
ch = await sse_manager.get_or_create("chat")
# Retrieve existing channels
print(await sse_manager.list_channels())
This allows you to coordinate multiple named channels — e.g., one per topic or tenant.
Practical Use Cases¶
Live Notifications¶
from lilya.apps import Lilya
from lilya.contrib.sse.channels import sse_manager
app = Lilya()
@app.get("/notify")
async def notify():
ch = await sse_manager.get_or_create("user_notifications")
await ch.broadcast({"event": "alert", "data": "You have a new message"})
return {"status": "notified"}
Clients subscribe to /events
, and your server pushes alerts as they happen.
Real-Time Job Progress¶
import anyio
from lilya.contrib.sse.channels import SSEChannel
from lilya.responses import EventStreamResponse
progress = SSEChannel("tasks")
async def run_job():
for i in range(100):
await progress.broadcast({"event": "progress", "data": i})
await anyio.sleep(0.1)
@app.get("/job/status")
async def job_status():
async def stream():
async for event in progress.listen(heartbeat_interval=5):
yield event
return EventStreamResponse(stream())
Multi-Channel Communication¶
You can dynamically create channels per user, tenant, or topic:
from lilya.apps import Lilya
from lilya.contrib.sse.channels import sse_manager
from lilya.responses import EventStreamResponse
app = Lilya()
@app.get("/events/{room_id}")
async def room_events(room_id: str):
room = await sse_manager.get_or_create(room_id)
async def stream():
async for ev in room.listen():
yield ev
return EventStreamResponse(stream())
Advanced Features¶
Heartbeats¶
To prevent idle connections from timing out, listen()
can send heartbeat messages automatically:
async for event in ch.listen(heartbeat_interval=10):
yield event
This sends:
: heartbeat
\n\n
every 10 seconds.
Automatic Cleanup¶
When a subscriber disconnects (e.g., browser tab closes), it's removed automatically. You can manually trigger cleanup if needed:
await ch.cleanup()
This ensures no stale connections remain.
Thread & Async Safety¶
- Built on
anyio
, compatible with bothasyncio
andtrio
. - Safe for concurrent broadcast and listen operations.
- Designed for long-lived event streams.
Integration with Dependencies¶
Because channels are async and injectable, you can combine them with Lilya's dependency system:
from lilya.apps import Lilya
from lilya.contrib.sse.channels import SSEChannel, sse_manager
from lilya.responses import EventStreamResponse
from lilya.dependencies import Depends, inject
@inject
async def get_channel() -> SSEChannel:
return await sse_manager.get_or_create("chat")
@app.get("/stream")
async def stream(ch: SSEChannel = Depends(get_channel)):
async def events():
async for ev in ch.listen():
yield ev
return EventStreamResponse(events())
Best Practices¶
Tip | Description |
---|---|
Use heartbeats | Prevents timeouts on long-lived connections. |
Keep payloads small | SSE is text-based; avoid heavy binary data. |
Avoid per-user channels | Use shared channels and filter on the client side when possible. |
Use sse_manager |
Reuse channels across routes or events. |
Combine with async tasks | Perfect for background jobs or progress updates. |
Summary¶
Concept | Description |
---|---|
SSEChannel |
Handles subscriptions and broadcasting. |
EventStreamResponse |
Converts async streams into SSE output. |
sse_manager |
Global manager for named channels. |
Heartbeats | Keeps connections alive. |
Auto-cleanup | Removes stale subscribers automatically. |
SSE Channels make Lilya a first-class framework for reactive, async-native real-time APIs, without the overhead of WebSockets.