Skip to content

Instantly share code, notes, and snippets.

@paulwinex
Last active December 6, 2017 12:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save paulwinex/4fafc88755f765f76be692f35cf01a2c to your computer and use it in GitHub Desktop.
Save paulwinex/4fafc88755f765f76be692f35cf01a2c to your computer and use it in GitHub Desktop.
import threading
import af
import logging as _logging
# Module-level logger used by Monitor; its level is forced to DEBUG at import time.
logger = _logging.getLogger(__name__)
logger.setLevel(_logging.DEBUG)
class Monitor(object):
    """
    Poll the af server for monitor events and hand them to a callback.

    On construction the monitor registers itself through the module-level
    ``cmd`` object and subscribes to job events.  ``start_monitor`` then
    requests events every ``check_timeout_sec`` seconds using a chain of
    ``threading.Timer`` objects, converts each response with
    ``events_to_data`` and passes the result to ``callback``.
    """

    class CLASSTYPE:
        """
        Subscribe classes
        """
        JOB = 'jobs'
        TASK = 'tasks'
        MONITOR = 'monitors'
        RENDER = 'renders'
        # classes whose (un)subscribe request must carry explicit ids
        _need_ids = [TASK]

    # NOTE(review): not referenced anywhere in this class - confirm it is
    # still needed.
    _reconnect_timeout = 3

    # Pending timer of the polling loop.  The class-level default keeps
    # stop_monitor() safe on instances whose __init__ did not finish.
    __ct = None

    def __init__(self, callback=None,       # send update data to this callback
                 full_subscribe=False,      # auto subscribe all tasks
                 check_timeout_sec=3        # update timeout
                 ):
        """
        :param callback: callable receiving the dict built by events_to_data
        :param full_subscribe: also subscribe to the tasks of every job that
            shows up in a ``jobs_add`` / ``jobs_change`` event
        :param check_timeout_sec: delay in seconds between two event requests
        """
        # Initialise all state first so the object is consistent even when
        # registration fails.
        self.id = None
        self.callback = callback
        self.requests_count = 0
        # array for auto subscribe
        self.subscriptions = dict(
            jobs=[],
            renders=[],
            monitors=[],
            users=[]
        )
        self.__full_subscribe = full_subscribe
        self.__timeout = check_timeout_sec
        self.__reg_attempts = 0
        self.__is_stopped = False
        self.__ct = None
        self.register()

    def __del__(self):
        self.stop_monitor()
        # __del__ may run on a partially constructed object; only unregister
        # a monitor that actually received an id.
        if getattr(self, 'id', None) is not None:
            self.unregister()

    def register(self):
        """
        Register monitor and subscribe it to job events
        """
        self.id = cmd.monitorRegister()
        self.subscribe(self.CLASSTYPE.JOB)
        logger.debug('Monitor Registered: %s', self.id)

    def start_monitor(self, timeout=None):
        """
        Start request loop.

        The first request is made immediately in the calling thread; the
        following ones run from timer threads.

        :param timeout: optional new delay in seconds between requests
        :return: None
        """
        # Drop a timer left over from a previous start so that repeated
        # calls do not stack several polling loops.
        self._cancel_timer()
        self.__timeout = timeout or self.__timeout
        self.__is_stopped = False
        self.requests_count = 0
        self.check_events_loop()

    def check_events_loop(self):
        """
        Request data, schedule the next request and execute callback
        """
        if self.__is_stopped:
            return
        if not self.callback:
            logger.error('No callback')
            return
        if not callable(self.callback):
            logger.error('Callback non callable')
            return
        response = self.get_events()
        if response is False:
            logger.warning('Register monitor...')
        # start next loop (scheduled before the callback runs, so an
        # exception raised by the callback does not end the polling)
        self.__ct = threading.Timer(self.__timeout, self.check_events_loop)
        self.__ct.start()
        # compute data
        if response:
            data = self.events_to_data(response)
            if data:
                # execute callback
                self.callback(data)
        # requests counter
        self.requests_count += 1

    def events_to_data(self, events):
        """
        Convert af_server data to app data

        Handles the ``jobs_change``, ``jobs_del``, ``jobs_add`` and
        ``tasks_progress`` events; any other event is ignored.  With
        ``full_subscribe`` enabled, task subscriptions are added for
        new/changed jobs and removed for deleted jobs as a side effect.

        :param events: dict mapping event name to a list of values
        :return: dict with ``new``, ``update`` and ``delete`` sub-dicts
        """
        new_data = dict(jobs=[], users=[], monitors=[], renders=[])
        update_data = dict(jobs=[], users=[], monitors=[], renders=[], tasks=[])
        delete_data = dict(jobs=[], users=[], monitors=[], renders=[])

        for event, values in events.items():
            if event == 'jobs_change':
                update_data['jobs'].extend(values)
                if self.__full_subscribe:
                    self._subscribe_job_tasks(values)
            elif event == 'jobs_del':
                delete_data['jobs'].extend(values)
                if self.__full_subscribe:
                    self._unsubscribe_job_tasks(values)
            elif event == 'jobs_add':
                new_data['jobs'].extend(values)
                if self.__full_subscribe:
                    self._subscribe_job_tasks(values)
            elif event == 'tasks_progress':
                update_data['tasks'].extend(
                    self._compress_tasks(job) for job in values)
            # todo: add other events
        return dict(
            new=new_data,
            update=update_data,
            delete=delete_data
        )

    def _subscribe_job_tasks(self, job_ids):
        """Subscribe to the tasks of every job in job_ids not tracked yet."""
        tracked = self.subscriptions['jobs']
        for job_id in job_ids:
            if job_id not in tracked:
                self.subscribe(self.CLASSTYPE.TASK, ids=[job_id])
                tracked.append(job_id)
                logger.debug('Subscribe job tasks')

    def _unsubscribe_job_tasks(self, job_ids):
        """Drop the task subscription of every tracked job in job_ids."""
        tracked = self.subscriptions['jobs']
        for job_id in job_ids:
            if job_id in tracked:
                self.unsubscribe(self.CLASSTYPE.TASK, ids=[job_id])
                tracked.remove(job_id)
                logger.debug('Unsubscribe job %s', job_id)

    @staticmethod
    def _compress_tasks(job):
        """
        Group one tasks_progress entry by block.

        ``job['tasks']``, ``job['progress']`` and ``job['blocks']`` are read
        as parallel lists: element i of each describes the same task.

        :return: dict(id=<job id>, blocks={block_id: {task: progress}})
        """
        blocks = {}
        for tsk, task, block_id in zip(job['tasks'], job['progress'],
                                       job['blocks']):
            blocks.setdefault(block_id, {})[tsk] = task
        return dict(id=job['job_id'], blocks=blocks)

    def _cancel_timer(self):
        """Cancel the pending polling timer, if there is one."""
        timer = self.__ct
        if timer is not None:
            timer.cancel()
            self.__ct = None

    def stop_monitor(self):
        """
        Stop request loop and cancel the pending timer
        """
        self.__is_stopped = True
        self._cancel_timer()

    def get_events(self):
        """
        Request events from af_server

        When the answer carries no ``events`` entry (including no answer at
        all) the monitor is registered again, up to 10 consecutive attempts;
        after that only an error is logged.

        :return: the ``events`` value of the response, or False when the
            response had none
        """
        self.__reg_attempts += 1
        resp = cmd.monitorEvents(self.id)
        # A missing/invalid answer must not raise here: an exception in the
        # timer thread would end the polling loop silently.
        if not isinstance(resp, dict) or 'events' not in resp:
            # need to register monitor
            if self.__reg_attempts < 10:
                self.register()
            else:
                logger.error('Cant register monitor on server')
            return False
        self.__reg_attempts = 0
        return resp['events']

    def unregister(self):
        """
        Unregister monitor
        :return: result of cmd.monitorUnregister
        """
        return cmd.monitorUnregister(self.id)

    def _ids_missing(self, class_type, kw):
        """Log an error and return True when class_type needs ids but kw has none."""
        if class_type in self.CLASSTYPE._need_ids and 'ids' not in kw:
            logger.error('Class "%s" requires to specify ids', class_type)
            return True
        return False

    def subscribe(self, class_type, **kw):
        """
        Subscribe changes
        :param class_type: Monitor.CLASSTYPE value
        :param kw: dict ( {ids:[x1, x2, ...]} )
        :return: afserver response dict, or False when required ids are missing
        """
        if self._ids_missing(class_type, kw):
            return False
        return cmd.monitorSubscribe(self.id, class_type, **kw)

    def unsubscribe(self, class_type, **kw):
        """
        Unsubscribe changes
        :param class_type: Monitor.CLASSTYPE value
        :param kw: dict ( {ids:[x1, x2, ...]} )
        :return: afserver response dict, or False when required ids are missing
        """
        if self._ids_missing(class_type, kw):
            return False
        return cmd.monitorUnsubscribe(self.id, class_type, **kw)
class Command(af.Cmd):
    """
    af.Cmd extension adding job/user helpers and monitor (un)subscribe
    requests that accept extra operation keywords.
    """

    def setJobParameter(self, jobId, parm, value, verbose=False):
        """
        Send an 'action' request that sets one parameter on one job
        :param jobId: job id
        :param parm: parameter name
        :param value: parameter value
        :param verbose: forwarded to _sendRequest
        :return: afserver response
        """
        self.action = 'action'
        request = self.data
        request['type'] = 'jobs'
        request['ids'] = [jobId]
        request['params'] = {parm: value}
        return self._sendRequest(verbose)

    def userNames(self):
        """
        :return: list with the 'name' of every entry returned by usersInfo
        """
        names = []
        for user in self.usersInfo():
            names.append(user['name'])
        return names

    def usersInfo(self):
        """
        Send a 'get' request of type 'users'
        :return: the 'users' value of the response
        """
        self.action = 'get'
        self.data['type'] = 'users'
        return self._sendRequest()['users']

    def createUser(self, name, priority=50, verbose=False):
        """
        Send a 'user' request with the given name and priority
        :param name: user name
        :param priority: user priority
        :param verbose: forwarded to _sendRequest
        :return: id found under response['user']['id']
        """
        self.action = 'user'
        request = self.data
        request['name'] = name
        request['priority'] = priority
        created = self._sendRequest(verbose)['user']
        return created['id']

    def createUserIfNotexists(self, username=None, priority=50):
        """
        Create the user unless userNames() already lists it
        :param username: user name; falls back to self.data['user_name']
        :param priority: priority for a newly created user
        :return: new user id, or None when the user already exists
        """
        username = username or self.data['user_name']
        if username in self.userNames():
            return None
        return self.createUser(username, priority)

    def monitorSubscribe(self, monitorId, classType, **kw):
        """
        Subscribe monitor with keywords
        :param monitorId: id
        :param classType: Monitor.CLASSTYPE
        :param kw: dict merged into the request's "operation" entry
        :return: afserver response
        """
        # keywords are merged last, so they may override the base fields
        operation = {"type": "watch",
                     "class": classType,
                     "status": "subscribe"}
        operation.update(kw)
        self.action = "action"
        self.data["type"] = "monitors"
        self.data["ids"] = [monitorId]
        self.data["operation"] = operation
        return self._sendRequest()

    def monitorUnsubscribe(self, monitorId, classType, **kw):
        """
        Unsubscribe monitor with keywords
        :param monitorId: id
        :param classType: Monitor.CLASSTYPE
        :param kw: dict merged into the request's "operation" entry
        :return: afserver response
        """
        # keywords are merged last, so they may override the base fields
        operation = {"type": "watch",
                     "class": classType,
                     "status": "unsubscribe"}
        operation.update(kw)
        self.action = "action"
        self.data["type"] = "monitors"
        self.data["ids"] = [monitorId]
        self.data["operation"] = operation
        return self._sendRequest()
# Shared module-level Command instance; Monitor sends all of its requests through it.
cmd = Command()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment