Skip to content

Instantly share code, notes, and snippets.

@vytas7
Last active February 8, 2022 15:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vytas7/f40c5299b0ca6c85cddceef01c057372 to your computer and use it in GitHub Desktop.
Save vytas7/f40c5299b0ca6c85cddceef01c057372 to your computer and use it in GitHub Desktop.
Hybrid ASGI/WSGI app in the same process
#!/usr/bin/env python
import asyncio
import logging
import threading
import uuid
import falcon
import falcon.asgi
import gunicorn.app.base
import uvicorn
# NOTE(vytas): Useful since ASGI otherwise has nothing like wsgierrors.
logging.basicConfig(
format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO)
class RequestID:
def process_request(self, req, resp):
req.context.request_id = str(uuid.uuid4())
async def process_request_async(self, req, resp):
self.process_request(req, resp)
class Hub:
TIMEOUT = 5.0
def __init__(self):
self._loop = None
self._queues = set()
async def process_startup(self, scope, event):
# NOTE: Another way to get Uvicorn's loop is manually creating one and
# passing it to Uvicorn:
# https://github.com/encode/uvicorn/issues/706#issuecomment-652220153
self._loop = asyncio.get_running_loop()
async def _enqueue(self, message):
for queue in self._queues:
await queue.put(message)
async def events(self, request_id):
logging.info(f'<{request_id}>: SSE emitter starting')
queue = asyncio.Queue()
self._queues.add(queue)
try:
while True:
try:
message = await asyncio.wait_for(
queue.get(), timeout=self.TIMEOUT)
logging.info(f'<{request_id}>: ==> {message}')
yield falcon.asgi.SSEvent(json=message, retry=5000)
except asyncio.TimeoutError:
yield falcon.asgi.SSEvent()
finally:
self._queues.discard(queue)
logging.info(f'<{request_id}>: SSE emitter exiting')
def broadcast(self, message):
if self._loop is not None:
asyncio.run_coroutine_threadsafe(
self._enqueue(message), self._loop)
else:
logging.warning(f'Cannot broadcast {message}: no known loop yet')
class EventStream:
def __init__(self, hub):
self._hub = hub
async def on_get(self, req, resp):
resp.sse = self._hub.events(req.context.request_id)
class SyncResource:
def __init__(self, hub):
self._hub = hub
def on_get(self, req, resp):
resp.media = {'greeting': 'Hello!'}
def on_post(self, req, resp):
self._hub.broadcast(req.get_media())
resp.status = falcon.HTTP_ACCEPTED
class HybridApplication(gunicorn.app.base.BaseApplication):
hub = Hub()
@classmethod
def post_fork(cls, arbiter, worker):
asgi_app = falcon.asgi.App(middleware=[RequestID(), cls.hub])
asgi_app.add_route('/sse', EventStream(cls.hub))
# TODO: Use Gunicorn's hooks to properly join() instead of daemon=True.
uvicorn_thread = threading.Thread(
target=uvicorn.run,
args=(asgi_app,),
kwargs=dict(host='127.0.0.1', port=8002, log_level='debug'),
daemon=True)
uvicorn_thread.start()
def __init__(self):
self.options = {
'bind': '127.0.0.1:8000',
'post_fork': self.post_fork,
'workers': 1,
'threads': 4,
}
self.application = app = falcon.App()
app.add_route('/messages', SyncResource(self.hub))
super().__init__()
def load_config(self):
config = {key: value for key, value in self.options.items()
if key in self.cfg.settings and value is not None}
for key, value in config.items():
self.cfg.set(key.lower(), value)
def load(self):
return self.application
if __name__ == '__main__':
HybridApplication().run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment