Skip to content

Instantly share code, notes, and snippets.

@JoelBender
Created June 15, 2022 06:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JoelBender/df2a4d365ee4bf2b842002a1670f1d56 to your computer and use it in GitHub Desktop.
Save JoelBender/df2a4d365ee4bf2b842002a1670f1d56 to your computer and use it in GitHub Desktop.
Put in a "fuzz" layer in an application stack
#!/usr/bin/env python
"""
This application presents a 'console' prompt to the user asking for Who-Is
commands which create the related APDUs and sends them through a "fuzz" layer
for additional processing.
"""
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd
from bacpypes.core import run, enable_sleeping
from bacpypes.pdu import Address, GlobalBroadcast
from bacpypes.comm import bind, Client, Server
from bacpypes.app import ApplicationIOController
from bacpypes.appservice import StateMachineAccessPoint, ApplicationServiceAccessPoint
from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement
from bacpypes.bvllservice import BIPSimple, AnnexJCodec, UDPMultiplexer
from bacpypes.local.device import LocalDeviceObject
from bacpypes.service.device import WhoIsIAmServices
# Debug switch and module logger.  Every "if _debug:" guard in this file
# tests this flag before emitting a debug message.
_debug = 1
_log = ModuleLogger(globals())
# Module-level references that main() fills in.  The console command
# handlers read this_application to send their requests.
this_device = None
this_application = None
@bacpypes_debugging
class Fuzz(Client, Server):
    """Pass-through layer that prints whatever is handed to it.

    The class derives from both ``Client`` and ``Server`` so it can be
    bound between two other layers.  Each call is printed and then
    forwarded with its arguments untouched, which makes this the natural
    place to insert fuzzing logic.
    """

    def __init__(self, cid=None, sid=None):
        """Initialise the client and server halves.

        cid -- passed to ``Client.__init__``
        sid -- passed to ``Server.__init__``
        """
        if _debug:
            Fuzz._debug("__init__ cid=%r sid=%r", cid, sid)
        Client.__init__(self, cid)
        Server.__init__(self, sid)

    @staticmethod
    def _print_call(label, args, kwargs):
        """Print *label*, then each positional and keyword argument.

        Any argument that has a ``debug_contents`` method is asked to
        print its own details as well.
        """
        print(label)
        for i, arg in enumerate(args):
            print(" - args[{:d}]: {!r}".format(i, arg))
            if hasattr(arg, "debug_contents"):
                arg.debug_contents(2)
        for key, value in kwargs.items():
            print(" - kwargs[{!r}]: {!r}".format(key, value))
            if hasattr(value, "debug_contents"):
                value.debug_contents(2)

    def confirmation(self, *args, **kwargs):
        """Print the call, then forward it with ``self.response``.

        Nothing is forwarded when ``self.serverPeer`` is not set.
        """
        self._print_call("Fuzz.confirmation", args, kwargs)
        if self.serverPeer:
            self.response(*args, **kwargs)

    def indication(self, *args, **kwargs):
        """Print the call, then forward it with ``self.request``.

        Nothing is forwarded when ``self.clientPeer`` is not set.
        """
        self._print_call("Fuzz.indication", args, kwargs)
        if self.clientPeer:
            self.request(*args, **kwargs)
#
# FuzzApplication
#
@bacpypes_debugging
class FuzzApplication(ApplicationIOController, WhoIsIAmServices):
    """Application that assembles its own stack with a Fuzz layer in it.

    The constructor builds the upper layers (application service access
    point, state machine access point, network service access point and
    element) and a lower chain of BIPSimple, AnnexJCodec, Fuzz and the
    UDP multiplexer's ``annexJ`` attribute.
    """

    def __init__(self, localDevice, localAddress, deviceInfoCache=None, aseID=None):
        """Build and bind every layer of the stack.

        localDevice     -- device object, also given to the state machine
                           access point
        localAddress    -- an ``Address`` or a value ``Address`` accepts
        deviceInfoCache -- forwarded to ``ApplicationIOController.__init__``
        aseID           -- forwarded to ``ApplicationIOController.__init__``
        """
        if _debug:
            FuzzApplication._debug(
                "__init__ %r %r deviceInfoCache=%r aseID=%r",
                localDevice,
                localAddress,
                deviceInfoCache,
                aseID,
            )

        ApplicationIOController.__init__(
            self, localDevice, localAddress, deviceInfoCache, aseID=aseID
        )

        # always keep the local address as an Address instance
        if not isinstance(localAddress, Address):
            localAddress = Address(localAddress)
        self.localAddress = localAddress

        # application layer decoder
        self.asap = ApplicationServiceAccessPoint()

        # the state machine access point gets the device object and shares
        # the device information cache with the application
        self.smap = StateMachineAccessPoint(localDevice)
        self.smap.deviceInfoCache = self.deviceInfoCache

        # network service access point with a generic service element
        self.nsap = NetworkServiceAccessPoint()
        self.nse = NetworkServiceElement()
        bind(self.nse, self.nsap)

        # upper half of the stack
        bind(self, self.asap, self.smap, self.nsap)

        # lower half: BIP, Annex J codec, the fuzz layer, then the
        # multiplexer that sits just above the sockets
        self.bip = BIPSimple()
        self.annexj = AnnexJCodec()
        self.fuzz = Fuzz()
        self.mux = UDPMultiplexer(self.localAddress)
        bind(self.bip, self.annexj, self.fuzz, self.mux.annexJ)

        # attach the BIP stack to the network, no network number
        self.nsap.bind(self.bip, address=self.localAddress)

    def request(self, apdu):
        """Log *apdu* and pass it to ``ApplicationIOController.request``."""
        if _debug:
            FuzzApplication._debug("request %r", apdu)
        ApplicationIOController.request(self, apdu)

    def indication(self, apdu):
        """Log *apdu* and pass it to ``ApplicationIOController.indication``."""
        if _debug:
            FuzzApplication._debug("indication %r", apdu)
        ApplicationIOController.indication(self, apdu)

    def confirmation(self, apdu):
        """Log *apdu* and pass it to ``ApplicationIOController.confirmation``."""
        if _debug:
            FuzzApplication._debug("confirmation %r", apdu)
        ApplicationIOController.confirmation(self, apdu)

    def close_socket(self):
        """Ask the multiplexer to close its socket."""
        if _debug:
            FuzzApplication._debug("close_socket")
        self.mux.close_socket()
#
# WhoIsIAmConsoleCmd
#
@bacpypes_debugging
class WhoIsIAmConsoleCmd(ConsoleCmd):
    # Console commands that hand Who-Is and I-Am requests to the module
    # level this_application.  The do_* docstrings are left exactly as
    # they were because they may be displayed as command help.

    def do_whois(self, args):
        """whois [ <addr> ] [ <lolimit> <hilimit> ]"""
        args = args.split()
        if _debug:
            WhoIsIAmConsoleCmd._debug("do_whois %r", args)

        try:
            # more than <addr> <lolimit> <hilimit> is a usage error; report
            # it instead of silently sending an unrestricted broadcast
            if len(args) > 3:
                raise ValueError("too many arguments: %r" % (args,))

            # one or three arguments: the first is the destination address
            if len(args) in (1, 3):
                addr = Address(args.pop(0))
            else:
                addr = GlobalBroadcast()

            # the two remaining arguments, if any, are the instance range
            if len(args) == 2:
                lolimit, hilimit = int(args[0]), int(args[1])
            else:
                lolimit = hilimit = None

            # code lives in the device service
            this_application.who_is(lolimit, hilimit, addr)
        except Exception as error:
            # log and return so a bad command does not propagate out of
            # the command handler
            WhoIsIAmConsoleCmd._exception("exception: %r", error)

    def do_iam(self, args):
        """iam"""
        args = args.split()
        if _debug:
            WhoIsIAmConsoleCmd._debug("do_iam %r", args)

        try:
            # code lives in the device service
            this_application.i_am()
        except Exception as error:
            # same error handling as do_whois
            WhoIsIAmConsoleCmd._exception("exception: %r", error)
#
# __main__
#
def main():
    """Parse the configuration, build the application and run it.

    Fills in the module-level ``this_device`` and ``this_application``,
    creates the console, then calls ``run()``.
    """
    global this_device, this_application

    # parse the command line arguments
    args = ConfigArgumentParser(description=__doc__).parse_args()
    if _debug:
        _log.debug("initialization")
        _log.debug(" - args: %r", args)

    # the device object is described by the INI section of the arguments
    ini = args.ini
    this_device = LocalDeviceObject(
        objectName=ini.objectname,
        objectIdentifier=int(ini.objectidentifier),
        maxApduLengthAccepted=int(ini.maxapdulengthaccepted),
        segmentationSupported=ini.segmentationsupported,
        vendorIdentifier=int(ini.vendoridentifier),
    )

    # the application with the fuzz layer in its stack
    this_application = FuzzApplication(this_device, ini.address)

    # the console that accepts the whois / iam commands
    this_console = WhoIsIAmConsoleCmd()
    if _debug:
        _log.debug(" - this_console: %r", this_console)

    # enable sleeping will help with threads
    enable_sleeping()

    _log.debug("running")
    run()
    _log.debug("fini")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment