Skip to content

Instantly share code, notes, and snippets.

@kgriffs
Last active April 30, 2023 11:20
Show Gist options
  • Save kgriffs/023dcdc39c07c0ec0c749d0ddf29c4da to your computer and use it in GitHub Desktop.
Save kgriffs/023dcdc39c07c0ec0c749d0ddf29c4da to your computer and use it in GitHub Desktop.
Falcon ASGI+WebSocket Interface Proposal (By Example)
# See also:
#
# * https://asgi.readthedocs.io/en/latest/specs/www.html#websocket
# * https://developer.mozilla.org/en-US/docs/Web/API/Websockets_API
#
import falcon.asgi
import falcon.media
class SomeResource:
# Get a paginated list of events via a regular HTTP request.
#
# For small-scale, all-in-one apps, it may make sense to support
# both a regular HTTP interface and one based on WebSocket
# side-by-side in the same deployment. However, these two
# interaction models have very different performance characteristics,
# and so larger scale-out deployments may wish to specifically
# designate instance groups for one type of traffic vs. the
# other (although the actual applications may still be capable
# of handling both modes).
#
async def on_get(self, req: Request, account_id: str):
pass
# Push event stream to client. Note that the framework will pass
# parameters defined in the URI template as with HTTP method
# responders.
async def on_websocket(self, req: Request, ws: WebSocket, account_id: str):
# The HTTP request used to initiate the WebSocket handshake can be
# examined as needed.
some_header_value = req.get_header('Some-Header')
# Reject it?
if some_condition:
# If close() is called before accept() the code kwarg is
# ignored, if present, and the server returns a 403
# HTTP response without upgrading the connection.
await ws.close()
return
# Examine subprotocols advertised by the client. Here let's just
# assume we only support wamp, so if the client doesn't advertise
# it we reject the connection.
if 'wamp' not in ws.subprotocols:
# If close() is not called explicitly, the framework will
# take care of it automatically with the default code (1000).
return
# If, after examining the connection info, you would like to accept
# it, simply call accept() as follows:
try:
await ws.accept(subprotocol='wamp')
except WebSocketDisconnected:
return
# Simply start sending messages to the client if this is an event
# feed endpoint.
while True:
try:
event = await my_next_event()
# Send an instance of str as a WebSocket TEXT (0x01) payload
await ws.send_text(event)
# Send an instance of bytes, bytearray, or memoryview as a
# WebSocket BINARY (0x02) payload.
await ws.send_data(event)
# Or if you want it to be serialized to JSON (by default; can
# be customized via app.ws_options.media_handlers):
await ws.send_media(event) # Defaults to WebSocketPayloadType.TEXT
except WebSocketDisconnected:
# Do any necessary cleanup, then bail out
return
# ...or loop like this to implement a simple request-response protocol
while True:
try:
# Use this if you expect a WebSocket TEXT (0x01) payload,
# decoded from UTF-8 to a Unicode string.
payload_str = await ws.receive_text()
# Or if you are expecting a WebSocket BINARY (0x02) payload,
# in which case you will end up with a byte string result:
payload_bytes = await ws.receive_data()
# Or if you want to get a serialized media object (defaults to
# JSON deserialization of text payloads, and MessagePack
# deserialization for BINARY payloads, but this can be
# customized via app.ws_options.media_handlers).
media_object = await ws.receive_media()
except WebSocketDisconnected:
# Do any necessary cleanup, then bail out
return
except TypeError:
# The received message payload was not of the expected
# type (e.g., got BINARY when TEXT was expected).
pass
except json.JSONDecodeError:
# The default media deserializer uses the json standard
# library, so you might see this error raised as well.
pass
# At any time, you may decide to close the websocket. If the
# socket is already closed, this call does nothing (it will
# not raise an error.)
if we_are_so_done_with_this_conversation():
# https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
await ws.close(code=1000)
return
try:
# Here we are sending as a binary (0x02) payload type, which
# will go find the handler configured for that (defaults to
# MessagePack which assumes you've also installed that
# package, but this can be customized as mentioned above.')
await ws.send_media(
{'event': 'message'},
payload_type=WebSocketPayloadType.BINARY,
)
except WebSocketDisconnected:
# Do any necessary cleanup, then bail out. If ws.close() was
# not already called by the app, the framework will take
# care of it.
# NOTE: If you do not handle this exception, it will be
# bubbled up to a default error handler that simply
# logs the message as a warning and then closes the
# server side of the connection. This handler can be
# overwritten as with any other error handler for the app.
# NOTE: If the error handler accepts an 'ws' kwarg, the framework
# will pass in the falcon.asgi.WebSocket instance.
return
# ...or run a couple of different loops in parallel to support
# independent bidirectional message streams.
messages = collections.deque()
async def sink():
while True:
try:
message = await ws.receive_text()
except falcon.WebSocketDisconnected:
break
messages.append(message)
sink_task = falcon.create_task(sink())
while not sink_task.done():
while ws.ready and not messages and not sink_task.done():
await asyncio.sleep(0)
try:
await ws.send_text(messages.popleft())
except falcon.WebSocketDisconnected:
break
sink_task.cancel()
try:
await sink_task
except asyncio.CancelledError:
pass
class SomeMiddleware:
async def process_request_ws(self, req: Request):
# This will be called for the HTTP request that initiates the
# WebSocket handshake before routing.
pass
async def process_resource_ws(self, req: Request, resource, params):
# This will be called for the HTTP request that initiates the
# WebSocket handshake after routing (if a route matches the
# request).
pass
app = falcon.asgi.App(middleware=SomeMiddleware())
app.add_route('/{account_id}/messages', SomeResource())
# Let's say we want to use a faster JSON library. You could also use this
# pattern to add serialization support for custom types that aren't
# normally JSON-serializable out of the box.
class RapidJSONHandler(falcon.media.TextBaseHandlerWS):
def serialize(self, media: object) -> str:
return rapidjson.dumps(media, ensure_ascii=False)
# The raw TEXT payload will be passed as a Unicode string
def deserialize(self, payload: str) -> object:
return rapidjson.loads(payload)
# And/or for binary mode we want to use Protocol Buffers:
class ProtocolBuffersHandler(falcon.media.BinaryBaseHandlerWS):
def serialize(self, media: message_pb2.Message) -> Union[bytes, bytearray, memoryview]:
return media.SerializeToString()
# The raw BINARY payload will be passed as a byte string
def deserialize(self, payload: bytes) -> message_pb2.Message:
message = message_pb2.Message()
return message.ParseFromString(payload)
# Expected to (de)serialize from/to str
json_handler = RapidJSONHandler()
app.ws_options.media_handlers[falcon.WebSocketPayloadType.TEXT] = json_handler
# Expected to (de)serialize from/to bytes, bytearray, or memoryview
protobuf_handler = ProtocolBuffersHandler()
app.ws_options.media_handlers[falcon.WebSocketPayloadType.BINARY] = protobuf_handler
@kgriffs
Copy link
Author

kgriffs commented Jul 20, 2020

@kgriffs
Copy link
Author

kgriffs commented Nov 24, 2020

PR has been merged! Next pre-release will include WS support. Please help test! falconry/falcon#1741

@kgriffs
Copy link
Author

kgriffs commented Dec 1, 2020

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment