Skip to content

Instantly share code, notes, and snippets.

@neoblackcap
Last active Apr 28, 2022
Embed
What would you like to do?
// tasks.py
import asyncio
import celery
app = celery.Celery('app')
async def coroutine():
await asyncio.sleep(5)
print("pass 5 second")
@app.task
def simple_task():
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine())
// sample.py
from . import tasks
if __name__ == '__main__':
tasks.simple_task.apply_async()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment