Last active
December 6, 2017 12:22
-
-
Save paulwinex/4fafc88755f765f76be692f35cf01a2c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import af | |
import logging as _logging | |
# Module-level logger named after this module.
logger = _logging.getLogger(__name__)
# This logger emits everything from DEBUG upwards.
logger.setLevel(_logging.DEBUG)
class Monitor(object):
    """
    Poll af_server for monitor events and forward them to a callback.

    A monitor registers itself through the module-level ``cmd`` object,
    subscribes to job events and, once started, requests events every
    ``check_timeout_sec`` seconds using a chain of ``threading.Timer``
    objects. Each non-empty batch of events is converted by
    ``events_to_data`` and passed to ``callback``.
    """

    class CLASSTYPE:
        """
        Subscribe classes
        """
        JOB = 'jobs'
        TASK = 'tasks'
        MONITOR = 'monitors'
        RENDER = 'renders'
        _need_ids = [TASK, ]  # subscribe classes that require ids

    # Not referenced anywhere in this class; kept for external users.
    _reconnect_timeout = 3

    # Pending poll timer. Declared at class level so that stop_monitor()
    # is safe to call before the loop has ever been started.
    __ct = None

    def __init__(self, callback=None,       # send update data to this callback
                 full_subscribe=False,      # auto subscribe all tasks
                 check_timeout_sec=3        # update timeout
                 ):
        """
        Create the monitor and register it on the server.

        :param callback: callable receiving the dict built by events_to_data
        :param full_subscribe: subscribe to tasks of every reported job
        :param check_timeout_sec: seconds between two event requests
        """
        self.id = None
        self.__full_subscribe = full_subscribe
        self.__timeout = check_timeout_sec
        self.__reg_attempts = 0
        self.__is_stopped = False
        self.callback = callback
        self.requests_count = 0
        # array for auto subscribe
        self.subscriptions = dict(
            jobs=[],
            renders=[],
            monitors=[],
            users=[]
        )
        # Registered last: register() resets the tracked subscriptions,
        # so they have to exist already.
        self.register()

    def __del__(self):
        self.stop_monitor()
        self.unregister()

    def register(self):
        """
        Register monitor

        Stores the new monitor id, forgets the job ids tracked for the
        previous id and subscribes to job events.
        """
        self.id = cmd.monitorRegister()
        # Task subscriptions were made with the previous monitor id; forget
        # them so full_subscribe subscribes again with the new id.
        tracked_jobs = self.subscriptions.get('jobs')
        if tracked_jobs:
            del tracked_jobs[:]
        self.subscribe(self.CLASSTYPE.JOB)
        logger.debug('Monitor Registered: %s', self.id)

    def start_monitor(self, timeout=None):
        """
        Start request loop in thread

        The first request is executed synchronously in the calling thread;
        the following ones run from timer threads.

        :param timeout: seconds between requests; falsy keeps current value
        :return: None
        """
        self.__timeout = timeout or self.__timeout
        # Drop a poll left over from a previous start, otherwise two
        # polling chains would run side by side.
        self.__cancel_timer()
        self.__is_stopped = False
        self.requests_count = 0
        self.check_events_loop()

    def check_events_loop(self):
        """
        Request data and execute callback

        Does nothing when the monitor is stopped or when the callback is
        missing or not callable. The next poll is scheduled before the
        callback runs.
        """
        if self.__is_stopped:
            return
        if not self.callback:
            logger.error('No callback')
            return
        if not callable(self.callback):
            logger.error('Callback non callable')
            return
        response = self.get_events()
        if response is False:
            logger.warning('Register monitor...')
        # start next loop
        self.__ct = threading.Timer(self.__timeout, self.check_events_loop)
        self.__ct.start()
        # compute data
        if response:
            data = self.events_to_data(response)
            if data:
                # execute callback
                self.callback(data)
        # requests counter
        self.requests_count += 1

    def events_to_data(self, events):
        """
        Convert af_server data to app data

        Handled events: ``jobs_add``, ``jobs_change``, ``jobs_del`` and
        ``tasks_progress``; any other event is ignored. With full_subscribe
        enabled, added/changed jobs get a task subscription and deleted
        jobs lose it.

        :param events: dict {event name: list of values}
        :return: dict with keys ``new``, ``update`` and ``delete``
        """
        new_data = dict(jobs=[], users=[], monitors=[], renders=[])
        update_data = dict(jobs=[], users=[], monitors=[], renders=[],
                           tasks=[])
        delete_data = dict(jobs=[], users=[], monitors=[], renders=[])

        for event, values in events.items():
            if event == 'jobs_change':
                update_data['jobs'] += values
                self.__track_jobs(values)
            elif event == 'jobs_del':
                delete_data['jobs'] += values
                self.__untrack_jobs(values)
            elif event == 'jobs_add':
                new_data['jobs'] += values
                self.__track_jobs(values)
            elif event == 'tasks_progress':
                # compress tasks data
                # NOTE(review): 'tasks', 'blocks' and 'progress' are read as
                # parallel lists indexed by the same position -- confirm
                # against the server protocol.
                for job in values:
                    blocks = {}
                    for i, tsk in enumerate(job['tasks']):
                        block = blocks.setdefault(job['blocks'][i], {})
                        block[tsk] = job['progress'][i]
                    update_data['tasks'].append(
                        dict(id=job['job_id'], blocks=blocks))
            # todo: add other events

        return dict(
            new=new_data,
            update=update_data,
            delete=delete_data
        )

    def __track_jobs(self, job_ids):
        # Subscribe to tasks of not yet tracked jobs (full_subscribe only).
        if not self.__full_subscribe:
            return
        tracked = self.subscriptions['jobs']
        for job_id in job_ids:
            if job_id not in tracked:
                self.subscribe(self.CLASSTYPE.TASK, ids=[job_id])
                tracked.append(job_id)
                logger.debug('Subscribe job tasks')

    def __untrack_jobs(self, job_ids):
        # Unsubscribe from tasks of tracked jobs (full_subscribe only).
        if not self.__full_subscribe:
            return
        tracked = self.subscriptions['jobs']
        for job_id in job_ids:
            if job_id in tracked:
                self.unsubscribe(self.CLASSTYPE.TASK, ids=[job_id])
                tracked.remove(job_id)
                logger.debug('Unsubscribe job %s', job_id)

    def __cancel_timer(self):
        # Cancel the scheduled poll, if there is one.
        pending = self.__ct
        if pending is not None:
            pending.cancel()
            self.__ct = None

    def stop_monitor(self):
        """
        Stop request loop

        Marks the monitor as stopped and cancels the scheduled poll.
        """
        self.__is_stopped = True
        self.__cancel_timer()

    def get_events(self):
        """
        Request events from af_server

        When the response is empty or has no ``events`` key, the monitor is
        registered again, up to 9 consecutive failures; after that only an
        error is logged.

        :return: value of the ``events`` key, or False when it is missing
        """
        self.__reg_attempts += 1
        resp = cmd.monitorEvents(self.id)
        # An empty response (e.g. None) is handled like a missing key
        # instead of raising TypeError and breaking the polling chain.
        if not resp or 'events' not in resp:
            # need to register monitor
            if self.__reg_attempts < 10:
                self.register()
            else:
                logger.error('Cant register monitor on server')
            return False
        self.__reg_attempts = 0
        return resp['events']

    def unregister(self):
        """
        Unregister monitor

        :return: result of cmd.monitorUnregister
        """
        return cmd.monitorUnregister(self.id)

    def __has_required_ids(self, class_type, kw):
        # False (with an error logged) when class_type needs ids but the
        # keywords carry none.
        if class_type in self.CLASSTYPE._need_ids and 'ids' not in kw:
            logger.error('Class "%s" requires to specify ids', class_type)
            return False
        return True

    def subscribe(self, class_type, **kw):
        """
        Subscribe changes

        :param class_type: Monitor.CLASSTYPE value
        :param kw: dict ( {ids:[x1, x2, ...]} )
        :return: result of cmd.monitorSubscribe, or False when required
                 ids are missing
        """
        if not self.__has_required_ids(class_type, kw):
            return False
        return cmd.monitorSubscribe(self.id, class_type, **kw)

    def unsubscribe(self, class_type, **kw):
        """
        Unsubscribe changes

        :param class_type: Monitor.CLASSTYPE value
        :param kw: dict ( {ids:[x1, x2, ...]} )
        :return: result of cmd.monitorUnsubscribe, or False when required
                 ids are missing
        """
        if not self.__has_required_ids(class_type, kw):
            return False
        return cmd.monitorUnsubscribe(self.id, class_type, **kw)
class Command(af.Cmd):
    """
    af.Cmd extended with job/user helpers and monitor (un)subscription
    requests that accept extra operation keywords.
    """

    def setJobParameter(self, jobId, parm, value, verbose=False):
        """
        Send an action that sets one parameter on one job.

        :param jobId: job id
        :param parm: parameter name
        :param value: parameter value
        :param verbose: forwarded to _sendRequest
        :return: afserver response
        """
        self.action = 'action'
        self.data['type'] = 'jobs'
        self.data['ids'] = [jobId]
        self.data['params'] = {parm: value}
        return self._sendRequest(verbose)

    def userNames(self):
        """
        :return: list with the 'name' of every entry from usersInfo()
        """
        return [user['name'] for user in self.usersInfo()]

    def usersInfo(self):
        """
        Request users from the server.

        :return: value of the 'users' key of the response
        """
        self.action = 'get'
        self.data['type'] = 'users'
        response = self._sendRequest()
        return response['users']

    def createUser(self, name, priority=50, verbose=False):
        """
        Send a user request with the given name and priority.

        :param name: user name
        :param priority: user priority
        :param verbose: forwarded to _sendRequest
        :return: response['user']['id']
        """
        self.action = 'user'
        self.data['name'] = name
        self.data['priority'] = priority
        response = self._sendRequest(verbose)
        return response['user']['id']

    def createUserIfNotexists(self, username=None, priority=50):
        """
        Create the user unless the name is already listed by userNames().

        :param username: user name; falsy means self.data['user_name']
        :param priority: priority for the new user
        :return: id of the created user, or None when the name exists
        """
        username = username or self.data['user_name']
        if username not in self.userNames():
            return self.createUser(username, priority)

    def __monitorWatch(self, monitorId, classType, status, extra):
        # Shared request builder for monitorSubscribe / monitorUnsubscribe.
        operation = {"type": "watch",
                     "class": classType,
                     "status": status}
        # Extra keywords are merged last, like in the original requests.
        operation.update(extra)
        self.action = "action"
        self.data["type"] = "monitors"
        self.data["ids"] = [monitorId]
        self.data["operation"] = operation
        return self._sendRequest()

    def monitorSubscribe(self, monitorId, classType, **kw):
        """
        Subscribe monitor with keywords
        :param monitorId: id
        :param classType: Monitor.CLASSTYPE
        :param kw: dict merged into the request operation
        :return: afserver response
        """
        return self.__monitorWatch(monitorId, classType, "subscribe", kw)

    def monitorUnsubscribe(self, monitorId, classType, **kw):
        """
        Unsubscribe monitor with keywords
        :param monitorId: id
        :param classType: Monitor.CLASSTYPE
        :param kw: dict merged into the request operation
        :return: afserver response
        """
        return self.__monitorWatch(monitorId, classType, "unsubscribe", kw)
# Shared Command instance; Monitor sends all of its requests through it.
cmd = Command()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment