Skip to content

Instantly share code, notes, and snippets.

@justinribeiro
Created March 20, 2014 17:21
Show Gist options
  • Save justinribeiro/9669113 to your computer and use it in GitHub Desktop.
Save justinribeiro/9669113 to your computer and use it in GitHub Desktop.
Simple pywebsocket setup for listening to MQTT broker.
_OPEN_ = 1
_CLOSING_ = 2
_CLOSE_ = 3
_status_ = _CONNECTING_
# broker information for mqtt
_BROKER_URL = "localhost"
_BROKER_PORT = 1883
_TOPIC_BASE = "justin/glass"
class broker():
def __init__(self, url, port, topic, socket):
self.url = url
self.port = port
self.socket = socket
self.topic = topic
self.clientid = ''.join(random.choice(string.ascii_letters + string.digits) for letter in xrange(23))
self.client = mosquitto.Mosquitto(self.clientid)
self.client.on_message = self.onMessage
self.client.on_connect = self.onConnect
self.client.on_subscribe = self.onSubscribe
self.client.on_publish = self.onPublish
self.client.connect(self.url)
self.client.subscribe(_TOPIC_BASE + "/#", 2)
def close(self):
self.client.disconnect()
def onMessage(self, mosq, obj, msg):
if _status_ == _OPEN_:
try:
string = json.dumps({"topic": msg.topic, "message": msg.payload})
self.socket.ws_stream.send_message(string)
except Exception:
#self.socket.ws_stream.send_message("There was a message but I don't know what happened")
return
def onConnect(self, mosq, obj, rc):
if rc == 0:
string = json.dumps({"topic": self.topic + "/status", "message": "Connected to Justin's Glass"})
self.socket.ws_stream.send_message(string)
def onSubscribe(self, mosq, obj, mid, granted_qos):
if _status_ == _OPEN_:
string = json.dumps({"topic": self.topic + "/status", "message": "Subscribed to timer"})
self.socket.ws_stream.send_message(string)
def onPublish(self, mosq, obj, mid):
if _status_ == _OPEN_:
try:
string = json.dumps({"topic": self.topic + "/status", "message": "Message published to broker"})
self.socket.ws_stream.send_message(string)
except Exception:
return
def run(self):
global _status_
#keep web socket connected while mqtt is connected
while self.client.loop() == 0:
#self.socket.ws_stream.send_message(str(_status_))
if _status_ == _OPEN_:
#self.socket.ws_stream.send_message("open")
pass
else:
#self.socket.ws_stream.send_message("disconnecting")
self.client.disconnect()
break
def web_socket_do_extra_handshake(request):
pass # Always accept.
def web_socket_transfer_data(request):
global _status_
_status_ = _OPEN_
instance = broker(_BROKER_URL, _BROKER_PORT, _TOPIC_BASE, request)
arr = ()
talk = thread.start_new_thread(instance.run, arr)
while True:
try:
#######################################################
# Note::
# mesgutil.receive_message() returns 'unicode', so
# if you want to treated as 'string', use encode('utf-8')
#######################################################
line = request.ws_stream.receive_message()
# client is still alive
if line == _HEARTBEAT:
continue
except Exception:
_status_ = _CLOSING_
# wait until _status_ change.
i = 0
while _status_ == _CLOSING_:
time.sleep(0.5)
i += 1
if i > 10:
break
instance.close()
# close connection
return
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment