Skip to content

Instantly share code, notes, and snippets.

@thinkt4nk
Created March 13, 2011 05:27
Show Gist options
  • Save thinkt4nk/867895 to your computer and use it in GitHub Desktop.
Save thinkt4nk/867895 to your computer and use it in GitHub Desktop.
import pprint
import psycopg2
import psycopg2.extras
from psycopg2.extras import DictConnection
class pgImport(object):
"""Imports data to pg instance"""
def __init__(self,dbname='template1',user='postgres',password='postgres',host='localhost'):
"""Constructor"""
self.db = psycopg2.connect('dbname=%s user=%s host=%s password=%s' % (dbname,user,host,password))
self.dbCursor = self.db.cursor(cursor_factory=psycopg2.extras.DictCursor)
def create_table(self,tableName,columns):
"""Create a table"""
q = 'CREATE TABLE "%s" (%s);' % (tableName,', '.join(columns))
self.dbCursor.execute(q)
self.db.commit()
return True
def query(self,q,params):
"""Query a database"""
self.dbCursor.execute(q,params)
return self.dbCursor.fetchall()
def insert(self,tableName,models):
"""Insert new model objects into a table"""
if models.__class__ != 'list':
models = [models]
for model in models:
q = 'INSERT INTO "%s" (%s) VALUES (%s)' % (
tableName,
','.join(model.data.keys()),
','.join(map(lambda i: '%s' ,model.data.keys()))
)
try:
self.dbCursor.execute(q,[v.__str__() for v in model.data.values()])
except psycopg2.DataError as e:
for d in model.data:
print '%s : %s' % (d,model.data[d])
self.db.commit()
def update(self,tableName,models,primary_key='id'):
"""Update model objects in a table"""
if models.__class__ != 'list':
models = [models]
for model in models:
if primary_key.__class__ != 'list':
primary_key = [primary_key]
primary_key_clause = ' AND '.join(['%s=%%s' % key for key in primary_key])
print primary_key_clause
exit()
q = 'UPDATE "%s" SET %s WHERE %s' % (
tableName,
','.join(['%s=%%s'% key for key in model.get_attributes().keys()]),
primary_key_clause
)
params=map(lambda key: model.data[key],model.get_attributes())
for k in primary_key:
params.append(model.get_attribute(k))
pprint.pprint([q,params])
#self.dbCursor.execute(q,model.data.values())
self.db.commit()
class pgModel(object):
"""Encapsulates db data, empowers data with db functionality"""
def __init__(self,db,tableName):
self.db=db
self.tableName=tableName
self.data={}
def save(self):
self.db.insert(self.tableName,self)
def update(self,primary_key='id'):
self.db.update(self.tableName,self,primary_key)
def set_attributes(self,attribute_map):
for column in attribute_map.keys():
self.data[column]=attribute_map[column]
def set_attribute(self,attribute,value):
if value:
self.data[attribute]=value
def get_attributes(self):
attribute_map = {}
for column in self.data.keys():
attribute_map[column]=self.data[column]
return attribute_map
def get_attribute(self,attribute_name):
try:
if self.data.__class__ == 'dict':
return self.data[attribute_name]
except (KeyError,TypeError):
print self.data.__class__
exit()
def query(self,primary_keys):
primary_key_clauses=[]
primary_key_params=[]
for key in primary_keys:
primary_key_clauses.append('%s=%%s'%key)
primary_key_params.append(self.data[key])
q='SELECT * FROM "%s" WHERE %s LIMIT 1' % (self.tableName,' AND '.join(primary_key_clauses))
return self.db.query(q,primary_key_params)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment