Last active
April 16, 2021 19:28
-
-
Save d3dave/4fc55a8c94a401b71316bec857efb5cd to your computer and use it in GitHub Desktop.
Work tracking
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# Copyright (c) 2021 David Dorfman | |
# This work is licensed under the Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International License. | |
# To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-nd/4.0/. | |
# | |
import os | |
import sys | |
import appdirs | |
from collections import defaultdict | |
from datetime import datetime, timedelta | |
from pathlib import Path | |
from typing import Optional, Tuple | |
import shlex | |
import subprocess | |
import io | |
# External fuzzy-finder executable used to pick a task; can be overridden
# through the FUZZY_FILTER environment variable.
FILTER_COMMAND = os.environ.get('FUZZY_FILTER', 'fzy')
# Timestamp layout shared by the "current" and "all" data files.
TIME_FORMAT = '%y-%m-%d %H:%M:%S'
# Separator between the fields of one finished-work record.
FIELD_SEP = ','
# Extra picker entry offered next to the known tasks; choosing it asks the
# user to type a new task name.
NEW_TASK_PROMPT = '* New task?'

# Per-user data directory; created on import if it is missing.
data_dir_path = Path(appdirs.user_data_dir('work-track', 'd3dave'))
data_dir_path.mkdir(parents=True, exist_ok=True)
# Log of finished work intervals (appended to by record_done).
all_path = data_dir_path.joinpath('all')
# The task in progress and its start time (written by write_current).
current_path = data_dir_path.joinpath('current')
def filter_input(prompt, items):
    """Let the user pick one entry of *items* with the external filter.

    Runs ``FILTER_COMMAND --prompt=<prompt>``, feeds it the items on stdin
    (one per line) and returns what it printed on stdout with surrounding
    whitespace stripped. The exit status is not checked, so a filter that
    prints nothing (for example because the user cancelled it) yields an
    empty string.
    """
    # subprocess.run() hands the input over via communicate(), which feeds
    # stdin and drains stdout together. Writing all of stdin first and only
    # then reading stdout can block once an OS pipe buffer fills up.
    completed = subprocess.run(
        (FILTER_COMMAND, f'--prompt={prompt}'),
        input='\n'.join(items),
        stdout=subprocess.PIPE,
        encoding='utf-8',
    )
    return completed.stdout.strip()
def print_working(task: str, elapsed_time: timedelta):
    """Print a one-line status naming the task and the time spent on it.

    The duration is rendered with ``str(timedelta)`` and everything from
    the first ``.`` on (the fractional seconds) is cut off.
    """
    whole_seconds, _, _ = str(elapsed_time).partition('.')
    print(f'Working on {task} for {whole_seconds}')
def read_current() -> Optional[Tuple[str, datetime]]:
    """Load the task that is currently being worked on, if any.

    Returns ``None`` when the ``current`` file does not exist. Otherwise
    the file is expected to hold exactly two lines, the task name and its
    start time in ``TIME_FORMAT``, which are returned as
    ``(task, start_time)``.
    """
    if current_path.exists():
        name, started = current_path.read_text().splitlines()
        return name, datetime.strptime(started, TIME_FORMAT)
    return None
def write_current(task):
    """Mark *task* as started now.

    Overwrites the ``current`` file (creating it if needed) with two
    lines: the task name and the current time formatted with
    ``TIME_FORMAT``.
    """
    started = datetime.now().strftime(TIME_FORMAT)
    # Join with '\n', not os.linesep: write_text() opens the file in text
    # mode, which already translates '\n' to the platform line ending.
    # Writing os.linesep there produces '\r\r\n' on Windows, which
    # read_current() reads back as three lines and fails to unpack.
    current_path.write_text('\n'.join((task, started)))
def record_done(task, start_time):
    """Append a finished work interval for *task* to the ``all`` file.

    One line is written: the task name wrapped in double quotes, the
    start time and the end time (now), both in ``TIME_FORMAT``, joined
    by ``FIELD_SEP``.
    """
    finished = datetime.now()
    # Append mode creates the file when it does not exist yet.
    with all_path.open('a') as log:
        fields = [
            f'"{task}"',
            start_time.strftime(TIME_FORMAT),
            finished.strftime(TIME_FORMAT),
        ]
        log.write(FIELD_SEP.join(fields) + '\n')
def read_all():
    """Sum up the recorded working time per task.

    Returns ``None`` when the ``all`` file does not exist. Otherwise
    returns a ``defaultdict`` mapping each task name to the total
    ``timedelta`` of all its recorded intervals.
    """
    if not all_path.exists():
        return None

    def parse(stamp):
        return datetime.strptime(stamp, TIME_FORMAT)

    totals = defaultdict(timedelta)
    for record in all_path.read_text().splitlines():
        # Split from the right so a separator inside the quoted task name
        # does not break the record apart.
        quoted_name, began, finished = record.rsplit(FIELD_SEP, maxsplit=2)
        began_at = parse(began)
        finished_at = parse(finished)
        # Drop the surrounding quote characters added by record_done.
        totals[quoted_name[1:-1]] += finished_at - began_at
    return totals
def choose_new_task():
    """Ask which task to start, begin tracking it and return its name.

    When earlier work has been recorded, the known tasks plus
    ``NEW_TASK_PROMPT`` are offered through the fuzzy filter; picking
    ``NEW_TASK_PROMPT`` asks for a typed name instead. With no recorded
    work the name is always typed. For a task picked from the history the
    time already spent on it is printed.

    Returns the chosen task name. If no name was given (the picker was
    cancelled or a blank name was typed) nothing is written and the empty
    string is returned, so no nameless task starts being tracked.
    """
    tasks = read_all()
    if tasks:
        new_task = filter_input(
            'Task to work on? ',
            [*tasks, NEW_TASK_PROMPT],
        )
        if new_task == NEW_TASK_PROMPT:
            # Strip typed names just as filter_input strips picked ones,
            # so a task can be found again under the same name later.
            new_task = input('New task name: ').strip()
    else:
        print('No work yet.')
        new_task = input('New task name: ').strip()

    if not new_task:
        # Cancelled picker or blank name: do not start tracking anything.
        return new_task

    if tasks:
        print_working(new_task, tasks.get(new_task, timedelta()))
    write_current(new_task)
    return new_task
def main():
    """Run one interactive session of the tracker.

    If a task is in progress, show it and ask whether to stop. Any answer
    other than ``y`` exits immediately and leaves the task running.
    On ``y`` the interval is recorded and the ``current`` file removed.
    Afterwards (or right away when nothing was in progress) the user is
    asked which task to start next.
    """
    active = read_current()
    if active is not None:
        task, started_at = active
        print_working(task, datetime.now() - started_at)
        if input('Stop (y/n)? ') != 'y':
            sys.exit(0)
        record_done(task, started_at)
        current_path.unlink()
        # Blank line between the "stopped" part and the task picker.
        print()
    choose_new_task()
# Script entry point: run one session when the file is executed.
try:
    main()
except KeyboardInterrupt:
    # Ctrl-C at any prompt simply ends the session without a traceback.
    pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment