Skip to content

Instantly share code, notes, and snippets.

@pit
Last active November 11, 2019 07:39
Show Gist options
  • Save pit/236ec02dfcdc720ac0114ca4823432d1 to your computer and use it in GitHub Desktop.
Save pit/236ec02dfcdc720ac0114ca4823432d1 to your computer and use it in GitHub Desktop.
import boto3
from boto3.s3.transfer import S3Transfer
from botocore.client import Config
from hashlib import md5
import os
from base64 import b64encode
import logging
import time
# 1) Create S3 buckets with some unique name (ex: comparison-test-bucket) in all providers you want to test.
# 2) Generate S3 credentials (access key/secret key) for each provider and put them into test.py header
# 3) Create VPS/Droplet/etc you want to test from
# 4) Install python3 there
# 5) Install python module boto3
# 6) Create test files in VM with different sizes 1MB/10MB/100MB/200MB/500MB (dd if=/dev/urandom of=file_1.test bs=1MB count=1)
# 7) Copy test.py to VM
# 8) Run `python3 test.py`
# boto3.set_stream_logger('botocore', logging.WARNING)
# boto3.set_stream_logger('botocore.auth', logging.DEBUG)
# boto3.set_stream_logger('botocore.parsers', logging.DEBUG)
# boto3.set_stream_logger('botocore.endpoint', logging.DEBUG)
# One shared boto3 session; all three provider clients are built from it.
session = boto3.session.Session()
# Bucket with this name must already exist in every provider under test.
TEST_BUCKET_NAME = 'comparison-test-bucket'
# Plain AWS S3 client (default AWS endpoint). Credentials are intentionally
# blank here — fill in your access/secret keys before running.
s3_client_aws = session.client(
service_name='s3',
# region_name='us-west-2',
region_name='us-east-1',
aws_access_key_id='',
aws_secret_access_key='',
config=Config(signature_version='s3v4'),
)
# IDrive S3-compatible endpoint; note it uses the legacy 's3' signature
# version, unlike the other two clients (presumably required by the
# provider — confirm against their docs).
s3_client_idrive = session.client(
service_name='s3',
region_name='us-east-1',
aws_access_key_id='',
aws_secret_access_key='',
endpoint_url='https://s3.us-west-1.idrivecloud.io/',
config=Config(signature_version='s3'),
)
# Wasabi S3-compatible endpoint, SigV4 like AWS.
s3_client_wasabi = session.client(
service_name='s3',
region_name='us-east-1',
aws_access_key_id='',
aws_secret_access_key='',
endpoint_url='https://s3.us-west-1.wasabisys.com',
config=Config(signature_version='s3v4'),
)
def process_time():
    """Return the current wall-clock time as an integer number of milliseconds."""
    millis = time.time() * 1000
    return int(round(millis))
def human_bytes(num, units=[' bytes', ' KB', ' MB', ' GB', ' TB', ' PB', ' EB']):
    """Return a human-readable string representation of a byte count.

    Recurses, dividing by 1024 and shifting to the next unit until the
    value fits below 1024.
    """
    if num < 1024:
        return str(round(num, 2)) + units[0]
    return human_bytes(num / 1024, units[1:])
def human_bits(num, units=[' bit', ' Kb', ' Mb', ' Gb', ' Tb', ' Pb', ' Eb']):
    """Return a human-readable string representation of a bit count.

    Bit-rate units are decimal, so each step divides by 1000 (not 1024).

    Bug fix: the original recursed into human_bytes(), which compares
    against 1024 and divides by 1024, so any value needing more than one
    unit step was scaled with byte thresholds instead of bit thresholds.
    """
    if num < 1000:
        return str(round(num, 2)) + units[0]
    return human_bits(num / 1000, units[1:])
def human_sec(num, units=[' sec', ' m', ' h']):
    """Return a human-readable string representation of a duration in seconds.

    Each step divides by 60 (sec -> min -> hour).

    Bug fix: the original recursed into human_bytes(), which compares
    against 1024 and divides by 1024, so e.g. 7200 seconds rendered as
    '120 m' instead of '2.0 h'.
    """
    if num < 60:
        return str(round(num, 2)) + units[0]
    return human_sec(num / 60, units[1:])
# Providers under test, keyed by the label printed in the results table.
clients = {
    'aws': s3_client_aws,
    'idrive': s3_client_idrive,
    'wasabi': s3_client_wasabi,
}
# Print the table header: provider column (width 8) then one 21-wide
# column per test-file size.
header_cells = ['Provider'.ljust(8)]
for size_label in ('1 MB', '10 MB', '100 MB', '200 MB', '500 MB'):
    header_cells.append(size_label.ljust(21))
print(' | '.join(header_cells))
# Upload each test file to each provider once, timing the transfer, and
# print one table row per provider: "time  bitrate" per file size.
#
# Fixes vs. the original: the unused `speed` dict is removed, the file
# size is computed once instead of twice per upload, and row cells are
# padded to width 21 to match the header (the original used ljust(15),
# which never truncated only because the cell text happens to be 21
# characters wide already).
for client_name, client in clients.items():
    # Upload timing per file-size label, formatted for printing.
    timing = dict()
    file_names = (
        '1',
        '10',
        '100',
        '200',
        '500',
    )
    for file_size in file_names:
        # Test files must be pre-created, e.g.:
        # dd if=/dev/urandom of=file_1.test bs=1MB count=1
        file_name = 'file_{}.test'.format(file_size)
        with open(file_name, 'rb') as file:
            size = os.path.getsize(file_name)
            start = process_time()
            client.put_object(
                Body=file,
                Bucket=TEST_BUCKET_NAME,
                Key=file_name,
                ContentType='application/octet-stream',
                ContentLength=size,
            )
            stop = process_time()
        elapsed_sec = (stop - start) / 1000
        # Cell = elapsed time (width 9) + effective bitrate (width 11).
        timing[file_size] = '{} {}'.format(
            human_sec(elapsed_sec).ljust(9),
            '{}ps'.format(human_bits(int(size * 8 / elapsed_sec))).ljust(11)
        )
    line = [client_name.ljust(8)]
    # Insertion order of `timing` matches `file_names`, so columns line up
    # with the header; width 21 matches the header columns.
    for cell in timing.values():
        line.append(cell.ljust(21))
    print(' | '.join(line))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment