Skip to content

Instantly share code, notes, and snippets.

@lahwran
Last active December 17, 2015 00:09
Show Gist options
  • Save lahwran/a0224c67d6542d08462b to your computer and use it in GitHub Desktop.
Save lahwran/a0224c67d6542d08462b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import argparse
import contextlib
import errno
import json
import os
import signal
import struct
import subprocess
import time
def get_window():
    """Return the frontmost application's name and its main window's title.

    Runs an AppleScript through ``osascript`` that asks System Events for the
    frontmost application process and, inside a ``try``, for the ``AXTitle``
    of the first window whose ``AXMain`` attribute is true.  The script prints
    the application name and the title separated by a newline.

    Returns:
        tuple: ``(app, window)`` as text strings.  ``window`` is
        ``"UNKNOWN WINDOW TITLE"`` when the script could not read a title,
        and only the first line of a multi-line title is kept.

    Raises:
        subprocess.CalledProcessError: if ``osascript`` exits non-zero.
        OSError: if ``osascript`` cannot be started.
    """
    raw = subprocess.check_output(["osascript", "-e", r"""
global frontApp, frontAppName, windowTitle
set windowTitle to "UNKNOWN WINDOW TITLE"
set frontApp to ""
set frontAppName to ""
tell application "System Events"
set frontApp to first application process whose frontmost is true
set frontAppName to name of frontApp
try
tell process frontAppName
tell (1st window whose value of attribute "AXMain" is true)
set windowTitle to value of attribute "AXTitle"
end tell
end tell
end try -- ignore errors
end tell
return frontAppName & "\n" & windowTitle
"""])
    # check_output returns bytes.  Decode once here so callers always get
    # text: splitting bytes on a str separator fails on Python 3, and on
    # Python 2 undecoded names never compare equal to the unicode strings
    # that json.loads() hands back for non-ASCII titles.
    # NOTE(review): assumes osascript emits UTF-8 -- confirm.
    text = raw.decode("utf-8", "replace")
    # partition() cannot raise when the output has fewer than two lines.
    app, _, rest = text.partition("\n")
    window = rest.partition("\n")[0]
    return app, window
class SignalHandlyThingy(object):
    """Postpone SIGINT while a critical section is running.

    Constructing an instance saves the current SIGINT handler and installs
    ``signal_handler`` in its place.  Calling the instance returns a context
    manager; a SIGINT that arrives inside the ``with`` block is remembered
    and forwarded to the saved handler when the block exits.  Outside a
    ``with`` block the signal is forwarded immediately.
    """

    def __init__(self):
        # True while a ``with self():`` block is active.
        self.locked = False
        # (args, kwargs) of the latest signal not yet forwarded, else None.
        self.triggered = None
        self.old_signal_handler = signal.getsignal(signal.SIGINT)
        signal.signal(signal.SIGINT, self.signal_handler)

    def signal_handler(self, *a, **kw):
        """Record the signal; forward it right away unless currently locked."""
        self.triggered = (a, kw)
        if not self.locked:
            self.tick()

    def tick(self):
        """Release the lock and forward a pending signal, if there is one.

        Raises:
            KeyboardInterrupt: when the saved handler was ``signal.SIG_DFL``,
                or whatever the saved callable handler itself raises.
        """
        self.locked = False
        if self.triggered is None:
            return
        a, kw = self.triggered
        self.triggered = None
        handler = self.old_signal_handler
        if callable(handler):
            handler(*a, **kw)
        elif handler == signal.SIG_DFL:
            # getsignal() returned the non-callable SIG_DFL constant.  Raise
            # the usual Python-level interrupt instead of failing with a
            # TypeError by calling an integer.
            raise KeyboardInterrupt
        # SIG_IGN or None (handler not installed from Python): the signal
        # was not being delivered to Python code before, so drop it.

    @contextlib.contextmanager
    def __call__(self):
        """Context manager that defers SIGINT until the block is left."""
        self.locked = True
        try:
            yield
        finally:
            # Always unlock, even when the body raised; otherwise ``locked``
            # would stay True and a pending signal would be stranded.
            self.tick()
class IDStandin(object):
    """Persistent two-way table between JSON values and small integer ids.

    The backing file holds one JSON document per line; a value's id is its
    zero-based position in the file.  Existing entries are loaded when the
    object is created, and each new value is appended and fsync'ed as soon
    as it is first seen.
    """

    def __init__(self, filename):
        """Load ``filename`` if it exists, then open it for appending.

        Raises:
            IOError: if the file exists but cannot be opened.
            ValueError: if the file has a blank line followed by more
                entries, or a line that is not valid JSON.
        """
        self.mapping = {}    # value -> id
        self.rmapping = []   # id -> value
        self.max = 0         # not used by this class; kept for compatibility
        self.filename = filename
        try:
            reader = open(filename, 'rb')
        except IOError as e:
            # A missing file only means that no ids were handed out yet.
            if e.errno != errno.ENOENT:
                raise
        else:
            with reader:
                self._load(reader)
        self.writer = open(self.filename, "ab")

    def _load(self, reader):
        """Fill both tables from an open binary file of JSON lines."""
        seen_blank = False
        for raw in reader:
            if not raw.replace(b'\n', b''):
                seen_blank = True
                continue
            if seen_blank:
                # Ids are line positions, so a gap would silently shift
                # every later id.  Fail loudly (also under ``python -O``).
                raise ValueError(
                    "%s: entry found after a blank line" % self.filename)
            value = json.loads(raw.decode("utf-8"))
            self.mapping[value] = len(self.rmapping)
            self.rmapping.append(value)

    def collapse(self, item):
        """Return the id for ``item``, assigning and persisting a new one
        the first time the value is seen.

        Byte strings are decoded as UTF-8 first so that they match the text
        values json.loads() produces when the file is read back; without
        this, non-ASCII names would receive a fresh id after every restart.
        """
        if isinstance(item, bytes):
            item = item.decode("utf-8")
        try:
            return self.mapping[item]
        except KeyError:
            pass
        index = len(self.rmapping)
        # json.dumps() escapes newlines, so the record is always one line.
        record = (json.dumps(item) + "\n").encode("utf-8")
        # Persist before touching the in-memory tables so that a failed
        # write cannot leave an id that exists only in memory.
        self.writer.write(record)
        self.writer.flush()
        os.fsync(self.writer.fileno())
        self.mapping[item] = index
        self.rmapping.append(item)
        return index

    def expand(self, id):
        """Return the value stored under ``id`` (IndexError if unknown)."""
        return self.rmapping[id]
class Writeythingy(object):
    """Append-only binary log of which window had focus, and when.

    Files kept inside ``logdir``:

    * ``log``     -- legacy JSON-lines log, only read by ``convert``
    * ``b_log``   -- binary log written by this class
    * ``apps``    -- IDStandin table of application names
    * ``windows`` -- IDStandin table of window titles

    Every ``b_log`` record is ``struct.pack(">BLLQ", kind, appid, winid,
    milliseconds)`` followed by a newline byte.  ``kind`` 0 is a focus
    record and ``kind`` 1 is a quit record (its two ids are written as 0).
    """

    RECORD_FORMAT = ">BLLQ"
    RECORD_SIZE = struct.calcsize(RECORD_FORMAT)  # 17 bytes
    KIND_TICK = 0
    KIND_QUIT = 1

    def __init__(self, logdir):
        """Create ``logdir`` if needed and open the tables and the log.

        Also installs the SIGINT-deferring handler via SignalHandlyThingy.
        """
        try:
            os.makedirs(logdir)
        except OSError as e:
            # An already existing directory is fine; anything else is not.
            if e.errno != errno.EEXIST:
                raise
        self.oldlogfile = os.path.join(logdir, "log")
        self.logfile = os.path.join(logdir, "b_log")
        self.appfile = os.path.join(logdir, "apps")
        self.winfile = os.path.join(logdir, "windows")
        self.apps = IDStandin(self.appfile)
        self.wins = IDStandin(self.winfile)
        self.signallock = SignalHandlyThingy()
        self.last = None  # (app, window) seen by the previous tick()
        self.writer = open(self.logfile, "ab")

    def write(self, line):
        """Append one packed record plus a newline and force it to disk."""
        assert len(line) == self.RECORD_SIZE
        # ``line`` is bytes from struct.pack, so the terminator must be
        # bytes too (a str terminator raises TypeError on Python 3).
        self.writer.write(line + b"\n")
        self.writer.flush()
        os.fsync(self.writer.fileno())

    def tickmsg(self, appname, winname, t=None):
        """Log that ``winname`` of ``appname`` had focus at time ``t``.

        ``t`` is in seconds since the epoch and defaults to the current time.
        """
        if t is None:
            t = time.time()
        appid = self.apps.collapse(appname)
        winid = self.wins.collapse(winname)
        data = struct.pack(self.RECORD_FORMAT, self.KIND_TICK,
                           appid, winid, int(t * 1000))
        self.write(data)

    def quitmsg(self, t=None):
        """Log a quit record at time ``t`` (seconds; defaults to now)."""
        if t is None:
            t = time.time()
        data = struct.pack(self.RECORD_FORMAT, self.KIND_QUIT,
                           0, 0, int(t * 1000))
        self.write(data)

    def tick(self):
        """Sample the focused window once, then sleep to the next second.

        A record is written only when the (app, window) pair differs from
        the one seen by the previous call.
        """
        cur = get_window()
        if self.last != cur:
            appname, winname = cur
            self.tickmsg(appname, winname)
        end = time.time()
        # Time left until the next whole second.
        remain = float(int(end) + 1) - end
        if remain > 0:
            time.sleep(remain)
        self.last = cur

    def convert(self):
        """Replay the legacy JSON-lines log into the binary log.

        Each non-blank line must be a JSON object with a ``"time"`` key; an
        object with a ``"w"`` key (``[appname, winname]``) becomes a focus
        record, anything else becomes a quit record.
        """
        with open(self.oldlogfile, "rb") as reader:
            for line in reader:
                if not line.strip():
                    continue
                decoded = json.loads(line.decode("utf-8"))
                # Not named ``time`` so the time module is not shadowed.
                timestamp = decoded["time"]
                if "w" in decoded:
                    appname, winname = decoded["w"]
                    self.tickmsg(appname, winname, t=timestamp)
                else:
                    self.quitmsg(t=timestamp)

    def run(self):
        """Sample forever; write a quit record when the loop is broken."""
        try:
            while True:
                # SIGINT is deferred for the duration of each tick.
                with self.signallock():
                    self.tick()
        except BaseException:
            # Deliberately broad: KeyboardInterrupt and SystemExit must also
            # leave a quit record.  The exception is always re-raised.
            self.quitmsg()
            raise

    def loadall(self):
        """Decode the whole binary log, pretty-print it and return it.

        Returns:
            list: one dict per record with a ``"time"`` key in seconds, plus
            ``"w": [appname, winname]`` for focus records or ``"quit": True``
            for every other kind.

        Raises:
            ValueError: if the file ends in a truncated record.
        """
        import pprint

        stride = self.RECORD_SIZE + 1  # packed record + newline byte
        items = []
        with open(self.logfile, "rb") as reader:
            while True:
                line = reader.read(stride)
                if not line:
                    break
                if len(line) != stride:
                    raise ValueError(
                        "%s: truncated record of %d bytes"
                        % (self.logfile, len(line)))
                kind, appid, winid, t = struct.unpack(
                    self.RECORD_FORMAT, line[:-1])
                val = {"time": float(t) / 1000}
                if kind == self.KIND_TICK:
                    appname = self.apps.expand(appid)
                    winname = self.wins.expand(winid)
                    val["w"] = [appname, winname]
                else:
                    val["quit"] = True
                items.append(val)
        pprint.pprint(items)
        return items
def main(logfile, convert, read):
    """Run the mode selected on the command line.

    Only the directory part of ``logfile`` is used; it is handed to
    Writeythingy as the log directory.  ``convert`` takes precedence over
    ``read``; when neither flag is set the sampling loop is started.

    Returns:
        The list produced by ``Writeythingy.loadall()`` in read mode,
        otherwise ``None``.
    """
    logdir = os.path.dirname(logfile)
    thingy = Writeythingy(logdir)
    if convert:
        thingy.convert()
        return None
    if read:
        return thingy.loadall()
    thingy.run()
    return None
# Command-line interface.  The parser is built at import time so that
# ``parser`` stays available to anything importing this module.
parser = argparse.ArgumentParser(
    description="Log the focused application and window once per second.")
parser.add_argument(
    'logfile', default="~/.usagelog/log", nargs="?",
    # Expand "~" and make the path absolute so its dirname is never empty.
    type=lambda x: os.path.abspath(os.path.expanduser(x)),
    help="path of the legacy JSON log; only its directory is used and all "
         "log files are kept there (default: ~/.usagelog/log)")
parser.add_argument(
    '--convert', action="store_true",
    help="replay the legacy JSON log into the binary log, then exit")
parser.add_argument(
    '--read', action="store_true",
    help="decode and pretty-print the binary log, then exit")

if __name__ == "__main__":
    args = parser.parse_args()
    # The argument names match main()'s parameters one-to-one.
    main(**vars(args))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment