Last active
November 12, 2021 12:37
-
-
Save dz0ny/f5f4412bba6c09a914861e5f8e3a64ff to your computer and use it in GitHub Desktop.
Track your work hours
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# <xbar.title>Work Hours</xbar.title> | |
# <xbar.desc>A simple command line tool for managing your work hours</xbar.desc> | |
# <xbar.dependencies>python,ruby,node</xbar.dependencies> | |
# <xbar.author.github>dz0ny</xbar.author.github> | |
from argparse import ArgumentParser | |
import csv | |
from enum import Enum, auto | |
import os | |
import sys | |
import datetime | |
# Directory under the user's home folder that holds every file this tool writes.
LOG_DIR_PATH = os.path.join(os.path.expanduser("~"), "Documents", "Hours")

# Event log appended to by write_log() and replayed by read_log().
LOG_FILE_PATH = os.path.join(LOG_DIR_PATH, "log.csv")

# Per-day summary file written by the generate_reports mode.
DAILY_FILE_PATH = os.path.join(LOG_DIR_PATH, "daily.csv")
class ModeFailException(Exception):
    """Signal that a mode could not complete.

    ``App.run`` catches this exception, prints its message and returns
    exit code 3 instead of letting a traceback escape.
    """
def to_hours_minutes(seconds, short=False):
    """Format a duration given in seconds as hours and minutes.

    Leftover seconds are discarded (no rounding).

    Args:
        seconds: Duration in seconds.
        short: When true, return a float whose fractional part is the
            minutes expressed as hundredths of an hour (30 min -> ``.50``).

    Returns:
        ``"HH:MM"`` as a string, or a float such as ``1.5`` when ``short``.
    """
    whole_minutes = seconds // 60
    hours, minutes = divmod(whole_minutes, 60)
    if not short:
        return f"{int(hours):02d}:{int(minutes):02d}"
    # Convert minutes (0-59) to hundredths of an hour (0-98), truncating.
    hundredths = 100 / 60 * minutes
    return float(f"{int(hours):02d}.{int(hundredths):02d}")
def script_path():
    """Return the absolute, symlink-resolved path of this script file."""
    resolved = os.path.realpath(__file__)
    return resolved
class LogEvent(Enum):
    """Kinds of rows stored in the log file.

    The member *name* is what gets written to, and matched against, the
    first CSV column.
    """

    # A shift began.
    START = auto()
    # The ongoing shift finished.
    END = auto()
class LogReport:
    """Summary of the event log: per-day totals plus the ongoing shift, if any."""

    # Mapping of date string -> worked seconds; stays None until
    # prepare_report() assigns it.
    by_date = None

    def __init__(self, current_shift_started_at=None):
        """Create a report.

        Args:
            current_shift_started_at: Naive UTC ``datetime`` at which the
                ongoing shift started, or ``None`` when no shift is open.
        """
        self.current_shift_started_at = current_shift_started_at

    @property
    def in_shift(self):
        """Whether a shift is currently open."""
        return self.current_shift_started_at is not None

    @property
    def current_shift_duration(self):
        """Elapsed time of the ongoing shift as ``"HH:MM"``, or ``None``.

        Raises:
            ModeFailException: If the recorded start lies in the future.
        """
        if self.current_shift_started_at is None:
            return None
        # Naive UTC "now", matching the naive UTC timestamps kept in the log
        # (datetime.utcnow() is deprecated since Python 3.12).
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        # total_seconds() is signed and includes whole days; the .seconds
        # attribute is always within 0..86399, so it can neither detect a
        # future start nor represent a shift longer than 24 hours.
        elapsed = (now - self.current_shift_started_at).total_seconds()
        if elapsed < 0:
            raise ModeFailException(
                f"Log file at {LOG_FILE_PATH} is corrupted; the ongoing shift seems to have been started in the future."
            )
        return to_hours_minutes(elapsed)
def _parse_log_timestamp(value):
    """Parse a timestamp string from the log into a ``datetime``.

    Accepts ISO-8601 text with or without a fractional-seconds part, since
    ``datetime.isoformat()`` omits the fraction when microsecond is 0.

    Raises:
        ModeFailException: If the text is not a valid ISO-8601 timestamp.
    """
    try:
        return datetime.datetime.fromisoformat(value)
    except ValueError:
        raise ModeFailException(
            f"Log file at {LOG_FILE_PATH} is corrupted; found an unreadable timestamp: {value!r}. Try fixing or deleting it."
        ) from None


def prepare_report():
    """Replay the log and build a ``LogReport``.

    Completed shifts are summed per date (the date of the END event) into
    ``report.by_date``; a trailing START without END is left as the
    ongoing shift in ``report.current_shift_started_at``.

    Returns:
        The populated ``LogReport``.

    Raises:
        ModeFailException: If START/END events do not alternate, a
            timestamp is unreadable, or a shift has a negative duration.
    """
    report = LogReport()
    by_date = {}
    for event, value in read_log():
        if event == LogEvent.START:
            if report.in_shift:
                raise ModeFailException(
                    f"Log file at {LOG_FILE_PATH} is corrupted; found two successive {LogEvent.START.name}s without a {LogEvent.END.name} in between. Try fixing or deleting it."
                )
            report.current_shift_started_at = _parse_log_timestamp(value)
        elif event == LogEvent.END:
            if not report.in_shift:
                raise ModeFailException(
                    f"Log file at {LOG_FILE_PATH} is corrupted; found two successive {LogEvent.END.name}s without a {LogEvent.START.name} in between. Try fixing or deleting it."
                )
            # total_seconds() is signed and counts whole days; timedelta.seconds
            # is always 0..86399, which hid negative durations and truncated
            # shifts longer than 24 hours.
            elapsed = (
                _parse_log_timestamp(value) - report.current_shift_started_at
            ).total_seconds()
            report.current_shift_started_at = None
            if elapsed < 0:
                raise ModeFailException(
                    f"Log file at {LOG_FILE_PATH} is corrupted; A shift's duration cannot be negative. Try fixing or deleting it."
                )
            current_date = value.split("T")[0]
            # Whole seconds only, as before.
            by_date[current_date] = by_date.get(current_date, 0) + int(elapsed)
        else:
            # Explicit raise so the guard survives `python -O`.
            raise AssertionError(
                f"Support for new LogEvent {event.name} not added."
            )
    report.by_date = by_date
    return report
def read_log():
    """Yield ``(LogEvent, value)`` pairs from the log file, in file order.

    Completely empty rows are skipped; they appear when the file was
    written without ``newline=""`` on Windows or edited by hand.

    Raises:
        ModeFailException: If a row has fewer than two columns or its
            first column is not a ``LogEvent`` name.
    """
    # newline="" lets the csv module handle line endings itself.
    with open(LOG_FILE_PATH, "r", newline="") as log_file:
        for row in csv.reader(log_file):
            if not row:
                continue
            if len(row) < 2:
                raise ModeFailException(
                    f"Log file at {LOG_FILE_PATH} is corrupted; found a malformed row: {row}"
                )
            event = LogEvent.__members__.get(row[0])
            if event is None:
                raise ModeFailException(
                    f"Log file at {LOG_FILE_PATH} is corrupted; found an unknown log event: {row}"
                )
            yield event, row[1]
def write_log(event, value):
    """Append one ``event.name,value`` row to the log file.

    Args:
        event: The ``LogEvent`` to record.
        value: The payload for the second column (callers in this file pass
            an ISO-8601 timestamp string).
    """
    # newline="" is required for csv writers; without it the "\r\n" row
    # terminator is translated again on Windows and yields blank rows.
    with open(LOG_FILE_PATH, "a", newline="") as log_file:
        csv.writer(log_file).writerow([event.name, value])
def read_sanitized_report(expected_in_shift=None, if_shift_err=None):
    """Build the report and optionally enforce the expected shift state.

    Args:
        expected_in_shift: Required value of ``report.in_shift``, or
            ``None`` to skip the check.
        if_shift_err: Message for the ``ModeFailException`` raised when the
            check fails. Must be given exactly when ``expected_in_shift`` is.

    Returns:
        The ``LogReport`` produced by ``prepare_report()``.

    Raises:
        ValueError: If only one of the two arguments is provided.
        ModeFailException: If the shift state differs from the expectation.
    """
    has_expectation = expected_in_shift is not None
    has_message = if_shift_err is not None
    if has_expectation != has_message:
        raise ValueError(
            "Either both, or neither of expected_in_shift and if_shift_err should be null."
        )

    report = prepare_report()
    if has_expectation and report.in_shift != expected_in_shift:
        raise ModeFailException(if_shift_err)
    return report
class App:
    """Registry of command-line modes plus the argparse front end that runs one."""

    class Mode:
        """One registered mode: its flag name, runner, help text and default marker."""

        def __init__(self, name, runner, help, is_default):
            self.name = name
            self.runner = runner
            self.help = help
            self.is_default = is_default

    def __init__(self):
        self.__registered_modes = []

    def run(self):
        """Parse the command line and execute the selected (or default) mode.

        Each mode gets a mutually exclusive ``-<first letter>`` /
        ``--<name>`` flag.

        Returns:
            0 on success, 3 when the mode raised ``ModeFailException``
            (its message is printed first).
        """
        assert len(self.__registered_modes) > 0, "No modes were registered"
        default_modes = [mode for mode in self.__registered_modes if mode.is_default]
        assert (
            len(default_modes) == 1
        ), "Exactly 1 mode should be registered as the default"
        default_mode = default_modes[0]

        parser = ArgumentParser(description="A tool for managing your work hours.")
        group = parser.add_mutually_exclusive_group()
        for mode in self.__registered_modes:
            group.add_argument(
                f"-{mode.name[0]}",
                f"--{mode.name}",
                action="store_true",
                help=mode.help,
            )
        args = parser.parse_args()

        matching_mode = next(
            (mode for mode in self.__registered_modes if getattr(args, mode.name)),
            default_mode,
        )
        try:
            matching_mode.runner()
            return 0
        except ModeFailException as e:
            print(str(e))
            return 3

    def register_mode(
        self,
        expected_in_shift=None,
        if_shift_err=None,
        help=None,
        is_default=False,
    ):
        """Return a decorator that registers a function as a mode.

        The decorated function may declare one parameter annotated with
        ``LogReport`` to receive the current report; no other annotated
        parameters are allowed.

        Args:
            expected_in_shift: Shift state required before the mode runs,
                or ``None`` for no requirement.
            if_shift_err: Error message used when that requirement fails.
            help: Help text shown for the mode's flag.
            is_default: Whether this mode runs when no flag is given.

        Returns:
            A decorator which replaces the function with its zero-argument
            runner.
        """

        def wrapper(mode_fn):
            # A return annotation is not a parameter; leave it out so that
            # `def mode() -> None` is not rejected below.
            param_annotations = {
                name: annotation
                for name, annotation in mode_fn.__annotations__.items()
                if name != "return"
            }
            report_param_name = next(
                (
                    name
                    for name, annotation in param_annotations.items()
                    if annotation == LogReport
                ),
                None,
            )
            num_other_params = sum(
                1
                for annotation in param_annotations.values()
                if annotation != LogReport
            )
            assert (
                num_other_params == 0
            ), "mode functions can only optionally request the current report. Everything else must be gathered via user input for bitbar compatibility."

            def mode_runner():
                if os.path.isfile(LOG_FILE_PATH):
                    report = read_sanitized_report(expected_in_shift, if_shift_err)
                else:
                    # No log yet means no shift is open. Enforce the
                    # precondition here too; otherwise e.g. "end" on a fresh
                    # install would write an END without a START and
                    # corrupt the log.
                    report = LogReport()
                    if (
                        expected_in_shift is not None
                        and report.in_shift != expected_in_shift
                    ):
                        raise ModeFailException(if_shift_err)
                kwargs = dict()
                if report_param_name is not None:
                    kwargs[report_param_name] = report
                mode_fn(**kwargs)

            self.__registered_modes.append(
                App.Mode(
                    name=mode_fn.__name__,
                    runner=mode_runner,
                    help=help,
                    is_default=is_default,
                )
            )
            return mode_runner

        return wrapper
# Single application instance; the mode functions below register on it.
app = App()
@app.register_mode(
    help="see the current status summary in a bitbar compatible syntax",
    is_default=True,
)
def bitbar(report: LogReport):
    """Print the status line, timer toggle, daily totals and report actions.

    Args:
        report: The current ``LogReport``.
    """
    me = script_path()
    in_shift = report.in_shift

    # NOTE(review): the two status glyphs below look mis-encoded (probably
    # emoji damaged in transit) — confirm against the original file.
    if in_shift:
        print(f"π {report.current_shift_duration}")
    else:
        print("π‘")
    print("---")

    # Offer the action that toggles the current state.
    if in_shift:
        print(f'End Timer | refresh=true bash="{me}" param1=-e terminal=false')
    else:
        print(f'Start Timer | refresh=true bash="{me}" param1=-s terminal=false')
    print("---")

    print("Daily reports")
    if report.by_date:
        # NOTE(review): this takes the first six entries in insertion order,
        # i.e. the oldest days in the log — confirm that is intended.
        first_six = list(report.by_date.items())[:6]
        for day, worked_seconds in first_six:
            print(f"{day}: {to_hours_minutes(worked_seconds)} | refresh=true")
    print("---")

    print(f'Generate reports | bash="{me}" param1=-g terminal=false')
    print(
        f'Open reports | refresh=true bash="open" param1={LOG_DIR_PATH} terminal=false'
    )
@app.register_mode(help="see the current status summary info")
def info(report: LogReport):
    """Print a one-line status: the running shift's duration, or the idle marker.

    Args:
        report: The current ``LogReport``.
    """
    # NOTE(review): the status glyphs look mis-encoded (probably emoji
    # damaged in transit) — confirm against the original file.
    if report and report.in_shift:
        status = f"π {report.current_shift_duration}"
    else:
        status = "π‘"
    print(status)
@app.register_mode(
    expected_in_shift=False,
    if_shift_err="Cannot start a shift while one is ongoing.",
    help="start a shift",
)
def start():
    """Record a START event stamped with the current UTC time."""
    # Naive UTC, same representation as before (utcnow() is deprecated
    # since Python 3.12).
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    # timespec="microseconds" always emits the fractional part; plain
    # isoformat() drops it when microsecond == 0, which a "%f"-based
    # parser rejects.
    write_log(LogEvent.START, now.isoformat("T", timespec="microseconds"))
@app.register_mode(
    expected_in_shift=True,
    if_shift_err="Cannot end a shift when none is ongoing.",
    help="end a shift",
)
def end():
    """Record an END event stamped with the current UTC time."""
    # Naive UTC, same representation as before (utcnow() is deprecated
    # since Python 3.12).
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    # timespec="microseconds" always emits the fractional part; plain
    # isoformat() drops it when microsecond == 0, which a "%f"-based
    # parser rejects.
    write_log(LogEvent.END, now.isoformat("T", timespec="microseconds"))
@app.register_mode(help="generate reports")
def generate_reports(report: LogReport):
    """Write the per-day totals to ``DAILY_FILE_PATH`` and echo them to stdout.

    Does nothing when the report has no completed shifts. Hours are
    written in the decimal form produced by ``to_hours_minutes(..., True)``.

    Args:
        report: The current ``LogReport``.
    """
    if not report.by_date:
        return
    # newline="" is required for csv writers; without it the "\r\n" row
    # terminator is translated again on Windows and yields blank rows.
    with open(DAILY_FILE_PATH, "w", newline="") as log_file:
        csv_writer = csv.writer(log_file)
        csv_writer.writerow(["Day", "Hours"])
        for date, seconds in report.by_date.items():
            hours = to_hours_minutes(seconds, True)
            print(f"{date}: {hours}")
            csv_writer.writerow([date, hours])
@app.register_mode(help="prints the path to the log file")
def log():
    """Print the location of the event log file."""
    log_location = LOG_FILE_PATH
    print(log_location)
if __name__ == "__main__":
    # Make sure the data directory exists before any mode touches it.
    os.makedirs(LOG_DIR_PATH, exist_ok=True)
    exit_code = app.run()
    sys.exit(exit_code)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment