Skip to content

Instantly share code, notes, and snippets.

@yokoyama-flogics
Created August 11, 2023 08:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yokoyama-flogics/adaa6d19c8f8ba1eb1869c8842374cf5 to your computer and use it in GitHub Desktop.
Save yokoyama-flogics/adaa6d19c8f8ba1eb1869c8842374cf5 to your computer and use it in GitHub Desktop.
Obtain power consumption data from a Tuya smart plug and send it to Elasticsearch (and also to MQTT)
import json
import time
from datetime import datetime
import paho.mqtt.publish as publish
import schedule
import tinytuya
from elasticsearch import Elasticsearch
# Host list handed to the Elasticsearch client constructor.
ELASTIC_HOSTS = ["node1", "node2"]
# Index that each measurement document is written to.
ELASTIC_INDEX = "power_meter"
# Hostname of the MQTT broker used for publishing.
MQTT_HOST = "mqtthost"
# Topic on which the JSON-encoded measurement is published.
MQTT_TOPIC = "elec_power/atsu_aircon"
# Value stored in the "unit_id" field of each Elasticsearch document.
UNIT_ID = "atsu_aircon"
def get_power_and_elastic():
    """Read the smart plug once and forward the sample to MQTT and Elasticsearch.

    The device status is queried through tinytuya. If the reply does not
    contain every data point that is needed ("18", "19" and "20" under
    "dps"), the reply is printed together with a short message and the
    function returns without publishing anything.

    On success the scaled voltage/current/power values are published as a
    JSON object to ``MQTT_TOPIC`` on ``MQTT_HOST``; the same values, plus a
    ``datetime`` and ``unit_id`` field, are indexed into ``ELASTIC_INDEX``.

    Returns:
        None.

    Raises:
        Exceptions raised by ``d.status()``, the Elasticsearch client or
        ``publish.single`` are not caught here and propagate to the caller.
    """
    # Creating the device handle is retried once per second until it
    # succeeds; any exception is printed and the attempt is repeated.
    while True:
        try:
            d = tinytuya.OutletDevice(
                dev_id="xxxxxxxx",
                # address="Auto",  # Or set to 'Auto' to auto-discover IP address
                address="192.168.0.100",
                local_key="yyyyyyyy",
                version=3.3,
            )
            break
        except Exception as e:
            print(e)
            print("Retrying...")
            time.sleep(1)

    # Get Status
    status = d.status()
    # print("set_status() result %r" % status)

    # Guard against replies that are not a mapping at all, or whose "dps"
    # entry is missing or not a mapping, before any key is looked up.
    dps = status.get("dps") if isinstance(status, dict) else None
    if not isinstance(dps, dict):
        print(status)
        print("status doesn't have 'dps'. Aborted.")
        return

    # Every data point read below must be present; checking only one of
    # them would let a partial reply fail later with a KeyError.
    for key in ("18", "19", "20"):
        if key not in dps:
            print(status)
            print("status doesn't have 'dps[%s]'. Aborted." % key)
            return

    es = Elasticsearch(
        ELASTIC_HOSTS,
        sniff_on_start=True,
        sniff_on_connection_fail=True,
        sniffer_timeout=60,
    )

    # Raw values are scaled by fixed divisors; presumably this converts the
    # device's integer readings to V, A and W -- confirm against the
    # plug's data-point specification.
    mqtt_payload = {
        "voltage": dps["20"] / 10,
        "current": dps["18"] / 1000,
        "power": dps["19"] / 10,
    }
    publish.single(MQTT_TOPIC, json.dumps(mqtt_payload), hostname=MQTT_HOST)

    # Work on a copy so the Elasticsearch-only fields never leak into the
    # MQTT payload object.
    doc = dict(mqtt_payload)
    doc["datetime"] = datetime.utcnow()  # naive timestamp expressed in UTC
    doc["unit_id"] = UNIT_ID
    print(doc)
    es.index(index=ELASTIC_INDEX, doc_type="_doc", id=None, body=doc)
def main():
    """Register the measurement job and run the scheduler loop forever.

    The job is scheduled at seconds :00, :20 and :40 of every minute, and
    pending jobs are polled once per second. This function never returns.
    """
    # Debug aids, left disabled:
    # tinytuya.set_debug(toggle=True, color=False)
    # get_power_and_elastic()
    for second_mark in (":00", ":20", ":40"):
        schedule.every().minute.at(second_mark).do(get_power_and_elastic)

    while True:
        schedule.run_pending()
        time.sleep(1)
# Start the scheduler only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment