Skip to content

Instantly share code, notes, and snippets.

@JoelBender
Created August 24, 2021 04:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JoelBender/adbd9da33db8d5c1d50d34db83457aa5 to your computer and use it in GitHub Desktop.
Save JoelBender/adbd9da33db8d5c1d50d34db83457aa5 to your computer and use it in GitHub Desktop.
JSON Writable Configuration File
{
"avar": 74.1,
"bvar": "inactive"
}
"""
This sample application is a BACnet device that has an AnalogValueObject
and BinaryValueObject in a JSON configuration file that can be changed by
a BACnet client.
"""
import os
import json
from pathlib import Path
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.errors import ExecutionError
from bacpypes.primitivedata import Real
from bacpypes.basetypes import BinaryPV
from bacpypes.object import (
Property,
AnalogValueObject,
BinaryValueObject,
register_object_type,
)
from bacpypes.app import BIPSimpleApplication
from bacpypes.local.device import LocalDeviceObject
from bacpypes.local.file import LocalStreamAccessFileObject
from bacpypes.service.file import FileServices
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# configuration
JSON_CONFIG_FILE = os.getenv("JSON_CONFIG_FILE", Path(__file__).stem + ".json")
# globals
json_config = {}
@bacpypes_debugging
class JSONStreamFile(LocalStreamAccessFileObject):
def __init__(self, **kwargs):
""" Initialize a stream accessed file object. """
if _debug:
JSONStreamFile._debug(
"__init__ %r",
kwargs,
)
LocalStreamAccessFileObject.__init__(self, **kwargs)
global json_config
# load in the file
with open(JSON_CONFIG_FILE, "rb") as config_file:
self._file_data = config_file.read()
if _debug:
JSONStreamFile._debug(
" - %d octets",
len(self._file_data),
)
# interpret the JSON blob
json_config = json.loads(self._file_data)
if _debug:
JSONStreamFile._debug(" - json_config: %r", json_config)
def __len__(self):
""" Return the number of octets in the file. """
if _debug:
JSONStreamFile._debug("__len__")
return len(self._file_data)
def read_stream(self, start_position, octet_count):
""" Read a chunk of data out of the file. """
if _debug:
JSONStreamFile._debug(
"read_stream %r %r",
start_position,
octet_count,
)
# end of file is true if last record is returned
end_of_file = (start_position + octet_count) >= len(self._file_data)
return (
end_of_file,
self._file_data[start_position : start_position + octet_count],
)
def write_stream(self, start_position, data):
""" Write a number of octets, starting at a specific offset. """
if _debug:
JSONStreamFile._debug(
"write_stream %r %r",
start_position,
data,
)
global json_config
# work on a copy of the file data
file_data = self._file_data
# check for append
if start_position < 0:
start_position = len(file_data)
file_data += data
# check to extend the file out to start_record records
elif start_position > len(file_data):
file_data += "\0" * (start_position - len(file_data))
start_position = len(file_data)
file_data += data
# no slice assignment, strings are immutable
else:
data_len = len(data)
prechunk = file_data[:start_position]
postchunk = file_data[start_position + data_len :]
file_data = prechunk + data + postchunk
if _debug:
JSONStreamFile._debug(" - file_data: %r", file_data)
# interpret the JSON blob
try:
json_config = json.loads(file_data)
if _debug:
JSONStreamFile._debug(" - json_config: %r", json_config)
except json.decoder.JSONDecodeError as err:
if _debug:
JSONStreamFile._debug(" - err: %r", err)
raise RuntimeError("invalid JSON format")
# good to go
self._file_data = file_data
# save the updated contents
with open(JSON_CONFIG_FILE, "wb") as config_file:
config_file.write(self._file_data)
# return where the 'writing' actually started
return start_position
@bacpypes_debugging
class JSONValueProperty(Property):
def __init__(self, datatype):
if _debug:
JSONValueProperty._debug("__init__ %r", datatype)
Property.__init__(
self, "presentValue", datatype, default=0.0, optional=True, mutable=False
)
def ReadProperty(self, obj, arrayIndex=None):
if _debug:
JSONValueProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex)
global json_config
# access an array
if arrayIndex is not None:
raise ExecutionError(
errorClass="property", errorCode="propertyIsNotAnArray"
)
# return the contents of the configuration for the object
value = json_config[obj.objectName]
if _debug:
JSONValueProperty._debug(" - value: %r", value)
return value
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
if _debug:
JSONValueProperty._debug(
"WriteProperty %r %r arrayIndex=%r priority=%r direct=%r",
obj,
value,
arrayIndex,
priority,
direct,
)
raise ExecutionError(errorClass="property", errorCode="writeAccessDenied")
@bacpypes_debugging
@register_object_type
class JSONAnalogValueObject(AnalogValueObject):
properties = [
JSONValueProperty(Real),
]
def __init__(self, **kwargs):
if _debug:
JSONAnalogValueObject._debug("__init__ %r", kwargs)
AnalogValueObject.__init__(self, **kwargs)
@bacpypes_debugging
@register_object_type
class JSONBinaryValueObject(BinaryValueObject):
properties = [
JSONValueProperty(BinaryPV),
]
def __init__(self, **kwargs):
if _debug:
JSONBinaryValueObject._debug("__init__ %r", kwargs)
BinaryValueObject.__init__(self, **kwargs)
#
# __main__
#
def main():
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug:
_log.debug("initialization")
if _debug:
_log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(ini=args.ini)
if _debug:
_log.debug(" - this_device: %r", this_device)
# make a sample application
this_application = BIPSimpleApplication(this_device, args.ini.address)
# add the capability to server file content
this_application.add_capability(FileServices)
# make a JSON stream access file, add to the device
json_config_file = JSONStreamFile(
objectIdentifier=("file", 1), objectName="JSON Config"
)
_log.debug(" - json_config_file: %r", json_config_file)
this_application.add_object(json_config_file)
javo = JSONAnalogValueObject(
objectIdentifier=("analogValue", 1),
objectName="avar",
)
_log.debug(" - javo: %r", javo)
this_application.add_object(javo)
jbvo = JSONBinaryValueObject(
objectIdentifier=("binaryValue", 1),
objectName="bvar",
)
_log.debug(" - jbvo: %r", jbvo)
this_application.add_object(jbvo)
_log.debug("running")
run()
_log.debug("fini")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment