Skip to content

Instantly share code, notes, and snippets.

@quarxpl
Created March 2, 2019 00:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save quarxpl/9c25ee8a45e9d00a6145562090cfac51 to your computer and use it in GitHub Desktop.
Save quarxpl/9c25ee8a45e9d00a6145562090cfac51 to your computer and use it in GitHub Desktop.
sonoff-mqtt.py
#!/usr/bin/python
#sudo apt install libffi-dev
#pip install websocket-client
#pip install requests
#pip install paho-mqtt
import hmac, hashlib, base64
import json
import time
import random
import requests
import pprint
import websocket
import logging
import re
import signal
import sys
from thread import start_new_thread, allocate_lock
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
class Sonoff():
def __init__(self):
self.opt = json.loads(open("/home/pi/HomeAutomation/config/ewelink.json", "r").read())
self.devices = {}
self.friendly_names = {}
self.lock = allocate_lock()
self.ws = None
self.broker = None
def ewelink_login(self):
logging.info("Attempting to login to the Ewelink server")
app_details = {
'password' : self.opt['password'],
'version' : '6',
'ts' : int(time.time()),
'nonce' : ''.join([str(random.randint(0, 9)) for i in range(15)]),
'appid' : 'oeVkj2lYFGnJu5XUtWisfW4utiN4u9Mq',
'imei' : self.opt['imei'], #str(uuid.uuid4())
'os' : 'iOS',
'model' : 'iPhone10,6',
'romVersion': '11.1.2',
'appVersion': '3.5.3',
'email' : self.opt['login']
}
sig = base64.b64encode(hmac.new(b'6Nz4n0xA8s8qdxQf2GqurZj2Fs55FUvM', str.encode(json.dumps(app_details)), digestmod = hashlib.sha256).digest()).decode()
self.headers = {
'Authorization' : 'Sign ' + sig,
'Content-Type' : 'application/json;charset=UTF-8'
}
r = requests.post('https://%s-api.coolkit.cc:8080/api/user/login' % self.opt['region'], headers = self.headers, json = app_details)
res = r.json()
if 'error' in res:
logging.error("/api/user/login returned an error")
raise
self.bearer = res['at']
self.user_apikey = res['user']['apikey']
self.headers.update({'Authorization' : 'Bearer ' + self.bearer})
if not ('host' in self.opt):
r = requests.post('https://%s-disp.coolkit.cc:8080/dispatch/app' % self.opt['region'], headers = self.headers)
resp = r.json()
if 'error' in resp and resp['error'] == 0 and 'domain' in resp:
opt['host'] = resp['domain']
def ewelink_connect(self):
logging.info("Trying to connect to the Ewelink websocket")
self.ws = websocket.WebSocketApp('wss://%s:8080/api/ws' % self.opt['host'], on_message = self.on_ewelink_update, on_close = self.on_ewelink_close, on_error = self.on_ewelink_error, on_open = self.on_ewelink_open)
#websocket.enableTrace(True)
def on_ewelink_update(self, message):
logging.info("Incoming Ewelink message: " + message)
message = json.loads(message)
self.lock.acquire()
if "action" in message and message["action"] == "update":
_id = message["deviceid"]
if _id in self.devices:
if 'params' in message and 'switches' in message['params']:
self.devices[_id]['params']['switches'] = message['params']['switches']
self.broker.publish("sonoff/" + _id + "_0/stat", "1" if self.get_state(_id, 0) else "0", retain = True)
self.broker.publish("sonoff/" + _id + "_1/stat", "1" if self.get_state(_id, 1) else "0", retain = True)
if 'params' in message and 'switch' in message['params']:
self.devices[_id]['params']['switch'] = message['params']['switch']
self.broker.publish("sonoff/" + _id + "/stat", "1" if self.get_state(_id) else "0", retain = True)
self.mqtt_publish_devices()
self.lock.release()
def on_ewelink_error(self, error):
if error != None:
logging.error(str(error))
def on_ewelink_close(self, *args):
self.ws = None
logging.error("Connection closed. We will try to reconnect in a few seconds")
raise
def on_ewelink_open(self, *args):
payload = {
'action' : "userOnline",
'userAgent' : 'app',
'version' : 6,
'nonce' : ''.join([str(random.randint(0, 9)) for i in range(15)]),
'apkVesrion': "1.8",
'os' : 'ios',
'at' : self.bearer,
'apikey' : self.user_apikey,
'ts' : str(int(time.time())),
'model' : 'iPhone10,6',
'romVersion': '11.1.2',
'sequence' : str(time.time()).replace('.','')
}
logging.info("Sending a userOnline message")
self.ws.send(json.dumps(payload))
logging.info("Retrieving the list of devices")
self.update_devices()
def mqtt_connect(self):
self.broker = mqtt.Client(clean_session = True)
self.broker.on_connect = self.mqtt_on_connect
self.broker.on_message = self.mqtt_on_message
self.broker.connect("127.0.0.1", 1883, 60)
self.broker.loop_start()
def mqtt_on_connect(self, client, userdata, flags, rc):
logging.info("Successfully connected to the MQTT broker.")
def mqtt_on_message(self, client, userdata, msg):
if self.ws == None:
return
message = str(msg.payload)
logging.info("Incoming MQTT message: " + msg.topic + " | " + message)
p = re.compile(r"sonoff\/(.+?)(_(\d))?\/power")
res = re.search(p, msg.topic)
if res != None:
device = res.group(1).strip()
try:
outlet = int(res.group(3))
except:
outlet = None
self.set_state(device, True if message == '1' else False, outlet)
def mqtt_subscribe(self):
logging.warning("Subscribing to MQTT topics")
self.broker.subscribe("sonoff/+/power")
for _id in self.devices:
if '2C' in self.devices[_id]['productModel']:
self.broker.publish("sonoff/" + _id + "_0/stat", "1" if self.get_state(_id, 0) else "0", retain = True)
self.broker.publish("sonoff/" + _id + "_1/stat", "1" if self.get_state(_id, 1) else "0", retain = True)
else:
self.broker.publish("sonoff/" + _id + "/stat", "1" if self.get_state(_id) else "0", retain = True)
def mqtt_publish_devices(self):
self.broker.publish("sonoff/devices/debug", pprint.pformat(self.devices), retain = True)
_devices = []
for i in self.devices:
state = ""
if "switch" in self.devices[i]["params"]:
state = self.devices[i]["params"]["switch"]
elif "switches" in self.devices[i]["params"]:
state = self.devices[i]["params"]["switches"]
_devices.append({"id": self.devices[i]["deviceid"], "name": self.devices[i]["name"], "state": state})
self.broker.publish("sonoff/devices", json.dumps(_devices), retain = True)
def get_device(self, device):
if isinstance(device, (str)):
if device.lower() in self.friendly_names:
device = self.friendly_names[device.lower()]
if not (device in self.devices):
return None
return self.devices[device]
def get_state(self, _device, outlet = 0):
if self.ws == None:
return None
device = self.get_device(_device)
if device == None:
logging.warning("Could not find the device: " + _device)
return None
if 'switches' in device['params']:
for item in device['params']['switches']:
if outlet == int(item['outlet']):
return True if item['switch'] == 'on' else False
if 'switch' in device['params']:
return True if device['params']['switch'] == 'on' else False
def set_state(self, _device, state, outlet = None):
if self.ws == None:
return
device = self.get_device(_device)
if device == None:
logging.warning("Could not find the device: " + _device)
return
self.lock.acquire()
update_params = {}
control_type = 5
if 'switches' in device['params']:
control_type = 7
apikey = device['apikey']
update_params['controlType'] = control_type
if control_type == 7:
for item in device['params']['switches']:
if outlet == None or outlet == int(item['outlet']):
item['switch'] = 'on' if state else 'off'
update_params['switches'] = device['params']['switches']
if control_type == 5:
device['params']['switch'] = 'on' if state else 'off'
update_params['switch'] = device['params']['switch']
logging.info("Attempting to change the device's state to: " + str(update_params))
payload = {
'action' : 'update',
'userAgent' : 'app',
'params' : update_params,
'apikey' : apikey,
'deviceid' : device['deviceid'],
'sequence' : str(time.time()).replace('.',''),
'controlType' : control_type,
'ts' : 0
}
self.ws.send(json.dumps(payload))
self.mqtt_publish_devices()
self.lock.release()
def update_devices(self):
if self.ws == None:
return
r = requests.get('https://%s-api.coolkit.cc:8080/api/user/device' % self.opt['region'], headers = self.headers)
_devices = r.json()
if not ('error' in _devices):
self.devices = {}
for entry in _devices:
self.devices[entry['deviceid']] = entry
self.friendly_names[entry['name'].lower()] = entry['deviceid']
self.mqtt_publish_devices()
else:
logging.error("Could not receive the list of devices with /api/user/device")
raise
def run(self):
self.mqtt_connect()
self.ewelink_login()
self.ewelink_connect()
self.mqtt_subscribe()
if self.ws != None:
try:
self.ws.run_forever(ping_interval = 145)
finally:
self.ws.close()
self.ws = None
else:
time.sleep(60)
self.ewelink_connect()
if __name__ == "__main__":
logging.basicConfig(level = logging.DEBUG, filename = "/var/log/ha/ewelink_mqtt.log", filemode = "a+", format = "%(asctime)-15s %(levelname)-8s %(message)s")
logging.getLogger().addHandler(logging.StreamHandler())
signal.signal(signal.SIGINT, lambda sig,frame : sys.exit(0))
while True:
try:
Sonoff().run()
except (SystemExit, KeyboardInterrupt):
logging.warning("Terminating the process...")
break
except Exception as err:
logging.error("Restarting due to the error: " + str(err))
time.sleep(60)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment