Created
February 9, 2016 15:01
-
-
Save milliams/600e04f3ea2756b18ca2 to your computer and use it in GitHub Desktop.
First implementation of the monitoring replacement
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Requirements | |
------------ | |
We need a thread-safe list of queued back-ends and of running back-ends | |
We need to be able to rate-limit per-backend | |
Implementation | |
-------------- | |
We have 1 + N threads running at any time. | |
One thread (``FillMonitoringQueue``) is the driver which, based on which backends are active, | |
adds backends to the monitoring schedule. It will loop constantly with a brief pause to not | |
overload the process. | |
We then have N other threads (``RunBackendMonitoring``) which request work from the schedule
and run the monitoring update function in their thread. The currently running backends are tracked
in another list.
Implementation notes | |
-------------------- | |
``concurrent.futures`` can't be used since it gives us no way to force hanging threads to stop.
We make the threads daemons so that they can be killed if necessary (daemon threads are terminated when the interpreter exits).
""" | |
from __future__ import print_function | |
import atexit | |
import threading | |
import time | |
from collections import deque, defaultdict | |
from datetime import datetime | |
# Stand-in for the PollThread config section: for each backend, the number of
# seconds that must pass before it is scheduled again; 'default' covers any
# backend not listed explicitly.
config = dict(
    default=1,
    Dirac=10,
    Local=1,
    LCG=4,
)
# These next few functions are for testing and represent the job registry with jobs becoming active or not | |
# Test stand-in for the job registry: the backends that currently have active jobs.
backends = []


def add_backend(b):
    """Register backend ``b`` as having active jobs."""
    backends.extend((b,))


def remove_backend(b):
    """Unregister backend ``b``.

    Raises ValueError if ``b`` is not currently registered.
    """
    backends.remove(b)


def active_backends():
    """Return the registry list itself (not a copy)."""
    return backends
# The next few classes are the monitoring core and comprise the code to do the monitoring | |
# Backends waiting to be monitored (appended by FillMonitoringQueue, popped by
# RunBackendMonitoring) and backends whose monitoring is currently in progress.
monitoring_schedule, monitoring_running = deque(), deque()
class StoppableThread(threading.Thread):
    """
    A thread which can be stopped politely by calling ``stop()`` on it.

    ``stop()`` only sets a flag: a subclass's ``run()`` must poll ``stopped()``
    and return once it is true. The thread is a daemon, so it will also be
    killed when the interpreter closes.
    """

    def __init__(self, name):
        super(StoppableThread, self).__init__(name=name)
        self.daemon = True
        # Deliberately NOT named ``_stop``: on CPython 3, ``threading.Thread``
        # has an internal ``_stop()`` method that ``join()`` and ``is_alive()``
        # call once the thread has finished. Shadowing it with an Event makes
        # those calls fail with "TypeError: 'Event' object is not callable".
        self._stop_event = threading.Event()

    def stop(self):
        """Ask the thread to finish. Returns immediately; does not join."""
        self._stop_event.set()

    def stopped(self):
        """Return True once ``stop()`` has been called."""
        # ``is_set`` rather than the deprecated camelCase alias ``isSet``.
        return self._stop_event.is_set()
class FillMonitoringQueue(StoppableThread):
    """
    Producer thread: repeatedly queue active backends for monitoring.

    About once a second it appends each active backend to
    ``monitoring_schedule``, but only if the backend is neither already
    scheduled nor currently running. More than its configured rate
    (``config[backend]``, falling back to ``config['default']``) in seconds
    must also have passed since it was last scheduled.
    """

    def __init__(self, name):
        super(FillMonitoringQueue, self).__init__(name=name)
        # When each backend was last *scheduled* (not when its monitoring
        # finished); unseen backends default to datetime.min, so they are due
        # immediately.
        self.last_run = defaultdict(lambda: datetime.min)

    def run(self):
        while not self.stopped():
            # Iterate over a snapshot: the registry can be modified by other
            # threads while we loop, and mutating a list during iteration can
            # silently skip entries.
            for backend in list(active_backends()):
                if self._is_due(backend):
                    # This is the only place anything should be added to the schedule.
                    monitoring_schedule.append(backend)
                    self.last_run[backend] = datetime.utcnow()
            time.sleep(1)  # Don't check for jobs too regularly

    def _is_due(self, backend):
        """Return True if ``backend`` should be added to the schedule now."""
        if backend in monitoring_schedule or backend in monitoring_running:
            return False  # already scheduled, or being monitored right now
        # If the backend was scheduled too recently, leave it for a later pass.
        rate = config.get(backend, config['default'])
        elapsed = datetime.utcnow() - self.last_run[backend]
        return elapsed.total_seconds() > rate
class RunBackendMonitoring(StoppableThread):
    """
    Consumer thread: take backends off ``monitoring_schedule`` and run
    ``monitor_jobs_on_backend`` for each one.

    While a backend is being monitored it is listed in ``monitoring_running``,
    which stops the producer from scheduling it again in the meantime.
    """

    # Shared by every consumer so two of them can never claim the same
    # scheduled backend.
    _claim_lock = threading.Lock()

    def run(self):
        while not self.stopped():
            backend, claimed = self._claim_next()
            if claimed:
                self._monitor(backend)
            time.sleep(1)  # Don't monitor too regularly

    def _claim_next(self):
        """Move the oldest scheduled backend into ``monitoring_running``.

        Returns ``(backend, True)``, or ``(None, False)`` if nothing is scheduled.
        """
        with self._claim_lock:
            if not monitoring_schedule:
                return None, False  # Nothing to monitor
            backend = monitoring_schedule[0]
            # Mark it as running *before* taking it off the schedule. Otherwise
            # there would be a moment when the producer sees it in neither
            # container and schedules it a second time.
            monitoring_running.append(backend)
            monitoring_schedule.popleft()
            return backend, True

    def _monitor(self, backend):
        """Monitor one claimed backend; a failure is reported, not fatal to the thread."""
        try:
            monitor_jobs_on_backend(backend, ['job1', 'job2'])
        except Exception as err:
            # Thread boundary: report the error and keep serving other backends.
            print('Monitoring of {0} failed: {1!r}'.format(backend, err))
            # Don't leave the backend marked as running forever, otherwise it
            # would never be scheduled again.
            try:
                monitoring_running.remove(backend)
            except ValueError:
                pass  # already removed (e.g. by monitor_jobs_on_backend itself)
def monitor_jobs_on_backend(backend, jobs):
    """
    Run one monitoring pass for ``jobs`` on ``backend``.

    Currently a stand-in: it prints what it would monitor and sleeps for one
    second to fake the work. The caller must already have added ``backend``
    to ``monitoring_running``. It is removed again when this function
    finishes, even if the monitoring step raises, so the backend can be
    rescheduled.

    Raises:
        ValueError: if ``backend`` is not in ``monitoring_running``.
    """
    try:
        print('running monitoring for {0}: {1}'.format(backend, jobs))
        time.sleep(1)  # Faking the time it takes to complete monitoring
        # TODO: replace the fake work above with the real update, e.g.
        # backend.master_updateMonitoringInformation(jobs)
    finally:
        monitoring_running.remove(backend)
# These next few functions are the interface to the monitoring system and allow to turn it on or off. | |
# Threads launched by start_monitoring(); cleared again by stop_monitoring().
monitoring_threads = list()
def start_monitoring(num_monitoring_threads=2):
    """
    Launch the schedule-filling thread plus ``num_monitoring_threads`` consumers.

    Each thread is started and then recorded in ``monitoring_threads``.

    Raises:
        RuntimeError: if ``monitoring_threads`` is not empty.
    """
    if monitoring_threads:
        raise RuntimeError('Monitoring is already running')

    def _launch(thread):
        # Start first, then register, matching the original ordering.
        thread.start()
        monitoring_threads.append(thread)

    _launch(FillMonitoringQueue('FillMonitoringQueue'))
    for index in range(num_monitoring_threads):
        _launch(RunBackendMonitoring('RunBackendMonitoring_{0}'.format(index)))
@atexit.register
def stop_monitoring(force=False):
    """
    Ask every monitoring thread to stop, then forget about them.

    Registered with ``atexit``, so it also runs when the interpreter exits.
    In each round every thread is asked to stop and given one second. If some
    are still alive, the user is asked whether to give up on them. Threads
    that are given up on are not actually killed, but they are daemons, so
    they die with the interpreter.

    Args:
        force: if True, wait one round and then give up on any remaining
            threads without prompting.
    """
    if not monitoring_threads:
        return  # Never started, or already stopped: nothing to wait for.
    try:
        ask = raw_input  # Python 2
    except NameError:
        ask = input  # Python 3, where raw_input was renamed to input
    print('Trying to shutdown monitoring')
    while True:
        # Ask them to stop nicely
        for t in monitoring_threads:
            t.stop()
        time.sleep(1)  # How much chance to give the thread to exit once asked
        # If we are forcing the shutdown then don't ask, just give up on them.
        # Do this after the sleep to be just a little nice about it.
        if force or not any(t.is_alive() for t in monitoring_threads):
            break
        print('Some threads still running')
        try:
            ans = ask('Really kill? [y/N] ')
        except EOFError:
            # No interactive user (e.g. stdin closed at exit): don't abort the
            # handler before the thread list has been cleared.
            break
        if ans.strip().lower() in ('y', 'yes'):
            break
    del monitoring_threads[:]
# From here on is just for testing | |
if __name__ == '__main__':
    # Demo only: importing this module must not start background threads.
    start_monitoring()
    add_backend('Dirac')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment