Created
May 1, 2013 18:36
-
-
Save clayote/5497264 to your computer and use it in GitHub Desktop.
This is a metaclass to automagically generate some SQL that works with my particular database schema. Plus a couple classes that use it.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SaveableMetaclass(type): | |
def __new__(metaclass, clas, parents, attrs): | |
if 'coldecls' not in attrs: | |
raise Exception("No coldecls in {0}".format(clas)) | |
if 'primarykeys' not in attrs: | |
raise Exception("no primarykeys in {0}".format(clas)) | |
coldecls = attrs['coldecls'] | |
primarykeys = attrs['primarykeys'] | |
tablenames = attrs['tablenames'] | |
if 'foreignkeys' in attrs: | |
foreignkeys = attrs['foreignkeys'] | |
else: | |
foreignkeys = {} | |
if 'checks' in attrs: | |
checks = attrs['checks'] | |
else: | |
checks = {} | |
for d in foreignkeys, checks: | |
for tablename in tablenames: | |
if tablename not in d: | |
d[tablename] = {} | |
schemata = [] | |
inserts = {} | |
deletes = {} | |
detects = {} | |
missings = {} | |
keylen = {} | |
rowlen = {} | |
keyqms = {} | |
rowqms = {} | |
keystrs = {} | |
rowstrs = {} | |
keynames = {} | |
valnames = {} | |
colnames = {} | |
colnamestr = {} | |
for item in primarykeys.iteritems(): | |
(tablename, pkey) = item | |
keynames[tablename] = sorted(pkey) | |
keylen[tablename] = len(pkey) | |
keyqms[tablename] = ", ".join(["?"] * keylen[tablename]) | |
keystrs[tablename] = "(" + keyqms[tablename] + ")" | |
for item in coldecls.iteritems(): | |
(tablename, coldict) = item | |
valnames[tablename] = sorted( | |
[key for key in coldict.keys() | |
if key not in keynames[tablename]]) | |
rowlen[tablename] = len(coldict) | |
rowqms[tablename] = ", ".join(["?"] * rowlen[tablename]) | |
rowstrs[tablename] = "(" + rowqms[tablename] + ")" | |
for tablename in coldecls.iterkeys(): | |
colnames[tablename] = keynames[tablename] + valnames[tablename] | |
for tablename in tablenames: | |
coldecl = coldecls[tablename] | |
pkey = primarykeys[tablename] | |
fkeys = foreignkeys[tablename] | |
cks = ["CHECK(%s)" % ck for ck in checks[tablename]] | |
pkeydecs = [keyname + " " + typ.upper() | |
for (keyname, typ) in coldecl.iteritems() | |
if keyname in pkey] | |
valdecs = [valname + " " + typ.upper() | |
for (valname, typ) in coldecl.iteritems() | |
if valname not in pkey] | |
coldecs = sorted(pkeydecs) + sorted(valdecs) | |
coldecstr = ", ".join(coldecs) | |
pkeycolstr = ", ".join(pkey) | |
pkeys = [keyname for (keyname, typ) in coldecl.iteritems() | |
if keyname in pkey] | |
pkeynamestr = ", ".join(sorted(pkeys)) | |
vals = [valname for (valname, typ) in coldecl.iteritems() | |
if valname not in pkey] | |
colnamestr[tablename] = ", ".join(sorted(pkeys) + sorted(vals)) | |
pkeystr = "PRIMARY KEY (%s)" % (pkeycolstr,) | |
fkeystrs = ["FOREIGN KEY (%s) REFERENCES %s(%s)" % | |
(item[0], item[1][0], item[1][1]) | |
for item in fkeys.iteritems()] | |
fkeystr = ", ".join(fkeystrs) | |
chkstr = ", ".join(cks) | |
table_decl_data = [coldecstr] | |
if len(pkey) > 0: | |
table_decl_data.append(pkeystr) | |
if len(fkeystrs) > 0: | |
table_decl_data.append(fkeystr) | |
if len(cks) > 0: | |
table_decl_data.append(chkstr) | |
table_decl = ", ".join(table_decl_data) | |
create_stmt = "CREATE TABLE %s (%s);" % (tablename, table_decl) | |
insert_stmt_start = "INSERT INTO %s VALUES " % ( | |
tablename,) | |
inserts[tablename] = insert_stmt_start | |
delete_stmt_start = "DELETE FROM %s WHERE (%s) IN " % ( | |
tablename, pkeycolstr) | |
deletes[tablename] = delete_stmt_start | |
detect_stmt_start = "SELECT %s FROM %s WHERE (%s) IN " % ( | |
colnamestr[tablename], tablename, pkeynamestr) | |
detects[tablename] = detect_stmt_start | |
missing_stmt_start = "SELECT %s FROM %s WHERE (%s) NOT IN " % ( | |
colnamestr[tablename], tablename, pkeynamestr) | |
missings[tablename] = missing_stmt_start | |
schemata.append(create_stmt) | |
def insert_rowdicts_table(db, rowdicts, tabname): | |
rowstr = rowstrs[tabname] | |
qrystr = inserts[tabname] + ", ".join([rowstr] * len(rowdicts)) | |
qrylst = [] | |
for rowdict in rowdicts: | |
extender = [rowdict[col] for col in colnames[tabname]] | |
qrylst.extend(extender) | |
qrytup = tuple(qrylst) | |
db.c.execute(qrystr, qrytup) | |
def delete_keydicts_table(db, keydicts, tabname): | |
keystr = keystrs[tabname] | |
qrystr = deletes[tabname] + ", ".join([keystr] * len(keydicts)) | |
qrylst = [] | |
for keydict in keydicts: | |
qrylst.extend([keydict[col] for col in keynames[tabname]]) | |
qrytup = tuple(qrylst) | |
db.c.execute(qrystr, qrytup) | |
def detect_keydicts_table(db, keydicts, tabname): | |
keystr = keystrs[tabname] | |
qrystr = detects[tabname] + ", ".join([keystr] * len(keydicts)) | |
qrylst = [] | |
for keydict in keydicts: | |
qrylst.extend([keydict[col] for col in keynames[tabname]]) | |
qrytup = tuple(qrylst) | |
db.c.execute(qrystr, qrytup) | |
return db.c.fetchall() | |
def missing_keydicts_table(db, keydicts, tabname): | |
keystr = keystrs[tabname] | |
qrystr = missings[tabname] + ", ".join([keystr] * len(keydicts)) | |
qrylst = [] | |
for keydict in keydicts: | |
qrylst.extend([keydict[col] for col in keynames[tabname]]) | |
qrytup = tuple(qrylst) | |
db.c.execute(qrystr, qrytup) | |
return db.c.fetchall() | |
def insert_tabdict(db, tabdict): | |
for item in tabdict.iteritems(): | |
(tabn, rd) = item | |
if isinstance(rd, list): | |
insert_rowdicts_table(db, rd, tabn) | |
else: | |
insert_rowdicts_table(db, [rd], tabn) | |
def delete_tabdict(db, tabdict): | |
for item in tabdict.iteritems(): | |
(tabn, rd) = item | |
delete_keydicts_table(db, rd, tabn) | |
def detect_tabdict(db, tabdict): | |
for item in tabdict.iteritems(): | |
(tabn, rd) = item | |
return detect_keydicts_table(db, rd, tabn) | |
def missing_tabdict(db, tabdict): | |
for item in tabdict.iteritems(): | |
(tabn, rd) = item | |
return missing_keydicts_table(db, rd, tabn) | |
def unravel(self, db): | |
pass | |
dbop = {'insert': insert_tabdict, | |
'delete': delete_tabdict, | |
'detect': detect_tabdict, | |
'missing': missing_tabdict} | |
atrdic = {'coldecls': coldecls, | |
'colnames': colnames, | |
'colnamestr': colnamestr, | |
'keynames': keynames, | |
'valnames': valnames, | |
'cols': colnames[tablenames[0]], | |
'primarykeys': primarykeys, | |
'foreignkeys': foreignkeys, | |
'checks': checks, | |
'schemata': schemata, | |
'keylen': keylen, | |
'rowlen': rowlen, | |
'keyqms': keyqms, | |
'rowqms': rowqms, | |
'dbop': dbop, | |
'unravel': unravel, | |
'maintab': tablenames[0]} | |
atrdic.update(attrs) | |
return type.__new__(metaclass, clas, parents, atrdic) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment