Skip to content

Instantly share code, notes, and snippets.

@JoelBender
Created July 18, 2018 03:14
Show Gist options
  • Save JoelBender/03e189d4bdb5dbd95f9190f61e36f97c to your computer and use it in GitHub Desktop.
Save JoelBender/03e189d4bdb5dbd95f9190f61e36f97c to your computer and use it in GitHub Desktop.
This application generates a Who-Is request at startup and collects the I-Am responses into a list for more processing.
#!/usr/bin/env python
"""
This application generates a Who-Is request at startup and collects the I-Am
responses into a list for more processing.
"""
import sys
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run, stop, deferred
from bacpypes.task import FunctionTask
from bacpypes.pdu import Address, GlobalBroadcast
from bacpypes.apdu import WhoIsRequest
from bacpypes.errors import ParameterOutOfRange
from bacpypes.app import BIPSimpleApplication
from bacpypes.local.device import LocalDeviceObject
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# settings
WHOIS_TIMEOUT = 3.0
# globals
args = None
this_device = None
this_application = None
#
# WhoIsMoreApplication
#
@bacpypes_debugging
class WhoIsMoreApplication(BIPSimpleApplication):
def __init__(self, *args):
if _debug: WhoIsMoreApplication._debug("__init__ %r", args)
BIPSimpleApplication.__init__(self, *args)
# keep track of request to line up responses
self._who_is_request = None
self._i_am_responses = []
def who_is(self):
if _debug: WhoIsMoreApplication._debug("who_is")
global args
# check for an existing request
if self._who_is_request:
raise RuntimeError("one request at a time")
# build a request
who_is = WhoIsRequest(
deviceInstanceRangeLowLimit=args.lolimit,
deviceInstanceRangeHighLimit=args.hilimit,
destination=args.address,
)
if _debug: WhoIsMoreApplication._debug(" - who_is: %r", who_is)
# save the request to line up responses, clear out the responses
self._who_is_request = who_is
self._i_am_responses = []
# set a timeout
timeout_task = FunctionTask(self.next_thing_to_do)
timeout_task.install_task(delta=args.timeout)
# call it soon
self.request(who_is)
def do_IAmRequest(self, apdu):
"""Process an I-Am request."""
if _debug: WhoIsMoreApplication._debug("do_IAmRequest %r", apdu)
# no request issued yet, or too late
if not self._who_is_request:
return
# extract the device instance number
device_instance = apdu.iAmDeviceIdentifier[1]
if _debug: WhoIsMoreApplication._debug(" - device_instance: %r", device_instance)
# extract the source address
device_address = apdu.pduSource
if _debug: WhoIsMoreApplication._debug(" - device_address: %r", device_address)
# check to see if the application is waiting for this one
if self._who_is_request.deviceInstanceRangeLowLimit and (device_instance < self._who_is_request.deviceInstanceRangeLowLimit):
pass
elif self._who_is_request.deviceInstanceRangeHighLimit and (device_instance > self._who_is_request.deviceInstanceRangeHighLimit):
pass
else:
self._i_am_responses.append(apdu)
def next_thing_to_do(self):
if _debug: WhoIsMoreApplication._debug("next_thing_to_do")
# clear out the current request
self._who_is_request = None
# do something with the responses
for iam in self._i_am_responses:
print(repr(iam))
iam.debug_contents(file=sys.stdout)
# done for now
stop()
#
# main
#
def main():
global args, this_device, this_application
# parse the command line arguments
parser = ConfigArgumentParser(description=__doc__)
parser.add_argument(
"lolimit", type=int,
help="device instance range low limit",
)
parser.add_argument(
"hilimit", type=int,
help="device instance range high limit",
)
parser.add_argument(
"address", nargs='?', type=Address,
help="destination address",
default=GlobalBroadcast(),
)
parser.add_argument(
"--timeout", type=float,
help="who-is timeout",
default=WHOIS_TIMEOUT,
)
args = parser.parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# litte error checking
if (args.lolimit < 0) or (args.lolimit > 4194303):
raise ParameterOutOfRange("low limit out of range")
if (args.hilimit < 0) or (args.hilimit > 4194303):
raise ParameterOutOfRange("high limit out of range")
# make a device object
this_device = LocalDeviceObject(ini=args.ini)
if _debug: _log.debug(" - this_device: %r", this_device)
# make a simple application
this_application = WhoIsMoreApplication(
this_device, args.ini.address,
)
if _debug: _log.debug(" - this_application: %r", this_application)
# call this when things are up and running
deferred(this_application.who_is)
if _debug: _log.debug("running")
run()
if _debug: _log.debug("fini")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment