Skip to content

Instantly share code, notes, and snippets.

@JoelBender
Created June 30, 2018 04:48
Show Gist options
  • Save JoelBender/49dd7f3100fdbb1886bb463f52ceb7db to your computer and use it in GitHub Desktop.
Save JoelBender/49dd7f3100fdbb1886bb463f52ceb7db to your computer and use it in GitHub Desktop.
This is a sample of what I'm considering for backing up a `LocalTrendLogObject` implementation and using for testing. While this is in-memory, it doesn't seem like a huge stretch to make a database backing store that has the same API.
#!/usr/bin/python
"""
"""
import sys
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.primitivedata import Boolean, Unsigned, Integer, Real, BitString, Enumerated, Date, Time
from bacpypes.basetypes import ErrorType, StatusFlags, LogStatus, LogRecord, LogRecordLogDatum, DateTime
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# LocalDateTime
#
class LocalDateTime(DateTime):
def __init__(self):
DateTime.__init__(self,
date=Date().now().value,
time=Time().now().value,
)
def __str__(self):
date_str = str(Date(self.date))
time_str = str(Time(self.time))
return date_str + " " + time_str
#
# TrendLog
#
@bacpypes_debugging
class TrendLog:
def __init__(self, buffer_size=1000, stop_when_full=False):
if _debug: TrendLog._debug("__init__ buffer_size=%r stop_when_full=%r", buffer_size, stop_when_full)
# start out enabled
self.enable = True
self.buffer_size = buffer_size
self.stop_when_full = stop_when_full
# array of log records
self.log_records = []
def record_datum(self, datum, status_flags=None):
if _debug: TrendLog._debug("record_datum %r status_flags=%r", datum, status_flags)
# make sure we are enabled
if not self.enable:
if _debug: TrendLog._debug(" - not enabled")
return
# check for buffer about to be full
if self.stop_when_full and len(self.log_records) == (self.buffer_size - 1):
if _debug: TrendLog._debug(" - stopping when full")
# build a log record recording the event
log_record = LogRecord(
timestamp=LocalDateTime(),
logDatum=LogRecordLogDatum(logStatus=[1, 0, 0]), # log disabled
)
self.log_records.append(log_record)
# no longer enabled
self.enable = False
return
# if status_flags is a string, it's the name of one of the bits
if isinstance(status_flags, str):
status_flags = StatusFlags([status_flags]).value
# build a log record
log_record = LogRecord(
timestamp=LocalDateTime(),
logDatum=datum,
)
if status_flags is not None:
log_record.statusFlags = status_flags
if _debug: TrendLog._debug(" - log_record: %r", log_record)
# delete the oldest record if necessary
if len(self.log_records) == self.buffer_size:
del self.log_records[0]
# append it to the log records
self.log_records.append(log_record)
def record_status(self, log_status, status_flags=None):
if _debug: TrendLog._debug("record_status %r status_flags=%r", log_status, status_flags)
# if log_status is a string, it's the name of one of the bits
if isinstance(log_status, str):
log_status = LogStatus([log_status]).value
self.record_datum(
LogRecordLogDatum(logStatus=log_status),
status_flags=status_flags,
)
def record_value(self, datatype, value, status_flags=None):
if _debug: TrendLog._debug("record_value %r %r status_flags=%r", datatype, value, status_flags)
# build an empty datum and set the appropriate value
datum = LogRecordLogDatum()
setattr(datum, datatype, value)
self.record_datum(datum, status_flags=status_flags)
def record_boolean(self, value, status_flags=None):
"""Record a boolean value."""
# a little error checking
assert Boolean.is_valid(value)
self.record_value('booleanValue', value, status_flags)
def record_real(self, value, status_flags=None):
"""Record a real value."""
# a little error checking
assert Real.is_valid(value)
self.record_value('realValue', value, status_flags)
def record_enumerated(self, value, status_flags=None):
"""Record an enumerated value. Because the primitive type
doesn't have any enumerations defined, the best this can do
is an unsigned value."""
# a little error checking
assert Enumerated.is_valid(value)
self.record_value('enumValue', value, status_flags)
def record_unsigned(self, value, status_flags=None):
"""Record an unsigned value."""
# a little error checking
assert Unsigned.is_valid(value)
self.record_value('unsignedValue', value, status_flags)
def record_signed(self, value, status_flags=None):
"""Record a signed (integer) value."""
# a little error checking
assert Integer.is_valid(value)
self.record_value('signedValue', value, status_flags)
def record_bitstring(self, value, status_flags=None):
"""Record a bitstring value, no names, just 1's and 0's."""
# a little error checking
assert BitString.is_valid(value)
self.record_value('bitstringValue', value, status_flags)
def record_null(self, status_flags=None):
"""Record a null."""
self.record_value('nullValue', (), status_flags)
def record_failure(self, error_class, error_code, status_flags=None):
"""Record an error."""
self.record_value('failure',
ErrorType(errorClass=error_class, errorCode=error_code),
status_flags,
)
def record_time_change(self, delta, status_flags=None):
"""Record a time change."""
self.record_value('timeChange', delta, status_flags)
def record_any(self, value, status_flags=None):
"""Record just about anything."""
self.record_value('anyValue', value, status_flags)
def purge(self):
"""Empty out the log buffer and record the purge event."""
if _debug: TrendLog._debug("purge")
# build a log record recording the event
log_record = LogRecord(
timestamp=LocalDateTime(),
logDatum=LogRecordLogDatum(logStatus=[0, 1, 0]), # buffer purged
)
# log buffer is the only event
self.log_records = [log_record]
def __len__(self):
return len(self.log_records)
def debug_contents(self, indent=1, file=sys.stdout, _ids=None):
i = 0
for log_record in self.log_records:
datum = log_record.logDatum
line = ["%s[%d]" % (" " * indent, i), log_record.timestamp]
for element in LogRecordLogDatum.choiceElements:
element_value = getattr(datum, element.name, None)
if element_value is not None:
if element.name == 'logStatus':
element_value = str(LogStatus(element_value))
line.append(element.name)
line.append(element_value)
break
else:
line.append("?")
if log_record.statusFlags is not None:
line.append(StatusFlags(log_record.statusFlags))
file.write(" ".join(str(p) for p in line) + '\n')
i += 1
#
#
#
print("different kinds of values")
trend_log = TrendLog()
trend_log.record_boolean(False)
trend_log.record_boolean(True)
trend_log.record_real(73.3, [1, 0, 0, 0])
trend_log.record_real(73.3, 'fault')
trend_log.debug_contents()
print("")
#
#
#
print("small buffer")
trend_log = TrendLog(buffer_size=5)
for i in range(10):
trend_log.record_unsigned(i)
trend_log.debug_contents()
print("")
#
#
#
print("small buffer, stop when full")
trend_log = TrendLog(buffer_size=5, stop_when_full=True)
for i in range(10):
trend_log.record_unsigned(i)
trend_log.debug_contents()
print("")
print("purge")
trend_log.purge()
trend_log.debug_contents()
print("")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment