Skip to content

Instantly share code, notes, and snippets.

@JoelBender
Created May 21, 2021 03:43
Show Gist options
  • Save JoelBender/2e056fb484268264262a2c9a122e11b8 to your computer and use it in GitHub Desktop.
Save JoelBender/2e056fb484268264262a2c9a122e11b8 to your computer and use it in GitHub Desktop.
Periodically generate a Who-Is request
#!/usr/bin/env python
"""
This application generates a Who-Is request based on a timer, then lines up
the corresponding I-Am for incoming traffic and prints out the contents.
"""
import sys
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run, stop, enable_sleeping
from bacpypes.task import RecurringTask
from bacpypes.pdu import GlobalBroadcast
from bacpypes.apdu import WhoIsRequest, IAmRequest
from bacpypes.errors import DecodingError
from bacpypes.app import BIPSimpleApplication
from bacpypes.local.device import LocalDeviceObject
###################################################### Bacnet ######################################################
# Debugging switch: every "if _debug:" guard in this module tests this flag
# before emitting a debug message.
_debug = 1
# Module-level logger, used by main() for its debug output.
_log = ModuleLogger(globals())
# Module-wide singletons, assigned in main().  this_application is also read
# by WhoIsTask.process_task() to send the periodic Who-Is.
this_device = None
this_application = None
#
# WhoIsIAmApplication
#
@bacpypes_debugging
class WhoIsIAmApplication(BIPSimpleApplication):
    """BACnet/IP application that prints I-Am responses to its own Who-Is.

    The most recent outgoing request is remembered in ``self._request``.
    When an I-Am arrives while the remembered request is a Who-Is, the
    device instance is compared against the optional low/high limits of
    that Who-Is and, if it is inside the range, the contents of the I-Am
    are written to stdout.
    """

    def __init__(self, *args):
        """Initialize the application; ``args`` go to BIPSimpleApplication."""
        if _debug:
            WhoIsIAmApplication._debug("__init__ %r", args)
        BIPSimpleApplication.__init__(self, *args)

        # keep track of requests to line up responses
        self._request = None

    def request(self, apdu):
        """Remember an outgoing Who-Is sent without an IOCB, then send it.

        The ``who_is()`` service helper passes its request directly to
        ``request()`` and never goes through ``process_io()``.  Without
        this hook ``_request`` would stay ``None`` for the periodic Who-Is
        and ``indication()`` would never print a matching I-Am.
        """
        if _debug:
            WhoIsIAmApplication._debug("request %r", apdu)

        # Only a Who-Is is recorded here: other unconfirmed traffic that is
        # sent through request() (presumably including this device's own
        # I-Am replies) must not overwrite the Who-Is being tracked.
        if isinstance(apdu, WhoIsRequest):
            self._request = apdu

        # forward it along
        BIPSimpleApplication.request(self, apdu)

    def process_io(self, iocb):
        """Remember the request carried by ``iocb`` and pass the IOCB on."""
        if _debug:
            WhoIsIAmApplication._debug("process_io %r", iocb)

        # save a copy of the request
        self._request = iocb.args[0]

        # forward it along
        BIPSimpleApplication.process_io(self, iocb)

    def confirmation(self, apdu):
        """Log the incoming confirmation and pass it to the base class."""
        if _debug:
            WhoIsIAmApplication._debug("confirmation %r", apdu)

        # forward it along
        BIPSimpleApplication.confirmation(self, apdu)

    def indication(self, apdu):
        """Print an I-Am that answers the remembered Who-Is, then forward it.

        Raises:
            DecodingError: the I-Am identifies an object whose type is not
                ``"device"``.
        """
        if _debug:
            WhoIsIAmApplication._debug("indication %r", apdu)

        if isinstance(self._request, WhoIsRequest) and isinstance(apdu, IAmRequest):
            device_type, device_instance = apdu.iAmDeviceIdentifier
            if device_type != "device":
                raise DecodingError("invalid object type")

            # a limit of None means that side of the range is unbounded
            low_limit = self._request.deviceInstanceRangeLowLimit
            high_limit = self._request.deviceInstanceRangeHighLimit
            too_low = low_limit is not None and device_instance < low_limit
            too_high = high_limit is not None and device_instance > high_limit

            if not (too_low or too_high):
                # print out the contents
                sys.stdout.write("pduSource = " + repr(apdu.pduSource) + "\n")
                sys.stdout.write(
                    "iAmDeviceIdentifier = " + str(apdu.iAmDeviceIdentifier) + "\n"
                )
                sys.stdout.write(
                    "maxAPDULengthAccepted = " + str(apdu.maxAPDULengthAccepted) + "\n"
                )
                sys.stdout.write(
                    "segmentationSupported = " + str(apdu.segmentationSupported) + "\n"
                )
                sys.stdout.write("vendorID = " + str(apdu.vendorID) + "\n")
                sys.stdout.flush()

        # forward it along
        BIPSimpleApplication.indication(self, apdu)
@bacpypes_debugging
class WhoIsTask(RecurringTask):
    """Recurring task that broadcasts a Who-Is a fixed number of times.

    Each time the task fires it sends a Who-Is for the configured device
    instance range through ``this_application``.  Once ``loop_count``
    requests have been sent, the next firing calls ``stop()`` instead.
    """

    def __init__(self, lolimit, hilimit, loop_count, interval):
        """Store the Who-Is range and repeat count, then schedule the task.

        ``lolimit`` and ``hilimit`` are handed to ``who_is()`` unchanged,
        ``loop_count`` is the number of requests to send, and ``interval``
        is passed to ``RecurringTask``.
        """
        if _debug:
            WhoIsTask._debug(
                "__init__ %r %r %r %r", lolimit, hilimit, loop_count, interval
            )
        RecurringTask.__init__(self, interval)

        # range limits used every time the task is processed
        self.lolimit, self.hilimit = lolimit, hilimit

        # number of requests sent so far, and how many to send in total
        self.counter = 0
        self.loop_count = loop_count

        # install the task so it is scheduled to run
        self.install_task()

    def process_task(self):
        """Send the next Who-Is, or call stop() once enough have been sent."""
        if _debug:
            WhoIsTask._debug("process_task (counter = %r)", self.counter)

        if self.counter >= self.loop_count:
            # every request has gone out, stop
            stop()
        else:
            # code lives in the device service
            this_application.who_is(self.lolimit, self.hilimit, GlobalBroadcast())

            # bump the counter
            self.counter = self.counter + 1
#
# __main__
#
def main():
    """Build the device and application, schedule the Who-Is task, and run.

    Settings come from ``ConfigArgumentParser``; the device and application
    are stored in the module globals ``this_device`` and
    ``this_application``.
    """
    global this_device, this_application

    # parse the command line arguments
    parsed = ConfigArgumentParser(description=__doc__).parse_args()
    if _debug:
        _log.debug("initialization")
        _log.debug(" - args: %r", parsed)

    # make a device object from the INI settings
    device_settings = {
        "objectName": parsed.ini.objectname,
        "objectIdentifier": int(parsed.ini.objectidentifier),
        "maxApduLengthAccepted": int(parsed.ini.maxapdulengthaccepted),
        "segmentationSupported": parsed.ini.segmentationsupported,
        "vendorIdentifier": int(parsed.ini.vendoridentifier),
    }
    this_device = LocalDeviceObject(**device_settings)

    # make a simple application
    this_application = WhoIsIAmApplication(this_device, parsed.ini.address)

    # make the task, runs three times 3000ms (3s) apart
    periodic_who_is = WhoIsTask(1000, 1000, 3, 3000)
    if _debug:
        _log.debug(" - whois_task: %r", periodic_who_is)

    # enable sleeping will help with threads
    enable_sleeping()

    # let it run; this returns after the task calls stop()
    run()

    _log.debug("fini")
# Script entry point: call main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment