Skip to content

Instantly share code, notes, and snippets.

@danking
Last active April 13, 2021 07:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save danking/77187356b686f454c950ca3657aefc3e to your computer and use it in GitHub Desktop.
Save danking/77187356b686f454c950ca3657aefc3e to your computer and use it in GitHub Desktop.
import uuid
from aiohttp import web
import base64
import ssl
import json
from ssl import Purpose
from hailtop.hail_logging import AccessLogger, configure_logging
from hailtop.config import get_deploy_config
configure_logging()
deploy_config = get_deploy_config()
app = web.Application(client_max_size=32 * 1024 * 1024)
routes = web.RouteTableDef()
@routes.get('/v2')
@routes.get('/v2/')
async def version_check(request): # pylint: disable=unused-argument
return web.json_response(headers={'Docker-Distribution-API-Version': 'registry/2.0'})
@routes.get('/v2/{repo}/{image}/tags/list')
async def get_tags_list(request): # pylint: disable=unused-argument
repo = request.match_info['repo']
image = request.match_info['image']
name = repo + '/' + image
return web.json_response({
"name": name,
"tags": app['tags'].get(name, [])})
@routes.get('/v2/{repo}/{image}/manifests/{reference}')
async def get_manifest(request): # pylint: disable=unused-argument
repo = request.match_info['repo']
image = request.match_info['image']
reference = request.match_info['reference']
name = repo + '/' + image
manifest = app['manifests'].get(name, dict()).get(reference)
if manifest is None:
return web.json_response(dict(), status=404) # raise HTTPNotFound()
return web.Response(body=json.dumps(manifest), headers={
'Content-Type': manifest.get('mediaType', 'application/vnd.oci.image.manifest.v1+json'),
'Docker-Context-Digest': manifest['config']['digest']})
@routes.put('/v2/{repo}/{image}/manifests/{reference}')
async def put_manifest(request): # pylint: disable=unused-argument
repo = request.match_info['repo']
image = request.match_info['image']
reference = request.match_info['reference']
name = repo + '/' + image
manifests = app['manifests'].get(name)
if manifests is None:
manifests = dict()
app['manifests'][name] = manifests
manifests[reference] = await request.json()
return web.json_response({}, status=201, headers={
'Location': 'https://' + request.headers['Host'] + '/v2/' + name + '/manifests/' + reference,
'Docker-Context-Digest': manifests[reference]['config']['digest']})
@routes.delete('/v2/{repo}/{image}/manifests/{reference}')
async def del_manifest(request): # pylint: disable=unused-argument
repo = request.match_info['repo']
image = request.match_info['image']
reference = request.match_info['reference']
name = repo + '/' + image
manifests = app['manifests'].get(name)
if manifests is not None:
manifests.pop(reference)
return web.json_response()
@routes.get('/v2/{repo}/{image}/blobs/{digest}')
async def get_blob(request): # pylint: disable=unused-argument
repo = request.match_info['repo']
image = request.match_info['image']
digest = request.match_info['digest']
name = repo + '/' + image
blob = app['blobs'].get(name, dict()).get(digest)
if blob is None:
return web.json_response(dict(), status=404) # raise HTTPNotFound()
print(digest)
return web.Response(body=blob, headers={
'Docker-Context-Digest': digest})
@routes.delete('/v2/{repo}/{image}/blobs/{digest}')
async def del_blob(request): # pylint: disable=unused-argument
repo = request.match_info['repo']
image = request.match_info['image']
digest = request.match_info['digest']
name = repo + '/' + image
blobs = app['blobs'].get(name)
if blobs is not None:
blobs.pop(digest)
return web.json_response()
@routes.post('/v2/{repo}/{image}/blobs/uploads')
@routes.post('/v2/{repo}/{image}/blobs/uploads/')
async def start_upload(request): # pylint: disable=unused-argument
repo = request.match_info['repo']
image = request.match_info['image']
name = repo + '/' + image
blobs = app['blobs'].get(name)
if blobs is None:
blobs = dict()
app['blobs'][name] = blobs
digest = request.query.get('digest')
if digest is not None:
blobs[digest] = await request.read()
id = uuid.uuid4().bytes
app['uploads'][id] = bytearray()
return web.Response(status=202, headers={
'Location': 'https://' + request.headers['Host'] + '/v2/' + name + '/blobs/uploads/' + base64.urlsafe_b64encode(id).decode(),
'Docker-Upload-UUID': base64.urlsafe_b64encode(id).decode()})
@routes.get('/v2/{repo}/{image}/blobs/uploads/{id}')
async def get_upload(request): # pylint: disable=unused-argument
repo = request.match_info['repo']
image = request.match_info['image']
id = request.match_info['id']
name = repo + '/' + image
id = base64.urlsafe_b64decode(id)
if id not in app['uploads']:
return web.json_response(dict(), status=404) # raise HTTPNotFound()
return web.Response(status=204, headers={
'Location': 'https://' + request.headers['Host'] + '/v2/' + name +'/blobs/uploads/' + base64.urlsafe_b64encode(id).decode(),
'Docker-Upload-UUID': base64.urlsafe_b64encode(id).decode()})
@routes.patch('/v2/{repo}/{image}/blobs/uploads/{id}')
async def patch_upload(request): # pylint: disable=unused-argument
repo = request.match_info['repo']
image = request.match_info['image']
id = request.match_info['id']
name = repo + '/' + image
id = base64.urlsafe_b64decode(id)
if id not in app['uploads']:
return web.json_response(dict(), status=404) # raise HTTPNotFound()
data = app['uploads'][id]
# n = request.headers.get('Content-Length')
# l, r = [int(x) for x in request.headers['Content-Range'].split('-')]
# assert n is None or r - l == n, (l, r, n)
# assert r == len(data), (len(data), l, r, n)
data += await request.read()
# assert n is None or len(data) == l + n, (len(data), l, r, n)
return web.Response(status=202, headers={
'Location': 'https://' + request.headers['Host'] + '/v2/' + name + '/blobs/uploads/' + base64.urlsafe_b64encode(id).decode(),
'Range': f'0-{len(data)}',
'Docker-Upload-UUID': base64.urlsafe_b64encode(id).decode()})
@routes.put('/v2/{repo}/{image}/blobs/uploads/{id}')
async def put_upload(request): # pylint: disable=unused-argument
repo = request.match_info['repo']
image = request.match_info['image']
id = request.match_info['id']
name = repo + '/' + image
id = base64.urlsafe_b64decode(id)
if id not in app['uploads']:
return web.json_response(dict(), status=404) # raise HTTPNotFound()
digest = request.query['digest']
data = app['uploads'][id]
# n = request.headers.get('Content-Length')
# assert n is None or r - l == n, (l, r, n)
# assert r == len(data), (len(data), l, r, n)
data += await request.read()
# assert n is None or len(data) == l + n, (len(data), l, r, n)
blobs = app['blobs'].get(name)
if blobs is None:
blobs = dict()
app['blobs'][name] = blobs
blobs[digest] = data
return web.Response(status=201, headers={
'Location': 'https://' + request.headers['Host'] + '/v2/' + name + '/blobs/uploads/' + base64.urlsafe_b64encode(id).decode(),
'Docker-Upload-UUID': base64.urlsafe_b64encode(id).decode()})
@routes.delete('/v2/{repo}/{image}/blobs/uploads/{id}')
async def del_upload(request): # pylint: disable=unused-argument
id = request.match_info['id']
id = base64.urlsafe_b64decode(id)
app['uploads'].pop(id)
return web.Response()
@routes.get('/v2/_catalog')
async def get_catalog(request): # pylint: disable=unused-argument
return web.json_response({
'repositories': list(app['tags'])})
async def on_startup(app):
app['tags'] = dict()
app['manifests'] = dict()
app['blobs'] = dict()
app['uploads'] = dict()
app.add_routes(routes)
app.on_startup.append(on_startup)
_server_ssl_context = ssl.create_default_context(purpose=Purpose.CLIENT_AUTH)
_server_ssl_context.load_cert_chain('server-cert.pem',
keyfile='server-key.pem',
password=None)
_server_ssl_context.verify_mode = ssl.CERT_OPTIONAL
web.run_app(app, host='0.0.0.0', port=5000, ssl_context=_server_ssl_context, access_log_class=AccessLogger)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment