Skip to content

Instantly share code, notes, and snippets.

@d3dave
Last active April 16, 2021 19:28
Show Gist options
  • Save d3dave/4fc55a8c94a401b71316bec857efb5cd to your computer and use it in GitHub Desktop.
Save d3dave/4fc55a8c94a401b71316bec857efb5cd to your computer and use it in GitHub Desktop.
Work tracking
#
# Copyright (c) 2021 David Dorfman
# This work is licensed under the Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International License.
# To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-nd/4.0/.
#
import os
import sys
import appdirs
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Tuple
import shlex
import subprocess
import io
# Executable used as the interactive fuzzy filter; override with $FUZZY_FILTER.
FILTER_COMMAND = os.environ.get('FUZZY_FILTER', 'fzy')
# strftime/strptime layout of every timestamp stored on disk.
TIME_FORMAT = '%y-%m-%d %H:%M:%S'
# Separator between the fields of one record in the history file.
FIELD_SEP = ','
# Extra menu entry that lets the user type a brand-new task name.
NEW_TASK_PROMPT = '* New task?'

# Per-user data directory; created (with parents) at import time if missing.
data_dir_path = Path(appdirs.user_data_dir('work-track', 'd3dave'))
data_dir_path.mkdir(parents=True, exist_ok=True)
# History of finished work sessions, one record per line.
all_path = data_dir_path.joinpath('all')
# The task currently being worked on and the time it was started.
current_path = data_dir_path.joinpath('current')
def filter_input(prompt, items):
    """Let the user pick one entry from *items* with the external filter.

    The candidates are sent, newline-separated, to the standard input of
    ``FILTER_COMMAND``, which is started with ``--prompt=<prompt>``.

    Args:
        prompt: Text handed to the filter through its ``--prompt`` option.
        items: Iterable of strings offered as candidates.

    Returns:
        Whatever the filter wrote to standard output, stripped of surrounding
        whitespace. The exit status is not checked, so the result is an empty
        string when the filter prints nothing.
    """
    # subprocess.run() feeds stdin and drains stdout through communicate(),
    # so a filter that emits output before it has consumed all of its input
    # cannot dead-lock us on a full pipe, and a filter that exits early does
    # not raise BrokenPipeError while the candidates are still being written.
    completed = subprocess.run(
        (FILTER_COMMAND, f'--prompt={prompt}'),
        input='\n'.join(items),
        stdout=subprocess.PIPE,
        encoding='utf-8',
    )
    return completed.stdout.strip()
def print_working(task: str, elapsed_time: timedelta):
    """Print a one-line status: the task name and the time spent on it.

    Args:
        task: Name of the task to report.
        elapsed_time: Time spent so far; everything after the first ``.`` of
            its string form (the fractional seconds) is left out.
    """
    whole_seconds, _, _ = str(elapsed_time).partition('.')
    print(f'Working on {task} for {whole_seconds}')
def read_current() -> Optional[Tuple[str, datetime]]:
    """Load the task that is currently in progress, if any.

    Returns:
        ``(task, start_time)`` parsed from the two lines stored in
        ``current_path``, or ``None`` when that file does not exist.

    Raises:
        ValueError: If the file does not hold exactly two lines or the second
            line does not match ``TIME_FORMAT``.
    """
    if current_path.exists():
        task, started_text = current_path.read_text().splitlines()
        return task, datetime.strptime(started_text, TIME_FORMAT)
    return None
def write_current(task):
    """Mark *task* as the one being worked on, starting now.

    Overwrites ``current_path`` with two lines: the task name and the current
    time formatted with ``TIME_FORMAT``.

    Args:
        task: Name of the task to start.
    """
    started_at = datetime.now()
    parts = (task, started_at.strftime(TIME_FORMAT))
    # Join with '\n', not os.linesep: write_text() opens the file in text
    # mode, which already translates '\n' to the platform line ending.
    # Writing os.linesep would produce '\r\r\n' on Windows, which reads back
    # as an extra blank line and breaks the two-line unpacking in
    # read_current(). write_text() also creates the file, so no touch() is
    # needed beforehand.
    current_path.write_text('\n'.join(parts))
def record_done(task, start_time):
    """Append a finished work session for *task* to the history file.

    The record is a single line made of the double-quoted task name,
    *start_time* and the current time, joined by ``FIELD_SEP``; both
    timestamps are formatted with ``TIME_FORMAT``.

    Args:
        task: Name of the task that was worked on.
        start_time: ``datetime`` at which the session began.
    """
    finished_at = datetime.now()
    fields = [
        f'"{task}"',
        start_time.strftime(TIME_FORMAT),
        finished_at.strftime(TIME_FORMAT),
    ]
    record = FIELD_SEP.join(fields)
    # Append mode creates the history file on first use.
    with all_path.open('a') as history:
        history.write(record + '\n')
def read_all():
    """Total the time spent on each task recorded in the history file.

    Returns:
        A ``defaultdict`` mapping task name to the summed ``timedelta`` of all
        its sessions, or ``None`` when the history file does not exist.
    """
    if not all_path.exists():
        return None

    totals = defaultdict(timedelta)
    for record in all_path.read_text().splitlines():
        # Split from the right so separators inside the task name stay intact.
        quoted_task, started_text, finished_text = record.rsplit(FIELD_SEP, 2)
        started_at = datetime.strptime(started_text, TIME_FORMAT)
        finished_at = datetime.strptime(finished_text, TIME_FORMAT)
        # Drop the first and last character: the quotes added when recording.
        totals[quoted_task[1:-1]] += finished_at - started_at
    return totals
def choose_new_task():
    """Ask the user which task to work on next and start tracking it.

    When the history holds tasks, they are offered through the fuzzy filter
    together with ``NEW_TASK_PROMPT``; choosing that entry asks for a name to
    be typed in. With no history, the name is asked for directly.

    Returns:
        The chosen task name. An empty string means nothing was chosen (the
        filter returned no selection, or an empty name was entered); in that
        case no task is started.
    """
    tasks = read_all()
    if tasks:
        new_task = filter_input(
            'Task to work on? ',
            list(tasks) + [NEW_TASK_PROMPT],
        )
        if new_task == NEW_TASK_PROMPT:
            new_task = input('New task name: ')
    else:
        print('No work yet.')
        new_task = input('New task name: ')

    # Without this guard an empty selection would be written out as a task
    # named '' and tracked like any other.
    if not new_task:
        print('No task selected.')
        return new_task

    if tasks:
        # Show the time already accumulated on this task (zero if it is new).
        print_working(new_task, tasks.get(new_task, timedelta()))
    write_current(new_task)
    return new_task
def main():
    """Run one interactive session of the tracker.

    If a task is in progress, show how long it has been running and ask
    whether to stop it. Any answer other than ``y`` exits with status 0 and
    leaves the task running; ``y`` records the session in the history and
    clears the current task. Afterwards the user is asked for the next task.
    """
    active = read_current()
    if active is not None:
        task, started_at = active
        print_working(task, datetime.now() - started_at)
        if input('Stop (y/n)? ') != 'y':
            sys.exit(0)
        record_done(task, started_at)
        current_path.unlink()
        print()
    choose_new_task()
# Script entry point: run the tracker once.
try:
    main()
except KeyboardInterrupt:
    # Ctrl-C at any prompt ends the program quietly, without a traceback.
    pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment