Created
September 11, 2015 11:28
-
-
Save tiborsimon/73ea25a50fe3341e561c to your computer and use it in GitHub Desktop.
Blinker monitoring wrapper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from blinker import signal | |
import pprint | |
import datetime | |
class MultipleResponseError(Exception):
    """Raised by ``Signal.send`` when a single-response send gets a response count other than one."""
class Signal(object):
    """Thin wrapper around a named blinker signal that records its activity.

    Connections are always appended to the module-level ``STARTUP`` list;
    sends are appended to the module-level ``LOG`` list while
    ``logging_enabled`` is true.
    """

    def __init__(self, name):
        """Store the name used to look up the underlying blinker signal."""
        self.name = name

    def send(self, sender='Anonymous', allow_multiresp=False, get_responser=False, **kwargs):
        """Send the signal and return the response(s) of the connected receivers.

        Args:
            sender: Passed through as the blinker sender.
            allow_multiresp: When true, any number of responses is accepted
                and a list is returned. When false, exactly one response is
                required.
            get_responser: When true, return the raw entries produced by
                blinker instead of only element ``[1]`` of each entry
                (per the blinker docs an entry is a
                ``(receiver, return value)`` tuple).
            **kwargs: Forwarded unchanged to the blinker ``send`` call.

        Returns:
            A list (multi-response mode) or a single item (single-response
            mode); raw entries or response values depending on
            ``get_responser``.

        Raises:
            MultipleResponseError: In single-response mode when the number of
                responses is not exactly one (this includes zero responses).
        """
        responses = signal(self.name).send(sender, **kwargs)

        # Record the event before validating, so failed sends are logged too.
        if logging_enabled:
            LOG.append(SignalEvent(self.name, sender, allow_multiresp, responses))

        if allow_multiresp:
            if get_responser:
                return responses
            return [entry[1] for entry in responses]

        if len(responses) != 1:
            raise MultipleResponseError(
                'Expected only one response, {} responses received instead.. \n{}'.format(
                    len(responses), pprint.pformat(responses)))

        only_entry = responses[0]
        return only_entry if get_responser else only_entry[1]

    def connect(self, new_callable):
        """Record the connection in ``STARTUP`` and connect ``new_callable`` to the signal.

        Returns whatever the underlying blinker ``connect`` call returns.
        """
        STARTUP.append(ConnectEvent(self.name, new_callable))
        return signal(self.name).connect(new_callable)
# S I G N A L   D E F I N I T I O N S
# Each signal is declared once here as a module-level ``Signal`` wrapper whose
# variable name matches the signal name passed to the constructor.

# Environment handling
get_main_env_list = Signal('get_main_env_list')
get_sub_env_list = Signal('get_sub_env_list')
validate_env = Signal('validate_env')

# Job handling
create_job = Signal('create_job')
# S T A T I S T I C A L   I N S T R U M E N T A T I O N

# SignalEvent records, appended by Signal.send while logging is enabled.
LOG = []

# ConnectEvent records, appended by Signal.connect unconditionally.
STARTUP = []

# Send logging is off by default; set to True to start filling LOG.
logging_enabled = False
def print_log():
    """Print every recorded signal event, numbered from 1 in recording order.

    Prints a hint and returns without listing anything when
    ``logging_enabled`` is false.
    """
    if not logging_enabled:
        print('Logging is disabled. Set signals.logging_enabled to True before eny signal is sent to enable it.')
        return
    print('Message flow:')
    # enumerate numbers each row by its real position; the previous
    # LOG.index() lookup rescanned the list for every row and reported the
    # first matching position for entries that compare equal.
    for position, event in enumerate(LOG, start=1):
        print(' {} -'.format(position), event)
def get_log():
    """Return the ``LOG`` list itself, or ``None`` while logging is disabled."""
    return LOG if logging_enabled else None
def print_startup():
    """Print every recorded connect event, numbered from 1 in recording order."""
    print('System startup:')
    # enumerate numbers each row by its real position; the previous
    # STARTUP.index() lookup rescanned the list for every row and reported
    # the first matching position for entries that compare equal.
    for position, event in enumerate(STARTUP, start=1):
        print(' {} -'.format(position), event)
def get_startup():
    """Return the ``STARTUP`` list itself (not a copy)."""
    return STARTUP
class SignalEvent(object):
    """Record of one signal send: when, which signal, who sent it, and the result."""

    def __init__(self, signal_name, sender, allow_multiresp, data):
        """Capture the send details together with the current UTC time.

        Args:
            signal_name: Name of the signal that was sent.
            sender: Sender value given to the send call.
            allow_multiresp: Whether the send ran in multi-response mode.
            data: The value stored as the result of the send.
        """
        self.signal = signal_name
        self.sender = sender
        self.allow_multiresp = allow_multiresp
        self.data = data
        # NOTE(review): utcnow() yields a naive datetime (no tzinfo) and is
        # deprecated since Python 3.12; kept so timestamps stay naive.
        self.timestamp = datetime.datetime.utcnow()

    def __repr__(self):
        """Return a one-line description; only the multiresponse flag text varies."""
        template = (
            '{} signal: "{}" from "{}" multiresponse=True resulted: {}'
            if self.allow_multiresp
            else '{} signal: "{}" from "{}" multiresponse=False resulted: {}'
        )
        return template.format(self.timestamp, self.signal, self.sender, self.data)
class ConnectEvent(object): | |
def __init__(self, signal_name, connection): | |
self.timestamp = datetime.datetime.utcnow() | |
self.signal = signal_name | |
self.connection = connection | |
def __repr__(self): | |
return '{} "{}" signal was connected to {} -> {}'.format(self.timestamp, self.signal, self.connection.__module__, self.connection.__name__) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment