Skip to content

Instantly share code, notes, and snippets.

@Linusp
Last active February 21, 2024 02:34
Show Gist options
  • Save Linusp/12a763c965432f1f60cf430582ece52e to your computer and use it in GitHub Desktop.
Save Linusp/12a763c965432f1f60cf430582ece52e to your computer and use it in GitHub Desktop.
org-agenda 中 clock 记录归档脚本

使用:

python archive_logbook.py archive -i Dropbox/org/tasks/ -o Dropbox/org/tasks/archive

假设 Dropbox/org/tasks/ 中文件如下:

Dropbox/org/tasks/
├── inbox.org
├── personal.org
└── work.org

inbox.org 会被忽略,因为在我的配置里 inbox.org 只用来做收集,不参与计时。最终会把原来的 personal.org/work.org 中非本年的 clock 记录都删掉,并在 Dropbox/org/tasks/archive 下生成如下归档文件。其中 archive20240221094758_personal.org 是原来 personal.org 的副本,这样如果发现归档不符合预期的话方便恢复;archive20240221094759_work.org 同理。

Dropbox/org/tasks/archive/
├── archive20240221094758_2018_personal.org
├── archive20240221094758_2019_personal.org
├── archive20240221094758_2020_personal.org
├── archive20240221094758_2021_personal.org
├── archive20240221094758_2022_personal.org
├── archive20240221094758_2023_personal.org
├── archive20240221094758_personal.org
├── archive20240221094759_2020_work.org
├── archive20240221094759_2021_work.org
├── archive20240221094759_2022_work.org
├── archive20240221094759_2023_work.org
└── archive20240221094759_work.org
import locale
import os
import re
import shutil
import warnings
from collections import defaultdict
from glob import glob

import arrow
import click
import orgparse
# Matches the "[begin]--[end]" timestamp pair of a closed CLOCK entry (the
# text that follows "CLOCK:"). The middle "\S+" of each timestamp swallows the
# weekday name, whatever language it is written in.
CLOCK_PATTERN = re.compile(
    r"\[(?P<bdate>\S+) \S+ (?P<btime>[^\s\]]+)\]--"
    r"\[(?P<edate>\S+) \S+ (?P<etime>[^\s\]]+)\]"
)
# Matches a state-change log line such as
#   - State "DONE"       from "TODO"       [2024-02-21 Wed 09:47]
# and captures the date/time at which the change happened.
STATE_CHANGE_PATTERN = re.compile(
    r'- State\s+"[A-Za-z]+"\s+from "[A-Za-z]+"\s+\[(?P<edate>\S+) \S+ (?P<etime>[^\s\]]+)\]'
)
# NOTE(review): presumably the Chinese locale is selected so that weekday
# names in re-emitted timestamps match the ones already in the org files --
# confirm. The locale is not installed everywhere, so fall back to the
# process default instead of failing at import time.
try:
    locale.setlocale(locale.LC_ALL, "zh_CN.UTF8")
except locale.Error:
    warnings.warn(
        "locale 'zh_CN.UTF8' is not available; keeping the default locale"
    )
# Root click command group; sub-commands register themselves on it through
# `@main.command()`. Documented with comments rather than a docstring because
# click would surface a docstring as the group's --help text.
@click.group()
def main():
    pass
def parse_node(node):
    """Split one org heading into its body text and its LOGBOOK records.

    The text of ``node`` (``str(node)``) is scanned line by line, skipping the
    heading line itself. Planning lines (``SCHEDULED:``/``CLOSED:``/
    ``DEADLINE:``) and the PROPERTIES drawer are dropped; the LOGBOOK drawer
    is turned into records; everything after both drawers is kept as body.

    Args:
        node: an org node whose ``str()`` is its raw text.

    Returns:
        A dict with two keys:

        * ``'body'``  -- list of the raw body lines (original indentation).
        * ``'clock'`` -- list of record dicts, one per ``CLOCK:`` or state
          change line, each with ``begin``/``end`` (arrow objects),
          ``duration`` (``end - begin``) and ``content`` (the raw line plus
          any continuation lines that followed it in the drawer).

    Raises:
        ValueError: if a ``CLOCK:`` line has no ``[begin]--[end]`` pair (for
            example a clock that is still running), or if the LOGBOOK drawer
            contains a non-blank line before its first record.
    """
    text = str(node)
    # A node without a given drawer counts as already being past it, so its
    # body starts right after the heading.
    out_of_logbook = ':LOGBOOK:' not in text
    out_of_properties = ':PROPERTIES:' not in text
    in_logbook = in_properties = False
    body, records, record = [], [], {}

    for origin_line in text.split("\n")[1:]:
        line = origin_line.strip()

        # Only real planning lines are dropped. Matching the keyword anywhere
        # in the line would also discard body text that merely mentions
        # "DEADLINE", "CLOSED" or "SCHEDULED".
        if re.match(r'(?:SCHEDULED|CLOSED|DEADLINE):', line):
            continue
        if line == ":LOGBOOK:":
            in_logbook = True
            continue
        if line == ":PROPERTIES:":
            in_properties = True
            continue
        if line == ':END:' and in_properties:
            in_properties = False
            out_of_properties = True
            continue
        if line == ':END:' and in_logbook:
            in_logbook = False
            out_of_logbook = True
            if record:
                records.append(record)
                # Reset, otherwise the flush after the loop would append the
                # same record a second time.
                record = {}
            continue

        if in_logbook:
            is_clock = line.startswith("CLOCK: ")
            state_match = None if is_clock else STATE_CHANGE_PATTERN.match(line)
            if is_clock:
                clock_match = CLOCK_PATTERN.match(
                    line.replace("CLOCK:", "").strip()
                )
                if clock_match is None:
                    raise ValueError(
                        f"cannot parse CLOCK line (still running?): {origin_line!r}"
                    )
                stamps = clock_match.groupdict()
                begin = arrow.get(f'{stamps["bdate"]} {stamps["btime"]}')
                end = arrow.get(f'{stamps["edate"]} {stamps["etime"]}')
            elif state_match:
                # A state change is an instant: begin and end coincide.
                stamps = state_match.groupdict()
                begin = arrow.get(f'{stamps["edate"]} {stamps["etime"]}')
                end = arrow.get(f'{stamps["edate"]} {stamps["etime"]}')
            else:
                # Continuation line: it belongs to the record opened above it.
                if record:
                    record["content"].append(origin_line)
                elif line:
                    raise ValueError(
                        "LOGBOOK line precedes any CLOCK/State record: "
                        f"{origin_line!r}"
                    )
                continue

            # A new record starts here; flush the previous one first.
            if record:
                records.append(record)
            record = {
                "begin": begin,
                "end": end,
                "duration": end - begin,
                "content": [origin_line],
            }
        elif out_of_logbook and out_of_properties:
            # Stop at the next heading so a child's text is not taken as body.
            if re.match(r'^\*+ ', origin_line):
                break
            body.append(origin_line)

    # Flush a record left open by a drawer that never reached ':END:'.
    if record:
        records.append(record)
    return {'body': body, 'clock': records}
def archive_clocklogs(filename, outdir):
    """Move the clock records of past years out of one org file.

    The file is parsed and every LOGBOOK record is grouped by the year of its
    end time. For each past year a file ``archive<timestamp>_<year>_<name>``
    is written to ``outdir`` holding only the headings (and their ancestors)
    that have records in that year. For the current year the original file is
    first copied to ``archive<timestamp>_<name>`` as a backup and then
    rewritten in place with only the current year's records.

    Args:
        filename: path of the org file to process.
        outdir: existing directory that receives the archive files.
    """
    env = orgparse.OrgEnv(todos=['TODO', 'NEXT', 'SOMETIME'], filename=filename)
    root = orgparse.load(filename, env=env)
    today = arrow.now().date()

    node_to_year_clocks = {}          # node -> {year: [record, ...]}
    node_to_body = {}                 # node -> raw body lines
    year_to_nodes = defaultdict(set)  # year -> nodes to emit for that year

    def parse_and_save(node):
        """Parse one node and index its body and records."""
        node_info = parse_node(node)
        if node_info['body']:
            node_to_body[node] = node_info['body']
        for record in node_info['clock']:
            year = record['end'].year
            per_year = node_to_year_clocks.setdefault(node, defaultdict(list))
            per_year[year].append(record)
            year_to_nodes[year].add(node)

    def collect(node):
        """Index ``node`` and all of its descendants, at any depth."""
        parse_and_save(node)
        for child in node.children:
            collect(child)

    # Recurse instead of unrolling a fixed number of levels: headings nested
    # deeper than level 4 used to be skipped here and therefore vanished when
    # the source file was rewritten.
    for node in root.children:
        collect(node)

    # A heading can only be written below its ancestors, so every year's set
    # is extended with the ancestors of its members (the root excluded).
    for year, nodes in year_to_nodes.items():
        with_ancestors = set(nodes)
        for node in nodes:
            parent = node.parent
            while (
                parent is not None
                and parent != root
                and parent not in with_ancestors
            ):
                with_ancestors.add(parent)
                parent = parent.parent
        year_to_nodes[year] = with_ancestors

    def write_node(node, year, file_obj, keep_id=False):
        """Write one heading with planning line, properties, the given
        year's LOGBOOK records and body. ``ID`` is only written when
        ``keep_id`` is true."""
        indent = ' ' * (node.level + 1)
        print(str(node).split('\n')[0], file=file_obj)

        planning = [
            f'{status}: {str(status_time)}'
            for status, status_time in zip(
                ['CLOSED', 'SCHEDULED', 'DEADLINE'],
                [node.closed, node.scheduled, node.deadline],
            )
            if status_time
        ]
        if planning:
            print(indent + ' '.join(planning).strip(), file=file_obj)

        if node.properties:
            print(indent + ':PROPERTIES:', file=file_obj)
            for key, value in node.properties.items():
                if keep_id or key != 'ID':
                    print(indent + f':{key}: {value}', file=file_obj)
            print(indent + ':END:', file=file_obj)

        year_records = node_to_year_clocks.get(node, {}).get(year)
        if year_records:
            print(indent + ':LOGBOOK:', file=file_obj)
            for record in year_records:
                for line in record['content']:
                    print(line, file=file_obj)
            print(indent + ':END:', file=file_obj)

        for line in node_to_body.get(node, []):
            print(line, file=file_obj)

    def write_tree(nodes, year, file_obj, keep_id, is_dropped):
        """Write ``nodes`` depth-first; a dropped node takes its whole
        subtree with it."""
        for node in nodes:
            if is_dropped(node):
                continue
            write_node(node, year, file_obj, keep_id=keep_id)
            write_tree(node.children, year, file_obj, keep_id, is_dropped)

    def archive_year(year, outfile):
        """Write the headings that have records in ``year`` to ``outfile``."""
        wanted = year_to_nodes[year]
        # Explicit encoding so the output does not depend on the locale.
        with open(outfile, 'w', encoding='utf-8') as fout:
            print(str(root), file=fout)
            write_tree(
                root.children, year, fout, False,
                lambda node: node not in wanted,
            )

    def remove_other_years(year, outfile):
        """Rewrite the file keeping only ``year``'s records. Headings without
        records in ``year`` survive unless they are DONE/ABORT or tagged
        ARCHIVE."""
        wanted = year_to_nodes[year]

        # NOTE(review): 'ABORT' is not among the keywords passed to OrgEnv
        # above; presumably it is declared in the org files themselves --
        # confirm.
        def is_dropped(node):
            finished = node.todo in ('DONE', 'ABORT') or 'ARCHIVE' in node.tags
            return node not in wanted and finished

        with open(outfile, 'w', encoding='utf-8') as fout:
            print(str(root), file=fout)
            write_tree(root.children, year, fout, True, is_dropped)

    # NOTE(review): if the file has no record in the current year, the loop
    # never reaches the first branch, so the source file is left untouched and
    # older records are archived again on every run.
    timestamp = arrow.now().format('YYYYMMDDHHmmss')
    for year in year_to_nodes:
        if year == today.year:
            # Back up the original before overwriting it in place.
            archive_file = os.path.join(
                outdir, f'archive{timestamp}_' + os.path.basename(filename)
            )
            shutil.copy(filename, archive_file)
            remove_other_years(year, filename)
        else:
            outfile = os.path.join(
                outdir, f'archive{timestamp}_{year}_' + os.path.basename(filename)
            )
            archive_year(year, outfile)
# `archive` sub-command: run archive_clocklogs on every *.org file found
# directly in `indir`, writing the archive files to `outdir` (defaults to
# `indir`). Files produced by earlier runs and inbox.org are skipped.
# Documented with comments rather than a docstring because click would
# surface a docstring as the command's --help text.
@main.command()
@click.option("-i", "--indir", required=True)
@click.option("-o", "--outdir")
def archive(indir, outdir):
    outdir = outdir or indir
    os.makedirs(outdir, exist_ok=True)

    # Sorted so files are processed in a stable order between runs.
    for filename in sorted(glob(os.path.join(indir, '*.org'))):
        basename = os.path.basename(filename)
        # Test the file name only. Searching the whole path skipped every
        # file when a directory name contained "archive", and the old `> 0`
        # comparison missed a match at the very start of the path.
        if 'archive' in basename:
            continue
        # inbox.org is deliberately left alone.
        if basename == 'inbox.org':
            continue
        archive_clocklogs(filename, outdir)
# Script entry point: hand control to the click command group.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment