Skip to content

Instantly share code, notes, and snippets.

@msiemens
Last active August 29, 2015 14:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save msiemens/509416f4e171d213b56e to your computer and use it in GitHub Desktop.
Save msiemens/509416f4e171d213b56e to your computer and use it in GitHub Desktop.
My helpers.py file
"""
My compilation of small helper functions.
"""
import logging
import os
import threading
import traceback
import sys
from logging.handlers import SMTPHandler
from pygments import highlight
from pygments.lexers import get_lexer_by_name
from pygments.formatters import TerminalFormatter
def install_thread_excepthook():
"""
Workaround for sys.excepthook thread bug
From
http://spyced.blogspot.com/2007/06/workaround-for-sysexcepthook-bug.html
(https://sourceforge.net/tracker/?func=detail&atid=105470&aid=1230540&group_id=5470).
Call once from __main__ before creating any threads.
If using psyco, call psyco.cannotcompile(threading.Thread.run)
since this replaces a new-style class method.
"""
init_old = threading.Thread.__init__
def init(self, *args, **kwargs):
init_old(self, *args, **kwargs)
run_old = self.run
def run_with_except_hook(*args, **kw):
try:
run_old(*args, **kw)
except (KeyboardInterrupt, SystemExit):
raise
except:
sys.excepthook(*sys.exc_info())
self.run = run_with_except_hook
threading.Thread.__init__ = init
def highlited_excepthook():
"""
Let all traceback messages be highlighted :)
Needs Pygments package.
Source: http://stackoverflow.com/a/14776693/997063
"""
def excepthook(type, value, tb):
tbtext = ''.join(traceback.format_exception(type, value, tb))
lexer = get_lexer_by_name("pytb", stripall=True)
formatter = TerminalFormatter()
sys.stderr.write(highlight(tbtext, lexer, formatter))
sys.excepthook = excepthook
install_thread_excepthook()
class Struct(object):
"""
Helper class: Access dict keys like objects.
Attention: Differs from dict!
- iter(Struct) returns key, value
"""
def __init__(self, **entries):
# Recursion!
for key in entries:
if isinstance(entries[key], dict):
entries[key] = Struct(**entries[key])
self.__dict__.update(entries)
def __repr__(self):
return str(self.__dict__)
def __iter__(self):
return iter(self.__dict__.items())
def __getitem__(self, key):
return self.__dict__[key]
def __setitem__(self, key, value):
return self.__dict__.update({key: value})
def mail_logging(logger, config):
"""
Log error messages to mail.
config = {
'SMTP_SERVER': '...',
'SMTP_USERNAME': '...',
'SMTP_PASSWORD': '...',
'MAIL_FROM': '...',
'MAIL_TO': '...',
'MAIL_SUBJECT': '...'
}
:type _logger: logging.Logger
"""
def mail_get_subject(self, record):
return self.subject + ': ' + str(record.exc_info[0].__name__)
class MyFormatter(logging.Formatter):
def __init__(self, *args, **kwargs):
super(MyFormatter, self).__init__(*args, **kwargs)
def format(self, record):
"""
:type record: logging.LogRecord
"""
exc = record.exc_info
exc_location = traceback.extract_tb(exc[2])[-1]
#: == (filename, line number, function name, text)
record.pathname = exc_location[0]
record.lineno = exc_location[1]
record.funcName = exc_location[2]
record.module = exc_location[3] # Misuse module for line content
return super(MyFormatter, self).format(record)
mail_handler = SMTPHandler(config['SMTP_SERVER'],
config['MAIL_FROM'],
[config['MAIL_TO']],
config['MAIL_SUBJECT'],
(config['SMTP_USERNAME'], config['SMTP_PASSWORD']),
tuple() if config['SMTP_SSL'] == 1 else None)
mail_handler.getSubject = types.MethodType(mail_get_subject, mail_handler)
mail_handler.setFormatter(MyFormatter('''
Message type: %(levelname)s
Location: %(pathname)s:%(lineno)d
Function: %(funcName)s
Line: %(module)s
Time: %(asctime)s
Message: %(message)s
'''))
mail_handler.setLevel(logging.ERROR)
logger.addHandler(mail_handler)
class FileLockException(Exception):
pass
class FileLock(object):
""" A file locking mechanism that has context-manager support so
you can use it in a with statement. This should be relatively cross
compatible as it doesn't rely on msvcrt or fcntl for the locking.
"""
def __init__(self, file_name, timeout=10, delay=.05):
""" Prepare the file locker. Specify the file to lock and optionally
the maximum timeout and the delay between each attempt to lock.
"""
self.is_locked = False
self.lockfile = os.path.join(os.getcwd(), "%s.lock" % file_name)
self.file_name = file_name
self.timeout = timeout
self.delay = delay
def acquire(self):
""" Acquire the lock, if possible. If the lock is in use, it check again
every `wait` seconds. It does this until it either gets the lock or
exceeds `timeout` number of seconds, in which case it throws
an exception.
"""
start_time = time.time()
while True:
try:
self.fd = os.open(self.lockfile, os.O_CREAT|os.O_EXCL|os.O_RDWR)
break;
except OSError as e:
if e.errno != errno.EEXIST:
raise
if (time.time() - start_time) >= self.timeout:
raise FileLockException("Timeout occured.")
time.sleep(self.delay)
self.is_locked = True
def release(self):
""" Get rid of the lock by deleting the lockfile.
When working in a `with` statement, this gets automatically
called at the end.
"""
if self.is_locked:
os.close(self.fd)
os.unlink(self.lockfile)
self.is_locked = False
def __enter__(self):
""" Activated when used in the with statement.
Should automatically acquire a lock to be used in the with block.
"""
if not self.is_locked:
self.acquire()
return self
def __exit__(self, type, value, traceback):
""" Activated at the end of the with statement.
It automatically releases the lock if it isn't locked.
"""
if self.is_locked:
self.release()
def __del__(self):
""" Make sure that the FileLock instance doesn't leave a lockfile
lying around.
"""
self.release()
@decorator
def synchronized(method, self, *args, **kwargs):
"""
Make a function call synchronized. Works only on class members!
Needs decorator package.
"""
try:
self._lock
except AttributeError:
self._lock = threading.Lock()
with self._lock:
return method(self, *args, **kwargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment