Skip to content

Instantly share code, notes, and snippets.

@ChristianTremblay
Last active November 13, 2019 17:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ChristianTremblay/79bb81e40df9358cfeace929b0c28858 to your computer and use it in GitHub Desktop.
Save ChristianTremblay/79bb81e40df9358cfeace929b0c28858 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
"""
Rebuilt Commandable
"""
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.task import OneShotTask
from bacpypes.errors import ExecutionError
from bacpypes.primitivedata import BitString, CharacterString, Date, Integer, \
Double, Enumerated, OctetString, Real, Time, Unsigned
from bacpypes.basetypes import BinaryPV, ChannelValue, DateTime, DoorValue, PriorityValue, \
PriorityArray
from bacpypes.object import Property, ReadableProperty, WritableProperty, \
register_object_type, \
AccessDoorObject, AnalogOutputObject, AnalogValueObject, \
BinaryOutputObject, BinaryValueObject, BitStringValueObject, CharacterStringValueObject, \
DateValueObject, DatePatternValueObject, DateTimePatternValueObject, \
DateTimeValueObject, IntegerValueObject, \
LargeAnalogValueObject, LightingOutputObject, MultiStateOutputObject, \
MultiStateValueObject, OctetStringValueObject, PositiveIntegerValueObject, \
TimeValueObject, TimePatternValueObject, ChannelObject
from bacpypes.app import BIPSimpleApplication
from bacpypes.local.object import CurrentPropertyListMixIn
from bacpypes.local.device import LocalDeviceObject
# some debugging
# Debug switch tested by the `if _debug:` guards throughout this file.
_debug = 0
# Module-level logger, built from this module's globals; main() logs through it.
_log = ModuleLogger(globals())
#
# Commandable
#
class LocalBehaviour(object):
    """
    Template for the local behaviour of a commandable object.

    It provides the mechanism used to make changes to the local "thing"
    being controlled and to read its state back.

    on_write -- callable invoked by write() with whatever arguments write()
                receives
    feedback -- zero-argument callable; its result is exposed as presentValue

    Both default to None; using the matching accessor before a callable has
    been supplied raises TypeError (calling None).
    """

    def __init__(self, on_write=None, feedback=None):
        self.on_write = on_write
        self.feedback = feedback

    @property
    def presentValue(self):
        """Return the value reported by the feedback callable.

        Any exception raised by the callable propagates to the caller.
        """
        return self.feedback()

    def write(self, *args, **kwargs):
        """Forward positional and keyword arguments to the on_write callable.

        Any exception raised by the callable propagates to the caller.
        """
        self.on_write(*args, **kwargs)
@bacpypes_debugging
def Commandable(datatype, presentValue='presentValue', priorityArray='priorityArray', relinquishDefault='relinquishDefault', localBehaviour=None):
    """
    Build a mix-in class that adds command prioritization to an object.

    datatype          -- class of the commanded value; used for the present
                         value and relinquish default properties
    presentValue      -- name of the property holding the present value
    priorityArray     -- name of the property holding the priority array
    relinquishDefault -- name of the property holding the relinquish default
    localBehaviour    -- optional object exposing on_write(value) and
                         feedback() callables (see LocalBehaviour); when given,
                         a present value change is first pushed to on_write()
                         and the value stored is the one feedback() reports

    Returns the generated class, meant to be listed ahead of the object class
    in the bases of the final commandable object class.
    """
    if _debug: Commandable._debug("Commandable %r ...", datatype)

    class _Commando(object):

        # local behaviour hook, reachable as self.intrinsic from the methods
        intrinsic = localBehaviour

        properties = [
            WritableProperty(presentValue, datatype),
            ReadableProperty(priorityArray, PriorityArray),
            ReadableProperty(relinquishDefault, datatype),
            ]

        # name of the PriorityValue choice matching datatype, set below
        _pv_choice = None

        def __init__(self, **kwargs):
            super(_Commando, self).__init__(**kwargs)

            # build a default value in case one is needed
            default_value = datatype().value
            if issubclass(datatype, Enumerated):
                default_value = datatype._xlate_table[default_value]
            if _debug: Commandable._debug(" - default_value: %r", default_value)

            # see if a present value was provided
            if (presentValue not in kwargs):
                setattr(self, presentValue, default_value)

            # see if a priority array was provided
            if (priorityArray not in kwargs):
                setattr(self, priorityArray, PriorityArray())

            # see if a relinquish default was provided
            if (relinquishDefault not in kwargs):
                setattr(self, relinquishDefault, default_value)

        def _highest_priority_value(self):
            """Return (value, value_source) for the first non-null priority
            array slot, scanning indexes 1 through 16; fall back to the
            relinquish default with a value_source of None."""
            if _debug: Commandable._debug("_highest_priority_value")

            priority_array = getattr(self, priorityArray)
            for i in range(1, 17):
                priority_value = priority_array[i]
                if priority_value.null is None:
                    if _debug: Commandable._debug(" - found at index: %r", i)

                    value = getattr(priority_value, _Commando._pv_choice)
                    value_source = "###"

                    if issubclass(datatype, Enumerated):
                        value = datatype._xlate_table[value]
                        if _debug: Commandable._debug(" - remapped enumeration: %r", value)

                    break
            else:
                # every slot is null
                value = getattr(self, relinquishDefault)
                value_source = None

            if _debug: Commandable._debug(" - value, value_source: %r, %r", value, value_source)

            # return what you found
            return value, value_source

        def WriteProperty(self, property, value, arrayIndex=None, priority=None, direct=False):
            """Write a property, routing present value writes through the
            priority array.

            A write to the present value becomes a write to the priority array
            slot given by priority (16 when priority is None).  A value of ()
            stores a null in the slot.  After a priority array update the
            highest priority value is computed and, only if it differs from
            the current present value, written to the present value.  Writes
            to any other property are passed through unchanged.

            Raises ExecutionError with writeAccessDenied for array index 0 and
            invalidArrayIndex for an index outside 1..16.
            """
            if _debug: Commandable._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", property, value, arrayIndex, priority, direct)

            # when writing to the presentValue with a priority
            if (property == presentValue):
                if _debug: Commandable._debug(" - writing to %s, priority %r", presentValue, priority)

                # default (lowest) priority
                if priority is None:
                    priority = 16
                if _debug: Commandable._debug(" - translate to priority array, index %d", priority)

                # translate to updating the priority array
                property = priorityArray
                arrayIndex = priority
                priority = None

            # update the priority array entry
            if (property == priorityArray):
                if (arrayIndex is None):
                    if _debug: Commandable._debug(" - writing entire %s", priorityArray)

                    # pass along the request
                    super(_Commando, self).WriteProperty(
                        property, value,
                        arrayIndex=arrayIndex, priority=priority, direct=direct,
                        )
                else:
                    if _debug: Commandable._debug(" - writing to %s, array index %d", priorityArray, arrayIndex)

                    # check the bounds
                    if arrayIndex == 0:
                        raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
                    if (arrayIndex < 1) or (arrayIndex > 16):
                        raise ExecutionError(errorClass='property', errorCode='invalidArrayIndex')

                    # update the specific priority value element
                    priority_value = getattr(self, priorityArray)[arrayIndex]
                    if _debug: Commandable._debug(" - priority_value: %r", priority_value)

                    # the null or the choice has to be set, the other clear;
                    # compare by equality, identity with a literal is not reliable
                    if value == ():
                        if _debug: Commandable._debug(" - write a null")
                        priority_value.null = value
                        setattr(priority_value, _Commando._pv_choice, None)
                    else:
                        if _debug: Commandable._debug(" - write a value")

                        if issubclass(datatype, Enumerated):
                            value = datatype._xlate_table[value]
                            if _debug: Commandable._debug(" - remapped enumeration: %r", value)

                        priority_value.null = None
                        setattr(priority_value, _Commando._pv_choice, value)

                # look for the highest priority value
                value, value_source = self._highest_priority_value()

                # compare with the current value
                current_value = getattr(self, presentValue)
                if value == current_value:
                    if _debug: Commandable._debug(" - no present value change")
                    return

                # turn this into a present value change
                property = presentValue
                arrayIndex = priority = None

                # process an intrinsic on_write; the hook must be looked up on
                # self because class-body names are not visible inside methods
                behaviour = self.intrinsic
                if behaviour:
                    # push the new value to the local "thing"; exceptions
                    # raised by the hooks propagate to the caller
                    behaviour.on_write(value)

                    # get the real present value
                    value = behaviour.feedback()
                    if _debug: Commandable._debug(" - feedback value: %r", value)

            # allow the request to pass through
            if _debug: Commandable._debug(" - super: %r %r arrayIndex=%r priority=%r", property, value, arrayIndex, priority)

            super(_Commando, self).WriteProperty(
                property, value,
                arrayIndex=arrayIndex, priority=priority, direct=direct,
                )

    # look up a matching priority value choice
    for element in PriorityValue.choiceElements:
        if issubclass(datatype, element.klass):
            _Commando._pv_choice = element.name
            break
    else:
        _Commando._pv_choice = 'constructedValue'
    if _debug: Commandable._debug(" - _pv_choice: %r", _Commando._pv_choice)

    # return the class
    return _Commando
#
# MinOnOffTask
#
@bacpypes_debugging
class MinOnOffTask(OneShotTask):
    """
    One-shot task enforcing the minimum on/off time of a binary object.

    On a present value change the new value is written at priority 6 and
    the task is scheduled; when it fires, the priority 6 value is cleared.
    """

    def __init__(self, binary_obj):
        if _debug: MinOnOffTask._debug("__init__ %s", repr(binary_obj))
        OneShotTask.__init__(self)

        # save a reference to the object
        self.binary_obj = binary_obj

        # listen for changes to the present value
        self.binary_obj._property_monitors['presentValue'].append(self.present_value_change)

    def present_value_change(self, old_value, new_value):
        """Hold new_value at priority 6 for the matching minimum time.

        Raises ValueError when new_value is neither 'active' nor 'inactive'.
        """
        if _debug: MinOnOffTask._debug("present_value_change %r %r", old_value, new_value)

        # if there's no value change, skip all this
        if old_value == new_value:
            if _debug: MinOnOffTask._debug(" - no state change")
            return

        # get the minimum on/off time; the value being held is new_value, so
        # a change to 'active' is held for the minimum on time and a change
        # to 'inactive' for the minimum off time
        if new_value == 'active':
            task_delay = getattr(self.binary_obj, 'minimumOnTime') or 0
            if _debug: MinOnOffTask._debug(" - minimum on: %r", task_delay)
        elif new_value == 'inactive':
            task_delay = getattr(self.binary_obj, 'minimumOffTime') or 0
            if _debug: MinOnOffTask._debug(" - minimum off: %r", task_delay)
        else:
            raise ValueError("unrecognized present value for %r: %r" % (self.binary_obj.objectIdentifier, new_value))

        # if there's no delay, don't bother
        if not task_delay:
            if _debug: MinOnOffTask._debug(" - no delay")
            return

        # set the value at priority 6
        self.binary_obj.WriteProperty('presentValue', new_value, priority=6)

        # install this to run, if there is a delay
        self.install_task(delta=task_delay)

    def process_task(self):
        """Clear the value held at priority 6 once the delay has elapsed."""
        if _debug: MinOnOffTask._debug("process_task(%s)", self.binary_obj.objectName)

        # clear the value at priority 6
        self.binary_obj.WriteProperty('presentValue', (), priority=6)
#
# MinOnOff
#
@bacpypes_debugging
class MinOnOff(object):
    """Mix-in that gives the object being built its own MinOnOffTask."""

    def __init__(self, **kwargs):
        if _debug:
            MinOnOff._debug("__init__ ...")

        # hand the keyword arguments on to the rest of the class hierarchy
        super(MinOnOff, self).__init__(**kwargs)

        # the timer task is built around this object and kept on it
        self._min_on_off_task = MinOnOffTask(self)
#
# Commandable Standard Objects
#
class AccessDoorObjectCmd(Commandable(DoorValue), AccessDoorObject):
    """Access door object commanded with DoorValue values."""


class AnalogOutputObjectCmd(Commandable(Real), AnalogOutputObject):
    """Analog output object commanded with Real values."""


class AnalogValueObjectCmd(Commandable(Real), AnalogValueObject):
    """Analog value object commanded with Real values."""


### class BinaryLightingOutputObjectCmd(Commandable(Real), BinaryLightingOutputObject):
### pass


class BinaryOutputObjectCmd(Commandable(BinaryPV), MinOnOff, BinaryOutputObject):
    """Binary output object commanded with BinaryPV values, with MinOnOff."""


class BinaryValueObjectCmd(Commandable(BinaryPV), MinOnOff, BinaryValueObject):
    """Binary value object commanded with BinaryPV values, with MinOnOff."""


class BitStringValueObjectCmd(Commandable(BitString), BitStringValueObject):
    """Bit string value object commanded with BitString values."""


class CharacterStringValueObjectCmd(Commandable(CharacterString), CharacterStringValueObject):
    """Character string value object commanded with CharacterString values."""


class DateValueObjectCmd(Commandable(Date), DateValueObject):
    """Date value object commanded with Date values."""


class DatePatternValueObjectCmd(Commandable(Date), DatePatternValueObject):
    """Date pattern value object commanded with Date values."""


class DateTimeValueObjectCmd(Commandable(DateTime), DateTimeValueObject):
    """Date/time value object commanded with DateTime values."""


class DateTimePatternValueObjectCmd(Commandable(DateTime), DateTimePatternValueObject):
    """Date/time pattern value object commanded with DateTime values."""


class IntegerValueObjectCmd(Commandable(Integer), IntegerValueObject):
    """Integer value object commanded with Integer values."""


class LargeAnalogValueObjectCmd(Commandable(Double), LargeAnalogValueObject):
    """Large analog value object commanded with Double values."""


class LightingOutputObjectCmd(Commandable(Real), LightingOutputObject):
    """Lighting output object commanded with Real values."""


class MultiStateOutputObjectCmd(Commandable(Unsigned), MultiStateOutputObject):
    """Multi-state output object commanded with Unsigned values."""


class MultiStateValueObjectCmd(Commandable(Unsigned), MultiStateValueObject):
    """Multi-state value object commanded with Unsigned values."""


class OctetStringValueObjectCmd(Commandable(OctetString), OctetStringValueObject):
    """Octet string value object commanded with OctetString values."""


class PositiveIntegerValueObjectCmd(Commandable(Unsigned), PositiveIntegerValueObject):
    """Positive integer value object commanded with Unsigned values."""


class TimeValueObjectCmd(Commandable(Time), TimeValueObject):
    """Time value object commanded with Time values."""


class TimePatternValueObjectCmd(Commandable(Time), TimePatternValueObject):
    """Time pattern value object commanded with Time values."""
#
# ChannelValueProperty
#
@bacpypes_debugging
class ChannelValueProperty(Property):
    """
    Present value property of a channel object.

    Declared as a required, mutable 'presentValue' of type ChannelValue with
    no default.  Writing is not implemented yet.
    """

    def __init__(self):
        if _debug: ChannelValueProperty._debug("__init__")
        Property.__init__(self, 'presentValue', ChannelValue, default=None, optional=False, mutable=True)

    def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
        """Placeholder for the channel write; always raises NotImplementedError."""
        if _debug: ChannelValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct)

        ### Clause 12.53.5, page 487
        raise NotImplementedError()
#
# ChannelObjectCmd
#
class ChannelObjectCmd(ChannelObject):
    """Channel object declaring a ChannelValueProperty in its properties."""

    properties = [ChannelValueProperty()]
##
##
##
##
##
@register_object_type(vendor_id=999)
class LocalAnalogValueObjectCmd(CurrentPropertyListMixIn, AnalogValueObjectCmd):
    """Commandable analog value object, registered for vendor id 999."""
@register_object_type(vendor_id=999)
class LocalBinaryOutputObjectCmd(CurrentPropertyListMixIn, BinaryOutputObjectCmd):
    """Commandable binary output object, registered for vendor id 999."""
@register_object_type(vendor_id=999)
class LocalDateValueObjectCmd(CurrentPropertyListMixIn, DateValueObjectCmd):
    """Commandable date value object, registered for vendor id 999."""
#
# __main__
#
def main():
    """Build a device with three commandable sample objects and run it."""
    # command line (and INI file) settings
    args = ConfigArgumentParser(description=__doc__).parse_args()
    if _debug:
        _log.debug("initialization")
        _log.debug(" - args: %r", args)

    # the local device described by the INI settings
    device = LocalDeviceObject(ini=args.ini)
    if _debug:
        _log.debug(" - this_device: %r", device)

    # the application, bound to the configured address
    application = BIPSimpleApplication(device, args.ini.address)

    # commandable analog value
    analog_value = LocalAnalogValueObjectCmd(
        objectIdentifier=('analogValue', 1),
        objectName='avo1',
    )
    if _debug:
        _log.debug(" - avo1: %r", analog_value)
    application.add_object(analog_value)

    # commandable binary output with minimum on/off times
    binary_output = LocalBinaryOutputObjectCmd(
        objectIdentifier=('binaryOutput', 1),
        objectName='boo1',
        presentValue='inactive',
        relinquishDefault='inactive',
        minimumOnTime=5,    # let it warm up
        minimumOffTime=10,  # let it cool off
    )
    if _debug:
        _log.debug(" - boo1: %r", binary_output)
    application.add_object(binary_output)

    # commandable date value, starting at the current date
    current_date = Date().now()
    date_value = LocalDateValueObjectCmd(
        objectIdentifier=('dateValue', 1),
        objectName='dvo1',
        presentValue=current_date.value,
    )
    if _debug:
        _log.debug(" - dvo1: %r", date_value)
    application.add_object(date_value)

    if _debug:
        _log.debug("running")

    run()

    _log.debug("fini")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment