Skip to content

Instantly share code, notes, and snippets.

@ChristianTremblay
Created September 26, 2018 02:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ChristianTremblay/4a7905e18267343742f4e708648603d9 to your computer and use it in GitHub Desktop.
Save ChristianTremblay/4a7905e18267343742f4e708648603d9 to your computer and use it in GitHub Desktop.
Code example for MockServer using bacpypes
import random

from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run, stop, deferred, enable_sleeping
from bacpypes.iocb import IOCB
from bacpypes.pdu import Address
from bacpypes.primitivedata import Real
from bacpypes.object import get_datatype, AnalogValueObject, Property, register_object_type
from bacpypes.errors import ExecutionError
from bacpypes.apdu import ReadPropertyRequest
from bacpypes.primitivedata import Unsigned, CharacterString
from bacpypes.constructeddata import Array, ArrayOf
from bacpypes.basetypes import ServicesSupported
from bacpypes.app import BIPSimpleApplication
from bacpypes.local.device import LocalDeviceObject
from bacpypes.service.object import ReadWritePropertyMultipleServices
# Module-level debugging switch and logger.  ``_log`` is referenced by the
# debug statements in ``main()``; ``ModuleLogger`` requires ``_debug`` to be
# defined before it is called.
_debug = False
_log = ModuleLogger(globals())
@bacpypes_debugging
class RandomValueProperty(Property):
    """Read-only ``Real`` property that produces a new random value per read."""

    def __init__(self, identifier):
        """Create the property.

        Args:
            identifier: property identifier forwarded to ``Property.__init__``
                (this file uses ``'present-value'``).
        """
        if _debug: RandomValueProperty._debug("__init__ %r", identifier)
        Property.__init__(self, identifier, Real, default=0.0, optional=True, mutable=False)

    def ReadProperty(self, obj, arrayIndex=None):
        """Return a random float in the range [0.0, 100.0).

        Args:
            obj: the object the property is being read from (only logged).
            arrayIndex: must be ``None``; the property is not an array.

        Raises:
            ExecutionError: ``propertyIsNotAnArray`` when ``arrayIndex`` is
                supplied.
        """
        if _debug: RandomValueProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex)

        # access an array
        if arrayIndex is not None:
            raise ExecutionError(errorClass='property', errorCode='propertyIsNotAnArray')

        # return a random value
        value = random.random() * 100.0
        if _debug: RandomValueProperty._debug(" - value: %r", value)

        return value

    def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
        """Reject every write.

        All arguments are only logged; the call always fails.

        Raises:
            ExecutionError: always, with error code ``writeAccessDenied``.
        """
        if _debug: RandomValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct)
        raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
class RandomAnalogValueObject(AnalogValueObject):
    """Analog value object whose 'present-value' is a ``RandomValueProperty``."""

    properties = [RandomValueProperty('present-value')]


# make the new object type known to bacpypes
register_object_type(RandomAnalogValueObject)
@bacpypes_debugging
class MockApplication(BIPSimpleApplication, ReadWritePropertyMultipleServices):
    """Application built from ``BIPSimpleApplication`` and
    ``ReadWritePropertyMultipleServices``; it adds no behavior of its own.
    """
def main():
    """Build the mock device, attach two random analog values and run.

    Parses the command line with ``ConfigArgumentParser``, creates a
    ``LocalDeviceObject`` and a ``MockApplication``, adds two
    ``RandomAnalogValueObject`` instances, prints a few segmentation related
    settings and then enters the bacpypes ``run()`` loop.

    The device and application are published as the module globals
    ``this_device`` and ``this_application``.
    """
    global this_device, this_application

    # Local flag: it shadows any module-level ``_debug`` so the ``_log``
    # calls in this function stay disabled.
    _debug = False

    # parse the command line arguments
    args = ConfigArgumentParser(description=__doc__).parse_args()
    # NOTE(review): the address from the INI file is only printed; the
    # application below binds to a hard-coded address - confirm intended.
    print(args.ini.address)

    if _debug:
        _log.debug("initialization")
        _log.debug(" - args: %r", args)

    # make a device object
    this_device = LocalDeviceObject(
        objectName='mock',
        objectIdentifier=123456,
        maxApduLengthAccepted=1024,
        segmentationSupported='segmentedBoth',
        # the vendor identifier is an unsigned integer, not a string
        vendorIdentifier=0,
        vendorName='TEST')
    if _debug:
        _log.debug(" - this_device: %r", this_device)

    # make a simple application
    this_application = MockApplication(
        this_device, '192.168.210.21/24'
    )
    if _debug:
        _log.debug(" - this_application: %r", this_application)

    # make a random input object
    # NOTE(review): 'present-value' is served by RandomValueProperty, whose
    # WriteProperty always raises - confirm that passing presentValue at
    # construction time is accepted by the installed bacpypes version.
    ravo1 = RandomAnalogValueObject(
        objectIdentifier=('analogValue', 1), objectName='Random1',
        eventMessageTexts=ArrayOf(CharacterString)(["hello"]),
        presentValue=9,
    )
    if _debug:
        _log.debug(" - ravo1: %r", ravo1)

    ravo2 = RandomAnalogValueObject(
        objectIdentifier=('analogValue', 2), objectName='Random2',
        presentValue=10,
    )
    if _debug:
        _log.debug(" - ravo2: %r", ravo2)

    # print(generateScheduleObjects(5))

    # add it to the device
    this_application.add_object(ravo1)
    this_application.add_object(ravo2)

    # all_objects = (generateBinaryInputObjects(5)
    #     + generateAnalogInputObjects(1500, 'degreesCelsius')
    #     + generateAnalogOutputObjects(5, 'percent')
    #     + generateBinaryOutputObjects(5)
    #     + generateBinaryValueObjects(5)
    #     + generateCalendarObjects(5)
    #     + generateScheduleObjects(5)
    # )
    # for obj in all_objects:
    #     this_application.add_object(obj)

    # dump the service names and the segmentation / APDU settings as seen by
    # the application and by its state machine access point
    print('------')
    print(ServicesSupported.bitNames)
    print('------')
    print(this_application.localDevice.segmentationSupported)
    print(this_application.smap.localDevice.segmentationSupported)
    print(this_application.smap.localDevice.maxApduLengthAccepted)
    print(this_application.smap.maxApduLengthAccepted)
    print(this_application.smap.segmentationSupported)
    #print(this_application.localDevice.device_info)

    # enable sleeping will help with threads
    enable_sleeping()

    if _debug:
        _log.debug("running")

    print('Starting mock controller...')
    run()

    if _debug:
        _log.debug("fini")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment