Skip to content

Instantly share code, notes, and snippets.

Created July 23, 2012 14:16
Show Gist options
  • Save kindly/3163864 to your computer and use it in GitHub Desktop.
Save kindly/3163864 to your computer and use it in GitHub Desktop.
import sqlalchemy
import uuid
# Cache of SQLAlchemy engines, keyed by the 'db_name' given to get_engine().
_engines = {}
# Cache mapping pg_type oid -> typname, filled on first use by get_type().
_pg_types = {}
def get_engine(context, data_dict):
    """Return the SQLAlchemy engine for a database name, creating it once.

    Engines are cached in the module-level ``_engines`` dict so repeated
    calls with the same ``db_name`` reuse the same engine.

    :param context: not used by this function.
    :param data_dict: dict that may contain ``'db_name'`` (defaults to
        ``'main'``) and must contain ``'db_uri'`` when no engine has been
        cached for that name yet.
    :returns: the cached or newly created engine.
    :raises KeyError: if an engine has to be created and ``'db_uri'`` is
        missing from ``data_dict``.
    """
    db_name = data_dict.get('db_name', 'main')
    engine = _engines.get(db_name)
    if not engine:
        # echo=True asks SQLAlchemy to log the statements it executes.
        engine = sqlalchemy.create_engine(data_dict['db_uri'], echo=True)
        _engines[db_name] = engine
    return engine
def get_type(context, oid):
    """Return the PostgreSQL type name registered under ``oid``.

    The first call loads every ``(oid, typname)`` pair from ``pg_type``
    into the module-level ``_pg_types`` cache; later calls are served from
    that cache without touching the database.

    :param context: dict holding the open connection under ``'connection'``
        (only read when the cache is still empty).
    :param oid: type oid to look up.
    :returns: the ``typname`` stored for ``oid``.
    :raises KeyError: if ``oid`` is not present in the cached catalog.
    """
    if not _pg_types:
        connection = context['connection']
        results = connection.execute(
            'select oid, typname from pg_type;'
        )
        for result in results:
            _pg_types[result[0]] = result[1]
    return _pg_types[oid]
try:
    _STRING_TYPES = basestring  # Python 2: covers both str and unicode
except NameError:
    _STRING_TYPES = str  # Python 3: basestring no longer exists

# Types that are returned unchanged by convert_jsonable().
_JSONABLE_TYPES = (_STRING_TYPES, int, float, bool)


def convert_jsonable(value):
    """Return ``value`` unchanged if it is a JSON scalar, else ``str(value)``.

    Strings, ints, floats, bools and ``None`` pass through untouched; any
    other object is replaced by its ``str()`` representation.

    :param value: a single value to convert.
    :returns: ``value`` itself, or its string form.
    """
    if value is None or isinstance(value, _JSONABLE_TYPES):
        return value
    return str(value)
def dictize_results(context, results):
    """Convert a result set into a plain dict of fields, rows and row count.

    :param context: dict; ``context.get('jsonable')`` switches on per-value
        conversion through ``convert_jsonable``. It is also passed on to
        ``get_type`` to resolve column type names.
    :param results: executed result object exposing ``cursor.description``,
        row iteration and ``rowcount``.
    :returns: ``{'fields': [{'name': ..., 'type': ...}, ...],
        'data': [{column_name: value, ...}, ...],
        'count': results.rowcount}``.
    """
    jsonable = context.get('jsonable', False)

    # One entry per column: description[0] is the column name and
    # description[1] is handed to get_type() as the type oid.
    fields = [
        {'name': field[0],
         'type': get_type(context, field[1])}
        for field in results.cursor.description
    ]

    data = []
    for row in results:
        row_data = {}
        for num, value in enumerate(row):
            if jsonable:
                value = convert_jsonable(value)
            # Values are positional, so map them back to column names.
            row_data[fields[num]['name']] = value
        data.append(row_data)

    return {
        'fields': fields,
        'data': data,
        'count': results.rowcount,
    }
def execute_sql(context, sql, *args):
    """Execute ``sql`` on the context's connection and return a plain dict.

    :param context: dict holding the open connection under ``'connection'``.
    :param sql: SQL statement to execute.
    :param args: positional arguments forwarded to ``connection.execute``.
    :returns: the ``dictize_results`` dict when the result has a cursor;
        ``{'count': rowcount}`` when it has no cursor but a positive
        rowcount; ``{}`` otherwise.
    """
    connection = context['connection']
    results = connection.execute(sql, *args)
    # not a select statement
    if not results.cursor:
        if results.rowcount > 0:
            return {'count': results.rowcount}
        # if create/drop table
        return {}
    return dictize_results(context, results)
def insert_dict(context, data_dict):
    """Insert ``data_dict`` as one row into the table named by ``'__table'``.

    The primary key column is assumed to be ``"<table>_id"``; when it is
    missing or falsy in ``data_dict`` a fresh ``uuid4`` string is generated.
    ``data_dict`` is modified in place: ``'__table'`` is removed and the
    primary key is set.

    :param context: dict passed through to ``execute_sql``.
    :param data_dict: column -> value mapping plus the ``'__table'`` key.
    :returns: whatever ``execute_sql`` returns for the insert statement.
    :raises KeyError: if ``'__table'`` is missing from ``data_dict``.
    """
    table_name = data_dict.pop('__table')
    primary_key = "%s_id" % table_name
    if not data_dict.get(primary_key):
        data_dict[primary_key] = str(uuid.uuid4())

    # Freeze the column order once so names, placeholders and values line up.
    columns = list(data_dict)
    # NOTE(review): table and column names are interpolated into the SQL
    # text (only values are bound parameters) -- they must come from
    # trusted input.
    sql_columns = ", ".join('"%s"' % key for key in columns)
    sql_values = ", ".join('%s' for _ in columns)
    return execute_sql(
        context,
        'insert into "%s" (%s) values (%s)' % (table_name, sql_columns, sql_values),
        *[data_dict[key] for key in columns]
    )
def update_dict(context, data_dict, table=None):
    """Update the row identified by the primary key found in ``data_dict``.

    The table name is taken from ``data_dict['__table']`` (removed from the
    dict) or, failing that, from ``table``. The primary key column is
    assumed to be ``"<table>_id"`` and its value selects the row to update.

    :param context: dict passed through to ``execute_sql``.
    :param data_dict: column -> value mapping; must contain the primary key.
    :param table: fallback table name when ``'__table'`` is not in
        ``data_dict``.
    :returns: whatever ``execute_sql`` returns for the update statement.
    :raises AttributeError: if no table name is supplied.
    :raises KeyError: if the primary key column is missing from
        ``data_dict``.
    """
    table_name = data_dict.pop('__table', table)
    if not table_name:
        raise AttributeError('Table needs to be supplied to update')
    primary_key = "%s_id" % table_name
    primary_key_value = data_dict[primary_key]

    update_clauses = []
    params = []
    for key, value in data_dict.items():
        update_clauses.append('"%s" = %%s' % key)
        params.append(value)
    update_columns = ', '.join(update_clauses)

    where_clause = '"%s" = %%s' % primary_key
    # The WHERE placeholder comes last in the statement, so its value is
    # bound last as well.
    params.append(primary_key_value)

    # NOTE(review): table and column names are interpolated into the SQL
    # text (only values are bound parameters) -- they must come from
    # trusted input.
    return execute_sql(
        context,
        'update "%s" set %s where %s' % (table_name, update_columns, where_clause),
        *params
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment