Skip to content

Instantly share code, notes, and snippets.

@scarlson
Last active February 11, 2022 21:16
Show Gist options
  • Save scarlson/5165866 to your computer and use it in GitHub Desktop.
Save scarlson/5165866 to your computer and use it in GitHub Desktop.
Build and save objects backed by PostgreSQL tables, using psycopg2.
### Postgres:
#
# CREATE TABLE film (
# film_id varchar(40) CONSTRAINT firstkey PRIMARY KEY,
# did integer NOT NULL,
# date_prod date,
# imdb text,
# kind varchar(10),
# len interval hour to minute
# );
#
#
### Python:
#
# class Film(BaseObject):
# pass
#
# Memento = Film('Memento')
# Memento.imdb = 'http://www.imdb.com/title/tt0209144/'
# if Memento.save():
# print("Success!")
#
import psycopg2
# Connection argument handed verbatim to psycopg2.connect() by BaseObject.conn.
# The value below is a placeholder; replace it with a real connection string.
DATABASE = 'psycopg2 connect val'
class BaseObject(object):
    """Map one row of a database table onto the attributes of an object.

    The table name defaults to the subclass name and the key column to
    ``<table>_id``.  On construction the column names are read from the table
    and the row whose key equals ``value`` is loaded; every column becomes an
    instance attribute (``None`` when no such row exists).  ``save()`` writes
    the attributes back with an UPDATE or an INSERT.

    Every statement runs on its own connection: ``select``, ``update`` and
    ``_getVals`` close the cursor and the connection when they finish, and
    the ``conn`` / ``cur`` properties reopen them on the next access.

    NOTE: the table and key names are interpolated straight into the SQL
    text, so they must be trusted identifiers, never user input.
    """

    def __init__(self, value, table=None, key=None):
        """Load the row whose key column equals ``value``.

        ``table`` overrides the table name (default: the class name) and
        ``key`` overrides the key column (default: ``<table>_id``).
        """
        self._table = table or self.__class__.__name__
        self._key = key or self._table + '_id'
        self._getVals()
        self._fetch(value)

    @property
    def conn(self):
        """Return an open connection, (re)connecting when missing or closed."""
        if not hasattr(self, '_conn') or self._conn.closed:
            self._conn = psycopg2.connect(DATABASE)
            self._conn.set_client_encoding('UTF8')
        return self._conn

    @property
    def cur(self):
        """Return an open cursor, creating a new one when missing or closed."""
        if not hasattr(self, '_cur') or self._cur.closed:
            self._cur = self.conn.cursor()
        return self._cur

    @staticmethod
    def _render(cur, query, params):
        """Return ``query`` with ``params`` bound through ``cur.mogrify``."""
        if params:
            return cur.mogrify(query, params)
        return cur.mogrify(query)

    @staticmethod
    def _release(cur, conn):
        """Close the cursor and then the connection, even if the first fails."""
        try:
            cur.close()
        finally:
            conn.close()

    def _key_query(self):
        """Return the SELECT that looks a row up by the key column."""
        # '%%s' leaves a literal '%s' placeholder for the driver to bind.
        return 'select * from %s where %s = %%s;' % (self._table, self._key)

    def select(self, query, *params):
        """Run ``query`` and return its rows.

        Returns ``None`` when there are no rows, the single row itself when
        there is exactly one, and the list of rows when there are several.
        Errors raised by the driver propagate to the caller; the cursor and
        connection are closed either way.
        """
        cur, conn = self.cur, self.conn
        try:
            cur.execute(self._render(cur, query, params))
            rows = cur.fetchall()
        finally:
            # Runs on failure too, so an error cannot leak the connection.
            self._release(cur, conn)
        if not rows:
            return None
        return rows if len(rows) > 1 else rows[0]

    def update(self, query, *params):
        """Run a data-modifying ``query`` and commit it.

        Returns ``True`` after a successful commit.  If executing or
        committing fails the transaction is rolled back and ``False`` is
        returned.  Errors raised while binding ``params`` propagate.  The
        cursor and connection are closed in every case.
        """
        cur, conn = self.cur, self.conn
        try:
            # Binding stays outside the handler below so that a malformed
            # parameter list is reported instead of being turned into False.
            statement = self._render(cur, query, params)
            try:
                cur.execute(statement)
                conn.commit()
            except Exception:
                # Not a bare ``except``: KeyboardInterrupt and SystemExit
                # must not be converted into a quiet ``False``.
                conn.rollback()
                return False
            return True
        finally:
            self._release(cur, conn)

    def _getVals(self):
        """Read the table's column names and initialise each attribute to None.

        The names are stored, in table order, in ``self._vals``.
        """
        cur, conn = self.cur, self.conn
        try:
            cur.execute(cur.mogrify("select * from %s limit 1;" % self._table))
            # Only the result description is needed; no row is fetched.
            columns = [column[0] for column in cur.description]
        finally:
            self._release(cur, conn)
        self._vals = columns
        for name in columns:
            self.__dict__[name] = None

    @property
    def exists(self):
        """Return True when a row with this object's key value is stored."""
        # Nothing in this class sets ``_exists``; the guard is kept from the
        # original code, where an instance carrying that attribute always
        # reports False.
        if hasattr(self, '_exists'):
            return False
        if not (hasattr(self, '_key') and hasattr(self, '_table')):
            return False
        return bool(self.select(self._key_query(), self.__dict__[self._key]))

    def _fetch(self, unid):
        """Populate the column attributes from the row whose key is ``unid``.

        When no row matches, every column attribute is reset to ``None`` and
        the key attribute is set to ``unid`` so a later ``save()`` can insert.
        """
        if not (hasattr(self, '_key') and hasattr(self, '_table')):
            return
        row = self.select(self._key_query(), unid)
        if row:
            # ``select`` returns a list of rows when the key matched several
            # times; use the first one rather than spreading whole rows over
            # the column attributes.
            if isinstance(row, list):
                row = row[0]
            for name, value in zip(self._vals, row):
                self.__dict__[name] = value
        else:
            for name in self._vals:
                self.__dict__[name] = None
            self.__dict__[self._key] = unid

    def save(self):
        """Write the column attributes to the table.

        Updates the stored row when one with this key exists, otherwise
        inserts a new row.  Returns the result of ``update()``: ``True`` on
        success, ``False`` on failure.
        """
        if not (hasattr(self, '_key') and hasattr(self, '_table')):
            return False
        values = tuple(self.__dict__[name] for name in self._vals)
        if self.exists:
            q = ('update ' + self._table + ' set (' + ",".join(self._vals)
                 + ') = %s where ' + self._key + ' = %s;')
            return self.update(q, values, self.__dict__[self._key])
        # The values tuple is bound as one parenthesised list, in column order.
        q = "insert into " + self._table + " values %s;"
        return self.update(q, values)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment