# Skip to content

# Instantly share code, notes, and snippets.

# @4knahs
# Last active June 3, 2021 11:36
# Show Gist options
#   • Save 4knahs/07d97ced5b1ded244cf8ff0b0a33eaa6 to your computer and use it in GitHub Desktop.
# Save 4knahs/07d97ced5b1ded244cf8ff0b0a33eaa6 to your computer and use it in GitHub Desktop.
# Monsoon python daemon
import Monsoon.HVPM as HVPM
import Monsoon.LVPM as LVPM
import Monsoon.sampleEngine as sampleEngine
import Monsoon.pmapi as pmapi
import os, signal
from multiprocessing import Process, Pipe
class MonsoonDaemon:
    """Drive a Monsoon power monitor from a child process controlled over a pipe.

    The parent keeps one end of a ``multiprocessing.Pipe`` and the child
    (``monsoon_process_f``) keeps the other. Messages exchanged:

    * child -> parent ``'init'`` once the device is configured;
    * parent -> child a measurement name (used as the CSV output name), answered
      with ``'experiment start'`` and, when sampling returns,
      ``'experiment stop'``;
    * parent -> child ``'terminate'``, echoed back before the child exits.

    Sampling is stopped by sending SIGINT to the child (see
    ``stop_measurement``).

    ``get_instance`` is the singleton accessor. Note that ``__init__`` refuses
    to run once ``get_instance`` has stored an instance, but does not itself
    register the instance it builds.
    """

    _instance = None
    _logger = None
    # Default log sinks simply print. They are wrapped in ``staticmethod`` so
    # that looking them up through an instance (``self._warn(msg)``) does not
    # bind ``self`` as an extra positional argument.
    _warn = _info = _debug = staticmethod(print)

    @classmethod
    def get_instance(cls, current, vout, serialno, auto_start=True, use_logger=True):
        """Return the shared instance, creating it on first use.

        The arguments are only used when the instance is created; on later
        calls they are ignored and the existing instance is returned.
        """
        if cls._instance is None:
            cls._instance = cls(
                current,
                vout,
                serialno,
                auto_start=auto_start,
                use_logger=use_logger)
        return cls._instance

    def __init__(self, current, vout, serialno, auto_start=True, use_logger=True):
        """Store the device settings and optionally launch the child process.

        Args:
            current: current limit (Amps) passed to the device setup.
            vout: output voltage (V) passed to the device setup.
            serialno: serial number used to select the Monsoon device.
            auto_start: when true, ``launch_daemon`` is called immediately.
            use_logger: when true, route log output through the package
                ``Logger`` instead of ``print``.

        Raises:
            Exception: if an instance was already registered via
                ``get_instance``.
        """
        if self._instance is not None:
            raise Exception(f"{self.__class__.__name__} class is a singleton")

        # https://stackoverflow.com/questions/40775054/capturing-sigint-using-keyboardinterrupt-exception-works-in-terminal-not-in-scr
        signal.signal(signal.SIGINT, signal.default_int_handler)
        self.daemon_process = None
        self.parent_conn, self.child_conn = None, None
        self.current = current
        self.vout = vout
        self.serialno = serialno

        if use_logger:
            from .logger import Logger
            MonsoonDaemon._logger = Logger.get()
            MonsoonDaemon._warn = self._logger.warn
            MonsoonDaemon._info = self._logger.info
            MonsoonDaemon._debug = self._logger.debug

        if auto_start:
            self.launch_daemon()

    @classmethod
    def init_vout_amps(cls, engine, hvmon, current, vout, tolerance=0.1):
        """Reconfigure current limits and Vout only when the device needs it.

        Reads the device status and, if the voltage channel is not 0 or either
        current limit is further than ``tolerance`` from ``current``, rewrites
        the limits, recalibrates, sets ``vout`` and selects the main
        current/voltage channels. Otherwise nothing is changed.
        """
        # Retrieves the current monsoon status
        hvmon.fillStatusPacket()
        powerup_amps = hvmon.statusPacket.powerupCurrentLimit
        runtime_amps = hvmon.statusPacket.runtimeCurrentLimit
        vout_channel = hvmon.getVoltageChannel()
        cls._debug(vout_channel)
        cls._debug(f"powerupCurrentLimit: {powerup_amps}")
        cls._debug(f"runtimeCurrentLimit: {runtime_amps}")

        def out_of_tolerance(amps):
            return amps > current + tolerance or amps < current - tolerance

        # Avoid setting if already fine, this prevents device power off.
        # Note that we assume the vout_channel is the main one, change if you
        # are measuring e.g., USB channel power. Also by default the current
        # seems to be [2.2, 8.9]
        needs_reset = (
            vout_channel != 0
            or out_of_tolerance(powerup_amps)
            or out_of_tolerance(runtime_amps))

        if not needs_reset:
            # device stays powered on
            cls._debug("skipping monsoon vout/amps settings")
            return

        cls._warn("detected wrong current level. resetting monsoon.")
        # The following lines temporarily turn off power,
        # i.e., device might power off and require manual intervention
        hvmon.setPowerUpCurrentLimit(current)
        hvmon.setRunTimeCurrentLimit(current)
        hvmon.fillStatusPacket()
        hvmon.calibrateVoltage()
        hvmon.setVout(vout)
        # Enable the main channels
        engine.enableChannel(sampleEngine.channels.MainCurrent)
        engine.enableChannel(sampleEngine.channels.MainVoltage)
        # Disabling other channels
        engine.disableChannel(sampleEngine.channels.USBCurrent)
        engine.disableChannel(sampleEngine.channels.USBVoltage)
        engine.disableChannel(sampleEngine.channels.AuxCurrent)
        # NOTE: USB passthrough is left untouched here
        # (was: mon.setUSBPassthroughMode(op.USB_Passthrough.On)).

    @classmethod
    def monsoon_engine(cls, current, vout, serialno):
        """Open the HVPM device and build a configured sample engine.

        Returns:
            A ``(hvmon, engine)`` tuple: the ``HVPM.Monsoon`` handle and the
            ``SampleEngine`` wrapping it.
        """
        usb_proto = pmapi.USB_protocol()
        cls._debug("Configuring monsoon")
        hvmon = HVPM.Monsoon()
        hvmon.setup_usb(serialno, usb_proto)
        cls._debug("HVPM Serial Number: " + repr(hvmon.getSerialNumber()))
        engine = sampleEngine.SampleEngine(hvmon)
        # Turning off engine log spam
        engine.ConsoleOutput(False)
        cls.init_vout_amps(engine, hvmon, current, vout)
        cls._info("Finished monsoon init")
        return hvmon, engine

    @classmethod
    def measure(
            _cls,
            hv_engine,
            _output_csv="energy.csv",  # still keeping the API in case we switch from csv to mem
            samples=sampleEngine.triggers.SAMPLECOUNT_INFINITE):
        """Start sampling on ``hv_engine``.

        ``_output_csv`` is accepted but unused: the CSV output is enabled by
        the caller so that it stays outside of the signalled section.
        """
        hv_engine.startSampling(samples, 1)

    @classmethod
    def monsoon_process_f(cls, conn, current, vout, serialno):
        """Child-process main loop: configure the device, then serve requests.

        Args:
            conn: the child's end of the pipe.
            current, vout, serialno: forwarded to ``monsoon_engine``.
        """
        _mon, engine = cls.monsoon_engine(current, vout, serialno)
        cls._debug('(MonsoonDaemon) Started Monsoon Daemon')
        conn.send('init')
        while True:
            cls._debug('(MonsoonDaemon) Waiting for measurements start')
            measurement_name = conn.recv()
            if measurement_name == 'terminate':
                cls._debug('(MonsoonDaemon) Terminating')
                conn.send('terminate')
                break
            cls._debug(f'(MonsoonDaemon) Starting measurement: {measurement_name}')
            engine.enableCSVOutput(measurement_name)
            conn.send('experiment start')
            cls.measure(engine)
            conn.send('experiment stop')

    def launch_daemon(self):
        """Start the child process and block until it reports ``'init'``."""
        MonsoonDaemon._debug('Launching Monsoon daemon')
        self.parent_conn, self.child_conn = Pipe()
        self.daemon_process = Process(
            target=self.monsoon_process_f,
            args=(
                self.child_conn,
                self.current,
                self.vout,
                self.serialno))
        self.daemon_process.start()
        MonsoonDaemon._debug(self.parent_conn.recv())

    def check_daemon(self):
        """Raise ``ValueError`` unless the child process exists and is alive."""
        if self.daemon_process is None:
            raise ValueError('launch_daemon needs to be run before any other method')
        if not self.daemon_process.is_alive():
            raise ValueError('Monsoon Daemon died unexpectably')

    def start_measurement(self, name):
        """Ask the child to start sampling into ``name`` and wait for its ack."""
        self.check_daemon()
        MonsoonDaemon._info(f'Starting monsoon measurement: {name}')
        self.parent_conn.send(name)
        MonsoonDaemon._debug(self.parent_conn.recv())

    def wait_read_data(self, timeout=1):
        """Return the next message from the child, or ``False`` on timeout.

        Waits at most ``timeout`` seconds. ``Connection.poll`` is used rather
        than ``select`` so the wait does not depend on the pipe being a
        selectable file descriptor.
        """
        if self.parent_conn.poll(timeout):
            return self.parent_conn.recv()
        return False

    def stop_measurement(self, max_attempts=3):
        """Interrupt the running measurement by sending SIGINT to the child.

        SIGINT is re-sent until the child answers or ``max_attempts`` signals
        have gone unanswered (each one is given one second).
        """
        self.check_daemon()
        MonsoonDaemon._info('Stopping monsoon measurement')
        pid = self.daemon_process.pid
        MonsoonDaemon._debug(pid)
        # monsoon library is blocking, this is the only reliable way i found
        # to stop measurements (ctrl + c in unix systems)
        for _ in range(max_attempts):
            MonsoonDaemon._debug("Sending SIGINT")
            os.kill(pid, signal.SIGINT)
            reply = self.wait_read_data(timeout=1)
            if reply:
                MonsoonDaemon._debug(reply)
                break
        else:
            # Looked up on the class on purpose, like every other log call.
            MonsoonDaemon._warn(f"Max number of SIGINT attempts reached : {max_attempts}")
        MonsoonDaemon._debug('done')

    def stop_daemon(self):
        """Ask the child to terminate, then reap it and release the pipe."""
        MonsoonDaemon._info('Stopping monsoon daemon')
        self.check_daemon()
        self.parent_conn.send('terminate')
        MonsoonDaemon._debug(self.parent_conn.recv())
        self.daemon_process.join(20)
        # SIGTERM stops the monsoon process
        self.daemon_process.terminate()
        # Close both pipe ends held by this process so repeated
        # launch/stop cycles do not leak file descriptors.
        for conn in (self.parent_conn, self.child_conn):
            if conn is not None:
                conn.close()
        self.daemon_process = None
        self.parent_conn = None
        self.child_conn = None
def measure(daemon, sleep_time, output, settle_time=10):
    """Run one timed measurement and shut the daemon down afterwards.

    Args:
        daemon: object exposing ``start_measurement(name)``,
            ``stop_measurement()`` and ``stop_daemon()``.
        sleep_time: how long (seconds) to keep sampling.
        output: name handed to ``start_measurement`` (the CSV output).
        settle_time: seconds to wait before starting, giving the daemon time
            to be set up (default: 10, the previously hard-coded value).

    The daemon is stopped even if starting, waiting for, or stopping the
    measurement raises, so a failure does not leave the child process running.
    """
    import time

    print('Giving some time for the daemon to be set.')
    time.sleep(settle_time)  # TODO: check the daemon status instead
    print('starting measurements')
    try:
        daemon.start_measurement(output)
        time.sleep(sleep_time)
        daemon.stop_measurement()
    finally:
        daemon.stop_daemon()
def main(argv=None):
    """Command-line entry point.

    Args:
        argv: list of argument strings to parse; ``None`` (the default) makes
            argparse read ``sys.argv[1:]``.

    With ``-measure`` a timed measurement is recorded to ``-output``;
    otherwise the daemon is started (which configures the device), kept for
    five seconds and stopped again.
    """
    import argparse  # only needed when main is run
    import time

    parser = argparse.ArgumentParser(description='Monsoon Daemon.')
    parser.add_argument(
        '-measure', action="store_true", default=False,
        help='Measures energy and outputs to a csv file.')
    parser.add_argument(
        '-output', action="store", type=str, default='/tmp/energy.csv',
        help='Use together with -measure to define the output file location (default: /tmp/energy.csv).')
    # -serial is mandatory, so the help text must not advertise a default.
    parser.add_argument(
        '-serial', action="store", type=str, required=True,
        help='Set the serialno of your monsoon device (required).')
    parser.add_argument(
        '-time', action="store", type=int, default=60,
        help='Use together with -measure to define the period of time (in s) to measure energy (default: 60).')
    parser.add_argument(
        '-current', action="store", type=float, default=5.0,
        help='Set the current in Amps to be used (default: 5.0).')
    parser.add_argument(
        '-vout', action="store", type=float, default=12.0,
        help='Set the vouts in V to be used (default: 12.0).')
    args = parser.parse_args(argv)
    print(f'args: {args}')

    print('Starting daemon')
    daemon = MonsoonDaemon(args.current, args.vout, args.serial, use_logger=False)
    if args.measure:
        measure(daemon, args.time, args.output)
    else:  # Turns power on
        time.sleep(5)
        daemon.stop_daemon()
        print("You can now boot your mobile device")
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment