@jbohren
Last active August 29, 2015 14:20
#!/usr/bin/env python
from __future__ import print_function
import glob
import sys
import threading
import time
import traceback
try:
# Python3
from queue import Queue
from queue import Empty
except ImportError:
# Python2
from Queue import Queue
from Queue import Empty
try:
# Python3
import asyncio
except ImportError:
# Python2
import trollius as asyncio
try:
# Python3
from io import StringIO
except ImportError:
# Python2
from StringIO import StringIO
from concurrent.futures import ThreadPoolExecutor
from itertools import tee
from osrf_pycommon.process_utils import async_execute_process
from osrf_pycommon.process_utils import AsyncSubprocessProtocol
from osrf_pycommon.process_utils import get_loop
from catkin_tools.common import format_time_delta
from catkin_tools.common import format_time_delta_short
from catkin_tools.common import log
from catkin_tools.common import wide_log
from jobserver import JobServer
from catkin_tools.terminal_color import ansi
from catkin_tools.terminal_color import fmt
from catkin_tools.terminal_color import sanitize
from catkin_tools.terminal_color import ColorMapper
# This map translates more human readable format strings into colorized versions
_color_translation_map = {
# 'output': 'colorized_output'
'': fmt('@!' + sanitize('') + '@|'),
# Job starting
"Starting >>> {:<{}}":
fmt( "Starting @!@{gf}>>>@| @!@{cf}{:<{}}@|"),
# Job finishing
"Finished <<< {:<{}} [ {} ]":
fmt("@!@{kf}Finished@| @{gf}<<<@| @{cf}{:<{}}@| [ @{yf}{}@| ]"),
"Failed <<< {:<{}} [ {} ]":
fmt("@!@{rf}Failed@| @{rf}<<<@| @{cf}{:<{}}@| [ @{yf}{}@| ]"),
# Job abandoning
"Abandoned <<< {:<{}} [ {} ]":
fmt("@!@{rf}Abandoned@| @{rf}<<<@| @{cf}{:<{}}@| [ @{yf}{}@| ]"),
"Depends on failed job {}":
fmt("@{yf}Depends on failed job @!{}@|"),
"Depends on failed job {} via {}":
fmt("@{yf}Depends on failed job @!{}@| @{yf}via @!{}@|"),
# Stage finishing
"{}:{}":
fmt("@{cf}{}@|:@{bf}{}@|"),
"Failed <<< {}:{:<{}} [ Exited with code {} ]":
fmt("@!@{rf}Failed@| @{rf}<<<@| @{cf}{}@|:@{bf}{:<{}}@|[ @{yf}Exited code @!@{yf}{}@| ]"),
"Warnings <<< {}:{}":
fmt("@!@{yf}Warnings@| @{yf}<<<@| @{cf}{}@|:@{bf}{}@|"),
"Errors <<< {}:{}":
fmt("@!@{rf}Errors@| @{rf}<<<@| @{cf}{}@|:@{bf}{}@|"),
}
color_mapper = ColorMapper(_color_translation_map)
clr = color_mapper.clr
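# Example usage (illustrative sketch): `clr` looks up a plain format string in
# the translation map above and returns its colorized equivalent, so the same
# string can be used for both lookup and formatting, e.g.:
#
#   print(clr('Starting >>> {:<{}}').format('job_a', 20))
#   print(clr('Finished <<< {:<{}} [ {} ]').format('job_a', 20, '1.0 seconds'))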
def colorize_cmake(line):
"""Colorizes output from CMake
:param line: one, new line terminated, line from `cmake` which needs coloring.
:type line: str
"""
cline = sanitize(line)
if line.startswith('-- '):
cline = '@{cf}-- @|' + cline[len('-- '):]
if ':' in cline:
split_cline = cline.split(':')
cline = split_cline[0] + ':@{yf}' + ':'.join(split_cline[1:]) + '@|'
if line.lower().startswith('warning'):
# WARNING
cline = fmt('@{yf}') + cline
if line.startswith('CMake Warning'):
# CMake Warning...
cline = cline.replace('CMake Warning', '@{yf}@!CMake Warning@|')
if line.startswith('ERROR:'):
# ERROR:
cline = cline.replace('ERROR:', '@!@{rf}ERROR:@|')
if line.startswith('CMake Error'):
# CMake Error...
cline = cline.replace('CMake Error', '@{rf}@!CMake Error@|')
if line.startswith('Call Stack (most recent call first):'):
# CMake Call Stack
cline = cline.replace('Call Stack (most recent call first):',
'@{cf}@_Call Stack (most recent call first):@|')
return fmt(cline)
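# Example usage (illustrative sketch; the sample lines below are hypothetical
# cmake output, not taken from this gist):
#
#   for line in ['-- Found Foo: /usr/lib/libfoo.so',
#                'CMake Warning at CMakeLists.txt:10 (message):']:
#       print(colorize_cmake(line))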
def split(values, cond):
"""Split an iterable based on a condition."""
head, tail = tee((cond(v), v) for v in values)
return [v for c, v in head if c], [v for c, v in tail if not c]
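# Example usage (illustrative sketch): split() partitions an iterable by a
# predicate, returning (matching, non_matching), e.g.:
#
#   evens, odds = split(range(6), lambda v: v % 2 == 0)
#   # evens == [0, 2, 4], odds == [1, 3, 5]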
class ExecutorEvent(object):
"""Structure for events generated by the Executor.
Events can be jobs starting/finishing, commands starting/failing/finishing,
commands producing output (each line is an event), or when the executor
quits or fails.
"""
# TODO: Make this a map of ID -> fields
EVENT_IDS = [
'JOB_STATUS', # All jobs have finished
'QUEUED_JOB', # A job has been queued to be executed
'STARTED_JOB', # A job has started to be executed
'FINISHED_JOB', # A job has finished executing (succeeded or failed)
'ABANDONED_JOB', # A job has been abandoned for some reason
'STARTED_STAGE', # A job stage has started to be executed
'FINISHED_STAGE', # A job stage has finished executing (succeeded or failed)
'STAGE_PROGRESS', # A job stage has executed partially
'STDOUT', # A status message from a job
'STDERR' # A warning or error message from a job
]
def __init__(self, event_id, **kwargs):
"""Create a new event.
:param event_id: One of the valid EVENT_IDS
:param **kwargs: The additional data to be passed along with this event.
"""
# Store the time this event was generated
self.time = time.time()
# Make sure the event ID is valid
if event_id not in ExecutorEvent.EVENT_IDS:
print(ExecutorEvent.EVENT_IDS)
raise ValueError("The event ID %s is not a valid executor event." % event_id)
# Store the event data
self.event_id = event_id
self.data = kwargs
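# Example usage (illustrative sketch): events are tagged with one of the
# EVENT_IDS above and carry arbitrary keyword data, e.g.:
#
#   event = ExecutorEvent('STDOUT', job_id='job_a', label='build', data='hello')
#   # event.event_id == 'STDOUT'; event.data['job_id'] == 'job_a'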
class IOBufferContainer(object):
"""A simple buffer container for use in logging."""
def __init__(self):
self.stdout_buffer = b""
self.stderr_buffer = b""
self.interleaved_buffer = b""
class IOBufferLogger(IOBufferContainer):
"""This is a logging class to be used instead of sys.stdout and sys.stderr
in FunStage operations.
This class also generates `stdout` and `stderr` events.
"""
def __init__(self, job_id, label, event_queue):
IOBufferContainer.__init__(self)
self.job_id = job_id
self.label = label
self.event_queue = event_queue
def out(self, data):
self.stdout_buffer += data + '\n'
self.interleaved_buffer += data + '\n'
self.event_queue.put(ExecutorEvent(
'STDOUT',
job_id=self.job_id,
label=self.label,
data=data))
def err(self, data):
self.stderr_buffer += data + '\n'
self.interleaved_buffer += data + '\n'
self.event_queue.put(ExecutorEvent(
'STDERR',
job_id=self.job_id,
label=self.label,
data=data))
class IOBufferProtocol(IOBufferContainer, AsyncSubprocessProtocol):
"""An asyncio protocol that collects stdout and stderr.
This class also generates `stdout` and `stderr` events.
Since the underlying asyncio API constructs the actual protocols, this
class provides a factory method to inject the job and stage information
into the created protocol.
"""
def __init__(self, job_id, label, event_queue, *args, **kwargs):
IOBufferContainer.__init__(self)
AsyncSubprocessProtocol.__init__(self, *args, **kwargs)
self.job_id = job_id
self.label = label
self.event_queue = event_queue
@staticmethod
def factory(job_id, label, event_queue):
"""Factory method for constructing with job metadata."""
def init_proxy(*args, **kwargs):
return IOBufferProtocol(job_id, label, event_queue, *args, **kwargs)
return init_proxy
def on_stdout_received(self, data):
self.stdout_buffer += data
self.interleaved_buffer += data
self.event_queue.put(ExecutorEvent(
'STDOUT',
job_id=self.job_id,
label=self.label,
data=data))
def on_stderr_received(self, data):
self.stderr_buffer += data
self.interleaved_buffer += data
self.event_queue.put(ExecutorEvent(
'STDERR',
job_id=self.job_id,
label=self.label,
data=data))
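# Example usage (illustrative sketch): because asyncio constructs the protocol
# object itself, a pre-configured factory is handed to async_execute_process,
# as done in async_job below, e.g.:
#
#   factory = IOBufferProtocol.factory('job_a', 'build', event_queue)
#   # transport, protocol = yield asyncio.From(
#   #     async_execute_process(factory, cmd=['make'], emulate_tty=True))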
@asyncio.coroutine
def async_job(job, executor, event_queue):
"""Run a sequence of Stages from a Job and collect their output.
:param job: A Job instance
:param executor: A thread pool executor for blocking stages
:param event_queue: A queue for asynchronous events
"""
# Initialize success flag
all_stages_succeeded = True
# Execute each stage of this job
for stage in job.stages:
# Skip the remaining stages if an earlier stage failed and the job is not
# configured to continue on failure
if not job.continue_on_failure and not all_stages_succeeded:
break
# Notify stage started
event_queue.put(ExecutorEvent(
'STARTED_STAGE',
job_id=job.jid,
label=stage.label))
if type(stage) is CmdStage:
# Initiate the command
transport, logger = yield asyncio.From(
async_execute_process(
stage.protocol.factory(job.jid, stage.label, event_queue),
cmd=stage.command,
**stage.kwargs))
# Asynchronously yield until this command is completed
retcode = yield asyncio.From(logger.complete)
elif type(stage) is FunStage:
# Create logger to be used instead of using stdout / stderr
logger = IOBufferLogger(job.jid, stage.label, event_queue)
try:
# Asynchronously yield until this function is completed
retcode = yield asyncio.From(get_loop().run_in_executor(executor, stage.function, logger, event_queue))
except Exception:
logger.err(str(traceback.format_exc()))
retcode = 1
# Set whether this stage succeeded
stage_succeeded = (retcode == 0)
# Update success tracker from this stage
all_stages_succeeded = all_stages_succeeded and stage_succeeded
# Store the results from this stage
event_queue.put(ExecutorEvent(
'FINISHED_STAGE',
job_id=job.jid,
label=stage.label,
succeeded=stage_succeeded,
stdout=logger.stdout_buffer,
stderr=logger.stderr_buffer,
interleaved=logger.interleaved_buffer,
retcode=retcode))
# Finally, return the job id and whether all of its stages succeeded
raise asyncio.Return(job.jid, all_stages_succeeded)
@asyncio.coroutine
def process_jobs(
jobs,
event_queue,
continue_on_failure=False,
continue_without_deps=False):
"""Process a number of jobs asynchronously.
:param jobs: A list of Jobs
:param event_queue: A python queue for reporting events.
:param continue_on_failure: Keep running jobs even if one fails.
:param continue_without_deps: Run jobs even if their dependencies fail.
"""
# Split the jobs into queued jobs (ready to run) and pending jobs (waiting on dependencies)
queued_jobs, pending_jobs = split(jobs, lambda j: len(j.deps) == 0)
# List of active jobs job_id -> future
active_jobs = {}
# Dict of completed jobs job_id -> succeeded
completed_jobs = {}
# List of jobs whose deps failed
abandoned_jobs = []
# Create a thread pool executor for blocking python stages in the asynchronous jobs
executor = ThreadPoolExecutor(max_workers=JobServer.max_jobs())
# Process all jobs asynchronously until there are none left
while len(active_jobs) + len(queued_jobs) + len(pending_jobs) > 0:
# Get a token generator from the job server
token_generator = JobServer.try_acquire()
# Activate jobs while the jobserver dispenses tokens
while len(queued_jobs) > 0 and next(token_generator) is not None:
# Pop a job off of the job queue
job = queued_jobs.pop(0)
event_queue.put(ExecutorEvent(
'STARTED_JOB',
job_id=job.jid))
# Start the job coroutine
active_jobs[job.jid] = async_job(job, executor, event_queue)
# Process jobs as they complete asynchronously
for job_completed in asyncio.as_completed(list(active_jobs.values())):
# Report running jobs
event_queue.put(ExecutorEvent(
'JOB_STATUS',
active=list(active_jobs.keys()),
queued=[j.jid for j in queued_jobs],
pending=[j.jid for j in pending_jobs],
abandoned=[j.jid for j in abandoned_jobs]))
# Capture a result once the job has finished
job_id, succeeded = yield asyncio.From(job_completed)
# Release a jobserver token now that this job has finished
JobServer.release()
# Generate event with the results of this job
event_queue.put(ExecutorEvent(
'FINISHED_JOB',
job_id=job_id,
succeeded=succeeded))
# Remove the job from the active jobs dict
del active_jobs[job_id]
# Add the job to the completed list
completed_jobs[job_id] = succeeded
# Handle failure modes
if not succeeded:
# By default don't abandon any other jobs because this job failed
new_abandoned_jobs = []
# Handle different abandoning policies
if not continue_on_failure:
# Abort all pending jobs if any job fails
new_abandoned_jobs = queued_jobs + pending_jobs
queued_jobs = []
pending_jobs = []
# Notify that jobs have been abandoned
for abandoned_job in new_abandoned_jobs:
event_queue.put(ExecutorEvent(
'ABANDONED_JOB',
job_id=abandoned_job.jid,
reason='PEER_FAILED',
peer_job_id=job_id))
elif not continue_without_deps:
unhandled_abandoned_job_ids = [job_id]
# Abandon jobs which depend on abandoned jobs
while len(unhandled_abandoned_job_ids) > 0:
# Get the abandoned job
abandoned_job_id = unhandled_abandoned_job_ids.pop(0)
# Abandon all pending jobs which depend on this job_id
unhandled_abandoned_jobs, pending_jobs = split(
pending_jobs,
lambda j: abandoned_job_id in j.deps)
# Handle each new abandoned job
for abandoned_job in unhandled_abandoned_jobs:
new_abandoned_jobs.append(abandoned_job)
# Notify if any jobs have been abandoned
event_queue.put(ExecutorEvent(
'ABANDONED_JOB',
job_id=abandoned_job.jid,
reason='DEP_FAILED',
direct_dep_job_id=abandoned_job_id,
dep_job_id=job_id))
# Add additional job ids to check
unhandled_abandoned_job_ids.extend([j.jid for j in unhandled_abandoned_jobs])
# Update the abandoned jobs
abandoned_jobs.extend(new_abandoned_jobs)
# Update the list of ready jobs (based on completed job dependencies)
new_queued_jobs, pending_jobs = split(
pending_jobs,
lambda j: j.all_deps_completed(completed_jobs))
queued_jobs.extend(new_queued_jobs)
# Notify of newly queued jobs
for queued_job in new_queued_jobs:
event_queue.put(ExecutorEvent(
'QUEUED_JOB',
job_id=queued_job.jid))
# Report running jobs
event_queue.put(ExecutorEvent(
'JOB_STATUS',
active=list(active_jobs.keys()),
queued=[j.jid for j in queued_jobs],
pending=[j.jid for j in pending_jobs],
abandoned=[j.jid for j in abandoned_jobs]))
class Job(object):
"""A Job is a series of operations, each of which is considered a "stage" of the job."""
def __init__(self, jid, deps, stages, continue_on_failure=True):
self.jid = jid
self.deps = deps
self.stages = stages
self.continue_on_failure = continue_on_failure
def all_deps_completed(self, completed_jobs):
"""Return True if all dependencies have been completed."""
return all([dep_id in completed_jobs for dep_id in self.deps])
def all_deps_succeeded(self, completed_jobs):
"""Return True if all dependencies have been completed and succeeded."""
return all([completed_jobs.get(dep_id, False) for dep_id in self.deps])
def any_deps_failed(self, completed_jobs):
"""Return True if any dependencies which have been completed have failed."""
return any([not completed_jobs.get(dep_id, True) for dep_id in self.deps])
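# Example usage (illustrative sketch): the dependency checks take the
# completed_jobs dict (job_id -> succeeded) maintained by process_jobs, e.g.:
#
#   job = Job('job_c', deps=['job_b'], stages=[])
#   job.all_deps_completed({'job_b': True})   # True
#   job.all_deps_succeeded({'job_b': False})  # False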
class Stage(object):
"""A description of one of the serially-executed stages of a Job.
Like Jobs, Stages are stateless, and simply describe what needs to be done
and how to do it.
"""
def __init__(self, label):
self.label = label or str(label)
class CmdStage(Stage):
"""Job stage that describes a system command.
:param label: The label for the stage
:param command: A list of strings composing a system command
:param protocol: A protocol class to use for this stage
Additional kwargs are passed to `async_execute_process`
"""
def __init__(self, label, command, protocol=None, **kwargs):
if not type(command) in [list, tuple] or not all([type(s) is str for s in command]):
raise ValueError('Command stage must be a list of strings: {}'.format(command))
super(CmdStage, self).__init__(label)
self.command = command
self.protocol = protocol or IOBufferProtocol
self.kwargs = kwargs
# Emulate tty for cli colors
self.kwargs.setdefault('emulate_tty', True)
# Capture stderr and stdout separately
self.kwargs.setdefault('stderr_to_stdout', False)
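# Example usage (illustrative sketch; the 'cwd' value is hypothetical): a
# CmdStage wraps a system command plus kwargs forwarded to
# async_execute_process, e.g.:
#
#   stage = CmdStage('configure', ['cmake', '..'], cwd='/tmp/build')
#   # stage.kwargs == {'cwd': '/tmp/build', 'emulate_tty': True,
#   #                  'stderr_to_stdout': False}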
class FunStage(Stage):
"""Job stage that describes a python function.
:param label: The label for the stage
:param function: A python function which returns 0 on success
"""
def __init__(self, label, function):
if not callable(function):
raise ValueError('Function stage must be callable.')
super(FunStage, self).__init__(label)
self.function = function
def foo(logger, event_queue):
"""Simple blocking function that sleeps for 1 second and then raises an exception"""
logger.out('Fooing...')
time.sleep(1.0)
logger.err('Can\'t foo!')
raise RuntimeError()
logger.out('Done fooing.')
return 0
def bar(logger, event_queue):
"""Simple blocking function that sleeps for 0.25 seconds"""
logger.out('Baring...')
time.sleep(0.25)
logger.out('Done baring.')
return 0
def baz(logger, event_queue):
"""Simple blocking function that sleeps for 1 second and then writes to err"""
logger.out('Bazing...')
time.sleep(1.0)
logger.err('You shouldn\'t be bazing!')
logger.err('You should be baring instead.')
return 0
class ConsoleStatusThread(threading.Thread):
"""Status thread for displaying events to the console."""
def __init__(
self,
label,
jobs,
event_queue,
show_stage_events=False,
show_buffered_stdout=False,
show_buffered_stderr=True,
show_live_stdout=False,
show_live_stderr=False,
show_active_status=True,
active_status_rate=10.0):
"""
:param label: The label for this task (build, clean, etc)
:param jobs: The list of Jobs being reported on
:param event_queue: The event queue used by an Executor
:param show_stage_events: Show events relating to stages in each job
:param show_buffered_stdout: Show stdout from jobs as they finish
:param show_buffered_stderr: Show stderr from jobs as they finish
:param show_live_stdout: Show stdout lines from jobs as they're generated
:param show_live_stderr: Show stderr lines from jobs as they're generated
:param show_active_status: Periodically show a status line displaying the active jobs
:param active_status_rate: The rate in Hz at which the status line should be printed
"""
super(ConsoleStatusThread, self).__init__()
self.label = label
self.event_queue = event_queue
self.show_stage_events = show_stage_events
self.show_buffered_stdout = show_buffered_stdout
self.show_buffered_stderr = show_buffered_stderr
self.show_live_stdout = show_live_stdout
self.show_live_stderr = show_live_stderr
self.show_active_status = show_active_status
self.active_status_rate = max(active_status_rate, 0.1)
self.jobs = dict([(j.jid, j) for j in jobs])
self.max_jid_length = max([len(jid)+max([len(s.label) for s in job.stages]) for jid, job in self.jobs.items()])
def run(self):
active_jobs = []
queued_jobs = []
pending_jobs = []
start_times = dict()
end_times = dict()
start_time = time.time()
while True:
if self.show_active_status:
# Try to get an event from the queue (non-blocking)
try:
res = self.event_queue.get(False)
except Empty:
# Print live status (overwrites last line)
wide_log('[{} {} s] [{}/{} jobs] {}'.format(
self.label,
format_time_delta_short(time.time() - start_time),
JobServer.running_jobs(),
JobServer.max_jobs(),
active_jobs), end='\r')
sys.stdout.flush()
time.sleep(1.0 / self.active_status_rate)
continue
else:
# Try to get an event from the queue (blocking)
try:
res = self.event_queue.get(True)
except Empty:
break
# A `None` event is a signal to terminate
if res is None:
break
# Handle the received events
eid = res.event_id
if 'JOB_STATUS' == eid:
# Check if all jobs have finished in some way
if all([len(res.data[t]) == 0 for t in ['active', 'queued', 'pending']]):
break
active_jobs = res.data['active']
queued_jobs = res.data['queued']
pending_jobs = res.data['pending']
elif 'STARTED_JOB' == eid:
wide_log(clr('Starting >>> {:<{}}').format(
res.data['job_id'],
self.max_jid_length))
start_times[res.data['job_id']] = res.time
elif 'FINISHED_JOB' == eid:
end_times[res.data['job_id']] = res.time
duration = format_time_delta(end_times[res.data['job_id']] - start_times[res.data['job_id']])
if res.data['succeeded']:
wide_log(clr('Finished <<< {:<{}} [ {} ]').format(
res.data['job_id'],
self.max_jid_length,
duration))
else:
wide_log(clr('Failed <<< {:<{}} [ {} ]').format(
res.data['job_id'],
self.max_jid_length,
duration))
elif 'ABANDONED_JOB' == eid:
if 'DEP_FAILED' == res.data['reason']:
direct = res.data['dep_job_id'] == res.data['direct_dep_job_id']
if direct:
reason = clr('Depends on failed job {}').format(res.data['dep_job_id'])
else:
reason = clr('Depends on failed job {} via {}').format(
res.data['dep_job_id'],
res.data['direct_dep_job_id'])
elif 'PEER_FAILED' == res.data['reason']:
reason = clr('Unrelated job failed')
wide_log(clr('Abandoned <<< {:<{}} [ {} ]').format(
res.data['job_id'],
self.max_jid_length,
reason))
elif 'STARTED_STAGE' == eid:
if self.show_stage_events:
wide_log('Starting >>> {}:{}'.format(
res.data['job_id'],
res.data['label']))
elif 'FINISHED_STAGE' == eid:
if self.show_buffered_stderr and len(res.data['stderr']) > 0:
if res.data['succeeded']:
wide_log(clr('Warnings <<< {}:{}').format(
res.data['job_id'],
res.data['label']))
else:
wide_log(clr('Errors <<< {}:{}').format(
res.data['job_id'],
res.data['label']))
prefix_color = '@!@{yf}' if res.data['succeeded'] else '@!@{rf}'
prefix = clr(prefix_color + '> @|')
for line in res.data['stderr'].splitlines():
wide_log(prefix + line)
if res.data['succeeded']:
if self.show_stage_events:
wide_log('Finished <<< {}:{}'.format(
res.data['job_id'],
res.data['label']))
else:
wide_log(clr('Failed <<< {}:{:<{}} [ Exited with code {} ]').format(
res.data['job_id'],
res.data['label'],
max(0,self.max_jid_length - len(res.data['job_id'])),
res.data['retcode']))
elif 'STDERR' == eid:
if self.show_live_stderr:
prefix = '{} [{}]: '.format(
res.data['job_id'],
res.data['label'])
wide_log('\n'.join(prefix + l for l in res.data['data'].splitlines()))
if __name__ == '__main__':
# Initialize jobserver
JobServer.initialize(max_jobs=5)
# Get event loop
loop = get_loop()
# Queue for communicating status
event_queue = Queue()
# The job list is a list of pairs (ID, JOB) where the job is composed of
# multiple stages which are run sequentially. Jobs can specify their
# dependencies in terms of the ids of other jobs.
jobs = [
Job('job_a', deps=[], stages=[CmdStage('grep', ['grep', 'hosts'] + glob.glob('/etc/*'))]),
Job('job_b', deps=[], stages=[CmdStage('sleep', ['sleep', '2'])]),
Job('job_c', deps=['job_b'],
stages=[
CmdStage('sleep', ['sleep', '1']),
CmdStage('sleep', ['sleep', '1'])]
),
Job('job_d1', deps=['job_c'], stages=[CmdStage('sleep', ['sleep', '1.1'])]),
Job('job_d2', deps=['job_c'], stages=[CmdStage('sleep', ['sleep', '1.2'])]),
Job('job_d3', deps=['job_c'], stages=[CmdStage('sleep', ['sleep', '1.3'])]),
Job('job_e', deps=[], stages=[FunStage('foo', foo)]),
Job('job_f', deps=['job_e'], stages=[FunStage('foo', foo)]),
Job('job_g', deps=['job_b', 'job_c'],
stages=[
FunStage('bar', bar),
CmdStage('sleep', ['sleep', '0.5']),
FunStage('bar', bar)]
),
Job('job_h', deps=['job_b'], stages=[FunStage('baz', baz)]),
Job('job_i', deps=['job_f'], stages=[FunStage('baz', baz)]),
]
# TODO: Check job id uniqueness
# TODO: Check dependency existence
try:
# Spin up status output thread
status_thread = ConsoleStatusThread('exec', jobs, event_queue)
status_thread.start()
# Block while running N jobs asynchronously
loop.run_until_complete(process_jobs(
jobs,
event_queue,
continue_on_failure=True,
continue_without_deps=False))
except KeyboardInterrupt:
print("Interrupted!")
event_queue.put(None)
# Copyright 2014 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from multiprocessing import cpu_count
from tempfile import mkstemp
from termios import FIONREAD
import array
import errno
import fcntl
import os
import re
import subprocess
import time
from catkin_tools.common import log
from catkin_tools.common import version_tuple
JOBSERVER_SUPPORT_MAKEFILE = b'''
all:
\techo $(MAKEFLAGS) | grep -- '--jobserver-fds'
'''
def memory_usage():
"""
Get used and total memory usage.
:returns: Used and total memory in bytes
:rtype: tuple
"""
# Handle optional psutil support
try:
import psutil
psutil_version = version_tuple(psutil.__version__)
if psutil_version < (0, 6, 0):
usage = psutil.phymem_usage()
used = usage.used
else:
usage = psutil.virtual_memory()
used = usage.total - usage.available
return used, usage.total
except ImportError:
pass
return None, None
class JobServer:
"""
This class implements a GNU make-compatible job server.
"""
# Singleton jobserver
_singleton = None
# Flag designating whether the `make` program supports the GNU Make
# jobserver interface
_gnu_make_supported = False
def __init__(self, max_jobs=None, max_load=None, max_mem=None):
"""
:param max_jobs: the maximum number of jobs available
:param max_load: do not dispatch additional jobs if this system load
value is exceeded
:param max_mem: do not dispatch additional jobs if system physical
memory usage exceeds this value (see _set_max_mem for additional
documentation)
"""
assert(JobServer._singleton is None)
if not max_jobs:
try:
max_jobs = cpu_count()
except NotImplementedError:
log('@{yf}WARNING: Failed to determine the cpu_count, falling back to 1 job as the default.@|')
max_jobs = 1
else:
max_jobs = int(max_jobs)
self.max_jobs = max_jobs
self.max_load = max_load
self._set_max_mem(max_mem)
self.job_pipe = os.pipe()
# Initialize the pipe with max_jobs tokens
for i in range(max_jobs):
os.write(self.job_pipe[1], b'+')
@staticmethod
def _test_gnu_make_support():
"""
Test if the system 'make' supports the job server implementation.
"""
fd, makefile = mkstemp()
os.write(fd, JOBSERVER_SUPPORT_MAKEFILE)
os.close(fd)
ret = subprocess.call(['make', '-f', makefile, '-j2'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
os.unlink(makefile)
return (ret == 0)
def _set_max_mem(self, max_mem):
"""
Set the maximum memory usage above which no new jobs will be dispatched.
:param max_mem: String describing the maximum memory that can be used
on the system. It can either describe memory percentage or absolute
amount. Use 'P%' for percentage or 'N' for absolute value in bytes,
'Nk' for kilobytes, 'Nm' for megabytes, and 'Ng' for gigabytes.
:type max_mem: str
"""
if max_mem is None:
self.max_mem = None
return
elif type(max_mem) is float or type(max_mem) is int:
mem_percent = max_mem
elif type(max_mem) is str:
m_percent = re.search('([0-9]+)\%', max_mem)
m_abs = re.search('([0-9]+)([kKmMgG]{0,1})', max_mem)
if m_percent is None and m_abs is None:
self.max_mem = None
return
if m_percent:
mem_percent = m_percent.group(1)
elif m_abs:
val = float(m_abs.group(1))
mag_symbol = m_abs.group(2)
_, total_mem = memory_usage()
if mag_symbol == '':
mag = 1.0
elif mag_symbol.lower() == 'k':
mag = 1024.0
elif mag_symbol.lower() == 'm':
mag = pow(1024.0, 2)
elif mag_symbol.lower() == 'g':
mag = pow(1024.0, 3)
mem_percent = 100.0 * val * mag / total_mem
self.max_mem = max(0.0, min(100.0, float(mem_percent)))
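# Illustrative interpretation of max_mem values (sketch, derived from the
# docstring above):
#   '50%' -> self.max_mem == 50.0 (percent of physical memory)
#   '4g'  -> converted to a percentage of total memory via memory_usage()
#   None  -> no memory-based throttling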
def _load_ok(self):
if self.max_load is not None:
try:
load = os.getloadavg()
if JobServer.running_jobs() > 0 and load[1] > self.max_load:
return False
except NotImplementedError:
return True
return True
def _mem_ok(self):
if self.max_mem is not None:
mem_used, mem_total = memory_usage()
mem_percent_used = 100.0 * float(mem_used) / float(mem_total)
if JobServer.running_jobs() > 0 and mem_percent_used > self.max_mem:
return False
return True
def _acquire(self):
"""
Obtain a job server token. Be sure to call _release() to avoid
deadlocks.
"""
try:
# read a token from the job pipe
token = os.read(self.job_pipe[0], 1)
return token
except OSError as e:
if e.errno != errno.EINTR:
raise
return None
def _wait_acquire(self):
"""
Wait until a job server token can be acquired.
"""
token = None
while token is None:
# make sure we're observing load and memory maximums
if not self._load_ok() or not self._mem_ok():
time.sleep(0.01)
continue
# try to get a job token
token = self._acquire()
return token
def _try_acquire(self):
"""
Generator that yields a job server token when one can be acquired, or None otherwise.
"""
while True:
# make sure we're observing load and memory maximums
if self._load_ok() and self._mem_ok():
# try to get a job token
token = self._acquire()
yield token
else:
yield None
def _release(self):
"""
Write a token to the job pipe.
"""
os.write(self.job_pipe[1], b'+')
@classmethod
def initialize(cls, *args, **kwargs):
"""
Initialize the global GNU Make jobserver.
:param max_jobs: the maximum number of jobs available
:param max_load: do not dispatch additional jobs if this system load
value is exceeded
:param max_mem: do not dispatch additional jobs if system physical
memory usage exceeds this value
"""
# Only initialize once
assert(cls._singleton is None)
# Check if the jobserver is supported
cls._gnu_make_supported = cls._test_gnu_make_support()
if not cls._gnu_make_supported:
log('@{yf}WARNING: Make job server not supported. The number of Make '
'jobs may exceed the number of CPU cores.@|')
# Create the jobserver singleton even without GNU Make jobserver support,
# since max_jobs(), running_jobs(), and release() rely on it
cls._singleton = JobServer(*args, **kwargs)
@classmethod
def set_max_mem(cls, max_mem):
"""
Set the maximum memory usage above which no new jobs will be dispatched.
:param max_mem: String describing the maximum memory that can be used on
the system. It can either describe memory percentage or absolute amount.
Use 'P%' for percentage or 'N' for absolute value in bytes, 'Nk' for
kilobytes, 'Nm' for megabytes, and 'Ng' for gigabytes.
:type max_mem: str
"""
cls._singleton._set_max_mem(max_mem)
@classmethod
def wait_acquire(cls):
"""
Block until a job server token is acquired, then return it.
"""
return cls._singleton._wait_acquire()
@classmethod
def try_acquire(cls):
"""
Yield None until a job server token is acquired, then yield it.
"""
while True:
# make sure we're observing load and memory maximums
if cls._singleton._load_ok() and cls._singleton._mem_ok():
# try to get a job token
token = cls._singleton._acquire()
yield token
else:
yield None
@classmethod
def release(cls):
"""
Release a job server token.
"""
cls._singleton._release()
@classmethod
def gnu_make_supported(cls):
return cls._gnu_make_supported
@classmethod
def gnu_make_args(cls):
"""
Get required arguments for spawning child gnu Make processes.
"""
if cls._gnu_make_supported:
return ["--jobserver-fds=%d,%d" % cls._singleton.job_pipe, "-j"]
else:
return []
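# Example usage (illustrative sketch; the file descriptor numbers are
# hypothetical): when the jobserver is supported, the returned flags can be
# appended to a make invocation so child makes share the token pipe, e.g.:
#
#   cmd = ['make'] + JobServer.gnu_make_args()
#   # e.g. ['make', '--jobserver-fds=3,4', '-j']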
@classmethod
def max_jobs(cls):
"""
Get the maximum number of jobs.
"""
return cls._singleton.max_jobs
@classmethod
def running_jobs(cls):
"""
Try to estimate the number of currently running jobs.
"""
if not cls._gnu_make_supported:
return '?'
try:
buf = array.array('i', [0])
if fcntl.ioctl(cls._singleton.job_pipe[0], FIONREAD, buf) == 0:
return cls._singleton.max_jobs - buf[0]
except NotImplementedError:
pass
except OSError:
pass
return cls._singleton.max_jobs
class JobGuard:
"""
Context manager representing a jobserver job.
"""
def __enter__(self):
JobServer.wait_acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
JobServer.release()
return False
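# Example usage (illustrative sketch): JobGuard acquires a token on entry and
# releases it on exit, so a blocking job can be wrapped as:
#
#   with JobGuard():
#       pass  # do the work that should count against the job limit

Example console output from running the gist: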
Starting >>> job_a
Starting >>> job_b
Starting >>> job_e
Errors <<< job_a:grep
> grep: /etc/mtab.fuselock: Permission denied
> grep: /etc/fuse.conf: Permission denied
> grep: /etc/docker: Permission denied
> grep: /etc/blkid.tab: No such file or directory
> grep: /etc/sudoers: Permission denied
> grep: /etc/group-: Permission denied
> grep: /etc/gshadow: Permission denied
> grep: /etc/at.deny: Permission denied
> grep: /etc/shadow: Permission denied
> grep: /etc/passwd-: Permission denied
> grep: /etc/gshadow-: Permission denied
> grep: /etc/shadow-: Permission denied
Failed <<< job_a:grep [ Exited code 2 ]
Failed <<< job_a [ 0.0 seconds ]
Errors <<< job_e:foo
> Can't foo!
> Traceback (most recent call last):
> File "./asynctest.py", line 307, in async_job
> retcode = yield asyncio.From(get_loop().run_in_executor(executor, stage.function, logger, event_queue))
> RuntimeError
>
Failed <<< job_e:foo [ Exited code 1 ]
Failed <<< job_e [ 1.0 seconds ]
Abandoned <<< job_f [ Depends on failed job job_e ]
Abandoned <<< job_i [ Depends on failed job job_e via job_f ]
Finished <<< job_b [ 2.0 seconds ]
Starting >>> job_c
Starting >>> job_h
Warnings <<< job_h:baz
> You shouldn't be bazing!
> You should be baring instead.
Finished <<< job_h [ 1.0 seconds ]
Finished <<< job_c [ 2.0 seconds ]
Starting >>> job_d1
Starting >>> job_d2
Starting >>> job_d3
Starting >>> job_g
Finished <<< job_g [ 1.0 seconds ]
Finished <<< job_d1 [ 1.1 seconds ]
Finished <<< job_d2 [ 1.2 seconds ]
Finished <<< job_d3 [ 1.3 seconds ]