import asyncio
from flask import Flask
async def abar(a):
    """Print ``a`` to standard output.

    Stand-in for real asynchronous work: the coroutine never awaits
    anything and implicitly returns ``None``.
    """
    print(a)
# One event loop, created at import time and shared by every route below.
# Calling ``asyncio.get_event_loop()`` while no loop is running is deprecated
# and is rejected by newer Python releases, so create the loop explicitly and
# register it as this thread's current loop (which the old call did
# implicitly).
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# The Flask application the routes below are registered on.
app = Flask(__name__)
@app.route("/")
def notify():
    """Handle ``/``: run ``abar("abar")`` on the shared loop, then answer "OK".

    ``run_until_complete`` blocks, so the response is not sent until the
    coroutine has finished.
    """
    pending = abar("abar")
    loop.run_until_complete(pending)
    return "OK"
if __name__ == "__main__":
    # Executed as a script: start Flask's built-in server with both the
    # debugger and the auto-reloader switched off.
    app.run(use_reloader=False, debug=False)
This will block the Flask response until the async function returns, but it still allows you to do some clever things. I've used this pattern to perform many external requests concurrently using aiohttp, and then, once they are complete, I'm back in traditional Flask for data processing and template rendering.
import aiohttp
import asyncio
import async_timeout
from flask import Flask
# One event loop, created at import time and shared by every route below.
# Calling ``asyncio.get_event_loop()`` while no loop is running is deprecated
# and is rejected by newer Python releases, so create the loop explicitly and
# register it as this thread's current loop (which the old call did
# implicitly).
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# The Flask application the routes below are registered on.
app = Flask(__name__)
async def fetch(url):
    """GET ``url`` and return the response body as text.

    A fresh ``aiohttp.ClientSession`` is opened for each call and closed
    when the call ends. The request and the body read both run inside
    ``async_timeout.timeout(10)``.
    """
    async with aiohttp.ClientSession() as session:
        async with async_timeout.timeout(10):
            async with session.get(url) as response:
                body = await response.text()
                return body
def fight(responses):
    """Return the page body for the fetched ``responses``.

    Placeholder post-processing step: ``responses`` is accepted but not
    inspected, and the same fixed message is returned every time.
    """
    verdict = "Why can't we all just get along?"
    return verdict
async def _fetch_all(urls):
    """Fetch every URL in ``urls`` concurrently; return the bodies in input order."""
    # gather() is called from inside the running loop, so the tasks it
    # creates are bound to that loop rather than to whatever the calling
    # thread's "current" loop happens to be.
    return await asyncio.gather(*(fetch(url) for url in urls))


@app.route("/")
def index():
    """Handle ``/``: fetch several sites concurrently, then render the result.

    Blocks the request until every fetch has finished (or one of them has
    raised), then hands the list of response bodies to ``fight``.
    """
    # perform multiple async requests concurrently
    # NOTE: calling asyncio.gather() directly here, outside the loop, makes
    # it look up the calling thread's current event loop. That raises in a
    # thread that has no current loop (e.g. a worker thread of a threaded
    # server) and can bind the tasks to a loop other than ``loop``.
    responses = loop.run_until_complete(_fetch_all((
        "https://google.com/",
        "https://bing.com/",
        "https://duckduckgo.com",
        "http://www.dogpile.com",
    )))
    # do something with the results
    return fight(responses)
if __name__ == "__main__":
    # Executed as a script: start Flask's built-in server with both the
    # debugger and the auto-reloader switched off.
    app.run(use_reloader=False, debug=False)
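This solution doesn't work entirely with Python versions before 3.8; for those you need to create a backport that runs the event loop in a dedicated background thread and submits coroutines to it with asyncio.run_coroutine_threadsafe(), for example: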
import asyncio
import itertools
import time
import threading
# Public API of this module: the names exported by a star-import.
__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]
class EventLoopThread(threading.Thread):
    """Daemon thread that creates an asyncio event loop and runs it forever.

    After ``start()`` returns, ``loop`` refers to the thread's event loop and
    work can be handed to it from other threads, e.g. with
    ``asyncio.run_coroutine_threadsafe``. ``stop()`` shuts the loop down and
    waits for the thread to exit.
    """

    # The event loop owned by this thread. None until run() has created it,
    # and None again once stop() has been called.
    loop = None
    # Counter shared by all instances; gives each thread a unique name.
    _count = itertools.count(0)

    def __init__(self):
        name = f"{type(self).__name__}-{next(self._count)}"
        super().__init__(name=name, daemon=True)
        # Set by run() as soon as ``self.loop`` has been assigned (or the
        # attempt to create the loop has failed), so start() can wait on it.
        self._loop_ready = threading.Event()

    def __repr__(self):
        loop = self.loop
        if loop is None:
            # No loop yet (or already stopped): report it as not running/closed.
            running, closed, debug = False, True, False
        else:
            running = loop.is_running()
            closed = loop.is_closed()
            debug = loop.get_debug()
        return (
            f"<{type(self).__name__} {self.name} id={self.ident} "
            f"running={running} closed={closed} debug={debug}>"
        )

    def start(self):
        """Start the thread and wait until its event loop has been created.

        ``run()`` assigns ``self.loop`` from inside the new thread; without
        this wait a caller reading ``loop`` right after ``start()`` could
        still see ``None``.
        """
        super().start()
        self._loop_ready.wait()

    def run(self):
        """Thread body: create the loop, run it until stopped, then clean up."""
        try:
            self.loop = loop = asyncio.new_event_loop()
        finally:
            # Always release start(), even if creating the loop raised.
            self._loop_ready.set()
        asyncio.set_event_loop(loop)
        try:
            loop.run_forever()
        finally:
            try:
                shutdown_asyncgens = loop.shutdown_asyncgens()
            except AttributeError:
                # Loop implementation without shutdown_asyncgens(); skip it.
                pass
            else:
                loop.run_until_complete(shutdown_asyncgens)
            loop.close()
            asyncio.set_event_loop(None)

    def stop(self):
        """Stop the event loop and wait for the thread to finish.

        Does nothing if there is no loop (never started, or already stopped).
        Must be called from a thread other than this one, since it joins
        this thread.
        """
        loop, self.loop = self.loop, None
        if loop is None:
            return
        # loop.stop() has to be scheduled from the outside in a thread-safe way.
        loop.call_soon_threadsafe(loop.stop)
        self.join()
# Guards creation and teardown of the module-wide loop thread below.
_lock = threading.Lock()
# The single shared EventLoopThread, or None while none has been started.
_loop_thread = None
def get_event_loop():
    """Return the event loop of the shared background thread.

    The thread is created and started on first use. Returns ``None`` only
    if the thread has no loop to offer (it did not produce one within about
    a second, or it has been stopped directly).
    """
    global _loop_thread
    with _lock:
        if _loop_thread is None:
            _loop_thread = EventLoopThread()
            _loop_thread.start()
        thread = _loop_thread
        # The loop is assigned by EventLoopThread.run() inside the new
        # thread, so it may still be None right after start(). Poll briefly
        # instead of handing None to the caller.
        deadline = time.monotonic() + 1.0
        while thread.loop is None and thread.is_alive():
            if time.monotonic() >= deadline:
                break
            time.sleep(0.001)
        return thread.loop
def stop_event_loop():
    """Stop the shared background loop thread, if one is running.

    Holds the module lock while the thread is stopped, then forgets the
    thread so that a later ``get_event_loop()`` starts a new one.
    """
    global _loop_thread
    with _lock:
        if _loop_thread is None:
            return
        _loop_thread.stop()
        _loop_thread = None
def run_coroutine(coro):
    """Schedule ``coro`` on the event loop running in the background thread.

    Does not wait for the coroutine to finish. Returns a Future; call its
    ``result()`` method to obtain the coroutine's output.
    """
    background_loop = get_event_loop()
    return asyncio.run_coroutine_threadsafe(coro, background_loop)