Skip to content

Instantly share code, notes, and snippets.

@johnathaningle
Created June 19, 2021 18:15
Show Gist options
  • Save johnathaningle/380f03281504c4c363c96f5ccbd30462 to your computer and use it in GitHub Desktop.
Save johnathaningle/380f03281504c4c363c96f5ccbd30462 to your computer and use it in GitHub Desktop.
SDEV400 Homework 4
from enum import IntEnum
import sys
from typing import Any, Dict, List, Optional
import boto3
from boto3.dynamodb.conditions import Key, Attr
from uuid import UUID, uuid4
from datetime import datetime
from decimal import Decimal
import json
from statistics import mean
from botocore.utils import parse_timestamp
class Trip:
"""
The DTO for trip data. This is logged in the DynamoDB database
"""
TripId: str
Name: str
Time: datetime
Distance: Decimal
Speed: Decimal
def __init__(self, db_value: Optional[Dict[str, Any]]) -> None:
"""Init the trip DTO
Args:
db_value (Optional[Dict[str, Any]]): If db_value is specified, load the trip data into memory
"""
if db_value is not None:
self.TripId = db_value.get("TripId")
self.Name = db_value.get("Name")
self.Time = parse_timestamp(db_value["Time"])
self.Distance = db_value.get("Distance")
self.Speed = db_value.get("Speed")
def __str__(self) -> str:
return f"{self.Name} completed on {self.Time} for a total of {self.Distance} miles at {self.Speed} MPH"
class MenuOptions(IntEnum):
"""Application Menu Options
"""
Input_Ride_Data = 0,
Retrieve_Ride_Data = 1,
Manually_Import_Data = 2,
Report_Average_Distance = 3,
Report_Average_Speed = 4,
Help = 5,
Exit = 6,
@staticmethod
def get_values() -> str:
ret = ""
for idx, option in enumerate(MenuOptions):
o = str(idx) + " - " + str(option).replace(f"{MenuOptions.__name__}.", "") + "\n"
o = o.replace("_", " ")
ret += o
return ret
TABLE = "Trips"
BUCKET = "cycletracker"
class App:
def __init__(self) -> None:
print("--- App Loading ---")
self.dynamodb = boto3.resource('dynamodb')
self.s3 = boto3.client('s3')
self.trips: List[Trip] = []
self.load_trips()
print("--- App Begin ---\n")
def get_date(self, prompt: str, default_text: str) -> datetime:
"""Logic to prompt the user to input a valid date
Args:
prompt (str): The prompt for the user input
default_text (str): If no value is specified, use this default value
Returns:
datetime: The parsed date
"""
valid = False
while not valid:
date_text = input(prompt)
try:
d = parse_timestamp(date_text)
return d
except:
if default_text is not None:
return parse_timestamp(default_text)
print("Invalid Date Input")
continue
def filter_results_by_dates(self, start: datetime, end: datetime) -> List[Trip]:
"""Filter the in memory list of trips by a given date range
Args:
start (datetime): The trip time must be greater than or equal to this date time
end (datetime): the trip time must be less than or equal to this date time
Returns:
List[Trip]: The filtered list of trips
"""
filter_trips = []
for trip in self.trips:
if trip.Time >= start and trip.Time <= end:
filter_trips.append(trip)
return filter_trips
def report_average_speed(self):
"""Print the average speed of a user's trips
"""
if len(self.trips) == 0:
print("No data avaliable")
return
speeds = [t.Speed for t in self.trips]
print(f"Your average speed is {round(mean(speeds), 2)} MPH")
def load_trips(self) -> bool:
"""Load the user's trip data from dynamodb
Returns:
bool: Whether or not there were any valid results returned
"""
table = self.dynamodb.Table(TABLE)
response: Dict[str, Any] = table.scan(
ProjectionExpression="TripId, #n, Distance, #t, Speed",
ExpressionAttributeNames={
"#t": "Time",
"#n" : "Name",
},
)
if response.get("Items") is not None and response.get("Count") is not None:
print(f"Loading complete, {response.get('Count')} total trips loaded")
self.trips = [Trip(i) for i in response["Items"]]
return True
else:
print("There is not enough data to perform this search")
return False
def handle_search(self):
"""Handle a user's search for trip data
"""
# Make sure there is trip data avaliable
if len(self.trips) > 0:
query_date = self.get_date("Date Begin: ", "1971-01-01")
query_date_end = self.get_date("Date End: ", str(datetime.now()))
trips = self.filter_results_by_dates(query_date, query_date_end)
if len(trips) > 0:
print("Based on your selection, here are your recent trips:")
for trip in trips:
print(trip)
else:
print("No trips meet your criteria")
else:
print("No trips found, please input ride data")
def insert_bucket_objects(self, objects: List[Dict[str, str]]):
"""Take the S3 response of objects listed in a bucket,
Load the object data into memory, then put the S3 data into dynamodb
Args:
objects (List[Dict[str, str]]): The response Contents from S3
"""
for obj in objects:
try:
response = self.s3.get_object(Bucket=BUCKET, Key=obj["Key"])
body = response.get("Body")
if body:
contents = body.read()
item = json.loads(contents, encoding="UTF-8")
item["TripId"] = str(uuid4())
table = self.dynamodb.Table(TABLE)
table.put_item(Item=json.loads(json.dumps(item), parse_float=Decimal))
print(f"Added: {item['Name']}")
self.s3.delete_object(Bucket=BUCKET, Key=obj["Key"])
except:
print("The trip data is invalid, this value will be skipped")
def load_bucket_objects(self) -> Optional[List[Any]]:
"""Load the objects in an Amazon S3 bucket
:param bucket_name: string
:return: List of bucket objects. If error, return None.
"""
try:
response = self.s3.list_objects_v2(Bucket=BUCKET)
except Exception as e:
# AllAccessDisabled error == bucket not found
return None
objects = response.get('Contents')
if objects:
insert = self.yes_no_menu(f"Insert the {len(objects)} records saved in S3?")
if insert:
self.insert_bucket_objects(objects)
self.load_trips()
else:
print("No objects to copy")
return None
def get_float(self, message: str) -> float:
"""Handle the user input logic for retrieving a float value
Args:
message (str): The message to prompt to the user
Returns:
float: the user input converted to a float
"""
valid = False
while not valid:
try:
value = float(input(message))
return value
except:
print("Invalid input, please enter a decimal value")
def get_string(self, message: str, default_text: Optional[str]):
"""Prompt the user for a non-null and non empty string
Args:
message (str): The message to prompt to the user
default_text (Optional[str]): If no value is provided by the user, use a default value if avaliable
Returns:
[type]: a string value
"""
valid = False
while not valid:
value = input(message)
if value is not None and value.strip() != "":
return value
elif default_text is not None:
return default_text
else:
print("Invalid input, try again")
def get_custom_params(self, reference_item: Dict[str, any]) -> None:
"""Add optional fields to the a dictionary (representing the trip object)
Args:
reference_item (Dict[str, any]): A dictionary representing a trip object
Returns:
[type]: None
"""
adding = True
while adding:
key = self.get_string("Custom Parameter Name: ", None)
value = self.get_string("Custom Parameter Value: ", None)
d_value: Optional[float] = None
try:
if value.isnumeric():
d_value = float(value)
except:
print("Invalid Input")
continue
if d_value is None:
reference_item[key] = value
else:
reference_item[key] = d_value
check = input("------\nPress enter to continue or\nenter any value to return to the menu: ")
if check != "":
return
def yes_no_menu(self, message: str) -> bool:
"""Generic Y/N menu which uses a custom prompt message"""
valid_choice = False
while not valid_choice:
choice = input(f"{message} (Y/N)")
if choice is None or choice.strip() == "":
print("Invalid choice, try again")
elif choice.lower() == "y":
return True
elif choice.lower() == "n":
return False
else:
print("Invalid choice, try again")
def input_ride_data(self):
"""Prompt the user to input ride data such as the time of the trip,
the total distance in miles, the average ride speed in MPH, and allow
for free form parameters via adding "custom data" to the trip
"""
tid = str(uuid4())
name = self.get_string("Trip Name: ", f"Trip: {datetime.now()}")
distance = self.get_float("Ride Distance: ")
speed = self.get_float("Ride Speed (MPH): ")
time = str(datetime.now())
item = {
'TripId': tid,
'Name': name,
'Distance': distance,
'Time': time,
'Speed': speed,
}
if self.yes_no_menu("Enter Custom Data?"):
self.get_custom_params(item)
table = self.dynamodb.Table(TABLE)
table.put_item(Item=json.loads(json.dumps(item), parse_float=Decimal))
print(f"Trip {name} added to your records")
self.load_trips()
def report_average_distance(self):
"""Print the average ride distance for all recorded trips
"""
if len(self.trips) == 0:
print("No data avaliable")
return
distances = [t.Distance for t in self.trips]
print(f"Your average ride distance is {round(mean(distances), 2)} miles")
def init_table(self) -> dict:
"""Create the Trips table and return the response dictionary"""
table = self.dynamodb.create_table(
TableName=TABLE,
KeySchema=[
{
'AttributeName': 'TripId',
'KeyType': 'HASH' #Partition key
},
{
'AttributeName': 'Name',
'KeyType': 'RANGE' #Sort key
}
],
AttributeDefinitions=[
{
'AttributeName': 'TripId',
'AttributeType': 'S'
},
{
'AttributeName': 'Name',
'AttributeType': 'S'
},
],
ProvisionedThroughput={
'ReadCapacityUnits': 10,
'WriteCapacityUnits': 10
}
)
return table
def run(self):
"""Handle the main application logic
"""
print("Welcome to the Cycle Tracker")
done = False
while not done:
try:
print("\n--- App Menu ---")
print("Please chose an option: ")
choice = MenuOptions(int(input(MenuOptions.get_values())))
if choice == MenuOptions.Exit:
print("Thanks for using the application.")
sys.exit(0)
elif choice == MenuOptions.Help:
print("""Help Menu:
- The Cycling Tracker app is designed to help you track your cycling metrics
- Inputting ride data allows you to input your latest ride data
- Retrieve ride data allows you to view your recent ride history
- Manual data import will load ride data from S3 data storage
- Reporting average distance will compute your all time average distance per ride
- Reporting average speed will compute your all time average speed per ride
""")
elif choice == MenuOptions.Input_Ride_Data:
app.input_ride_data()
elif choice == MenuOptions.Manually_Import_Data:
app.load_bucket_objects()
elif choice == MenuOptions.Retrieve_Ride_Data:
app.handle_search()
elif choice == MenuOptions.Report_Average_Distance:
app.report_average_distance()
elif choice == MenuOptions.Report_Average_Speed:
app.report_average_speed()
except Exception as ex:
print(ex)
print("Invalid choice try again...")
if __name__ == "__main__":
app = App()
if "--create-tables" in sys.argv:
app.init_table()
app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment