Skip to content

Instantly share code, notes, and snippets.

@Jackenmen
Last active October 11, 2023 17:44
Show Gist options
  • Save Jackenmen/f1f7c7ff19f426f5ed7bd02697ad3a77 to your computer and use it in GitHub Desktop.
Save Jackenmen/f1f7c7ff19f426f5ed7bd02697ad3a77 to your computer and use it in GitHub Desktop.
ActivityWatch - summarize time spent into CSV file. Requires aw-client library
# Copyright 2023 Jakub Kuczys (https://github.com/Jackenmen)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import csv
import datetime
import aw_client
QUERY = """
afk_events = query_bucket(find_bucket("aw-watcher-afk_"));
web_events = [];
web_events = concat(web_events, query_bucket(find_bucket("aw-watcher-web-chrome")));
window_events = query_bucket(find_bucket("aw-watcher-window_"));
window_events = filter_period_intersect(
window_events, filter_keyvals(afk_events, "status", ["not-afk"])
);
web_usage = filter_period_intersect(
web_events, filter_keyvals(window_events, "app", ["chrome", "chrome.exe"])
);
other_usage = exclude_keyvals(window_events, "app", ["chrome", "chrome.exe"]);
merged_events = concat(web_usage, other_usage);
RETURN = sort_by_timestamp(merged_events);
"""
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"start",
nargs="?",
default=0,
type=int,
help="How many days after (or before for negative numbers) today should the start date be.",
)
parser.add_argument(
"end",
nargs="?",
default=1,
type=int,
help="How many days after (or before for negative numbers) today should the end date be.",
)
parser.add_argument(
"--merge-period",
default=15,
type=int,
help="Events from how many consecutive minutes should be merged into one.",
)
return parser.parse_args()
def main() -> None:
today = datetime.datetime.combine(
datetime.date.today(), datetime.time()
).astimezone(datetime.timezone.utc)
args = parse_args()
start = today + datetime.timedelta(days=args.start)
end = today + datetime.timedelta(days=args.end)
merge_period = datetime.timedelta(minutes=args.merge_period)
client = aw_client.ActivityWatchClient()
response = client.query(QUERY, [(start, end)])
with open("output.csv", "w+", encoding="utf-8", newline="") as fp:
writer = csv.DictWriter(fp, ("timestamp", "duration", "description"))
writer.writeheader()
pending_events: dict[str, tuple[datetime.datetime, float]] = {}
for period_data in response:
for event in period_data:
event_data = event["data"]
duration = float(event["duration"])
if not duration:
continue
timestamp = datetime.datetime.fromisoformat(event["timestamp"])
description = (
f"{event_data['app']} - {event_data['title']}"
if "app" in event_data
else event_data["title"]
)
try:
existing_event = pending_events[description]
except KeyError:
pass
else:
existing_timestamp, existing_duration = existing_event
if timestamp < existing_timestamp + merge_period:
duration += existing_duration
pending_events[description] = (existing_timestamp, duration)
continue
writer.writerow(
{
"timestamp": existing_timestamp.isoformat(),
"duration": existing_duration,
"description": description,
}
)
pending_events[description] = (timestamp, duration)
for description, (timestamp, duration) in pending_events.items():
writer.writerow(
{
"timestamp": timestamp.isoformat(),
"duration": duration,
"description": description,
}
)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment