Created
May 21, 2021 03:43
-
-
Save JoelBender/2e056fb484268264262a2c9a122e11b8 to your computer and use it in GitHub Desktop.
Periodically generate a Who-Is request
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
This application generates a Who-Is request based on a timer, then lines up | |
the corresponding I-Am for incoming traffic and prints out the contents. | |
""" | |
import sys | |
from bacpypes.debugging import bacpypes_debugging, ModuleLogger | |
from bacpypes.consolelogging import ConfigArgumentParser | |
from bacpypes.core import run, stop, enable_sleeping | |
from bacpypes.task import RecurringTask | |
from bacpypes.pdu import GlobalBroadcast | |
from bacpypes.apdu import WhoIsRequest, IAmRequest | |
from bacpypes.errors import DecodingError | |
from bacpypes.app import BIPSimpleApplication | |
from bacpypes.local.device import LocalDeviceObject | |
###################################################### Bacnet ###################################################### | |
# some debugging | |
_debug = 1 | |
_log = ModuleLogger(globals()) | |
# globals | |
this_device = None | |
this_application = None | |
# | |
# WhoIsIAmApplication | |
# | |
@bacpypes_debugging | |
class WhoIsIAmApplication(BIPSimpleApplication): | |
def __init__(self, *args): | |
if _debug: | |
WhoIsIAmApplication._debug("__init__ %r", args) | |
BIPSimpleApplication.__init__(self, *args) | |
# keep track of requests to line up responses | |
self._request = None | |
def process_io(self, iocb): | |
if _debug: | |
WhoIsIAmApplication._debug("process_io %r", iocb) | |
# save a copy of the request | |
self._request = iocb.args[0] | |
# forward it along | |
BIPSimpleApplication.process_io(self, iocb) | |
def confirmation(self, apdu): | |
if _debug: | |
WhoIsIAmApplication._debug("confirmation %r", apdu) | |
# forward it along | |
BIPSimpleApplication.confirmation(self, apdu) | |
def indication(self, apdu): | |
if _debug: | |
WhoIsIAmApplication._debug("indication %r", apdu) | |
if (isinstance(self._request, WhoIsRequest)) and (isinstance(apdu, IAmRequest)): | |
device_type, device_instance = apdu.iAmDeviceIdentifier | |
if device_type != "device": | |
raise DecodingError("invalid object type") | |
if (self._request.deviceInstanceRangeLowLimit is not None) and ( | |
device_instance < self._request.deviceInstanceRangeLowLimit | |
): | |
pass | |
elif (self._request.deviceInstanceRangeHighLimit is not None) and ( | |
device_instance > self._request.deviceInstanceRangeHighLimit | |
): | |
pass | |
else: | |
# print out the contents | |
sys.stdout.write("pduSource = " + repr(apdu.pduSource) + "\n") | |
sys.stdout.write( | |
"iAmDeviceIdentifier = " + str(apdu.iAmDeviceIdentifier) + "\n" | |
) | |
sys.stdout.write( | |
"maxAPDULengthAccepted = " + str(apdu.maxAPDULengthAccepted) + "\n" | |
) | |
sys.stdout.write( | |
"segmentationSupported = " + str(apdu.segmentationSupported) + "\n" | |
) | |
sys.stdout.write("vendorID = " + str(apdu.vendorID) + "\n") | |
sys.stdout.flush() | |
# forward it along | |
BIPSimpleApplication.indication(self, apdu) | |
@bacpypes_debugging | |
class WhoIsTask(RecurringTask): | |
def __init__(self, lolimit, hilimit, loop_count, interval): | |
if _debug: | |
WhoIsTask._debug( | |
"__init__ %r %r %r %r", lolimit, hilimit, loop_count, interval | |
) | |
RecurringTask.__init__(self, interval) | |
# save the parameters for when this task is processed | |
self.lolimit = lolimit | |
self.hilimit = hilimit | |
# initialize the counter | |
self.counter = 0 | |
self.loop_count = loop_count | |
# install the task so it is scheduled to run | |
self.install_task() | |
def process_task(self): | |
if _debug: | |
WhoIsTask._debug("process_task (counter = %r)", self.counter) | |
# after the last time this is called, stop | |
if self.counter >= self.loop_count: | |
stop() | |
return | |
# code lives in the device service | |
this_application.who_is(self.lolimit, self.hilimit, GlobalBroadcast()) | |
# bump the counter | |
self.counter += 1 | |
# | |
# __main__ | |
# | |
def main(): | |
global this_device | |
global this_application | |
# parse the command line arguments | |
args = ConfigArgumentParser(description=__doc__).parse_args() | |
if _debug: | |
_log.debug("initialization") | |
if _debug: | |
_log.debug(" - args: %r", args) | |
# make a device object | |
this_device = LocalDeviceObject( | |
objectName=args.ini.objectname, | |
objectIdentifier=int(args.ini.objectidentifier), | |
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), | |
segmentationSupported=args.ini.segmentationsupported, | |
vendorIdentifier=int(args.ini.vendoridentifier), | |
) | |
# make a simple application | |
this_application = WhoIsIAmApplication(this_device, args.ini.address) | |
# make the task, runs three times 3000ms (3s) apart | |
whois_task = WhoIsTask(1000, 1000, 3, 3000) | |
if _debug: | |
_log.debug(" - whois_task: %r", whois_task) | |
# enable sleeping will help with threads | |
enable_sleeping() | |
# let it run | |
run() | |
_log.debug("fini") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment