@kergoth
Created February 16, 2011 02:56
Prototype / experiments with an alternative runqueue task executor
import multiprocessing
import os
import sys

import bb.build
import bb.cache
import bb.data
import bb.event
from queue import Empty

class Task(object):
    def __init__(self, taskid, name, fn, taskhash, appends):
        self.hash = taskhash
        self.id = taskid
        self.name = name
        self.fn = fn
        self.appends = appends

# Add to RunQueue/RunQueueExecute and pass to the TaskRunner (see the
# driver sketch at the bottom of this file for how it fits together)
def get_tasks(self):
    while self.stats.total:
        taskid = self.sched.next()
        if taskid:
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[taskid]]
            taskname = self.rqdata.runq_task[taskid]
            taskhash = self.rqdata.runq_hash[taskid]
            appends = self.cooker.get_file_appends(fn)
            yield Task(taskid, taskname, fn, taskhash, appends)

class TaskRunner(object):
    """Execute tasks on behalf of the runqueue"""

    def __init__(self, cfgdata, get_tasks):
        self.get_tasks = get_tasks
        self.cfg = cfgdata
        self.num_processes = multiprocessing.cpu_count()

    def start(self):
        self.manager = multiprocessing.Manager()
        self.events = self.manager.Queue()
        self.pool = multiprocessing.Pool(self.num_processes, init_process,
                                         (self.cfg, self.events))
        self.results = self.pool.imap(execute_task, self.get_tasks())
        self.pool.close()
        self.processor = EventProcessor(self.events, self.cfg,
                                        name='Event Processor')
        self.processor.start()

    # FIXME: this implementation spins in get_tasks() waiting on the next
    # task from the scheduler. I think this means an idle worker will sit
    # spinning in the get_tasks() loop if nothing is buildable, rather than
    # waiting for some sort of event. This isn't ideal. Either I should
    # switch to using apply_async or something instead of imap, or feed
    # tasks to the workers via a queue, so they block waiting for the next
    # ones (a sketch of that approach follows this class). The runqueue
    # execute() would then simply hand off tasks to the queue as they
    # become buildable and get results as they become available. Another
    # idea would be to use an Event/Condition as a notification of a task
    # being completed, and therefore of there potentially being a new
    # buildable task. Another issue to consider is the danger of handing
    # off a given task more than once, which the current RunQueueExecute
    # code handles by tracking which tasks are currently running.

    def next(self):
        try:
            # IMapIterator.next() takes a timeout; return None if no result
            # is ready yet. Raises StopIteration once all tasks are done.
            return self.results.next(0.25)
        except multiprocessing.TimeoutError:
            return

    def shutdown(self, clean=True):
        # Tell the event processor to stop; manager queue proxies have no
        # close() method, so the sentinel is all that is needed
        self.events.put('STOP')
        if not clean:
            self.pool.terminate()
        self.pool.join()
        self.processor.join()
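
# A minimal sketch of the queue-fed alternative described in the FIXME
# above. It is hypothetical and not wired into this prototype: each worker
# blocks on a shared task queue rather than spinning in get_tasks(), and a
# None sentinel shuts it down. It reuses execute_task() from below and
# would need init_process()-style setup in each worker as well.
def queue_fed_worker(task_queue, result_queue):
    while True:
        task = task_queue.get()  # blocks until a task or sentinel arrives
        if task is None:  # sentinel: no more work for this worker
            break
        try:
            result_queue.put(('done', execute_task(task)))
        except Exception as exc:
            result_queue.put(('failed', exc))
# With this shape, the runqueue's execute() would put tasks on task_queue
# as they become buildable, pull results off result_queue as they complete,
# and finally put one None per worker to shut the pool down; tracking which
# task ids have already been queued avoids handing off a task twice.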

def init_process(cfg, eventqueue):
    # Stash the configuration on the function object so execute_task()
    # can reach it from within the worker process
    execute_task.cfg = cfg
    # Make the child the process group leader
    os.setpgid(0, 0)
    # No stdin
    newsi = os.open(os.devnull, os.O_RDWR)
    os.dup2(newsi, sys.stdin.fileno())
    bb.event.worker_pid = os.getpid()
    bb.event.worker_pipe = eventqueue

def execute_task(task):
    try:
        data = bb.cache.Cache.loadDataFull(task.fn, task.appends,
                                           execute_task.cfg)
        data.setVar('BB_TASKHASH', task.hash)
        os.environ.clear()
        os.environ.update(bb.data.exported_vars(data))
        bb.build.exec_task(task.fn, task.name, data)
    except Exception as exc:
        # Attach the task so the parent can tell which one failed
        exc.task = task
        raise
    else:
        return task

class EventProcessor(multiprocessing.Process):
    """Dispatch events received in our queue to the UI"""

    def __init__(self, queue, cfgdata, name=None):
        multiprocessing.Process.__init__(self, name=name)
        self.queue = queue
        self.cfg = cfgdata

    def run(self):
        for event in queue_generator(self.queue):
            bb.event.fire_ui_handlers(event, self.cfg)

def queue_generator(queue):
    # A blocking get() never raises Empty, so just wait for each value;
    # the 'STOP' sentinel ends the stream
    while True:
        value = queue.get()
        if value == 'STOP':
            drain_queue(queue)
            break
        else:
            yield value

def drain_queue(queue):
    # Discard anything left in the queue without blocking
    while True:
        try:
            queue.get(block=False)
        except Empty:
            break
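
# A hypothetical driver sketch showing how the pieces above fit together.
# The names cooker_cfg, rqexec, and task_complete() are assumed stand-ins
# for the real BitBake objects (rqexec being a RunQueueExecute carrying the
# get_tasks() method from the top of this file); only TaskRunner itself
# comes from this prototype.
def run_all_tasks(cooker_cfg, rqexec):
    runner = TaskRunner(cooker_cfg, rqexec.get_tasks)
    runner.start()
    try:
        while True:
            task = runner.next()  # None if no result is ready yet
            if task is not None:
                rqexec.task_complete(task.id)
    except StopIteration:
        pass  # the result iterator is exhausted; all tasks ran
    finally:
        runner.shutdown()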