Created
March 2, 2018 04:16
-
-
Save sjlongland/007d0385b1f6cd2b37c0192a2ce0cecc to your computer and use it in GitHub Desktop.
BACnet connection management
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# MeterMaster Data Access Layer
# (C) 2017 VRT Systems
#
# The following Software is the proprietary property of VRT Systems.
# Unauthorised disclosure or use of this Software is forbidden to the extent
# permitted by law.
#
# vim: set ts=4 sts=4 et tw=78 sw=4:
from threading import Event, Semaphore, Thread | |
from bacpypes.core import run, stop, deferred, enable_sleeping | |
from bacpypes.service.device import LocalDeviceObject | |
from bacpypes.iocb import IOCB | |
from bacpypes.app import BIPSimpleApplication, BIPForeignApplication | |
from bacpypes.apdu import IAmRequest, WhoIsRequest | |
from bacpypes.pdu import Address | |
from functools import wraps | |
from tornado.gen import Future, coroutine, Return | |
from tornado.ioloop import IOLoop | |
from mm_util.sigslot import Signal, TornadoSlot | |
from sys import exc_info | |
from os import environ | |
from enum import Enum | |
import logging | |
def enable_bacpypes_debug(modules):
    """
    Set the ``_debug`` flag on each module named in ``modules``.

    :param modules: Comma-separated string of importable module names.
                    Surrounding whitespace is ignored and empty entries
                    (e.g. from a trailing comma) are skipped rather than
                    being passed to ``import_module``.
    :returns: List of the module names that had ``_debug`` set to 1.
    :raises ImportError: if a named module cannot be imported.
    """
    # Imported here so the module is only loaded when debugging is requested.
    import importlib

    enabled = []
    for modname in modules.split(','):
        modname = modname.strip()
        if not modname:
            continue
        importlib.import_module(modname)._debug = 1
        enabled.append(modname)
    return enabled


# Comma-separated list of modules to debug, taken from the environment.
debug_mods = environ.get('MM_BACPYPES_DEBUG', '')
if debug_mods:
    # Set debug options on given modules.
    enable_bacpypes_debug(debug_mods)

# VRT's Vendor ID; used as the default ``vendorIdentifier`` of the local
# BACnet device.
VENDOR_ID = 704
class BACnetSegmentation(Enum):
    """
    Segmentation modes a BACnet device may advertise.

    Member names are short labels; member values are the corresponding
    long-form segmentation names.
    """

    # No segmentation at all.
    no = 'noSegmentation'

    # Segmented messages may be received only.
    rx = 'segmentedReceive'

    # Segmented messages may be transmitted only.
    tx = 'segmentedTransmit'

    # Segmentation in both directions.
    both = 'segmentedBoth'
class BACnetIOError(IOError):
    """
    I/O error wrapping a failure reported by bacpypes.

    The wrapped error is converted to its string form and becomes the first
    argument of the underlying ``IOError``.
    """

    def __init__(self, bacpypes_err, *args, **kwargs):
        """
        :param bacpypes_err: The error object (or message) from bacpypes.
        :param args: Extra positional arguments forwarded to ``IOError``.
        :param kwargs: Extra keyword arguments forwarded to ``IOError``.
        """
        message = str(bacpypes_err)
        parent = super(BACnetIOError, self)
        parent.__init__(message, *args, **kwargs)
class BACnetConnection(object):
    """
    Base BACnet connection class.

    The bacpypes processing loop is run in a dedicated worker thread started
    by :meth:`spawn_thread`.  Results and received indications are handed
    back to the Tornado IO loop supplied at construction.  Sub-classes must
    implement :meth:`_make_application`.
    """

    def __init__(self, name, object_id, object_name=None, max_apdu=1024,
            segmentation=BACnetSegmentation.both, max_segments=16,
            apdu_segment_timeout=5000, apdu_timeout=3000, apdu_retries=3,
            vendor_id=VENDOR_ID, iam_expiry=300.0, log=None, io_loop=None):
        """
        :param name: Connection name; also used as the worker thread name.
        :param object_id: Object identifier of the local device (``int``-able).
        :param object_name: Object name of the local device; defaults to
                            ``name``.
        :param max_apdu: Value for ``maxApduLengthAccepted``.
        :param segmentation: A :class:`BACnetSegmentation` member, one of its
                             short names (case-insensitive) or one of its
                             values.
        :param max_segments: Value for ``maxSegmentsAccepted``.
        :param apdu_segment_timeout: Value for ``apduSegmentTimeout``.
        :param apdu_timeout: Value for ``apduTimeout``.
        :param apdu_retries: Value for ``numberOfApduRetries``.
        :param vendor_id: Value for ``vendorIdentifier``.
        :param iam_expiry: Seconds a cached IAm is kept; a false-y value
                           selects 300 seconds.
        :param log: Logger; defaults to one named after the class' module.
        :param io_loop: Tornado IO loop; defaults to ``IOLoop.current()``.
        :raises ValueError: if ``segmentation`` is not recognised.
        """
        if log is None:
            log = logging.getLogger(self.__class__.__module__)
        if io_loop is None:
            io_loop = IOLoop.current()

        if not isinstance(segmentation, BACnetSegmentation):
            try:
                # Try the shortened labels first, case insensitive.  Member
                # lookup is used (not getattr) so that only real members
                # can match.
                segmentation = BACnetSegmentation[segmentation.lower()]
            except (AttributeError, KeyError):
                # Try the official bacpypes names
                segmentation = BACnetSegmentation(segmentation)

        if object_name is None:
            object_name = name

        self._name = name
        self._log = log
        self._io_loop = io_loop
        self._running = False
        self._thread = None
        self._thread_sem = Semaphore()  # guards thread launch
        self._bacnet_app = None
        self._bacnet_dev = None

        # Incoming IAm indications
        self._iam = {}              # key = ID, value = (APDU, expiry time)
        self._iam_timeout = None    # Expiry timeout object
        self._iam_waiting = {}      # key = ID, value = list of Futures
        self._iam_expiry = float(iam_expiry or 300.0)

        # BACnet properties
        # NOTE(review): the enum member itself (not its ``.value``) is stored
        # under 'segmentationSupported' -- confirm that is what
        # LocalDeviceObject expects.
        self._bacnet_props = {
            'objectName': object_name,
            'vendorIdentifier': int(vendor_id),
            'objectIdentifier': int(object_id),
            'maxApduLengthAccepted': int(max_apdu),
            'segmentationSupported': segmentation,
            'maxSegmentsAccepted': int(max_segments),
            'apduSegmentTimeout': int(apdu_segment_timeout),
            'apduTimeout': int(apdu_timeout),
            'numberOfApduRetries': int(apdu_retries),
        }

        self.closed_sig = Signal(name='closed_sig', threadsafe=True)

    def _make_application(self):
        """
        Construct the BACnet application instance.

        Implementations are expected to populate ``self._bacnet_dev`` and
        ``self._bacnet_app``.
        """
        raise NotImplementedError('Abstract class')

    def spawn_thread(self):
        """
        Start a worker thread to handle this connection.

        Blocks until the worker reports that its processing loop is up (or
        that it has failed).

        :raises AssertionError: if a launch is already in progress, a worker
                                already exists or is running, or the worker
                                failed to start.
        """
        if not self._thread_sem.acquire(blocking=False):
            # Somebody else is part-way through launching the thread.
            raise AssertionError('Connection launch in progress')

        try:
            # Explicit raises rather than ``assert`` so that the guards are
            # still enforced when Python runs with -O.
            if self._thread is not None:
                raise AssertionError('Thread already exists')
            if self._running:
                raise AssertionError('Thread is running')

            self._log.debug('Spawning worker thread')
            starting = Event()
            self._thread = Thread(group=None,
                    target=self._thread_main, name=self._name,
                    args=(starting,))
            self._thread.start()
            starting.wait()

            if not self._running:
                raise AssertionError('Thread failed to launch')
        finally:
            self._thread_sem.release()

    def _on_indication(self, indication, **kwargs):
        """
        Indication received from BACnet client.

        IAm indications are cached (with an expiry time) and passed to any
        futures waiting on that device; other indications are only logged.
        """
        self._log.debug('Received indication %s', indication)
        if not isinstance(indication, IAmRequest):
            return

        device_id = indication.iAmDeviceIdentifier
        self._log.debug('IAm from device %r vendor %r',
                device_id, indication.vendorID)

        expiry = self._io_loop.time() + self._iam_expiry
        self._iam[device_id] = (indication, expiry)
        if self._iam_timeout is None:
            self._iam_timeout = self._io_loop.add_timeout(expiry,
                    self._on_iam_expiry)

        # If there are processes waiting for this device, grab the list of
        # Future objects and resolve them.
        futures = self._iam_waiting.pop(device_id, [])
        self._log.debug('Passing reply to %d waiters', len(futures))
        for future in futures:
            if future.done():
                # Already resolved (e.g. timed out); nothing to deliver.
                continue
            self._log.debug('Resolving %s', future)
            future.set_result(indication)

    def _on_iam_expiry(self):
        """
        Do a clean-up of expired IAm responses.

        Drops every cached IAm whose expiry time has passed and re-arms the
        timer for the earliest remaining expiry, if any.
        """
        # This timer has fired; clear the handle so that _on_indication can
        # arm a fresh one if nothing is left to expire here.
        self._iam_timeout = None

        now = self._io_loop.time()
        next_expiry = None
        for (device_id, (_, expiry)) in list(self._iam.items()):
            if expiry <= now:
                self._iam.pop(device_id, None)
            elif (next_expiry is None) or (expiry < next_expiry):
                next_expiry = expiry

        if next_expiry is not None:
            self._iam_timeout = self._io_loop.add_timeout(next_expiry,
                    self._on_iam_expiry)

    def _thread_main(self, starting):
        """
        Main runner thread, runs in the context of the BACnet library

        :param starting: ``Event`` set once the processing loop is up, or
                         once the thread has given up.
        """
        try:
            self._log.debug('Worker thread started')

            # Create the application
            self._make_application()
            self._bacnet_app.indication_sig.connect(
                    TornadoSlot(self._on_indication,
                        io_loop=self._io_loop, weak=True))

            # Enable sleeping to reduce CPU usage
            enable_sleeping()

            # What to do once we've entered the main loop?
            def _on_run():
                # Tell our main thread we're up
                self._log.debug('Processing loop is up')
                self._running = True
                starting.set()
            deferred(_on_run)

            # Now, enter the processing loop.
            self._log.debug('Entering processing loop')
            run()
            self._log.debug('Exited processing loop')
        except Exception:
            # This should not happen!  Ever!
            self._log.exception('Exception raised in back-end thread!')
        finally:
            # We've finished, clean up.
            self._thread = None
            self._bacnet_app = None
            self._bacnet_dev = None
            self._running = False

            # Make sure spawn_thread is never left blocked, whatever
            # happened above.  Harmless if already set by _on_run.
            starting.set()
            self._log.debug('Worker thread finished')

            # Emit the closed signal from the main thread
            self._io_loop.add_callback(self.closed_sig.emit)

    def _thread_stop(self):
        """
        Stop the thread from within, run from the BACnet thread.
        """
        deferred(stop)

    @property
    def is_closed(self):
        """
        ``True`` when the worker's processing loop is not running.
        """
        return not self._running

    def get_object_iam(self, *object_id):
        """
        Retrieve the IAm response/indication for the given object ID.

        The positional arguments together form the lookup key, e.g.
        ``get_object_iam('device', 1234)``.

        :returns: The cached IAm, or ``None`` if there is none.
        """
        entry = self._iam.get(object_id)
        if entry is None:
            return None
        (iam, _) = entry
        return iam

    @coroutine
    def exec_whois(self, low_id, high_id, address, deadline):
        """
        Submit a WhoIs request and wait for the reply.  Returns a
        dict of IAm replies, keyed by device ID.

        :param low_id: Lowest device instance to query (inclusive).
        :param high_id: Highest device instance to query (inclusive).
        :param address: Destination address for the WhoIs request.
        :param deadline: Passed unchanged to ``IOLoop.add_timeout``; devices
                         that have not answered by then are omitted from the
                         result.
        """
        # Don't have it, so ask around.
        rq = WhoIsRequest()
        rq.pduDestination = address
        rq.deviceInstanceRangeLowLimit = low_id
        rq.deviceInstanceRangeHighLimit = high_id

        # Add ourselves to the waiting list.
        waiting = []    # (key, future) pairs, one per device in range
        for dev_id in range(low_id, high_id + 1):
            key = ('device', dev_id)
            future = Future()
            self._log.debug('Adding to wait list for device %d: %s',
                    dev_id, future)
            self._iam_waiting.setdefault(key, []).append(future)
            waiting.append((key, future))

        # Set up an expiry timer
        def _on_timeout():
            for (key, future) in waiting:
                if future.done():
                    continue
                self._log.debug('Failing future %s', future)
                future.set_result(None)     # No response

                # Take the future off the waiting list so that a late IAm
                # does not try to resolve it again.
                pending = self._iam_waiting.get(key)
                if pending is None:
                    continue
                try:
                    pending.remove(future)
                except ValueError:
                    pass
                if not pending:
                    self._iam_waiting.pop(key, None)
        self._io_loop.add_timeout(deadline, _on_timeout)

        # This request doesn't return a response.
        yield self.exec_rq(rq, io_loop=self._io_loop)

        # Now wait for the replies to come in
        self._log.debug('Waiting for replies')
        apdus = yield [future for (_, future) in waiting]
        self._log.debug('Replies: %s', apdus)

        # Filter out the None replies and collate by identifier
        raise Return({
            apdu.iAmDeviceIdentifier[-1]: apdu
            for apdu in apdus
            if apdu is not None
        })

    def exec_rq(self, rq, io_loop=None):
        """
        Execute a request against the connection.

        :param rq: The request to wrap in an ``IOCB`` and submit.
        :param io_loop: IO loop on which the returned future is resolved;
                        defaults to ``IOLoop.current()``.
        :returns: A future yielding the IOCB's ``ioResponse``, or failing
                  with :class:`BACnetIOError` if the IOCB reports an error
                  or the request could not be submitted.
        """
        self._log.debug('Received request %s', rq)
        future = Future()
        if io_loop is None:
            io_loop = IOLoop.current()

        def _fail(err):
            # Runs on the IO loop: report a failure to the caller.
            if not future.done():
                future.set_exception(err)

        def _on_done(iocb):
            # Runs on the IO loop: translate the IOCB outcome.
            try:
                self._log.debug('Processing response for %s (request %s)',
                        iocb, rq)
                if iocb.ioError:
                    self._log.debug('Request failed with %s', iocb.ioError)
                    future.set_exception(BACnetIOError(iocb.ioError))
                else:
                    self._log.debug('Request succeeded with %s',
                            iocb.ioResponse)
                    future.set_result(iocb.ioResponse)
            except Exception:
                self._log.exception('Failed to process callback for rq %s',
                        rq)

        def _on_done_worker(iocb):
            # IOCB completion callback: hand over to the IO loop.
            try:
                io_loop.add_callback(_on_done, iocb)
            except Exception:
                self._log.exception('Failed to process callback for rq %s',
                        rq)

        def _submit():
            # Deferred to bacpypes: build and submit the IOCB.
            try:
                iocb = IOCB(rq)
                iocb.add_callback(_on_done_worker)
                self._log.debug('Submitting request %s as %s', rq, iocb)
                self._bacnet_app.request_io(iocb)
            except Exception as exc:
                self._log.exception('Failed to submit rq %s', rq)
                # Don't leave the caller waiting on a future that can never
                # be resolved.
                io_loop.add_callback(_fail, BACnetIOError(exc))

        deferred(_submit)
        return future

    def close(self):
        """
        Shut down the connection instance.

        Does nothing if there is no worker thread; otherwise requests the
        worker to stop and blocks until it has exited.
        """
        # Grab thread instance
        thread = self._thread
        if thread is None:
            return

        # Request the thread to stop
        deferred(self._thread_stop)

        # Wait for thread to stop
        thread.join()
class BACnetRqIndHandler(object):
    """
    Mix-in that taps the ``indication`` method of an application class.

    Every indication is first published on ``indication_sig`` and then
    handed to the next class in the MRO.
    """

    def __init__(self, *args, **kwargs):
        """
        Create ``indication_sig``, then continue construction up the MRO
        with the arguments unchanged.
        """
        # The signal is created before the parent constructor runs.
        self.indication_sig = Signal(name='indication_sig', threadsafe=True)
        parent = super(BACnetRqIndHandler, self)
        parent.__init__(*args, **kwargs)

    def indication(self, ind, *args, **kwargs):
        """
        Emit ``ind`` on ``indication_sig`` (keyword ``indication``), then
        return whatever the parent ``indication`` returns.
        """
        self.indication_sig.emit(indication=ind)
        parent = super(BACnetRqIndHandler, self)
        return parent.indication(ind, *args, **kwargs)
class BACnetIPApplication(BACnetRqIndHandler, BIPSimpleApplication):
    """
    ``BIPSimpleApplication`` combined with ``BACnetRqIndHandler``, so that
    received indications are also emitted on ``indication_sig``.
    """
class BACnetBBMDApplication(BACnetRqIndHandler, BIPForeignApplication):
    """
    ``BIPForeignApplication`` combined with ``BACnetRqIndHandler``, so that
    received indications are also emitted on ``indication_sig``.
    """
class BACnetIPConnection(BACnetConnection):
    """
    BACnet connection backed by a ``BACnetIPApplication``.
    """

    def __init__(self, name, address, *args, **kwargs):
        """
        :param name: Connection name, forwarded to ``BACnetConnection``.
        :param address: Address handed to the application on creation.
        :param args: Remaining positional arguments for the base class.
        :param kwargs: Remaining keyword arguments for the base class.
        """
        self._bip_address = address
        parent = super(BACnetIPConnection, self)
        parent.__init__(name, *args, **kwargs)

    def _make_application(self):
        """
        Build the local device object and the application, then record the
        application's supported services on the device.
        """
        device = self._bacnet_dev = LocalDeviceObject(**self._bacnet_props)
        app = self._bacnet_app = BACnetIPApplication(
                device, self._bip_address)
        services = app.get_services_supported()
        device.protocolServicesSupported = services.value
class BACnetBBMDConnection(BACnetConnection):
    """
    BACnet connection backed by a ``BACnetBBMDApplication``, which is given
    a BBMD address and time-to-live in addition to the local address.
    """

    def __init__(self, name, address, bbmd_address, bbmd_ttl=255,
            *args, **kwargs):
        """
        :param name: Connection name, forwarded to ``BACnetConnection``.
        :param address: Local address handed to the application.
        :param bbmd_address: BBMD address; wrapped in ``Address`` when the
                             application is created.
        :param bbmd_ttl: Time-to-live handed to the application
                         (``int``-able).
        :param args: Remaining positional arguments for the base class.
        :param kwargs: Remaining keyword arguments for the base class.
        """
        self._bip_address = address
        self._bbmd_address = bbmd_address
        self._bbmd_ttl = int(bbmd_ttl)
        parent = super(BACnetBBMDConnection, self)
        parent.__init__(name, *args, **kwargs)

    def _make_application(self):
        """
        Build the local device object and the application, then record the
        application's supported services on the device.
        """
        device = self._bacnet_dev = LocalDeviceObject(**self._bacnet_props)
        app = self._bacnet_app = BACnetBBMDApplication(
                device,
                self._bip_address,
                Address(self._bbmd_address),
                self._bbmd_ttl)
        services = app.get_services_supported()
        device.protocolServicesSupported = services.value
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment