Skip to content

Instantly share code, notes, and snippets.

@wolfv
Last active March 23, 2022 18:38
Show Gist options
  • Save wolfv/3222408bd46ab3114e5b384f20252c10 to your computer and use it in GitHub Desktop.
Save wolfv/3222408bd46ab3114e5b384f20252c10 to your computer and use it in GitHub Desktop.
Helpers for mirroring conda packages to an OCI registry using ORAS
from conda_package_handling import api as cph_api
from tempfile import TemporaryDirectory
import pathlib
import os
import subprocess
import shutil
import json
import requests
import tarfile
from pprint import pprint
# Media types attached to the layers that upload_conda_package pushes.
# Gzipped tarball of the package's info/ directory (pushed as info.tar.gz).
info_archive_media_type = "application/vnd.conda.info.v1.tar+gzip"
# The package's info/index.json, pushed as its own layer.
info_index_media_type = "application/vnd.conda.info.index.v1+json"
# The package archive itself when it is a .tar.bz2 file.
package_tarbz2_media_type = "application/vnd.conda.package.v1"
# The package archive itself when it is a .conda file.
package_conda_media_type = "application/vnd.conda.package.v2"
# "cache" directory located next to this script; get_repodata stores its downloads here.
CACHE_DIR = pathlib.Path(os.path.dirname(os.path.abspath(__file__))) / "cache"
class Layer:
    """A file to push to an OCI registry together with its media type."""

    def __init__(self, file, media_type):
        """Record the layer's file and media type.

        Args:
            file: Path of the file (string or path-like). ``ORAS.push``
                converts it with ``str()`` when building the command line.
            media_type: Media type string to attach to the file.
        """
        self.file = file
        self.media_type = media_type

    def __repr__(self):
        """Return a constructor-like representation for debugging."""
        return (
            f"{type(self).__name__}"
            f"(file={self.file!r}, media_type={self.media_type!r})"
        )
class ORAS:
    """Thin wrapper that invokes the ``oras`` command-line client."""

    def __init__(self, base_dir="."):
        """Create a wrapper whose commands run inside ``base_dir``.

        Args:
            base_dir: Directory used as the working directory of every
                ``oras`` invocation.
        """
        self.exec = 'oras'
        self.base_dir = pathlib.Path(base_dir)

    def run(self, args):
        """Run ``oras`` with ``args`` and return the ``CompletedProcess``.

        The exit status is not checked here; callers that care must inspect
        ``returncode`` on the returned object.
        """
        return subprocess.run([self.exec] + list(args), cwd=self.base_dir)

    def pull(self, location, subdir, package_name, media_type):
        """Pull the layers of one media type for a package.

        Args:
            location: Registry prefix, e.g. ``ghcr.io/<owner>``.
            subdir: Conda subdir, used as a path component of the reference.
            package_name: ``<name>-<version>-<build>``; it is split from the
                right so that the name itself may contain dashes.
            media_type: Value passed to ``--media-type``.

        Returns:
            The ``CompletedProcess`` of the ``oras pull`` invocation.

        Raises:
            ValueError: If ``package_name`` does not have three
                dash-separated parts.
        """
        try:
            name, version, build = package_name.rsplit('-', 2)
        except ValueError:
            raise ValueError(
                f"Expected '<name>-<version>-<build>', got {package_name!r}"
            ) from None
        reference = f'{location}/{subdir}/{name}:{version}-{build}'
        return self.run(['pull', reference, '--media-type', media_type])

    def push(self, target, tag, layers, config=None):
        """Push ``layers`` to ``<target>:<tag>``.

        Args:
            target: Repository reference without the tag.
            tag: Tag appended after a colon.
            layers: Iterable of objects with ``file`` and ``media_type``
                attributes; each becomes a ``<file>:<media_type>`` argument.
            config: Accepted for interface compatibility; currently unused.

        Returns:
            The ``CompletedProcess`` of the ``oras push`` invocation.
        """
        layer_opts = [f'{str(layer.file)}:{layer.media_type}' for layer in layers]
        return self.run(["push", f'{target}:{tag}'] + layer_opts)
class SubdirAccessor:
    """Fetch pieces of conda packages stored under one subdir of a registry."""

    def __init__(self, location, subdir, base_dir='.'):
        """Remember where to pull from and where pulled files land.

        Args:
            location: Registry prefix handed to ``ORAS.pull``.
            subdir: Conda subdir handed to ``ORAS.pull``.
            base_dir: Working directory of the ``oras`` process; pulled files
                are looked up relative to it.
        """
        self.loc = location
        self.subdir = subdir
        # oras runs with cwd=base_dir, so every path read back afterwards has
        # to be resolved against the same directory, not the process's cwd.
        self.base_dir = pathlib.Path(base_dir)
        self.oras = ORAS(base_dir=base_dir)

    def get_index_json(self, package_name):
        """Pull the index layer and return the parsed ``info/index.json``."""
        self.oras.pull(self.loc, self.subdir, package_name, info_index_media_type)
        with open(self.base_dir / package_name / 'info' / 'index.json') as fi:
            return json.load(fi)

    def get_info(self, package_name):
        """Pull the info archive and return it as an open ``TarFile``.

        The caller is responsible for closing the returned object (it can be
        used as a context manager).
        """
        self.oras.pull(self.loc, self.subdir, package_name, info_archive_media_type)
        return tarfile.open(self.base_dir / package_name / 'info.tar.gz', 'r:gz')

    def get_package(self, package_name):
        """Pull the ``.tar.bz2`` package layer and return its path as a string.

        With the default ``base_dir`` this is just ``<package_name>.tar.bz2``.
        Only the tar.bz2 media type is requested.
        """
        self.oras.pull(self.loc, self.subdir, package_name, package_tarbz2_media_type)
        return str(self.base_dir / (package_name + '.tar.bz2'))
def compress_folder(source_dir, output_filename):
    """Create a gzipped tarball of the entries in ``source_dir``.

    The archive members are stored relative to ``source_dir``. ``tar`` is
    invoked with an argument list rather than through a shell, so paths that
    contain spaces or shell metacharacters are handled correctly.

    Args:
        source_dir: Directory whose contents are archived.
        output_filename: Path of the ``.tar.gz`` file to create; a relative
            path is interpreted relative to ``source_dir``.

    Returns:
        The bytes ``tar`` printed on stdout (its verbose listing).

    Raises:
        subprocess.CalledProcessError: If ``tar`` exits with a non-zero
            status, e.g. when there is nothing to archive.
    """
    # Mirror what the shell glob `*` selected: non-hidden entries, sorted.
    entries = sorted(
        entry for entry in os.listdir(source_dir) if not entry.startswith('.')
    )
    # `--` keeps entries that start with a dash from being parsed as options.
    command = ['tar', '-cvzf', os.fspath(output_filename), '--', *entries]
    return subprocess.check_output(command, cwd=source_dir)
# def extract(fn, dest_dir=None, components=None, prefix=None):
def get_package_name(path_to_archive):
    """Return the archive's file name without its conda package extension.

    Args:
        path_to_archive: Path (string or path-like) to a ``.tar.bz2`` or
            ``.conda`` file; only the final path component is inspected.

    Returns:
        The file name with the ``.tar.bz2`` or ``.conda`` suffix removed.

    Raises:
        RuntimeError: If the file name ends with neither suffix.
    """
    filename = pathlib.Path(path_to_archive).name
    for extension in ('.tar.bz2', '.conda'):
        if filename.endswith(extension):
            return filename[:-len(extension)]
    raise RuntimeError("Cannot decipher package type")
def prepare_metadata(path_to_archive, upload_files_directory):
    """Extract a package's metadata into ``<upload_files_directory>/<package>``.

    Two files are produced inside that directory:

    * ``info.tar.gz`` - a gzipped tarball of the package's ``info`` folder
    * ``info/index.json`` - a copy of the package's ``index.json``

    Args:
        path_to_archive: Path to the ``.tar.bz2`` / ``.conda`` package.
        upload_files_directory: Directory that receives the per-package
            metadata folder.

    Returns:
        ``pathlib.Path`` of the per-package metadata folder.
    """
    package_name = get_package_name(path_to_archive)
    dest_dir = pathlib.Path(upload_files_directory) / package_name
    print(dest_dir)
    # Creates dest_dir as well; exist_ok makes a repeated run overwrite the
    # files instead of failing with FileExistsError.
    (dest_dir / 'info').mkdir(parents=True, exist_ok=True)

    with TemporaryDirectory() as temp_dir:
        # Only the 'info' component is requested; the code below expects it
        # to end up in <temp_dir>/info.
        cph_api.extract(str(path_to_archive), temp_dir, components=['info'])
        info_dir = os.path.join(temp_dir, 'info')
        info_archive = os.path.join(temp_dir, 'info.tar.gz')
        compress_folder(info_dir, info_archive)
        shutil.copy(info_archive, dest_dir / 'info.tar.gz')
        shutil.copy(os.path.join(info_dir, 'index.json'),
                    dest_dir / 'info' / 'index.json')

    for x in dest_dir.iterdir():
        print(x)
    return dest_dir
def upload_conda_package(path_to_archive, host):
    """Push a conda package and its metadata to ``<host>/<subdir>/<name>``.

    The package archive, ``info.tar.gz`` and ``info/index.json`` are staged in
    a temporary directory and pushed as three layers. The tag is the package's
    ``<version>-<build>`` and the subdir is read from ``index.json``.

    Args:
        path_to_archive: Path to a ``.tar.bz2`` or ``.conda`` package.
        host: Registry prefix, e.g. ``ghcr.io/<owner>``.

    Returns:
        Whatever ``ORAS.push`` returns for the push.

    Raises:
        RuntimeError: From ``get_package_name`` if the archive has an
            unsupported extension.
    """
    path_to_archive = pathlib.Path(path_to_archive)
    package_name = get_package_name(path_to_archive)
    # Split from the right so dashes inside the package name are preserved.
    name, *version_parts = package_name.rsplit('-', 2)
    version_and_build = '-'.join(version_parts)

    # get_package_name only accepts .tar.bz2 and .conda, so anything that is
    # not .tar.bz2 here is a .conda archive.
    if path_to_archive.name.endswith('.tar.bz2'):
        package_media_type = package_tarbz2_media_type
    else:
        package_media_type = package_conda_media_type

    with TemporaryDirectory() as upload_files_directory:
        upload_root = pathlib.Path(upload_files_directory)
        shutil.copy(path_to_archive, upload_root)
        prepare_metadata(path_to_archive, upload_root)

        # Layer paths are relative to upload_root, which is the working
        # directory of the oras process.
        layers = [Layer(path_to_archive.name, package_media_type)]
        metadata = [Layer(f"{package_name}/info.tar.gz", info_archive_media_type),
                    Layer(f"{package_name}/info/index.json", info_index_media_type)]

        for x in upload_root.rglob("*"):
            print(x)

        with open(upload_root / package_name / 'info' / 'index.json', 'r') as fi:
            subdir = json.load(fi)["subdir"]

        oras = ORAS(base_dir=upload_root)
        return oras.push(f'{host}/{subdir}/{name}', version_and_build, layers + metadata)
def get_repodata(channel, subdir):
    """Return the path of a locally cached ``repodata.json``.

    The file is downloaded from ``conda.anaconda.org`` on first use and stored
    under ``CACHE_DIR/<channel>/<subdir>/``; later calls reuse the cached copy
    without contacting the server.

    Args:
        channel: Channel name, e.g. ``conda-forge``.
        subdir: Conda subdir, e.g. ``osx-arm64``.

    Returns:
        ``pathlib.Path`` of the cached file.

    Raises:
        requests.HTTPError: If the server answers with an error status. In
            that case nothing is written, so a failed download cannot poison
            the cache.
    """
    repodata = CACHE_DIR / channel / subdir / "repodata.json"
    if repodata.exists():
        return repodata

    r = requests.get(f"https://conda.anaconda.org/{channel}/{subdir}/repodata.json", allow_redirects=True)
    r.raise_for_status()

    repodata.parent.mkdir(parents=True, exist_ok=True)
    # Store the body byte-for-byte instead of re-encoding decoded text with
    # the platform's default encoding.
    with open(repodata, 'wb') as fo:
        fo.write(r.content)
    return repodata
# Shared requests session used by get_github_packages for GitHub API calls.
# Credentials are the user name 'wolfv' plus the value of the GHA_PAT
# environment variable; os.environ.get yields None when it is not set.
gh_session = requests.Session()
gh_session.auth = ('wolfv', os.environ.get('GHA_PAT'))
def get_github_packages(location, filter_function=None):
    """List the public container packages of the owner named in ``location``.

    Args:
        location: Registry location such as ``ghcr.io/<owner>``; the path
            component after the host is used as the GitHub user name. A bare
            ``<owner>`` is accepted too.
        filter_function: Optional predicate called with each package dict;
            only packages for which it returns a truthy value are kept.

    Returns:
        List of package dicts as returned by the GitHub API.

    Raises:
        ValueError: If no owner can be derived from ``location``.
        requests.HTTPError: If the API answers with an error status.
    """
    # 'https://ghcr.io/owner/...' or 'ghcr.io/owner' -> 'owner'
    parts = location.split('://', 1)[-1].strip('/').split('/')
    owner = parts[1] if len(parts) > 1 else parts[0]
    if not owner:
        raise ValueError(f"Cannot determine package owner from {location!r}")

    headers = {'accept': 'application/vnd.github.v3+json'}
    api_url = f'https://api.github.com/users/{owner}/packages'
    params = {'package_type': 'container', 'visibility': 'public'}
    # NOTE(review): only a single page of results is requested; confirm
    # whether owners with many packages need pagination here.
    r = gh_session.get(api_url, headers=headers, params=params)
    # Fail loudly instead of iterating over an error payload.
    r.raise_for_status()

    packages = r.json()
    if not filter_function:
        return packages
    return [pkg for pkg in packages if filter_function(pkg)]
def oci_auth(location, package, scope='pull'):
    """Return a ``requests.Session`` carrying a bearer token for ``package``.

    A token is requested from ``<location>/token`` for the scope
    ``repository:<package>:<scope>`` and installed as the session's
    ``Authorization`` header.

    Args:
        location: Base URL of the registry, e.g. ``https://ghcr.io``.
        package: Repository path inside the registry.
        scope: Access scope to request.

    Returns:
        A new session; the caller owns it and should close it when done.

    Raises:
        requests.HTTPError: If the token endpoint answers with an error status.
        KeyError: If the response JSON has no ``"token"`` entry.
    """
    url = f"{location}/token?scope=repository:{package}:{scope}"
    r = requests.get(url)
    r.raise_for_status()
    token = r.json()["token"]

    oci_session = requests.Session()
    # Update rather than replace, so the session keeps its default headers.
    oci_session.headers.update({'Authorization': f'Bearer {token}'})
    return oci_session
def get_package_tags(location, package):
    """Return the tags the registry lists for ``package``.

    Args:
        location: Base URL of the registry, e.g. ``https://ghcr.io``.
        package: Repository path inside the registry.

    Returns:
        The value of the ``"tags"`` entry of ``<location>/v2/<package>/tags/list``.

    Raises:
        requests.HTTPError: If the registry answers with an error status.
        KeyError: If the response JSON has no ``"tags"`` entry.
    """
    url = f'{location}/v2/{package}/tags/list'
    # The session is only needed for this one request, so close it afterwards.
    with oci_auth(location, package) as sess:
        res = sess.get(url)
        res.raise_for_status()
        return res.json()['tags']
if __name__ == '__main__':
    # Demo: print the tags of the first 'xtensor' container package published
    # for the osx-arm64 subdir.
    channel = 'conda-forge'
    subdir = 'osx-arm64'
    # Downloads repodata.json into the cache if it is not there yet; the path
    # itself is only used by the commented-out upload example.
    repodata_fn = get_repodata(channel, subdir)

    prefix = f'{subdir}/xtensor'
    xtensor = get_github_packages('ghcr.io/wolfv', filter_function=lambda x: x['name'].startswith(prefix))
    if not xtensor:
        # Exit with a readable message instead of an IndexError below.
        raise SystemExit(f"No container package found whose name starts with {prefix!r}")

    tags = get_package_tags('https://ghcr.io', 'wolfv/' + xtensor[0]['name'])
    print(tags)
# pprint(xtensor)
# exit(0)
# with open(repodata_fn) as fi:
# j = json.load(fi)
# for key, package in j["packages"].items():
# if package["name"] == 'xtensor':
# print("Loading ", key)
# r = requests.get(f"https://conda.anaconda.org/{channel}/{subdir}/{key}", allow_redirects=True)
# with open(key, 'wb') as fo:
# fo.write(r.content)
# upload_conda_package(key, 'ghcr.io/wolfv')
# subdir = SubdirAccessor('ghcr.io/wolfv', 'osx-arm64')
# index = subdir.get_index_json('xtensor-0.21.10-h260d524_0')
# print(index)
# with subdir.get_info('xtensor-0.21.10-h260d524_0') as fi:
# paths = json.load(fi.extractfile('paths.json'))
# print(paths)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment