Skip to content

Instantly share code, notes, and snippets.

@JoelBender
Created June 26, 2018 18:32
Show Gist options
  • Save JoelBender/a1cadcf8e80488c0c9afb47177a71131 to your computer and use it in GitHub Desktop.
Save JoelBender/a1cadcf8e80488c0c9afb47177a71131 to your computer and use it in GitHub Desktop.
This application creates an application layer state machine which sends a Read Property Request and then quits after the ack is received.
#!/usr/bin/env python
"""
This application creates an application layer state machine which sends
a Read Property Request and then quits after the ack is received.
The state_machine module is almost identical to the one in the tests directory
with the exception that the TrafficLog class has been removed as well as
its reliance on the time_machine module.
"""
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run, stop, deferred
from bacpypes.pdu import Address
from bacpypes.apdu import ReadPropertyRequest, ReadPropertyACK
from bacpypes.app import BIPSimpleApplication
from bacpypes.local.device import LocalDeviceObject
from state_machine import StateMachine
# debugging switch tested by the `if _debug:` guards throughout this module;
# leave at 0 to skip the guarded debug calls
_debug = 0
# module-level logger; it is handed this module's globals()
# NOTE(review): keep `_debug` assigned before this call -- presumably
# ModuleLogger looks it up in the globals it receives; confirm against
# bacpypes.debugging
_log = ModuleLogger(globals())
# the application instance; None until main() assigns it
this_application = None
#
# ApplicationStateMachine
#
@bacpypes_debugging
class ApplicationStateMachine(BIPSimpleApplication, StateMachine):
    """A BACnet/IP simple application combined with a state machine.

    ``send()`` pushes an APDU down the stack via ``request()``, and every
    confirmation that comes back up is first handed to the state machine
    via ``receive()`` and then to the regular application processing.
    """

    def __init__(self, *args, **kwargs):
        """Initialize both base classes.

        All positional and keyword arguments are passed through to
        ``BIPSimpleApplication.__init__``; ``StateMachine.__init__`` is
        called with no arguments.
        """
        if _debug:
            ApplicationStateMachine._debug("__init__ %r %r", args, kwargs)

        # the two bases take different arguments, so each one is
        # initialized explicitly rather than through super()
        BIPSimpleApplication.__init__(self, *args, **kwargs)
        StateMachine.__init__(self)

    def send(self, apdu):
        """Send *apdu* down the stack as a request."""
        if _debug:
            ApplicationStateMachine._debug("send %r", apdu)

        self.request(apdu)

    def confirmation(self, apdu):
        """Handle a confirmation *apdu* arriving from the stack.

        The state machine sees the APDU first, then the application's own
        confirmation handling runs.
        """
        if _debug:
            ApplicationStateMachine._debug("confirmation %r", apdu)

        # the state machine gets the first look at the confirmation
        self.receive(apdu)

        # then the application processes it as usual
        super(ApplicationStateMachine, self).confirmation(apdu)
#
# __main__
#
def main():
    """Build the application, script one Read Property exchange, and run.

    Parses the command line (and INI file) arguments, creates the local
    device object and the ``ApplicationStateMachine``, and lays out a script
    of states: send a ``ReadPropertyRequest``, wait for a
    ``ReadPropertyACK``, call ``stop``, and finish in a success state.  The
    state machine is scheduled with ``deferred()`` and the core loop is then
    run.

    The application is stored in the module-level ``this_application``.
    """
    global this_application

    # command line / INI file handling
    parser = ConfigArgumentParser(description=__doc__)
    args = parser.parse_args()

    if _debug:
        _log.debug("initialization")
        _log.debug(" - args: %r", args)

    # the local device object is configured from the INI settings
    this_device = LocalDeviceObject(ini=args.ini)
    if _debug:
        _log.debug(" - this_device: %r", this_device)

    # the application, bound to the address from the INI settings
    this_application = ApplicationStateMachine(this_device, args.ini.address)

    # step 1: from the start state, send a read of the objectName property
    # of notificationClass 1 to the address "2001:1"
    sent_state = this_application.start_state.send(ReadPropertyRequest(
        destination=Address("2001:1"),
        objectIdentifier=('notificationClass', 1),
        propertyIdentifier='objectName',
    ))

    # step 2: wait for the acknowledgement
    # NOTE(review): the script only matches a ReadPropertyACK; presumably any
    # other response leaves the state machine waiting -- confirm against
    # the state_machine module
    acked_state = sent_state.receive(ReadPropertyACK)

    # step 3: call stop once the ack has been received, then mark the
    # resulting state as the successful end of the script
    stopping_state = acked_state.call(stop)
    stopping_state.success()

    # the state machine is started once the stack is ready
    deferred(this_application.run)

    _log.debug("running")

    run()

    _log.debug("fini")
# run the sample only when executed as a script, not when imported
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment