Skip to content

Instantly share code, notes, and snippets.

@Tarliton
Last active February 26, 2023 13:07
Show Gist options
  • Save Tarliton/c494fa972a7a2594372738c96c0654a1 to your computer and use it in GitHub Desktop.
Save Tarliton/c494fa972a7a2594372738c96c0654a1 to your computer and use it in GitHub Desktop.
asyncio with a thread executor and sqlalchemy
import asyncio
import base64
import os
import random
from sqlalchemy import Column, Integer, String
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker, declarative_base
# SQLAlchemy==2.0.4
# psycopg==3.1.8
# Same sizing rule as ThreadPoolExecutor's default worker count (Python >= 3.8).
# os.cpu_count() may return None when the count cannot be determined, so fall
# back to 1 instead of failing with "None + 4" at import time.
connection_pool_size = min(32, (os.cpu_count() or 1) + 4)
def create_db():
    """Create the engine, the ``users`` table and a scoped session registry.

    Returns:
        A ``(session_registry, user_model)`` tuple: the ``scoped_session``
        bound to the engine, and the mapped class for the ``users`` table.
    """
    ModelBase = declarative_base()

    class InnerUser(ModelBase):
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True)
        name = Column(String(50))

    # max_overflow=0 keeps the pool from growing past ``connection_pool_size``.
    db_engine = create_engine(
        'postgresql+psycopg://postgres:mysecretpassword@localhost:5432/mydatabase',
        pool_size=connection_pool_size,
        max_overflow=0,
    )

    # Emit CREATE TABLE for every model registered on the base.
    ModelBase.metadata.create_all(db_engine)

    return scoped_session(sessionmaker(bind=db_engine)), InnerUser
# Initialise the database once at import time: ``Session`` is the scoped
# session registry and ``User`` the mapped class returned by create_db().
Session, User = create_db()
def users_count():
    """Return the number of rows in the ``users`` table.

    Blocking call; intended to be run in an executor thread.

    Returns:
        The row count reported by the query.
    """
    session = Session()
    try:
        return session.query(User).count()
    finally:
        # Close even when the query raises: the pool is capped
        # (max_overflow=0), so a leaked connection would eventually starve
        # the other workers.
        session.close()
def add_user():
    """Insert a user with a random name and return that name.

    Blocking call; intended to be run in an executor thread.

    Returns:
        The generated name: base64 text of 10 random bytes.
    """
    session = Session()
    try:
        # Keep the name in a local: it is returned after the session is
        # closed, when the ORM instance should no longer be touched.
        name = base64.b64encode(os.urandom(10)).decode('utf8')
        user = User()
        user.name = name
        session.add(user)
        session.commit()
        return name
    finally:
        # Close even when add/commit raises, so the pooled connection is
        # released (the pool is capped with max_overflow=0).
        session.close()
async def count_or_add(loop, i):
    """Randomly count the users or add one, off the event loop thread.

    Args:
        loop: Event loop whose default executor runs the blocking DB call.
        i: Task index, printed next to the result.
    """
    # Pick the blocking job: count when the draw is above 5, otherwise insert.
    job = users_count if random.uniform(0, 10) > 5 else add_user
    outcome = await loop.run_in_executor(None, job)
    print(i, outcome)
async def run(n):
    """Schedule ``n`` ``count_or_add`` tasks and wait for all of them.

    Args:
        n: Number of tasks to start; task indices are ``0 .. n-1``.
    """
    # Use the loop that is actually executing this coroutine rather than a
    # module-level global, so the coroutine also works under asyncio.run().
    loop = asyncio.get_running_loop()
    tasks = [loop.create_task(count_or_add(loop, i)) for i in range(n)]
    await asyncio.gather(*tasks)
number_tasks = 1000

# Create the loop explicitly: asyncio.get_event_loop() and ensure_future()
# without a running loop are deprecated and fail on newer Python versions.
event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(event_loop)
try:
    future = event_loop.create_task(run(number_tasks))
    event_loop.run_until_complete(future)
finally:
    # Release the loop's resources even if a task raised.
    event_loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment