Skip to content

Instantly share code, notes, and snippets.

@bgulla
Created August 14, 2019 12:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bgulla/c23c39383b99f4ac28ce8d2987a954fe to your computer and use it in GitHub Desktop.
Save bgulla/c23c39383b99f4ac28ce8d2987a954fe to your computer and use it in GitHub Desktop.
Reading electric meters using a usb-sdr
#!/usr/bin/python
import sys
import os
import platform
import time
import subprocess
import json
import socket
import logging
from influxdb import InfluxDBClient
import os.path
import paho.mqtt.publish as publish
import paho.mqtt.client as paho
RTLAMR_PATH='/usr/bin/rtlamr'
ARCH="x86_64"
# TODO: this is hacky AF fix it
for key in platform.uname():
if 'armv' in key:
RTLAMR_PATH = RTLAMR_PATH+"-arm"
ARCH=key
# TODO: get rid of this
HOSTNAME = platform.node()
try:
HOSTNAME = os.getenv('HOSTNAME', 'housemeter')
except:
pass
def send_mqtt(topic, payload,):
try:
publish.single(topic, payload=(payload), qos=1, hostname=os.getenv('MQTT_HOST'), port=int(os.getenv('MQTT_PORT')), auth=None)
logger.info("[mqtt] " + topic +"->" + str(payload))
except Exception as ex:
logger.error("MQTT Publish Failed: " + str(ex) +" "+ topic + " " + str(payload))
def isPortOpen(HOST, PORT):
"""
:param HOST:
:param PORT:
:return:
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex((HOST,int(PORT)))
if result == 0:
return False
else:
return True
# TODO: make this an argument '--rtlamr=...'
if not os.path.exists(RTLAMR_PATH):
RTLAMR_PATH='/home/brandon/projects/golang/bin/rtlamr'
if not os.path.exists(RTLAMR_PATH):
print("rtlamr executable not found")
sys.exit(1)
# TODO: pass in influxdb vars as method arguments
def send_reading_to_influxdb(reading):
"""
:param reading:
:return:
"""
dbname = os.getenv('INFLUXDB_DATABASE', 'power')
user = os.getenv('INFLUXDB_USER', "")
password = os.getenv('INFLUXDB_PASSWORD', "")
port = os.getenv('INFLUXDB_PORT', 8086)
host = os.getenv('INFLUXDB_HOST', '10.0.1.9')
client = InfluxDBClient(host, port, user, password, dbname)
client.create_database(dbname)
# TODO this is hardcoded isnt it
json_body = [
{
'measurement': 'power',
'fields': {
'WATTS' : float(reading),
},
'tags': {
'host': 'housemeter'
}
}
]
client.write_points(json_body)
client.close()
return
if __name__ == '__main__':
logger = logging.getLogger('meterears')
# Setup Logging
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
logger.setLevel(logging.INFO)
RTL_TCP_HOST = os.getenv('RTL_TCP_HOST', '0.0.0.0')
RTL_TCP_PORT = os.getenv('RTL_TCP_PORT', 1234)
FILTER_ID = os.getenv('FILTER_ID', "49623078")
READ_DELAY = os.getenv('READ_DELAY', 10)
INFLUXDB_HOST = os.getenv('INFLUXDB_HOST', "localhost")
INFLUXDB_DATABASE = os.getenv('INFLUXDB_DATABASE', "power")
INFLUXDB_USER = os.getenv('INFLUXDB_USER', "")
INFLUXDB_PASSWORD = os.getenv('INFLUXDB_PASSWORD', "")
INFLUXDB_PORT = os.getenv('INFLUXDB_PORT', 8086)
MQTT_HOST = os.getenv('MQTT_HOST', "localhost")
MQTT_PORT = os.getenv('MQTT_PORT', 1883)
print ("- service vars")
print ("RTL_TCP_HOST: ", RTL_TCP_HOST)
print ("ARCH: ", ARCH)
print ("RTLAMR_PATH: ", RTLAMR_PATH)
print ("RTL_TCP_PORT: ", RTL_TCP_PORT)
print ("FILTER_ID: ", FILTER_ID)
print ("- influxdb vars")
print ("INFLUXDB_HOST: ", INFLUXDB_HOST )
print ("INFLUXDB_PORT: ", INFLUXDB_PORT )
print ("INFLUXDB_DATABASE: ", INFLUXDB_DATABASE )
print ("INFLUXDB_USER: ", INFLUXDB_USER )
print ("INFLUXDB_PASSWORD: ", INFLUXDB_PASSWORD )
print("--------------------------------")
INFLUX_ENABLED = False
while isPortOpen(RTL_TCP_HOST, RTL_TCP_PORT) or True: ## What the shit is this brandon?
SERVER_STRING = "-server="+ RTL_TCP_HOST+ ":"+ str(RTL_TCP_PORT)
FILTER_ID_STRING = "-filterid=" + FILTER_ID
completed = subprocess.run( [RTLAMR_PATH,
SERVER_STRING,
FILTER_ID_STRING,
'-single=true',
'-format=json',
'-duration=10m'],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL)
try:
data=json.loads(completed.stdout.decode("utf-8"))
except ValueError:
logger.fatal("Unable to retrieve data from the USB SDR. Is the RTL_TCP process running?")
else:
reading = data['Message']['Consumption']
if reading:
logger.info(("[RTL_TCP] reading: " + str(reading)))
#if INFLUX_ENABLED:
# send_reading_to_influxdb(reading)
WH_MULTIPLIER = 1000
rate = int(reading) * WH_MULTIPLIER * (60.0 / int(READ_DELAY))
current_reading_in_kwh = (int(reading) * WH_MULTIPLIER) / 1000
send_mqtt( "foo", "bar")
send_mqtt(
'readings/' + str(FILTER_ID) + '/meter_reading',
'%s' % str(current_reading_in_kwh)
) #readings/49623078/meter_reading
send_mqtt(
'readings/' + str(FILTER_ID) + '/meter_rate',
'%s' % str(rate)
)
else:
logger.info("[WARN] blank reading")
time.sleep(int(READ_DELAY))
logger.fatal("[FATAL] unable to connect to " + RTL_TCP_HOST + " on port " + str(RTL_TCP_PORT))
sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment