Skip to content

Instantly share code, notes, and snippets.

Created February 9, 2016 15:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save milliams/600e04f3ea2756b18ca2 to your computer and use it in GitHub Desktop.
Save milliams/600e04f3ea2756b18ca2 to your computer and use it in GitHub Desktop.
"""First implementation of the monitoring replacement.

We need a thread-safe list of queued backends and of running backends.
We need to be able to rate-limit monitoring per backend.

We have 1 + N threads running at any time.

One thread (``FillMonitoringQueue``) is the driver which, based on which backends are active,
adds backends to the monitoring schedule. It loops constantly, with a brief pause so as not to
overload the process.

We then have N other threads (``RunBackendMonitoring``) which request work from the schedule
and run the monitoring update function in their own thread. The currently running backends are
tracked in another list.

Implementation notes:

- ``concurrent.futures`` can't be used, since we can't force hanging threads to stop.
- The threads must be daemons so that we can kill them if necessary (daemon threads are killed
  when the interpreter exits).
"""
from __future__ import print_function
import atexit
import threading
import time
from collections import deque, defaultdict
from datetime import datetime
# Minimum number of seconds between monitoring runs for each backend, with a
# 'default' fallback. This is representing the PollThread config section.
config = {'default': 1, 'Dirac': 10, 'Local': 1, 'LCG': 4}


# These next few functions are for testing. They stand in for the job
# registry, with backends becoming active or inactive as jobs come and go.

def add_backend(b):
    """Mark backend ``b`` as active, i.e. it now has jobs to be monitored."""
    backends.append(b)


def remove_backend(b):
    """Mark backend ``b`` as inactive.

    :raises ValueError: if ``b`` is not currently active
    """
    backends.remove(b)


def active_backends():
    """Return a snapshot list of the currently active backends.

    A copy is returned so that a caller iterating it in another thread is not
    affected by concurrent ``add_backend``/``remove_backend`` calls.
    """
    return list(backends)


backends = []
# Shared state for the monitoring core implemented by the classes below.
# Backends waiting to be monitored: FillMonitoringQueue appends on the right,
# the RunBackendMonitoring workers pop from the left.
monitoring_schedule = deque()
# Backends whose monitoring update is currently in progress.
monitoring_running = deque()
class StoppableThread(threading.Thread):
    """A thread which can be stopped politely by calling ``stop()`` on it.

    It is also a daemon thread, so it will be killed when the interpreter
    exits even if it never notices the stop request.

    The stop flag lives in ``_stop_event`` rather than ``_stop``. In CPython 3
    (at least up to 3.12), ``threading.Thread`` has an internal ``_stop()``
    method that ``join()`` calls. Shadowing it with an ``Event`` makes
    ``join()`` fail with a ``TypeError``.
    """

    def __init__(self, name):
        super(StoppableThread, self).__init__(name=name)
        self.daemon = True
        self._stop_event = threading.Event()

    def stop(self):
        """Request the thread to finish; ``run()`` loops poll ``stopped()``."""
        self._stop_event.set()

    def stopped(self):
        """Return True once ``stop()`` has been called."""
        return self._stop_event.is_set()
class FillMonitoringQueue(StoppableThread):
    """Driver thread that decides which backends get monitored next.

    About once a second it walks the active backends. It appends to
    ``monitoring_schedule`` every backend that is neither queued nor currently
    being monitored, provided the per-backend interval taken from ``config``
    has elapsed since it was last scheduled. This is the only place anything
    is added to the schedule.
    """

    def __init__(self, name):
        super(FillMonitoringQueue, self).__init__(name=name)
        # Time each backend was last put on the schedule. Unseen backends
        # default to datetime.min, so they are eligible immediately.
        self.last_run = defaultdict(lambda: datetime.min)

    def run(self):
        while not self.stopped():
            for backend in active_backends():
                already_handled = backend in monitoring_schedule or backend in monitoring_running
                if already_handled:
                    continue  # Queued or in progress; nothing to do yet
                interval = config.get(backend, config['default'])
                since_last = datetime.utcnow() - self.last_run[backend]
                if since_last.total_seconds() <= interval:
                    continue  # Scheduled too recently; leave it for a later pass
                monitoring_schedule.append(backend)
                self.last_run[backend] = datetime.utcnow()
            time.sleep(1)  # Don't check for jobs too regularly
class RunBackendMonitoring(StoppableThread):
    """Worker thread that takes backends off the schedule and monitors them.

    Each iteration pops one backend from ``monitoring_schedule`` and records it
    in ``monitoring_running`` while its update runs. The backend is then
    removed again, so that ``FillMonitoringQueue`` is allowed to schedule it
    once more.
    """

    def run(self):
        while not self.stopped():
            # Keep the try minimal so an IndexError raised *during*
            # monitoring is not mistaken for an empty schedule.
            try:
                backend = monitoring_schedule.popleft()  # Remove the backend from the schedule
            except IndexError:
                # Nothing to monitor
                time.sleep(1)  # Don't monitor too regularly
                continue

            monitoring_running.append(backend)  # And add it to the list of running ones
            try:
                monitor_jobs_on_backend(backend, ['job1', 'job2'])
            finally:
                # Always release the backend, even if monitoring failed.
                # Otherwise FillMonitoringQueue would never schedule it again.
                monitoring_running.remove(backend)
def monitor_jobs_on_backend(backend, jobs):
    """Placeholder for the real per-backend monitoring update.

    Prints the backend and the jobs that would be monitored, then sleeps for
    one second to fake the time a real update takes.
    """
    message = 'running monitoring for {0}: {1}'.format(backend, jobs)
    print(message)
    # Simulated work. The real version would instead call:
    #     backend.master_updateMonitoringInformation(jobs)
    time.sleep(1)
# These next few functions are the interface to the monitoring system and allow it to be turned on or off.

# Every thread started by start_monitoring(); emptied again by stop_monitoring().
monitoring_threads = []


def start_monitoring(num_monitoring_threads=2):
    """Start the scheduling thread plus ``num_monitoring_threads`` workers.

    :param num_monitoring_threads: number of RunBackendMonitoring worker threads
    :raises RuntimeError: if monitoring has already been started
    """
    if monitoring_threads:
        raise RuntimeError('Monitoring is already running')

    fill_monitoring_thread = FillMonitoringQueue('FillMonitoringQueue')
    # Register each thread before starting it so stop_monitoring() can always find it.
    monitoring_threads.append(fill_monitoring_thread)
    fill_monitoring_thread.start()

    for i in range(num_monitoring_threads):
        consumer = RunBackendMonitoring('RunBackendMonitoring_{0}'.format(i))
        monitoring_threads.append(consumer)
        consumer.start()
def stop_monitoring(force=False):
    """Shut down every thread started by ``start_monitoring()``.

    Each round, every thread is asked to stop and given one second to exit.
    - If all threads have exited, shutdown completes.
    - Otherwise the user is asked whether to give up on the remaining threads.
      They are daemon threads, so they die with the interpreter.
    - If the user declines, the stop request is repeated.

    :param force: don't prompt; abandon any threads still alive after the
        first round
    """
    print('Trying to shutdown monitoring')

    # raw_input only exists on Python 2; fall back to input on Python 3.
    try:
        ask = raw_input
    except NameError:
        ask = input

    while True:
        # Ask them to stop nicely
        for t in monitoring_threads:
            t.stop()
        time.sleep(1)  # How much chance to give the thread to exit once asked

        # If we are forcing the shutdown then don't ask, just kill
        # Do this after the sleep to be just a little nice about it
        if force:
            break

        if not any(t.is_alive() for t in monitoring_threads):
            break  # Everything exited cleanly

        print('Some threads still running')
        ans = ask('Really kill? [y/N] ')
        if ans.strip().lower() == 'y':
            break

    # Forget the threads so that start_monitoring() can be called again.
    del monitoring_threads[:]
# From here on is just for testing
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment