Skip to content

Instantly share code, notes, and snippets.

@wadadaaa
Created October 31, 2017 11:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wadadaaa/8de5f04c70a1ba45362eee814f9e32b5 to your computer and use it in GitHub Desktop.
Save wadadaaa/8de5f04c70a1ba45362eee814f9e32b5 to your computer and use it in GitHub Desktop.
class HealthFlowHelper(object):
    """Context manager that tracks enter/exit/error statistics for a flow.

    Three keys are derived from ``f_name``:
    ``fire_counter:<f_name>:enter``, ``fire_counter:<f_name>:exit`` and
    ``fire_counter:<f_name>:errors``.  The ``enter`` counter is incremented
    as soon as the helper is constructed; the ``exit`` counter is
    incremented when the ``with`` body finishes without an exception; a
    description of any non-ignored exception is added to the ``errors`` set.

    The helper never suppresses exceptions raised inside the ``with`` body.

    :param f_name: name of the flow; becomes part of every key.
    :param ignored_exceptions: optional collection of exception classes that
        must not be recorded in the ``errors`` set.  Matching is by exact
        type (``exc_type in ignored_exceptions``), so subclasses of a listed
        class are still recorded.
    """

    # Built once for the whole class instead of on every instantiation:
    # creating a namedtuple class is comparatively expensive and the result
    # is identical for every instance.
    _endpoints = ('enter', 'exit', 'errors')
    _names_obj = namedtuple('Names', ' '.join(_endpoints))

    def __init__(self, f_name, ignored_exceptions=None):
        self.ignored_exc_types = ignored_exceptions or []
        # NOTE(review): get_connection() is defined outside this block;
        # presumably it returns a Redis-like client (only ``incr`` and
        # ``sadd`` are used here) -- confirm.
        self.conn = get_connection()
        self.names = self._names_obj(
            *['fire_counter:{0}:{1}'.format(f_name, x) for x in self._endpoints]
        )
        self.conn.incr(self.names.enter)

    def exit(self):
        """Increment the ``exit`` counter of this flow."""
        self.conn.incr(self.names.exit)

    def __enter__(self):
        # Return the helper so ``with HealthFlowHelper(...) as h`` binds a
        # usable object rather than None.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Record the outcome of the ``with`` body.

        :returns: ``True`` when the body raised nothing, ``False`` otherwise,
            so an exception raised in the body always propagates.
        """
        if not exc_type:
            self.exit()
            return True

        # For an ignored exception type neither the ``exit`` counter nor the
        # ``errors`` set is touched.
        if exc_type not in self.ignored_exc_types:
            # Set member layout: <module>.<class>_<tb_lineno>_<message line>
            self.conn.sadd(self.names.errors, '{}.{}_{}_{}'.format(
                exc_type.__module__,
                exc_type.__name__,
                exc_tb.tb_lineno,
                traceback.format_exception_only(exc_type, exc_val)[0].strip()
            ))
        return False
def hfc(ignored_exceptions=None):
    """Build a decorator that runs a callable inside a ``HealthFlowHelper``.

    The flow name passed to ``HealthFlowHelper`` is
    ``<module>.<function name>`` for plain functions.  For Python 2 method
    objects (anything exposing ``im_class``) it is
    ``<class module>.<class name>.<function name>``.

    :param ignored_exceptions: forwarded unchanged to ``HealthFlowHelper``.
    :returns: a decorator; the decorated callable returns whatever the
        original returns and lets its exceptions propagate.
    """
    def inner(func):
        # The flow name depends only on ``func``, so it is computed once at
        # decoration time rather than on every call.
        owner = getattr(func, 'im_class', None)
        if owner is not None:
            # ``im_class`` is a class object, not a string: joining it
            # directly with the function name would raise TypeError.
            prefix = '{0}.{1}'.format(owner.__module__, owner.__name__)
        else:
            prefix = func.__module__
        f_name = '.'.join([prefix, func.__name__])

        @wraps(func)
        def wrapper(*args, **kwargs):
            with HealthFlowHelper(f_name, ignored_exceptions=ignored_exceptions):
                return func(*args, **kwargs)
        return wrapper
    return inner
class HealthFlowHelperByCodeBlock(HealthFlowHelper):
    """``HealthFlowHelper`` variant for a named block of code inside a flow.

    Works like the parent class but uses a separate key namespace that also
    includes the block name:
    ``fire_counter_detailed:<f_name>:<block_name>:<enter|exit|errors>``.

    :param f_name: name of the flow the block belongs to.
    :param block_name: name of the code block being tracked.
    :param ignored_exceptions: optional collection of exception classes that
        must not be recorded in the ``errors`` set.
    """

    # Built once for the whole class instead of on every instantiation.
    _endpoints = ('enter', 'exit', 'errors')
    _names_obj = namedtuple('Names', ' '.join(_endpoints))

    # The parent constructor is deliberately not called: it would build the
    # ``fire_counter:`` keys and increment that ``enter`` counter instead of
    # the ``fire_counter_detailed:`` one.
    # noinspection PyMissingConstructor
    def __init__(self, f_name, block_name, ignored_exceptions=None):
        self.ignored_exc_types = ignored_exceptions or []
        # NOTE(review): get_connection() is defined outside this block;
        # presumably it returns a Redis-like client -- confirm.
        self.conn = get_connection()
        self.names = self._names_obj(
            *['fire_counter_detailed:{0}:{1}:{2}'.format(f_name, block_name, x) for x in self._endpoints]
        )
        self.conn.incr(self.names.enter)
это для именованных блоков кода
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment