Created
February 16, 2011 02:56
-
-
Save kergoth/828777 to your computer and use it in GitHub Desktop.
Prototype / Experimentations with an alternative runqueue task executor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing | |
import os | |
import sys | |
import bb.data, bb.cache | |
from queue import Empty | |
class Task(object): | |
def __init__(self, taskid, name, fn, taskhash, appends): | |
self.hash = taskhash | |
self.id = taskid | |
self.name = name | |
self.fn = fn | |
self.appends = appends | |
# Add to RunQueue/RunQueueExecute and pass to the TaskRunner | |
def get_tasks(self): | |
while self.stats.total: | |
taskid = self.sched.next() | |
if taskid: | |
fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[taskid]] | |
taskname = self.rqdata.runq_task[taskid] | |
taskhash = self.rqdata.runq_hash[taskid] | |
appends = self.cooker.get_file_appends(fn) | |
yield Task(taskid, taskname, fn, taskhash, appends) | |
class TaskRunner(object): | |
"""Execute tasks on behalf of the runqueue""" | |
def __init__(self, cfgdata, get_tasks): | |
self.get_tasks = get_tasks | |
self.cfg = cfgdata | |
self.num_processes = multiprocessing.cpu_count() | |
def start(self): | |
self.manager = multiprocessing.Manager() | |
self.events = self.manager.Queue() | |
self.pool = multiprocessing.Pool(self.num_processes, init_process, | |
(self.cfg, self.events)) | |
self.results = self.pool.imap(execute_task, self.get_tasks()) | |
self.pool.close() | |
self.processor = EventProcessor(self.events, self.cfg, | |
name='Event Processor') | |
self.processor.start() | |
# FIXME: this implementation spins in get_tasks() waiting on the next | |
# task from the scheduler. I think this means an idle worker will sit | |
# spinning in the get_tasks() loop if nothing is buildable, rather than | |
# waiting for some sort of event. This isn't ideal. Either I should | |
# switch to using apply_async or something instead of imap, or feed tasks | |
# to the workers via a queue, so they block waiting for the next ones. | |
# The runqueue execute() would then simply hand off tasks to the queue | |
# as they become buildable and get results as they become available. | |
# Another idea would be to use an Event/Condition as a notification | |
# of a task being completed, and therefore there potentially being a new | |
# buildable task. Another issue to consider is the danger of handing | |
# off a given task more than once, which the current RunQueueExecute code | |
# handles by tracking what tasks have been marked as currently running. | |
def next(self): | |
try: | |
return self.results.next(0.25) | |
except multiprocessing.TimeoutError: | |
return | |
def shutdown(self, clean=True): | |
self.events.put('STOP') | |
self.events.close() | |
if not clean: | |
self.pool.terminate() | |
self.pool.join() | |
self.processor.join() | |
def init_process(cfg, eventqueue): | |
execute_task.cfg = cfg | |
# Make the child the process group leader | |
os.setpgid(0, 0) | |
# No stdin | |
newsi = os.open(os.devnull, os.O_RDWR) | |
os.dup2(newsi, sys.stdin.fileno()) | |
bb.event.worker_pid = os.getpid() | |
bb.event.worker_pipe = eventqueue | |
def execute_task(self, task): | |
try: | |
data = bb.cache.Cache.loadDataFull(task.fn, task.appends, | |
execute_task.cfg) | |
data.setVar('BB_TASKHASH', task.hash) | |
os.environ.clear() | |
os.environ.update(bb.data.exported_vars(data)) | |
bb.build.exec_task(task.fn, task.name, data) | |
except Exception as exc: | |
exc.task = task | |
raise | |
else: | |
return task | |
class EventProcessor(multiprocessing.Process): | |
"""Dispatch events received in our queue to the UI""" | |
def __init__(self, queue, cfgdata): | |
self.queue = queue | |
self.cfg = cfgdata | |
multiprocessing.Process.__init__() | |
def run(self): | |
for event in queue_generator(self.queue): | |
bb.event.fire_ui_handlers(event, self.cfg) | |
def queue_generator(queue): | |
while True: | |
try: | |
value = queue.get() | |
except Empty: | |
break | |
else: | |
if value == 'STOP': | |
drain_queue(queue) | |
break | |
else: | |
yield value | |
def drain_queue(queue): | |
while True: | |
try: | |
queue.get(block=False, timeout=0) | |
except Empty: | |
break |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment