Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save CatherineH/6821b0009b39a4557d6af7ca334fc700 to your computer and use it in GitHub Desktop.
Save CatherineH/6821b0009b39a4557d6af7ca334fc700 to your computer and use it in GitHub Desktop.
Continuously reads power values from an Aeotec Energy Meter with python-openzwave
import csv
import sys
import time

from libopenzwave import PyManager
from matplotlib import pyplot
from openzwave.option import ZWaveOption
# Serial device node of the Z-Wave controller; also passed to addDriver()
# by the PowerMeter class below.
device = "/dev/ttyUSB0"

# Build the OpenZWave option set for that controller: library configuration
# is read from /etc/openzwave/, user files go in the current directory, and
# no extra command-line options are supplied.
options = ZWaveOption(
    device,
    config_path="/etc/openzwave/",
    user_path=".",
    cmd_line="",
)
# Keep the library's own log output off the console.
options.set_console_output(False)
# Freeze the options (presumably required before a manager is created --
# confirm against the python-openzwave documentation).
options.lock()
class PowerMeter(PyManager):
    """Z-Wave manager that records "Power" readings from a meter value.

    After construction, call :meth:`get_meter_values` to poll for a fixed
    duration.  Readings are accumulated in ``meter_values`` as
    ``(seconds_since_start, value)`` tuples, and ``units`` holds the unit
    string reported alongside the most recent reading.
    """

    def __init__(self):
        """Create the manager, register the watcher and add the driver."""
        super(PowerMeter, self).__init__()
        # Initialise every attribute that ``callback`` reads *before* the
        # watcher is registered and the driver is added: notifications may
        # start arriving immediately afterwards (presumably on a library
        # thread), and would otherwise hit missing attributes.
        self.meter_values = []   # list of (elapsed seconds, value) tuples
        self.start_time = None   # set by get_meter_values(); None = idle
        self.units = None        # unit string of the latest reading
        self.value_id = None     # id of the "Power" value once discovered
        self.create()
        self.addWatcher(self.callback)
        time.sleep(1.0)
        self.addDriver(device)

    def get_meter_values(self, num_seconds):
        """Poll the meter for ``num_seconds`` seconds.

        Sets ``start_time`` to the current time, then loops in 0.1 s steps
        until ``num_seconds`` have elapsed.  Once ``callback`` has learned
        the id of the "Power" value, each iteration asks the library to
        refresh it.  Readings are stored in ``meter_values`` by ``callback``;
        this method returns nothing.

        Raises:
            RuntimeError: if the refresh request is rejected.
        """
        self.start_time = time.time()
        while time.time() - self.start_time < num_seconds:
            time.sleep(0.10)
            if self.value_id is None:
                # No "Power" notification seen yet; nothing to refresh.
                continue
            # The refresh must not live inside an ``assert``: assertions are
            # stripped under ``python -O``, which would silently disable
            # polling altogether.
            if not self.refreshValue(self.value_id):
                raise RuntimeError(
                    "refreshValue failed for value id %r" % (self.value_id,))

    def callback(self, zwargs):
        """Handle a notification and record matching "Power" readings.

        Only notifications that carry a ``valueId`` for
        ``COMMAND_CLASS_METER`` with label ``"Power"`` on instance 1 are
        used, and only after ``get_meter_values`` has set ``start_time``.
        """
        # get the node id/home from the first message tagged with the sensor
        # we want
        if "valueId" not in zwargs:
            return
        value = zwargs["valueId"]
        if value["commandClass"] != "COMMAND_CLASS_METER":
            return
        if value["label"] != "Power" or value["instance"] != 1:
            return
        if self.start_time is None:
            # Measurement has not started yet; ignore early readings.
            return
        self.value_id = value["id"]
        self.units = value["units"]
        self.meter_values.append((time.time() - self.start_time,
                                  value["value"]))
        print("Power", value["value"])
# Start the Z-Wave manager and begin listening for notifications.
manager = PowerMeter()
# collect data for an hour
manager.get_meter_values(60*60)

# Snapshot the readings once so the CSV and the plot are built from the same
# data even if further notifications arrive while the files are written.
readings = list(manager.meter_values)

# add to a csv
csv_path = 'meter_values_%s.csv' % manager.start_time
# The csv module needs a binary-mode file on Python 2 but a text-mode file
# opened with newline='' on Python 3; 'wb' on Python 3 makes writerow() raise
# TypeError, which would discard the whole capture.
if sys.version_info[0] >= 3:
    csvfile = open(csv_path, 'w', newline='')
else:
    csvfile = open(csv_path, 'wb')
with csvfile:
    spamwriter = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_MINIMAL)
    spamwriter.writerow(['time (s)', 'meter (%s)' % manager.units])
    spamwriter.writerows(readings)

# Plot the readings against elapsed time in minutes and save as a PNG.
fig, ax = pyplot.subplots()
ax.plot([point[0]/60.0 for point in readings],
        [point[1] for point in readings])
ax.set_xlabel("Time (minutes)")
ax.set_ylabel("Meter Value (%s)" % manager.units)
fig.savefig("power_meter_%s.png" % manager.start_time)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment