Created
February 17, 2018 16:28
-
-
Save notro/33a6b8b9a10665dc09a9224f917bc10b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import argparse | |
import datetime | |
import dbus | |
import os | |
import shutil | |
import stat | |
import subprocess | |
import sys | |
from threading import Event | |
import time | |
# Bits of the value reported by `vcgencmd get_throttled`:
#    0: under-voltage
#    1: arm frequency capped
#    2: currently throttled
#   16: under-voltage has occurred
#   17: arm frequency capped has occurred
#   18: throttling has occurred
#
# Event names as accepted on the command line.  The list index of a name is
# its bit position inside each 16-bit half of the value (bit i and bit 16+i).
events = [
    'under-voltage',
    'freq-cap',
    'throttle',
]
def stdout_write(msg):
    """Write *msg* to standard output and flush it straight away."""
    out = sys.stdout
    out.write(msg)
    out.flush()
class VcGenCmd(object):
    """Rate-limited reader for the `vcgencmd get_throttled` value.

    Attributes:
        cmd:    path of the vcgencmd executable as found by shutil.which().
        t:      time.time() of the last poll (or of construction).
        mask:   when non-zero, appended to the command line as a hex argument.
        period: minimum number of seconds between two polls.
        event:  optional object with a wait(timeout) method (main() assigns a
                threading.Event) used instead of time.sleep() so that a
                pending wait can be cut short.
    """

    def __init__(self, mask=0, period=0):
        """Locate vcgencmd and remember the polling parameters.

        Raises:
            FileNotFoundError: if no `vcgencmd` executable is found on PATH.
        """
        # errno is not among this file's module-level imports, so bring it in
        # here; without it the raise below failed with NameError instead of
        # the FileNotFoundError that callers catch.
        import errno

        self.cmd = shutil.which("vcgencmd")
        if not self.cmd:
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), 'vcgencmd')
        self.t = time.time()
        self.mask = mask
        self.period = period
        self.event = None

    def get_throttled_now(self):
        """Run `vcgencmd get_throttled` immediately and return the value as int.

        The command output is expected to look like ``name=0x<hex>``; the part
        after the first '=' is parsed as hexadecimal.

        Raises:
            IndexError: if the output contains no '='.
            ValueError: if the text after '=' is not a hexadecimal number.
        """
        cmd = "%s get_throttled" % self.cmd
        if self.mask:
            # NOTE(review): main() calls this value sticky_clear_mask, so the
            # argument presumably asks for those sticky bits to be cleared on
            # read -- confirm against the vcgencmd documentation.
            cmd += " 0x%x" % self.mask
        raw = subprocess.getoutput(cmd)
        return int(raw.split('=')[1], 16)

    def get_throttled(self):
        """Wait until `period` seconds have passed since the last poll, then poll.

        The wait uses self.event.wait() when an event is attached, otherwise
        time.sleep().  The poll is performed even if the event wait returned
        early; callers that care check the event themselves.
        """
        elapsed = time.time() - self.t
        if elapsed < self.period:
            remaining = self.period - elapsed
            if self.event:
                self.event.wait(remaining)
            else:
                time.sleep(remaining)
        self.t = time.time()
        return self.get_throttled_now()
class EventOutput(object):
    """Base class for outputs: keeps the selected event names and the bit
    mask that corresponds to them."""

    def __init__(self, events):
        self.events = events
        self.set_mask()

    def set_mask(self):
        """Rebuild self.mask from self.events.

        Every selected event contributes its low bit (0..2); the result is
        then mirrored into bits 16..18 as well.
        """
        low = 0
        for flag, name in ((1, 'under-voltage'), (2, 'freq-cap'), (4, 'throttle')):
            if name in self.events:
                low |= flag
        self.mask = low | (low << 16)
class LogfileOutput(EventOutput):
    """Output that appends a timestamped line to a file, or prints it to
    stdout when no file name is given, whenever the tracked bits change."""

    def __init__(self, events, fname):
        super().__init__(events)
        # Fail early instead of at the first event: an existing file must be
        # writable, a missing one is created (and its times touched) now.
        if fname:
            if not os.path.isfile(fname):
                with open(fname, 'a'):
                    os.utime(fname, None)
            elif not os.access(fname, os.W_OK):
                raise PermissionError(" Permission denied: '%s'" % fname)
        self.fname = fname

    def value(self, value, prev):
        """Log a line if the masked upper-half bits of *value* and *prev* differ.

        Only bits 16..18 that are part of self.mask are compared.  Errors
        raised while logging are reported on stderr and not propagated.
        """
        now = (value & self.mask) >> 16
        before = (prev & self.mask) >> 16
        if now == before:
            return

        # One " name=0|1" entry for every event set in either sample.
        parts = []
        for i in range(3):
            bit = 1 << i
            if (now | before) & bit:
                parts.append(" %s=%d" % (events[i], (now & bit) > 0))
        msg = "".join(parts) + " (0x%08x -> 0x%08x)\n" % (prev, value)

        try:
            self.log(msg)
        except Exception as e:
            sys.stderr.write("Error writing log: %s\n" % e)
            sys.stderr.flush()

    def log(self, msg):
        """Prefix *msg* with the local time and emit it to the file or stdout."""
        stamp = datetime.datetime.now().strftime('%b %d %H:%M:%S')
        line = "%s:%s" % (stamp, msg)
        if self.fname:
            with open(self.fname, "a") as f:
                f.write(line)
            return
        sys.stdout.write(line)
        sys.stdout.flush()
class DmesgOutput(LogfileOutput):
    """Output that writes event lines to the kernel log via /dev/kmsg."""

    def __init__(self, events):
        # Deliberately start the super() lookup *after* LogfileOutput so its
        # file-name handling is skipped; the target is fixed below.
        super(LogfileOutput, self).__init__(events)
        self.fname = '/dev/kmsg'
        # Try to verify that we can actually write to this device when the
        # time comes: it must be a character device and be writable.
        mode = os.stat(self.fname).st_mode
        usable = stat.S_ISCHR(mode) and os.access(self.fname, os.W_OK)
        if not usable:
            raise IOError("Can't access '%s'" % self.fname)

    def log(self, msg):
        """Write *msg* as-is; unlike LogfileOutput.log no timestamp is added."""
        with open(self.fname, "a") as kmsg:
            kmsg.write(msg)
class NotifyOutput(EventOutput):
    """Output that shows a desktop notification over the D-Bus session bus
    when the tracked bits change, and closes it once they are all clear."""

    def __init__(self, events):
        super().__init__(events)
        proxy = dbus.SessionBus().get_object('org.freedesktop.Notifications',
                                             '/org/freedesktop/Notifications')
        self.iface = dbus.Interface(proxy, 'org.freedesktop.Notifications')
        # Id returned by the most recent Notify call, None when nothing is shown.
        self.nid = None

    def value(self, value, prev):
        """React to a change in the masked upper-half bits of *value* vs *prev*."""
        now = (value & self.mask) >> 16
        before = (prev & self.mask) >> 16
        if now == before:
            return

        if not now:
            # Everything cleared: withdraw the notification we last sent.
            if self.nid:
                try:
                    self.iface.CloseNotification(self.nid)
                except dbus.exceptions.DBusException:
                    pass
            self.nid = None
            return

        # Body: one " name=0|1" entry for every event set in either sample.
        parts = []
        # Title: the first event that is set in the new sample.
        title = None
        for i in range(3):
            bit = 1 << i
            if not (now | before) & bit:
                continue
            is_set = (now & bit) > 0
            parts.append(" %s=%d" % (events[i], is_set))
            if is_set and not title:
                title = events[i]
        msg = "".join(parts) + " (0x%08x -> 0x%08x)\n" % (prev, value)
        if not title:
            title = 'ERROR: Event not set'

        try:
            # NOTE(review): the second argument (0) is always zero, so the id
            # stored in self.nid is only ever used for CloseNotification.
            self.nid = self.iface.Notify('get_throttled', 0, 'dialog-warning',
                                         title.capitalize(), msg, [], {}, 0)
        except dbus.exceptions.DBusException as e:
            sys.stderr.write("Error sending notification: %s\n" % e)
            sys.stderr.flush()
def show_get_throttled_bit(val, bit, msg):
    """Print "<bit>: <msg>" when bit number *bit* is set in *val*."""
    if not (val >> bit) & 1:
        return
    print("%d: %s" % (bit, msg))
def show_get_throttled(val):
    """Print *val* in hex and, if non-zero, a line for each known bit set."""
    print("throttled=0x%x" % val)
    if val:
        print('\nBits set:')
        descriptions = (
            (0, 'under-voltage'),
            (1, 'arm frequency capped'),
            (2, 'currently throttled'),
            (16, 'under-voltage has occurred'),
            (17, 'arm frequency capped has occurred'),
            (18, 'throttling has occurred'),
        )
        for bit, text in descriptions:
            show_get_throttled_bit(val, bit, text)
# Set once a termination signal has been received; main() polls it and also
# hands it to VcGenCmd so that a pending wait is interrupted.
exit = Event()


def quit(signo, _frame):
    """Signal handler: flag that the program should stop."""
    exit.set()
def main():
    """Command-line entry point.

    Without any output option the current get_throttled value is printed and
    the program exits with status 0.  With --dmesg, --log and/or --notify the
    value is polled every --period seconds and each change is passed to the
    selected outputs until SIGTERM, SIGHUP or SIGINT is received.

    --multiload is parsed and echoed in verbose mode, but no output is
    created for it in this function.

    Exits with status 1 when vcgencmd cannot be found, or when the
    notification daemon cannot be reached and no other output was selected.
    """
    parser = argparse.ArgumentParser(
        description='vcgencmd get_throttled logger and notifier',
        epilog="EVENT: %s\nIf event is not specified, all is implied.\n\nOnly the sticky bits are tracked" % (', '.join(events)),
        formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument("--dmesg", metavar='EVENT', choices=events, nargs='*', help="Log events to kernel log")
    parser.add_argument("--log", metavar='EVENT', choices=events, nargs='*', help="Log events to file")
    parser.add_argument("--logfile", help="Logfile (default is stdout)")
    parser.add_argument("--notify", metavar='EVENT', choices=events, nargs='*', help="Desktop event notification")
    parser.add_argument("--multiload", metavar='EVENT', choices=events, nargs='*', help="multiload-ng parametric graphs")
    parser.add_argument("--period", help="Polling period in seconds (default: %(default)s)", type=float, default=5.0)
    parser.add_argument("-v", "--verbose", help="Verbose", action="store_true")
    args = parser.parse_args()

    # Best-effort warning only: count processes whose command line mentions
    # get_throttled (this process is one of them, hence "> 1").
    running = subprocess.getoutput("ps aux | grep -e get_throttled | grep -v grep | wc -l")
    try:
        if int(running) > 1:
            stdout_write("Looks like get_throttled is already running\n\n")
    except ValueError:
        pass

    # With nargs='*' an option given without values is an empty list, while
    # an absent option is None.  "Given but empty" means all events.
    for option in ('dmesg', 'log', 'notify', 'multiload'):
        selected = getattr(args, option)
        if selected is not None and not selected:
            setattr(args, option, events)

    if args.verbose:
        if args.dmesg:
            stdout_write("dmesg=%s\n" % args.dmesg)
        if args.log:
            stdout_write("log=%s\n" % args.log)
        if args.logfile:
            stdout_write("logfile=%s\n" % args.logfile)
        if args.notify:
            stdout_write("notify=%s\n" % args.notify)
        if args.multiload:
            stdout_write("multiload=%s\n" % args.multiload)
        stdout_write("period=%s\n" % args.period)

    outputs = []
    if args.dmesg:
        outputs.append(DmesgOutput(args.dmesg))
    if args.log:
        outputs.append(LogfileOutput(args.log, args.logfile))
    if args.notify:
        try:
            outputs.append(NotifyOutput(args.notify))
        except dbus.exceptions.DBusException as e:
            sys.stderr.write("Can't connect to dbus notification daemon\n")
            sys.stderr.write("Error: %s\n" % e)
            sys.stderr.flush()
            # Notification was requested but is unavailable; give up unless
            # some other output can still do the job.
            if not outputs:
                sys.exit(1)

    # Union of the upper-half bits of every output's mask, moved down to
    # bits 0..2; passed to vcgencmd as the get_throttled argument.
    sticky_clear_mask = 0
    for output in outputs:
        sticky_clear_mask |= output.mask
    sticky_clear_mask >>= 16
    if args.verbose:
        stdout_write("sticky_clear_mask=0x%x\n" % sticky_clear_mask)
        stdout_write("\n\n")

    try:
        vcgencmd = VcGenCmd(sticky_clear_mask, args.period)
    except FileNotFoundError:
        sys.stderr.write("Can't find the 'vcgencmd' command\n")
        sys.exit(1)

    # No output selected: one-shot mode, just show the current state.
    if not outputs:
        show_get_throttled(vcgencmd.get_throttled_now())
        sys.exit(0)

    # Let the signal handler interrupt the wait between two polls.
    vcgencmd.event = exit
    import signal
    for signum in (signal.SIGTERM, signal.SIGHUP, signal.SIGINT):
        signal.signal(signum, quit)

    prev = 0
    while not exit.is_set():
        try:
            val = vcgencmd.get_throttled()
        except (ValueError, IndexError, IOError) as e:
            # Errors seen while shutting down are not worth reporting.
            if not exit.is_set():
                sys.stderr.write("Error getting value: %s\n" % e)
                sys.stderr.flush()
            continue
        if exit.is_set():
            break
        if args.verbose:
            stdout_write("get_throttled=0x%x\n" % (val))
        for output in outputs:
            output.value(val, prev)
        prev = val


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment