Skip to content

Instantly share code, notes, and snippets.

@milljm
Last active January 4, 2022 20:47
Show Gist options
  • Save milljm/0d0cc40f99da3f17ef2ea057de4313c5 to your computer and use it in GitHub Desktop.
Save milljm/0d0cc40f99da3f17ef2ea057de4313c5 to your computer and use it in GitHub Desktop.
Shared Thread Pool
#!/usr/bin/env python
import os, traceback
from time import sleep
from multiprocessing.pool import ThreadPool
import threading # for thread locking and thread timers
class Scheduler:
    """
    A simple threading scheduler.

    Instantiate Scheduler(int), where int is the number of processors you wish
    to make available while performing work. This is normally the total number
    of cores available on a machine.

    Call Scheduler.schedule(job) to begin scheduling 'job' to perform work.
    When a slot becomes available, the Scheduler calls self.run(job) on a
    pool thread.

    When you have scheduled everything you wish to run, call
    Scheduler.waitFinish(), which returns once all scheduled jobs have
    completed.
    """
    def __init__(self, available_processors, average_load=64, max_time=False):
        """
        available_processors -- number of pool threads and job slots.
        average_load         -- while the current load average is at or above
                                this value (and more than one slot is busy),
                                jobs wait before starting.
        max_time             -- seconds a job may run before
                                _handleTimeoutJob(job) is invoked for it;
                                False (the default) disables the timer.
        """
        # Hard limit machine usage
        self.average_load = average_load
        # Initialize run_pool based on available slots
        self.run_pool = ThreadPool(processes=available_processors)
        # Slot lock when processing resource allocations and modifying slots_in_use
        self.slot_lock = threading.Lock()
        # Current slots in use
        self.slots_in_use = 0
        # Number of available slots
        self.available_slots = available_processors
        # AsyncResult objects for tasks submitted to run_pool. waitFinish()
        # polls them; once none are pending the pool is closed and joined.
        self.__thread_pool_lock = threading.Lock()
        self.__runner_pool_jobs = set()
        # Max time (seconds) a job is allowed to run in the thread pool,
        # or False to allow infinite
        self.__max_time = max_time
        # Allow threads the ability to set a global error state
        self.__error_state = False

    def schedule(self, job):
        """
        Submit an incoming job to the thread pool so run(job) is eventually
        called. Does nothing once the scheduler is in an error state.
        """
        # If we are not to schedule any more jobs for some reason, return now
        if self.__error_state:
            return
        self._queueJobs((job, 'hold'))

    def waitFinish(self):
        """
        Inform the Scheduler there are no further jobs to schedule.
        Return once all jobs have completed (or an error state is set).
        """
        try:
            # Poll until there is an error, or nothing is left pending
            while self._pendingJobs():
                if self.__error_state:
                    break
                sleep(0.1)
            if not self.__error_state:
                self.run_pool.close()
                self.run_pool.join()
        except KeyboardInterrupt:
            # Do ctrl-c stuff
            pass

    def _pendingJobs(self):
        """ Return how many submitted pool tasks are unfinished, forgetting finished ones. """
        with self.__thread_pool_lock:
            # Drop results that are already done; otherwise every re-queue of a
            # held job would leave one more AsyncResult in the set forever.
            self.__runner_pool_jobs = set(
                x for x in self.__runner_pool_jobs if not x.ready())
            return len(self.__runner_pool_jobs)

    def run(self, job):
        """ Do the work (placeholder: prints the job and sleeps one second). """
        print(job)
        sleep(1)
        return

    def _queueJobs(self, job):
        """
        Submit work to the thread pool.

        job is an (a_job, state) tuple; only jobs in the 'hold' state are
        (re)submitted, marked 'queued'. Any other state is ignored.
        """
        (a_job, state) = job
        with self.__thread_pool_lock:
            if state == 'hold':
                job = (a_job, 'queued')
                self.__runner_pool_jobs.add(self.run_pool.apply_async(self._runJob, (job,)))

    def _getLoad(self):
        """ Return the 1-minute load average, or 0.0 if it cannot be obtained. """
        loadAverage = 0.0
        try:
            loadAverage = os.getloadavg()[0]
        except (AttributeError, OSError):
            # AttributeError: getloadavg() not available on this platform;
            # OSError: the load average was unobtainable.
            pass
        return loadAverage

    def _satisfyLoad(self):
        """ Block, polling once per second, while more than one slot is busy and load >= average_load. """
        while self.slots_in_use > 1 and self._getLoad() >= self.average_load:
            sleep(1.0)

    def _reserveSlots(self):
        """
        Try to allocate one slot for a job. Returns True (and counts the slot
        as in use) if the job may run now, False otherwise.
        """
        self._satisfyLoad()
        with self.slot_lock:
            if self.slots_in_use + 1 <= self.available_slots:
                self.slots_in_use += 1
                return True
            return False

    def _handleTimeoutJob(self, job):
        """ Handle jobs that have timed out """
        # Do stuff if the job has surpassed a max time
        pass

    def _runJob(self, job):
        """ Method the run_pool calls when an available thread becomes ready """
        if self.__error_state:
            return
        (a_job, state) = job
        try:
            # see if we have enough slots to start this job
            if self._reserveSlots():
                timeout_timer = None
                try:
                    # Create timeout timer that fires after max_time seconds
                    if self.__max_time:
                        timeout_timer = threading.Timer(float(self.__max_time),
                                                        self._handleTimeoutJob, (job,))
                        timeout_timer.start()
                    self.run(a_job)
                finally:
                    # Always cancel the timer and recover the slot, even if
                    # run() raised; otherwise the slot would leak forever.
                    if timeout_timer is not None:
                        timeout_timer.cancel()
                    with self.slot_lock:
                        self.slots_in_use = max(0, self.slots_in_use - 1)
                # a_job is now finished
                job = (a_job, 'finished')
            # Not enough slots to run the job
            else:
                # set job on hold, so it will re-enter the queue
                # sleep a little bit so as not to saturate the main thread
                job = (a_job, 'hold')
                sleep(.1)
            # Job is done (or needs to re-enter the queue)
            self._queueJobs(job)
        except Exception:
            print('runWorker Exception: %s' % (traceback.format_exc()))
        except KeyboardInterrupt:
            # Do ctrl-c stuff
            pass
if __name__ == '__main__':
    # Demo run: six slots, five jobs, then block until every job is done.
    scheduler = Scheduler(6)
    for count in range(5):
        scheduler.schedule('hello %d times!' % count)
    scheduler.waitFinish()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment