Skip to content

Instantly share code, notes, and snippets.

@xsduan
Last active April 3, 2024 17:27
Show Gist options
  • Star 10 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save xsduan/09fb145da3da3a78f5ca844b155f27aa to your computer and use it in GitHub Desktop.
Save xsduan/09fb145da3da3a78f5ca844b155f27aa to your computer and use it in GitHub Desktop.
peewee-async sqlite connection doohicky
"""
Temporary module that lets peewee-async talk to SQLite databases (through
aiosqlite) during development. Remove it once we have an actual database.
"""
import peewee
from peewee_async import AsyncDatabase
import playhouse.sqlite_ext as sqlite_ext
try:
import aiosqlite
except ImportError:
aiosqlite = None
__all__ = ["SqliteDatabase", "SqliteExtDatabase"]
class AsyncSqliteConnection:
    """Connection manager backed by aiosqlite.

    Each ``acquire`` opens a fresh aiosqlite connection and each ``release``
    commits and closes it. Connections that are currently open are tracked
    in ``_created_connections`` so that ``close`` can shut down whatever is
    still outstanding.
    """

    def __init__(self, *, database=None, loop=None, timeout=None, **kwargs):
        """Store the connection settings; no connection is opened here.

        Args:
            database: Database argument handed to ``aiosqlite.connect``.
            loop: Stored on the instance; not used by this class itself.
            timeout: Stored on the instance; not forwarded to
                ``aiosqlite.connect`` by this class.
            **kwargs: Extra keyword arguments for ``aiosqlite.connect``.
        """
        self._created_connections = []
        self.loop = loop
        self.database = database
        self.timeout = timeout
        self.connect_kwargs = kwargs

    async def acquire(self):
        """Open a new connection, start tracking it, and return it.

        Returns:
            Whatever the connection's ``__aenter__`` yields.
        """
        conn = aiosqlite.connect(database=self.database, **self.connect_kwargs)
        opened = await conn.__aenter__()
        # Track only after the open succeeded, so a failed open never leaves
        # a dead entry behind for close() to trip over.
        self._created_connections.append(conn)
        return opened

    async def release(self, conn):
        """Commit and close a connection obtained from ``acquire``.

        Connections that are not tracked (for example because ``close``
        already shut them down) are ignored.
        """
        if conn not in self._created_connections:
            return
        self._created_connections.remove(conn)
        try:
            await conn.commit()
        finally:
            # Close even when the commit fails; otherwise the connection
            # would be untracked and left open.
            await conn.__aexit__(None, None, None)

    async def connect(self):
        """Do nothing; connections are opened on demand by ``acquire``."""

    async def close(self):
        """Close every connection that is still tracked.

        All connections are attempted even if one of them fails to close;
        the first error encountered is re-raised afterwards.
        """
        # Swap the list out first: the awaits below may let other tasks
        # touch the tracking list while we iterate.
        pending, self._created_connections = self._created_connections, []
        first_error = None
        for conn in pending:
            try:
                await conn.__aexit__(None, None, None)
            except Exception as exc:  # keep closing the remaining ones
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error

    async def cursor(self, conn=None, *args, **kwargs):
        """Return a cursor on ``conn``, or on a newly acquired connection.

        Args:
            conn: Connection of an enclosing transaction, or ``None`` to
                acquire a dedicated connection for this cursor.
            *args: Accepted for interface compatibility; unused.
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            The cursor, with a ``release`` attribute holding the coroutine
            that cleans it up.
        """
        in_transaction = conn is not None
        if not in_transaction:
            conn = await self.acquire()
        try:
            cursor = await conn.cursor()
        except BaseException:
            # Do not leak the connection we just opened; always re-raised.
            if not in_transaction:
                await self.release(conn)
            raise
        # cursor.release is a coroutine object (to be awaited), not a callable
        cursor.release = self.release_cursor(  # pylint: disable = assignment-from-no-return
            cursor, in_transaction
        )
        return cursor

    async def release_cursor(self, cursor, in_transaction=False):
        """Close ``cursor`` and, outside a transaction, release its connection.

        Args:
            cursor: Cursor to close; its connection is read from ``_conn``.
            in_transaction: When true, the connection is left open for the
                caller that owns it.
        """
        conn = cursor._conn
        try:
            await cursor.__aexit__(None, None, None)
        finally:
            # Release the connection even if closing the cursor failed.
            if not in_transaction:
                await self.release(conn)
class AsyncSqliteMixin(AsyncDatabase):
    """Mixin that wires the aiosqlite connection class into an async database."""

    # The driver error type is only exposed when aiosqlite was importable.
    if aiosqlite:
        import sqlite3

        Error = sqlite3.Error

    def init_async(self, conn_class=AsyncSqliteConnection):
        """Register the async connection class.

        Args:
            conn_class: Class stored as ``_async_conn_cls``.

        Raises:
            Exception: If aiosqlite could not be imported.
        """
        if aiosqlite:
            self._async_conn_cls = conn_class
            return
        raise Exception("Error, aiosqlite is not installed!")

    @property
    def connect_kwargs_async(self):
        """Return a shallow copy of ``connect_kwargs``."""
        return dict(self.connect_kwargs)

    async def last_insert_id_async(self, cursor, model):
        """Get ID of last inserted row.

        Returns ``cursor.lastrowid`` when the model's ``_meta.auto_increment``
        is truthy, otherwise ``None``.
        """
        if not model._meta.auto_increment:
            return None
        return cursor.lastrowid
class SqliteDatabase(AsyncSqliteMixin, peewee.SqliteDatabase):
    """peewee ``SqliteDatabase`` combined with the async SQLite mixin."""

    def init(self, database, **kwargs):
        """Run the regular ``init`` and then set up the async side.

        Args:
            database: Passed through to the parent ``init``.
            **kwargs: Passed through to the parent ``init``.
        """
        # Sync initialisation first, then register the async connection class.
        super().init(database, **kwargs)
        self.init_async()
class SqliteExtDatabase(SqliteDatabase, sqlite_ext.SqliteExtDatabase):
    """Async-capable ``SqliteDatabase`` combined with playhouse's ``SqliteExtDatabase``."""
"""Taken directly from the peewee-async README; a good "unit test" of sorts."""
import asyncio
import peewee
import peewee_async
from async_sqlite import SqliteDatabase
# Nothing special, just define model and database:
database = SqliteDatabase('a.sqlite')


class TestModel(peewee.Model):
    """Single-column model used to exercise both the sync and async paths."""

    text = peewee.CharField()

    class Meta:
        database = database


# Look, sync code is working!
TestModel.create_table(True)
TestModel.create(text="Yo, I can do it sync!")
database.close()

# Create async models manager:
objects = peewee_async.Manager(database)

# No need for sync anymore!
database.set_allow_sync(False)


async def handler():
    """Insert one row asynchronously, then print the text of every row."""
    await objects.create(TestModel, text="Not bad. Watch this, I'm async!")
    rows = await objects.execute(TestModel.select())
    for row in rows:
        print(row.text)


# Drive the coroutine to completion on the default event loop.
loop = asyncio.get_event_loop()
loop.run_until_complete(handler())
loop.close()

# Clean up, can do it sync again:
with objects.allow_sync():
    TestModel.drop_table(True)

# Expected output:
# Yo, I can do it sync!
# Not bad. Watch this, I'm async!
[[package]]
category = "main"
description = "asyncio bridge to the standard sqlite3 module"
name = "aiosqlite"
optional = false
python-versions = "*"
version = "0.9.0"
[[package]]
category = "main"
description = "a little orm"
name = "peewee"
optional = false
python-versions = "*"
version = "2.10.2"
[[package]]
category = "main"
description = "Asynchronous interface for peewee ORM powered by asyncio."
name = "peewee-async"
optional = false
python-versions = "*"
version = "0.5.12"
[package.dependencies]
peewee = ">=2.8.0,<=2.10.2"
[metadata]
content-hash = "7b142de08db0c014422b2311cbe01615de0ecaee987eb7f2edb0d693f183d8c4"
python-versions = "^3.7"
[metadata.hashes]
aiosqlite = ["af4fed9e778756fa0ffffc7a8b14c4d7b1a57155dc5669f18e45107313f6019e"]
peewee = ["2342067f48a779e35956a44cd547df883dda35153daa9fe994d970585aaec281"]
peewee-async = ["1376774637b6f5cfb9192a06380a8e987fed206e0e229bbadd50da6a4578557b", "ab64a2a376033ce5621406b33735cb064659af05f5c2570af0fba08f6eab6282"]
[tool.poetry]
name = "test"
version = "0.1.0"
description = ""
authors = ["Shane Duan <s_duan@u.pacific.edu>"]
[tool.poetry.dependencies]
python = "^3.7"
peewee-async = "^0.5.12"
aiosqlite = "^0.9.0"
[tool.poetry.dev-dependencies]
[build-system]
requires = ["poetry>=0.12"]
build-backend = "poetry.masonry.api"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment