Skip to content

Instantly share code, notes, and snippets.

@pearj
Last active May 19, 2020 05:31
Show Gist options
  • Save pearj/ddd8eefd5421b98fb08b8c9e82df397a to your computer and use it in GitHub Desktop.
Save pearj/ddd8eefd5421b98fb08b8c9e82df397a to your computer and use it in GitHub Desktop.
import requests
import argparse
from subprocess import Popen, PIPE, STDOUT
import logging
import sys
import threading
import Queue
from operator import itemgetter
from base64 import b64encode
class DockerAuth(requests.auth.AuthBase):
    """requests auth handler implementing the registry Bearer-token handshake.

    When a response is a 401 whose ``WWW-Authenticate`` header mentions
    ``bearer``, the challenge parameters are parsed, a token is fetched from the
    challenge ``realm`` using HTTP basic credentials, and the original request
    is re-sent once with ``Authorization: Bearer <token>``.
    """

    def __init__(self, username, password, force_auth=False):
        """
        :param username: account name sent to the token endpoint.
        :param password: password sent (via basic auth) to the token endpoint.
        :param force_auth: when True, attach a Basic ``Authorization`` header to
            the initial request. Per the original author's note, this is meant
            to make the registry answer with a 401 challenge.
        """
        self.username = username
        self.password = password
        self.force_auth = force_auth
        # Parsed parameters of the last Bearer challenge (realm/service/scope).
        self.chal = {}
        # 1 = this request has not been retried yet; 2 = already retried once.
        self.num_401_calls = 1

    def _basic_auth_str(self, username, password):
        """Returns a Basic Auth string."""
        return 'Basic ' + b64encode(('%s:%s' % (username, password)).encode('latin1')).strip().decode('latin1')

    def build_bearer_header(self):
        """Fetch a token from the challenge realm and return a Bearer header value.

        :returns: ``"Bearer <token>"``.
        :raises Exception: if the token endpoint's JSON body has no token.
        """
        payload = {'account': self.username}
        # Only forward the challenge parameters the registry actually sent.
        for key in ('service', 'scope'):
            if key in self.chal:
                payload[key] = self.chal[key]
        r = requests.get(self.chal['realm'], params=payload, auth=(self.username, self.password))
        body = r.json()
        # The token spec allows ``access_token`` as an alias of ``token``.
        token = body.get('token') or body.get('access_token')
        if token:
            return "Bearer " + token
        raise Exception("Failed to get token from %s got error body %s" % (r.request.url, body))

    def handle_401(self, r, **kwargs):
        """Response hook: on a Bearer 401, obtain a token and retry the request once."""
        s_auth = r.headers.get('www-authenticate', '')
        if 'bearer' in s_auth.lower() and self.num_401_calls < 2:
            self.num_401_calls += 1
            challenge = s_auth.strip()
            # Strip the auth-scheme token regardless of its case.
            if challenge[:7].lower() == 'bearer ':
                challenge = challenge[7:]
            self.chal = requests.utils.parse_dict_header(challenge)
            # Consume content and release the original connection
            # to allow our new request to reuse the same one.
            r.content
            r.raw.release_conn()
            r.request.headers['Authorization'] = self.build_bearer_header()
            _r = r.connection.send(r.request, **kwargs)
            _r.history.append(r)
            return _r
        self.num_401_calls = 1
        return r

    def __call__(self, r):
        # Reset per request. The retry above is sent straight through the
        # connection adapter, so this hook never sees its response. Without the
        # reset the counter would stay at 2 and a reused auth object would
        # never retry again.
        self.num_401_calls = 1
        # If force_auth is enabled, send a basic auth header up front; this is
        # expected to provoke a 401 Bearer challenge handled by handle_401.
        if self.force_auth:
            r.headers['Authorization'] = self._basic_auth_str(self.username, self.password)
        r.register_hook('response', self.handle_401)
        return r
# Work items handed from the main thread to the worker threads: dicts built by
# the mirror_* functions, or None as a per-worker stop sentinel.
q = Queue.Queue()
# Per-image outcome: "repo:tag" -> the boolean returned by copy_image.
result = dict()
# Set by the Ctrl+C handler to ask workers to stop taking new work.
shutdown_event = threading.Event()
def get_arg_parser():
    """Build the command-line parser for the registry mirroring script.

    The ``--*-verify`` values are kept as the strings ``true``/``false`` because
    they are passed verbatim to skopeo's ``--src-tls-verify``/``--dest-tls-verify``
    flags. The source value also selects https vs http when listing the source
    registry.

    :returns: a configured ``argparse.ArgumentParser``.
    """
    boolean_choice = ['true', 'false']
    registry_type = ['docker', 'quay']
    parser = argparse.ArgumentParser()
    parser.add_argument('--threads', '-th', dest='num_threads', help='Number of Threads', type=int, default=1)
    parser.add_argument('--source-docker', '-s', dest='source_docker', help='Source docker registry', required=True)
    parser.add_argument('--source-verify', '-sv', dest='source_verify', help='Should the source docker registry be verified', choices=boolean_choice, default='true')
    parser.add_argument('--source-type', '-st', dest='source_type', help='The type of source registry, affects which api it uses to iterate the images to copy', choices=registry_type, default='docker')
    parser.add_argument('--source-auth', '-sa', dest='source_auth', help='Username and password separated by colon for the source registry')
    parser.add_argument('--target-docker', '-t', dest='target_docker', help='Target docker registry', required=True)
    parser.add_argument('--target-verify', '-tv', dest='target_verify', help='Should the target docker registry be verified', choices=boolean_choice, default='true')
    parser.add_argument('--target-auth', '-ta', dest='target_auth', help='Username and password separated by colon for the target registry')
    return parser
def log_subprocess_output(pipe):
    """Log every line read from *pipe* at INFO level, minus trailing whitespace.

    Reading stops when ``readline`` returns an empty bytes object (end of stream).
    """
    while True:
        raw_line = pipe.readline()
        if raw_line == b'':
            break
        logging.info(raw_line.rstrip())
def copy_image(imageObj):
    """Copy one image from the source to the target registry with ``skopeo copy``.

    :param imageObj: dict with keys ``source_image``, ``target_image``,
        ``image`` (the ``repo:tag`` string) and ``args`` (parsed CLI namespace).
    :returns: True when skopeo exited with status 0, False otherwise.
    """
    source_image, target_image, args, image = itemgetter('source_image', 'target_image', 'args', 'image')(imageObj)
    # Rename the worker thread after the image so log lines (the log format
    # includes %(threadName)s) show which copy they belong to.
    threading.current_thread().name = image
    logging.info("Copying image %s => %s", source_image, target_image)
    command = [
        'skopeo', 'copy',
        'docker://%s' % source_image,
        'docker://%s' % target_image,
        '--src-tls-verify=' + args.source_verify,
        '--dest-tls-verify=' + args.target_verify,
    ]
    if args.source_auth:
        command.append('--src-creds=' + args.source_auth)
    if args.target_auth:
        command.append('--dest-creds=' + args.target_auth)
    # Never write registry passwords to the log: mask the credential values.
    logged_command = [
        part.split('=', 1)[0] + '=****' if part.startswith(('--src-creds=', '--dest-creds=')) else part
        for part in command
    ]
    logging.info("Executing %s", " ".join(logged_command))
    process = Popen(command, stdout=PIPE, stderr=STDOUT)
    with process.stdout:
        log_subprocess_output(process.stdout)
    return process.wait() == 0
def generate_protocol(tls_verify):
    """Map a ``--*-verify`` flag value to a URL scheme.

    Only the exact string ``"true"`` selects ``https``; any other value gives ``http``.
    """
    return "https" if tls_verify == "true" else "http"
def _iter_registry_list(url, key, make_auth=None):
    """Yield every entry of the JSON list *key* from a Registry v2 list endpoint.

    Follows ``Link: <...>; rel="next"`` response headers, so catalogs and tag
    lists that the registry splits into pages are read completely. A missing
    or ``null`` list (e.g. a repository whose tags were all deleted) is treated
    as empty.

    :param url: absolute URL of the first page.
    :param key: name of the list in the JSON body (``repositories`` or ``tags``).
    :param make_auth: optional zero-argument callable returning a fresh
        requests auth object for each page request.
    :raises requests.HTTPError: if the registry answers with an error status.
    """
    while url:
        request_kwargs = {'auth': make_auth()} if make_auth else {}
        response = requests.get(url, **request_kwargs)
        response.raise_for_status()
        for entry in response.json().get(key) or []:
            yield entry
        next_link = response.links.get('next')
        # The next-page link may be host-relative; urljoin handles both forms.
        url = requests.compat.urljoin(url, next_link['url']) if next_link else None


def mirror_docker_registry(args):
    """Queue every ``repo:tag`` of a Docker Registry v2 source for mirroring.

    Lists the catalog and each repository's tags (following pagination) and
    puts one work item per tag on the module-level queue ``q``.

    :param args: parsed command-line namespace from ``get_arg_parser``.
    """
    logging.info("Mirroring Docker registry %s", args.source_docker)
    baseurl = generate_protocol(args.source_verify) + '://' + args.source_docker + '/v2'
    catalog_auth = tags_auth = None
    if args.source_auth:
        username, password = args.source_auth.split(":", 1)
        # Build a fresh DockerAuth per request. It keeps a per-object 401 retry
        # counter, so a new object per request keeps every call retryable.
        catalog_auth = lambda: DockerAuth(username, password, True)
        tags_auth = lambda: DockerAuth(username, password)
    for repo in _iter_registry_list(baseurl + '/_catalog', 'repositories', catalog_auth):
        for tag in _iter_registry_list(baseurl + '/' + repo + '/tags/list', 'tags', tags_auth):
            image = repo + ':' + tag
            source_image = args.source_docker + '/' + image
            target_image = args.target_docker + '/' + image
            imageObj = {'source_image': source_image, 'target_image': target_image, 'image': image, 'args': args}
            logging.info("Queuing image %s for mirroring", source_image)
            q.put(imageObj)
def _iter_quay_repositories(baseurl):
    """Yield ``namespace/name`` for every public repository listed by the Quay API.

    Follows the ``next_page`` token in the list response until it is absent.

    :param baseurl: URL of the Quay ``/api/v1/repository`` endpoint.
    :raises requests.HTTPError: if Quay answers with an error status.
    """
    params = {'public': 'true'}
    while True:
        response = requests.get(baseurl, params=params)
        response.raise_for_status()
        page = response.json()
        for repo in page.get('repositories') or []:
            yield repo['namespace'] + '/' + repo['name']
        next_page = page.get('next_page')
        if not next_page:
            return
        params = {'public': 'true', 'next_page': next_page}


def mirror_quay_registry(args):
    """Queue every ``repo:tag`` of the public repositories of a Quay source.

    :param args: parsed command-line namespace from ``get_arg_parser``. Note that
        ``args.source_auth`` is not used for the Quay API calls.
    """
    logging.info("Mirroring Quay registry %s", args.source_docker)
    baseurl = generate_protocol(args.source_verify) + '://' + args.source_docker + '/api/v1/repository'
    for repourl in _iter_quay_repositories(baseurl):
        tag_response = requests.get(baseurl + '/' + repourl, params={'includeTags': 'true', 'includeStats': 'false'})
        tag_response.raise_for_status()
        # Iterating the ``tags`` value yields the tag names (as in the original
        # loop); treat a missing/null value as "no tags".
        for tag in tag_response.json().get('tags') or {}:
            image = repourl + ':' + tag
            source_image = args.source_docker + '/' + image
            target_image = args.target_docker + '/' + image
            imageObj = {'source_image': source_image, 'target_image': target_image, 'image': image, 'args': args}
            logging.info("Queuing image %s for mirroring", source_image)
            q.put(imageObj)
def worker():
    """Thread body: copy images from ``q`` until a None sentinel or shutdown.

    Each outcome is stored in ``result`` under the image's ``repo:tag`` name. An
    unexpected exception from ``copy_image`` (for example, skopeo missing) is
    logged and recorded as a failure instead of silently killing the thread.
    """
    while not shutdown_event.is_set():
        image = q.get()
        try:
            if image is None:  # stop sentinel: no more work for this thread
                break
            try:
                res = copy_image(image)
            except Exception:
                logging.exception("Failed to copy image %s", image['image'])
                res = False
            result[image['image']] = res  # store it
        finally:
            # Balance every get() so a future q.join() would not hang.
            q.task_done()
def print_results(results=None):
    """Print a summary line per mirrored image with its copy outcome.

    :param results: mapping of ``repo:tag`` -> bool. Defaults to the module-level
        ``result`` dict filled in by the worker threads.
    """
    if results is None:
        results = result
    # Single-argument print() behaves the same on Python 2 and 3.
    print("------------- Finished ----------------")
    # Sorted so the summary is stable; dict order is arbitrary on Python 2.
    for image in sorted(results):
        print("%s => %s" % (image, results[image]))
if __name__ == "__main__":
    logging.basicConfig(format='%(levelname)s:%(name)s:%(threadName)s:%(message)s', level=logging.INFO)
    parser = get_arg_parser()
    args = parser.parse_args()
    # With no workers nothing would drain the queue and the run would silently do nothing.
    if args.num_threads < 1:
        parser.error("--threads must be at least 1")
    threads = []
    try:
        # Start the workers first so copying overlaps with registry discovery.
        threads = [threading.Thread(target=worker) for _ in range(args.num_threads)]
        for thread in threads:
            # Daemon threads let the process exit on Ctrl+C even while copies run.
            thread.daemon = True
            thread.start()
        # Find all the images to mirror and populate the queue
        if args.source_type == "docker":
            mirror_docker_registry(args)
        elif args.source_type == "quay":
            mirror_quay_registry(args)
        # One stop sentinel per worker, queued after all real work items.
        for _ in threads:
            q.put(None)
        queue_size_remaining = 0
        for t in threads:
            # Python 2.7 doesn't let Thread.join be interrupted by Ctrl+C, so wake up every second.
            while t.is_alive():
                t.join(1)
                # Log the size of the queue anytime it is different
                if queue_size_remaining != q.qsize():
                    queue_size_remaining = q.qsize()
                    logging.info("Approximately %d images left to sync", queue_size_remaining)
        print_results()
    except KeyboardInterrupt:
        logging.error("Ctrl+C pressed... shutting down workers")
        shutdown_event.set()
        sys.exit(1)
    # Report failed copies through the exit status so scripts/CI can detect them.
    if not all(result.values()):
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment