Last active
August 29, 2015 13:57
-
-
Save kergoth/9748594 to your computer and use it in GitHub Desktop.
Work-in-progress rework of Mentor's bb-determine-layers as a standalone bblayers configuration tool for yocto
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse | |
import collections | |
import contextlib | |
import glob | |
import os | |
import signal | |
import sys | |
# Default for -g/--globs: colon-separated wildcard patterns used to locate
# layers.  `{scriptdir}` is substituted by process_arguments().
glob_default = ':'.join((
    './*/',
    './*/*/',
    '{scriptdir}/../../*/',
    '{scriptdir}/../../*/*/',
))

# NOTE(review): not referenced anywhere else in the visible code.
layers_default = 'core'

# Default for -O/--override-layer-priorities, as `layername=priority`.
override_default = 'openembedded-layer=1'
class Terminate(BaseException):
    """Raised when the process receives SIGTERM.

    Derives from BaseException rather than Exception, so handlers written as
    `except Exception` do not intercept it.
    """
def sigterm_exception(signum, stackframe):
    """Signal handler: convert a delivered signal into a Terminate exception.

    Both arguments are supplied by the signal module and are ignored.
    """
    raise Terminate
def setup_command_import(command, relpath='../lib'):
    """Extend sys.path based on where `command` is found in PATH.

    For every PATH entry that contains `command`, `<entry>/<relpath>` is
    inserted at the front of sys.path (PATH order is preserved).

    Raises:
        ImportError: if no PATH entry contains `command`, or PATH is unset.
    """
    # Use os.pathsep rather than a literal ':' so the split matches the
    # platform, and .get() so an unset PATH yields the ImportError below
    # instead of an unrelated KeyError.
    path_value = os.environ.get('PATH')
    search_dirs = [] if path_value is None else path_value.split(os.pathsep)

    cmd_paths = [os.path.join(directory, relpath)
                 for directory in search_dirs
                 if os.path.exists(os.path.join(directory, command))]
    if not cmd_paths:
        raise ImportError("Unable to locate bb, please ensure PATH is set correctly.")
    sys.path[0:0] = cmd_paths
def find(dir, dirfilter=None, **walkoptions):
    """Recursively yield the path of every file below `dir`.

    Each yielded path is the os.walk root joined with the file name, so it
    is absolute only when `dir` itself is absolute.

    Args:
        dir: directory to walk.
        dirfilter: optional callable taking a directory *name*; directories
            for which it returns a false value are not descended into.
        walkoptions: extra keyword arguments passed straight to os.walk.
    """
    for root, dirs, files in os.walk(dir, **walkoptions):
        if dirfilter is not None:
            # Prune with a slice assignment: os.walk only honours in-place
            # changes, and removing entries while iterating the same list
            # would skip the entry following each removed one.
            dirs[:] = [d for d in dirs if dirfilter(d)]
        for filename in files:
            yield os.path.join(root, filename)
@contextlib.contextmanager
def status(message, outfile=sys.stderr):
    """Context manager that reports progress of the wrapped block.

    Writes `message` followed by two dots on entry, then a third dot plus
    the outcome (done / interrupted / terminated / failed) and a newline on
    exit.  Any exception raised by the block is re-raised unchanged.
    """
    outfile.write('{0}..'.format(message))
    outfile.flush()
    try:
        yield
    except BaseException as exc:
        # Classify in the same precedence order as separate except clauses:
        # Ctrl-C first, then SIGTERM, then everything else.
        if isinstance(exc, KeyboardInterrupt):
            outcome = '.interrupted\n'
        elif isinstance(exc, Terminate):
            outcome = '.terminated\n'
        else:
            outcome = '.failed\n'
        outfile.write(outcome)
        raise
    else:
        outfile.write('.done\n')
class DictArgument(object):
    """argparse `type=` callable that turns a `key=value` list into a dict.

    Args:
        listsep: separator between entries, passed to str.split (None means
            any run of whitespace).
        valuesep: separator between key and value; the *last* occurrence in
            an entry is the one that splits it.
        keytype: optional callable applied to each key.
        valuetype: optional callable applied to each value.

    Calling the instance raises argparse.ArgumentTypeError for an entry that
    cannot be split, or whose key/value converter raises ValueError.
    """

    _KEY_ERROR = 'invalid key `{0}` for type `{1}`'
    _VALUE_ERROR = 'invalid value `{0}` for type `{1}`'

    def __init__(self, listsep=None, valuesep='=', keytype=None, valuetype=None):
        self.listsep = listsep
        self.valuesep = valuesep
        self.keytype = keytype
        self.valuetype = valuetype

    @staticmethod
    def _convert(text, converter, error_template):
        """Apply `converter` to `text`, mapping ValueError to an argparse error."""
        if converter is None:
            return text
        try:
            return converter(text)
        except ValueError:
            raise argparse.ArgumentTypeError(
                error_template.format(text, converter.__name__))

    def __call__(self, string):
        parsed = {}
        for item in string.split(self.listsep):
            try:
                raw_key, raw_value = item.rsplit(self.valuesep, 1)
            except ValueError:
                raise argparse.ArgumentTypeError('invalid entry `{0}`'.format(item))
            key = self._convert(raw_key, self.keytype, self._KEY_ERROR)
            parsed[key] = self._convert(raw_value, self.valuetype, self._VALUE_ERROR)
        return parsed

    def __repr__(self):
        fields = (self.listsep, self.valuesep, self.keytype, self.valuetype)
        return 'DictArgument(listsep={0}, valuesep={1}, keytype={2}, valuetype={3})'.format(
            *[repr(field) for field in fields])
def process_arguments(cmdline_args):
    """Parse the command line and normalise the path-like options.

    Args:
        cmdline_args: list of argument strings (without the program name).

    Returns:
        The argparse namespace, with these attributes post-processed:
          base  -- list obtained by whitespace-splitting -l/--base-layers,
                   or None when the option was not given
          globs -- list of absolute glob patterns
          paths -- list of absolute paths (empty when -p was not given)
        In globs and paths, `{scriptdir}` is replaced by the directory that
        contains this script.
    """
    parser = argparse.ArgumentParser(description='Configure bitbake layers', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-s', '--sort', action='store_true', help='sort layers by layer priority')
    parser.add_argument('-a', '--add-layers', action='append', dest='add', help='layers to add to the configuration')
    parser.add_argument('-r', '--remove-layers', action='append', dest='remove', help='layers to remove from the configuration')
    parser.add_argument('-o', '--add-optional-layers', action='append', dest='optional', help='layers to add to the configuration if available, but silently ignore if not')
    parser.add_argument('-l', '--base-layers', dest='base', help='specify the baseline list of layers to work from. defaults to the current bblayers if available')
    parser.add_argument('-m', '--add-machine-layer', dest='machine', help='add the highest priority layer which provides a machine .conf for the specified machine')
    parser.add_argument('-O', '--override-layer-priorities', default=override_default,
                        type=DictArgument(valuetype=int),
                        help='override the priority of one or more layers. space separated list of layername=priority values. default: `{0}`'.format(override_default))
    parser.add_argument('-g', '--globs', default=glob_default,
                        help='wildcard patterns to locate layers. colon separated. default: `{0}`'.format(glob_default))
    parser.add_argument('-p', '--paths',
                        help='paths to search recursively to find layers. colon separated. equivalent to the "find" command', default='')

    args = parser.parse_args(cmdline_args)

    # Resolve __file__ before taking its dirname: when the script is run by
    # bare file name dirname() would be '', turning '{scriptdir}/../../*/'
    # into '/../../*/' -- a pattern rooted at the filesystem root.
    scriptdir = os.path.dirname(os.path.abspath(__file__))

    def expand(entry):
        """Substitute {scriptdir} and make the result absolute."""
        return os.path.abspath(entry.format(scriptdir=scriptdir))

    if args.base is not None:
        args.base = args.base.split()
    args.globs = [expand(pattern) for pattern in args.globs.split(':')]
    # An empty -p value means "no recursive search", not "search ''".
    args.paths = [expand(path) for path in args.paths.split(':')] if args.paths else []
    return args
class LayerError(Exception):
    """Base class for layer-related errors raised by this script."""


class DuplicateLayer(LayerError):
    """Two layers claim the same name.

    Attributes:
        layer: the layer that was being added.
        other_layer: the layer already registered under the same name.
    Both are expected to expose `name` and `path` attributes.
    """

    def __init__(self, layer, other_layer):
        self.layer = layer
        self.other_layer = other_layer
        # Must name this class; referring to any other (undefined) name here
        # makes constructing the exception itself fail with NameError.
        super(DuplicateLayer, self).__init__()

    def __str__(self):
        return 'Duplicate layers found for `{0}`:\n {1}\n {2}'.format(
            self.layer.name, self.layer.path, self.other_layer.path)
class Layer(object):
    """One bitbake layer, i.e. one BBFILE_COLLECTIONS entry of a layer.conf.

    Attributes:
        path: real path of the layer directory.
        confpath: real path of the layer's conf/layer.conf.
        name: collection name.
        priority: integer priority; may be reassigned after construction.
        pattern: value of BBFILE_PATTERN_<name>, or None.
        depends: set of layer names this layer depends on.
        missingdeps: initially empty set.
    """

    def __init__(self, path, confpath, name, priority, pattern, depends):
        self.path = os.path.realpath(path)
        self.confpath = os.path.realpath(confpath)
        self.name = name
        self.priority = priority
        self.depends = depends
        self.missingdeps = set()
        self.pattern = pattern

    def __repr__(self):
        return '{0.__class__.__name__}({0.path!r}, {0.confpath!r},' \
               '{0.name!r}, {0.priority!r}, {0.pattern!r}, {0.depends!r})'.format(self)

    def __hash__(self):
        # Hash only fields that identify the layer.  priority and depends
        # can change while the layer sits in a set or dict; hashing them
        # (e.g. via repr) would make the layer unfindable afterwards.
        return hash((self.path, self.confpath, self.name))

    @staticmethod
    def from_layerpath(layer_path, data=None):
        """Parse `<layer_path>/conf/layer.conf` and return its layers.

        Args:
            layer_path: directory of the layer.
            data: optional bitbake datastore to copy as a starting point;
                a fresh one is initialised when omitted.

        Returns:
            A non-empty list of Layer objects: one per BBFILE_COLLECTIONS
            entry, or a single layer named after the directory (priority 0,
            no pattern, no dependencies) when no collection is declared.

        Raises:
            LayerError: if the file cannot be parsed or a priority is not
                an integer.
        """
        # Imported here because the bb package only becomes importable once
        # sys.path has been set up at runtime.
        import bb.data
        import bb.parse

        if data is None:
            data = bb.data.init()
            bb.parse.init_parser(data)

        lconf = os.path.join(layer_path, 'conf', 'layer.conf')
        ldata = data.createCopy()
        ldata.setVar('LAYERDIR', layer_path)
        try:
            ldata = bb.parse.handle(lconf, ldata, include=True)
        except Exception as exc:
            # Exception, not BaseException: Ctrl-C and SIGTERM must keep
            # propagating instead of being reported as a parse failure.
            raise LayerError(exc)
        ldata.expandVarref('LAYERDIR')

        collection_names = (ldata.getVar('BBFILE_COLLECTIONS', True) or '').split()
        if not collection_names:
            return [Layer(layer_path, lconf, os.path.basename(layer_path), 0, None, set())]

        layers = []
        for name in collection_names:
            pattern = ldata.getVar('BBFILE_PATTERN_%s' % name, True)
            priority = ldata.getVar('BBFILE_PRIORITY_%s' % name, True) or 0
            depends = ldata.getVar('LAYERDEPENDS_%s' % name, True) or ''
            try:
                priority = int(priority)
            except ValueError:
                raise LayerError('{0}: invalid priority `{1}` for layer `{2}`'.format(
                    lconf, priority, name))
            layers.append(Layer(layer_path, lconf, name, priority, pattern, set(depends.split())))
        return layers
try:
    from collections.abc import MutableSet as _MutableSet
except ImportError:  # Python 2 keeps the ABCs directly in `collections`
    from collections import MutableSet as _MutableSet


class Layers(_MutableSet):
    """A set of layers with lookup tables by path and by name.

    Attributes:
        layers: the underlying set of layer objects.
        by_path: dict mapping layer.path to the layer.
        by_name: dict mapping layer.name to the layer.
    """

    def __init__(self, iterable=None):
        """Create the collection, optionally adding every layer in `iterable`."""
        self.layers = set()
        self.by_path = {}
        self.by_name = {}
        if iterable is not None:
            for layer in iterable:
                self.add(layer)

    def __contains__(self, layer):
        return layer in self.layers

    def __iter__(self):
        return iter(self.layers)

    def __len__(self):
        return len(self.layers)

    def __repr__(self):
        return '{0.__class__.__name__}({0.layers!r})'.format(self)

    def __hash__(self):
        return hash(repr(self))

    def add(self, layer):
        """Add `layer`.

        A layer already present, or whose path is already registered, is
        ignored.  Raises DuplicateLayer if another layer already uses the
        same name.
        """
        if layer in self:
            return
        if layer.name in self.by_name:
            raise DuplicateLayer(layer, self.by_name[layer.name])
        if layer.path in self.by_path:
            return
        self.layers.add(layer)
        self.by_path[layer.path] = layer
        self.by_name[layer.name] = layer

    def add_from_path(self, layerpath, data=None):
        """Parse the layer at `layerpath` and add what it defines.

        Does nothing if the (real) path is already registered.
        """
        layerpath = os.path.realpath(layerpath)
        if layerpath in self.by_path:
            return
        for layer in Layer.from_layerpath(layerpath, data):
            self.add(layer)

    def discard(self, layer):
        """Remove `layer` if present; absent layers are ignored."""
        self.layers.discard(layer)
        # Only drop index entries that actually refer to this layer, so that
        # discarding an unknown layer neither raises nor unregisters another
        # layer that happens to share its path or name.
        if self.by_path.get(layer.path) == layer:
            del self.by_path[layer.path]
        if self.by_name.get(layer.name) == layer:
            del self.by_name[layer.name]

    def clear(self):
        self.layers.clear()
        self.by_path.clear()
        self.by_name.clear()

    def priority_sorted(self):
        """Return the layers as a list, highest (priority, name) first."""
        return sorted(self.layers, key=lambda l: (l.priority, l.name), reverse=True)

    def which(self, path, all=True):
        """Return the layers that contain the relative `path`.

        The result is ordered as priority_sorted().  With all=False only the
        first (highest-priority) match is returned, still as a list.
        """
        matches = []
        for layer in self.priority_sorted():
            if os.path.exists(os.path.join(layer.path, path)):
                matches.append(layer)
                if not all:
                    break
        return matches

    def get_by_name_recursive(self, name):
        """Get a layer by name together with its dependencies, transitively.

        Returns a Layers() collection of everything found, and a set with
        the names (the requested one or any dependency) that are unknown.
        """
        missing = set()
        found_layers = Layers()
        # Iterative walk with a visited set, so that layers depending on
        # each other in a cycle terminate instead of recursing forever.
        visited = set()
        pending = [name]
        while pending:
            current = pending.pop()
            if current in visited:
                continue
            visited.add(current)
            layer = self.by_name.get(current)
            if layer is None:
                missing.add(current)
                continue
            found_layers.add(layer)
            pending.extend(layer.depends)
        return found_layers, missing
def get_layerpaths_bitbake(globs=None, paths=None):
    """Locate layer directories and the bitbake `bin` directory.

    Args:
        globs: iterable of glob patterns; each match is inspected directly
            (it is a layer if it holds conf/layer.conf, otherwise a bitbake
            checkout if it holds bin/bitbake).
        paths: iterable of directories searched recursively (directories
            named `lib` are not descended into).
        Either may be None or empty.

    Returns:
        (layer_paths, bitbake_path): a set with the real path of every layer
        found, and the directory containing `bitbake` (the last one seen),
        or None if none was found.
    """
    bitbake_path = None
    layer_paths = set()

    for pattern in globs or ():
        # Keep the glob results under their own name so the `paths`
        # argument is still intact for the recursive search below.
        for candidate in glob.glob(pattern):
            if os.path.exists(os.path.join(candidate, 'conf', 'layer.conf')):
                layer_paths.add(os.path.realpath(candidate))
            elif os.path.exists(os.path.join(candidate, 'bin', 'bitbake')):
                bitbake_path = os.path.join(candidate, 'bin')

    for path in paths or ():
        for subpath in find(path, dirfilter=lambda d: d != 'lib'):
            if subpath.endswith('/conf/layer.conf'):
                layer_paths.add(os.path.realpath(os.path.dirname(os.path.dirname(subpath))))
            elif subpath.endswith('/bin/bitbake'):
                bitbake_path = os.path.dirname(subpath)

    return layer_paths, bitbake_path
def configure_layers(cmdline_opts):
    """Work out the set of layers to configure and print their paths.

    Steps: locate the build directory (BUILDDIR or the nearest ancestor with
    conf/bblayers.conf) and chdir into it; import bitbake's `bb` package;
    parse the command line; discover and parse layers; apply removals and
    priority overrides; resolve the requested layers (baseline, added,
    machine, optional) with their dependencies; print the resulting layer
    paths, highest priority first.

    Exits via sys.exit() with a message on any unrecoverable problem.
    Returns None on success.
    """
    # Bind the imported package at module scope: Layer and BBLayers refer to
    # `bb` as a global, which a function-local import would not provide.
    global bb

    builddir = os.environ.get('BUILDDIR') or find_builddir()
    if not builddir:
        sys.exit("Unable to locate BUILDDIR, aborting")
    os.chdir(builddir)

    try:
        setup_command_import('bitbake')
        import bb
    except (ImportError, RuntimeError) as exc:
        sys.exit("Unable to import 'bb' python package: %s" % exc)
    import bb.parse
    import bb.data

    args = process_arguments(cmdline_opts)

    def names_from(values):
        """Flatten an `append` option (None or list of strings) into names."""
        return [name for value in (values or []) for name in value.split()]

    try:
        bblayers = BBLayers(os.path.join(builddir, 'conf', 'bblayers.conf'))
    except LayerError as exc:
        sys.exit(str(exc))

    # Baseline: -l if given, else the current BBLAYERS (which may be unset).
    base = args.base or (bblayers.data.getVar('BBLAYERS', True) or '').split()
    requested = list(base) + names_from(args.add)
    excluded = set(names_from(args.remove))
    optional = names_from(args.optional)

    layer_paths, _ = get_layerpaths_bitbake(args.globs, args.paths)
    layer_paths = set(layer_paths)
    # Baseline entries taken from BBLAYERS are paths; make sure those layers
    # get parsed even when the globs do not cover them.
    for entry in requested:
        if os.path.exists(os.path.join(entry, 'conf', 'layer.conf')):
            layer_paths.add(os.path.realpath(entry))

    data = bb.data.init()
    bb.parse.init_parser(data)

    # NOTE(review): duplicates are collected but not reported yet.
    duplicates = collections.defaultdict(set)
    all_layers = Layers()
    with status('Parsing layer configuration files'):
        # Sorted so that which of two duplicates wins does not depend on
        # set iteration order.
        for layer_path in sorted(layer_paths):
            try:
                all_layers.add_from_path(layer_path, data)
            except DuplicateLayer as exc:
                duplicates[exc.layer.name].add(exc.layer)
                duplicates[exc.layer.name].add(exc.other_layer)
            except LayerError as exc:
                sys.exit(str(exc))

    # Requested entries may be layer names or layer paths; normalise them to
    # names while every parsed layer is still registered.
    configured_layer_names = set()
    for entry in requested:
        if entry in all_layers.by_name:
            configured_layer_names.add(entry)
            continue
        layer = all_layers.by_path.get(os.path.realpath(entry))
        if layer is None:
            sys.exit("Layer '{0}' not found".format(entry))
        configured_layer_names.add(layer.name)
    configured_layer_names -= excluded

    for layer in list(all_layers):
        if layer.name in excluded:
            all_layers.discard(layer)
            continue
        priority_override = args.override_layer_priorities.get(layer.name)
        if priority_override is not None:
            layer.priority = priority_override

    if args.machine:
        with status("Determining layers to include for MACHINE '{0}'".format(args.machine)):
            machine_layers = all_layers.which('conf/machine/{0}.conf'.format(args.machine))
        if not machine_layers:
            sys.exit("No BSP layer found for machine `{0}`".format(args.machine))
        configured_layer_names |= set(l.name for l in machine_layers)

    configured_layers = Layers()
    for name in sorted(configured_layer_names):
        if name not in all_layers.by_name:
            sys.exit("Layer '{0}' not found".format(name))
        layers, missing_layers = all_layers.get_by_name_recursive(name)
        if missing_layers:
            sys.exit("Dependent layers `{0}` not found for requested layer `{1}`".format(', '.join(missing_layers), name))
        configured_layers |= layers

    for name in optional:
        if name in all_layers.by_name:
            layers, missing_layers = all_layers.get_by_name_recursive(name)
            if missing_layers:
                sys.stderr.write("Warning: optional layer `{0}` was found, but its dependencies are missing: `{1}`\n".format(name, ', '.join(missing_layers)))
            else:
                configured_layers |= layers

    for layer in configured_layers.priority_sorted():
        print(layer.path)

    # TODO: write the result back to bblayers.conf, along the lines of:
    # bblayers.data.setVar('BBLAYERS', 'foo bar baz')
    # bblayers.update_variable_lines('BBLAYERS', split='word')
def find_builddir():
    """Find the nearest ancestor of the cwd that has conf/bblayers.conf.

    The current directory is checked first, then each parent in turn; the
    filesystem root itself is not checked.

    Returns:
        The directory path, or None if no such directory exists.
    """
    path = os.getcwd()
    while True:
        parent = os.path.dirname(path)
        # Stop when dirname() no longer moves up.  Comparing against the
        # literal "/" would never terminate for roots spelled differently
        # (e.g. "//" or a Windows drive root).
        if parent == path:
            return None
        if os.path.exists(os.path.join(path, 'conf', 'bblayers.conf')):
            return path
        path = parent
class BBLayers(object):
    """A bblayers.conf file, parsed by bitbake and kept as editable lines.

    Attributes:
        data: bitbake datastore holding the parsed file (with variable
            history tracking enabled during the parse).
        filename: path of the file that was parsed.
        lines: the file's lines with trailing whitespace stripped.
    """

    # Maps a variable-history operation to the operator used when emitting.
    _OPERATORS = {'append': '.=', 'prepend': '=.', 'set?': '?=', 'set': '='}

    def __init__(self, filename, data=None):
        """Parse `filename`, starting from a copy of `data` if given."""
        # Imported here because the bb package only becomes importable once
        # sys.path has been set up at runtime.
        import bb.data
        import bb.parse

        if data is None:
            data = bb.data.init()
        else:
            data = data.createCopy()
        bb.parse.init_parser(data)
        self.data = data
        self.filename = filename
        self.parse_file(filename)

    def parse_file(self, filename):
        """Parse `filename` into self.data and load its text into self.lines.

        Raises LayerError if bitbake fails to parse the file.
        """
        import bb.parse

        self.data.enableTracking()
        try:
            self.data = bb.parse.handle(filename, self.data, include=True)
        except Exception as exc:
            # Exception, not BaseException: Ctrl-C and SIGTERM must keep
            # propagating instead of being reported as a parse failure.
            raise LayerError(exc)
        self.data.disableTracking()

        # splitlines() copes with any newline convention without relying on
        # the 'U' open mode, which newer Python versions reject.
        with open(filename) as fileobj:
            self.lines = [line.rstrip() for line in fileobj.read().splitlines()]

    def get_line_range(self, variable):
        """Locate `variable`'s assignment using the variable history tracking.

        Returns:
            (entry, startline, endline) where entry is the first history
            entry for a `set`/`set?` in this file and the line numbers are
            0-based, inclusive indexes into self.lines; or None when the
            variable is not assigned in this file.
        """
        history = self.data.varhistory.variable(variable)
        for entry in history:
            if entry['file'] == self.filename and entry['op'] in ('set', 'set?'):
                # History line numbers are 1-based.
                endline = entry['line'] - 1
                break
        else:
            return None

        # Walk upwards over continuation lines: each preceding line ending in
        # a backslash still belongs to this assignment (including line 0).
        startline = endline
        while startline > 0 and self.lines[startline - 1].endswith('\\'):
            startline -= 1
        return entry, startline, endline

    def history_entry_string(self, entry, split=None):
        """Render the assignment described by history `entry` as text.

        With split='word' the value is emitted one word per continuation
        line.  Raises ValueError for functions, flag operations and
        unsupported operators.
        """
        variable = entry['variable']
        if self.data.getVarFlag(variable, 'func'):
            raise ValueError("Unable to emit function at this time")

        op = entry['op']
        value = self.data.getVar(variable, False)
        if value is None:
            value = entry['detail']

        if op not in self._OPERATORS:
            raise ValueError("Unsupported operator: {0}".format(op))
        operator = self._OPERATORS[op]

        if 'flag' in entry:
            raise ValueError("Flag operations are not supported at this time")

        if split == 'word':
            value = "\\\n{0}".format("".join(" " + word + " \\\n" for word in value.split()))
        return '{0} {1} "{2}"'.format(variable, operator, value)

    def update_variable_lines(self, variable, split=None):
        """Replace `variable`'s lines in self.lines with its current value.

        Raises ValueError if the variable is not assigned in this file.
        """
        located = self.get_line_range(variable)
        if located is None:
            raise ValueError("No assignment of `{0}` found in {1}".format(variable, self.filename))
        entry, startline, endline = located

        value_lines = self.history_entry_string(entry, split).splitlines()
        self.lines[startline:endline + 1] = value_lines

        # Update variable tracking entries in this file to the adjusted line
        # numbers to reflect the change we just made
        new_endline = startline + len(value_lines) - 1
        line_delta = new_endline - endline
        for entries in self.data.varhistory.variables.values():
            for tracked in entries:
                if tracked['file'] == self.filename and tracked['line'] >= endline + 1:
                    tracked['line'] += line_delta

    def write_file(self, filename=None):
        """Write self.lines to `filename` (default: the file that was parsed)."""
        if filename is None:
            filename = self.filename
        with open(filename, 'w') as fileobj:
            fileobj.writelines(line + '\n' for line in self.lines)
if __name__ == '__main__':
    # Have SIGTERM raise Terminate so it unwinds the stack like Ctrl-C does.
    signal.signal(signal.SIGTERM, sigterm_exception)
    try:
        exit_status = configure_layers(sys.argv[1:]) or 0
        sys.exit(exit_status)
    except (KeyboardInterrupt, Terminate) as exc:
        # Restore the default disposition and re-send the signal to
        # ourselves, so the process ends by that signal rather than by an
        # ordinary exit.
        signum = signal.SIGINT if isinstance(exc, KeyboardInterrupt) else signal.SIGTERM
        signal.signal(signum, signal.SIG_DFL)
        os.kill(os.getpid(), signum)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment