Create a gist now

Instantly share code, notes, and snippets.

Embed
Geekworm battery monitor script
#! /usr/bin/python
# -*- coding: utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
# Geekworm RPi Power Pack Hat Monitor Service
# Copyright (C) 2018 by Xose Pérez <xose.perez@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import signal
import threading
import time
import argparse
from lib.smbus import SMBus
from lib.config import Config
from lib.utils import ServiceExit, service_shutdown, write_pid_file
from lib.utils import printd, Level, Color, clr, set_debug_level
# ------------------------------------------------------------------------------
class Battery(threading.Thread):
    """
    Worker thread that polls the battery gauge over I2C and schedules (or
    cancels) a system shutdown depending on the charge level and tendency.

    A shutdown is scheduled when the battery is discharging, its level is
    below `threshold` and the current tendency has lasted for more than
    `mincount` consecutive readings. It is cancelled as soon as the
    battery is seen charging again.

    Set `shutdown_flag` to stop the thread.
    """

    # I2C bus handle, opened once when the class body is evaluated
    bus = SMBus(1)
    # I2C address of the battery gauge
    address = 0x62
    # Last charge level read, in percent (initial guess before first reading)
    capacity = 50
    # Current tendency: 1 = charging, 0 = discharging
    state = 1
    # Number of consecutive readings with the current tendency
    count = 0
    # True while a shutdown scheduled by this thread is pending
    flag = False

    def __init__(self, interval, threshold, mincount):
        """
        interval:  seconds to wait between readings
        threshold: charge level (percent) below which shutdown is considered
        mincount:  readings the discharging tendency must exceed before
                   the shutdown is actually scheduled
        """
        threading.Thread.__init__(self)
        self.interval = interval
        self.threshold = threshold
        self.mincount = mincount
        self.shutdown_flag = threading.Event()
        # NOTE(review): presumably wakes up / configures the gauge (register
        # 0x0A) -- confirm against the chip datasheet
        self.bus.write_byte_data(self.address, 0x0A, 0x00)

    def run(self):
        """
        Poll the gauge every `interval` seconds until `shutdown_flag` is set.
        """
        while not self.shutdown_flag.is_set():

            # Calculate current charge
            msb = int(self.bus.read_byte_data(self.address, 0x04))
            lsb = int(self.bus.read_byte_data(self.address, 0x05))
            # NOTE(review): the low byte looks like a 1/256 fraction, in which
            # case the divisor should be 256.0 -- confirm against the datasheet
            capacity = msb + lsb / 255.0

            # Calculate tendency. An unchanged reading keeps the previous
            # tendency; without this default `state` would be unbound when
            # the very first reading equals the initial capacity.
            state = self.state
            if self.capacity > capacity:
                state = 0
            elif self.capacity < capacity:
                state = 1
            if self.state != state:
                self.count = 0
                self.state = state
            self.capacity = capacity
            self.count = self.count + 1

            # Debug
            printd(clr(Color.LIGHT_GREY, "Battery capacity: %6.2f%% (%s)" % (self.capacity, "charging" if self.state == 1 else "discharging")), Level.DEBUG)

            # Alerts
            if not self.flag and self.state == 0 and self.capacity < self.threshold and self.count > self.mincount:
                self.flag = True
                message = "Shutting down system in 60 seconds due to critical battery level"
                printd(clr(Color.GREEN, message), Level.INFO)
                os.system("sudo shutdown -h +1 %s" % message)

            if self.flag and self.state == 1:
                self.flag = False
                message = "Battery charging: shutdown cancelled"
                printd(clr(Color.GREEN, message), Level.INFO)
                os.system("sudo shutdown -c")

            # Check again in X seconds; waiting on the event instead of
            # sleeping lets the thread stop as soon as shutdown is requested
            self.shutdown_flag.wait(self.interval)
# ------------------------------------------------------------------------------
def main():
    """
    Entry point: parse the command line, load the configuration and run
    the battery monitor thread until SIGTERM or SIGINT is received.

    Command line options:
        -c FILE   configuration file (default "config.yml")
        -v        verbose mode (enables debug output)
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", dest="config", action="store", help="configuration file", default="config.yml")
    parser.add_argument("-v", dest="verbose", action="count", help="verbose mode", default=0)
    arg = parser.parse_args(sys.argv[1:])

    # set debug level
    set_debug_level(Level.INFO if 0 == arg.verbose else Level.DEBUG)

    # Welcome message
    printd(clr(Color.WHITE, "\nGeekworm Hat Battery Monitor\n"), Level.INFO)

    # Load configuration
    config = Config(arg.config)
    config.load()

    # Configuration values
    check_interval = config.get("battery", "check_interval", 10)
    charge_threshold = config.get("battery", "charge_threshold", 25)
    threshold_count = config.get("battery", "threshold_count", 6)

    # Bound before the try block so the handler below never sees an
    # undefined name if a signal arrives while the job is being created
    job = None
    try:

        # Init job and signal callbacks
        signal.signal(signal.SIGTERM, service_shutdown)
        signal.signal(signal.SIGINT, service_shutdown)
        job = Battery(check_interval, charge_threshold, threshold_count)
        job.start()

        # Keep on running
        while True:
            time.sleep(1)

    except ServiceExit:
        if job is not None:
            job.shutdown_flag.set()
            # Only a started thread can be joined
            if job.is_alive():
                job.join()


if __name__ == '__main__':
    main()
import ruamel.yaml
from ruamel.yaml.util import load_yaml_guess_indent
class Config(object):
    """
    Thin wrapper around a YAML configuration file organised as
    section -> key -> value.

    The file is parsed with load_yaml_guess_indent; the parsed document is
    kept in `config` and the two indentation values it returns in `ind`
    and `bsi`.
    """

    filename = None

    def __init__(self, filename):
        """
        Remember `filename` and load it right away.
        """
        self.filename = filename
        self.load()

    def load(self):
        """
        (Re)read the configuration file from disk.
        """
        # Context manager so the file handle is closed even if parsing fails
        with open(self.filename) as stream:
            self.config, self.ind, self.bsi = load_yaml_guess_indent(stream)

    def get(self, section, key, default):
        """
        Return the value stored under section/key, or `default` when it
        cannot be looked up (missing section or key, empty document or a
        section that is not a mapping).
        """
        try:
            return self.config[section][key]
        except (LookupError, TypeError, AttributeError):
            return default

    def set(self, section, key, value):
        """
        Store `value` under section/key in the in-memory document.
        The section must already exist.
        """
        self.config[section][key] = value
battery:
# How often should we check the battery level (in seconds)
check_interval: 10
# Charge threshold (in percent) below which a shutdown may be triggered
charge_threshold: 100
# Shut down after this many intervals discharging below the threshold
threshold_count: 180
import os
import re
import sys
import glob
from time import sleep
from threading import Thread
# ------------------------------------------------------------------------------
# USB
# ------------------------------------------------------------------------------
def _read_hex_file(path):
    """
    Return the content of `path` parsed as a hexadecimal integer.
    """
    with open(path) as handle:
        return int(handle.read().strip(), 16)


def find_devices(vendor_id = None, product_id = None):
    """
    Looks for USB serial devices,
    optionally filtering by the provided vendor and product IDs.

    Walks /sys/bus/usb/devices and returns the list of "/dev/ttyUSB<N>"
    paths found under the matching devices. Entries whose idVendor or
    idProduct file is missing or unreadable are skipped.
    """
    devices = []
    for dn in glob.glob('/sys/bus/usb/devices/*'):

        try:
            vid = _read_hex_file(os.path.join(dn, "idVendor"))
            pid = _read_hex_file(os.path.join(dn, "idProduct"))
        except ( ValueError, TypeError, AttributeError, OSError, IOError ):
            continue

        if vendor_id is not None and vid != vendor_id:
            continue
        if product_id is not None and pid != product_id:
            continue

        # Sub-entries are named after the device entry itself
        for sdn in glob.glob(os.path.join(dn, os.path.basename(dn) + "*")):
            for fn in glob.glob(os.path.join(sdn, "*")):
                if re.search(r"\/ttyUSB[0-9]+$", fn):
                    devices.append(os.path.join("/dev", os.path.basename(fn)))

    return devices
# ------------------------------------------------------------------------------
# TIMER
# ------------------------------------------------------------------------------
class IntervalTimer(Thread):
    """
    Daemon thread that calls `callback(**c_kwargs)` every `interval`
    seconds. The thread starts itself on construction; call stop() to
    make it exit after the current wait.
    """

    def __init__(self, interval, callback, c_kwargs=None):
        """
        interval: seconds to sleep between two calls
        callback: callable invoked on every iteration
        c_kwargs: optional dict of keyword arguments for the callback
        """
        Thread.__init__(self)
        self.interval = interval
        self.callback = callback
        # A fresh dict per instance: a `{}` default in the signature would be
        # shared by every timer created without explicit arguments
        self.c_kwargs = {} if c_kwargs is None else c_kwargs
        self.daemon = True
        self.running = True
        self.start()

    def stop(self):
        """
        Ask the loop to finish; takes effect once the current sleep ends.
        """
        self.running = False

    def run(self):
        """
        Call the callback, then sleep, until stop() has been called.
        """
        while self.running:
            self.callback(**self.c_kwargs)
            sleep(self.interval)
# ------------------------------------------------------------------------------
# EXCEPTIONS
# ------------------------------------------------------------------------------
class ServiceExit(Exception):
    """
    Raised to request a clean exit of every running thread
    and of the main program.
    """
def service_shutdown(signum, frame):
    """
    Signal handler: report the signal number and raise ServiceExit.
    `frame` is accepted to match the handler signature and is not used.
    """
    notice = 'Caught signal %d' % signum
    print(notice)
    raise ServiceExit()
# ------------------------------------------------------------------------------
# PID
# ------------------------------------------------------------------------------
def write_pid_file(name):
    """
    Write the PID of the current process to /tmp/<name>.pid,
    replacing any previous content.
    """
    # Context manager so the handle is closed even if the write fails
    with open("/tmp/%s.pid" % name, "w") as handle:
        handle.write(str(os.getpid()))
# ------------------------------------------------------------------------------
# DEBUG
# ------------------------------------------------------------------------------
class Level(object):
    """
    Verbosity levels, from the least (0) to the most (4) talkative.
    """
    CRITICAL, WARNING, INFO, DEBUG, BLOAT = range(5)
# Module-wide verbosity; messages with a level above it are dropped
VERBOSITY = Level.INFO


def printd(string, level):
    """
    Print `string` unless `level` is above the current verbosity.
    """
    if not VERBOSITY >= level:
        return
    print(string)


def set_debug_level(lvl):
    """
    Replace the module-wide verbosity with `lvl`.
    """
    global VERBOSITY
    VERBOSITY = lvl
# ------------------------------------------------------------------------------
# OTHER
# ------------------------------------------------------------------------------
def isRPi():
    """
    Tell whether the machine field reported by os.uname() is 'armv7l'.
    """
    machine = os.uname()[4]
    return machine == 'armv7l'
class Color(object):
    """
    Terminal escape sequences used to colour text (see clr()).
    """

    # Sequences using the "1;" attribute
    BLACK         = '\x1b[1;30m'
    RED           = '\x1b[1;31m'
    GREEN         = '\x1b[1;32m'
    YELLOW        = '\x1b[1;33m'
    BLUE          = '\x1b[1;34m'
    MAGENTA       = '\x1b[1;35m'
    CYAN          = '\x1b[1;36m'
    WHITE         = '\x1b[1;37m'

    # Sequences using the "0;" attribute
    LIGHT_GREY    = '\x1b[0;30m'
    LIGHT_RED     = '\x1b[0;31m'
    LIGHT_GREEN   = '\x1b[0;32m'
    LIGHT_YELLOW  = '\x1b[0;33m'
    LIGHT_BLUE    = '\x1b[0;34m'
    LIGHT_MAGENTA = '\x1b[0;35m'
    LIGHT_CYAN    = '\x1b[0;36m'
    LIGHT_WHITE   = '\x1b[0;37m'
def clr(color, text):
    """
    Wrap `text` (converted with str()) between the `color` escape
    sequence and the attribute reset sequence.
    """
    pieces = (color, str(text), '\x1b[0m')
    return "".join(pieces)
def check_root():
    """
    Terminate the program with exit status 1 unless the effective user
    is root (effective UID 0).
    """
    if os.geteuid() != 0:
        printd(clr(Color.RED, "Run as root."), Level.CRITICAL)
        # sys.exit is always available; the bare exit() helper is only
        # injected by the site module and is meant for interactive use
        sys.exit(1)
def hex_offset_to_string(byte_array):
    """
    Decode a textual hex dump into the raw bytes it represents.

    byte_array: text made of hexadecimal digit pairs; any whitespace
                (spaces, newlines, tabs...) between digits is ignored.

    Returns a byte string (`str` on Python 2, `bytes` on Python 3).
    Raises TypeError (Python 2) or binascii.Error (Python 3) on an odd
    number of digits or non-hexadecimal characters.
    """
    # str.decode("hex") only exists on Python 2; unhexlify works on both
    import binascii
    digits = "".join(byte_array.split())
    return binascii.unhexlify(digits)
def mac_to_bytes(mac):
    """
    Turn a colon-separated list of hexadecimal values ("aa:bb:...") into
    a string with one character per value.
    """
    octets = [int(field, 16) for field in mac.split(':')]
    return ''.join(map(chr, octets))
def bytes_to_mac(byte_array):
    """
    Format a sequence of bytes as colon-separated, two-digit lowercase
    hexadecimal values ("aa:bb:...").

    byte_array: a string (one character per byte), or a bytes/bytearray
                object or any iterable of integers.
    """
    # Iterating bytes/bytearray yields integers on Python 3 (and bytearray
    # on Python 2), for which ord() would raise; take those as they are
    return ':'.join(
        "{:02x}".format(item if isinstance(item, int) else ord(item))
        for item in byte_array
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment