Skip to content

Instantly share code, notes, and snippets.

@OrangeTux
Forked from anonymous/gist:4683842
Last active December 12, 2015 00:28
Show Gist options
  • Save OrangeTux/4683870 to your computer and use it in GitHub Desktop.
Save OrangeTux/4683870 to your computer and use it in GitHub Desktop.
class Initializer:
""" Initializes this demo
"""
def __init__(self):
""" Create a Messenger and pass this messenger
to a new formed Sensor.
"""
self.messenger = create_messenger()
self.sensor = Sensor(self.messenger)
def create_messenger():
""" Returns a messenger instance
"""
pass
class Sensor:
""" Represents a sensor out in the field.
"""
def __init__(self, messenger):
self.messenger = messenger
self.create_request()
self.run()
def create_request(self):
""" Create reqeust to send to the fysical sensor in the field.
"""
try:
addr, obj_type, obj_inst, prop_id = args[:4]
if not get_object_class(obj_type):
raise Exception, 'probleem'
obj_inst = int(obj_inst)
datatype = get_datatype(obj_type, prop_id)
if not datatype:
raise ValueError, "invalid property for object type"
# build a request
self.request = ReadPropertyRequest(
objectIdentifier=(obj_type, obj_inst),
propertyIdentifier=prop_id,
)
self.request.pduDestination = Address(self.bacnet_address)
if len(args) == 5:
self.request.propertyArrayIndex = int(args[4])
except Exception, e:
print(e)
def get_data(self):
""" Retrieves a value out of the sensor
and adds it to the list of values.
Returns false when something failed during
retrieving the value.
"""
log.debug('Sensor %d at %s is tries to get data.', self.id, self.bacnet_address)
# Passes the request(envelope) to the Messengers request method.
# The Messenger will bring the envelope to the right address
# and returns the response.
self.messenger.run()
response = self.messenger.request(self.request)
# TODO extract data we need out of the response
return data
def run(self):
""" This method continuesly retrieves a new
value of the sensor, calculates the average
and write the result to the database.
"""
while True:
# Check if a new value has been retrieved out of the sensor.
if self.get_data():
self.calculate_average()
self.write_to_db()
# Sleep for a few second before do it all again.
time.sleep(self.CHECK_INTERVAL)
class Messenger(BIPSimpleApplication):
"""
This class functions as a messenger between a Sensor (instance)
and a fysical sensor. This is an implementation of the BIPSimpleApplication
which is in turn extends Application.
"""
def __init__(self, device, address):
BIPSimpleApplication.__init__(self, device, address)
# Create a task manager where all calls/requests to
# a bacnet device are put on.
self.taskManager = TaskManager()
def confirmation(self, apdu):
""" This method returns the response of a sensor.
"""
if isinstance(apdu, AbortPDU):
apdu.debug_contents()
def run(self):
""" Executes the next task. Normally this task
is a request for data.
Copied out of bacpypes.core.
"""
running = False
taskManager = None
deferredFns = []
sleeptime = 0.0
SPIN = 1.0
running = True
try:
task, delta = self.taskManager.get_next_task()
# if there is a task to process, do it
if task:
print('got a task')
# _log.debug("task: %r", task)
self.taskManager.process_task(task)
# if delta is None, there are no tasks, default to spinning
if delta is None:
delta = SPIN
# there may be threads around, sleep for a bit
if sleeptime and (delta > sleeptime):
time.sleep(sleeptime)
delta -= sleeptime
# if there are deferred functions, use a small delta
if deferredFns:
delta = min(delta, 0.001)
#
# loop for socket activity
asyncore.loop(timeout=delta, count=1)
# check for deferred functions
while deferredFns:
# get a reference to the list
fnlist = deferredFns
deferredFns = []
# call the functions
for fn, args, kwargs in fnlist:
# _log.debug("call: %r %r %r", fn, args, kwargs)
fn( *args, **kwargs)
# done with this list
del fnlist
except KeyboardInterrupt:
_log.info("keyboard interrupt")
running = False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment