Created
June 1, 2021 20:25
-
-
Save valq7711/ab0ad76e52c8d75ba45f3560a5fd59b4 to your computer and use it in GitHub Desktop.
async_pydal
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pydal import DAL | |
from voodoodal import DB, Table, Field, model | |
import aiosqlite | |
import asyncio | |
# async sqlexecuter | |
class SQLExecuter: | |
def __init__(self, pydal_adapter, adb_executer): | |
self.adb_executer = adb_executer | |
self.adapter = pydal_adapter | |
self.cursor = None | |
async def exec( | |
self, | |
query, | |
placeholders = None, | |
as_dict=False, | |
fields=None, | |
colnames=None, | |
): | |
adapter = self.adapter | |
if placeholders: | |
cursor = await self.adb_executer(query, placeholders) | |
else: | |
cursor = await self.adb_executer(query) | |
self.cursor = cursor | |
if as_dict: | |
if not hasattr(cursor, "description"): | |
raise RuntimeError( | |
"database does not support executesql(...,as_dict=True)" | |
) | |
# Non-DAL legacy db query, converts cursor results to dict. | |
# sequence of 7-item sequences. each sequence tells about a column. | |
# first item is always the field name according to Python Database API specs | |
columns = cursor.description | |
# reduce the column info down to just the field names | |
fields = colnames or [f[0] for f in columns] | |
if len(fields) != len(set(fields)): | |
raise RuntimeError( | |
"Result set includes duplicate column names. Specify unique column names using the 'colnames' argument" | |
) | |
#: avoid bytes strings in columns names (py3) | |
if columns: | |
for i in range(0, len(fields)): | |
if isinstance(fields[i], bytes): | |
fields[i] = fields[i].decode("utf8") | |
# will hold our finished resultset in a list | |
data = await cursor.fetchall() | |
# convert the list for each row into a dictionary so it's | |
# easier to work with. row['field_name'] rather than row[0] | |
return [dict(zip(fields, row)) for row in data] | |
try: | |
data = await cursor.fetchall() | |
except: | |
return None | |
if fields or colnames: | |
fields = [] if fields is None else fields | |
if not isinstance(fields, list): | |
fields = [fields] | |
extracted_fields = [] | |
for field in fields: | |
if isinstance(field, DAL.Table): | |
extracted_fields.extend([f for f in field]) | |
else: | |
extracted_fields.append(field) | |
if not colnames: | |
colnames = [f.sqlsafe for f in extracted_fields] | |
else: | |
#: extracted_fields is empty we should make it from colnames | |
# what 'col_fields' is for | |
col_fields = [] # [[tablename, fieldname], ....] | |
newcolnames = [] | |
for tf in colnames: | |
if "." in tf: | |
t_f = tf.split(".") | |
tf = ".".join(adapter.dialect.quote(f) for f in t_f) | |
else: | |
t_f = None | |
if not extracted_fields: | |
col_fields.append(t_f) | |
newcolnames.append(tf) | |
colnames = newcolnames | |
data = adapter.parse( | |
data, | |
fields=extracted_fields | |
or [tf and self[tf[0]][tf[1]] for tf in col_fields], | |
colnames=colnames, | |
) | |
return data | |
# usage example | |
db = DAL('sqlite://storage.db') | |
model = model(db) | |
@model | |
class db(DB): | |
class todo(Table): | |
info = Field() | |
db.commit() | |
db.close() | |
async def main(): | |
conn = await aiosqlite.connect('storage.db') | |
asql = SQLExecuter(db._adapter, conn.execute) | |
await asql.exec(db.todo._insert(info = 'foo')) | |
await asql.exec(db.todo._insert(info = 'bar')) | |
await conn.commit() | |
rows = await asql.exec(db(db.todo)._select(), fields = db.todo) | |
print(rows) | |
await asql.cursor.close() | |
await conn.close() | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment