Skip to content

Instantly share code, notes, and snippets.

@devraj
Last active January 30, 2024 21:08
Show Gist options
  • Save devraj/6cf8467f0431caa2901330e06fb385de to your computer and use it in GitHub Desktop.
Save devraj/6cf8467f0431caa2901330e06fb385de to your computer and use it in GitHub Desktop.
SQLAlchemy asyncio calls from within a Celery task
# We've been trying to figure out how to run async code from
# Celery until it gets support for it
#
# This is an extracted example which should fit into
# https://github.com/anomaly/lab-python-server
import asyncio
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from ...celery import app
from ...db import get_async_session
from ...models import Customer
# Globally available loop to run async tasks
# this has to be on the global level
loop = asyncio.get_event_loop()
# Define an async method and then you can do everything
# async using SQLAlchemy et al
async def get_user():
session_generator = get_async_session()
# Fires the yield statement and returns the session
session = await session_generator.asend(None)
customers = await Customer.get_all(session)
import logging
logging.error(customers)
# The task then uses the event loop to run the async method
# make sure you are to pass any variables that you wish to
# recieve as part of the task
@app.task(ignore_result=True)
def create_card_holder(customerId: UUID):
loop.run_until_complete(get_user())
@devraj
Copy link
Author

devraj commented Jan 30, 2024

thank you very much, you really save me

Not a problem. Glad this post was of assistance to you. As an aside, this was part of my research and decision making for asyncio tasks in Python.

Eventually due to the workarounds, I moved to using taskiq, we use fastapi and it integrates well with the ecosystem, which was a bonus for us.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment