Created
November 27, 2021 01:07
-
-
Save peternewman22/754b90c827faaf2598a57f7a5561cf0c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import PySimpleGUI as sg | |
import sqlite3 | |
from datetime import timedelta, datetime | |
from enum import Enum | |
from collections import defaultdict | |
from typing import List, Tuple, Callable | |
import logging | |
logging.basicConfig(filename="main.log", filemode='w', level=logging.DEBUG, format='%(name)s - %(levelname)s - %(message)s') | |
def time_remaining_to_percentage(total_time_string: str, time_remaining_string: str) -> str: | |
total_time_list = list(map(lambda bit: float(bit), total_time_string.split(":"))) | |
time_remaining_list = list(map(lambda bit: float(bit),time_remaining_string.split(":"))) | |
total_duration = timedelta(hours=total_time_list[0], minutes = total_time_list[1], seconds = total_time_list[2]) | |
remaining_duration = timedelta(hours=time_remaining_list[0], minutes= time_remaining_list[1], seconds = time_remaining_list[2]) | |
total_seconds = total_duration.total_seconds() | |
remaining_seconds = remaining_duration.total_seconds() | |
# print(f"Total Duration: {total_duration} -> {total_seconds}, Remaining Duration: {remaining_duration} -> {remaining_seconds}") | |
return f"{round(((total_seconds - remaining_seconds)/total_seconds)*100,1)}%" | |
class K(Enum): | |
TITLE = "Title" | |
SAVE = "Save" | |
STARTED = "Started" | |
AUTHOR = "Author" | |
NARRATOR = "Narrator" | |
FINISHED = "Finished" | |
REMAINING = "Remaining" | |
PERC = "Perc" | |
DURATION = "Duration" | |
LAST_UPDATED = "Last Updated" | |
UPDATE = "Update" | |
ID = "id" | |
NEW = "New" | |
EXISTING_AUTHOR = "Existing Author" | |
EXISTING_NARRATOR = "Existing Narrator" | |
MAIN = "Main" | |
DB = "audible_library.db" | |
global all_titles | |
global all_authors | |
global all_narrators | |
to_update = [K.AUTHOR.value, K.NARRATOR.value, K.DURATION.value, K.REMAINING.value, K.PERC.value, K.STARTED.value, K.LAST_UPDATED.value, K.FINISHED.value] | |
book_list = [ | |
("I Can't Make This Up: Life Lessons", "Neil Strauss", "Kevin Hart", "11:15:00","8:13:00"), | |
("The Queen's Gambit", "Walter Trevis", "Amy Landon", "11:50:00", "11:50:00"), | |
("Dishonesty is the second-best policy", "David Mitchell", "David Mitchell", "8:11:00","8:11:00"), | |
("Thinking about it only makes it worse", "David Mitchell", "David Mitchell", "9:10:00","9:10:00") | |
] | |
def generate_and_seed_db(): | |
logging.debug(f"\tgenerate_and_seeed(): connecting to database") | |
db = sqlite3.connect(K.DB.value) | |
cur = db.cursor() | |
logging.debug(f"\tgenerate_and_seeed(): dropping table audiobooks if it exists") | |
cur.execute("DROP TABLE IF EXISTS audiobooks;") | |
logging.debug(f"\tgenerate_and_seeed(): creating new database") | |
cur.execute(""" | |
CREATE TABLE IF NOT EXISTS audiobooks( | |
id integer PRIMARY KEY, | |
title text UNIQUE NOT NULL, | |
author text NOT NULL, | |
narrator text NOT NULL, | |
duration text NOT NULL, | |
remaining text NOT NULL, | |
started date, | |
last_updated date, | |
finished date | |
) | |
""") | |
logging.debug(f"\tgenerate_and_seeed(): seeding with values {[print(x) for x in book_list]}") | |
cur.executemany("INSERT INTO audiobooks(title, author, narrator, duration, remaining) VALUES(?, ?, ?, ?, ?)",book_list) | |
logging.debug(f"\tgenerate_and_seeed(): committing to database") | |
db.commit() | |
logging.debug(f"\tgenerate_and_seeed(): closing connection to database") | |
cur.close() | |
db.close() | |
def get_titles(): | |
logging.debug(f"\tget_titles(): Connecting to database") | |
db = sqlite3.connect("audible_library.db") | |
cur = db.cursor() | |
logging.debug(f"\tget_titles(): Fetching titles") | |
cur.execute("SELECT title FROM audiobooks;") | |
raw_titles = list(cur.fetchall()) | |
logging.debug(f"get_titles(): flattening the list by processing {raw_titles} etc") | |
titles = list(map(lambda x: x[0], raw_titles)) | |
logging.debug(f"\tget_titles(): titles: {titles}") | |
logging.debug("\tget_titles(): closing connection to database") | |
cur.close() | |
db.close() | |
return titles | |
def get_narrators(): | |
logging.debug(f"\tget_narrators(): Connecting to database") | |
db = sqlite3.connect("audible_library.db") | |
cur = db.cursor() | |
logging.debug(f"\tget_narrators(): Fetching narrators") | |
cur.execute("SELECT narrator FROM audiobooks;") | |
narrators = list(set(map(lambda x: x[0], cur.fetchall()))) # removing non-unique values | |
logging.debug(f"\tget_narrators(): narrators = {narrators}") | |
# print(titles) | |
logging.debug(f"\tget_narrators(): closing connection to database") | |
cur.close() | |
db.close() | |
return narrators | |
def get_authors(): | |
logging.debug(f"\tget_authors(): Connecting to database") | |
db = sqlite3.connect("audible_library.db") | |
cur = db.cursor() | |
logging.debug(f"\tget_authors(): Fetching authors") | |
cur.execute("SELECT author FROM audiobooks;") | |
authors = list(set(map(lambda x: x[0], cur.fetchall()))) | |
logging.debug(f"\tget_authors(): authors = {authors}") | |
# print(titles) | |
cur.close() | |
db.close() | |
return authors | |
# doesn't work??? | |
def get_record(title:str)-> defaultdict: | |
logging.debug("get_record(): Connecting to database") | |
db = sqlite3.connect("audible_library.db") | |
cur = db.cursor() | |
logging.debug("get_record(): Selecting record by title") | |
cur.execute("SELECT * FROM audiobooks WHERE title = ?",(title,)) | |
row = cur.fetchone() | |
logging.debug(f"row: {row}") | |
# cur.execute("SELECT * FROM audiobooks") | |
# row = cur.fetchall() | |
logging.debug("get_record(): Constructing default dict from record") | |
row_dict = { # row[1] --> storing title as key in dictionary | |
K.ID.value : row[0], | |
K.AUTHOR.value : row[2], | |
K.NARRATOR.value : row[3], | |
K.DURATION.value : row[4], | |
K.REMAINING.value : row[5], | |
K.STARTED.value : row[6], | |
K.LAST_UPDATED.value : row[7], | |
K.FINISHED.value : row[8], | |
K.PERC.value : time_remaining_to_percentage(row[4],row[5]) | |
} | |
logging.debug("Closing connection to database") | |
cur.close() | |
db.close() | |
logging.debug("Returning record") | |
return row_dict | |
def get_main_layout(record, titles): | |
return [ | |
[sg.Text("Title: "), sg.Combo(values=titles,key=K.TITLE.value,enable_events=True)], | |
[sg.Text("Author: "), sg.Text(record[K.AUTHOR.value], key=K.AUTHOR.value, size=(20,1))], | |
[sg.Text("Narrator: "), sg.Text(record[K.AUTHOR.value], key=K.NARRATOR.value, size=(20,1))], | |
[sg.Text("Total Duration: "), sg.Text(record[K.DURATION.value], key=K.DURATION.value,size=(20,1))], | |
[sg.Text("Time Remaining:"), sg.InputText(default_text=record[K.REMAINING.value], key=K.REMAINING.value,tooltip="hh:mm:ss", size=(15,1))], | |
[sg.Text("Percentage Complete:"), sg.Text(f"{record[K.PERC.value]}%", key = K.PERC.value, size=(20,1))], | |
[sg.Text("Started: "), sg.InputText(default_text=record[K.STARTED.value],key= K.STARTED.value,tooltip="yyyy/mm/dd", size=(20,1))], | |
[sg.Text("LastUpdated: "), sg.Text(record[K.LAST_UPDATED.value], key = K.LAST_UPDATED.value, size=(20,1))], | |
[sg.Text("Finished: "), sg.InputText(default_text=record[K.FINISHED.value], key = K.FINISHED.value, size=(20,1))], | |
[sg.Cancel(), sg.Button("Update",key=K.UPDATE.value), sg.Button("Save",key=K.SAVE.value, disabled= True), sg.Button("New", key = K.NEW.value)] | |
] | |
def get_new_layout(narrators, authors): | |
return [ | |
[sg.Text("Title: "), sg.InputText(default_text="Title?" , key=K.TITLE.value)], | |
[sg.Text("Author: "), sg.InputText(key=K.AUTHOR.value, size=(20,1)), sg.Combo(authors,key=K.EXISTING_AUTHOR.value, size=(20,1))], | |
[sg.Text("Narrator: "), sg.InputText(key=K.NARRATOR.value, size=(20,1)),sg.Combo(narrators,key=K.EXISTING_NARRATOR.value, size=(20,1))], | |
[sg.Text("Total Duration: "), sg.InputText(key=K.DURATION.value,size=(20,1))], | |
[sg.Text("Time Remaining:"), sg.InputText(key=K.REMAINING.value,tooltip="hh:mm:ss", size=(20,1))], | |
[sg.Text("Started: "), sg.InputText(default_text=datetime.now().date(),key= K.STARTED.value,tooltip="yyyy/mm/dd", size=(20,1))], | |
[sg.Text("LastUpdated: "), sg.InputText(default_text = datetime.now().date(), key = K.LAST_UPDATED.value, size=(20,1))], | |
[sg.Text("Finished: "), sg.InputText(key = K.FINISHED.value, size=(20,1))], | |
[sg.Cancel(), sg.Button("Save",key=K.SAVE.value)] | |
] | |
def update_record(record: dict) -> None: | |
sql_save = """ | |
UPDATE audiobooks | |
SET remaining = ?, | |
started = ?, | |
last_updated = ?, | |
finished = ? | |
WHERE id = ? | |
""" | |
db = sqlite3.connect("audible_library.db") | |
cur = db.cursor() | |
# get all the values and save them | |
cur.execute(sql_save,(record[K.REMAINING.value], | |
record[K.STARTED.value], | |
record[K.LAST_UPDATED.value], | |
record[K.FINISHED.value], | |
record[K.ID.value])) | |
db.commit() | |
cur.close() | |
db.close() | |
def insert_record(record: tuple) -> None: | |
sql_insert_statement = """ | |
INSERT INTO audiobooks (title, author, narrator, duration, remaining, started, last_updated) | |
VALUES(?, ?, ?, ?, ?, ?, ?); | |
""" | |
db = sqlite3.connect(K.DB.value) | |
cur = db.cursor() | |
logging.debug("Inserting record") | |
cur.execute(sql_insert_statement, record) | |
db.commit() | |
logging.debug("New record") | |
cur.execute("SELECT * FROM audiobooks WHERE id = (SELECT MAX(id) FROM audiobooks);") | |
logging.debug(f"New record: {cur.fetchone()}") | |
cur.close() | |
db.close() | |
#TODO: Data validation | |
def build_new_record(v: dict) -> Tuple: | |
""" Takes values dict and returns an ordered tuple""" | |
author = v[K.AUTHOR.value] if v[K.AUTHOR.value] != '' else v[K.EXISTING_AUTHOR.value] | |
narrator = v[K.NARRATOR.value] if v[K.NARRATOR.value] != '' else v[K.EXISTING_NARRATOR.value] | |
record_tuple = ( | |
v[K.TITLE.value], | |
author, | |
narrator, | |
v[K.DURATION.value], | |
v[K.REMAINING.value], | |
v[K.STARTED.value], | |
v[K.LAST_UPDATED.value] | |
) | |
logging.debug("Collecting record:") | |
[logging.debug(f"{x}") for x in record_tuple] | |
return record_tuple | |
def main(): | |
# generate_and_seed_db() # runs first time only | |
d: dict = defaultdict(lambda: "") | |
is_updated = False | |
all_titles: List[str] = get_titles() | |
all_narrators: List[str] = get_narrators() | |
all_authors: List[str] = get_authors() | |
# generating the window | |
logging.debug(f"Generating main window with is_updated: {is_updated}") | |
main_layout = get_main_layout(d,all_titles) | |
w1 = sg.Window("Audiobook Updater", layout = main_layout) | |
w2_active = False | |
while True: | |
e1, v1 = w1.read() | |
print(f"event: {e1},\tvalues: {v1}\n") | |
if e1 == "Cancel" or e1 == "Quit" or e1 == sg.WIN_CLOSED: | |
break | |
if e1 == K.TITLE.value: | |
logging.debug("Title change detected") | |
is_updated = False | |
w1[K.SAVE.value].update(disabled = not is_updated) | |
try: | |
d = get_record(v1[K.TITLE.value]) | |
for each_key in to_update: | |
logging.debug(f"\tAttempting to update {each_key} --> {d[each_key]}") | |
w1[each_key].update(d[each_key]) | |
except Exception as e: | |
logging.error(e) | |
sg.Popup(title = "Error!", custom_text= f"Error: {e}") | |
if e1 == K.UPDATE.value: | |
try: | |
w1[K.AUTHOR.value].update("Author: Updated!") | |
w1[K.PERC.value].update() | |
if v1[K.REMAINING.value] != '': | |
d[K.REMAINING.value] = v1[K.REMAINING.value] # update time remaining | |
d[K.PERC.value] = f"{time_remaining_to_percentage(d[K.DURATION.value], d[K.REMAINING.value])}%" | |
else: | |
continue | |
if v1[K.STARTED.value] != '': | |
d[K.STARTED.value] = datetime.strptime(v1[K.STARTED.value],'%Y/%m/%d').date() # update time remaining | |
else: | |
continue | |
if v1[K.REMAINING.value] == "00:00:00": | |
if v1[K.FINISHED.value] != '': | |
d[K.FINISHED.value] = v1[K.FINISHED.value] | |
else: | |
d[K.FINISHED.value] = datetime.now().date() | |
if e1 == K.UPDATE.value: | |
d[K.LAST_UPDATED.value] = datetime.now().date() | |
is_updated = True | |
w1[K.SAVE.value].update(disabled = not is_updated) | |
for each_key in to_update: | |
print(f"updating: {d[each_key]}") | |
w1[each_key].update(d[each_key]) | |
except Exception as e: | |
sg.Popup(title = "Error!", custom_text= f"Error: {e}") | |
if e1 == K.SAVE.value: | |
logging.debug("Saving...") | |
[logging.debug(f"\t\t{k}: {v}") for k ,v in d.items()] | |
is_updated = False | |
w1[K.SAVE.value].update(disabled = not is_updated) | |
try: | |
update_record(d) | |
sg.PopupQuickMessage(image="success-icon-23194.png") | |
except Exception as e: | |
sg.Popup(title = "Error!", custom_text=(f"Error: {e}")) | |
if e1 == K.NEW.value and not w2_active: | |
logging.debug("New called! Opening new window with w2_active true") | |
w1.Hide() | |
w2_active = True | |
new_layout = get_new_layout(narrators=all_narrators,authors=all_authors) | |
w2 = sg.Window("New Audiobook", new_layout) | |
while True: | |
e2, v2 = w2.read() | |
if e2 == "Cancel" or e2 == sg.WIN_CLOSED: | |
break | |
elif e2 == K.SAVE.value: | |
new_record = build_new_record(v2) | |
insert_record(new_record) | |
sg.PopupQuickMessage(image="success-icon-23194.png") | |
logging.debug("Closing second window") | |
w2.close() | |
w2_active = False | |
w1.un_hide() | |
# refresh lists | |
break | |
d: dict = defaultdict(lambda: "") | |
all_titles: List[str] = get_titles() | |
all_narrators: List[str] = get_narrators() | |
all_authors: List[str] = get_authors() | |
# update window title list | |
w1[K.TITLE.value].update(values=all_titles) | |
logging.debug("Ending Program") | |
w1.close() | |
if __name__=='__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment