Created
July 23, 2012 14:16
-
-
Save kindly/3163864 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlalchemy | |
import uuid | |
# Engines already built by get_engine(), keyed by the caller-supplied
# 'db_name' (which defaults to 'main').
_engines = {}
# Lookup table of pg_type oid -> typname, filled in by get_type().
_pg_types = {}
def get_engine(context, data_dict):
    """Return the engine registered under ``data_dict['db_name']``.

    The name falls back to ``'main'`` when the key is absent. On a cache
    miss a new engine is built from ``data_dict['db_uri']`` with
    ``echo=True`` and remembered in the module-level ``_engines`` dict, so
    ``'db_uri'`` is only read the first time a given name is requested.

    ``context`` is accepted but not used here.
    """
    cache_key = data_dict.get('db_name', 'main')
    cached = _engines.get(cache_key)
    if cached:
        return cached
    created = sqlalchemy.create_engine(data_dict['db_uri'], echo=True)
    _engines[cache_key] = created
    return created
def get_type(context, oid):
    """Return the ``pg_type.typname`` for the given type ``oid``.

    Names are cached in the module-level ``_pg_types`` dict. The catalog is
    read through ``context['connection']`` on the first call and again
    whenever ``oid`` is not in the cache, so that types registered after
    the initial load are still resolved instead of failing forever.

    Raises:
        KeyError: if ``oid`` is still unknown after re-reading ``pg_type``
            (or if ``context`` has no ``'connection'`` when a load is
            needed).
    """
    if oid not in _pg_types:
        # Covers both the initial (empty cache) load and a stale cache.
        connection = context['connection']
        rows = connection.execute(
            'select oid, typname from pg_type;'
        )
        for row in rows:
            _pg_types[row[0]] = row[1]
    return _pg_types[oid]
try:
    _STRING_TYPES = basestring  # Python 2: matches both str and unicode
except NameError:
    _STRING_TYPES = str  # Python 3 has no basestring


def convert_jsonable(value):
    """Return ``value`` in a form that can be placed in a JSON document.

    Strings, ints, floats, bools and ``None`` are returned unchanged; any
    other object is replaced by ``str(value)``.
    """
    if value is None or isinstance(value, (_STRING_TYPES, int, float, bool)):
        return value
    return str(value)
def dictize_results(context, results):
    """Turn an executed result set into a plain dictionary.

    The returned dict has three keys:

    * ``'fields'`` - one ``{'name': ..., 'type': ...}`` dict per entry of
      ``results.cursor.description``; the type name is resolved with
      ``get_type`` from the entry's second item.
    * ``'data'`` - one dict per row, keyed by field name. When
      ``context['jsonable']`` is truthy every value is first passed through
      ``convert_jsonable``.
    * ``'count'`` - ``results.rowcount``, read after the rows were consumed.
    """
    as_json = context.get('jsonable', False)
    columns = [
        {'name': col[0], 'type': get_type(context, col[1])}
        for col in results.cursor.description
    ]

    records = []
    for row in results:
        record = {}
        for position, cell in enumerate(row):
            record[columns[position]['name']] = (
                convert_jsonable(cell) if as_json else cell
            )
        records.append(record)

    return {'fields': columns, 'data': records, 'count': results.rowcount}
def execute_sql(context, sql, *args):
    """Run ``sql`` on ``context['connection']`` and return a result dict.

    ``args`` are forwarded unchanged to ``connection.execute``. When the
    result exposes a cursor it is converted with ``dictize_results``.
    Otherwise ``{'count': rowcount}`` is returned if the row count is
    positive, and an empty dict if it is not.
    """
    outcome = context['connection'].execute(sql, *args)
    if outcome.cursor:
        return dictize_results(context, outcome)

    # No cursor: the statement was not a select.
    affected = outcome.rowcount
    if affected > 0:
        return {'count': affected}
    # Nothing counted, e.g. a create/drop table statement.
    return {}
def insert_dict(context, data_dict):
    """Insert ``data_dict`` as one row and return ``execute_sql``'s result.

    ``data_dict`` is modified in place:

    * ``'__table'`` is popped and names the target table (``KeyError`` if
      it is missing).
    * The primary key column is assumed to be ``"<table>_id"``. When that
      key is absent or falsy a ``uuid4`` string is generated and stored
      back into ``data_dict``, which lets the caller read the new id.

    Every remaining key becomes a column and every value is sent as a bound
    parameter. Table and column names are emitted as double-quoted
    identifiers with embedded ``"`` doubled, so an unusual name cannot end
    the identifier early and alter the statement.
    """
    def _quote(identifier):
        # '%s' formatting keeps the original tolerance for non-string keys.
        return '"%s"' % ('%s' % identifier).replace('"', '""')

    table_name = data_dict.pop('__table')
    primary_key = "%s_id" % table_name
    if not data_dict.get(primary_key):
        data_dict[primary_key] = str(uuid.uuid4())

    # Fix the column order once so names and values cannot drift apart.
    columns = list(data_dict)
    sql_columns = ", ".join(_quote(column) for column in columns)
    sql_values = ", ".join(['%s'] * len(columns))
    values = [data_dict[column] for column in columns]

    return execute_sql(
        context,
        'insert into %s (%s) values (%s)' % (
            _quote(table_name), sql_columns, sql_values),
        *values
    )
def update_dict(context, data_dict, table=None):
    """Update the row identified by ``data_dict``'s primary key.

    The table name is popped from ``data_dict['__table']`` (so the dict is
    modified in place) and falls back to ``table``. The primary key column
    is assumed to be ``"<table>_id"`` and its value must be present in
    ``data_dict``. Every key left in ``data_dict`` - the primary key
    included - is written in the ``set`` clause, with all values sent as
    bound parameters. Table and column names are emitted as double-quoted
    identifiers with embedded ``"`` doubled.

    Returns:
        Whatever ``execute_sql`` returns for the update statement.

    Raises:
        AttributeError: if no table name was supplied either way.
        KeyError: if ``data_dict`` has no ``"<table>_id"`` entry.
    """
    def _quote(identifier):
        # '%s' formatting keeps the original tolerance for non-string keys.
        return '"%s"' % ('%s' % identifier).replace('"', '""')

    table_name = data_dict.pop('__table', table)
    if not table_name:
        raise AttributeError('Table needs to be supplied to update')
    primary_key = "%s_id" % table_name
    primary_key_value = data_dict[primary_key]

    update_clauses = []
    params = []
    for key, value in data_dict.items():
        # '%%s' leaves a literal '%s' placeholder for the driver to bind.
        update_clauses.append('%s = %%s' % _quote(key))
        params.append(value)
    update_columns = ', '.join(update_clauses)

    where_clause = '%s = %%s' % _quote(primary_key)
    params.append(primary_key_value)

    return execute_sql(
        context,
        'update %s set %s where %s' % (
            _quote(table_name), update_columns, where_clause),
        params
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment