Skip to content

Instantly share code, notes, and snippets.

Created January 31, 2013 15:48
Show Gist options
  • Save anonymous/4683842 to your computer and use it in GitHub Desktop.
Save anonymous/4683842 to your computer and use it in GitHub Desktop.
Imagine the following situation: multiple sensors are in the field. Each is represented in the code by an instance of Sensor. Each Sensor creates its own request and passes it to the Messenger. The Messenger brings the request to the physical sensor and returns with the response. This response is then handled in the Sensor instance.
class Initializer:
    """Entry point of this demo: wires a messenger to a sensor."""

    def __init__(self):
        """Obtain a messenger from create_messenger() and give it to a
        freshly constructed Sensor. Both objects are kept as attributes
        (``messenger`` and ``sensor``).
        """
        messenger = create_messenger()
        self.messenger = messenger
        self.sensor = Sensor(messenger)
def create_messenger():
    """Return a messenger instance.

    Placeholder: nothing is constructed yet, so the result is None.
    """
    return None
class Sensor:
    """Represents a sensor out in the field.

    A Sensor builds its own read request and hands it to the messenger
    it was constructed with.

    NOTE(review): ``self.id`` and ``self.bacnet_address`` are read by the
    methods below but are not assigned anywhere in this class; presumably
    the caller sets them after construction -- confirm.
    """

    def __init__(self, messenger):
        """Remember the messenger used to deliver requests.

        messenger -- object providing ``run()`` and ``request(request)``.
        """
        self.messenger = messenger
        # No request exists until create_request() succeeds.
        self.request = None

    def create_request(self, *args):
        """Create the request to send to the physical sensor in the field.

        args -- ``addr, obj_type, obj_inst, prop_id`` and, optionally, a
                fifth value used as the property array index.

        On success the finished request is stored in ``self.request``.
        Any failure is printed instead of raised and leaves
        ``self.request`` as it was.
        """
        try:
            # The address field is accepted positionally, but the request
            # destination is taken from self.bacnet_address below.
            _addr, obj_type, obj_inst, prop_id = args[:4]

            if not get_object_class(obj_type):
                raise Exception('probleem')

            obj_inst = int(obj_inst)

            datatype = get_datatype(obj_type, prop_id)
            if not datatype:
                raise ValueError("invalid property for object type")

            # Build the request in a local first, so a failure half-way
            # never leaves a partially configured request on the instance.
            request = ReadPropertyRequest(
                objectIdentifier=(obj_type, obj_inst),
                propertyIdentifier=prop_id,
            )
            request.pduDestination = Address(self.bacnet_address)

            if len(args) == 5:
                request.propertyArrayIndex = int(args[4])

            self.request = request
        except Exception as e:
            # Best effort: report the problem and carry on.
            print(e)

    def get_data(self):
        """Let the messenger run once, then submit this sensor's request.

        Returns False when no request has been built yet (for example
        because create_request() failed); otherwise the request is passed
        to the messenger and None is returned.
        """
        log.debug('Sensor %d at %s is tries to get data.', self.id, self.bacnet_address)

        if self.request is None:
            return False

        # Pass the request (envelope) to the messenger's request method.
        # The messenger brings the envelope to the right address.
        self.messenger.run()
        self.messenger.request(self.request)
class Messenger(BIPSimpleApplication):
    """Messenger between a Sensor (instance) and a physical sensor.

    This is an implementation of BIPSimpleApplication, which in turn
    extends Application.
    """

    def __init__(self, device, address):
        """Initialise the underlying application and the task manager.

        device, address -- passed unchanged to BIPSimpleApplication.
        """
        BIPSimpleApplication.__init__(self, device, address)

        # Task manager on which all calls/requests to a BACnet device
        # are put.
        self.taskManager = TaskManager()

    def confirmation(self, apdu):
        """Handle the response of a sensor.

        Only AbortPDU responses are acted upon: their contents are dumped
        with ``debug_contents()``. Nothing is returned.
        """
        if isinstance(apdu, AbortPDU):
            apdu.debug_contents()

    def run(self):
        """Execute the next task (normally a request for data), then poll
        the sockets once.

        Single-pass adaptation of the loop in bacpypes.core. This method
        keeps no deferred-function list and no sleep interval of its own,
        so only task processing and one socket poll are performed.
        A KeyboardInterrupt is logged and swallowed.
        """
        # Poll timeout in seconds used when no task is scheduled.
        spin = 1.0

        try:
            task, delta = self.taskManager.get_next_task()

            # if there is a task to process, do it
            if task:
                print('got a task')
                self.taskManager.process_task(task)

            # if delta is None, there are no tasks, default to spinning
            if delta is None:
                delta = spin

            # one pass over the sockets for pending activity
            asyncore.loop(timeout=delta, count=1)
        except KeyboardInterrupt:
            _log.info("keyboard interrupt")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment