Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
#!/usr/bin/python3
import argparse
import datetime
import dbus
import os
import shutil
import stat
import subprocess
import sys
from threading import Event
import time
# Event names, ordered by bit position in the value reported by
# "vcgencmd get_throttled": events[i] corresponds to bit i (current state)
# and to bit i + 16 (sticky "has occurred" state).
#
#    0: under-voltage
#    1: arm frequency capped
#    2: currently throttled
#   16: under-voltage has occurred
#   17: arm frequency capped has occurred
#   18: throttling has occurred
events = [
    'under-voltage',
    'freq-cap',
    'throttle',
]
def stdout_write(msg):
    """Write msg to standard output and flush it immediately."""
    out = sys.stdout
    out.write(msg)
    out.flush()
class VcGenCmd(object):
    """Rate-limited wrapper around the ``vcgencmd get_throttled`` command.

    Instance attributes:
      cmd    -- path of the vcgencmd executable as resolved by shutil.which()
      mask   -- when non-zero, appended in hex as an extra argument to
                ``get_throttled``
      period -- minimum number of seconds between two get_throttled() polls
      event  -- optional object with a wait(timeout) method; when set,
                get_throttled() waits on it instead of calling time.sleep()
      t      -- time.time() taken at construction and after each wait
    """

    def __init__(self, mask=0, period=0):
        """Locate vcgencmd and store the polling parameters.

        Raises:
            FileNotFoundError: if no ``vcgencmd`` executable is found on PATH.
        """
        # errno is not part of the module's import block; without this import
        # the raise below would fail with NameError instead of producing the
        # intended FileNotFoundError.
        import errno

        self.cmd = shutil.which("vcgencmd")
        if not self.cmd:
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), 'vcgencmd')
        self.t = time.time()
        self.mask = mask
        self.period = period
        self.event = None

    def get_throttled_now(self):
        """Run ``vcgencmd get_throttled`` immediately and return its value.

        The text after the first '=' in the command output is parsed as a
        hexadecimal integer.

        Raises:
            IndexError: if the command output contains no '='.
            ValueError: if the text after '=' is not a hexadecimal number.
        """
        cmd = "%s get_throttled" % self.cmd
        if self.mask:
            cmd += " 0x%x" % self.mask
        raw = subprocess.getoutput(cmd)
        return int(raw.split('=')[1], 16)

    def get_throttled(self):
        """Return the throttled value, waiting first so that polls are at
        least ``period`` seconds apart.

        The wait is done on ``self.event`` when one is set (so that setting
        the event cuts the wait short), otherwise with time.sleep().
        """
        elapsed = time.time() - self.t
        if elapsed < self.period:
            remaining = self.period - elapsed
            if self.event:
                self.event.wait(remaining)
            else:
                time.sleep(remaining)
        self.t = time.time()
        return self.get_throttled_now()
class EventOutput(object):
    """Base class for outputs: turns a selection of event names into a bit mask."""

    def __init__(self, events):
        """Store the selected event names and derive self.mask from them."""
        self.events = events
        self.set_mask()

    def set_mask(self):
        """Recompute self.mask from self.events.

        Each selected event contributes its own bit (under-voltage 0x1,
        freq-cap 0x2, throttle 0x4) plus the same bit shifted left by 16.
        """
        low = 0
        for bit, name in enumerate(('under-voltage', 'freq-cap', 'throttle')):
            if name in self.events:
                low |= 1 << bit
        self.mask = low | (low << 16)
class LogfileOutput(EventOutput):
    """Output that appends sticky-bit changes to a log file, or prints them
    to stdout when no file name is given."""

    def __init__(self, events, fname):
        """Store the selection and check up front that fname is writable.

        An existing regular file must be writable (PermissionError
        otherwise); anything else is opened in append mode, which creates it
        if missing, and has its timestamps updated.
        """
        super().__init__(events)
        # Try to verify that we can actually write to this file when the time comes
        if fname:
            if not os.path.isfile(fname):
                with open(fname, 'a'):
                    os.utime(fname, None)
            elif not os.access(fname, os.W_OK):
                raise PermissionError(" Permission denied: '%s'" % fname)
        self.fname = fname

    def value(self, value, prev):
        """Log a line when the selected sticky bits differ between prev and value.

        Only bits 16 and up, filtered through self.mask, are compared. The
        line lists each event whose sticky bit is set in either reading,
        followed by both raw values. Errors raised by log() are reported on
        stderr and not propagated.
        """
        now = (value & self.mask) >> 16
        before = (prev & self.mask) >> 16
        if now == before:
            return
        fields = []
        for bit in range(3):
            flag = 1 << bit
            if (now | before) & flag:
                fields.append(" %s=%d" % (events[bit], (now & flag) > 0))
        text = "".join(fields) + " (0x%08x -> 0x%08x)\n" % (prev, value)
        try:
            self.log(text)
        except Exception as e:
            sys.stderr.write("Error writing log: %s\n" % e)
            sys.stderr.flush()

    def log(self, msg):
        """Prefix msg with a local 'Mon DD HH:MM:SS' timestamp and emit it."""
        stamp = datetime.datetime.fromtimestamp(time.time()).strftime('%b %d %H:%M:%S')
        line = "%s:%s" % (stamp, msg)
        if self.fname:
            with open(self.fname, "a") as f:
                f.write(line)
            return
        sys.stdout.write(line)
        sys.stdout.flush()
class DmesgOutput(LogfileOutput):
    """LogfileOutput variant that writes its messages to /dev/kmsg."""

    def __init__(self, events):
        """Store the selection and check that /dev/kmsg is usable.

        Raises IOError when /dev/kmsg is not a character device or is not
        writable.
        """
        # Start the lookup after LogfileOutput on purpose: its __init__
        # expects a file name argument and is bypassed here.
        super(LogfileOutput, self).__init__(events)
        # Try to verify that we can actually write to this file when the time comes
        self.fname = '/dev/kmsg'
        mode = os.stat(self.fname).st_mode
        usable = stat.S_ISCHR(mode) and os.access(self.fname, os.W_OK)
        if not usable:
            raise IOError("Can't access '%s'" % self.fname)

    def log(self, msg):
        """Write msg to /dev/kmsg as is (no timestamp prefix is added)."""
        with open(self.fname, "a") as kmsg:
            kmsg.write(msg)
class NotifyOutput(EventOutput):
    """Output that shows sticky-bit changes as desktop notifications through
    the org.freedesktop.Notifications D-Bus service."""

    def __init__(self, events):
        """Store the selection and connect to the session bus notifier.

        D-Bus errors raised while connecting are not handled here.
        """
        super().__init__(events)
        session = dbus.SessionBus()
        proxy = session.get_object('org.freedesktop.Notifications', '/org/freedesktop/Notifications')
        self.iface = dbus.Interface(proxy, 'org.freedesktop.Notifications')
        # Id returned by the last Notify() call, None when nothing is shown
        self.nid = None

    def value(self, value, prev):
        """Raise or close a notification when the selected sticky bits change.

        Only bits 16 and up, filtered through self.mask, are compared. When
        all selected sticky bits are now clear, the last notification (if
        any) is closed; otherwise a new one is sent, titled after the
        lowest-numbered event that is currently set.
        """
        now = (value & self.mask) >> 16
        before = (prev & self.mask) >> 16
        if now == before:
            return

        if not now:
            if self.nid:
                try:
                    self.iface.CloseNotification(self.nid)
                except dbus.exceptions.DBusException:
                    # Best effort: a failure to close is ignored
                    pass
                self.nid = None
            return

        fields = []
        title = None
        for bit in range(3):
            flag = 1 << bit
            is_set = now & flag
            if is_set or (before & flag):
                fields.append(" %s=%d" % (events[bit], is_set > 0))
                if not title and is_set:
                    title = events[bit]
        body = "".join(fields) + " (0x%08x -> 0x%08x)\n" % (prev, value)
        if not title:
            title = 'ERROR: Event not set'
        try:
            self.nid = self.iface.Notify('get_throttled', 0, 'dialog-warning', title.capitalize(), body, [], {}, 0)
        except dbus.exceptions.DBusException as e:
            sys.stderr.write("Error sending notification: %s\n" % e)
            sys.stderr.flush()
def show_get_throttled_bit(val, bit, msg):
    """Print "<bit>: <msg>" if bit number `bit` is set in val; otherwise print nothing."""
    if not val & (1 << bit):
        return
    print("%d: %s" % (bit, msg))
def show_get_throttled(val):
    """Print val in hex and, when it is non-zero, a line for each known bit that is set."""
    print("throttled=0x%x" % val)
    if not val:
        return
    print('\nBits set:')
    descriptions = (
        (0, 'under-voltage'),
        (1, 'arm frequency capped'),
        (2, 'currently throttled'),
        (16, 'under-voltage has occurred'),
        (17, 'arm frequency capped has occurred'),
        (18, 'throttling has occurred'),
    )
    for bit, text in descriptions:
        show_get_throttled_bit(val, bit, text)
# Shutdown flag shared by the signal handler below and main(): main() loops
# until it is set and hands it to VcGenCmd so a pending wait ends early.
# Note that the names `exit` and `quit` shadow the builtins of the same name
# within this module.
exit = Event()


def quit(signo, _frame):
    """Signal handler: set the shutdown flag; both arguments are unused."""
    exit.set()
def main():
    """Command-line entry point.

    Parses the options, builds the requested outputs (kernel log, log file,
    desktop notification) and then polls get_throttled every --period
    seconds, handing each reading and the previous one to every output until
    SIGTERM, SIGHUP or SIGINT is received. When no output was requested, the
    current value is printed once and the process exits with status 0.
    """
    event_names = ', '.join(events)
    parser = argparse.ArgumentParser(
        description='vcgencmd get_throttled logger and notifier',
        epilog="EVENT: %s\nIf event is not specified, all is implied.\n\nOnly the sticky bits are tracked" % (event_names),
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument("--dmesg", metavar='EVENT', choices=events, nargs='*', help="Log events to kernel log")
    parser.add_argument("--log", metavar='EVENT', choices=events, nargs='*', help="Log events to file")
    parser.add_argument("--logfile", help="Logfile (default is stdout)")
    parser.add_argument("--notify", metavar='EVENT', choices=events, nargs='*', help="Desktop event notification")
    parser.add_argument("--multiload", metavar='EVENT', choices=events, nargs='*', help="multiload-ng parametric graphs")
    parser.add_argument("--period", help="Polling period in seconds (default: %(default)s)", type=float, default=5.0)
    parser.add_argument("-v", "--verbose", help="Verbose", action="store_true")
    args = parser.parse_args()

    # Informational only: count processes whose command line mentions
    # get_throttled and tell the user when there is more than one.
    running = subprocess.getoutput("ps aux | grep -e get_throttled | grep -v grep | wc -l")
    try:
        if int(running) > 1:
            stdout_write("Looks like get_throttled is already running\n\n")
    except ValueError:
        pass

    # An option given without any EVENT (empty list) selects every event;
    # an option that was not given at all stays None.
    # NOTE(review): --multiload is parsed and echoed in verbose mode, but no
    # output is created for it anywhere in this function.
    for opt in ('dmesg', 'log', 'notify', 'multiload'):
        selected = getattr(args, opt)
        if selected is not None and not selected:
            setattr(args, opt, events)

    if args.verbose:
        settings = (
            ("dmesg=%s\n", args.dmesg),
            ("log=%s\n", args.log),
            ("logfile=%s\n", args.logfile),
            ("notify=%s\n", args.notify),
            ("multiload=%s\n", args.multiload),
        )
        for fmt, setting in settings:
            if setting:
                stdout_write(fmt % setting)
        stdout_write("period=%s\n" % args.period)

    outputs = []
    if args.dmesg:
        outputs.append(DmesgOutput(args.dmesg))
    if args.log:
        outputs.append(LogfileOutput(args.log, args.logfile))
    if args.notify:
        try:
            outputs.append(NotifyOutput(args.notify))
        except dbus.exceptions.DBusException as e:
            sys.stderr.write("Can't connect to dbus notification daemon\n")
            sys.stderr.write("Error: %s\n" % e)
            sys.stderr.flush()
            # Carry on with the remaining outputs, if there are any
            if not outputs:
                sys.exit(1)

    # Union of the sticky bits (16 and up) of all outputs, moved down to
    # bits 0-2; this is passed to VcGenCmd as its mask.
    combined_mask = 0
    for output in outputs:
        combined_mask |= output.mask
    sticky_clear_mask = combined_mask >> 16
    if args.verbose:
        stdout_write("sticky_clear_mask=0x%x\n" % sticky_clear_mask)
        stdout_write("\n\n")

    try:
        vcgencmd = VcGenCmd(sticky_clear_mask, args.period)
    except FileNotFoundError:
        sys.stderr.write("Can't find the 'vcgencmd' command\n")
        sys.exit(1)

    if not outputs:
        show_get_throttled(vcgencmd.get_throttled_now())
        sys.exit(0)

    # Let the signal handler interrupt the wait between two polls
    vcgencmd.event = exit
    import signal
    for signum in (signal.SIGTERM, signal.SIGHUP, signal.SIGINT):
        signal.signal(signum, quit)

    prev = 0
    while not exit.is_set():
        try:
            val = vcgencmd.get_throttled()
        except (ValueError, IndexError, IOError) as e:
            if not exit.is_set():
                sys.stderr.write("Error getting value: %s\n" % e)
                sys.stderr.flush()
            continue
        if exit.is_set():
            break
        if args.verbose:
            stdout_write("get_throttled=0x%x\n" % (val))
        for output in outputs:
            output.value(val, prev)
        prev = val
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.