Skip to content

Instantly share code, notes, and snippets.

@johnathaningle
Created May 20, 2021 21:23
Show Gist options
  • Save johnathaningle/376f3f7694c43552bb35c4f4e1e4145b to your computer and use it in GitHub Desktop.
Save johnathaningle/376f3f7694c43552bb35c4f4e1e4145b to your computer and use it in GitHub Desktop.
SDEV400 Homework 1
from enum import Enum
from typing import Any, List, Optional
import boto3
from datetime import datetime
import random
from botocore.exceptions import ClientError
import logging
import botocore
from uuid import uuid4
BUCKET_ERROR_MESSAGE = "Please select from available buckets"
BUCKET_SELECT_ERROR_MESSAGE = "Error Occurred selecting bucket to download"
USERNAME = "johningle"
class Option(Enum):
create_bucket = 1
place_object_in_bucket = 2
delete_object_in_bucket = 3
delete_bucket = 4
copy_from_one_bucket_to_another = 5
download_from_bucket = 6
exit_program = 7
@staticmethod
def get_values() -> str:
ret = ""
for idx, option in enumerate(Option):
o = str(idx + 1) + " - " + str(option).replace(f"{Option.__name__}.", "") + "\n"
o = o.replace("_", " ")
ret += o
return ret
class Application:
buckets: List[Any]
def __init__(self) -> None:
self.buckets = self.gen_bucket_list()
def gen_6_digit_random_number(self) -> int:
"""Generates 6 digit random number"""
gen_num = random.randint(1, 999999)
return gen_num
def gen_bucket_list(self):
"""Generates lists buckets"""
# Create an S3 client
s3 = boto3.client('s3')
# Call S3 to list current buckets
response = s3.list_buckets()
# Get a list of all bucket names from the response
buckets = [bucket['Name'] for bucket in response['Buckets']]
# Print out the bucket list
#print("Bucket List: %s" % buckets)
return buckets
def list_all_buckets(self):
"""List all current buckets in S3"""
self.buckets = self.gen_bucket_list()
for count, i in enumerate(self.buckets):
print (f"{count}: {i}")
def list_bucket_objects(self, bucket_name: str) -> Optional[List[Any]]:
"""List the objects in an Amazon S3 bucket
:param bucket_name: string
:return: List of bucket objects. If error, return None.
"""
# Retrieve the list of bucket objects
s3 = boto3.client('s3')
try:
response = s3.list_objects_v2(Bucket=bucket_name)
except ClientError as e:
# AllAccessDisabled error == bucket not found
logging.error(e)
return None
objects = response['Contents']
if objects:
for idx, o in enumerate(objects):
print(f"{idx} - {o['Key']}")
return objects
else:
print("No objects to copy")
return None
def create_bucket_setup(self, bucket_name, region=None):
"""Create an S3 bucket in a specified region
If a region is not specified, the bucket is created in the S3 default
region (us-east-1).
:param bucket_name: Bucket to create
:param region: String region to create bucket in, e.g., 'us-west-2'
:return: True if bucket created, else False
"""
# Create bucket
try:
if region is None:
s3_client = boto3.client('s3')
s3_client.create_bucket(Bucket=bucket_name)
else:
s3_client = boto3.client('s3', region_name=region)
location = {'LocationConstraint': region}
s3_client.create_bucket(Bucket=bucket_name,
CreateBucketConfiguration=location)
except ClientError as e:
logging.error(e)
return print("Error creating bucket {0}!".format(bucket_name))
return print("Bucket {0} has been created.".format(bucket_name))
def put_object(self, dest_bucket_name):
"""Add an object to an Amazon S3 bucket
The src_data argument must be of type bytes or a string that references
a file specification.
:param dest_bucket_name: string
:param dest_object_name: string
:param src_data: bytes of data or string reference to file spec
:return: True if src_data was added to dest_bucket/dest_object, otherwise
False
"""
#create the new filename to be added to the bucket
object_name = str(uuid4()) + ".txt"
# Construct Body= parameter
object_data = bytes("This is generated test file contents", "utf-8")
# Put the object
s3 = boto3.client('s3')
try:
s3.put_object(Bucket=dest_bucket_name, Key=object_name, Body=object_data)
except ClientError as e:
# AllAccessDisabled error == bucket not found
# NoSuchKey or InvalidRequest error == (dest bucket/obj == src bucket/obj)
logging.error(e)
return print("Error at put_object")
return print(f"File {object_name} has been uploaded successfully")
def delete_object(self, bucket_name, object_name):
"""Delete an object from an S3 bucket
:param bucket_name: string
:param object_name: string
:return: True if the referenced object was deleted, otherwise False
"""
# Delete the object
s3 = boto3.client('s3')
try:
s3.delete_object(Bucket=bucket_name, Key=object_name)
except ClientError as e:
logging.error(e)
return print("Error deleteing object from bucket!")
return print("Object {0} deleted from {1}".format(object_name, bucket_name))
def delete_bucket_setup(self, bucket_name):
"""Delete an empty S3 bucket
If the bucket is not empty, the operation fails.
:param bucket_name: string
:return: True if the referenced bucket was deleted, otherwise False
"""
# Delete the bucket
s3 = boto3.client('s3')
try:
s3.delete_bucket(Bucket=bucket_name)
except ClientError as e:
logging.error(e)
return print("Error in delete bucket!")
return print("Bucket {0} has been deleted!".format(bucket_name))
def copy_object(self, src_bucket_name, src_object_name, dest_bucket_name, dest_object_name=None):
"""Copy an Amazon S3 bucket object
:param src_bucket_name: string
:param src_object_name: string
:param dest_bucket_name: string. Must already exist.
:param dest_object_name: string. If dest bucket/object exists, it is
overwritten. Default: src_object_name
:return: True if object was copied, otherwise False
"""
# Construct source bucket/object parameter
copy_source = {'Bucket': src_bucket_name, 'Key': src_object_name}
if dest_object_name is None:
dest_object_name = src_object_name
# Copy the object
s3 = boto3.client('s3')
try:
s3.copy_object(CopySource=copy_source, Bucket=dest_bucket_name,
Key=dest_object_name)
except ClientError as e:
logging.error(e)
return print("Error in copying object")
return print("\n{1} from {0} was copied to {2} as {3}".format(src_bucket_name,
src_object_name, dest_bucket_name, dest_object_name))
def download_from_bucket_setup(self, bucket_name, file_to_download, new_file_name):
"""
Download a file form selected bucket
"""
s3 = boto3.resource('s3')
try:
s3.Bucket(bucket_name).download_file(file_to_download, new_file_name)
print("File {1} has been downloaded from {0}".format(bucket_name, file_to_download))
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == "404":
print("The object does not exist.")
else:
raise
def create_bucket(self):
"""
Create an S3 bucket
"""
#Import generated number
end_num = self.gen_6_digit_random_number()
#generate bucket name
bucket_name = "{0}-{1}".format(USERNAME, end_num)
self.create_bucket_setup(bucket_name, None)
def place_object_in_bucket(self):
"""
Places an object in bucket list
"""
self.buckets = self.gen_bucket_list()
while True:
try:
self.list_all_buckets()
select_bucket = int(input("Select bucket to insert object: "))
if select_bucket < len(self.buckets):
#print(buckets[select_bucket])
self.put_object(self.buckets[select_bucket])
break
else:
print(BUCKET_ERROR_MESSAGE)
except Exception as e:
print(BUCKET_SELECT_ERROR_MESSAGE, e)
def delete_object_in_bucket(self):
"""
Deletes an object from bucket list
"""
self.buckets = self.gen_bucket_list()
while True:
try:
self.list_all_buckets()
select_bucket = int(input("Select bucket to delete from: "))
if select_bucket < len(self.buckets):
objects = self.list_bucket_objects(self.buckets[select_bucket])
if objects is None:
return
selection = int(input("Object to delete: "))
self.delete_object(self.buckets[select_bucket], objects[selection]["Key"])
break
else:
print(BUCKET_ERROR_MESSAGE)
except Exception as e:
print(BUCKET_SELECT_ERROR_MESSAGE, e)
def delete_bucket(self):
"""
Deletes a bucket from S3
"""
self.buckets = self.gen_bucket_list()
while True:
try:
self.list_all_buckets()
select_bucket = int(input("Select bucket to delete: "))
if select_bucket < len(self.buckets):
self.delete_bucket_setup(self.buckets[select_bucket])
break
else:
print(BUCKET_ERROR_MESSAGE)
except Exception as e:
print(BUCKET_SELECT_ERROR_MESSAGE, e)
def copy_from_one_bucket_to_another(self):
"""
Copy object to another bucket
"""
self.buckets = self.gen_bucket_list()
copying = True
while copying:
try:
self.list_all_buckets()
src_bucket = int(input("Select source bucket: "))
objects = self.list_bucket_objects(str(self.buckets[src_bucket]))
src_object_name = int(input("Select object in bucket to move: "))
dest_bucket = int(input("Select bucket to move object to: "))
dest_object_name = (input("Select name of copied object: "))
if src_bucket < len(self.buckets):
self.copy_object(self.buckets[src_bucket], objects[src_object_name]["Key"],
self.buckets[dest_bucket], dest_object_name)
break
else:
print(BUCKET_ERROR_MESSAGE)
except Exception as e:
print(e)
def download_from_bucket(self):
"""
downloads from a bucket
"""
self.buckets = self.gen_bucket_list()
downloading = True
while downloading:
try:
self.list_all_buckets()
select_bucket = int(input("Select bucket to download from: "))
objects = self.list_bucket_objects(str(self.buckets[select_bucket]))
if select_bucket < len(self.buckets):
file_number = int(input("Select file to download from {0}: ".format(self.buckets[select_bucket])))
file = objects[file_number]["Key"]
new_file_name = input("Select new file name: ")
self.download_from_bucket_setup(self.buckets[select_bucket], file, new_file_name)
downloading = False
else:
print("Please select from available buckets")
except Exception as e:
print(BUCKET_SELECT_ERROR_MESSAGE, e)
def exit_program(self):
"""Exit program"""
print("Exiting: ", datetime.now())
exit(0)
def run():
app = Application()
while True:
try:
print("Select an operation:")
print(Option.get_values())
try:
selected_choice = Option(int(input("Select an Option: ")))
except Exception:
print("Invalid Operation - Exiting Application")
exit(-1)
if selected_choice == Option.create_bucket:
app.create_bucket()
elif selected_choice == Option.place_object_in_bucket:
app.place_object_in_bucket()
elif selected_choice == Option.delete_object_in_bucket:
app.delete_object_in_bucket()
elif selected_choice == Option.delete_bucket:
app.delete_bucket()
elif selected_choice == Option.copy_from_one_bucket_to_another:
app.copy_from_one_bucket_to_another()
elif selected_choice == Option.download_from_bucket:
app.download_from_bucket()
elif selected_choice == Option.exit_program:
app.exit_program()
else:
print("Please enter a valid option")
except KeyboardInterrupt:
exit(0)
if __name__ == "__main__":
try:
run()
except Exception as e:
print("A fatal exception has occurred")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment