Skip to content

Instantly share code, notes, and snippets.

@awolverp
Last active June 21, 2024 12:04
Show Gist options
  • Save awolverp/74d83ff22c3cac4ac8f2dff83f7ba101 to your computer and use it in GitHub Desktop.
Save awolverp/74d83ff22c3cac4ac8f2dff83f7ba101 to your computer and use it in GitHub Desktop.
Asynchronous SQLAlchemy Example | Python

Asynchronous SQLAlchemy Example

Note

We won't use relationships in this example; see SQLAlchemy documentation for those.

First of all, we need to install SQLAlchemy with its asyncio extra, plus the aiomysql driver:

pip3 install -U 'sqlalchemy[asyncio]' aiomysql

At the top of the file, we import everything we will need later:

from typing import Optional

from sqlalchemy.ext.asyncio import AsyncAttrs, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.sql import sqltypes

Now let's define the "Declarative Base" class and our table:

class Model(AsyncAttrs, DeclarativeBase):
    """Declarative base class for the ORM models in this example.

    Combines ``AsyncAttrs`` with ``DeclarativeBase`` and gives every subclass
    a generic ``__repr__`` built from its table columns.
    """

    # You don't need this if you aren't using MySQL
    __table_args__ = {"mysql_engine": "InnoDB"}

    def __repr__(self) -> str:
        """Return ``ClassName(column=value, ...)`` covering every table column.

        Each value is rendered with ``repr()`` so the output is unambiguous:
        the string ``"None"`` and the value ``None`` (or ``"1"`` and ``1``)
        no longer look the same. A column key with no matching attribute on
        the instance is shown as ``None``.
        """
        columns = ", ".join(
            f"{key}={getattr(self, key, None)!r}"
            for key in self.__table__.columns.keys()
        )
        return f"{type(self).__qualname__}({columns})"

class User(Model):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    # Auto-incrementing integer primary key.
    id: Mapped[int] = mapped_column(sqltypes.Integer, primary_key=True, autoincrement=True)
    # Required, at most 32 characters.
    first_name: Mapped[str] = mapped_column(sqltypes.VARCHAR(32), nullable=False)
    # The column is nullable, so the Python-side value may be None; the
    # annotation says so explicitly to keep type checkers honest.
    last_name: Mapped[Optional[str]] = mapped_column(sqltypes.VARCHAR(32), nullable=True)
    # Required integer; falls back to 0 when no value is supplied on insert.
    amount: Mapped[int] = mapped_column(sqltypes.Integer, nullable=False, default=0)

Our table is ready. But wait, where is our connection?

# Connection URL in the form "<dialect>+<driver>://<user>:<password>@<host>:<port>/<database>".
_DATABASE_URL = "mysql+aiomysql://user:password@127.0.0.1:3306/database_name"

# Single async engine shared by everything in this module.
_engine = create_async_engine(_DATABASE_URL)

# Session factory bound to the engine. expire_on_commit=False keeps loaded
# attributes readable after a commit instead of expiring them.
db = async_sessionmaker(bind=_engine, expire_on_commit=False)


async def initialize_database() -> None:
    """Create every table registered on ``Model.metadata``.

    ``create_all`` is a synchronous API, so it is passed to ``run_sync`` on a
    connection obtained from ``_engine.begin()``.
    """
    create_tables = Model.metadata.create_all
    async with _engine.begin() as connection:
        await connection.run_sync(create_tables)

async def dispose_database() -> None:
    """Dispose of the module-level engine by awaiting ``_engine.dispose()``."""
    await _engine.dispose()

OK, now everything is ready to use. Here is a usage example:

from sqlalchemy import sql
import asyncio

async def main():
    """Run an insert / select / update / delete walkthrough on ``User``.

    Creates the tables, then performs each step inside its own
    ``db.begin()`` block, i.e. its own session and transaction. The engine
    is disposed in a ``finally`` clause, so it is released even when one of
    the steps raises.

    The expected results in the comments assume the ``users`` table was
    empty before the run, so the two inserted rows get ids 1 and 2.
    """
    try:
        await initialize_database()

        # inserting
        async with db.begin() as session:
            await session.execute(
                sql.insert(User).values(first_name="Ali", last_name="P")
            )
            await session.execute(
                sql.insert(User).values(first_name="aWolverP", last_name=None)
            )

        # selecting
        async with db.begin() as session:
            # scalar() gives the first column of the first row, or None.
            user_1 = await session.scalar(
                sql.select(User).where(User.id == 1)
            )
            print(user_1)
            # -> user 1: first_name "Ali", last_name "P", amount 0

            users = (await session.scalars(sql.select(User))).all()
            print(users)
            # -> list with both users (ids 1 and 2)

            count = await session.scalar(
                sql.select(sql.func.count(User.id))
            )
            print(count)
            # -> 2

        # updating
        async with db.begin() as session:
            await session.execute(
                sql.update(User)
                .where(User.id == 1)
                .values(
                    {
                        User.first_name: "New Name",
                        # SQL expression: the increment is applied by the database.
                        User.amount: User.amount + 5000,
                    }
                )
            )
            user_1 = await session.scalar(
                sql.select(User).where(User.id == 1)
            )
            print(user_1)
            # -> user 1: first_name "New Name", last_name "P", amount 5000

        # deleting
        async with db.begin() as session:
            await session.execute(
                sql.delete(User).where(User.id == 2)
            )
            user_2 = await session.scalar(
                sql.select(User).where(User.id == 2)
            )
            print(user_2)
            # -> None
    finally:
        # Always release the engine, even if a step above failed.
        await dispose_database()

# Only start the demo when this file is executed as a script, so that
# importing the module (e.g. to reuse ``User`` or ``db``) does not run it.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment