Skip to content

Instantly share code, notes, and snippets.

@valq7711
Created June 1, 2021 20:25
Show Gist options
  • Save valq7711/ab0ad76e52c8d75ba45f3560a5fd59b4 to your computer and use it in GitHub Desktop.
Save valq7711/ab0ad76e52c8d75ba45f3560a5fd59b4 to your computer and use it in GitHub Desktop.
async_pydal
from pydal import DAL
from voodoodal import DB, Table, Field, model
import aiosqlite
import asyncio
# async sqlexecuter
class SQLExecuter:
def __init__(self, pydal_adapter, adb_executer):
self.adb_executer = adb_executer
self.adapter = pydal_adapter
self.cursor = None
async def exec(
self,
query,
placeholders = None,
as_dict=False,
fields=None,
colnames=None,
):
adapter = self.adapter
if placeholders:
cursor = await self.adb_executer(query, placeholders)
else:
cursor = await self.adb_executer(query)
self.cursor = cursor
if as_dict:
if not hasattr(cursor, "description"):
raise RuntimeError(
"database does not support executesql(...,as_dict=True)"
)
# Non-DAL legacy db query, converts cursor results to dict.
# sequence of 7-item sequences. each sequence tells about a column.
# first item is always the field name according to Python Database API specs
columns = cursor.description
# reduce the column info down to just the field names
fields = colnames or [f[0] for f in columns]
if len(fields) != len(set(fields)):
raise RuntimeError(
"Result set includes duplicate column names. Specify unique column names using the 'colnames' argument"
)
#: avoid bytes strings in columns names (py3)
if columns:
for i in range(0, len(fields)):
if isinstance(fields[i], bytes):
fields[i] = fields[i].decode("utf8")
# will hold our finished resultset in a list
data = await cursor.fetchall()
# convert the list for each row into a dictionary so it's
# easier to work with. row['field_name'] rather than row[0]
return [dict(zip(fields, row)) for row in data]
try:
data = await cursor.fetchall()
except:
return None
if fields or colnames:
fields = [] if fields is None else fields
if not isinstance(fields, list):
fields = [fields]
extracted_fields = []
for field in fields:
if isinstance(field, DAL.Table):
extracted_fields.extend([f for f in field])
else:
extracted_fields.append(field)
if not colnames:
colnames = [f.sqlsafe for f in extracted_fields]
else:
#: extracted_fields is empty we should make it from colnames
# what 'col_fields' is for
col_fields = [] # [[tablename, fieldname], ....]
newcolnames = []
for tf in colnames:
if "." in tf:
t_f = tf.split(".")
tf = ".".join(adapter.dialect.quote(f) for f in t_f)
else:
t_f = None
if not extracted_fields:
col_fields.append(t_f)
newcolnames.append(tf)
colnames = newcolnames
data = adapter.parse(
data,
fields=extracted_fields
or [tf and self[tf[0]][tf[1]] for tf in col_fields],
colnames=colnames,
)
return data
# usage example
db = DAL('sqlite://storage.db')
model = model(db)
@model
class db(DB):
class todo(Table):
info = Field()
db.commit()
db.close()
async def main():
conn = await aiosqlite.connect('storage.db')
asql = SQLExecuter(db._adapter, conn.execute)
await asql.exec(db.todo._insert(info = 'foo'))
await asql.exec(db.todo._insert(info = 'bar'))
await conn.commit()
rows = await asql.exec(db(db.todo)._select(), fields = db.todo)
print(rows)
await asql.cursor.close()
await conn.close()
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment