Skip to content

Instantly share code, notes, and snippets.

@JoelBender
Created September 10, 2019 13:43
Show Gist options
  • Save JoelBender/c945660e70b0ec2af02786928cf31a20 to your computer and use it in GitHub Desktop.
Save JoelBender/c945660e70b0ec2af02786928cf31a20 to your computer and use it in GitHub Desktop.
Who-Is and Device Information
#!/usr/bin/env python
"""
This application generates a Who-Is request at startup and collects the I-Am
responses into a list, then reads various properties of the device objects,
then outputs the content as a JSON document.
"""
import sys
import json
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run, stop, deferred
from bacpypes.task import FunctionTask
from bacpypes.iocb import IOCB
from bacpypes.pdu import Address, GlobalBroadcast
from bacpypes.apdu import WhoIsRequest, ReadPropertyRequest
from bacpypes.primitivedata import Unsigned
from bacpypes.constructeddata import Array
from bacpypes.errors import ParameterOutOfRange
from bacpypes.object import get_datatype
from bacpypes.app import BIPSimpleApplication
from bacpypes.local.device import LocalDeviceObject
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# settings
WHOIS_TIMEOUT = 3.0
# globals
args = None
this_device = None
this_application = None
device_info = {}
@bacpypes_debugging
class SampleApplication(BIPSimpleApplication):
def __init__(self, *args):
if _debug:
SampleApplication._debug("__init__ %r", args)
BIPSimpleApplication.__init__(self, *args)
# keep track of request to line up responses
self._who_is_request = None
# other things to read
self.point_queue = []
def who_is(self):
if _debug:
SampleApplication._debug("who_is")
global args
# check for an existing request
if self._who_is_request:
raise RuntimeError("one request at a time")
# build a request
who_is = WhoIsRequest(
deviceInstanceRangeLowLimit=args.lolimit,
deviceInstanceRangeHighLimit=args.hilimit,
destination=args.address,
)
if _debug:
SampleApplication._debug(" - who_is: %r", who_is)
# save the request to line up responses
self._who_is_request = who_is
# set a timeout
timeout_task = FunctionTask(self.next_request)
timeout_task.install_task(delta=args.timeout)
# call it soon
self.request(who_is)
def do_IAmRequest(self, apdu):
"""Process an I-Am request."""
if _debug:
SampleApplication._debug("do_IAmRequest %r", apdu)
global device_info
# no request issued yet, or too late
if not self._who_is_request:
return
# extract the device instance number
device_instance = apdu.iAmDeviceIdentifier[1]
if _debug:
SampleApplication._debug(" - device_instance: %r", device_instance)
# extract the source address
device_address = apdu.pduSource
if _debug:
SampleApplication._debug(" - device_address: %r", device_address)
# check to see if the application is waiting for this one
if self._who_is_request.deviceInstanceRangeLowLimit and (
device_instance < self._who_is_request.deviceInstanceRangeLowLimit
):
pass
elif self._who_is_request.deviceInstanceRangeHighLimit and (
device_instance > self._who_is_request.deviceInstanceRangeHighLimit
):
pass
else:
# save the device information
device_info[device_instance] = {
"_device_instance": device_instance,
"maxAPDULengthAccepted": apdu.maxAPDULengthAccepted,
"segmentationSupported": apdu.segmentationSupported,
"vendorID": apdu.vendorID,
}
# queue up some reads
self.point_queue.append(
(device_address, apdu.iAmDeviceIdentifier, "objectName")
)
self.point_queue.append(
(device_address, apdu.iAmDeviceIdentifier, "vendorName")
)
self.point_queue.append(
(device_address, apdu.iAmDeviceIdentifier, "modelName")
)
self.point_queue.append(
(device_address, apdu.iAmDeviceIdentifier, "firmwareRevision")
)
def next_request(self):
if _debug:
SampleApplication._debug("next_request")
# check to see if we're done
if not self.point_queue:
if _debug:
SampleApplication._debug(" - done")
stop()
return
# get the next request
addr, obj_id, prop_id = self.point_queue.pop(0)
# build a request
request = ReadPropertyRequest(
destination=addr, objectIdentifier=obj_id, propertyIdentifier=prop_id
)
if _debug:
SampleApplication._debug(" - request: %r", request)
# make an IOCB
iocb = IOCB(request)
# set a callback for the response
iocb.add_callback(self.complete_request)
if _debug:
SampleApplication._debug(" - iocb: %r", iocb)
# send the request
this_application.request_io(iocb)
def complete_request(self, iocb):
if _debug:
SampleApplication._debug("complete_request %r", iocb)
if iocb.ioResponse:
apdu = iocb.ioResponse
# extract the device instance number
device_instance = apdu.objectIdentifier[1]
if _debug:
SampleApplication._debug(" - device_instance: %r", device_instance)
# find the datatype
datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier)
if _debug:
SampleApplication._debug(" - datatype: %r", datatype)
if not datatype:
raise TypeError("unknown datatype")
# special case for array parts, others are managed by cast_out
if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None):
if apdu.propertyArrayIndex == 0:
value = apdu.propertyValue.cast_out(Unsigned)
else:
value = apdu.propertyValue.cast_out(datatype.subtype)
else:
value = apdu.propertyValue.cast_out(datatype)
if _debug:
SampleApplication._debug(" - value: %r", value)
# save the value
device_info[device_instance][apdu.propertyIdentifier] = value
if iocb.ioError:
if _debug:
SampleApplication._debug(" - error: %r", iocb.ioError)
sys.stderr.write(
"error reading {} from {}: {}\n".format(
iocb.args[0].propertyIdentifier,
iocb.args[0].objectIdentifier,
iocb.ioError,
)
)
# fire off another request
deferred(self.next_request)
def main():
global args, this_device, this_application
# parse the command line arguments
parser = ConfigArgumentParser(description=__doc__)
parser.add_argument("lolimit", type=int, help="device instance range low limit")
parser.add_argument("hilimit", type=int, help="device instance range high limit")
parser.add_argument(
"address",
nargs="?",
type=Address,
help="destination address",
default=GlobalBroadcast(),
)
parser.add_argument(
"--timeout", type=float, help="who-is timeout", default=WHOIS_TIMEOUT
)
args = parser.parse_args()
if _debug:
_log.debug("initialization")
if _debug:
_log.debug(" - args: %r", args)
# litte error checking
if (args.lolimit < 0) or (args.lolimit > 4194303):
raise ParameterOutOfRange("low limit out of range")
if (args.hilimit < 0) or (args.hilimit > 4194303):
raise ParameterOutOfRange("high limit out of range")
# make a device object
this_device = LocalDeviceObject(ini=args.ini)
if _debug:
_log.debug(" - this_device: %r", this_device)
# make a simple application
this_application = SampleApplication(this_device, args.ini.address)
if _debug:
_log.debug(" - this_application: %r", this_application)
# call this when things are up and running
deferred(this_application.who_is)
if _debug:
_log.debug("running")
run()
if _debug:
_log.debug("fini")
json.dump({str(k): v for k, v in device_info.items()}, sys.stdout)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment