Skip to content

Instantly share code, notes, and snippets.

@Tobi-De
Created December 8, 2023 11:30
Show Gist options
  • Save Tobi-De/92491f68a2ee4825cf377eaf7bba8195 to your computer and use it in GitHub Desktop.
Save Tobi-De/92491f68a2ee4825cf377eaf7bba8195 to your computer and use it in GitHub Desktop.
sse litestar
import time
from litestar import Litestar, get
from litestar.channels.backends.redis import RedisChannelsPubSubBackend
from litestar.channels.plugin import ChannelsPlugin
from litestar.response.sse import ServerSentEvent
import redis.asyncio as Redis
from typing import Generator, AsyncGenerator
import json
from contextlib import asynccontextmanager
# Name of the Redis hash that stores one counter field per channel.
COUNT_KEY = "REDIS_COUNT_KEY"

# Async Redis client (``Redis`` is the ``redis.asyncio`` module alias). It is
# shared by the channels backend and by the counter helpers in this file.
redis = Redis.from_url("redis://localhost:6379")

# Pub/sub channels backend on top of the shared client.
redis_backend = RedisChannelsPubSubBackend(
    key_prefix="RELAY_CHANNELS",
    redis=redis,
)

# NOTE(review): arbitrary_channels_allowed=True presumably lets clients
# subscribe to channel names that were not declared up front; confirm
# against the Litestar channels documentation.
channels_plugin = ChannelsPlugin(
    arbitrary_channels_allowed=True,
    backend=redis_backend,
)
@asynccontextmanager
async def increment(channel: str) -> AsyncGenerator[None, None]:
    """Count one active user of ``channel`` for the duration of the block.

    On entry, the ``channel`` field of the ``COUNT_KEY`` Redis hash is
    incremented by one; on exit it is decremented by one. The decrement
    lives in a ``finally`` clause, so it is attempted even when the body
    of the ``async with`` raises.

    Args:
        channel: Name of the hash field (the channel) to count against.

    Yields:
        None. The context manager carries no value.
    """
    await redis.hincrby(COUNT_KEY, channel, 1)
    try:
        yield
    finally:
        # Undo the increment so the counter reflects only live users.
        await redis.hincrby(COUNT_KEY, channel, -1)
async def generate_event(
    channel_name: str, channels: ChannelsPlugin
) -> AsyncGenerator[bytes, None]:
    """Stream every event published on ``channel_name``.

    While the generator is running, the channel's counter is held
    incremented via ``increment`` and a subscription to the single channel
    is kept open. Each received event is printed and then yielded unchanged.

    Args:
        channel_name: Channel to subscribe to and to count against.
        channels: Channels plugin used to open the subscription.

    Yields:
        Each event produced by the subscriber's ``iter_events()``.
    """
    async with increment(channel=channel_name):
        async with channels.start_subscription([channel_name]) as subscription:
            async for event in subscription.iter_events():
                print(event)
                yield event
@get("/sse/{channel:str}")
async def sse(channel: str, channels: ChannelsPlugin) -> ServerSentEvent:
    """Serve the events of ``channel`` as a server-sent-event response.

    Args:
        channel: Channel name taken from the URL path.
        channels: Channels plugin passed through to ``generate_event``.

    Returns:
        A ``ServerSentEvent`` response wrapping the channel's event stream.
    """
    event_stream = generate_event(channel_name=channel, channels=channels)
    return ServerSentEvent(event_stream)
@get("/count")
async def count() -> dict[str, int]:
    """Return the current counter of every channel in the ``COUNT_KEY`` hash.

    Returns:
        Mapping of decoded field name to its counter converted to ``int``.
    """
    # NOTE(review): ``.decode()`` assumes the client returns bytes keys
    # (i.e. ``decode_responses`` is not enabled) - confirm if the client
    # configuration changes.
    raw_counts = await redis.hgetall(COUNT_KEY)
    totals: dict[str, int] = {}
    for field, value in raw_counts.items():
        name = field.decode()
        totals[name] = int(value)
    return totals
# Application object: registers both route handlers and the channels plugin.
app = Litestar(
    plugins=[channels_plugin],
    route_handlers=[sse, count],
)
if __name__ == "__main__":
    # Demo publisher: when the file is run as a script, publish an
    # incrementing JSON payload to "test_channel" every four seconds.
    import redis as sync_redis

    # Create the client once, outside the loop. Building a new client on
    # every iteration would allocate a fresh connection pool each time.
    publisher = sync_redis.from_url("redis://localhost:6379")
    counter = 0
    while True:
        print("Sending message")
        publisher.publish(
            channel="test_channel", message=json.dumps({"data": counter})
        )
        counter += 1
        time.sleep(4)
# Stream events with curl (the URL assumes the app is served on port 8001):
# curl -N http://localhost:8001/sse/test_channel
# Publish test events by running this file as a script:
# python app.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment