Skip to content

Instantly share code, notes, and snippets.

@motiteux
Forked from temoto/helpers_data.py
Created February 6, 2014 15:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save motiteux/8845869 to your computer and use it in GitHub Desktop.
Save motiteux/8845869 to your computer and use it in GitHub Desktop.
def namedlist(typename, field_names):
    """Returns a new subclass of list with named fields.

    >>> Point = namedlist('Point', ('x', 'y'))
    >>> Point.__doc__                   # docstring for the new class
    'Point(x, y)'
    >>> p = Point(11, y=22)             # instantiate with positional args or keywords
    >>> p[0] + p[1]                     # indexable like a plain list
    33
    >>> x, y = p                        # unpack like a regular list
    >>> x, y
    (11, 22)
    >>> p.x + p.y                       # fields also accessible by name
    33
    >>> d = p._asdict()                 # convert to a dictionary
    >>> d['x']
    11
    >>> Point(**d)                      # convert from a dictionary
    Point(x=11, y=22)
    >>> p._replace(x=100)               # _replace() is like str.replace() but targets named fields
    Point(x=100, y=22)
    """
    field_names = tuple(field_names)
    fields_len = len(field_names)
    # Tuple repr without parens or quotes, e.g. "x, y" -- used below to
    # eval an __init__ with an exact positional/keyword signature.
    fields_text = repr(field_names).replace("'", "")[1:-1]

    class ResultType(list):
        __slots__ = ()
        # Fixed: the doctest above promises Point.__doc__ == 'Point(x, y)',
        # but no docstring was ever set. Assigning in the class body works
        # on both Python 2 and 3 (post-hoc class attribute assignment of
        # __doc__ is not portable).
        __doc__ = "{0}({1})".format(typename, ", ".join(field_names))
        _fields = field_names

        def _fixed_length_error(*args, **kwargs):
            raise TypeError(u"Named list has fixed length")
        # Mutating the length would desynchronize the index-based accessors.
        append = _fixed_length_error
        insert = _fixed_length_error
        pop = _fixed_length_error
        remove = _fixed_length_error

        def sort(self):
            raise TypeError(u"Sorting named list in place would corrupt field accessors. Use sorted(x)")

        def _asdict(self):
            """Return a dict mapping field names to values (promised by the
            class doctest but previously missing)."""
            return dict(zip(field_names, self))

        def _replace(self, **kwargs):
            """Return a new instance with the given fields replaced."""
            # kwargs.pop(name, current) keeps the current value for fields
            # that were not overridden. list() keeps len() working on py3.
            values = list(map(kwargs.pop, field_names, self))
            if kwargs:
                # Fixed: the format string used named placeholder {s!r} with
                # a positional argument, which always raised KeyError.
                raise TypeError(u"Unexpected field names: {0!r}".format(list(kwargs.keys())))
            if len(values) != fields_len:
                raise TypeError(u"Expected {e} arguments, got {n}".format(e=fields_len, n=len(values)))
            return ResultType(*values)

        def __repr__(self):
            items_repr = ", ".join("{name}={value!r}".format(name=name, value=value)
                                   for name, value in zip(field_names, self))
            return "{typename}({items})".format(typename=typename, items=items_repr)

    # Build __init__ with the real field signature so both positional and
    # keyword construction work, assigning all values in a single slice.
    ResultType.__init__ = eval("lambda self, {fields}: self.__setitem__(slice(None, None, None), [{fields}])".format(fields=fields_text))
    ResultType.__name__ = typename
    for i, name in enumerate(field_names):
        fget = eval("lambda self: self[{0:d}]".format(i))
        fset = eval("lambda self, value: self.__setitem__({0:d}, value)".format(i))
        setattr(ResultType, name, property(fget, fset))
    return ResultType
def datetime_to_tuple(dt, precision=None):
    """datetime, precision -> tuple(year, month, day, hour, minute, second, microsecond)[:precision]

    Reverse operation is `datetime(*tuple)`.
    """
    parts = (dt.year, dt.month, dt.day, dt.hour,
             dt.minute, dt.second, dt.microsecond)
    if precision:
        return parts[:precision]
    return parts
def datetime_to_unix(dt, _epoch_ord=datetime.date(1970, 1, 1).toordinal()):
    """UTC datetime -> UNIX timestamp (float, microsecond precision).

    Invariant: `datetime.utcfromtimestamp(datetime_to_unix(dt)) == dt`
    """
    # Days since the epoch, folded step by step into whole seconds.
    day_count = dt.date().toordinal() - _epoch_ord
    whole_seconds = ((day_count * 24 + dt.hour) * 60 + dt.minute) * 60 + dt.second
    return whole_seconds + dt.microsecond / 1e6
def str_to_date(s, format="%Y-%m-%d", _parse=datetime.datetime.strptime):
    """Parse a date string: '2012-11-13' -> date(2012, 11, 13)."""
    return _parse(s, format).date()
"""PostgreSQL DB helpers.
Uses two pools for PostgreSQL DB connections: one pool for connections in autocommit mode used by execute(),
one pool for connections in transaction mode.
Interface::
* autocommit() -> context manager, returns psycopg2.Cursor (in autocommit mode)
* execute(statement, params=None, repeat=True) -> psycopg2.Cursor
* transaction() -> context manager, returns psycopg2.Cursor inside explicit transaction
"""
import collections
import contextlib
import eventlet
import eventlet.db_pool
import logbook
# import logging
import psycopg2
import psycopg2.extensions
import psycopg2.extras
import psycopg2.pool
import random
import time
try:
import sqlalchemy
from sqlalchemy.dialects.postgresql.psycopg2 import PGDialect
_sa_class = sqlalchemy.sql.ClauseElement
_sa_dialect = PGDialect()
except ImportError:
_sa_class = None
_sa_dialect = None
# Select logbook or logging here
log = logbook.Logger('db')
# log = logging.getLogger('db')
class Error(Exception):
    """Base exception for DB-helper errors raised by this module."""
class DictCursor(psycopg2.extras.DictCursor):
    """DictCursor with query-time logging, server-notice forwarding and
    SQLAlchemy clause-element support."""

    def execute(self, statement, params=None, record_type=None,
                _sa_class=_sa_class, _sa_dialect=_sa_dialect):
        """Psycopg2.Cursor.execute wrapped with query time logging.

        Returns self, so you can chain it with fetch* methods, etc.
        `record_type` is accepted only for signature compatibility with
        NamedTupleCursor.execute; dict rows ignore it.
        """
        if _sa_class is not None and isinstance(statement, _sa_class):
            # Compile SQLAlchemy clause elements down to SQL text + params.
            compiled = statement.compile(dialect=_sa_dialect)
            statement, params = compiled.string, compiled.params
        self.connection.notices[:] = []
        error = None
        start = time.time()
        try:
            super(DictCursor, self).execute(statement, params)
        except psycopg2.Error as exc:
            # Fixed: previously `except ... as error: pass`. `except X as n`
            # unbinds `n` when the handler exits on Python 3, so the deferred
            # re-raise below would hit NameError; bind explicitly instead.
            error = exc
        total = round(time.time() - start, 3)
        for notice in self.connection.notices:
            log.notice(notice.strip().decode('utf-8', 'replace'))
            if notice == "WARNING: there is already a transaction in progress\n":
                raise Error(u"Nested BEGIN inside transaction. Aborting possibly broken code.")
        # Skip mogrify for INSERTs so potentially huge interpolated
        # statements stay out of the log.
        sql = (self.mogrify(statement, params)
               if not statement.lower().startswith("insert")
               else statement).decode('utf-8', 'replace')
        sql_id = id(sql)
        log.info(u"Query [{time:.3f}] id={id} {sql}".format(
            time=total, id=sql_id, sql=sql))
        if error is not None:
            raise error
        return self

    def executemany(self, statement, parameters):
        # Deliberately bypasses the logging wrapper above.
        return super(DictCursor, self).executemany(statement, parameters)

    def callproc(self, procname, parameters):
        return super(DictCursor, self).callproc(procname, parameters)

    def scalar(self):
        """First column of the next row, or None when no row is available."""
        row = self.fetchone()
        if row is None:
            return None
        return row[0]
class NamedTupleCursor(psycopg2.extras.NamedTupleCursor):
    """NamedTupleCursor with query-time logging, server-notice forwarding,
    SQLAlchemy clause-element support and per-query record types."""

    # Fallback record type for result sets without columns.
    # Fixed: `namedtuple` was referenced bare, but this module only does
    # `import collections`, so class creation raised NameError.
    EmptyRecord = collections.namedtuple("Record", ())

    def execute(self, statement, params=None, record_type=None,
                _sa_class=_sa_class, _sa_dialect=_sa_dialect):
        """Psycopg2.Cursor.execute wrapped with query time logging.

        Returns cursor object, so you can chain it with fetch* methods, etc.
        `record_type` overrides the namedtuple class used for fetched rows;
        None means "build one from the result description".
        """
        if _sa_class is not None and isinstance(statement, _sa_class):
            # Compile SQLAlchemy clause elements down to SQL text + params.
            compiled = statement.compile(dialect=_sa_dialect)
            statement, params = compiled.string, compiled.params
        self.connection.notices[:] = []
        error = None
        start = time.time()
        try:
            super(NamedTupleCursor, self).execute(statement, params)
        except psycopg2.Error as exc:
            # Fixed: previously `except ... as error: pass`. `except X as n`
            # unbinds `n` when the handler exits on Python 3, so the deferred
            # re-raise below would hit NameError; bind explicitly instead.
            error = exc
        total = round(time.time() - start, 3)
        for notice in self.connection.notices:
            log.notice(notice.strip().decode('utf-8', 'replace'))
            if notice == "WARNING: there is already a transaction in progress\n":
                # Fixed: previously raised undefined name `DbError`.
                raise Error(u"Nested BEGIN inside transaction. Aborting possibly broken code.")
        # Skip mogrify for INSERTs so potentially huge interpolated
        # statements stay out of the log.
        sql = (self.mogrify(statement, params)
               if not statement.lower().startswith("insert")
               else statement).decode('utf-8', 'replace')
        sql_id = id(sql)
        log.info(u"Query [{time:.3f}] id={id} {sql}".format(
            time=total, id=sql_id, sql=sql))
        if error is not None:
            raise error
        # None resets psycopg2's cached Record so the next fetch rebuilds it.
        self.Record = record_type
        return self

    def executemany(self, statement, parameters):
        # Deliberately bypasses the logging wrapper above.
        return super(NamedTupleCursor, self).executemany(statement, parameters)

    def callproc(self, procname, parameters):
        return super(NamedTupleCursor, self).callproc(procname, parameters)

    def scalar(self):
        """First column of the next row, or None when no row is available."""
        row = self.fetchone()
        if row is None:
            return None
        return row[0]

    def _make_nt(self, _namedtuple=collections.namedtuple):
        if not self.description:
            return NamedTupleCursor.EmptyRecord
        # "?column?" is PostgreSQL's name for unnamed expressions; namedtuple
        # rejects it, so synthesize "columnN" (1-based) instead.
        columns = [d[0] if d[0] != "?column?" else "column" + str(i)
                   for i, d in enumerate(self.description, 1)]
        return _namedtuple("Record", columns)
# Select the default cursor class handed out by Connection.cursor() below:
# DictCursor rows support dict-style access, NamedTupleCursor rows
# attribute-style access.
default_cursor_class = DictCursor
# default_cursor_class = NamedTupleCursor
class ReadCursor(object):
    """Read-only, pre-fetched cursor-like snapshot.

    Wraps rows already fetched by execute() so callers keep cursor-style
    access (fetchone/fetchmany/fetchall/scalar/iteration) after the real
    cursor and its connection have gone back to the pool.

    `rows is None` means the statement produced no result set (e.g. plain
    INSERT); `rowcount` still carries the affected-row count.
    """

    rowcount = property(lambda self: self._rowcount)

    def __init__(self, rows, rowcount):
        self._rows = rows
        self._rowcount = rowcount

    def _require_rows(self):
        # Mimic psycopg2: fetching from a result-less cursor is an error.
        # (Deduplicates the guard previously repeated in every method.)
        if self._rows is None:
            raise psycopg2.ProgrammingError("no results to fetch")

    def fetchone(self):
        """First row, or None when the result set is empty."""
        self._require_rows()
        if self._rowcount == 0:
            return None
        return self._rows[0]

    def fetchmany(self, size=None):
        """Up to `size` rows from the start; all rows when size is None."""
        self._require_rows()
        if size is None:
            return self._rows
        return self._rows[:size]

    def fetchall(self):
        self._require_rows()
        return self._rows

    def __iter__(self):
        self._require_rows()
        return iter(self._rows)

    def scalar(self):
        """First column of the first row, or None when no rows matched."""
        self._require_rows()
        if self._rowcount == 0:
            return None
        return self._rows[0][0]
class Connection(psycopg2.extensions.connection):
    """psycopg2 connection that logs commit/rollback timing and hands out
    this module's default cursor class."""

    def commit(self):
        began = time.time()
        super(Connection, self).commit()
        log.info(u"Commit [{time:.3f}]".format(time=time.time() - began))

    def rollback(self):
        began = time.time()
        super(Connection, self).rollback()
        log.info(u"Rollback [{time:.3f}]".format(time=time.time() - began))

    def cursor(self, _klass=default_cursor_class, *args, **kwargs):
        # _klass is a default argument so the cursor class is bound once
        # at definition time.
        return super(Connection, self).cursor(
            *args, cursor_factory=_klass, **kwargs)
class EventletConnectionPool(eventlet.db_pool.RawConnectionPool):
    """Eventlet-based pool whose connections are switched to autocommit."""

    def connect(self, db_module, timeout, *args, **kwargs):
        # Fixed: the previous version wrapped this call in
        # `except psycopg2.OperationalError: raise`, a no-op handler.
        connection = super(EventletConnectionPool, self).connect(
            db_module, timeout, *args, **kwargs)
        # Note: makes round-trip to DB. Only required for new connections.
        connection.autocommit = True
        return connection

    @contextlib.contextmanager
    def item(self):
        """Yield a pooled connection; discard it if the block raises."""
        close = True
        conn = self.get()
        # Note: makes round-trip to DB. Only required for new connections.
        conn.autocommit = True
        try:
            yield conn
            # no error
            close = False
        finally:
            if close:
                # NOTE(review): `_base` looks like eventlet's wrapped raw
                # DB-API connection; closing it makes put() drop the
                # possibly-broken connection instead of recycling it.
                conn._base.close()
            self.put(conn)
class ThreadConnectionPool(psycopg2.pool.ThreadedConnectionPool):
    """Thread-safe pool whose item() yields an autocommit connection."""

    @contextlib.contextmanager
    def item(self):
        """Yield a pooled connection; discard it if the block raises."""
        conn = self.getconn()
        # Note: makes round-trip to DB. Only required for new connections.
        conn.autocommit = True
        succeeded = False
        try:
            yield conn
            succeeded = True
        finally:
            # Recycle on success, drop on failure or if already closed.
            self.putconn(conn, close=(not succeeded) or conn.closed)
def is_connection_error(e):
    """Exception object -> True | False

    True when *e* is a psycopg2 DatabaseError whose message matches one of
    the known lost-connection texts.
    """
    if not isinstance(e, psycopg2.DatabaseError):
        return False
    markers = (
        "socket not open",
        "server closed the connection unexpectedly",
        "could not connect to server",
    )
    text = str(e)
    return any(marker in text for marker in markers)
# TODO: override this if necessary
def get_connection_pool(group):
    # Factory for the pool used by autocommit()/execute()/transaction().
    # `group` selects among several DB server groups; the default
    # implementation ignores it (see the group_map variant below).
    # NOTE(review): POSTGRESQL_DSN is not defined anywhere in this module --
    # presumably supplied by the integrating application; confirm.
    # the most straightforward threaded pool built-in psycopg2
    return ThreadConnectionPool(
        minconn=0, maxconn=10,
        dsn=POSTGRESQL_DSN, connection_factory=Connection)
    # eventlet pool
    # return EventletConnectionPool(
    #     psycopg2, min_size=0, max_size=10, max_idle=10, max_age=60,
    #     dsn=POSTGRESQL_DSN, connection_factory=Connection)
    # pre initialized pools for different groups of database servers
    # return group_map[group]
@contextlib.contextmanager
def autocommit(group='default', connection_pool=None):
    """Context manager yielding a cursor on a pooled autocommit connection.

    The connection goes back to the pool when the block ends.

    >>> with autocommit() as cursor:
    ...     cursor.execute("select 1")
    ...     cursor.execute("select 2")

    Use it when you do several selects and don't want to waste time for the
    final ROLLBACK.
    """
    pool = connection_pool or get_connection_pool(group)
    with pool.item() as connection:
        yield connection.cursor()
def execute(statement, params=None, group='default', connection_pool=None, repeat=True, record_type=None):
    """Shortcut for

    1. get connection from pool, create new cursor
    2. cursor.execute(statement, params)
    3. cursor.fetchall() (if possible)
    4. return connection to pool

    Returns read-only cursor with rows.

    On disconnect, if `repeat is True` attempts reconnect and repeats the
    query one more time. If the second attempt fails, the exception is
    raised.
    """
    pool = connection_pool or get_connection_pool(group)
    with pool.item() as connection:
        try:
            cursor = connection.cursor()
            cursor.execute(statement, params, record_type=record_type)
            rows = None
            rowcount = cursor.rowcount
            try:
                rows = cursor.fetchall()
            except psycopg2.ProgrammingError as e:
                # Statements without a result set (INSERT/UPDATE/...) raise
                # exactly this error; anything else is a real problem.
                if str(e) != "no results to fetch":
                    raise
            return ReadCursor(rows, rowcount)
        except psycopg2.DatabaseError as e:
            if repeat and is_connection_error(e):
                log.warning(u"execute() DB disconnect, repeating query.")
            else:
                raise
    # Connection lost; the with-block above has already returned/closed the
    # broken connection. Repeat once.
    # Fixed: the retry previously dropped group, connection_pool and
    # record_type, silently switching to the default pool.
    return execute(statement, params, group=group,
                   connection_pool=connection_pool, repeat=False,
                   record_type=record_type)
@contextlib.contextmanager
def transaction(group='default', connection_pool=None):
    """Context manager.

    Executes block with new cursor from pooled connection inside an explicit
    transaction. Returns cursor. At the end of the block the connection is
    returned to the pool. The transaction is committed "on success" and
    rolled back when the block raises.

    >>> with transaction() as cursor:
    ...     rows = cursor.execute(...).fetchall()
    ...     process(rows)
    ...     cursor.execute(...)

    Always use it instead of manual BEGIN/ROLLBACK-s.

    Fixed: the @contextlib.contextmanager decorator was missing, so
    `with transaction()` failed (a bare generator is not a context
    manager); the module docstring and the sibling autocommit() show the
    decorator is required.
    """
    pool = connection_pool or get_connection_pool(group)
    with pool.item() as connection:
        cursor = connection.cursor()
        # Pool connections are in autocommit mode, so open the transaction
        # explicitly.
        cursor.execute("begin")
        try:
            yield cursor
        except Exception:
            cursor.execute("rollback")
            raise
        else:
            cursor.execute("commit")
def str_utf8(x):
    """Return the UTF-8 byte-string representation of *x*.

    Like unicode(x).encode('utf-8') except byte strings pass through
    untouched.
    """
    return x if isinstance(x, str) else unicode(x).encode('utf-8')
from cStringIO import StringIO
from gzip import GzipFile
import zlib
def gzip_string(s, level=6):
    """Gzip-compress byte string *s* and return the compressed bytes.

    level -- gzip compression level, 1 (fastest) .. 9 (best); default 6.
    """
    zbuf = StringIO()
    zfile = GzipFile(mode='wb', compresslevel=level, fileobj=zbuf)
    try:
        zfile.write(s)
    finally:
        # Fixed: previously the GzipFile leaked when write() raised.
        # close() also flushes the gzip trailer into zbuf.
        zfile.close()
    return zbuf.getvalue()
def gunzip_string(s):
    """Decompress a gzip-compressed byte string.

    See http://stackoverflow.com/questions/2695152/in-python-how-do-i-decode-gzip-encoding/2695466#2695466
    """
    # wbits = 16 + MAX_WBITS tells zlib to expect (and verify) a gzip
    # header and trailer.
    wbits = 16 + zlib.MAX_WBITS
    return zlib.decompress(s, wbits)
import functools
# import logbook
import logging
import time
# log = logbook.Logger(__name__)
log = logging.getLogger(__name__)
def repr_func(f):
    """Attempt to get the most useful string representation of a callable.

    Plain functions become "name() @ file:line"; callables without a code
    object fall back to repr().

    Generalized: previously only the Python 2 attributes func_name/func_code
    were probed, so on Python 3 every function fell through to repr();
    __name__/__code__ are now accepted as well.
    """
    name = getattr(f, 'func_name', None) or getattr(f, '__name__', '<unknown>')
    func_code = getattr(f, 'func_code', None) or getattr(f, '__code__', None)
    if func_code is not None:
        return u'{name}() @ {fc.co_filename}:{fc.co_firstlineno}'.format(
            name=name,
            fc=func_code)
    return repr(f)
def retry(tries, exceptions=(Exception,), delay=0):
    """Decorator: re-invoke the wrapped function when it raises.

    tries -- maximum number of attempts
    exceptions -- exception classes that trigger another attempt
    delay -- seconds to wait between attempts
    """
    def decorator(func):
        @functools.wraps(func)
        def attempt_loop(*args, **kwargs):
            remaining = tries  # local copy so we can count down
            while remaining > 0:
                remaining -= 1
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if remaining == 0:
                        raise
                    # logbook
                    # log.error(u'retry: {f} {e}', f=repr_func(func), e=e)
                    # logging
                    log.error(u'retry: %s %s', repr_func(func), e)
                    time.sleep(delay)
        return attempt_loop
    return decorator
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment