Skip to content

Instantly share code, notes, and snippets.

@luigidr
Created January 13, 2015 08:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save luigidr/151985965d177bdadaf8 to your computer and use it in GitHub Desktop.
Save luigidr/151985965d177bdadaf8 to your computer and use it in GitHub Desktop.
WebSocket utility for Dog 3.x using ws4py
#!/usr/bin/python
'''
Created on Jan 12, 2015
@author: derussis
'''
from ws4py.client.threadedclient import WebSocketClient
import json, getopt, sys
class DogClient(WebSocketClient):
    '''
    WebSocket client for Dog 3.x.

    When a 'presentation' message arrives, the client stores the client id
    it carries, asks for the status of all devices and subscribes to all
    device notifications. Device states and incoming notifications are
    printed to stdout as indented JSON.
    '''

    # client id taken from the 'presentation' message; None until it arrives.
    # Kept per instance (not in a module global) so several clients can
    # coexist without overwriting each other's id.
    clientId = None

    # what happens after a successful connection
    def opened(self):
        print("Successfully connected to Dog via WebSocket!")

    # what happens when the server closes the connection
    def closed(self, code, reason=None):
        print("Closed down %s %s" % (code, reason))

    # handle the WebSocket communication with Dog: it asks for device status
    # and subscribes to all the device notifications
    def received_message(self, m):
        '''
        Decode one incoming message (UTF-8 encoded JSON) and react to it.

        Messages without a 'messageType', or with a type other than
        'presentation', 'response' or 'notification', are ignored.
        '''
        message = json.loads(m.data.decode('utf-8'))
        # .get() so that a message lacking the field is skipped instead of
        # raising KeyError before the type can even be checked
        messageType = message.get('messageType')

        if messageType == 'presentation':
            self.clientId = message['clientId']
            self.getStatus()
            self.subscribeToAllNotifications()
        elif messageType == 'response':
            self._printDevicesStatus(message.get('response'))
        elif messageType == 'notification':
            # print the incoming notifications
            print('Incoming notification:')
            self._printJson(message.get('notification'))

    # send the status request for all devices
    def getStatus(self):
        self._sendRequest('{ "clientId":%s, "sequenceNumber":"1", "messageType":"request", "action":"GET", "endPoint":"/api/v1/devices/status" }')

    # subscribe to all device notifications
    def subscribeToAllNotifications(self):
        self._sendRequest('{"clientId":%s, "sequenceNumber":"2", "messageType":"request", "action":"POST", "endPoint":"/api/v1/subscriptions/devices/notifications", "parameters":"" }')

    # fill the client id into a request template and send it
    def _sendRequest(self, template):
        if self.clientId is None:
            raise RuntimeError("no clientId yet: the presentation message has not been received")
        # json.dumps() quotes and escapes the id, so the request stays valid
        # JSON even if the id contains quotes or backslashes
        self.send(template % json.dumps(self.clientId))

    # print all the device states carried by a 'response' payload, only
    def _printDevicesStatus(self, payload):
        if not isinstance(payload, dict) or 'devicesStatus' not in payload:
            return
        devicesStatus = payload['devicesStatus']
        self._printJson(devicesStatus)
        for deviceState in devicesStatus:
            if 'id' in deviceState:
                print('%s is in the states:' % deviceState['id'])
                # a device entry without 'status' is printed as null
                self._printJson(deviceState.get('status'))

    # print a value as key-sorted JSON indented by 3 spaces
    @staticmethod
    def _printJson(value):
        print(json.dumps(value, sort_keys=True, indent=3, separators=(',', ': ')))
def usage():
    '''Print the tool name and its command-line synopsis to stdout.'''
    for line in (
        "WebSocket tester for Dog 3.x",
        "Usage: ws-py -h websocket_server -p port -t prefix",
    ):
        print(line)
def main():
    '''
    Parse the command line, connect to Dog and run until interrupted.

    Options (all optional):
      -h, --server   WebSocket server host (default: localhost)
      -p, --port     WebSocket server port (default: 8080)
      -t, --prefix   path of the WebSocket endpoint (default: ws)

    When the options cannot be parsed, the error and the usage text are
    printed and the process exits with status 2.
    '''
    websocket_server = 'localhost'
    websocket_port = '8080'
    websocket_prefix = 'ws'

    try:
        # getopt short options are single characters: the prefix option is
        # "-t", as advertised by usage(). A "pr:" spec would declare "-p" a
        # second time plus "-r", leaving the prefix without a usable flag.
        opts, _ = getopt.getopt(sys.argv[1:], "h:p:t:", ["server=", "port=", "prefix="])
    except getopt.GetoptError as err:
        # print help information and exit:
        print(str(err))  # will print something like "option -a not recognized"
        usage()
        sys.exit(2)

    # handle options
    for o, a in opts:
        if o in ("-h", "--server"):
            websocket_server = a
        elif o in ("-p", "--port"):
            websocket_port = a
        elif o in ("-t", "--prefix"):
            websocket_prefix = a
        else:
            # raised explicitly so the check survives "python -O"
            raise AssertionError("unhandled option")

    # build the client before entering the try block, so that the interrupt
    # handler below never refers to an unbound name
    ws = DogClient('ws://%s:%s/%s' % (websocket_server, websocket_port, websocket_prefix))

    # try to open the WebSocket connection with Dog
    try:
        ws.connect()
        ws.run_forever()
    except KeyboardInterrupt:
        ws.close()
# start the tester only when this file is executed as a script
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment