Created
June 19, 2021 18:15
-
-
Save johnathaningle/380f03281504c4c363c96f5ccbd30462 to your computer and use it in GitHub Desktop.
SDEV400 Homework 4
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from enum import IntEnum | |
import sys | |
from typing import Any, Dict, List, Optional | |
import boto3 | |
from boto3.dynamodb.conditions import Key, Attr | |
from uuid import UUID, uuid4 | |
from datetime import datetime | |
from decimal import Decimal | |
import json | |
from statistics import mean | |
from botocore.utils import parse_timestamp | |
class Trip: | |
""" | |
The DTO for trip data. This is logged in the DynamoDB database | |
""" | |
TripId: str | |
Name: str | |
Time: datetime | |
Distance: Decimal | |
Speed: Decimal | |
def __init__(self, db_value: Optional[Dict[str, Any]]) -> None: | |
"""Init the trip DTO | |
Args: | |
db_value (Optional[Dict[str, Any]]): If db_value is specified, load the trip data into memory | |
""" | |
if db_value is not None: | |
self.TripId = db_value.get("TripId") | |
self.Name = db_value.get("Name") | |
self.Time = parse_timestamp(db_value["Time"]) | |
self.Distance = db_value.get("Distance") | |
self.Speed = db_value.get("Speed") | |
def __str__(self) -> str: | |
return f"{self.Name} completed on {self.Time} for a total of {self.Distance} miles at {self.Speed} MPH" | |
class MenuOptions(IntEnum): | |
"""Application Menu Options | |
""" | |
Input_Ride_Data = 0, | |
Retrieve_Ride_Data = 1, | |
Manually_Import_Data = 2, | |
Report_Average_Distance = 3, | |
Report_Average_Speed = 4, | |
Help = 5, | |
Exit = 6, | |
@staticmethod | |
def get_values() -> str: | |
ret = "" | |
for idx, option in enumerate(MenuOptions): | |
o = str(idx) + " - " + str(option).replace(f"{MenuOptions.__name__}.", "") + "\n" | |
o = o.replace("_", " ") | |
ret += o | |
return ret | |
TABLE = "Trips" | |
BUCKET = "cycletracker" | |
class App: | |
def __init__(self) -> None: | |
print("--- App Loading ---") | |
self.dynamodb = boto3.resource('dynamodb') | |
self.s3 = boto3.client('s3') | |
self.trips: List[Trip] = [] | |
self.load_trips() | |
print("--- App Begin ---\n") | |
def get_date(self, prompt: str, default_text: str) -> datetime: | |
"""Logic to prompt the user to input a valid date | |
Args: | |
prompt (str): The prompt for the user input | |
default_text (str): If no value is specified, use this default value | |
Returns: | |
datetime: The parsed date | |
""" | |
valid = False | |
while not valid: | |
date_text = input(prompt) | |
try: | |
d = parse_timestamp(date_text) | |
return d | |
except: | |
if default_text is not None: | |
return parse_timestamp(default_text) | |
print("Invalid Date Input") | |
continue | |
def filter_results_by_dates(self, start: datetime, end: datetime) -> List[Trip]: | |
"""Filter the in memory list of trips by a given date range | |
Args: | |
start (datetime): The trip time must be greater than or equal to this date time | |
end (datetime): the trip time must be less than or equal to this date time | |
Returns: | |
List[Trip]: The filtered list of trips | |
""" | |
filter_trips = [] | |
for trip in self.trips: | |
if trip.Time >= start and trip.Time <= end: | |
filter_trips.append(trip) | |
return filter_trips | |
def report_average_speed(self): | |
"""Print the average speed of a user's trips | |
""" | |
if len(self.trips) == 0: | |
print("No data avaliable") | |
return | |
speeds = [t.Speed for t in self.trips] | |
print(f"Your average speed is {round(mean(speeds), 2)} MPH") | |
def load_trips(self) -> bool: | |
"""Load the user's trip data from dynamodb | |
Returns: | |
bool: Whether or not there were any valid results returned | |
""" | |
table = self.dynamodb.Table(TABLE) | |
response: Dict[str, Any] = table.scan( | |
ProjectionExpression="TripId, #n, Distance, #t, Speed", | |
ExpressionAttributeNames={ | |
"#t": "Time", | |
"#n" : "Name", | |
}, | |
) | |
if response.get("Items") is not None and response.get("Count") is not None: | |
print(f"Loading complete, {response.get('Count')} total trips loaded") | |
self.trips = [Trip(i) for i in response["Items"]] | |
return True | |
else: | |
print("There is not enough data to perform this search") | |
return False | |
def handle_search(self): | |
"""Handle a user's search for trip data | |
""" | |
# Make sure there is trip data avaliable | |
if len(self.trips) > 0: | |
query_date = self.get_date("Date Begin: ", "1971-01-01") | |
query_date_end = self.get_date("Date End: ", str(datetime.now())) | |
trips = self.filter_results_by_dates(query_date, query_date_end) | |
if len(trips) > 0: | |
print("Based on your selection, here are your recent trips:") | |
for trip in trips: | |
print(trip) | |
else: | |
print("No trips meet your criteria") | |
else: | |
print("No trips found, please input ride data") | |
def insert_bucket_objects(self, objects: List[Dict[str, str]]): | |
"""Take the S3 response of objects listed in a bucket, | |
Load the object data into memory, then put the S3 data into dynamodb | |
Args: | |
objects (List[Dict[str, str]]): The response Contents from S3 | |
""" | |
for obj in objects: | |
try: | |
response = self.s3.get_object(Bucket=BUCKET, Key=obj["Key"]) | |
body = response.get("Body") | |
if body: | |
contents = body.read() | |
item = json.loads(contents, encoding="UTF-8") | |
item["TripId"] = str(uuid4()) | |
table = self.dynamodb.Table(TABLE) | |
table.put_item(Item=json.loads(json.dumps(item), parse_float=Decimal)) | |
print(f"Added: {item['Name']}") | |
self.s3.delete_object(Bucket=BUCKET, Key=obj["Key"]) | |
except: | |
print("The trip data is invalid, this value will be skipped") | |
def load_bucket_objects(self) -> Optional[List[Any]]: | |
"""Load the objects in an Amazon S3 bucket | |
:param bucket_name: string | |
:return: List of bucket objects. If error, return None. | |
""" | |
try: | |
response = self.s3.list_objects_v2(Bucket=BUCKET) | |
except Exception as e: | |
# AllAccessDisabled error == bucket not found | |
return None | |
objects = response.get('Contents') | |
if objects: | |
insert = self.yes_no_menu(f"Insert the {len(objects)} records saved in S3?") | |
if insert: | |
self.insert_bucket_objects(objects) | |
self.load_trips() | |
else: | |
print("No objects to copy") | |
return None | |
def get_float(self, message: str) -> float: | |
"""Handle the user input logic for retrieving a float value | |
Args: | |
message (str): The message to prompt to the user | |
Returns: | |
float: the user input converted to a float | |
""" | |
valid = False | |
while not valid: | |
try: | |
value = float(input(message)) | |
return value | |
except: | |
print("Invalid input, please enter a decimal value") | |
def get_string(self, message: str, default_text: Optional[str]): | |
"""Prompt the user for a non-null and non empty string | |
Args: | |
message (str): The message to prompt to the user | |
default_text (Optional[str]): If no value is provided by the user, use a default value if avaliable | |
Returns: | |
[type]: a string value | |
""" | |
valid = False | |
while not valid: | |
value = input(message) | |
if value is not None and value.strip() != "": | |
return value | |
elif default_text is not None: | |
return default_text | |
else: | |
print("Invalid input, try again") | |
def get_custom_params(self, reference_item: Dict[str, any]) -> None: | |
"""Add optional fields to the a dictionary (representing the trip object) | |
Args: | |
reference_item (Dict[str, any]): A dictionary representing a trip object | |
Returns: | |
[type]: None | |
""" | |
adding = True | |
while adding: | |
key = self.get_string("Custom Parameter Name: ", None) | |
value = self.get_string("Custom Parameter Value: ", None) | |
d_value: Optional[float] = None | |
try: | |
if value.isnumeric(): | |
d_value = float(value) | |
except: | |
print("Invalid Input") | |
continue | |
if d_value is None: | |
reference_item[key] = value | |
else: | |
reference_item[key] = d_value | |
check = input("------\nPress enter to continue or\nenter any value to return to the menu: ") | |
if check != "": | |
return | |
def yes_no_menu(self, message: str) -> bool: | |
"""Generic Y/N menu which uses a custom prompt message""" | |
valid_choice = False | |
while not valid_choice: | |
choice = input(f"{message} (Y/N)") | |
if choice is None or choice.strip() == "": | |
print("Invalid choice, try again") | |
elif choice.lower() == "y": | |
return True | |
elif choice.lower() == "n": | |
return False | |
else: | |
print("Invalid choice, try again") | |
def input_ride_data(self): | |
"""Prompt the user to input ride data such as the time of the trip, | |
the total distance in miles, the average ride speed in MPH, and allow | |
for free form parameters via adding "custom data" to the trip | |
""" | |
tid = str(uuid4()) | |
name = self.get_string("Trip Name: ", f"Trip: {datetime.now()}") | |
distance = self.get_float("Ride Distance: ") | |
speed = self.get_float("Ride Speed (MPH): ") | |
time = str(datetime.now()) | |
item = { | |
'TripId': tid, | |
'Name': name, | |
'Distance': distance, | |
'Time': time, | |
'Speed': speed, | |
} | |
if self.yes_no_menu("Enter Custom Data?"): | |
self.get_custom_params(item) | |
table = self.dynamodb.Table(TABLE) | |
table.put_item(Item=json.loads(json.dumps(item), parse_float=Decimal)) | |
print(f"Trip {name} added to your records") | |
self.load_trips() | |
def report_average_distance(self): | |
"""Print the average ride distance for all recorded trips | |
""" | |
if len(self.trips) == 0: | |
print("No data avaliable") | |
return | |
distances = [t.Distance for t in self.trips] | |
print(f"Your average ride distance is {round(mean(distances), 2)} miles") | |
def init_table(self) -> dict: | |
"""Create the Trips table and return the response dictionary""" | |
table = self.dynamodb.create_table( | |
TableName=TABLE, | |
KeySchema=[ | |
{ | |
'AttributeName': 'TripId', | |
'KeyType': 'HASH' #Partition key | |
}, | |
{ | |
'AttributeName': 'Name', | |
'KeyType': 'RANGE' #Sort key | |
} | |
], | |
AttributeDefinitions=[ | |
{ | |
'AttributeName': 'TripId', | |
'AttributeType': 'S' | |
}, | |
{ | |
'AttributeName': 'Name', | |
'AttributeType': 'S' | |
}, | |
], | |
ProvisionedThroughput={ | |
'ReadCapacityUnits': 10, | |
'WriteCapacityUnits': 10 | |
} | |
) | |
return table | |
def run(self): | |
"""Handle the main application logic | |
""" | |
print("Welcome to the Cycle Tracker") | |
done = False | |
while not done: | |
try: | |
print("\n--- App Menu ---") | |
print("Please chose an option: ") | |
choice = MenuOptions(int(input(MenuOptions.get_values()))) | |
if choice == MenuOptions.Exit: | |
print("Thanks for using the application.") | |
sys.exit(0) | |
elif choice == MenuOptions.Help: | |
print("""Help Menu: | |
- The Cycling Tracker app is designed to help you track your cycling metrics | |
- Inputting ride data allows you to input your latest ride data | |
- Retrieve ride data allows you to view your recent ride history | |
- Manual data import will load ride data from S3 data storage | |
- Reporting average distance will compute your all time average distance per ride | |
- Reporting average speed will compute your all time average speed per ride | |
""") | |
elif choice == MenuOptions.Input_Ride_Data: | |
app.input_ride_data() | |
elif choice == MenuOptions.Manually_Import_Data: | |
app.load_bucket_objects() | |
elif choice == MenuOptions.Retrieve_Ride_Data: | |
app.handle_search() | |
elif choice == MenuOptions.Report_Average_Distance: | |
app.report_average_distance() | |
elif choice == MenuOptions.Report_Average_Speed: | |
app.report_average_speed() | |
except Exception as ex: | |
print(ex) | |
print("Invalid choice try again...") | |
if __name__ == "__main__": | |
app = App() | |
if "--create-tables" in sys.argv: | |
app.init_table() | |
app.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment