Skip to content

Instantly share code, notes, and snippets.

@kergoth
Last active August 29, 2015 13:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kergoth/9748594 to your computer and use it in GitHub Desktop.
Save kergoth/9748594 to your computer and use it in GitHub Desktop.
Work-in-progress rework of Mentor's bb-determine-layers as a standalone bblayers configuration tool for yocto
#!/usr/bin/env python
import argparse
import collections
import contextlib
import glob
import os
import signal
import sys
# Default for --globs: colon separated wildcard patterns used to locate
# layers. `{scriptdir}` is substituted with the directory of this script by
# process_arguments().
glob_default = './*/:./*/*/:{scriptdir}/../../*/:{scriptdir}/../../*/*/'
# NOTE(review): not referenced anywhere in the visible code -- presumably the
# intended default layer list; confirm before relying on it.
layers_default = 'core'
# Default for --override-layer-priorities, in `layername=priority` form.
override_default = 'openembedded-layer=1'
class Terminate(BaseException):
    """Signals that the process was asked to stop via SIGTERM.

    It subclasses BaseException directly, so handlers written as
    ``except Exception`` do not catch it.
    """
def sigterm_exception(signum, stackframe):
    """Signal handler which converts a delivered signal into Terminate.

    Both arguments are the standard signal-handler parameters and are
    ignored.
    """
    raise Terminate
def setup_command_import(command, relpath='../lib'):
    """Make the python library shipped next to `command` importable.

    Every PATH directory which contains `command` contributes
    `<directory>/<relpath>` to the front of sys.path, in PATH order.

    Raises ImportError if `command` is not found in any PATH directory
    (including when PATH is unset or empty).
    """
    raw_path = os.environ.get('PATH')
    # os.pathsep rather than a literal ':' so the split follows the platform
    search_dirs = raw_path.split(os.pathsep) if raw_path else []
    cmd_paths = [os.path.join(path, relpath)
                 for path in search_dirs
                 if os.path.exists(os.path.join(path, command))]
    if not cmd_paths:
        # Name the command that was actually searched for
        raise ImportError(
            "Unable to locate {0}, please ensure PATH is set correctly.".format(command))
    sys.path[0:0] = cmd_paths
def find(dir, dirfilter=None, **walkoptions):
    """Recursively yield the path of every file below `dir`.

    Yielded paths are joined onto `dir`, so they are absolute only when
    `dir` itself is absolute.

    dirfilter   -- optional callable taking a directory *name*; directories
                   for which it returns a false value are not descended into.
    walkoptions -- passed through to os.walk(). Pruning relies on os.walk's
                   top-down mode, so it has no effect with topdown=False.
    """
    for root, dirs, files in os.walk(dir, **walkoptions):
        if dirfilter is not None:
            # Prune in place via slice assignment. Removing entries while
            # iterating over the same list skips the element following each
            # removal, which let rejected directories slip through.
            dirs[:] = [d for d in dirs if dirfilter(d)]
        for filename in files:
            yield os.path.join(root, filename)
@contextlib.contextmanager
def status(message, outfile=sys.stderr):
    """Context manager reporting the progress and outcome of a step.

    Writes `message` followed by '..' on entry, then appends the outcome
    ('.done', '.interrupted', '.terminated' or '.failed') when the body
    finishes. Exceptions raised by the body are always re-raised.
    """
    outfile.write('{0}..'.format(message))
    outfile.flush()
    try:
        yield
    except BaseException as failure:
        # Pick the suffix by exception kind, most specific first
        if isinstance(failure, KeyboardInterrupt):
            suffix = '.interrupted\n'
        elif isinstance(failure, Terminate):
            suffix = '.terminated\n'
        else:
            suffix = '.failed\n'
        outfile.write(suffix)
        raise
    else:
        outfile.write('.done\n')
class DictArgument(object):
    """argparse `type=` callable which parses 'key=value' lists into a dict.

    listsep   -- separator between entries (None splits on whitespace)
    valuesep  -- separator between a key and its value; the *last*
                 occurrence in an entry is the one used
    keytype   -- optional callable used to convert each key
    valuetype -- optional callable used to convert each value
    """

    def __init__(self, listsep=None, valuesep='=', keytype=None, valuetype=None):
        self.listsep, self.valuesep = listsep, valuesep
        self.keytype, self.valuetype = keytype, valuetype

    def __call__(self, string):
        """Parse `string`, raising argparse.ArgumentTypeError on bad input."""
        parsed = {}
        for item in string.split(self.listsep):
            try:
                key, value = item.rsplit(self.valuesep, 1)
            except ValueError:
                raise argparse.ArgumentTypeError('invalid entry `{0}`'.format(item))

            if self.keytype is not None:
                try:
                    key = self.keytype(key)
                except ValueError:
                    raise argparse.ArgumentTypeError('invalid key `{0}` for type `{1}`'.format(key, self.keytype.__name__))

            if self.valuetype is not None:
                try:
                    value = self.valuetype(value)
                except ValueError:
                    raise argparse.ArgumentTypeError('invalid value `{0}` for type `{1}`'.format(value, self.valuetype.__name__))

            parsed[key] = value
        return parsed

    def __repr__(self):
        settings = (self.listsep, self.valuesep, self.keytype, self.valuetype)
        return 'DictArgument(listsep={0}, valuesep={1}, keytype={2}, valuetype={3})'.format(*map(repr, settings))
def process_arguments(cmdline_args):
    """Parse the command line and normalize the resulting namespace.

    After parsing:
      * `base` is split on whitespace into a list (left as None if not given)
      * `globs` and `paths` are split on ':' into lists of absolute paths,
        with `{scriptdir}` replaced by the directory containing this script
        (`paths` becomes an empty list when not given)
    """
    here = os.path.dirname(__file__)

    def absolutize(spec):
        # colon separated string -> list of absolute, scriptdir-expanded paths
        return [os.path.abspath(item.format(scriptdir=here))
                for item in spec.split(':')]

    option_table = [
        (('-s', '--sort'),
         dict(action='store_true', help='sort layers by layer priority')),
        (('-a', '--add-layers'),
         dict(action='append', dest='add', help='layers to add to the configuration')),
        (('-r', '--remove-layers'),
         dict(action='append', dest='remove', help='layers to remove from the configuration')),
        (('-o', '--add-optional-layers'),
         dict(action='append', dest='optional', help='layers to add to the configuration if available, but silently ignore if not')),
        (('-l', '--base-layers'),
         dict(dest='base', help='specify the baseline list of layers to work from. defaults to the current bblayers if available')),
        (('-m', '--add-machine-layer'),
         dict(dest='machine', help='add the highest priority layer which provides a machine .conf for the specified machine')),
        (('-O', '--override-layer-priorities'),
         dict(default=override_default,
              type=DictArgument(valuetype=int),
              help='override the priority of one or more layers. space separated list of layername=priority values. default: `{0}`'.format(override_default))),
        (('-g', '--globs'),
         dict(default=glob_default,
              help='wildcard patterns to locate layers. colon separated. default: `{0}`'.format(glob_default))),
        (('-p', '--paths'),
         dict(help='paths to search recursively to find layers. colon separated. equivalent to the "find" command', default='')),
    ]

    cli = argparse.ArgumentParser(description='Configure bitbake layers', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)

    args = cli.parse_args(cmdline_args)
    if args.base is not None:
        args.base = args.base.split()
    args.globs = absolutize(args.globs)
    args.paths = absolutize(args.paths) if args.paths else []
    return args
class LayerError(Exception):
    """Base class for the layer related errors raised by this script."""


class DuplicateLayer(LayerError):
    """Raised when two layers share the same name.

    layer       -- the layer which was being added
    other_layer -- the layer already registered under that name
    """

    def __init__(self, layer, other_layer):
        self.layer = layer
        self.other_layer = other_layer
        # The original named a nonexistent class (DuplicateError) here, so
        # constructing this exception raised NameError instead.
        super(DuplicateLayer, self).__init__()

    def __str__(self):
        return 'Duplicate layers found for `{0}`:\n {1}\n {2}'.format(
            self.layer.name, self.layer.path, self.other_layer.path)
class Layer(object):
    """One layer collection, as described by a layer's conf/layer.conf.

    path, confpath -- stored with symlinks resolved (os.path.realpath)
    name           -- collection name
    priority       -- integer priority; may be changed after construction
    pattern        -- the collection's BBFILE_PATTERN value, or None
    depends        -- set of names of the layers this one depends on
    missingdeps    -- starts out as an empty set
    """

    def __init__(self, path, confpath, name, priority, pattern, depends):
        self.path = os.path.realpath(path)
        self.confpath = os.path.realpath(confpath)
        self.name = name
        self.priority = priority
        self.depends = depends
        self.missingdeps = set()
        self.pattern = pattern

    def __repr__(self):
        return '{0.__class__.__name__}({0.path!r}, {0.confpath!r},' \
               '{0.name!r}, {0.priority!r}, {0.pattern!r}, {0.depends!r})'.format(self)

    def _identity(self):
        # Only the attributes which identify a layer. The priority is left
        # out on purpose: it is mutable, and hashing it made a layer vanish
        # from any set or dict it was stored in once its priority changed.
        return (self.path, self.name)

    def __eq__(self, other):
        if not isinstance(other, Layer):
            return NotImplemented
        return self._identity() == other._identity()

    def __ne__(self, other):
        # Python 2 does not derive != from ==
        equal = self.__eq__(other)
        return equal if equal is NotImplemented else not equal

    def __hash__(self):
        return hash(self._identity())

    @staticmethod
    def from_layerpath(layer_path, data=None):
        """Parse `layer_path`/conf/layer.conf and return a list of Layers.

        One Layer is returned per name in BBFILE_COLLECTIONS. If the layer
        defines no collections, a single Layer named after the directory is
        returned, with priority 0, no pattern and no dependencies.

        data -- optional bitbake datastore to copy; a new one is created
                when omitted.

        Raises LayerError if the configuration file cannot be parsed.
        """
        # Imported here because nothing binds `bb` at module scope; the
        # package only becomes importable once sys.path has been set up.
        import bb.data
        import bb.parse

        if data is None:
            data = bb.data.init()
            bb.parse.init_parser(data)

        lconf = os.path.join(layer_path, 'conf', 'layer.conf')
        ldata = data.createCopy()
        ldata.setVar('LAYERDIR', layer_path)
        try:
            ldata = bb.parse.handle(lconf, ldata, include=True)
        except (Exception, SystemExit) as exc:
            # Deliberately narrower than BaseException so that
            # KeyboardInterrupt and Terminate still propagate. SystemExit is
            # kept in case the parser bails out through sys.exit().
            raise LayerError(exc)
        ldata.expandVarref('LAYERDIR')

        names = (ldata.getVar('BBFILE_COLLECTIONS', True) or '').split()
        if not names:
            return [Layer(layer_path, lconf, os.path.basename(layer_path), 0, None, set())]

        layers = []
        for name in names:
            pattern = ldata.getVar('BBFILE_PATTERN_%s' % name, True)
            priority = ldata.getVar('BBFILE_PRIORITY_%s' % name, True) or 0
            depends = ldata.getVar('LAYERDEPENDS_%s' % name, True) or ''
            layers.append(Layer(layer_path, lconf, name, int(priority), pattern,
                                set(depends.split())))
        return layers
try:
    # Python 3: the ABCs live in collections.abc (the collections.MutableSet
    # alias was removed in Python 3.10)
    from collections.abc import MutableSet as _MutableSet
except ImportError:
    # Python 2
    from collections import MutableSet as _MutableSet


class Layers(_MutableSet):
    """A set of layers which also maintains lookup mappings.

    by_path -- maps a layer's path to the layer
    by_name -- maps a layer's name to the layer
    """

    def __init__(self, iterable=None):
        """A collection of layers. Maintains lookup mappings by path and name"""
        self.layers = set()
        self.by_path = {}
        self.by_name = {}
        if iterable is not None:
            self |= iterable

    def __contains__(self, layer):
        return layer in self.layers

    def __iter__(self):
        return iter(self.layers)

    def __len__(self):
        return len(self.layers)

    def __repr__(self):
        return '{0.__class__.__name__}({0.layers!r})'.format(self)

    def __hash__(self):
        return hash(repr(self))

    def add(self, layer):
        """Add `layer` to the collection.

        A layer which is already present, or whose path is already
        registered, is silently ignored. Raises DuplicateLayer if a
        different layer is already registered under the same name.
        """
        if layer in self:
            return
        if layer.name in self.by_name:
            raise DuplicateLayer(layer, self.by_name[layer.name])
        if layer.path in self.by_path:
            return
        self.layers.add(layer)
        self.by_path[layer.path] = layer
        self.by_name[layer.name] = layer

    def add_from_path(self, layerpath, data=None):
        """Parse the layer at `layerpath` and add the layers it defines."""
        layerpath = os.path.realpath(layerpath)
        if layerpath in self.by_path:
            return
        self |= Layer.from_layerpath(layerpath, data)

    def discard(self, layer):
        """Remove `layer` if present; absent layers are ignored.

        Silently ignoring non-members is the MutableSet contract; the
        mappings used to be deleted unconditionally, raising KeyError.
        """
        if layer not in self.layers:
            return
        self.layers.discard(layer)
        # Only drop the mapping entries which really refer to this layer
        if self.by_path.get(layer.path) == layer:
            del self.by_path[layer.path]
        if self.by_name.get(layer.name) == layer:
            del self.by_name[layer.name]

    def clear(self):
        self.layers.clear()
        self.by_path.clear()
        self.by_name.clear()

    def priority_sorted(self):
        """Return the layers as a list, highest priority first.

        Ties are ordered by name, descending.
        """
        return sorted(self.layers, key=lambda l: (l.priority, l.name), reverse=True)

    def which(self, path, all=True):
        """Return the layers containing `path`, highest priority first.

        path -- path relative to each layer's directory
        all  -- when false, stop at the first (highest priority) match; the
                result is still a list. This flag used to be ignored.
        """
        matches = []
        for layer in self.priority_sorted():
            if os.path.exists(os.path.join(layer.path, path)):
                matches.append(layer)
                if not all:
                    break
        return matches

    def get_by_name_recursive(self, name, _seen=None):
        """Get a layer by name and its dependencies, recursively.

        Returns a Layers() collection, and a set of the names which could
        not be found.

        _seen is private bookkeeping for the recursion: the names already
        visited, so that circular dependencies terminate instead of
        recursing forever.
        """
        if _seen is None:
            _seen = set()
        missing = set()
        found_layers = Layers()
        if name in _seen:
            # Already handled (or being handled) further up the call chain
            return found_layers, missing
        _seen.add(name)

        layer = self.by_name.get(name)
        if layer is None:
            missing.add(name)
            return found_layers, missing

        found_layers.add(layer)
        for dep in layer.depends:
            dep_found, dep_missing = self.get_by_name_recursive(dep, _seen)
            found_layers |= dep_found
            missing |= dep_missing
        return found_layers, missing
def get_layerpaths_bitbake(globs=None, paths=None):
    """Locate layer directories and bitbake's bin directory.

    globs -- iterable of wildcard patterns; each match is checked for
             conf/layer.conf (a layer) or else bin/bitbake (bitbake itself)
    paths -- iterable of directories searched recursively for the same
             files, without descending into directories named 'lib'

    Either argument may be None or empty.

    Returns (layer_paths, bitbake_path): a set of layer directories with
    symlinks resolved, and the last bitbake bin directory found (None if
    none was found).
    """
    bitbake_path = None
    layer_paths = set()

    for pattern in globs or ():
        # A separate name for the matches: they used to be assigned to
        # `paths`, clobbering the caller's search paths before the second
        # loop below could use them.
        for candidate in glob.glob(pattern):
            if os.path.exists(os.path.join(candidate, 'conf', 'layer.conf')):
                layer_paths.add(os.path.realpath(candidate))
            elif os.path.exists(os.path.join(candidate, 'bin', 'bitbake')):
                bitbake_path = os.path.join(candidate, 'bin')

    for search_root in paths or ():
        for subpath in find(search_root, dirfilter=lambda d: d != 'lib'):
            if subpath.endswith('/conf/layer.conf'):
                layer_dir = os.path.dirname(os.path.dirname(subpath))
                layer_paths.add(os.path.realpath(layer_dir))
            elif subpath.endswith('/bin/bitbake'):
                bitbake_path = os.path.dirname(subpath)

    return layer_paths, bitbake_path
def configure_layers(cmdline_opts):
    """Work out which layers to configure and print their paths.

    The build directory is taken from $BUILDDIR, or located by
    find_builddir(). The layers found via the --globs/--paths options are
    parsed, the requested layers are resolved together with their
    dependencies, and the resulting layer paths are printed to stdout,
    highest priority first.

    cmdline_opts -- list of command line arguments (without the program name)

    Exits through sys.exit() with a message when the build directory, the
    bb package, a requested layer, or one of its dependencies can't be found.
    Returns None on success.
    """
    # Layer and BBLayers look `bb` up at module scope, so the import below
    # must bind the module-level name, not a function local.
    global bb

    builddir = os.environ.get('BUILDDIR') or find_builddir()
    if not builddir:
        sys.exit("Unable to locate BUILDDIR, aborting")
    # NOTE: from here on relative paths (including the relative default
    # glob patterns) resolve against the build directory.
    os.chdir(builddir)

    try:
        setup_command_import('bitbake')
        import bb
        import bb.parse
        import bb.data
    except (ImportError, RuntimeError) as exc:
        sys.exit("Unable to import 'bb' python package: %s" % exc)

    args = process_arguments(cmdline_opts)

    bblayers = BBLayers(os.path.join(builddir, 'conf', 'bblayers.conf'))
    if not args.base:
        args.base = (bblayers.data.getVar('BBLAYERS', True) or '').split()
    # The parser stores these options under dest 'add', 'remove' and
    # 'optional', and leaves them as None when they were not given.
    args.base.extend(args.add or [])
    excluded = set(args.remove or [])
    optional = args.optional or []

    layer_paths, _ = get_layerpaths_bitbake(args.globs, args.paths)

    data = bb.data.init()
    bb.parse.init_parser(data)

    # TODO: duplicates are collected here but not reported or acted upon yet
    duplicates = collections.defaultdict(set)
    all_layers = Layers()
    with status('Parsing layer configuration files'):
        # sorted() so that which of two duplicate layers wins does not
        # depend on set iteration order
        for layer_path in sorted(layer_paths):
            try:
                all_layers.add_from_path(layer_path, data)
            except DuplicateLayer as exc:
                duplicates[exc.layer.name].add(exc.layer)
                duplicates[exc.layer.name].add(exc.other_layer)
            except LayerError as exc:
                sys.exit(str(exc))

    # Resolve the requested entries to layer names while every parsed layer
    # is still known. BBLAYERS holds directories rather than names, so an
    # entry which is not a layer name is also tried as a layer path.
    configured_layer_names = set()
    for entry in args.base:
        if entry not in all_layers.by_name:
            known = all_layers.by_path.get(os.path.realpath(entry))
            if known is not None:
                entry = known.name
        configured_layer_names.add(entry)
    # A removed layer must not be requested, or it is reported as not found
    configured_layer_names -= excluded

    for layer in list(all_layers):
        if layer.name in excluded:
            all_layers.discard(layer)
        else:
            priority_override = args.override_layer_priorities.get(layer.name)
            if priority_override is not None:
                # Take the layer out of the collection while changing it, so
                # the change can't invalidate a hash based on the priority
                all_layers.discard(layer)
                layer.priority = priority_override
                all_layers.add(layer)

    if args.machine:
        with status("Determining layers to include for MACHINE '{0}'".format(args.machine)):
            machine_layers = all_layers.which('conf/machine/{0}.conf'.format(args.machine))
            if not machine_layers:
                sys.exit("No BSP layer found for machine `{0}`".format(args.machine))
        # NOTE(review): the --add-machine-layer help text promises only the
        # highest priority layer, but every providing layer is added here.
        configured_layer_names |= set(l.name for l in machine_layers)

    configured_layers = Layers()
    for name in configured_layer_names:
        if name not in all_layers.by_name:
            sys.exit("Layer '{0}' not found".format(name))

        layers, missing_layers = all_layers.get_by_name_recursive(name)
        if missing_layers:
            sys.exit("Dependent layers `{0}` not found for requested layer `{1}`".format(', '.join(missing_layers), name))
        configured_layers |= layers

    for name in optional:
        if name in all_layers.by_name:
            layers, missing_layers = all_layers.get_by_name_recursive(name)
            if missing_layers:
                sys.stderr.write("Warning: optional layer `{0}` was found, but its dependencies are missing: `{1}`\n".format(name, ', '.join(missing_layers)))
            else:
                configured_layers |= layers

    # NOTE(review): --sort is parsed but not consulted; the output is always
    # priority sorted.
    for layer in configured_layers.priority_sorted():
        print(layer.path)

    # TODO: write the result back to bblayers.conf instead of printing it:
    # bblayers.data.setVar('BBLAYERS', 'foo bar baz')
    # bblayers.update_variable_lines('BBLAYERS', split='word')
def find_builddir():
    """Search upward from the current directory for a build directory.

    A build directory is one which contains conf/bblayers.conf. Returns its
    path, or None if the search reaches "/" without finding one ("/" itself
    is not checked).
    """
    candidate = os.getcwd()
    while candidate != "/":
        marker = os.path.join(candidate, 'conf', 'bblayers.conf')
        if os.path.exists(marker):
            return candidate
        candidate = os.path.dirname(candidate)
    return None
class BBLayers(object):
    """A parsed bblayers.conf which can be modified and written back out.

    data     -- bitbake datastore the file was parsed into, with variable
                history tracking enabled during the parse
    filename -- path of the parsed file
    lines    -- the file's lines, with trailing whitespace stripped
    """

    # Maps a variable history operation to the operator which produces it
    _operators = {
        'append': '.=',
        'prepend': '=.',
        'set?': '?=',
        'set': '=',
    }

    def __init__(self, filename, data=None):
        """Parse `filename` into a new datastore, or a copy of `data`.

        Raises LayerError if the file cannot be parsed.
        """
        # Imported here because nothing binds `bb` at module scope; the
        # package only becomes importable once sys.path has been set up.
        import bb.data
        import bb.parse

        if data is None:
            data = bb.data.init()
        else:
            data = data.createCopy()
        bb.parse.init_parser(data)
        self.data = data
        self.filename = filename
        self.parse_file(filename)

    def parse_file(self, filename):
        """Parse `filename` with history tracking and load its lines.

        Raises LayerError if the file cannot be parsed.
        """
        import bb.parse

        self.data.enableTracking()
        try:
            self.data = bb.parse.handle(filename, self.data, include=True)
        except (Exception, SystemExit) as exc:
            # Deliberately narrower than BaseException so that
            # KeyboardInterrupt and Terminate still propagate. SystemExit is
            # kept in case the parser bails out through sys.exit().
            raise LayerError(exc)
        finally:
            self.data.disableTracking()

        # Plain text mode: the 'U' flag is rejected by Python 3.11+, and
        # rstrip() below removes any stray carriage return anyway.
        with open(filename) as fileobj:
            self.lines = [line.rstrip() for line in fileobj]

    def get_line_range(self, variable):
        """Pulls the line range for a variable from the variable history tracking

        Returns (entry, startline, endline), where the line numbers are
        0-based, inclusive indexes into self.lines, or None when this file
        has no 'set'/'set?' history entry for the variable.
        """
        history = self.data.varhistory.variable(variable)
        for entry in history:
            if entry['file'] == self.filename and entry['op'] in ('set', 'set?'):
                break
        else:
            return None

        # The recorded line is treated as the last line of the assignment.
        # Walk back over the preceding lines for as long as they end in a
        # continuation backslash. (The previous loop never examined line 0
        # and failed outright when the assignment ended on the second line.)
        endline = entry['line'] - 1
        startline = endline
        while startline > 0 and self.lines[startline - 1].endswith('\\'):
            startline -= 1
        return entry, startline, endline

    def history_entry_string(self, entry, split=None):
        """Return the assignment statement for a variable history entry.

        The unexpanded current value of the variable is emitted, falling
        back to the entry's recorded detail when the variable is unset.

        split -- 'word' puts each whitespace separated word of the value on
                 a continuation line of its own

        Raises ValueError for functions, flag operations and operations
        other than set, set?, append and prepend.
        """
        variable = entry['variable']
        if self.data.getVarFlag(variable, 'func'):
            raise ValueError("Unable to emit function at this time")

        op = entry['op']
        value = self.data.getVar(variable, False)
        if value is None:
            value = entry['detail']

        try:
            operator = self._operators[op]
        except KeyError:
            raise ValueError("Unsupported operator: {0}".format(op))

        if 'flag' in entry:
            raise ValueError("Flag operations are not supported at this time")

        if split == 'word':
            value = "\\\n{0}".format("".join(" " + word + " \\\n" for word in value.split()))
        return '{0} {1} "{2}"'.format(variable, operator, value)

    def update_variable_lines(self, variable, split=None):
        """Replace the variable's lines in self.lines with its current value.

        The line numbers of the history entries recorded for this file are
        shifted to match the edit.

        Raises ValueError if this file does not set the variable.
        """
        located = self.get_line_range(variable)
        if located is None:
            raise ValueError("Variable `{0}` is not set in `{1}`".format(variable, self.filename))
        entry, startline, endline = located

        value_lines = self.history_entry_string(entry, split).splitlines()
        self.lines[startline:endline + 1] = value_lines

        # Update variable tracking entries in this file to the adjusted line
        # numbers to reflect the change we just made
        new_endline = startline + len(value_lines) - 1
        line_delta = new_endline - endline
        for entries in self.data.varhistory.variables.values():
            for tracked in entries:
                if tracked['file'] == self.filename and tracked['line'] >= endline + 1:
                    tracked['line'] += line_delta

    def write_file(self, filename=None):
        """Write self.lines to `filename` (default: the file parsed)."""
        if filename is None:
            filename = self.filename
        with open(filename, 'w') as fileobj:
            fileobj.writelines(line + '\n' for line in self.lines)
if __name__ == '__main__':
    def _die_by_signal(signum):
        # Restore the default disposition and re-deliver the signal to
        # ourselves, so the process ends by the signal rather than by a
        # regular exit.
        signal.signal(signum, signal.SIG_DFL)
        os.kill(os.getpid(), signum)

    signal.signal(signal.SIGTERM, sigterm_exception)
    try:
        exit_code = configure_layers(sys.argv[1:]) or 0
        sys.exit(exit_code)
    except KeyboardInterrupt:
        _die_by_signal(signal.SIGINT)
    except Terminate:
        _die_by_signal(signal.SIGTERM)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment