Last active
October 23, 2018 02:36
-
-
Save mgerlach-klick/0044a0e7e98b20002e967d495a8d79c7 to your computer and use it in GitHub Desktop.
A simple data-based database access and update pattern for Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import records | |
import re | |
def q(db, select, where=None, joins={}, order_by=None, limit=None, debug=False): | |
""" | |
Usage examples: | |
1) q(db, "phonebook.name phonebook.number", where=["phonebook.name=:name", {"name": "Dieter"}]) | |
2) q(db, "phonebook.name, phonebook.number", where="phonebook.name='Dieter'") | |
3) q(db, "phonebook.name, person.description", where="phonebook.name='Dieter'", joins={'person.name':'phonebook.name'}) | |
4) q(db,"phonebook.name, phonebook.number", order_by="phonebook.number asc", limit=3) | |
Arguments: | |
db: records.Database instance. | |
select: A string that describes all fields to be fetched from the database, after the pattern table.column. The list is space or comma separated. | |
where: Optional. Either a string of hardcoded conditions (cf. ex. 2), or a list of a parameterized where-string and a dictionary of values (cf. example 1). | |
order_by: Optional. A string that describes the order. | |
limit: Optional. An integer describing the maximum results. | |
debug: If true prints the resulting SQL query | |
Result: | |
The result is a dictionary or a list of dictionaries with keys that correspond to the query format. | |
Example: | |
q(db, "phonebook.name person.description", joins={'person.name':'phonebook.name'}) | |
# => | |
[{'person.description': 'This is dieter', 'phonebook.name': 'Dieter'}, | |
{'person.description': 'This is clearly not Dieter', 'phonebook.name': 'Dirk'}] | |
""" | |
def colify(c, sep='$'): return sep.join(c.split('.')) | |
columns = ",\n".join(["{} AS {}".format(col, colify(col)) for col in re.split(r'\s+', select.replace(",", " "))]) | |
def get_table(s): return re.search(r'^(.+?)\.', s).group(1) | |
from_table = get_table(select) | |
join_clauses = "\n".join([" JOIN {} ON {} = {}".format(get_table(j1), j1, j2) for j1,j2 in joins.items()]) | |
where_str = "\n WHERE {}".format(where if isinstance(where, str) else where[0]) if where else "" | |
where_values = None if (isinstance(where, str) or where is None) else where[1] | |
order_by_str = "\n ORDER BY {}".format(order_by) if isinstance(order_by, str) else "" | |
limit_str = "\n LIMIT {}".format(limit) if limit else "" | |
sql_str = "SELECT \n {} \n FROM {} \n {} {} {} {};".format(columns, from_table, join_clauses, where_str, order_by_str, limit_str) | |
if(debug): print(sql_str) | |
if db is None: return sql_str # Maybe you just wanna kick the tires without risking anything! That's cool! | |
res = db.query(sql_str, **where_values) if where_values else db.query(sql_str) | |
result = [] | |
for item in res: | |
d = {} | |
for k in item.keys(): | |
if '$' in k: | |
d[k.replace('$', '.')] = item[k] | |
else: | |
d[k] = item[k] | |
result.append(d) | |
if len(result) == 0: | |
return None | |
elif len(result) == 1: | |
return result[0] | |
else: | |
return result | |
def transact(db, map, debug=False): | |
""" | |
Usage: | |
transact(db, {'person.name': 'Mr. T', 'person.description': 'not necessary!'}) | |
This either creates the record or updates the description | |
through the intermediate query: | |
INSERT INTO person (name, description) VALUES (:person_name, :person_description) ON CONFLICT (name) DO UPDATE | |
SET | |
name = excluded.name, | |
description = excluded.description; | |
and returns the new/updated record: | |
{"name": "Mr. T", "description": "not necessary!"} | |
""" | |
if not hasattr(transact, "pkey_cache"): #hacky static variable! :-O | |
transact.pkey_cache = {} | |
def colify(c): return re.search(r"\.(.+)$", c).group(1) | |
keys = list(map.keys()) | |
table = re.search(r"^(.+?)\.", keys[0]).group(1) | |
columns = [colify(c) for c in keys] | |
columns_str = ", ".join(columns) | |
def sqlize_key(k): return k.replace(".","_") | |
def desqlize_key(k): return k.replace("_",".") | |
sqlized_map = {} | |
for k,v in map.items(): | |
sqlized_map[sqlize_key(k)] = map[k] | |
if transact.pkey_cache.get(table): | |
conflict_cols = transact.pkey_cache.get(table) | |
else: | |
if debug: | |
print('fetching primary keys') | |
print(transact.pkey_cache) | |
conflict_cols = [res.name for res in db.query("SELECT name FROM pragma_table_info(:table) where pk=1;", table=table)] | |
transact.pkey_cache[table] = conflict_cols | |
conflict_cols_str = ", ".join(conflict_cols) | |
excluded_strs = ",\n ".join(["{} = excluded.{}".format(col, col) for col in columns]) | |
sqlized_keys_str = ", ".join(":{}".format(k) for k in sqlized_map.keys()) | |
sql_str = "INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE \n SET \n {};".format(table, columns_str, sqlized_keys_str, conflict_cols_str, excluded_strs) | |
pkey = conflict_cols[0] | |
if db is None: return sql_str | |
if debug: print(sql_str) | |
execution_res = db.query(sql_str, **sqlized_map) | |
query_res = db.query('select * from {} where {} = :pval'.format( table, pkey), pval=map["{}.{}".format(table, desqlize_key(pkey))]) | |
return query_res[0].as_dict() | |
def test_setup(): | |
db = records.Database('sqlite://') | |
db.query("""CREATE TABLE phonebook( | |
name TEXT PRIMARY KEY, | |
number TEXT UNIQUE, | |
validDate DATE | |
);""") | |
db.query("""CREATE TABLE person( | |
name TEXT PRIMARY KEY, | |
description TEXT | |
);""") | |
db.query("""insert into phonebook(name, number, validDate) values (:name, :num, "2018-12-12");""", name="Dieter", num="666") | |
db.query("""insert into phonebook(name, number, validDate) values (:name, :num, "2018-10-10");""", name="Dirk", num="88888888") | |
db.query("""insert into person(name, description) values (:name, :description);""", name="Dieter", description="This is dieter") | |
db.query("""insert into person(name, description) values (:name, :description);""", name="Dirk", description="This is clearly not Dieter") | |
return db |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment