@milliams · Created February 9, 2016 15:01
First implementation of the monitoring replacement
"""
Requirements
------------

We need a thread-safe list of queued backends and of running backends.

We need to be able to rate-limit per backend.

Implementation
--------------

We have 1 + N threads running at any time.

One thread (``FillMonitoringQueue``) is the driver which, based on which backends are active,
adds backends to the monitoring schedule. It loops constantly with a brief pause so as not to
overload the process.

We then have N other threads (``RunBackendMonitoring``) which request work from the schedule
and run the monitoring update function in their own thread. The currently running backends are
tracked in another list.

Implementation notes
--------------------

``concurrent.futures`` can't be used since we can't force hanging threads to stop.

We need the threads to be daemons so that we can kill them if necessary.
"""
from __future__ import print_function
import atexit
import threading
import time
from collections import deque, defaultdict
from datetime import datetime
config = {'default': 1, 'Dirac': 10, 'Local': 1, 'LCG': 4}  # This represents the PollThread config section
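# The values are per-backend rate limits: the minimum number of seconds that must pass
# before the same backend is scheduled for monitoring again.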
# These next few functions are for testing and represent the job registry with jobs becoming active or not
def add_backend(b):
    backends.append(b)

def remove_backend(b):
    backends.remove(b)

def active_backends():
    return backends

backends = []

# The next few classes are the monitoring core and comprise the code to do the monitoring
monitoring_schedule = deque()
monitoring_running = deque()
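# Note: ``deque.append()`` and ``deque.popleft()`` are thread-safe in CPython, which is why
# deques are used for the shared schedule and the list of currently running backends.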
class StoppableThread(threading.Thread):
    """
    A thread which can be stopped politely by calling ``stop()`` on it.
    It will also be killed when the interpreter closes as it's a daemon thread.
    """
    def __init__(self, name):
        super(StoppableThread, self).__init__(name=name)
        self.daemon = True
        self._stop = threading.Event()

    def stop(self):
        self._stop.set()

    def stopped(self):
        return self._stop.is_set()

class FillMonitoringQueue(StoppableThread):
    def __init__(self, name):
        super(FillMonitoringQueue, self).__init__(name=name)
        self.last_run = defaultdict(lambda: datetime.min)

    def run(self):
        while not self.stopped():
            for backend in active_backends():  # For all active backends
                if backend not in monitoring_schedule and backend not in monitoring_running:  # If it's not already being monitored (or scheduled to be)
                    # If the backend was run too recently, don't do anything here
                    rate = config.get(backend, config['default'])
                    if (datetime.utcnow() - self.last_run[backend]).total_seconds() > rate:
                        monitoring_schedule.append(backend)  # Add it to the queue. This is the only place anything should be added
                        self.last_run[backend] = datetime.utcnow()
            time.sleep(1)  # Don't check for jobs too regularly

class RunBackendMonitoring(StoppableThread):
    def run(self):
        while not self.stopped():
            try:
                backend = monitoring_schedule.popleft()  # Remove the backend from the schedule
                monitoring_running.append(backend)  # And add it to the list of running ones
                monitor_jobs_on_backend(backend, ['job1', 'job2'])
            except IndexError:
                # Nothing to monitor
                pass
            time.sleep(1)  # Don't monitor too regularly

def monitor_jobs_on_backend(backend, jobs):
    print('running monitoring for {0}: {1}'.format(backend, jobs))
    time.sleep(1)  # Faking the time it takes to complete monitoring
    # backend.master_updateMonitoringInformation(jobs)
    monitoring_running.remove(backend)

# These next few functions are the interface to the monitoring system and allow it to be turned on and off.
monitoring_threads = []
def start_monitoring(num_monitoring_threads=2):
    if monitoring_threads:
        raise RuntimeError('Monitoring is already running')
    fill_monitoring_thread = FillMonitoringQueue('FillMonitoringQueue')
    fill_monitoring_thread.start()
    monitoring_threads.append(fill_monitoring_thread)
    for i in range(num_monitoring_threads):
        consumer = RunBackendMonitoring('RunBackendMonitoring_{0}'.format(i))
        consumer.start()
        monitoring_threads.append(consumer)

@atexit.register
def stop_monitoring(force=False):
    print('Trying to shut down monitoring')
    while True:
        # Ask them to stop nicely
        for t in monitoring_threads:
            t.stop()
        time.sleep(1)  # How much chance to give the threads to exit once asked
        # If we are forcing the shutdown then don't ask, just kill
        # Do this after the sleep to be just a little nice about it
        if force:
            break
        if any(t.is_alive() for t in monitoring_threads):
            print('Some threads still running')
            ans = raw_input('Really kill? [y/N] ')
            if ans == 'y':
                break
        else:
            break
    del monitoring_threads[:]

# From here on is just for testing
start_monitoring()
add_backend('Dirac')
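# A possible continuation of the test run, sketched here as an illustration (the sleep
# lengths and the extra 'Local' backend are assumptions, not part of the original test):
time.sleep(5)            # give the monitoring threads time to poll 'Dirac' a few times
add_backend('Local')     # a second active backend; the driver will schedule it as well
time.sleep(5)
remove_backend('Dirac')  # once removed, 'Dirac' is no longer added to the schedule
stop_monitoring()        # politely stop the threads (also registered to run via atexit)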