Skip to content

Instantly share code, notes, and snippets.

@jgsogo
Last active November 14, 2019 23:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jgsogo/a39acad525fd3e7e5315b2fa0bc70b6f to your computer and use it in GitHub Desktop.
Save jgsogo/a39acad525fd3e7e5315b2fa0bc70b6f to your computer and use it in GitHub Desktop.
Parse Conan lockfile and create buildInfo for Artifactory
# coding=utf-8
# Build definition: https://github.com/jfrog/build-info
import argparse
import datetime
import json
import os
import re
import sys
import textwrap
from collections import OrderedDict, defaultdict, namedtuple
from functools import partial
try:
from rtpy import Rtpy
except ImportError:
sys.stderr.write("Install 'rtpy' to use this script: 'pip install rtpy'")
sys.exit(1)
# Full package reference as found in the "pref" field of lockfile nodes:
#   name/version@user/channel#rrev:pid#prev
# None of the components may contain the separator characters '/', '@', '#', ':'.
pref_pattern = re.compile(
    r"(?P<name>[^\/@#:]+)\/"
    r"(?P<version>[^\/@#:]+)"
    r"@"
    r"(?P<user>[^\/@#:]+)\/"
    r"(?P<channel>[^\/@#:]+)"
    r"#(?P<rrev>[^\/@#:]+)"
    r":(?P<pid>[^\/@#:]+)"
    r"#(?P<prev>[^\/@#:]+)"
)
class Artifact(namedtuple('Artifact', ["sha1", "md5", "name", "id"])):
    """Immutable record describing one file: checksums plus a name or an id.

    Only the hash is customised (it depends on ``sha1`` alone); equality is
    still the field-by-field tuple comparison inherited from the namedtuple.
    """

    def __hash__(self):
        # ``sha1`` is the first field of the tuple
        return hash(self[0])
def parse_pref(pref):
    """Split a full package reference into its named components.

    Args:
        pref: string of the form
            ``name/version@user/channel#rrev:pid#prev``.

    Returns:
        dict with the keys ``name``, ``version``, ``user``, ``channel``,
        ``rrev``, ``pid`` and ``prev``.

    Raises:
        ValueError: if ``pref`` does not match ``pref_pattern`` (for example
            when the revisions or the package id are missing).
    """
    match = pref_pattern.match(pref)
    if match is None:
        # Fail with the offending reference instead of an AttributeError on None
        raise ValueError("Cannot parse package reference: {!r}".format(pref))
    return match.groupdict()
def _get_reference(pref):
    """Return the recipe reference ``name/version@user/channel`` of ``pref``."""
    fields = parse_pref(pref)
    return "{name}/{version}@{user}/{channel}".format(
        name=fields["name"],
        version=fields["version"],
        user=fields["user"],
        channel=fields["channel"],
    )
def _get_package_reference(pref):
    """Return ``name/version@user/channel:pid`` for ``pref`` (no revisions)."""
    package_id = parse_pref(pref)["pid"]
    recipe_reference = _get_reference(pref)
    return "{reference}:{pid}".format(reference=recipe_reference, pid=package_id)
def _parse_profile(contents):
    """Yield ``("<section>.<key>", value)`` pairs from an INI-like profile text.

    Args:
        contents: profile text made of ``[section]`` headers followed by
            ``key=value`` lines.

    Yields:
        Tuples ``(name, value)`` where ``name`` is the section and the key
        joined by a dot, and ``value`` is the raw string after the ``=``.

    Notes:
        * Only ``=`` separates keys from values, so scoped keys such as
          ``pkg:shared=True`` keep their ``pkg:`` prefix instead of being
          split at the colon.
        * Interpolation is disabled: ``%`` characters in values are returned
          verbatim.
        * Keys are lower-cased by ``ConfigParser``'s default ``optionxform``.
    """
    import configparser  # local import, as in the original script

    config = configparser.ConfigParser(delimiters=("=",), interpolation=None)
    config.read_string(contents)
    # Public API instead of the private ``_sections`` attribute
    for section in config.sections():
        for key, value in config.items(section):
            yield "{}.{}".format(section, key), value
def _parse_options(contents):
    """Yield ``("options.<key>", value)`` pairs from ``key=value`` lines.

    Args:
        contents: text with one ``key=value`` entry per line; ``None`` or an
            empty string yields nothing.

    Yields:
        Tuples ``("options.<key>", value)``.

    Raises:
        ValueError: if a non-blank line contains no ``=`` at all.
    """
    for line in (contents or "").splitlines():
        if not line.strip():
            continue  # tolerate blank lines
        # Split on the first '=' only: the value itself may contain '='
        key, value = line.split("=", 1)
        yield "options.{}".format(key), value
def _get_artifacts(path, remotes, use_id=False, name_format="{name}"):
    """Collect the files stored under ``path`` in every remote.

    Each remote is queried through ``searches.artifactory_query_language``
    with an ``items.find`` query on ``path``. Entries named ``.timestamp``
    are skipped.

    Args:
        path: repository path to search for.
        remotes: iterable of clients exposing
            ``searches.artifactory_query_language(query)``.
        use_id: when true the formatted label is stored in ``Artifact.id``
            (and ``name`` is ``None``); otherwise it goes to ``Artifact.name``.
        name_format: format string applied to each result row to build the
            label (the row's fields are available as keyword arguments).

    Returns:
        set of ``Artifact``. Rows are keyed by ``actual_sha1``, so a file
        found in several remotes appears once (the last remote wins).
    """
    art_query = 'items.find({{"path": "{path}"}}).include("repo", "name", "path", "actual_md5", "actual_sha1")'
    aql = art_query.format(path=path)

    by_sha1 = {}
    for remote in remotes:
        response = remote.searches.artifactory_query_language(aql)
        if not response:
            continue
        for item in response["results"]:
            if item["name"] == ".timestamp":
                continue
            label = name_format.format(**item)
            if use_id:
                entry = {"md5": item["actual_md5"], "name": None, "id": label}
            else:
                entry = {"md5": item["actual_md5"], "name": label, "id": None}
            by_sha1[item["actual_sha1"]] = entry

    return {Artifact(sha1, **entry) for sha1, entry in by_sha1.items()}
def _get_recipe_artifacts(pref, remotes, add_prefix, use_id):
    """Return the artifacts found in the ``export`` folder of ``pref``'s recipe revision.

    When ``add_prefix`` is true every label is prefixed with the recipe
    reference followed by ``" :: "``; ``use_id`` is forwarded to
    ``_get_artifacts``.
    """
    if add_prefix:
        name_format = "{} :: {{name}}".format(_get_reference(pref))
    else:
        name_format = "{name}"
    export_path = "{user}/{name}/{version}/{channel}/{rrev}/export".format(**parse_pref(pref))
    return _get_artifacts(path=export_path, remotes=remotes, use_id=use_id,
                          name_format=name_format)
def _get_package_artifacts(pref, remotes, add_prefix, use_id):
    """Return the artifacts found in the package-revision folder of ``pref``.

    When ``add_prefix`` is true every label is prefixed with the package
    reference (``name/version@user/channel:pid``) followed by ``" :: "``;
    ``use_id`` is forwarded to ``_get_artifacts``.
    """
    if add_prefix:
        name_format = "{} :: {{name}}".format(_get_package_reference(pref))
    else:
        name_format = "{name}"
    package_path = "{user}/{name}/{version}/{channel}/{rrev}/package/{pid}/{prev}".format(
        **parse_pref(pref))
    return _get_artifacts(path=package_path, remotes=remotes, use_id=use_id,
                          name_format=name_format)
def get_remotes(remotes):
    """Yield one ``Rtpy`` client per remote specification.

    Args:
        remotes: iterable of strings, each either ``url,api_key`` or
            ``url,username,password``. ``None`` is treated as no remotes.

    Yields:
        ``Rtpy`` instances configured with the given URL and credentials.

    Notes:
        * The string is split on the first two commas only, so a password
          may itself contain commas.
        * A specification without credentials cannot be used: a warning is
          written to stderr and the entry is skipped.
    """
    for spec in remotes or ():
        items = spec.split(",", 2)
        if len(items) == 2:
            yield Rtpy({"af_url": items[0], "api_key": items[1]})
        elif len(items) == 3:
            yield Rtpy({"af_url": items[0], "username": items[1], "password": items[2]})
        else:
            # Only the first field is echoed; it cannot contain credentials here
            sys.stderr.write("Cannot parse provided remote '{}', skipping it\n".format(items[0]))
def process_lockfile(lockfiles, remotes, multi_module):
    """Build the build-info modules described by one or more lockfiles.

    Only nodes flagged as ``modified`` in the lockfile are turned into
    modules; for each of them the recipe and package artifacts are looked up
    in ``remotes`` and the artifacts of all (transitive) requirements are
    recorded as dependencies.

    Args:
        lockfiles: iterable of paths to lockfile JSON files.
        remotes: clients forwarded to the artifact lookup helpers.
        multi_module: when true, one module is created for the recipe and
            another one per package reference (which also receives the
            profile and options as properties); otherwise recipe and
            package artifacts share a single module keyed by the recipe
            reference.

    Returns:
        ``defaultdict`` mapping module id to a dict with the keys ``id``,
        ``properties``, ``artifacts`` (set) and ``dependencies`` (set).

    Raises:
        KeyError: if a node requires a node id that is not in the lockfile.
    """
    modules = defaultdict(lambda: {"id": None, "properties": {},
                                   "artifacts": set(), "dependencies": set()})

    def _gather_deps(node_uid, contents, func):
        # Artifacts of the node itself plus, recursively, of all it requires
        node_content = contents["graph_lock"]["nodes"][node_uid]
        artifacts = func(node_content["pref"], remotes=remotes, add_prefix=True, use_id=True)
        for id_node in node_content.get("requires", {}).values():
            artifacts.update(_gather_deps(id_node, contents, func))
        return artifacts

    for lockfile in lockfiles:
        with open(lockfile) as json_data:
            data = json.load(json_data)

        profile = dict(_parse_profile(data["profile_host"]))

        # Gather modules, their artifacts and recursively all required artifacts
        for node in data["graph_lock"]["nodes"].values():
            if not node.get("modified"):  # Work only on generated nodes
                continue
            pref = node["pref"]

            # Create module for the recipe reference
            recipe_key = _get_reference(pref)
            recipe_module = modules[recipe_key]
            recipe_module["id"] = recipe_key
            recipe_module["artifacts"].update(
                _get_recipe_artifacts(pref, remotes, add_prefix=not multi_module, use_id=False))
            # TODO: what about `python_requires`?
            # TODO: can we associate any properties to the recipe? Profile/options may be different per lockfile

            # Create module for the package_id
            package_key = _get_package_reference(pref) if multi_module else recipe_key
            package_module = modules[package_key]
            package_module["id"] = package_key
            package_module["artifacts"].update(
                _get_package_artifacts(pref, remotes, add_prefix=not multi_module, use_id=False))
            if multi_module:  # Only for multi_module, see TODO above
                package_module["properties"].update(profile)
                # A node may carry no options at all
                package_module["properties"].update(_parse_options(node.get("options") or ""))

            # Recurse requires; nodes without requirements may omit the key
            for node_id in node.get("requires", {}).values():
                recipe_module["dependencies"].update(
                    _gather_deps(node_id, data, _get_recipe_artifacts))
                package_module["dependencies"].update(
                    _gather_deps(node_id, data, _get_package_artifacts))

            # TODO: Is the recipe a 'dependency' of the package

    return modules
# Substrings (matched case-insensitively) marking an environment variable as sensitive
_EXCLUDED_ENV_WORDS = ("secret", "key", "password")


def _capture_environment(environ, excluded=_EXCLUDED_ENV_WORDS):
    """Return ``environ`` as build-info properties, leaving out sensitive variables.

    A variable is left out when its name contains any of the ``excluded``
    words, ignoring case (e.g. ``API_KEY``, ``DB_PASSWORD``). Every other
    variable ``NAME`` is returned under the key ``buildInfo.env.NAME``.
    """
    return {"buildInfo.env.{}".format(k): v
            for k, v in environ.items()
            if not any(word in k.lower() for word in excluded)}


def dump_custom_types(obj):
    """``json`` ``default=`` hook: serialise a set of namedtuples as a sorted list.

    Each element is converted to a dict without its ``None`` fields; the list
    is ordered by ``name`` (or ``id`` when there is no name).

    Raises:
        TypeError: for any object that is not a set.
    """
    if isinstance(obj, set):
        artifacts = [{k: v for k, v in o._asdict().items() if v is not None} for o in obj]
        return sorted(artifacts, key=lambda u: u.get("name") or u.get("id") or "")
    raise TypeError("Object of type {} is not JSON serializable".format(type(obj).__name__))


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Create build info from lockfile.')
    parser.add_argument('lockfile', nargs='+', help='Lockfiles to parse')
    parser.add_argument('--remotes', nargs='+',
                        help="List of remotes to use, provide each remote "
                             "in a different argument using any of these "
                             "patterns: 'url,username,password' or "
                             "'url,api_key' (no additional blanks)")
    parser.add_argument('--output-file', default='buildinfo.json',
                        help="Path to generated build info file")
    parser.add_argument('--build-name', help="Build name")
    parser.add_argument('--build-number', help="Build number")
    parser.add_argument('--multi-module', action='store_true', default=False,
                        help="Create several modules: one for the recipe and one per package")
    parser.add_argument('--skip-env', action='store_true', default=False,
                        help="Do not capture environment variables (by default they are "
                             "captured, except those whose name contains 'secret', "
                             "'key' or 'password')")
    args = parser.parse_args()

    # '--remotes' is optional: without it there is simply nothing to query
    remotes = list(get_remotes(args.remotes or []))
    modules = process_lockfile(args.lockfile, remotes, args.multi_module)

    # Add extra information
    ret = {"version": "1.0.1",
           "name": args.build_name,
           "number": args.build_number,
           "type": "GENERIC",
           "started": datetime.datetime.utcnow().isoformat().split(".")[0] + ".000Z",
           "buildAgent": {"name": "Conan Client", "version": "1.X"},
           "modules": list(modules.values())}

    if not args.skip_env:
        ret["properties"] = _capture_environment(os.environ)

    with open(args.output_file, "w") as f:
        json.dump(ret, f, indent=4, default=dump_custom_types)
# coding=utf-8
import sys
from collections import OrderedDict, defaultdict, namedtuple
import argparse
import json
import os
import re
import textwrap
try:
from rtpy import Rtpy
except ImportError:
sys.stderr.write("Install 'rtpy' to use this script: 'pip install rtpy'")
sys.exit(1)
def find_module(build_info, module_id):
    """Return the module of ``build_info`` whose id is ``module_id``.

    When no such module exists, an empty one (no properties, artifacts or
    dependencies) is appended to ``build_info["modules"]`` and returned.
    """
    existing = next((m for m in build_info["modules"] if m["id"] == module_id), None)
    if existing is not None:
        return existing

    created = {"id": module_id, "properties": {}, "artifacts": [], "dependencies": []}
    build_info["modules"].append(created)
    return created
def merge_properties(lhs, rhs):
    """Merge the ``properties`` of ``rhs`` into those of ``lhs``.

    Args:
        lhs: dict with a ``properties`` dict; it is updated in place.
        rhs: dict with a ``properties`` dict; it is only read.

    Returns:
        The (mutated) ``lhs["properties"]`` dict.

    Raises:
        AssertionError: if a property exists on both sides with different
            values. It is raised explicitly so the check is not stripped
            when Python runs with ``-O``.
    """
    merged = lhs["properties"]
    for key, value in rhs["properties"].items():
        if key not in merged:
            merged[key] = value
        elif merged[key] != value:
            raise AssertionError(
                "Conflicting values for property '{}': {} != {}".format(key, value, merged[key]))
    return merged
def merge_artifacts(lhs, rhs, key, cmp_key):
    """Merge the artifact lists stored under ``key`` in two modules.

    Artifacts are identified by their ``cmp_key`` field (e.g. ``"name"`` or
    ``"id"``). Entries of ``lhs`` come first, followed by the entries of
    ``rhs`` that were not already present.

    Args:
        lhs: module dict providing the base list ``lhs[key]``.
        rhs: module dict whose ``rhs[key]`` entries are merged in.
        key: name of the list to merge (``"artifacts"``, ``"dependencies"``).
        cmp_key: field identifying an artifact inside the list.

    Returns:
        A new list with the merged artifacts; the inputs are not modified.

    Raises:
        AssertionError: if both sides hold an artifact with the same
            ``cmp_key`` but a different ``sha1``.
    """
    merged = {it[cmp_key]: it for it in lhs[key]}
    for art in rhs[key]:
        art_cmp_key = art[cmp_key]
        if art_cmp_key not in merged:
            merged[art_cmp_key] = art
            continue
        # Same identifier on both sides: it must be the very same file.
        # (Comparing the identifier with itself, as before, could never fail.)
        known_sha1 = merged[art_cmp_key].get("sha1")
        if art.get("sha1") != known_sha1:
            raise AssertionError("({}={}) sha1 {} != {}".format(
                cmp_key, art_cmp_key, art.get("sha1"), known_sha1))
    return list(merged.values())
def merge_buildinfo(lhs, rhs):
    """Merge the modules of build info ``rhs`` into ``lhs`` and return ``lhs``.

    If either argument is empty the other one is returned untouched.
    Otherwise both must agree on ``version``, ``name`` and ``number``; every
    module of ``rhs`` is then merged into the module of ``lhs`` with the same
    id (created on demand).
    """
    if not (lhs and rhs):
        return lhs or rhs

    # Check they are compatible
    for field in ("version", "name", "number"):
        assert lhs[field] == rhs[field]

    # TODO: Environment coming from different machines (top-level properties are not merged)
    for incoming in rhs["modules"]:
        target = find_module(lhs, incoming["id"])
        target["properties"] = merge_properties(target, incoming)
        target["artifacts"] = merge_artifacts(target, incoming, key="artifacts", cmp_key="name")
        target["dependencies"] = merge_artifacts(target, incoming, key="dependencies", cmp_key="id")
    return lhs
if __name__ == "__main__":
    # Command line entry point: merge several build info files into one
    parser = argparse.ArgumentParser(description='Merge several build info files into one.')
    parser.add_argument('buildinfo', nargs='+', help='BuildInfo files to parse')
    parser.add_argument('--output-file', default='buildinfo.json',
                        help="Path to generated build info file")
    args = parser.parse_args()

    # Fold every input file into the accumulated result, in the given order
    build_info = {}
    for it in args.buildinfo:
        with open(it) as json_data:
            data = json.load(json_data)
        build_info = merge_buildinfo(build_info, data)

    with open(args.output_file, "w") as f:
        json.dump(build_info, f, indent=4)
# coding=utf-8
import sys
from collections import OrderedDict, defaultdict, namedtuple
import argparse
import json
import os
import re
import textwrap
try:
from rtpy import Rtpy
except ImportError:
sys.stderr.write("Install 'rtpy' to use this script: 'pip install rtpy'")
sys.exit(1)
if __name__ == "__main__":
    # Command line entry point: publish a build info JSON file to a remote
    parser = argparse.ArgumentParser(description='Publish a build info file to Artifactory.')
    # nargs='?' is required for the default of a positional argument to apply
    parser.add_argument('buildinfo', nargs='?', default="buildinfo.json",
                        help='Build info JSON file')
    parser.add_argument('--remote', required=True,
                        help="Remote to publish build info to. Use"
                             " 'url,username,password' or 'url,api_key'"
                             " (no additional blanks)")
    args = parser.parse_args()

    # Remote: split on the first two commas only, a password may contain commas
    items = args.remote.split(",", 2)
    if len(items) == 2:
        rtpy = Rtpy({"af_url": items[0], "api_key": items[1]})
    elif len(items) == 3:
        rtpy = Rtpy({"af_url": items[0], "username": items[1], "password": items[2]})
    else:
        # Without a client there is nothing to do: stop instead of failing
        # later with a NameError on 'rtpy'
        sys.stderr.write("Cannot parse provided remote\n")
        sys.exit(1)

    with open(args.buildinfo) as json_data:
        # NOTE(review): 'Content-Type' is handed over through `params`;
        # confirm that rtpy forwards it as an HTTP header.
        rtpy.builds._request("PUT", "build", "Publish build info", kwargs={},
                             params={"Content-Type": "application/json"}, data=json_data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment