Skip to content

Instantly share code, notes, and snippets.

@volodymyrsmirnov
Last active August 29, 2015 14:07
Show Gist options
  • Save volodymyrsmirnov/86ca1ca884aae757cf87 to your computer and use it in GitHub Desktop.
Save volodymyrsmirnov/86ca1ca884aae757cf87 to your computer and use it in GitHub Desktop.
from zway import ZWay, ZWayData
import sys
import signal
import binascii
from flask import Flask, jsonify, request, abort
app = Flask(__name__)
class ZWayHandler(ZWay):
def on_device(self, type, node_id, instance_id, command_id):
print "got on_device event", (type, node_id, instance_id, command_id)
zway_instance = ZWayHandler(
"/dev/ttyAMA0",
"/opt/z-way-server/config/",
"/opt/z-way-server/translations/",
"/opt/z-way-server/ZDDX/",
"/tmp/pyzway.log", 0
)
zway_instance.start()
zway_instance.device_add_callback(0x01 | 0x02 | 0x04 | 0x08 | 0x10 | 0x20)
zway_instance.discover()
def check_response(data):
return jsonify({
"success": type(data) is int or type(data) is bool or bool(data),
"data": data
})
@app.route("/")
def home():
return "PyZBerry Testing Application"
@app.route("/inclusion/<int:state>")
def inclusion(state):
return check_response(zway_instance.controller_add_node_to_network(state))
@app.route("/zway/device_guess/<int:device_id>")
def device_guess(device_id):
return check_response(zway_instance.device_guess(device_id))
@app.route("/exclusion/<int:state>")
def exclusion(state):
return check_response(zway_instance.controller_remove_node_from_network(state))
@app.route("/devices")
def devices():
return check_response(zway_instance.devices_list())
@app.route("/instances")
def instances():
return check_response(zway_instance.instances_list(int(request.values.get("device", 0))))
@app.route("/command_classes")
def command_classes():
return check_response(zway_instance.command_classes_list(int(request.values.get("device", 0)), int(request.values.get("instance", 0))))
@app.route("/controller")
def controller():
data = ZWayData.find_controller_data(zway_instance, request.values.get("path", "").encode("ascii", "ignore"))
data_output = [{
"path": data_child.path,
"name": data_child.name,
"type": data_child.type,
"value": binascii.hexlify(data_child.value) if data_child.type == 5 else data_child.value
} for data_child in data.children]
data_output.insert(0, {
"path": data.path,
"name": data.name,
"type": data.type,
"value": data.value
})
return check_response(data_output)
@app.route("/device")
def device():
device_i = request.values.get("device", None)
instance_i = request.values.get("instance", None)
command_class_i = request.values.get("command_class", None)
if not device:
return abort(500)
data = ZWayData.find_device_data(
zway_instance,
request.values.get("path", "").encode("ascii", "ignore"),
int(device_i) if device_i is not None else None,
int(instance_i) if instance_i is not None else None,
int(command_class_i) if command_class_i is not None else None,
)
data_output = [{
"path": data_child.path,
"name": data_child.name,
"type": data_child.type,
"value": binascii.hexlify(data_child.value) if data_child.type == 5 else data_child.value
} for data_child in data.children]
data_output.insert(0, {
"path": data.path,
"name": data.name,
"type": data.type,
"value": data.value
})
return check_response(data_output)
def signal_handler(signal, frame):
print ("now exit")
zway_instance.stop()
zway_instance.terminate()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
if __name__ == '__main__':
app.run(host="0.0.0.0", debug=True, use_reloader=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment