Created
March 20, 2014 17:21
-
-
Save justinribeiro/9669113 to your computer and use it in GitHub Desktop.
Simple pywebsocket setup for listening to MQTT broker.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
_OPEN_ = 1 | |
_CLOSING_ = 2 | |
_CLOSE_ = 3 | |
_status_ = _CONNECTING_ | |
# broker information for mqtt | |
_BROKER_URL = "localhost" | |
_BROKER_PORT = 1883 | |
_TOPIC_BASE = "justin/glass" | |
class broker(): | |
def __init__(self, url, port, topic, socket): | |
self.url = url | |
self.port = port | |
self.socket = socket | |
self.topic = topic | |
self.clientid = ''.join(random.choice(string.ascii_letters + string.digits) for letter in xrange(23)) | |
self.client = mosquitto.Mosquitto(self.clientid) | |
self.client.on_message = self.onMessage | |
self.client.on_connect = self.onConnect | |
self.client.on_subscribe = self.onSubscribe | |
self.client.on_publish = self.onPublish | |
self.client.connect(self.url) | |
self.client.subscribe(_TOPIC_BASE + "/#", 2) | |
def close(self): | |
self.client.disconnect() | |
def onMessage(self, mosq, obj, msg): | |
if _status_ == _OPEN_: | |
try: | |
string = json.dumps({"topic": msg.topic, "message": msg.payload}) | |
self.socket.ws_stream.send_message(string) | |
except Exception: | |
#self.socket.ws_stream.send_message("There was a message but I don't know what happened") | |
return | |
def onConnect(self, mosq, obj, rc): | |
if rc == 0: | |
string = json.dumps({"topic": self.topic + "/status", "message": "Connected to Justin's Glass"}) | |
self.socket.ws_stream.send_message(string) | |
def onSubscribe(self, mosq, obj, mid, granted_qos): | |
if _status_ == _OPEN_: | |
string = json.dumps({"topic": self.topic + "/status", "message": "Subscribed to timer"}) | |
self.socket.ws_stream.send_message(string) | |
def onPublish(self, mosq, obj, mid): | |
if _status_ == _OPEN_: | |
try: | |
string = json.dumps({"topic": self.topic + "/status", "message": "Message published to broker"}) | |
self.socket.ws_stream.send_message(string) | |
except Exception: | |
return | |
def run(self): | |
global _status_ | |
#keep web socket connected while mqtt is connected | |
while self.client.loop() == 0: | |
#self.socket.ws_stream.send_message(str(_status_)) | |
if _status_ == _OPEN_: | |
#self.socket.ws_stream.send_message("open") | |
pass | |
else: | |
#self.socket.ws_stream.send_message("disconnecting") | |
self.client.disconnect() | |
break | |
def web_socket_do_extra_handshake(request): | |
pass # Always accept. | |
def web_socket_transfer_data(request): | |
global _status_ | |
_status_ = _OPEN_ | |
instance = broker(_BROKER_URL, _BROKER_PORT, _TOPIC_BASE, request) | |
arr = () | |
talk = thread.start_new_thread(instance.run, arr) | |
while True: | |
try: | |
####################################################### | |
# Note:: | |
# mesgutil.receive_message() returns 'unicode', so | |
# if you want to treated as 'string', use encode('utf-8') | |
####################################################### | |
line = request.ws_stream.receive_message() | |
# client is still alive | |
if line == _HEARTBEAT: | |
continue | |
except Exception: | |
_status_ = _CLOSING_ | |
# wait until _status_ change. | |
i = 0 | |
while _status_ == _CLOSING_: | |
time.sleep(0.5) | |
i += 1 | |
if i > 10: | |
break | |
instance.close() | |
# close connection | |
return |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment