Skip to content

Instantly share code, notes, and snippets.

@mgerlach-klick
Last active October 23, 2018 02:36
Show Gist options
  • Save mgerlach-klick/0044a0e7e98b20002e967d495a8d79c7 to your computer and use it in GitHub Desktop.
Save mgerlach-klick/0044a0e7e98b20002e967d495a8d79c7 to your computer and use it in GitHub Desktop.
A simple data-based database access and update pattern for Python
import records
import re
def q(db, select, where=None, joins=None, order_by=None, limit=None, debug=False):
    """
    Build a SELECT statement from plain data and run it against ``db``.

    Usage examples:
    1) q(db, "phonebook.name phonebook.number", where=["phonebook.name=:name", {"name": "Dieter"}])
    2) q(db, "phonebook.name, phonebook.number", where="phonebook.name='Dieter'")
    3) q(db, "phonebook.name, person.description", where="phonebook.name='Dieter'", joins={'person.name':'phonebook.name'})
    4) q(db,"phonebook.name, phonebook.number", order_by="phonebook.number asc", limit=3)

    Arguments:
    db: Object with a ``query(sql, **params)`` method (e.g. a records.Database
        instance), or None to only build and return the SQL string.
    select: A string naming all fields to fetch, each written as table.column.
        The list is space or comma separated; surrounding whitespace is ignored.
        The table of the first field becomes the FROM table.
    where: Optional. Either a string of hardcoded conditions (cf. ex. 2), or a
        list of a parameterized where-string and a dictionary of values
        (cf. ex. 1). The dictionary may be omitted.
    joins: Optional. A dictionary ``{'joined.column': 'other.column'}``; each
        entry adds ``JOIN joined ON joined.column = other.column``.
    order_by: Optional. A string that describes the order.
    limit: Optional. An integer describing the maximum results. A falsy value
        (None or 0) adds no LIMIT clause.
    debug: If true prints the resulting SQL query.

    Result:
    None when nothing matched, a single dictionary when exactly one row
    matched, otherwise a list of dictionaries. Keys use the table.column
    format of the query. When ``db`` is None the SQL string is returned instead.

    Example:
    q(db, "phonebook.name person.description", joins={'person.name':'phonebook.name'})
    # =>
    [{'person.description': 'This is dieter', 'phonebook.name': 'Dieter'},
     {'person.description': 'This is clearly not Dieter', 'phonebook.name': 'Dirk'}]

    Raises:
    ValueError: if ``select`` is empty or a field / join key lacks a ``table.`` prefix.

    NOTE: select, joins, order_by, limit and the where-string are interpolated
    into the SQL text verbatim. Only the where values are bound as parameters,
    so never build the other arguments from untrusted input.
    """
    def colify(c, sep='$'):
        # '.' cannot be used in the alias, so it is swapped for '$' and
        # swapped back when the result rows are read.
        return sep.join(c.split('.'))

    def get_table(s):
        found = re.search(r'^(.+?)\.', s)
        if found is None:
            raise ValueError("expected a 'table.column' name, got {!r}".format(s))
        return found.group(1)

    if joins is None:
        joins = {}

    # str.split() without arguments drops the empty fields that leading,
    # trailing or repeated whitespace would otherwise produce.
    fields = select.replace(",", " ").split()
    if not fields:
        raise ValueError("select must name at least one table.column field")

    columns = ",\n".join(["{} AS {}".format(col, colify(col)) for col in fields])
    from_table = get_table(fields[0])
    join_clauses = "\n".join([" JOIN {} ON {} = {}".format(get_table(j1), j1, j2) for j1, j2 in joins.items()])

    if not where:
        where_str, where_values = "", None
    elif isinstance(where, str):
        where_str, where_values = "\n WHERE {}".format(where), None
    else:
        where_str = "\n WHERE {}".format(where[0])
        # The values dictionary is optional in the list form.
        where_values = where[1] if len(where) > 1 else None

    order_by_str = "\n ORDER BY {}".format(order_by) if isinstance(order_by, str) else ""
    limit_str = "\n LIMIT {}".format(limit) if limit else ""
    sql_str = "SELECT \n {} \n FROM {} \n {} {} {} {};".format(columns, from_table, join_clauses, where_str, order_by_str, limit_str)

    if debug:
        print(sql_str)
    if db is None:
        return sql_str  # Maybe you just wanna kick the tires without risking anything! That's cool!

    res = db.query(sql_str, **where_values) if where_values else db.query(sql_str)

    # Turn the 'table$column' aliases back into 'table.column' keys.
    result = [{k.replace('$', '.'): item[k] for k in item.keys()} for item in res]

    if not result:
        return None
    if len(result) == 1:
        return result[0]
    return result
def transact(db, map, debug=False):
    """
    Insert a row, or update it if its primary key already exists (upsert).

    Usage:
    transact(db, {'person.name': 'Mr. T', 'person.description': 'not necessary!'})

    This either creates the record or updates the description
    through the intermediate query:

    INSERT INTO person (name, description) VALUES (:person_name, :person_description) ON CONFLICT (name) DO UPDATE
    SET
    name = excluded.name,
    description = excluded.description;

    and returns the new/updated record:
    {"name": "Mr. T", "description": "not necessary!"}

    Arguments:
    db: Object with a ``query(sql, **params)`` method (e.g. a records.Database
        instance). If None, the SQL string is returned without touching a
        database; this requires the table's primary key to be present in
        ``transact.pkey_cache`` already.
    map: Dictionary of 'table.column' keys to values. The table is taken from
        the first key. The primary key column(s) must be among the keys so
        the row can be read back.
    debug: If true prints the primary-key lookup and the resulting SQL.

    Raises:
    ValueError: if ``map`` is empty, if the table has no primary key, or if
        ``db`` is None and the primary key is not cached.
    KeyError: if ``map`` has no value for a primary key column.

    NOTE: the primary key is discovered through ``pragma_table_info``, which
    is SQLite specific. Table and column names are interpolated into the SQL
    text verbatim, so never build the keys of ``map`` from untrusted input.
    """
    if not map:
        raise ValueError("transact() needs at least one 'table.column' entry")

    if not hasattr(transact, "pkey_cache"):  # hacky static variable! :-O
        transact.pkey_cache = {}

    def colify(c):
        return re.search(r"\.(.+)$", c).group(1)

    keys = list(map.keys())
    table = re.search(r"^(.+?)\.", keys[0]).group(1)
    columns = [colify(c) for c in keys]
    columns_str = ", ".join(columns)

    # Bind parameter names cannot contain '.', so 'person.name' is bound as ':person_name'.
    sqlized_map = {k.replace(".", "_"): v for k, v in map.items()}

    conflict_cols = transact.pkey_cache.get(table)
    if not conflict_cols:
        if db is None:
            raise ValueError(
                "cannot look up the primary key of {!r} without a database; "
                "seed transact.pkey_cache[{!r}] for a dry run".format(table, table)
            )
        if debug:
            print('fetching primary keys')
            print(transact.pkey_cache)
        # 'pk' is 0 for non-key columns and the 1-based position inside the
        # primary key otherwise, so 'pk > 0' also picks up composite keys.
        conflict_cols = [
            res.name
            for res in db.query(
                "SELECT name FROM pragma_table_info(:table) WHERE pk > 0 ORDER BY pk;",
                table=table,
            )
        ]
        if not conflict_cols:
            raise ValueError("table {!r} has no primary key to resolve conflicts on".format(table))
        transact.pkey_cache[table] = conflict_cols

    conflict_cols_str = ", ".join(conflict_cols)
    excluded_strs = ",\n ".join(["{} = excluded.{}".format(col, col) for col in columns])
    sqlized_keys_str = ", ".join(":{}".format(k) for k in sqlized_map.keys())
    sql_str = "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE \n SET \n {};".format(table, columns_str, sqlized_keys_str, conflict_cols_str, excluded_strs)

    if db is None:
        return sql_str
    if debug:
        print(sql_str)

    db.query(sql_str, **sqlized_map)

    # Read the row back by its full primary key. Column names are used as-is:
    # they are real column names, not the '_'-joined bind parameter names.
    lookup_clause = " and ".join(
        "{} = :pk_{}".format(col, i) for i, col in enumerate(conflict_cols)
    )
    lookup_values = {
        "pk_{}".format(i): map["{}.{}".format(table, col)]
        for i, col in enumerate(conflict_cols)
    }
    query_res = db.query('select * from {} where {}'.format(table, lookup_clause), **lookup_values)
    return query_res[0].as_dict()
def test_setup():
    """
    Create an in-memory SQLite database with sample data and return it.

    Two tables are created, ``phonebook`` (name, number, validDate) and
    ``person`` (name, description), each filled with a row for "Dieter" and
    a row for "Dirk".

    Result:
    The records.Database instance holding the sample data.
    """
    db = records.Database('sqlite://')
    db.query("""CREATE TABLE phonebook(
name TEXT PRIMARY KEY,
number TEXT UNIQUE,
validDate DATE
);""")
    db.query("""CREATE TABLE person(
name TEXT PRIMARY KEY,
description TEXT
);""")

    # The dates are bound as parameters: a double-quoted "2018-12-12" is an
    # identifier in SQL and only works as a string through a SQLite quirk.
    phonebook_rows = [
        ("Dieter", "666", "2018-12-12"),
        ("Dirk", "88888888", "2018-10-10"),
    ]
    for name, num, valid_date in phonebook_rows:
        db.query(
            """insert into phonebook(name, number, validDate) values (:name, :num, :valid_date);""",
            name=name, num=num, valid_date=valid_date,
        )

    person_rows = [
        ("Dieter", "This is dieter"),
        ("Dirk", "This is clearly not Dieter"),
    ]
    for name, description in person_rows:
        db.query(
            """insert into person(name, description) values (:name, :description);""",
            name=name, description=description,
        )
    return db
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment