Created
September 5, 2019 16:26
-
-
Save euri10/5ef4f2079ff6140f2df2f434dc6e834d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import asyncio | |
import uvicorn | |
from aio_pika import connect_robust, ExchangeType, IncomingMessage | |
from celery import Celery | |
from fastapi import FastAPI | |
from starlette.applications import Starlette | |
from starlette.requests import Request | |
from starlette.responses import JSONResponse | |
from models import ( | |
CeleryTaskSucceeded, | |
CeleryTaskSent, | |
CeleryTaskReceived, | |
CeleryTaskStarted, | |
CeleryTaskFailed, | |
CeleryTaskRejected, | |
CeleryTaskRevoked, | |
CeleryTaskRetried, | |
) | |
logger = logging.getLogger(__name__) | |
logger.setLevel(logging.DEBUG) | |
fastapp = Starlette(debug=True) | |
celery_app = Celery(broker="amqp://guest@localhost//") | |
class SomeAsyncService: | |
def __init__(self, tick=5.0, consumer=None): | |
self.cancellation_event = asyncio.Event() | |
self.tick = tick | |
self.consumer = consumer | |
def on_task_succeeded(self, event): | |
logger.info("on_task_succeeded") | |
logger.info(event) | |
logger.info(CeleryTaskSucceeded(**event)) | |
async def on_message(self, message: IncomingMessage): | |
if message.routing_key == "task.multi": | |
logger.debug( | |
f"INCOMING message, exchange:{message.exchange}, routing_key:{message.routing_key}" | |
) | |
logger.debug(message.body) | |
# body = json.loads(message.body) | |
# if body["type"] == "state": | |
# raise asyncio.CancelledError() | |
def shutdown(self): | |
logger.debug(f"about to cancel") | |
self.cancellation_event.set() | |
logger.debug(f"cancellation_event.set") | |
async def startup(self): | |
logger.debug("Service started") | |
cancellation_task = self.cancellation_event.wait() | |
while not self.cancellation_event.is_set(): | |
pending, done = await asyncio.wait( | |
[cancellation_task, asyncio.sleep(self.tick)], | |
return_when=asyncio.FIRST_COMPLETED, | |
) | |
logger.debug(f"pending: {pending}") | |
logger.debug(f"done: {done}") | |
if cancellation_task in done: | |
for task in pending: | |
task.cancel() | |
else: | |
logger.debug("Service did something") | |
if self.consumer is None: | |
try: | |
connection = await connect_robust(url=celery_app.conf.broker_url) | |
channel = await connection.channel() | |
await channel.set_qos(prefetch_count=1) | |
exchange = await channel.declare_exchange( | |
name=celery_app.conf.event_exchange, | |
type=ExchangeType.TOPIC, | |
durable=True, | |
) | |
u = "mqueue" | |
queue = await channel.declare_queue( | |
name=f"{celery_app.conf.event_queue_prefix}.{u}", | |
auto_delete=True, | |
arguments={ | |
"x-expires": int( | |
celery_app.conf.event_queue_expires * 1000 | |
), | |
"x-message-ttl": int( | |
celery_app.conf.event_queue_ttl * 1000 | |
), | |
}, | |
) | |
await queue.bind(exchange=exchange, routing_key="#") | |
self.consumer = await queue.consume( | |
callback=self.on_message, no_ack=True, exclusive=False | |
) | |
except Exception as e: | |
logger.debug(f"{e}") | |
else: | |
logger.debug(f"consumer already set") | |
logger.debug("Service finished something") | |
logger.debug("Service stopped") | |
# while True: | |
# logger.debug(f"ticks every {self.tick}s.") | |
# tasks = [t for t in asyncio.all_tasks() if t is not | |
# asyncio.current_task()] | |
# for task in tasks: | |
# logger.debug(f"task name: {task._coro.__name__}") | |
# await asyncio.sleep(self.tick) | |
@fastapp.on_event("startup") | |
async def on_startup(): | |
logger.debug(f"on_event startup") | |
service = SomeAsyncService(tick=5.0) | |
service_task = asyncio.create_task(service.startup()) | |
fastapp.state.service = service | |
fastapp.state.service_task = service_task | |
@fastapp.on_event("shutdown") | |
async def on_shutdown(): | |
logger.debug(f"on_event shutdown") | |
service = fastapp.state.service | |
service.shutdown() | |
service_task = fastapp.state.service_task | |
await service_task | |
@fastapp.route("/") | |
async def homepage(request: Request): | |
return JSONResponse({"hello": "world"}) | |
@fastapp.route("/newtask", methods=["POST"]) | |
def newtask(request: Request): | |
msg = "teattgergretgbretbv" | |
celery_app.send_task("tasks.celery1", args=[msg]) | |
return JSONResponse({"msg": f"Word {msg} received"}) | |
def main(): | |
uvicorn.run(fastapp, host="0.0.0.0", port=8000) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment