Last active
November 14, 2019 23:35
-
-
Save jgsogo/a39acad525fd3e7e5315b2fa0bc70b6f to your computer and use it in GitHub Desktop.
Parse Conan lockfile and create buildInfo for Artifactory
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
# Build definition: https://github.com/jfrog/build-info | |
import argparse | |
import datetime | |
import json | |
import os | |
import re | |
import sys | |
import textwrap | |
from collections import OrderedDict, defaultdict, namedtuple | |
from functools import partial | |
try: | |
from rtpy import Rtpy | |
except ImportError: | |
sys.stderr.write("Install 'rtpy' to use this script: 'pip install rtpy'") | |
sys.exit(1) | |
# Matches a full Conan package reference:
#   name/version@user/channel#rrev:package_id#prev
pref_pattern = re.compile(
    r"(?P<name>[^\/@#:]+)\/(?P<version>[^\/@#:]+)"
    r"@(?P<user>[^\/@#:]+)\/(?P<channel>[^\/@#:]+)"
    r"#(?P<rrev>[^\/@#:]+)"
    r":(?P<pid>[^\/@#:]+)"
    r"#(?P<prev>[^\/@#:]+)"
)
class Artifact(namedtuple("Artifact", "sha1 md5 name id")):
    """A deployed file: its checksums plus either a display name or an id.

    Hashing uses the sha1 alone; tuple equality (all fields) is inherited
    unchanged, so two Artifacts with equal sha1 but different names remain
    distinct set members.
    """

    def __hash__(self):
        # Key on the content checksum only.
        return hash(self.sha1)
def parse_pref(pref):
    """Split a full package reference string into its named components.

    :param pref: string of the form 'name/version@user/channel#rrev:pid#prev'
    :return: dict with keys name, version, user, channel, rrev, pid, prev
    :raises ValueError: if *pref* does not match the expected format
        (previously this surfaced as an opaque AttributeError on None).
    """
    match = pref_pattern.match(pref)
    if match is None:
        raise ValueError("Cannot parse package reference: '{}'".format(pref))
    return match.groupdict()
def _get_reference(pref):
    """Build the recipe reference 'name/version@user/channel' from *pref*."""
    fields = parse_pref(pref)
    return "{}/{}@{}/{}".format(fields["name"], fields["version"],
                                fields["user"], fields["channel"])
def _get_package_reference(pref):
    """Build the package reference: the recipe reference plus ':<package_id>'."""
    pid = parse_pref(pref)["pid"]
    return "{}:{}".format(_get_reference(pref), pid)
def _parse_profile(contents): | |
import configparser | |
config = configparser.ConfigParser() | |
config.read_string(contents) | |
for section, values in config._sections.items(): | |
for key, value in values.items(): | |
yield "{}.{}".format(section, key), value | |
def _parse_options(contents): | |
for line in contents.splitlines(): | |
key, value = line.split("=") | |
yield "options.{}".format(key), value | |
def _get_artifacts(path, remotes, use_id=False, name_format="{name}"):
    """Query every remote with AQL for the files stored under *path*.

    :param path: repository path to search (AQL 'path' field)
    :param remotes: iterable of Rtpy clients to query
    :param use_id: if True the formatted label is stored as the artifact id,
        otherwise as its name
    :param name_format: format string applied to each AQL result row
    :return: set of Artifact, de-duplicated by sha1 (later remotes win)
    """
    query = ('items.find({{"path": "{path}"}})'
             '.include("repo", "name", "path", "actual_md5", "actual_sha1")').format(path=path)
    found = {}
    for remote in remotes:
        response = remote.searches.artifactory_query_language(query)
        if not response:
            continue
        for item in response["results"]:
            # '.timestamp' is a marker file, not a real artifact.
            if item["name"] in [".timestamp"]:
                continue
            label = name_format.format(**item)
            found[item["actual_sha1"]] = {
                "md5": item["actual_md5"],
                "name": None if use_id else label,
                "id": label if use_id else None,
            }
    return {Artifact(sha1, **fields) for sha1, fields in found.items()}
def _get_recipe_artifacts(pref, remotes, add_prefix, use_id):
    """Collect the artifacts of the recipe 'export' folder for *pref*."""
    fields = parse_pref(pref)
    path = "{user}/{name}/{version}/{channel}/{rrev}/export".format(**fields)
    if add_prefix:
        fmt = "{} :: {{name}}".format(_get_reference(pref))
    else:
        fmt = "{name}"
    return _get_artifacts(path=path, remotes=remotes, use_id=use_id, name_format=fmt)
def _get_package_artifacts(pref, remotes, add_prefix, use_id):
    """Collect the artifacts of the binary package folder for *pref*."""
    fields = parse_pref(pref)
    path = "{user}/{name}/{version}/{channel}/{rrev}/package/{pid}/{prev}".format(**fields)
    if add_prefix:
        fmt = "{} :: {{name}}".format(_get_package_reference(pref))
    else:
        fmt = "{name}"
    return _get_artifacts(path=path, remotes=remotes, use_id=use_id, name_format=fmt)
def get_remotes(remotes):
    """Yield an authenticated Rtpy client per remote specification.

    :param remotes: iterable of strings, each either 'url,api_key' or
        'url,username,password' (no additional blanks)
    :raises ValueError: on a specification with any other shape.
        Previously such entries were skipped silently, which made a typo
        look like an empty repository.
    """
    for spec in remotes:
        parts = spec.split(",")
        if len(parts) == 2:
            yield Rtpy({"af_url": parts[0], "api_key": parts[1]})
        elif len(parts) == 3:
            yield Rtpy({"af_url": parts[0], "username": parts[1], "password": parts[2]})
        else:
            raise ValueError("Cannot parse remote '{}'".format(spec))
def process_lockfile(lockfiles, remotes, multi_module):
    """Build the buildInfo 'modules' mapping from one or more Conan lockfiles.

    :param lockfiles: paths of lockfile JSON documents to process
    :param remotes: Rtpy clients used to look artifacts up in Artifactory
    :param multi_module: if True, emit one module per recipe reference AND one
        per package reference; otherwise fold everything into the recipe module
    :return: dict mapping module id -> {"id", "properties", "artifacts",
        "dependencies"} where artifacts/dependencies are sets of Artifact
    """
    modules = defaultdict(lambda: {"id": None, "properties": {},
                                   "artifacts": set(), "dependencies": set()})

    def _gather_deps(node_uid, contents, func):
        # Artifacts of this node plus, recursively, of everything it requires.
        node_content = contents["graph_lock"]["nodes"].get(node_uid)
        artifacts = func(node_content["pref"], remotes=remotes, add_prefix=True, use_id=True)
        for _, id_node in node_content.get("requires", {}).items():
            artifacts.update(_gather_deps(id_node, contents, func))
        return artifacts

    for lockfile in lockfiles:
        with open(lockfile) as json_data:
            data = json.load(json_data)
        profile = dict(_parse_profile(data["profile_host"]))
        # Gather modules, their artifacts and recursively all required artifacts
        for _, node in data["graph_lock"]["nodes"].items():
            pref = node["pref"]
            if not node.get("modified"):  # Work only on generated nodes
                continue
            # Create module for the recipe reference
            recipe_key = _get_reference(pref)
            modules[recipe_key]["id"] = recipe_key
            modules[recipe_key]["artifacts"].update(
                _get_recipe_artifacts(pref, remotes, add_prefix=not multi_module, use_id=False))
            # TODO: what about `python_requires`?
            # TODO: can we associate any properties to the recipe? Profile/options may be different per lockfile
            # Create module for the package_id
            package_key = _get_package_reference(pref) if multi_module else recipe_key
            modules[package_key]["id"] = package_key
            modules[package_key]["artifacts"].update(
                _get_package_artifacts(pref, remotes, add_prefix=not multi_module, use_id=False))
            if multi_module:  # Only for multi_module, see TODO above
                modules[package_key]["properties"].update(profile)
                # 'options' may be absent; guard so _parse_options never sees None
                modules[package_key]["properties"].update(_parse_options(node.get("options") or ""))
            # Recurse requires. Use .get: leaf nodes may lack 'requires'
            # (the recursion above already tolerated that; this site did not).
            for _, node_id in node.get("requires", {}).items():
                modules[recipe_key]["dependencies"].update(
                    _gather_deps(node_id, data, _get_recipe_artifacts))
                modules[package_key]["dependencies"].update(
                    _gather_deps(node_id, data, _get_package_artifacts))
                # TODO: Is the recipe a 'dependency' of the package
    return modules
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Create build info from lockfile.')
    parser.add_argument('lockfile', nargs='+', help='Lockfiles to parse')
    parser.add_argument('--remotes', nargs='+',
                        help="List of remotes to use, provide each remote"
                             " in a different argument using any of these"
                             " patterns: 'url,username,password' or"
                             " 'url,api_key' (no additional blanks)")
    parser.add_argument('--output-file', default='buildinfo.json',
                        help="Path to generated build info file")
    parser.add_argument('--build-name', help="Build name")
    parser.add_argument('--build-number', help="Build number")
    parser.add_argument('--multi-module', action='store_true', default=False,
                        help="Create several modules: one for the recipe and one per package")
    parser.add_argument('--skip-env', action='store_true', default=False,
                        help="Do not capture environment variables (excluded some keys)")
    args = parser.parse_args()

    remotes = list(get_remotes(args.remotes))
    modules = process_lockfile(args.lockfile, remotes, args.multi_module)

    # Assemble the top-level build-info document
    # (schema: https://github.com/jfrog/build-info)
    ret = {"version": "1.0.1",
           "name": args.build_name,
           "number": args.build_number,
           "type": "GENERIC",
           "started": datetime.datetime.utcnow().isoformat().split(".")[0] + ".000Z",
           "buildAgent": {"name": "Conan Client", "version": "1.X"},
           "modules": list(modules.values())}

    if not args.skip_env:
        # Exclude any variable whose *name contains* a sensitive word
        # (case-insensitive). The previous exact-match test (`k not in excluded`)
        # only dropped variables literally named 'secret'/'key'/'password', so
        # names such as API_KEY or MY_PASSWORD leaked into the published build info.
        excluded = ["secret", "key", "password"]
        environment = {"buildInfo.env.{}".format(k): v for k, v in os.environ.items()
                       if not any(word in k.lower() for word in excluded)}
        ret["properties"] = environment

    def dump_custom_types(obj):
        # JSON hook for the artifact/dependency sets: drop None fields and
        # sort for deterministic output.
        if isinstance(obj, set):
            artifacts = [{k: v for k, v in o._asdict().items() if v is not None} for o in obj]
            return sorted(artifacts, key=lambda u: u.get("name") or u.get("id"))
        raise TypeError

    with open(args.output_file, "w") as f:
        f.write(json.dumps(ret, indent=4, default=dump_custom_types))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
import sys | |
from collections import OrderedDict, defaultdict, namedtuple | |
import argparse | |
import json | |
import os | |
import re | |
import textwrap | |
try: | |
from rtpy import Rtpy | |
except ImportError: | |
sys.stderr.write("Install 'rtpy' to use this script: 'pip install rtpy'") | |
sys.exit(1) | |
def find_module(build_info, module_id):
    """Return the module with *module_id* from *build_info*.

    If no such module exists, an empty one is created, appended to
    ``build_info["modules"]`` and returned.
    """
    for module in build_info["modules"]:
        if module["id"] == module_id:
            return module
    module = {"id": module_id, "properties": {}, "artifacts": [], "dependencies": []}
    build_info["modules"].append(module)
    return module
def merge_properties(lhs, rhs):
    """Merge ``rhs["properties"]`` into ``lhs["properties"]`` and return it.

    Mutates *lhs* in place. A key present on both sides must carry the
    same value on each.
    """
    merged = lhs["properties"]
    for key, value in rhs["properties"].items():
        if key not in merged:
            merged[key] = value
        else:
            assert value == merged[key], "{} != {}".format(value, merged[key])
    return merged
def merge_artifacts(lhs, rhs, key, cmp_key):
    """Merge the *key* lists (artifacts or dependencies) of two modules.

    Entries are identified by *cmp_key* ('name' or 'id'). An entry present
    on both sides must be identical field-by-field (checksums included).
    The previous check compared ``art[cmp_key]`` against a dict entry that
    was keyed by that very value, so it was a tautology and could never
    detect a checksum mismatch.

    :return: list of merged entries (lhs entries first, then new rhs ones)
    """
    merged = {it[cmp_key]: it for it in lhs[key]}
    for art in rhs[key]:
        art_id = art[cmp_key]
        if art_id in merged:
            # Same name/id must mean the very same file.
            assert art == merged[art_id], \
                "({}) {} != {} for {}".format(cmp_key, art, merged[art_id], art_id)
        else:
            merged[art_id] = art
    return list(merged.values())
def merge_buildinfo(lhs, rhs):
    """Merge two build-info documents module by module.

    Either side may be empty, in which case the other is returned as-is.
    Mutates and returns *lhs*.
    """
    if not (lhs and rhs):
        return lhs or rhs
    # Both documents must describe the same build.
    assert lhs["version"] == rhs["version"]
    assert lhs["name"] == rhs["name"]
    assert lhs["number"] == rhs["number"]
    # merge_properties(lhs, rht) # TODO: Environment coming from different machines
    for module in rhs["modules"]:
        target = find_module(lhs, module["id"])
        target["properties"] = merge_properties(target, module)
        target["artifacts"] = merge_artifacts(target, module, key="artifacts", cmp_key="name")
        target["dependencies"] = merge_artifacts(target, module, key="dependencies", cmp_key="id")
    return lhs
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Create build info from lockfile.')
    parser.add_argument('buildinfo', nargs='+', help='BuildInfo files to parse')
    parser.add_argument('--output-file', default='buildinfo.json',
                        help="Path to generated build info file")
    args = parser.parse_args()

    # Fold every input document into a single accumulated build info.
    build_info = {}
    for path in args.buildinfo:
        with open(path) as fp:
            build_info = merge_buildinfo(build_info, json.load(fp))

    with open(args.output_file, "w") as fp:
        fp.write(json.dumps(build_info, indent=4))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
import sys | |
from collections import OrderedDict, defaultdict, namedtuple | |
import argparse | |
import json | |
import os | |
import re | |
import textwrap | |
try: | |
from rtpy import Rtpy | |
except ImportError: | |
sys.stderr.write("Install 'rtpy' to use this script: 'pip install rtpy'") | |
sys.exit(1) | |
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Create build info from lockfile.')
    # nargs='?' makes the positional optional so the declared default is
    # actually honored (a positional default is otherwise dead).
    parser.add_argument('buildinfo', nargs='?', default="buildinfo.json",
                        help='Build info JSON file')
    parser.add_argument('--remote', help="Remote to publish build info to. Use"
                                         " 'url,username,password' or 'url,api_key'"
                                         " (no additional blanks)")
    args = parser.parse_args()
    # Remote
    items = args.remote.split(",")
    if len(items) == 2:
        rtpy = Rtpy({"af_url": items[0], "api_key": items[1]})
    elif len(items) == 3:
        rtpy = Rtpy({"af_url": items[0], "username": items[1], "password": items[2]})
    else:
        sys.stderr.write("Cannot parse provided remote")
        # Previously execution fell through here and crashed with a
        # NameError on 'rtpy'; exit explicitly instead.
        sys.exit(1)
    with open(args.buildinfo) as json_data:
        # NOTE(review): 'Content-Type' is passed via 'params'; presumably rtpy
        # forwards these as request headers -- verify against rtpy's _request API.
        rtpy.builds._request("PUT", "build", "Publish build info", kwargs={},
                             params={"Content-Type": "application/json"}, data=json_data)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment