# Source: GitHub gist 754b90c827faaf2598a57f7a5561cf0c by @peternewman22,
# created November 27, 2021.
import PySimpleGUI as sg
import sqlite3
from datetime import timedelta, datetime
from enum import Enum
from collections import defaultdict
from typing import List, Tuple, Callable
import logging
logging.basicConfig(filename="main.log", filemode='w', level=logging.DEBUG, format='%(name)s - %(levelname)s - %(message)s')
def _hms_to_seconds(time_string: str) -> float:
    """Convert an ``hh:mm:ss`` string into a number of seconds.

    Raises:
        ValueError: if the string does not have exactly three ``:``-separated
            numeric components.
    """
    parts = time_string.split(":")
    if len(parts) != 3:
        raise ValueError(f"Expected a time formatted as hh:mm:ss, got {time_string!r}")
    hours, minutes, seconds = (float(part) for part in parts)
    return timedelta(hours=hours, minutes=minutes, seconds=seconds).total_seconds()


def time_remaining_to_percentage(total_time_string: str, time_remaining_string: str) -> str:
    """Return how much of a recording has been completed, as a percentage string.

    Args:
        total_time_string: total duration, formatted ``hh:mm:ss``.
        time_remaining_string: time still remaining, formatted ``hh:mm:ss``.

    Returns:
        The completed share rounded to one decimal place with a trailing
        ``%`` already attached (e.g. ``"50.0%"``); callers must not append
        another one.

    Raises:
        ValueError: if either argument is malformed, or if the total duration
            is zero (the percentage would be undefined).
    """
    total_seconds = _hms_to_seconds(total_time_string)
    if total_seconds == 0:
        raise ValueError("Total duration must be greater than zero")
    remaining_seconds = _hms_to_seconds(time_remaining_string)
    return f"{round(((total_seconds - remaining_seconds) / total_seconds) * 100, 1)}%"
class K(Enum):
    """String constants shared across the module.

    The ``.value`` of most members is used both as a PySimpleGUI element key
    and as a key of the record dictionaries passed between the helpers.
    """

    TITLE = "Title"                # title combo / input
    SAVE = "Save"                  # "Save" button
    STARTED = "Started"            # date the book was started
    AUTHOR = "Author"
    NARRATOR = "Narrator"
    FINISHED = "Finished"          # date the book was finished
    REMAINING = "Remaining"        # listening time left
    PERC = "Perc"                  # percentage complete
    DURATION = "Duration"          # total running time
    LAST_UPDATED = "Last Updated"
    UPDATE = "Update"              # "Update" button
    ID = "id"                      # primary key of the database row
    NEW = "New"                    # "New" button
    EXISTING_AUTHOR = "Existing Author"      # combo of known authors
    EXISTING_NARRATOR = "Existing Narrator"  # combo of known narrators
    MAIN = "Main"                  # not referenced by the code shown in this file
    DB = "audible_library.db"      # SQLite database file name
# Keys of the main-window elements that are refreshed whenever a record is
# loaded or updated. Each entry is both a record-dictionary key and a window
# element key.
# (The module-level ``global`` statements that used to sit here were no-ops:
# the title/author/narrator lists are locals of ``main``.)
to_update = [
    K.AUTHOR.value,
    K.NARRATOR.value,
    K.DURATION.value,
    K.REMAINING.value,
    K.PERC.value,
    K.STARTED.value,
    K.LAST_UPDATED.value,
    K.FINISHED.value,
]
# Seed rows for the audiobooks table, in the column order used by the INSERT
# in generate_and_seed_db(): (title, author, narrator, duration, remaining).
book_list = [
    ("I Can't Make This Up: Life Lessons", "Neil Strauss", "Kevin Hart", "11:15:00", "8:13:00"),
    # Author's surname corrected from "Trevis" to "Tevis".
    ("The Queen's Gambit", "Walter Tevis", "Amy Landon", "11:50:00", "11:50:00"),
    ("Dishonesty is the second-best policy", "David Mitchell", "David Mitchell", "8:11:00", "8:11:00"),
    ("Thinking about it only makes it worse", "David Mitchell", "David Mitchell", "9:10:00", "9:10:00"),
]
def generate_and_seed_db():
    """Recreate the ``audiobooks`` table and seed it with ``book_list``.

    Destructive: an existing ``audiobooks`` table is dropped first, so every
    previously stored record is lost. The connection is closed even if one of
    the statements fails.
    """
    logging.debug("\tgenerate_and_seed_db(): connecting to database")
    db = sqlite3.connect(K.DB.value)
    try:
        cur = db.cursor()
        logging.debug("\tgenerate_and_seed_db(): dropping table audiobooks if it exists")
        cur.execute("DROP TABLE IF EXISTS audiobooks;")
        logging.debug("\tgenerate_and_seed_db(): creating new database")
        cur.execute("""
            CREATE TABLE IF NOT EXISTS audiobooks(
                id integer PRIMARY KEY,
                title text UNIQUE NOT NULL,
                author text NOT NULL,
                narrator text NOT NULL,
                duration text NOT NULL,
                remaining text NOT NULL,
                started date,
                last_updated date,
                finished date
            )
        """)
        # Log the rows themselves; the previous message called print() on each
        # row and logged the resulting list of None values.
        logging.debug("\tgenerate_and_seed_db(): seeding with values %s", book_list)
        cur.executemany(
            "INSERT INTO audiobooks(title, author, narrator, duration, remaining) VALUES(?, ?, ?, ?, ?)",
            book_list,
        )
        logging.debug("\tgenerate_and_seed_db(): committing to database")
        db.commit()
        cur.close()
    finally:
        logging.debug("\tgenerate_and_seed_db(): closing connection to database")
        db.close()
def get_titles():
    """Return the title of every row in the ``audiobooks`` table as a list of strings.

    The database file is taken from ``K.DB`` (as the seeding and insert
    helpers do) and the connection is closed even if the query fails.
    """
    logging.debug("\tget_titles(): Connecting to database")
    db = sqlite3.connect(K.DB.value)
    try:
        cur = db.cursor()
        logging.debug("\tget_titles(): Fetching titles")
        cur.execute("SELECT title FROM audiobooks;")
        # Each fetched row is a 1-tuple; keep only the title itself.
        titles = [row[0] for row in cur.fetchall()]
        cur.close()
    finally:
        logging.debug("\tget_titles(): closing connection to database")
        db.close()
    logging.debug("\tget_titles(): titles: %s", titles)
    return titles
def get_narrators():
    """Return the distinct narrators stored in ``audiobooks``, sorted by name.

    De-duplication and ordering are done in SQL so the list has a stable
    order (the previous ``list(set(...))`` order could differ between runs).
    The connection is closed even if the query fails.
    """
    logging.debug("\tget_narrators(): Connecting to database")
    db = sqlite3.connect(K.DB.value)
    try:
        cur = db.cursor()
        logging.debug("\tget_narrators(): Fetching narrators")
        cur.execute("SELECT DISTINCT narrator FROM audiobooks ORDER BY narrator;")
        narrators = [row[0] for row in cur.fetchall()]
        cur.close()
    finally:
        logging.debug("\tget_narrators(): closing connection to database")
        db.close()
    logging.debug("\tget_narrators(): narrators = %s", narrators)
    return narrators
def get_authors():
    """Return the distinct authors stored in ``audiobooks``, sorted by name.

    De-duplication and ordering are done in SQL so the list has a stable
    order (the previous ``list(set(...))`` order could differ between runs).
    The connection is closed even if the query fails.
    """
    logging.debug("\tget_authors(): Connecting to database")
    db = sqlite3.connect(K.DB.value)
    try:
        cur = db.cursor()
        logging.debug("\tget_authors(): Fetching authors")
        cur.execute("SELECT DISTINCT author FROM audiobooks ORDER BY author;")
        authors = [row[0] for row in cur.fetchall()]
        cur.close()
    finally:
        logging.debug("\tget_authors(): closing connection to database")
        db.close()
    logging.debug("\tget_authors(): authors = %s", authors)
    return authors
# NOTE(review): the original author flagged this function with
# "doesn't work???". One failure mode was a title with no matching row, which
# surfaced as an opaque TypeError; that case is now reported explicitly.
# Confirm nothing else was meant.
def get_record(title: str) -> dict:
    """Load the audiobook with the given title as a dictionary.

    Args:
        title: exact title to look up (the column is declared UNIQUE).

    Returns:
        A plain ``dict`` keyed by the ``K`` member values ``ID``, ``AUTHOR``,
        ``NARRATOR``, ``DURATION``, ``REMAINING``, ``STARTED``,
        ``LAST_UPDATED``, ``FINISHED`` and ``PERC``. ``PERC`` is computed from
        duration and remaining time and already ends in ``%``. The title
        itself is not stored in the dictionary.

    Raises:
        LookupError: if no row has that title.
    """
    logging.debug("get_record(): Connecting to database")
    db = sqlite3.connect(K.DB.value)
    try:
        cur = db.cursor()
        logging.debug("get_record(): Selecting record by title")
        # Columns are named explicitly so the unpacking below does not depend
        # on the table's physical column order.
        cur.execute(
            "SELECT id, author, narrator, duration, remaining, started, last_updated, finished "
            "FROM audiobooks WHERE title = ?",
            (title,),
        )
        row = cur.fetchone()
        cur.close()
    finally:
        logging.debug("Closing connection to database")
        db.close()
    logging.debug("row: %s", row)
    if row is None:
        raise LookupError(f"No audiobook found with title {title!r}")

    logging.debug("get_record(): Constructing dict from record")
    record_id, author, narrator, duration, remaining, started, last_updated, finished = row
    row_dict = {
        K.ID.value: record_id,
        K.AUTHOR.value: author,
        K.NARRATOR.value: narrator,
        K.DURATION.value: duration,
        K.REMAINING.value: remaining,
        K.STARTED.value: started,
        K.LAST_UPDATED.value: last_updated,
        K.FINISHED.value: finished,
        K.PERC.value: time_remaining_to_percentage(duration, remaining),
    }
    logging.debug("Returning record")
    return row_dict
def get_main_layout(record, titles):
    """Build the PySimpleGUI layout (a list of element rows) for the main window.

    Args:
        record: mapping keyed by ``K`` member values, supplying the initial
            text of each field. ``main`` passes a defaultdict of empty strings
            before any title has been chosen.
        titles: titles offered in the title combo box.

    Returns:
        A list of rows, each a list of PySimpleGUI elements.
    """
    return [
        [sg.Text("Title: "), sg.Combo(values=titles, key=K.TITLE.value, enable_events=True)],
        [sg.Text("Author: "), sg.Text(record[K.AUTHOR.value], key=K.AUTHOR.value, size=(20, 1))],
        # Fixed: this row previously displayed the author's name.
        [sg.Text("Narrator: "), sg.Text(record[K.NARRATOR.value], key=K.NARRATOR.value, size=(20, 1))],
        [sg.Text("Total Duration: "), sg.Text(record[K.DURATION.value], key=K.DURATION.value, size=(20, 1))],
        [sg.Text("Time Remaining:"), sg.InputText(default_text=record[K.REMAINING.value], key=K.REMAINING.value, tooltip="hh:mm:ss", size=(15, 1))],
        # The percentage string already carries its "%" sign
        # (see time_remaining_to_percentage), so none is appended here.
        [sg.Text("Percentage Complete:"), sg.Text(record[K.PERC.value], key=K.PERC.value, size=(20, 1))],
        [sg.Text("Started: "), sg.InputText(default_text=record[K.STARTED.value], key=K.STARTED.value, tooltip="yyyy/mm/dd", size=(20, 1))],
        [sg.Text("LastUpdated: "), sg.Text(record[K.LAST_UPDATED.value], key=K.LAST_UPDATED.value, size=(20, 1))],
        [sg.Text("Finished: "), sg.InputText(default_text=record[K.FINISHED.value], key=K.FINISHED.value, size=(20, 1))],
        # Save starts disabled; main() enables it after a successful Update.
        [sg.Cancel(), sg.Button("Update", key=K.UPDATE.value), sg.Button("Save", key=K.SAVE.value, disabled=True), sg.Button("New", key=K.NEW.value)],
    ]
def get_new_layout(narrators, authors):
    """Build the PySimpleGUI layout for the "New Audiobook" window.

    Args:
        narrators: names offered in the "existing narrator" combo box.
        authors: names offered in the "existing author" combo box.

    Returns:
        A list of rows, each a list of PySimpleGUI elements. The Started and
        LastUpdated inputs are pre-filled with today's date.
    """
    field_size = (20, 1)

    title_row = [sg.Text("Title: "), sg.InputText(default_text="Title?", key=K.TITLE.value)]
    # Author and narrator can be typed in or picked from the existing names.
    author_row = [
        sg.Text("Author: "),
        sg.InputText(key=K.AUTHOR.value, size=field_size),
        sg.Combo(authors, key=K.EXISTING_AUTHOR.value, size=field_size),
    ]
    narrator_row = [
        sg.Text("Narrator: "),
        sg.InputText(key=K.NARRATOR.value, size=field_size),
        sg.Combo(narrators, key=K.EXISTING_NARRATOR.value, size=field_size),
    ]
    duration_row = [sg.Text("Total Duration: "), sg.InputText(key=K.DURATION.value, size=field_size)]
    remaining_row = [
        sg.Text("Time Remaining:"),
        sg.InputText(key=K.REMAINING.value, tooltip="hh:mm:ss", size=field_size),
    ]
    started_row = [
        sg.Text("Started: "),
        sg.InputText(default_text=datetime.now().date(), key=K.STARTED.value, tooltip="yyyy/mm/dd", size=field_size),
    ]
    last_updated_row = [
        sg.Text("LastUpdated: "),
        sg.InputText(default_text=datetime.now().date(), key=K.LAST_UPDATED.value, size=field_size),
    ]
    finished_row = [sg.Text("Finished: "), sg.InputText(key=K.FINISHED.value, size=field_size)]
    button_row = [sg.Cancel(), sg.Button("Save", key=K.SAVE.value)]

    return [
        title_row,
        author_row,
        narrator_row,
        duration_row,
        remaining_row,
        started_row,
        last_updated_row,
        finished_row,
        button_row,
    ]
def update_record(record: dict) -> None:
    """Write a record's progress fields back to its row in ``audiobooks``.

    Only ``remaining``, ``started``, ``last_updated`` and ``finished`` are
    written; the row is selected by ``record[K.ID.value]``.

    Args:
        record: mapping keyed by ``K`` member values; must provide
            ``REMAINING``, ``STARTED``, ``LAST_UPDATED``, ``FINISHED`` and
            ``ID``.
    """
    sql_save = """
        UPDATE audiobooks
        SET remaining = ?,
            started = ?,
            last_updated = ?,
            finished = ?
        WHERE id = ?
    """
    params = (
        record[K.REMAINING.value],
        record[K.STARTED.value],
        record[K.LAST_UPDATED.value],
        record[K.FINISHED.value],
        record[K.ID.value],
    )
    # Same database file as the other helpers; closed even if the UPDATE fails.
    db = sqlite3.connect(K.DB.value)
    try:
        cur = db.cursor()
        cur.execute(sql_save, params)
        db.commit()
        cur.close()
    finally:
        db.close()
def insert_record(record: tuple) -> None:
    """Insert a new audiobook row and log the stored result.

    Args:
        record: seven values in the order ``(title, author, narrator,
            duration, remaining, started, last_updated)``.

    Raises:
        sqlite3.Error: if the insert fails. The schema created by
            ``generate_and_seed_db`` declares ``title`` UNIQUE, so a duplicate
            title is one such case. The connection is closed either way.
    """
    sql_insert_statement = """
        INSERT INTO audiobooks (title, author, narrator, duration, remaining, started, last_updated)
        VALUES(?, ?, ?, ?, ?, ?, ?);
    """
    db = sqlite3.connect(K.DB.value)
    try:
        cur = db.cursor()
        logging.debug("Inserting record")
        cur.execute(sql_insert_statement, record)
        # Capture the new row's id before the cursor is reused for the lookup.
        new_id = cur.lastrowid
        db.commit()
        cur.execute("SELECT * FROM audiobooks WHERE id = ?;", (new_id,))
        logging.debug("New record: %s", cur.fetchone())
        cur.close()
    finally:
        db.close()
# TODO: Data validation
def build_new_record(v: dict) -> Tuple:
    """Turn the "New Audiobook" window's values into an ordered tuple.

    Args:
        v: values dictionary keyed by ``K`` member values, as read from the
            window built by ``get_new_layout``.

    Returns:
        ``(title, author, narrator, duration, remaining, started,
        last_updated)`` — the order ``insert_record`` expects. For author and
        narrator the typed-in text wins; the "existing" combo selection is
        used only when the text field is empty.
    """
    author = v[K.AUTHOR.value] if v[K.AUTHOR.value] != '' else v[K.EXISTING_AUTHOR.value]
    narrator = v[K.NARRATOR.value] if v[K.NARRATOR.value] != '' else v[K.EXISTING_NARRATOR.value]
    record_tuple = (
        v[K.TITLE.value],
        author,
        narrator,
        v[K.DURATION.value],
        v[K.REMAINING.value],
        v[K.STARTED.value],
        v[K.LAST_UPDATED.value],
    )
    logging.debug("Collecting record:")
    for field in record_tuple:
        logging.debug(f"{field}")
    return record_tuple
def _parse_date_iso(text: str) -> str:
    """Return ``text`` normalised to an ISO ``yyyy-mm-dd`` string.

    Accepts the ``yyyy/mm/dd`` form advertised by the window tooltips and the
    ``yyyy-mm-dd`` form produced by ``str(date)``. The latter is what the New
    window pre-fills and what this module writes back, so a date that has
    already been saved can be submitted again unchanged.

    Raises:
        ValueError: if ``text`` matches neither format.
    """
    cleaned = text.strip()
    for fmt in ("%Y/%m/%d", "%Y-%m-%d"):
        try:
            return datetime.strptime(cleaned, fmt).date().isoformat()
        except ValueError:
            continue
    raise ValueError(f"Date {text!r} is not in yyyy/mm/dd format")


def _report_error(exc: Exception) -> None:
    """Log an error and show it to the user in a popup."""
    logging.error(exc)
    # The message is the positional argument; `custom_text` would have set the
    # button label instead.
    sg.Popup(f"Error: {exc}", title="Error!")


def _show_success() -> None:
    """Flash the success image; a failure to show it is logged, not raised."""
    try:
        sg.PopupQuickMessage(image="success-icon-23194.png")
    except Exception:
        logging.exception("Could not display the success notification")


def _refresh_fields(window, record) -> None:
    """Copy every field listed in ``to_update`` from ``record`` into ``window``."""
    for each_key in to_update:
        value = record[each_key]
        logging.debug("\tUpdating %s --> %s", each_key, value)
        # A None value (NULL column) would leave the element's previous text
        # in place, so blank the field explicitly instead.
        window[each_key].update("" if value is None else value)


def _apply_progress(values: dict, record: dict) -> bool:
    """Apply the progress typed into the main window to ``record``.

    All new values are computed first and written to ``record`` in one step,
    so a validation error never leaves it half-updated.

    Returns:
        True if ``record`` was updated, False if Time Remaining or Started was
        blank (nothing is changed in that case).

    Raises:
        ValueError: if the remaining time or the start date cannot be parsed.
    """
    remaining = values[K.REMAINING.value]
    started_text = values[K.STARTED.value]
    if remaining == '' or started_text == '':
        return False

    today = datetime.now().date().isoformat()
    changes = {
        K.REMAINING.value: remaining,
        # Already ends in "%"; do not append another one.
        K.PERC.value: time_remaining_to_percentage(record[K.DURATION.value], remaining),
        K.STARTED.value: _parse_date_iso(started_text),
        K.LAST_UPDATED.value: today,
    }
    # The book is finished when no time remains, however the zero is written
    # ("00:00:00", "0:00:00", ...). Keep a typed-in finish date, else use today.
    if all(float(part) == 0 for part in remaining.split(":")):
        changes[K.FINISHED.value] = values[K.FINISHED.value] or today
    record.update(changes)
    return True


def _run_new_book_window(narrators: List[str], authors: List[str]) -> bool:
    """Show the "New Audiobook" window until it is saved or dismissed.

    Returns:
        True if a record was inserted, False if the window was cancelled or
        closed. The window is always closed before returning. A failed insert
        is reported and the window stays open so the input can be corrected.
    """
    window = sg.Window("New Audiobook", get_new_layout(narrators=narrators, authors=authors))
    try:
        while True:
            event, values = window.read()
            if event == "Cancel" or event == sg.WIN_CLOSED:
                return False
            if event == K.SAVE.value:
                try:
                    insert_record(build_new_record(values))
                except Exception as e:
                    _report_error(e)
                    continue
                _show_success()
                return True
    finally:
        logging.debug("Closing second window")
        window.close()


def main():
    """Run the audiobook updater GUI until the main window is closed.

    Events handled:
      * title selected -- load that record and show it; Save is disabled.
      * Update -- validate the typed-in progress, apply it to the in-memory
        record, refresh the display and enable Save.
      * Save -- write the in-memory record to the database.
      * New -- open the "New Audiobook" window; after a successful insert the
        title/author/narrator lists are reloaded.
    """
    # generate_and_seed_db() # runs first time only
    d: dict = defaultdict(lambda: "")
    all_titles: List[str] = get_titles()
    all_narrators: List[str] = get_narrators()
    all_authors: List[str] = get_authors()

    logging.debug("Generating main window")
    w1 = sg.Window("Audiobook Updater", layout=get_main_layout(d, all_titles))
    try:
        while True:
            e1, v1 = w1.read()
            logging.debug("event: %s,\tvalues: %s", e1, v1)
            if e1 == "Cancel" or e1 == "Quit" or e1 == sg.WIN_CLOSED:
                break

            if e1 == K.TITLE.value:
                logging.debug("Title change detected")
                # A newly selected record has nothing to save yet.
                w1[K.SAVE.value].update(disabled=True)
                try:
                    d = get_record(v1[K.TITLE.value])
                    _refresh_fields(w1, d)
                except Exception as e:
                    _report_error(e)

            elif e1 == K.UPDATE.value:
                try:
                    if _apply_progress(v1, d):
                        w1[K.SAVE.value].update(disabled=False)
                        _refresh_fields(w1, d)
                    else:
                        logging.warning("Update ignored: Time Remaining or Started is blank")
                        sg.Popup(
                            "Time Remaining and Started must both be filled in before updating.",
                            title="Missing information",
                        )
                except Exception as e:
                    _report_error(e)

            elif e1 == K.SAVE.value:
                logging.debug("Saving...")
                for key, value in d.items():
                    logging.debug("\t\t%s: %s", key, value)
                try:
                    update_record(d)
                except Exception as e:
                    # Save stays enabled so the user can retry.
                    _report_error(e)
                else:
                    w1[K.SAVE.value].update(disabled=True)
                    _show_success()

            elif e1 == K.NEW.value:
                logging.debug("New called! Opening new window")
                w1.Hide()
                try:
                    inserted = _run_new_book_window(all_narrators, all_authors)
                finally:
                    # Always bring the main window back, including on Cancel.
                    w1.un_hide()
                if inserted:
                    # refresh lists
                    d = defaultdict(lambda: "")
                    all_titles = get_titles()
                    all_narrators = get_narrators()
                    all_authors = get_authors()
                    # update window title list
                    w1[K.TITLE.value].update(values=all_titles)
                    w1[K.SAVE.value].update(disabled=True)
    finally:
        logging.debug("Ending Program")
        w1.close()
# Start the GUI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
# (GitHub gist page footer removed; it was not part of the program.)