Created
March 13, 2011 05:27
-
-
Save thinkt4nk/867895 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pprint | |
import psycopg2 | |
import psycopg2.extras | |
from psycopg2.extras import DictConnection | |
class pgImport(object):
    """Small helper around one psycopg2 connection for importing data.

    The constructor opens a connection (``self.db``) and a ``DictCursor``
    (``self.dbCursor``). ``create_table``, ``insert`` and ``update`` commit
    before returning.

    Table and column names are interpolated directly into the SQL text (only
    values are passed as bound parameters), so identifiers must come from
    trusted code, not from user input.
    """

    def __init__(self, dbname='template1', user='postgres', password='postgres', host='localhost'):
        """Open the database connection and a dict-style cursor.

        The four arguments are passed to ``psycopg2.connect`` as keyword
        arguments rather than being formatted into a DSN string, so values
        containing spaces or quotes are not misparsed.
        """
        self.db = psycopg2.connect(dbname=dbname, user=user, host=host, password=password)
        self.dbCursor = self.db.cursor(cursor_factory=psycopg2.extras.DictCursor)

    @staticmethod
    def _as_list(value):
        """Return ``value`` as a list: lists/tuples are copied, anything else is wrapped."""
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    def create_table(self, tableName, columns):
        """Create a table and commit.

        ``columns`` is an iterable of column-definition strings; they are
        joined with ``', '`` and placed verbatim inside the parentheses of the
        ``CREATE TABLE`` statement. Always returns ``True``.
        """
        q = 'CREATE TABLE "%s" (%s);' % (tableName, ', '.join(columns))
        self.dbCursor.execute(q)
        self.db.commit()
        return True

    def query(self, q, params=None):
        """Execute ``q`` with bound ``params`` and return ``fetchall()``'s result.

        ``params`` is optional so parameterless statements can be run too.
        """
        self.dbCursor.execute(q, params)
        return self.dbCursor.fetchall()

    def insert(self, tableName, models):
        """Insert one model, or a list/tuple of models, then commit once.

        Each model must expose a ``data`` mapping of column name to value.
        Values are converted with ``str()`` before being bound, as before.

        A row rejected with ``psycopg2.DataError`` is dumped to stdout and
        skipped. Every row runs inside its own savepoint so that a rejected
        row does not leave the transaction aborted for the rows that follow.
        """
        for model in self._as_list(models):
            # Snapshot the column order once so the column list and the
            # parameter list are guaranteed to line up.
            columns = list(model.data.keys())
            q = 'INSERT INTO "%s" (%s) VALUES (%s)' % (
                tableName,
                ','.join(columns),
                ','.join(['%s'] * len(columns)),
            )
            params = [str(model.data[column]) for column in columns]
            self.dbCursor.execute('SAVEPOINT pg_import_row')
            try:
                self.dbCursor.execute(q, params)
            except psycopg2.DataError:
                # Undo only the failed statement, then show the offending row.
                self.dbCursor.execute('ROLLBACK TO SAVEPOINT pg_import_row')
                for column in columns:
                    print('%s : %s' % (column, model.data[column]))
            self.dbCursor.execute('RELEASE SAVEPOINT pg_import_row')
        self.db.commit()

    def update(self, tableName, models, primary_key='id'):
        """Update one model, or a list/tuple of models, then commit once.

        ``primary_key`` is a column name or a list/tuple of column names used
        to build the ``WHERE`` clause (joined with ``AND``). Every attribute
        returned by ``model.get_attributes()`` is written in the ``SET``
        clause, key columns included.

        Raises ``KeyError`` if a key column is missing from a model's
        attributes.
        """
        key_columns = self._as_list(primary_key)
        primary_key_clause = ' AND '.join(['%s=%%s' % key for key in key_columns])
        for model in self._as_list(models):
            attributes = model.get_attributes()
            columns = list(attributes.keys())
            q = 'UPDATE "%s" SET %s WHERE %s' % (
                tableName,
                ','.join(['%s=%%s' % column for column in columns]),
                primary_key_clause,
            )
            # SET values first, then the WHERE values, matching placeholder order.
            params = [attributes[column] for column in columns]
            params.extend(attributes[key] for key in key_columns)
            self.dbCursor.execute(q, params)
        self.db.commit()
class pgModel(object):
    """A single table row: a column-to-value dict bound to a table name.

    ``db`` is the helper object that performs the actual SQL; this class
    calls its ``insert``, ``update`` and ``query`` methods. Column values
    live in ``self.data``.
    """

    def __init__(self, db, tableName):
        """Bind the model to ``db`` and ``tableName`` with no column values."""
        self.db = db
        self.tableName = tableName
        self.data = {}

    def save(self):
        """Insert this model through ``self.db.insert``."""
        self.db.insert(self.tableName, self)

    def update(self, primary_key='id'):
        """Update this model through ``self.db.update`` using ``primary_key``."""
        self.db.update(self.tableName, self, primary_key)

    def set_attributes(self, attribute_map):
        """Copy every entry of ``attribute_map`` into this model's data."""
        self.data.update(attribute_map)

    def set_attribute(self, attribute, value):
        """Store ``value`` under ``attribute`` unless ``value`` is falsy.

        NOTE(review): falsy values (``0``, ``''``, ``False``, ``None``) are
        silently ignored. This is kept as is because callers may rely on it
        to skip empty fields; confirm whether ``0`` should be storable.
        """
        if value:
            self.data[attribute] = value

    def get_attributes(self):
        """Return a shallow copy of the column-to-value mapping."""
        return dict(self.data)

    def get_attribute(self, attribute_name):
        """Return the value stored for ``attribute_name``, or ``None`` if absent.

        ``None`` is also returned when ``self.data`` has been replaced by
        something that is not a dict. The method never terminates the process.
        """
        if isinstance(self.data, dict):
            return self.data.get(attribute_name)
        return None

    def query(self, primary_keys):
        """Fetch the row matching this model's values for ``primary_keys``.

        ``primary_keys`` is an iterable of column names; a single column name
        given as a string is accepted too. The values are read from
        ``self.data`` and bound as parameters. Returns whatever
        ``self.db.query`` returns.

        Raises ``ValueError`` if no key column is given and ``KeyError`` if a
        key column has no value on this model.
        """
        # A bare string would otherwise be iterated character by character.
        if isinstance(primary_keys, (str, type(u''))):
            primary_keys = [primary_keys]
        key_columns = list(primary_keys)
        if not key_columns:
            raise ValueError('query() needs at least one primary key column')
        primary_key_clauses = ['%s=%%s' % key for key in key_columns]
        primary_key_params = [self.data[key] for key in key_columns]
        q = 'SELECT * FROM "%s" WHERE %s LIMIT 1' % (self.tableName, ' AND '.join(primary_key_clauses))
        return self.db.query(q, primary_key_params)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment