Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
BACnet connection management
# MeterMaster Data Access Layer
# (C) 2017 VRT Systems
#
# The following Software is the proprietary property of VRT Systems.
# Unauthorised disclosure or use of this Software is forbidden to the extent
# permitted by law.
#
# vim: set ts=4 sts=4 et tw=78 sw=4:
from threading import Event, Semaphore, Thread
from bacpypes.core import run, stop, deferred, enable_sleeping
from bacpypes.service.device import LocalDeviceObject
from bacpypes.iocb import IOCB
from bacpypes.app import BIPSimpleApplication, BIPForeignApplication
from bacpypes.apdu import IAmRequest, WhoIsRequest
from bacpypes.pdu import Address
from functools import wraps
from tornado.gen import Future, coroutine, Return
from tornado.ioloop import IOLoop
from mm_util.sigslot import Signal, TornadoSlot
from sys import exc_info
from os import environ
from enum import Enum
import logging
debug_mods = environ.get('MM_BACPYPES_DEBUG','')
if len(debug_mods):
# Set debug options on given modules.
import importlib
for modname in debug_mods.split(','):
importlib.import_module(modname)._debug = 1
# VRT's Vendor ID
VENDOR_ID = 704
class BACnetSegmentation(Enum):
no = 'noSegmentation'
rx = 'segmentedReceive'
tx = 'segmentedTransmit'
both = 'segmentedBoth'
class BACnetIOError(IOError):
"""
A class representing an error received from bacpypes.
"""
def __init__(self, bacpypes_err, *args, **kwargs):
super(BACnetIOError, self).__init__(str(bacpypes_err), *args, **kwargs)
class BACnetConnection(object):
"""
Base BACnet connection class.
"""
def __init__(self, name, object_id, object_name=None, max_apdu=1024,
segmentation=BACnetSegmentation.both, max_segments=16,
apdu_segment_timeout=5000, apdu_timeout=3000, apdu_retries=3,
vendor_id=VENDOR_ID, iam_expiry=300.0, log=None, io_loop=None):
if log is None:
log = logging.getLogger(self.__class__.__module__)
if io_loop is None:
io_loop = IOLoop.current()
if not isinstance(segmentation, BACnetSegmentation):
try:
# Try the shortened labels first, case insensitive
segmentation = getattr(BACnetSegmentation, segmentation.lower())
except AttributeError:
# Try the official bacpypes names
segmentation = BACnetSegmentation(segmentation)
if object_name is None:
object_name = name
self._name = name
self._log = log
self._io_loop = io_loop
self._running = False
self._thread = None
self._thread_sem = Semaphore()
self._bacnet_app = None
self._bacnet_dev = None
# Incoming IAm indications
self._iam = {} # key = ID, value = APDU
self._iam_timeout = None # Expiry timeout object
self._iam_waiting = {} # key = ID, value = list of Futures
self._iam_expiry = float(iam_expiry or 300.0)
# BACnet properties
self._bacnet_props = {
'objectName': object_name,
'vendorIdentifier': int(vendor_id),
'objectIdentifier': int(object_id),
'maxApduLengthAccepted': int(max_apdu),
'segmentationSupported': segmentation,
'maxSegmentsAccepted': int(max_segments),
'apduSegmentTimeout': int(apdu_segment_timeout),
'apduTimeout': int(apdu_timeout),
'numberOfApduRetries': int(apdu_retries),
}
self.closed_sig = Signal(name='closed_sig', threadsafe=True)
def _make_application(self):
"""
Construct the BACnet application instance.
"""
raise NotImplementedError('Abstract class')
def spawn_thread(self):
"""
Start a worker thread to handle this connection.
"""
if self._thread_sem.acquire(blocking=False):
try:
assert self._thread is None, 'Thread already exists'
assert self._running is False, 'Thread is running'
self._log.debug('Spawning worker thread')
starting = Event()
self._thread = Thread(group=None,
target=self._thread_main, name=self._name,
args=(starting,))
self._thread.start()
starting.wait()
assert self._running is True, 'Thread failed to launch'
finally:
self._thread_sem.release()
else:
assert self._thread is True, 'Connection launch in progress'
def _on_indication(self, indication, **kwargs):
"""
Indication received from BACnet client.
"""
self._log.debug('Received indication %s', indication)
if isinstance(indication, IAmRequest):
self._log.debug('IAm from device %r vendor %r',
indication.iAmDeviceIdentifier,
indication.vendorID)
expiry = self._io_loop.time() + self._iam_expiry
self._iam[indication.iAmDeviceIdentifier] = (indication, expiry)
if self._iam_timeout is None:
self._iam_timeout = self._io_loop.add_timeout(expiry,
self._on_iam_expiry)
# If there are processes waiting for this device, grab the list of
# Future objects and resolve them.
futures = self._iam_waiting.pop(indication.iAmDeviceIdentifier, [])
self._log.debug('Passing reply to %d waiters', len(futures))
for future in futures:
self._log.debug('Resolving %s', future)
future.set_result(indication)
def _on_iam_expiry(self):
"""
Do a clean-up of expired IAm responses.
"""
now = self._io_loop.time()
next_expiry = None
for (res, expiry) in list(self._iam.values()):
if expiry < now:
self._iam.pop(res.iAmDeviceIdentifier, None)
elif (next_expiry is None) or (next_expiry > expiry):
next_expiry = expiry
if expiry is not None:
self._iam_timeout = self._io_loop.add_timeout(expiry,
self._on_iam_expiry)
def _thread_main(self, starting):
"""
Main runner thread, runs in the context of the BACnet library
"""
try:
self._log.debug('Worker thread started')
# Create the application
self._make_application()
self._bacnet_app.indication_sig.connect(
TornadoSlot(self._on_indication, io_loop=self._io_loop, weak=True))
# Enable sleeping to reduce CPU usage
enable_sleeping()
# What to do once we've entered the main loop?
def _on_run():
# Tell our main thread we're up
self._log.debug('Processing loop is up')
self._running = True
starting.set()
deferred(_on_run)
# Now, enter the processing loop.
self._log.debug('Entering processing loop')
run()
self._log.debug('Exited processing loop')
except:
# This should not happen! Ever!
starting.set()
self._log.exception('Exception raised in back-end thread!')
# We've finished, clean up.
self._thread = None
self._bacnet_app = None
self._bacnet_dev = None
self._running = False
self._log.debug('Worker thread finished')
# Emit the closed signal from the main thread
self._io_loop.add_callback(self.closed_sig.emit)
def _thread_stop(self):
"""
Stop the thread from within, run from the BACnet thread.
"""
deferred(stop)
@property
def is_closed(self):
return not self._running
def get_object_iam(self, *object_id):
"""
Retrieve the IAm response/indication for the given object ID.
"""
try:
(iam, _) = self._iam[object_id]
except KeyError:
return None
return iam
@coroutine
def exec_whois(self, low_id, high_id, address, deadline):
"""
Submit a WhoIs request and wait for the reply. Returns a
dict of IAm replies, keyed by device ID.
"""
# Don't have it, so ask around.
rq = WhoIsRequest()
rq.pduDestination = address
rq.deviceInstanceRangeLowLimit = low_id
rq.deviceInstanceRangeHighLimit = high_id
# Add ourselves to the waiting list.
my_futures = []
for dev_id in range(low_id, high_id+1):
future = Future()
self._log.debug('Adding to wait list for device %d: %s', dev_id, future)
self._iam_waiting.setdefault(('device', dev_id), []).append(future)
my_futures.append(future)
# Set up an expiry timer
def _on_timeout():
for future in my_futures:
if not future.done:
self._log.debug('Failing future %s', future)
future.set_result(None) # No response
self._io_loop.add_timeout(deadline, _on_timeout)
# This request doesn't return a response.
yield self.exec_rq(rq, io_loop=self._io_loop)
# Now wait for the replies to come in
self._log.debug('Waiting for replies')
apdus = yield my_futures
self._log.debug('Replies: %s', apdus)
# Filter out the None replies and collate by identifier
apdus = dict([
(apdu.iAmDeviceIdentifier[-1], apdu)
for apdu in filter(lambda r : r is not None, apdus)
])
raise Return(apdus)
def exec_rq(self, rq, io_loop=None):
"""
Execute a request against the connection.
"""
self._log.debug('Received request %s', rq)
future = Future()
if io_loop is None:
io_loop = IOLoop.current()
def _on_done(iocb):
try:
self._log.debug('Processing response for %s (request %s)',
iocb, rq)
if iocb.ioError:
self._log.debug('Request failed with %s', iocb.ioError)
future.set_exception(BACnetIOError(iocb.ioError))
else:
self._log.debug('Request succeeded with %s', iocb.ioResponse)
future.set_result(iocb.ioResponse)
except:
self._log.exception('Failed to process callback for rq %s', rq)
def _on_done_worker(iocb):
try:
io_loop.add_callback(_on_done, iocb)
except:
self._log.exception('Failed to process callback for rq %s', rq)
def _submit():
try:
iocb = IOCB(rq)
iocb.add_callback(_on_done_worker)
self._log.debug('Submitting request %s as %s', rq, iocb)
self._bacnet_app.request_io(iocb)
except:
self._log.exception('Failed to submit rq %s', rq)
deferred(_submit)
return future
def close(self):
"""
Shut down the connection instance.
"""
# Grab thread instance
thread = self._thread
if thread is None:
return
# Request the thread to stop
deferred(self._thread_stop)
# Wait for thread to stop
thread.join()
class BACnetRqIndHandler(object):
"""
A mix-in class that intercepts the "request" and "indication" methods.
"""
def __init__(self, *args, **kwargs):
self.indication_sig = Signal(name='indication_sig', threadsafe=True)
super(BACnetRqIndHandler, self).__init__(*args, **kwargs)
def indication(self, ind, *args, **kwargs):
"""
Grab the indication received, emit it in a signal, then carry on.
"""
self.indication_sig.emit(indication=ind)
return super(BACnetRqIndHandler, self).indication(ind, *args, **kwargs)
class BACnetIPApplication(BACnetRqIndHandler, BIPSimpleApplication):
pass
class BACnetBBMDApplication(BACnetRqIndHandler, BIPForeignApplication):
pass
class BACnetIPConnection(BACnetConnection):
def __init__(self, name, address, *args, **kwargs):
self._bip_address = address
super(BACnetIPConnection, self).__init__(name, *args, **kwargs)
def _make_application(self):
self._bacnet_dev = LocalDeviceObject(**self._bacnet_props)
self._bacnet_app = BACnetIPApplication(self._bacnet_dev, self._bip_address)
self._bacnet_dev.protocolServicesSupported = \
self._bacnet_app.get_services_supported().value
class BACnetBBMDConnection(BACnetConnection):
def __init__(self, name, address, bbmd_address, bbmd_ttl=255, *args, **kwargs):
self._bip_address = address
self._bbmd_address = bbmd_address
self._bbmd_ttl = int(bbmd_ttl)
super(BACnetBBMDConnection, self).__init__(name, *args, **kwargs)
def _make_application(self):
self._bacnet_dev = LocalDeviceObject(**self._bacnet_props)
self._bacnet_app = BACnetBBMDApplication(self._bacnet_dev,
self._bip_address, Address(self._bbmd_address),
self._bbmd_ttl)
self._bacnet_dev.protocolServicesSupported = \
self._bacnet_app.get_services_supported().value
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment