Skip to content

Instantly share code, notes, and snippets.

@euri10
Created September 5, 2019 16:26
Show Gist options
  • Save euri10/5ef4f2079ff6140f2df2f434dc6e834d to your computer and use it in GitHub Desktop.
Save euri10/5ef4f2079ff6140f2df2f434dc6e834d to your computer and use it in GitHub Desktop.
import logging
import asyncio
import uvicorn
from aio_pika import connect_robust, ExchangeType, IncomingMessage
from celery import Celery
from fastapi import FastAPI
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from models import (
CeleryTaskSucceeded,
CeleryTaskSent,
CeleryTaskReceived,
CeleryTaskStarted,
CeleryTaskFailed,
CeleryTaskRejected,
CeleryTaskRevoked,
CeleryTaskRetried,
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
fastapp = Starlette(debug=True)
celery_app = Celery(broker="amqp://guest@localhost//")
class SomeAsyncService:
def __init__(self, tick=5.0, consumer=None):
self.cancellation_event = asyncio.Event()
self.tick = tick
self.consumer = consumer
def on_task_succeeded(self, event):
logger.info("on_task_succeeded")
logger.info(event)
logger.info(CeleryTaskSucceeded(**event))
async def on_message(self, message: IncomingMessage):
if message.routing_key == "task.multi":
logger.debug(
f"INCOMING message, exchange:{message.exchange}, routing_key:{message.routing_key}"
)
logger.debug(message.body)
# body = json.loads(message.body)
# if body["type"] == "state":
# raise asyncio.CancelledError()
def shutdown(self):
logger.debug(f"about to cancel")
self.cancellation_event.set()
logger.debug(f"cancellation_event.set")
async def startup(self):
logger.debug("Service started")
cancellation_task = self.cancellation_event.wait()
while not self.cancellation_event.is_set():
pending, done = await asyncio.wait(
[cancellation_task, asyncio.sleep(self.tick)],
return_when=asyncio.FIRST_COMPLETED,
)
logger.debug(f"pending: {pending}")
logger.debug(f"done: {done}")
if cancellation_task in done:
for task in pending:
task.cancel()
else:
logger.debug("Service did something")
if self.consumer is None:
try:
connection = await connect_robust(url=celery_app.conf.broker_url)
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
exchange = await channel.declare_exchange(
name=celery_app.conf.event_exchange,
type=ExchangeType.TOPIC,
durable=True,
)
u = "mqueue"
queue = await channel.declare_queue(
name=f"{celery_app.conf.event_queue_prefix}.{u}",
auto_delete=True,
arguments={
"x-expires": int(
celery_app.conf.event_queue_expires * 1000
),
"x-message-ttl": int(
celery_app.conf.event_queue_ttl * 1000
),
},
)
await queue.bind(exchange=exchange, routing_key="#")
self.consumer = await queue.consume(
callback=self.on_message, no_ack=True, exclusive=False
)
except Exception as e:
logger.debug(f"{e}")
else:
logger.debug(f"consumer already set")
logger.debug("Service finished something")
logger.debug("Service stopped")
# while True:
# logger.debug(f"ticks every {self.tick}s.")
# tasks = [t for t in asyncio.all_tasks() if t is not
# asyncio.current_task()]
# for task in tasks:
# logger.debug(f"task name: {task._coro.__name__}")
# await asyncio.sleep(self.tick)
@fastapp.on_event("startup")
async def on_startup():
logger.debug(f"on_event startup")
service = SomeAsyncService(tick=5.0)
service_task = asyncio.create_task(service.startup())
fastapp.state.service = service
fastapp.state.service_task = service_task
@fastapp.on_event("shutdown")
async def on_shutdown():
logger.debug(f"on_event shutdown")
service = fastapp.state.service
service.shutdown()
service_task = fastapp.state.service_task
await service_task
@fastapp.route("/")
async def homepage(request: Request):
return JSONResponse({"hello": "world"})
@fastapp.route("/newtask", methods=["POST"])
def newtask(request: Request):
msg = "teattgergretgbretbv"
celery_app.send_task("tasks.celery1", args=[msg])
return JSONResponse({"msg": f"Word {msg} received"})
def main():
uvicorn.run(fastapp, host="0.0.0.0", port=8000)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment