Skip to content

Instantly share code, notes, and snippets.

@dz0ny
Last active November 12, 2021 12:37
Show Gist options
  • Save dz0ny/f5f4412bba6c09a914861e5f8e3a64ff to your computer and use it in GitHub Desktop.
Save dz0ny/f5f4412bba6c09a914861e5f8e3a64ff to your computer and use it in GitHub Desktop.
Track your work hours
#!/usr/bin/python3
# <xbar.title>Work Hours</xbar.title>
# <xbar.desc>A simple command line tool for managing your work hours</xbar.desc>
# <xbar.dependencies>python,ruby,node</xbar.dependencies>
# <xbar.author.github>dz0ny</xbar.author.github>
from argparse import ArgumentParser
import csv
from enum import Enum, auto
import os
import sys
import datetime
LOG_DIR_PATH = os.path.join(os.path.expanduser("~"), "Documents", "Hours")
LOG_FILE_PATH = os.path.join(LOG_DIR_PATH, "log.csv")
DAILY_FILE_PATH = os.path.join(LOG_DIR_PATH, "daily.csv")
class ModeFailException(Exception):
pass
def to_hours_minutes(seconds, short=False):
m, _ = divmod(seconds, 60)
h, m = divmod(m, 60)
if short:
m10 = 100 / 60 * m
return float(f"{int(h):02d}.{int(m10):02d}")
return f"{int(h):02d}:{int(m):02d}"
def script_path():
return os.path.realpath(__file__)
class LogEvent(Enum):
START = auto()
END = auto()
class LogReport:
by_date = None
def __init__(self, current_shift_started_at=None):
self.current_shift_started_at = current_shift_started_at
@property
def in_shift(self):
return self.current_shift_started_at != None
@property
def current_shift_duration(self):
if self.current_shift_started_at is None:
return None
else:
duration = datetime.datetime.utcnow() - self.current_shift_started_at
if duration.seconds < 0:
raise ModeFailException(
f"Log file at {LOG_FILE_PATH} is corrupted; the ongoing shift seems to have been started in the future."
)
return to_hours_minutes(duration.seconds)
def prepare_report():
report = LogReport()
by_date = {}
for event, value in read_log():
if event == LogEvent.START:
if report.in_shift:
raise ModeFailException(
f"Log file at {LOG_FILE_PATH} is corrupted; found two successive {LogEvent.START.name}s without a {LogEvent.END.name} in between. Try fixing or deleting it."
)
report.current_shift_started_at = datetime.datetime.strptime(
value, "%Y-%m-%dT%H:%M:%S.%f"
)
elif event == LogEvent.END:
if not report.in_shift:
raise ModeFailException(
f"Log file at {LOG_FILE_PATH} is corrupted; found two successive {LogEvent.END.name}s without a {LogEvent.START.name} in between. Try fixing or deleting it."
)
diff = (
datetime.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f")
- report.current_shift_started_at
)
report.current_shift_started_at = None
if diff.seconds < 0:
raise ModeFailException(
f"Log file at {LOG_FILE_PATH} is corrupted; A shift's duration cannot be negative. Try fixing or deleting it."
)
current_date = value.split("T")[0]
if by_date.get(current_date):
by_date[current_date] += diff.seconds
else:
by_date[current_date] = diff.seconds
else:
assert False, f"Support for new LogEvent {event.name} not added."
report.by_date = by_date
return report
def read_log():
with open(LOG_FILE_PATH, "r") as log_file:
csv_reader = csv.reader(log_file)
for log in csv_reader:
event = next((e for e in LogEvent if e.name == log[0]), None)
if event is None:
raise ModeFailException(
f"Log file at {LOG_FILE_PATH} is corrupted; found an unknown log event: {log}"
)
value = log[1]
yield event, value
def write_log(event, value):
with open(LOG_FILE_PATH, "a") as log_file:
csv_writer = csv.writer(log_file)
csv_writer.writerow([event.name, value])
def read_sanitized_report(expected_in_shift=None, if_shift_err=None):
if (expected_in_shift is None and if_shift_err is not None) or (
expected_in_shift is not None and if_shift_err is None
):
raise ValueError(
"Either both, or neither of expected_in_shift and if_shift_err should be null."
)
report = prepare_report()
if expected_in_shift is not None and report.in_shift != expected_in_shift:
raise ModeFailException(if_shift_err)
return report
class App:
class Mode:
def __init__(self, name, runner, help, is_default):
self.name = name
self.runner = runner
self.help = help
self.is_default = is_default
def __init__(self):
self.__registered_modes = []
def run(self):
assert len(self.__registered_modes) > 0, "No modes were registered"
default_modes = [mode for mode in self.__registered_modes if mode.is_default]
assert (
len(default_modes) == 1
), "Exactly 1 mode should be registered as the default"
default_mode = default_modes[0]
parser = ArgumentParser(description="A tool for managing your work hours.")
group = parser.add_mutually_exclusive_group()
for mode in self.__registered_modes:
group.add_argument(
f"-{mode.name[0]}",
f"--{mode.name}",
action="store_true",
help=mode.help,
)
args = parser.parse_args()
matching_mode = next(
(mode for mode in self.__registered_modes if getattr(args, mode.name)),
default_mode,
)
try:
matching_mode.runner()
return 0
except ModeFailException as e:
print(str(e))
return 3
def register_mode(
self,
expected_in_shift=None,
if_shift_err=None,
help=None,
is_default=False,
):
def wrapper(mode_fn):
report_param_name = next(
(
param[0]
for param in mode_fn.__annotations__.items()
if param[1] == LogReport
),
None,
)
num_other_params = len(
[
param
for param in mode_fn.__annotations__.items()
if param[1] != LogReport
]
)
assert (
num_other_params == 0
), "mode functions can only optionally request the current report. Everything else must be gathered via user input for bitbar compatibility."
def mode_runner():
if os.path.isfile(LOG_FILE_PATH):
report = read_sanitized_report(expected_in_shift, if_shift_err)
else:
report = LogReport()
kwargs = dict()
if report_param_name is not None:
kwargs[report_param_name] = report
mode_fn(**kwargs)
self.__registered_modes.append(
App.Mode(
name=mode_fn.__name__,
runner=mode_runner,
help=help,
is_default=is_default,
)
)
return mode_runner
return wrapper
app = App()
@app.register_mode(
help="see the current status summary in a bitbar compatible syntax",
is_default=True,
)
def bitbar(report: LogReport):
if report.in_shift:
print(f"🕒 {report.current_shift_duration}")
else:
print("🏡")
print("---")
if report.in_shift:
print(
f'End Timer | refresh=true bash="{script_path()}" param1=-e terminal=false'
)
else:
print(
f'Start Timer | refresh=true bash="{script_path()}" param1=-s terminal=false'
)
print("---")
print("Daily reports")
if report.by_date:
for date, seconds in list(report.by_date.items())[:6]:
print(f"{date}: {to_hours_minutes(seconds)} | refresh=true")
print("---")
print(f'Generate reports | bash="{script_path()}" param1=-g terminal=false')
print(
f'Open reports | refresh=true bash="open" param1={LOG_DIR_PATH} terminal=false'
)
@app.register_mode(help="see the current status summary info")
def info(report: LogReport):
if report and report.in_shift:
print(f"🕒 {report.current_shift_duration}", end="")
else:
print("🏡", end="")
print()
@app.register_mode(
expected_in_shift=False,
if_shift_err="Cannot start a shift while one is ongoing.",
help="start a shift",
)
def start():
write_log(LogEvent.START, datetime.datetime.utcnow().isoformat("T"))
@app.register_mode(
expected_in_shift=True,
if_shift_err="Cannot end a shift when none is ongoing.",
help="end a shift",
)
def end():
write_log(LogEvent.END, datetime.datetime.utcnow().isoformat("T"))
@app.register_mode(help="generate reports")
def generate_reports(report: LogReport):
if not report.by_date:
return
with open(DAILY_FILE_PATH, "w") as log_file:
csv_writer = csv.writer(log_file)
csv_writer.writerow(["Day", "Hours"])
for date, seconds in report.by_date.items():
print(f"{date}: {to_hours_minutes(seconds, True)}")
csv_writer.writerow([date, to_hours_minutes(seconds, True)])
@app.register_mode(help="prints the path to the log file")
def log():
print(LOG_FILE_PATH)
if __name__ == "__main__":
os.makedirs(LOG_DIR_PATH, exist_ok=True)
sys.exit(app.run())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment