Skip to content

Instantly share code, notes, and snippets.

@sabaini
Created March 31, 2023 06:44
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sabaini/28dbfeba80d724dc1d235fce7231ea94 to your computer and use it in GitHub Desktop.
Save sabaini/28dbfeba80d724dc1d235fce7231ea94 to your computer and use it in GitHub Desktop.
import random
import boto3
import json
import argparse
def app_handle(args):
keys = get_keys(args.keys)
client = boto3.resource(
"s3", verify=False,
endpoint_url=args.endpoint,
aws_access_key_id=keys["access_key"],
aws_secret_access_key=keys["secret_key"])
bucket_name = "demobucket"
client.Bucket(bucket_name).create()
print("Uploading data")
for i in range(0, 10):
object_name = "object-{}".format(i)
primary_object_one = client.Object(
bucket_name,
object_name
)
primary_object_one.put(Body=dummy_data())
print("{}/{} SIZE: {:.2f}MB".format(bucket_name, object_name, primary_object_one.content_length/(1024*1024)), end=", ", flush=True)
def get_keys(keys_file: str) -> dict:
with open(args.keys, 'r') as keys_file:
keys_dict = json.load(keys_file)
return keys_dict["keys"]
def dummy_data() -> str:
# create some dummy data with random size
return str("aaaaaaaaaa" * 1024 * random.randint(1, 5000))
if __name__ == "__main__":
argparse = argparse.ArgumentParser(
description="An application which uses S3 for storage",
)
argparse.add_argument(
"endpoint",
type=str,
help="Provide RGW endpoint to talk to.",
)
argparse.add_argument(
"keys",
type=str,
help="Provide JSON file generated from Ceph RGW Admin.",
)
argparse.set_defaults(func=app_handle)
args = argparse.parse_args()
args.func(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment