Last active
May 19, 2020 05:31
-
-
Save pearj/ddd8eefd5421b98fb08b8c9e82df397a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import argparse | |
from subprocess import Popen, PIPE, STDOUT | |
import logging | |
import sys | |
import threading | |
import Queue | |
from operator import itemgetter | |
from base64 import b64encode | |
class DockerAuth(requests.auth.AuthBase):
    """Requests auth hook implementing the Docker registry bearer-token flow.

    On a 401 response carrying a ``Www-Authenticate: Bearer ...`` challenge,
    this fetches a token from the advertised realm (authenticating with HTTP
    basic auth) and replays the original request once with an
    ``Authorization: Bearer <token>`` header.
    """

    def __init__(self, username, password, force_auth=False):
        self.username = username
        self.password = password
        # When True, send a basic auth header up front, which should provoke
        # the 401 bearer challenge even on endpoints that allow anonymous reads.
        self.force_auth = force_auth
        # Parsed Www-Authenticate challenge parameters (realm/service/scope).
        self.chal = {}

    def _basic_auth_str(self, username, password):
        """Returns a Basic Auth string."""
        return 'Basic ' + b64encode(('%s:%s' % (username, password)).encode('latin1')).strip().decode('latin1')

    def build_bearer_header(self):
        """Exchange the parsed challenge for a token at the realm URL.

        Returns the value for an Authorization header ("Bearer <token>");
        raises Exception when the realm response has no 'token' field.
        """
        payload = {'account': self.username, 'service': self.chal['service']}
        if 'scope' in self.chal:
            payload['scope'] = self.chal['scope']
        r = requests.get(self.chal['realm'], params=payload, auth=(self.username, self.password))
        json = r.json()
        if 'token' in json:
            return "Bearer " + json['token']
        else:
            # BUG FIX: the original referenced an undefined name `response`
            # here, so this path raised NameError instead of the intended
            # Exception. The response object is bound to `r` above.
            raise Exception("Failed to get token from %s got error body %s" %(r.request.url, json))

    def handle_401(self, r, **kwargs):
        """Takes the given response and tries digest-auth, if needed."""
        # Guard against looping: only retry once per request.
        num_401_calls = getattr(self, 'num_401_calls', 1)
        s_auth = r.headers.get('www-authenticate', '')
        if 'bearer' in s_auth.lower() and num_401_calls < 2:
            setattr(self, 'num_401_calls', num_401_calls + 1)
            self.chal = requests.utils.parse_dict_header(s_auth.replace('Bearer ', ''))
            # Consume content and release the original connection
            # to allow our new request to reuse the same one.
            r.content
            r.raw.release_conn()
            r.request.headers['Authorization'] = self.build_bearer_header()
            _r = r.connection.send(r.request, **kwargs)
            _r.history.append(r)
            return _r
        setattr(self, 'num_401_calls', 1)
        return r

    def __call__(self, r):
        # If we have force_auth enabled, add a basic auth header which should
        # cause a 401 and trigger the bearer handshake in handle_401.
        if self.force_auth:
            r.headers['Authorization'] = self._basic_auth_str(self.username, self.password)
        r.register_hook('response', self.handle_401)
        return r
# Work queue feeding image-copy jobs (dicts) to the worker threads.
q = Queue.Queue()
# Maps "repo:tag" -> True/False copy outcome; filled in by the workers.
result = {}
# Set on Ctrl-C so worker threads stop pulling new jobs.
shutdown_event = threading.Event()
def get_arg_parser():
    """Build the command-line parser for the registry-mirroring script.

    Source and target registry hosts are required; auth, TLS verification,
    thread count and source registry type (docker/quay) are optional.
    """
    parser = argparse.ArgumentParser()
    boolean_choice = ['true', 'false']
    parser.add_argument('--threads', '-th', dest='num_threads', help='Number of Threads', type=int, default=1)
    parser.add_argument('--source-docker', '-s', dest='source_docker', help='Source docker registry', required=True)
    parser.add_argument('--source-verify', '-sv', dest='source_verify', help='Should the source docker registry be verified', choices=boolean_choice, default='true')
    registry_type = ['docker', 'quay']
    parser.add_argument('--source-type', '-st', dest='source_type', help='The type of source registry, affects which api it uses to iterate the images to copy', choices=registry_type, default='docker')
    parser.add_argument('--source-auth', '-sa', dest='source_auth', help='Username and password separated by colon for the source registry')
    # FIX: corrected "resistry" typo in the user-facing help text.
    parser.add_argument('--target-docker', '-t', dest='target_docker', help='Target docker registry', required=True)
    parser.add_argument('--target-verify', '-tv', dest='target_verify', help='Should the target docker registry be verified', choices=boolean_choice, default='true')
    parser.add_argument('--target-auth', '-ta', dest='target_auth', help='Username and password separated by colon for the target registry')
    return parser
def log_subprocess_output(pipe):
    """Stream every line read from *pipe* (a bytes stream) to the INFO log."""
    while True:
        line = pipe.readline()
        if line == b'':
            # Empty read means EOF on the pipe.
            break
        logging.info(line.rstrip())
def copy_image(imageObj):
    """Copy one image between registries by shelling out to ``skopeo copy``.

    imageObj is a dict with keys 'source_image', 'target_image', 'args'
    (the parsed CLI namespace) and 'image' ("repo:tag", used to rename the
    worker thread for log readability). Returns True when skopeo exits 0.
    """
    source_image, target_image, args, image = itemgetter('source_image', 'target_image', 'args', 'image')(imageObj)
    # FIX: dropped the redundant `args = imageObj['args']` re-assignment —
    # itemgetter above already extracted it.
    # Change the name of the thread to the image we are copying
    thread = threading.current_thread()
    thread.name = image
    logging.info("Copying image %s => %s", source_image, target_image)
    skopeo_source = "docker://%s" %(source_image)
    skopeo_target = "docker://%s" %(target_image)
    command = ['skopeo', 'copy', skopeo_source, skopeo_target, '--src-tls-verify=' + args.source_verify, '--dest-tls-verify=' + args.target_verify]
    if args.source_auth:
        command.append('--src-creds=' + args.source_auth)
    if args.target_auth:
        command.append('--dest-creds=' + args.target_auth)
    # FIX: use lazy %-args like the other logging calls in this file instead
    # of eagerly formatting the message string.
    logging.info("Executing %s", " ".join(command))
    # Merge stderr into stdout so one reader thread captures all output.
    process = Popen(command, stdout=PIPE, stderr=STDOUT)
    with process.stdout:
        log_subprocess_output(process.stdout)
    exitCode = process.wait()
    return exitCode == 0
def generate_protocol(tls_verify):
    """Map a 'true'/'false' TLS-verify flag to the matching URL scheme."""
    return "https" if tls_verify == "true" else "http"
def mirror_docker_registry(args):
    """Enumerate every repo:tag in a Docker v2 registry and queue copy jobs.

    Walks /v2/_catalog, then /v2/<repo>/tags/list for each repository,
    optionally authenticating with DockerAuth, and puts one work item per
    image on the module-level queue ``q`` for the worker threads.
    """
    logging.info("Mirroring Docker registry %s", args.source_docker)
    baseurl = generate_protocol(args.source_verify) + '://' + args.source_docker + '/v2'
    extra_auth_args = {}
    username = ""
    password = ""
    if args.source_auth:
        # Split on the first colon only, so passwords may contain colons.
        username, password = args.source_auth.split(":", 1)
        # force_auth=True — presumably so the catalog request authenticates
        # immediately rather than waiting for a challenge; confirm if changed.
        extra_auth_args['auth'] = DockerAuth(username, password, True)
    r = requests.get(baseurl + '/_catalog', **extra_auth_args)
    repos = r.json()
    for repo in repos['repositories']:
        if args.source_auth:
            # Fresh auth object per repo so the token challenge is redone.
            extra_auth_args['auth'] = DockerAuth(username, password)
        tagr = requests.get(baseurl + '/' + repo + '/tags/list', **extra_auth_args)
        tags = tagr.json()
        for tag in tags['tags']:
            image = repo + ':' + tag
            source_image = args.source_docker + '/' + image
            target_image = args.target_docker + '/' + image
            # FIX: the original dict literal repeated the 'args' key.
            imageObj = {'source_image': source_image, 'target_image': target_image, 'args': args, 'image': image}
            logging.info("Queuing image %s for mirroring", source_image)
            q.put(imageObj)
def mirror_quay_registry(args):
    """Enumerate public repo:tag images in a Quay registry and queue copy jobs.

    Uses Quay's /api/v1/repository API (public repos only), then fetches the
    tags for each repository and puts one work item per image on the
    module-level queue ``q`` for the worker threads.
    """
    logging.info("Mirroring Quay registry %s", args.source_docker)
    baseurl = generate_protocol(args.source_verify) + '://' + args.source_docker + '/api/v1/repository'
    r = requests.get(baseurl + '?public=true')
    repos = r.json()
    for repo in repos['repositories']:
        repourl = repo['namespace'] + '/' + repo['name']
        tagReq = requests.get(baseurl + '/' + repourl + '?includeTags=true&includeStats=false')
        tags = tagReq.json()
        for tag in tags['tags']:
            image = repourl + ':' + tag
            source_image = args.source_docker + '/' + image
            target_image = args.target_docker + '/' + image
            # FIX: the original dict literal repeated the 'args' key.
            imageObj = {'source_image': source_image, 'target_image': target_image, 'args': args, 'image': image}
            logging.info("Queuing image %s for mirroring", source_image)
            q.put(imageObj)
def worker():
    """Thread target: consume copy jobs from ``q`` until a None sentinel
    arrives or the shutdown event is set, recording outcomes in ``result``."""
    while not shutdown_event.is_set():
        job = q.get()
        if job is None:
            # Sentinel pushed by the main thread: no more work to do.
            break
        result[job['image']] = copy_image(job)
        q.task_done()
def print_results(): | |
print "------------- Finished ----------------" | |
for image in result: | |
print "%s => %s" %(image, result[image]) | |
if __name__ == "__main__":
    # Thread name in the format shows which image each log line belongs to.
    logging.basicConfig(format='%(levelname)s:%(name)s:%(threadName)s:%(message)s', level=logging.INFO)
    # logging.logThreads = True
    parser = get_arg_parser()
    args = parser.parse_args()
    threads = []
    try:
        # Start the mirroring threads
        threads = [ threading.Thread(target=worker) for _i in range(args.num_threads) ]
        for thread in threads:
            # Daemon threads are killed when the main thread exits (Ctrl-C path).
            thread.daemon = True
            thread.start()
        # Find all the images to mirror and populate the queue
        if args.source_type == "docker":
            mirror_docker_registry(args)
        elif args.source_type == "quay":
            mirror_quay_registry(args)
        queue_size_remaining = 0
        # stop workers
        for i in range(args.num_threads):
            # One None sentinel per worker tells each to exit its loop.
            q.put(None)
        for t in threads:
            # Python 2.7 doesn't have a way to allow thread.join to be interruptable by CTRL-C, so we need to wake up every second
            while t.isAlive():
                t.join(1)
                # Log the size of the queue anytime it is different
                if queue_size_remaining != q.qsize():
                    queue_size_remaining = q.qsize()
                    # qsize() is approximate by design; good enough for progress.
                    logging.info("Approximately %d images left to sync", queue_size_remaining)
        print_results()
    except KeyboardInterrupt:
        logging.error("Ctrl+C pressed... shutting down workers")
        shutdown_event.set()
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment