Skip to content

Instantly share code, notes, and snippets.

@clayg
Last active September 4, 2020 16:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save clayg/5d17b261c9cc1fd970ba7915bdc57680 to your computer and use it in GitHub Desktop.
Basic boto3 s3 client Multipart Upload
from argparse import ArgumentParser
import sys
import json
import datetime
import os
import boto3
# 5 MiB: the minimum S3 part size for every part except the last.
PART_SIZE = 5242880

parser = ArgumentParser()
parser.add_argument('bucket', help='bucket to make the upload')
parser.add_argument('key', nargs='?', help='key name for large object')
parser.add_argument('-p', type=int, default=2, help='number of parts')
parser.add_argument('--upload-id', default=None,
                    help='Retry/Abort by upload-id')
parser.add_argument('--abort', action='store_true', help='Abort upload-id')
parser.add_argument('--endpoint-url', default='http://saio:8080',
                    help='boto client endpoint_url')
parser.add_argument('--aws-access-key-id', default='test:tester',
                    help='boto client aws_access_key_id')
parser.add_argument('--aws-secret-access-key', default='testing',
                    help='boto client aws_secret_access_key')

# Zero-filled payload shared by every uploaded part.  Built in memory
# instead of reading /dev/zero so the script also runs off Linux and
# skips pointless file I/O; the bytes are identical.
BODY = b'\x00' * PART_SIZE
def default(o):
    """json.dumps hook: render date/datetime values as ISO-8601 strings.

    Any other unserializable object falls through to None, which json
    emits as null.
    """
    is_temporal = isinstance(o, (datetime.date, datetime.datetime))
    return o.isoformat() if is_temporal else None
def dumps(data):
    """Pretty-print *data* as indented JSON, serializing dates via default()."""
    rendered = json.dumps(data, indent=2, default=default)
    print(rendered)
def do_upload(client, opts):
    """Start a multipart upload and push opts.p parts built from BODY.

    Stores the new UploadId back on opts, saves the part manifest to a
    local file named after the upload id (so a later retry can complete
    without list_parts), and returns the list of
    {'PartNumber', 'ETag'} dicts needed by complete_multipart_upload.
    """
    print('doing *create_multipart_upload*')
    create_resp = client.create_multipart_upload(Bucket=opts.bucket,
                                                 Key=opts.key)
    dumps(create_resp)
    opts.upload_id = create_resp['UploadId']
    manifest = []
    for part_number in range(1, opts.p + 1):
        # prefix each part so its payload is distinguishable on the server
        payload = (b'part%04d' % part_number) + BODY
        print(len(payload))
        print('doing *upload_part*')
        part_resp = client.upload_part(
            Body=payload, Bucket=opts.bucket, Key=opts.key,
            PartNumber=part_number, UploadId=opts.upload_id)
        manifest.append({
            'PartNumber': part_number,
            'ETag': part_resp['ETag'],
        })
    # persist the manifest locally for retry/abort runs
    with open(opts.upload_id, 'w') as f:
        json.dump(manifest, f)
    return manifest
def do_abort(client, opts):
    """Abort the in-progress multipart upload identified by opts.upload_id."""
    print('doing *abort_multipart_upload*')
    abort_resp = client.abort_multipart_upload(Bucket=opts.bucket,
                                               Key=opts.key,
                                               UploadId=opts.upload_id)
    dumps(abort_resp)
def do_complete(client, opts, parts):
    """Finish the multipart upload with the given parts manifest.

    A ClientError from the server (e.g. an invalid part) is printed
    rather than raised.
    """
    multipart_body = {'Parts': parts}
    dumps(multipart_body)
    print('doing *complete_multipart_upload*')
    try:
        complete_resp = client.complete_multipart_upload(
            Bucket=opts.bucket, Key=opts.key,
            MultipartUpload=multipart_body, UploadId=opts.upload_id)
    except client.exceptions.ClientError as err:
        print(err)
    else:
        dumps(complete_resp)
def do_list(client, opts):
    """Return the part manifest for opts.upload_id via list_parts.

    If the server no longer knows the upload, fall back to the manifest
    file saved on disk by do_upload; when that file does not exist
    either, re-raise the NoSuchUpload error.
    """
    print('doing *list_parts*')
    try:
        resp = client.list_parts(Bucket=opts.bucket, Key=opts.key,
                                 UploadId=opts.upload_id)
    except client.exceptions.NoSuchUpload as err:
        print(err)
        if not os.path.exists(opts.upload_id):
            raise
        with open(opts.upload_id) as f:
            return json.load(f)
    dumps(resp)
    return [{
        'PartNumber': p['PartNumber'],
        'ETag': p['ETag'],
    } for p in resp.get('Parts', [])]
def main():
    """Drive a multipart upload demo against an S3-compatible endpoint.

    With no key argument: list the bucket's in-progress multipart
    uploads.  With a key: start a new upload (or resume an existing one
    via --upload-id) and complete it, or abort it when --abort is given.

    Returns None on success, or an error string (which sys.exit turns
    into a nonzero exit status) on an invalid argument combination.
    """
    opts = parser.parse_args()
    client = boto3.client('s3', endpoint_url=opts.endpoint_url,
                          aws_access_key_id=opts.aws_access_key_id,
                          aws_secret_access_key=opts.aws_secret_access_key)
    # --abort / --upload-id only make sense when a key was given
    if any([opts.abort, opts.upload_id]) and not opts.key:
        return 'ERROR: missing key argument is only for listing uploads'
    # ensure the target bucket exists; tolerate it already being ours
    try:
        client.create_bucket(Bucket=opts.bucket)
    except client.exceptions.BucketAlreadyOwnedByYou:
        print('bucket exists')
    if not opts.key:
        resp = client.list_multipart_uploads(Bucket=opts.bucket)
        dumps(resp)
    else:
        if opts.upload_id:
            try:
                parts = do_list(client, opts)
            except client.exceptions.NoSuchUpload as err:
                # an abort of a vanished upload is allowed to proceed;
                # anything else is a hard failure
                print(err)
                if not opts.abort:
                    raise
        else:
            parts = do_upload(client, opts)
        if opts.abort:
            do_abort(client, opts)
        else:
            do_complete(client, opts, parts)


if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment