Skip to content

Instantly share code, notes, and snippets.

@bjarneeins
Forked from rbaarda/hs110-data-collect.py
Last active October 23, 2022 11:40
Show Gist options
  • Save bjarneeins/f10cb7dbca53f3150e151c7d3206fdb0 to your computer and use it in GitHub Desktop.
Save bjarneeins/f10cb7dbca53f3150e151c7d3206fdb0 to your computer and use it in GitHub Desktop.
Python 3 script that collects data from an HS110 power plug and sends it to either a local or a remote InfluxDB database.
import sys
import time
import socket
import json
import threading
import requests
import argparse
from struct import pack
# needed to disable ssl warnings
import urllib3
urllib3.disable_warnings()
# needs to be installed (pip3 install influxdb)
from influxdb import InfluxDBClient
# Connection settings for the InfluxDB server that receives the samples.
HOST = 'localhost'
# NOTE(review): PORT is a string; presumably the client coerces it to an int — confirm.
PORT = '8086'
USER = 'dose'
PASSWORD = 'xxx'
DATABASE = 'dosedata'
# Address of the smart plug that run() polls.
PLUGIP = "192.168.94.2"
# Shared client used by store_metrics(); built once when the module is loaded,
# with ssl=True so the connection is made over HTTPS.
dbClient = InfluxDBClient(host=HOST, port=PORT, username=USER, password=PASSWORD, database=DATABASE, ssl=True)
# https://stackoverflow.com/questions/21017698/converting-int-to-bytes-in-python-3
def int_to_bytes(x):
    """Return the shortest big-endian byte string that encodes ``x``.

    Zero is encoded as a single NUL byte. ``(0).bit_length()`` is 0, so
    without the ``max(1, ...)`` guard the result would be an empty byte
    string and the value would silently vanish from any stream it is
    appended to.

    Args:
        x: A non-negative integer.

    Returns:
        The value as ``bytes``, at least one byte long.

    Raises:
        OverflowError: If ``x`` is negative.
    """
    length = max(1, (x.bit_length() + 7) // 8)
    return x.to_bytes(length, 'big')
# Based on: https://github.com/softScheck/tplink-smartplug/blob/master/tplink-smartplug.py
def encrypt(string):
    """Obfuscate a command with the plug's XOR autokey scheme.

    Every plaintext byte is XOR-ed with the previous ciphertext byte; the
    first byte is XOR-ed with the initial key 171. The result is prefixed
    with the payload length as a 4-byte big-endian unsigned integer.

    Args:
        string: The command as a bytes-like object.

    Returns:
        ``bytes`` holding the 4-byte length header followed by exactly
        ``len(string)`` encrypted bytes.
    """
    key = 171
    # A bytearray keeps the build linear and, unlike a minimal-length integer
    # conversion, still emits a byte when the XOR result happens to be 0.
    result = bytearray(pack('>I', len(string)))
    for plain_byte in string:
        key ^= plain_byte  # the ciphertext byte becomes the next key
        result.append(key)
    return bytes(result)
def decrypt(string):
    """Reverse the XOR autokey obfuscation applied by ``encrypt``.

    Each incoming byte is XOR-ed with the byte received before it (171 for
    the first one) and the resulting code point is turned into a character.

    Args:
        string: The encrypted payload as an iterable of ints (e.g. ``bytes``),
            without the 4-byte length header.

    Returns:
        The decoded text as ``str``.
    """
    previous = 171
    chars = []
    for cipher_byte in string:
        chars.append(chr(previous ^ cipher_byte))
        previous = cipher_byte
    return "".join(chars)
def send_hs_command(address, port, cmd, timeout=5.0):
    """Send one encrypted command to the plug and return its raw reply.

    Args:
        address: Host name or IP address of the plug.
        port: TCP port of the plug.
        cmd: The plaintext command as bytes; it is passed through ``encrypt``.
        timeout: Seconds to wait on connect/send/receive before giving up.
            Without a limit an unreachable plug would block the calling
            thread indefinitely.

    Returns:
        The bytes received from the plug, including the 4-byte header that
        callers strip before decrypting. Empty if nothing was received. On a
        socket error the bytes collected so far are returned.
    """
    received = bytearray()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp_sock:
        tcp_sock.settimeout(timeout)
        try:
            tcp_sock.connect((address, port))
            # sendall() retries until the whole request has been written.
            tcp_sock.sendall(encrypt(cmd))
            while True:
                chunk = tcp_sock.recv(2048)
                if not chunk:
                    # Peer closed the connection.
                    break
                received += chunk
                # Replies are assumed to carry the same 4-byte big-endian
                # length prefix that encrypt() puts on requests; stop as soon
                # as the announced payload has arrived instead of relying on
                # a single recv() returning everything.
                if len(received) >= 4:
                    expected = int.from_bytes(received[:4], "big")
                    if len(received) - 4 >= expected:
                        break
        except socket.error:
            print("Socket closed.", file=sys.stderr)
    return bytes(received)
def store_metrics(current_ma, voltage_mv, power_mw, total_wh=0):
    """Write one sample to the database through the shared ``dbClient``.

    Two points are written: ``PowerData`` with the current, voltage and power
    fields, and ``PowerUsage`` with the ``total_wh`` field.

    Args:
        current_ma: Value stored in the ``current_ma`` field.
        voltage_mv: Value stored in the ``voltage_mv`` field.
        power_mw: Value stored in the ``power_mw`` field.
        total_wh: Value stored in the ``total_wh`` field. Defaults to 0 so
            that the three-argument fallback calls made by ``run`` when the
            plug returns nothing no longer raise ``TypeError``.
    """
    power_point = {
        "measurement": "PowerData",
        "fields": {
            "current_ma": current_ma,
            "voltage_mv": voltage_mv,
            "power_mw": power_mw,
        },
    }
    usage_point = {
        "measurement": "PowerUsage",
        "fields": {
            "total_wh": total_wh,
        },
    }
    dbClient.write_points([power_point, usage_point])
def run():
    """Poll the plug once, store the reading, and schedule the next poll.

    The plug at ``PLUGIP`` is asked for its realtime emeter values. When no
    reply arrives, or the reply cannot be parsed into the expected fields,
    an all-zero sample is stored instead and a message is written to stderr.
    """
    # Re-arm the timer before doing any work so that an error further down
    # in this invocation does not stop the 10-second polling cycle.
    threading.Timer(10.0, run).start()

    data = send_hs_command(PLUGIP, 9999, b'{"emeter":{"get_realtime":{}}}')
    if not data:
        print("No data returned on power request.", file=sys.stderr)
        # store_metrics() takes four values; the total is zeroed like the rest.
        store_metrics(0, 0, 0, 0)
        return

    readings = None
    try:
        # The first four bytes are the header, not part of the payload.
        emeter = json.loads(decrypt(data[4:]))["emeter"]["get_realtime"]
        if emeter:
            readings = (
                emeter["current_ma"],
                emeter["voltage_mv"],
                emeter["power_mw"],
                emeter["total_wh"],
            )
    except (ValueError, KeyError, TypeError):
        # Truncated or garbled JSON, or a reply without the expected keys,
        # is treated the same as an empty emeter section.
        readings = None

    if readings is None:
        print("No emeter data returned on power request.", file=sys.stderr)
        store_metrics(0, 0, 0, 0)
        return

    store_metrics(*readings)
    print("Stored values, current_ma: {0}, voltage_mv: {1}, power_mw: {2}, total_wh: {3}".format(
        *readings))
# Start polling; every run() call schedules the following one itself.
run()
import sys
import time
import socket
import json
import threading
import requests
import argparse
from struct import pack
# needed to disable ssl warnings
import urllib3
urllib3.disable_warnings()
## needs to be installed (pip install 'influxdb-client')
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
# Value handed to InfluxDBClient as its `url` argument.
# NOTE(review): the influxdb-client package normally expects a full URL here
# (scheme, host and port, e.g. "http://localhost:8086") — confirm.
HOST = 'localhost'
# Address of the smart plug that run() polls.
PLUGIP = "192.168.94.2"
# Bucket that store_metrics() writes to.
BUCKET = 'dosedata'
# API token and organisation passed to the client.
# NOTE(review): these look like placeholders to be replaced before use — confirm.
TOKEN = 't0ken'
ORG = 'org'
# Shared client and synchronous write API, built once when the module is loaded.
client = InfluxDBClient(url=HOST, token=TOKEN, org=ORG)
write_api = client.write_api(write_options=SYNCHRONOUS)
# https://stackoverflow.com/questions/21017698/converting-int-to-bytes-in-python-3
def int_to_bytes(x):
    """Return the shortest big-endian byte string that encodes ``x``.

    Zero is encoded as a single NUL byte. ``(0).bit_length()`` is 0, so
    without the ``max(1, ...)`` guard the result would be an empty byte
    string and the value would silently vanish from any stream it is
    appended to.

    Args:
        x: A non-negative integer.

    Returns:
        The value as ``bytes``, at least one byte long.

    Raises:
        OverflowError: If ``x`` is negative.
    """
    length = max(1, (x.bit_length() + 7) // 8)
    return x.to_bytes(length, 'big')
# Based on: https://github.com/softScheck/tplink-smartplug/blob/master/tplink-smartplug.py
def encrypt(string):
    """Obfuscate a command with the plug's XOR autokey scheme.

    Every plaintext byte is XOR-ed with the previous ciphertext byte; the
    first byte is XOR-ed with the initial key 171. The result is prefixed
    with the payload length as a 4-byte big-endian unsigned integer.

    Args:
        string: The command as a bytes-like object.

    Returns:
        ``bytes`` holding the 4-byte length header followed by exactly
        ``len(string)`` encrypted bytes.
    """
    key = 171
    # A bytearray keeps the build linear and, unlike a minimal-length integer
    # conversion, still emits a byte when the XOR result happens to be 0.
    result = bytearray(pack('>I', len(string)))
    for plain_byte in string:
        key ^= plain_byte  # the ciphertext byte becomes the next key
        result.append(key)
    return bytes(result)
def decrypt(string):
    """Reverse the XOR autokey obfuscation applied by ``encrypt``.

    Each incoming byte is XOR-ed with the byte received before it (171 for
    the first one) and the resulting code point is turned into a character.

    Args:
        string: The encrypted payload as an iterable of ints (e.g. ``bytes``),
            without the 4-byte length header.

    Returns:
        The decoded text as ``str``.
    """
    previous = 171
    chars = []
    for cipher_byte in string:
        chars.append(chr(previous ^ cipher_byte))
        previous = cipher_byte
    return "".join(chars)
def send_hs_command(address, port, cmd, timeout=5.0):
    """Send one encrypted command to the plug and return its raw reply.

    Args:
        address: Host name or IP address of the plug.
        port: TCP port of the plug.
        cmd: The plaintext command as bytes; it is passed through ``encrypt``.
        timeout: Seconds to wait on connect/send/receive before giving up.
            Without a limit an unreachable plug would block the calling
            thread indefinitely.

    Returns:
        The bytes received from the plug, including the 4-byte header that
        callers strip before decrypting. Empty if nothing was received. On a
        socket error the bytes collected so far are returned.
    """
    received = bytearray()
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp_sock:
        tcp_sock.settimeout(timeout)
        try:
            tcp_sock.connect((address, port))
            # sendall() retries until the whole request has been written.
            tcp_sock.sendall(encrypt(cmd))
            while True:
                chunk = tcp_sock.recv(2048)
                if not chunk:
                    # Peer closed the connection.
                    break
                received += chunk
                # Replies are assumed to carry the same 4-byte big-endian
                # length prefix that encrypt() puts on requests; stop as soon
                # as the announced payload has arrived instead of relying on
                # a single recv() returning everything.
                if len(received) >= 4:
                    expected = int.from_bytes(received[:4], "big")
                    if len(received) - 4 >= expected:
                        break
        except socket.error:
            print("Socket closed.", file=sys.stderr)
    return bytes(received)
def store_metrics(current_ma, voltage_mv, power_mw, total_wh=0):
    """Write one sample to ``BUCKET`` through the shared ``write_api``.

    Two points are written: ``PowerData`` with the current, voltage and power
    fields, and ``PowerUsage`` with the ``total_wh`` field.

    Args:
        current_ma: Value stored in the ``current_ma`` field.
        voltage_mv: Value stored in the ``voltage_mv`` field.
        power_mw: Value stored in the ``power_mw`` field.
        total_wh: Value stored in the ``total_wh`` field. Defaults to 0 so
            that the three-argument fallback calls made by ``run`` when the
            plug returns nothing no longer raise ``TypeError``.
    """
    power_point = {
        "measurement": "PowerData",
        "fields": {
            "current_ma": current_ma,
            "voltage_mv": voltage_mv,
            "power_mw": power_mw,
        },
    }
    usage_point = {
        "measurement": "PowerUsage",
        "fields": {
            "total_wh": total_wh,
        },
    }
    write_api.write(bucket=BUCKET, record=[power_point, usage_point])
def run():
    """Poll the plug once, store the reading, and schedule the next poll.

    The plug at ``PLUGIP`` is asked for its realtime emeter values. When no
    reply arrives, or the reply cannot be parsed into the expected fields,
    an all-zero sample is stored instead and a message is written to stderr.
    """
    # Re-arm the timer before doing any work so that an error further down
    # in this invocation does not stop the 10-second polling cycle.
    threading.Timer(10.0, run).start()

    data = send_hs_command(PLUGIP, 9999, b'{"emeter":{"get_realtime":{}}}')
    if not data:
        print("No data returned on power request.", file=sys.stderr)
        # store_metrics() takes four values; the total is zeroed like the rest.
        store_metrics(0, 0, 0, 0)
        return

    readings = None
    try:
        # The first four bytes are the header, not part of the payload.
        emeter = json.loads(decrypt(data[4:]))["emeter"]["get_realtime"]
        if emeter:
            readings = (
                emeter["current_ma"],
                emeter["voltage_mv"],
                emeter["power_mw"],
                emeter["total_wh"],
            )
    except (ValueError, KeyError, TypeError):
        # Truncated or garbled JSON, or a reply without the expected keys,
        # is treated the same as an empty emeter section.
        readings = None

    if readings is None:
        print("No emeter data returned on power request.", file=sys.stderr)
        store_metrics(0, 0, 0, 0)
        return

    store_metrics(*readings)
    #print("Stored values, current_ma: {0}, voltage_mv: {1}, power_mw: {2}, total_wh: {3}".format(emeter["current_ma"], emeter["voltage_mv"], emeter["power_mw"], emeter["total_wh"]))
# Start polling; every run() call schedules the following one itself.
run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment