Skip to content

Instantly share code, notes, and snippets.

@JoelBender
Created April 14, 2022 03:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JoelBender/e8413dfb466789a7b97dd095e0c2d172 to your computer and use it in GitHub Desktop.
Save JoelBender/e8413dfb466789a7b97dd095e0c2d172 to your computer and use it in GitHub Desktop.
Client-only Application
#!/usr/bin/env python
"""
This is a clone of the ReadProperty.py sample application that does not
support the Who-Is and Read-Property services (which are provided by default)
making this essentially a client-only device.
Note that it is not completely stealthy, it will respond with a Reject if
it is sent a Read-Property request.
"""
import sys
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd
from bacpypes.core import run, deferred, enable_sleeping
from bacpypes.comm import bind
from bacpypes.iocb import IOCB
from bacpypes.pdu import Address
from bacpypes.apdu import ReadPropertyRequest, ReadPropertyACK
from bacpypes.primitivedata import Unsigned, ObjectIdentifier
from bacpypes.constructeddata import Array
from bacpypes.app import ApplicationIOController
from bacpypes.appservice import ApplicationServiceAccessPoint, StateMachineAccessPoint
from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement
from bacpypes.bvllservice import BIPSimple, AnnexJCodec, UDPMultiplexer
from bacpypes.object import get_datatype
from bacpypes.local.device import LocalDeviceObject
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_application = None
@bacpypes_debugging
class ReadPropertyConsoleCmd(ConsoleCmd):
def do_read(self, args):
"""read <addr> <objid> <prop> [ <indx> ]"""
args = args.split()
if _debug:
ReadPropertyConsoleCmd._debug("do_read %r", args)
try:
addr, obj_id, prop_id = args[:3]
obj_id = ObjectIdentifier(obj_id).value
if prop_id.isdigit():
prop_id = int(prop_id)
datatype = get_datatype(obj_id[0], prop_id)
if not datatype:
raise ValueError("invalid property for object type")
# build a request
request = ReadPropertyRequest(
objectIdentifier=obj_id, propertyIdentifier=prop_id
)
request.pduDestination = Address(addr)
if len(args) == 4:
request.propertyArrayIndex = int(args[3])
if _debug:
ReadPropertyConsoleCmd._debug(" - request: %r", request)
# make an IOCB
iocb = IOCB(request)
if _debug:
ReadPropertyConsoleCmd._debug(" - iocb: %r", iocb)
# give it to the application
deferred(this_application.request_io, iocb)
# wait for it to complete
iocb.wait()
# do something for error/reject/abort
if iocb.ioError:
sys.stdout.write(str(iocb.ioError) + "\n")
# do something for success
elif iocb.ioResponse:
apdu = iocb.ioResponse
# should be an ack
if not isinstance(apdu, ReadPropertyACK):
if _debug:
ReadPropertyConsoleCmd._debug(" - not an ack")
return
# find the datatype
datatype = get_datatype(
apdu.objectIdentifier[0], apdu.propertyIdentifier
)
if _debug:
ReadPropertyConsoleCmd._debug(" - datatype: %r", datatype)
if not datatype:
raise TypeError("unknown datatype")
# special case for array parts, others are managed by cast_out
if issubclass(datatype, Array) and (
apdu.propertyArrayIndex is not None
):
if apdu.propertyArrayIndex == 0:
value = apdu.propertyValue.cast_out(Unsigned)
else:
value = apdu.propertyValue.cast_out(datatype.subtype)
else:
value = apdu.propertyValue.cast_out(datatype)
if _debug:
ReadPropertyConsoleCmd._debug(" - value: %r", value)
sys.stdout.write(str(value) + "\n")
if hasattr(value, "debug_contents"):
value.debug_contents(file=sys.stdout)
elif isinstance(value, list) and len(value) > 0:
for i, element in enumerate(value):
sys.stdout.write(" [{}] {}\n".format(i, element))
if hasattr(element, "debug_contents"):
element.debug_contents(file=sys.stdout, indent=2)
sys.stdout.flush()
# do something with nothing?
else:
if _debug:
ReadPropertyConsoleCmd._debug(
" - ioError or ioResponse expected"
)
except Exception as error:
ReadPropertyConsoleCmd._exception("exception: %r", error)
def do_rtn(self, args):
"""rtn <addr> <net> ... """
args = args.split()
if _debug:
ReadPropertyConsoleCmd._debug("do_rtn %r", args)
# provide the address and a list of network numbers
router_address = Address(args[0])
network_list = [int(arg) for arg in args[1:]]
# pass along to the service access point
this_application.nsap.update_router_references(
None, router_address, network_list
)
#
# BIPClientApplication
#
@bacpypes_debugging
class BIPClientApplication(ApplicationIOController):
"""
This class is essentially identical to the BIPSimpleApplication except that
it does not inherit the Who-Is and Read-Property services.
"""
def __init__(self, localDevice, localAddress, deviceInfoCache=None, aseID=None):
if _debug:
BIPClientApplication._debug(
"__init__ %r %r deviceInfoCache=%r aseID=%r",
localDevice,
localAddress,
deviceInfoCache,
aseID,
)
ApplicationIOController.__init__(
self, localDevice, localAddress, deviceInfoCache, aseID=aseID
)
# local address might be useful for subclasses
if isinstance(localAddress, Address):
self.localAddress = localAddress
else:
self.localAddress = Address(localAddress)
# include a application decoder
self.asap = ApplicationServiceAccessPoint()
# pass the device object to the state machine access point so it
# can know if it should support segmentation
self.smap = StateMachineAccessPoint(localDevice)
# the segmentation state machines need access to the same device
# information cache as the application
self.smap.deviceInfoCache = self.deviceInfoCache
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()
# give the NSAP a generic network layer service element
self.nse = NetworkServiceElement()
bind(self.nse, self.nsap)
# bind the top layers
bind(self, self.asap, self.smap, self.nsap)
# create a generic BIP stack, bound to the Annex J server
# on the UDP multiplexer
self.bip = BIPSimple()
self.annexj = AnnexJCodec()
self.mux = UDPMultiplexer(self.localAddress)
# bind the bottom layers
bind(self.bip, self.annexj, self.mux.annexJ)
# bind the BIP stack to the network, no network number
self.nsap.bind(self.bip, address=self.localAddress)
def close_socket(self):
if _debug:
BIPClientApplication._debug("close_socket")
# pass to the multiplexer, then down to the sockets
self.mux.close_socket()
def main():
global this_application
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug:
_log.debug("initialization")
if _debug:
_log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(ini=args.ini)
if _debug:
_log.debug(" - this_device: %r", this_device)
# make a client application
this_application = BIPClientApplication(this_device, args.ini.address)
# make a console
this_console = ReadPropertyConsoleCmd()
if _debug:
_log.debug(" - this_console: %r", this_console)
# enable sleeping will help with threads
enable_sleeping()
_log.debug("running")
run()
_log.debug("fini")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment