Created
April 22, 2012 12:55
-
-
Save claws/2464017 to your computer and use it in GitHub Desktop.
A Python Twisted friendly pywws Weather Station
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
This module represents a twist-ification of the pywws Weather Station. | |
There was nothing wrong with the original pywws version. I just wanted a | |
version that would integrate consistently with my other Twisted based | |
software. | |
""" | |
from twisted.internet import reactor, defer, task | |
import datetime | |
import logging | |
import pywws.WeatherStation | |
import hid | |
def timeUntilTomorrow(): | |
""" | |
Return the number of seconds until tomorrow 00:00 am | |
""" | |
tomorrow = datetime.datetime.replace(datetime.datetime.utcnow() + datetime.timedelta(days=1), | |
hour=0, minute=0, second=0) | |
delta = tomorrow - datetime.datetime.utcnow() | |
return delta.seconds | |
WRITE_COMMAND = pywws.WeatherStation.CUSBDrive.WriteCommand | |
END_MARK = pywws.WeatherStation.CUSBDrive.EndMark | |
READ_COMMAND = pywws.WeatherStation.CUSBDrive.ReadCommand | |
WRITE_COMMAND_WORD = pywws.WeatherStation.CUSBDrive.WriteCommandWord | |
SUCCESS_STATUS = 0xA5 | |
VALID_MAGIC_NUMBERS = [[0x55, 0xAA], [0xFF, 0xFF], [0x55, 0x55], [0xC4, 0x00]] | |
DATA_CHANGED_FORMAT = pywws.WeatherStation.weather_station.fixed_format['data_changed'] | |
DATA_CHANGED_VALUE = 0xAA | |
DATA_CHANGED_OFFSET = DATA_CHANGED_FORMAT[0] | |
class UsbDevice(object): | |
""" | |
Low level interface to USB Weather Station using cython-hidapi | |
This object tries to present a non-blocking interface. In | |
reality it is more likely a minimally blocking interface. | |
Read and write counts are low in an effort to reduce any | |
blocking. | |
""" | |
def __init__(self): | |
self.logger = logging.getLogger('WeatherStation') | |
self._hid = None | |
# This attribute stores a reference to a twisted.internet.task.LoopingCall | |
# object that is started after a command is written to the USB device. It | |
# is run to regularly check for an indication that the USB device has read | |
# and processed the command message. | |
self._writeAcknowledgeTaskRef = None | |
# This attribute stores a references to a twisted.internet.interfaces.IDelayedCall | |
# object that will fire if the write acknowledge task fails to detect that the | |
# last command written has been successfully processed by the Weather Stations | |
# USB device. | |
self._writeAcknowledgeTaskTimeoutRef = None | |
def open(self, vid=0x1941, pid=0x8021): | |
""" | |
Open a connection with the USB Weather Station device. | |
""" | |
try: | |
self._hid = hid.device(vid, pid) | |
except IOError as ex: | |
self.logger.critical("USB Weather Station device not found: %s" % str(ex)) | |
return False | |
return True | |
def close(self): | |
""" | |
Close the connection with the USB Weather Station device. | |
""" | |
if self._hid: | |
self._hid.close() | |
self._hid = None | |
def sendCommand(self, commandData): | |
""" | |
Send a command to the Weather Station. The function is typically | |
used to modify settings on the Weather Station head unit. | |
@param commandData: Each tuple element contains | |
an address and a command byte. | |
@type commandData: list of 2-tuple elements. | |
@return: A deferred that fires with the boolean success of the write | |
""" | |
return self._write_command_data(commandData) | |
def _read_data(self): | |
""" | |
Read and returns a list of 8 bytes from USB device | |
Wraps the result in a maybeDeferred to provide a consistent | |
asynchronous interface even though this call is synchronous. | |
It should be brief enough to be considered asynchronous. | |
""" | |
return defer.maybeDeferred(self._hid.read, 8) | |
def _write_data(self, buf): | |
""" | |
Writes a list of bytes to the USB device | |
Wraps the result in a maybeDeferred to provide a consistent | |
asynchronous interface even though this call is synchronous. | |
It should be brief enough to be considered asynchronous. | |
""" | |
return defer.maybeDeferred(self._hid.write, buf) | |
@defer.inlineCallbacks | |
def read_block(self, address): | |
""" | |
Read a block of data starting at the specified address. | |
Returns a deferred that returns a list of 32 ints read from the | |
weather station. | |
A read command is first sent to the usb device. As part of the | |
command write process, the write status is checked and if good | |
then 4 consecutive reads (of 8 bytes each) are performed to | |
obtain a 32 byte data block which is returned to the caller | |
through the deferred callback. | |
""" | |
# byte 5 is 0x00 in original format.c version | |
# byte 6 is 0x00 in original format.c version | |
buf = [READ_COMMAND, | |
address / 256, | |
address % 256, | |
END_MARK, | |
READ_COMMAND, | |
address / 256, | |
address % 256, | |
END_MARK] | |
read_command_result = yield self._write_data(buf) | |
if read_command_result: | |
multiple_read_results = yield defer.DeferredList([self._read_data() for i in range(4)], | |
consumeErrors=True) | |
data = [] | |
problem_detected = False | |
for success, values in multiple_read_results: | |
if success: | |
data.extend(values) | |
else: | |
self.logger.error("Error reading 8 byte data block") | |
problem_detected = True | |
if problem_detected: | |
defer.returnValue(None) | |
else: | |
defer.returnValue(data) | |
else: | |
self.logger.error("Problem occurred while writing the read address to the USB device") | |
defer.returnValue(None) | |
@defer.inlineCallbacks | |
def read_settings_block(self, hi=0x0100): | |
""" | |
Read data from the fixed block, up to the specified 'hi' address. | |
Return a deferred that returns the raw data as a list of ints | |
""" | |
deferreds = [self.read_block(mempos) for mempos in range(0x0000, hi, 0x0020)] | |
multiple_read_block_results = yield defer.DeferredList(deferreds, consumeErrors=True) | |
# concatenate multiple reads into a list of bytes | |
data = [] | |
problem_detected = False | |
for success, value in multiple_read_block_results: | |
if success: | |
data.extend(value) | |
else: | |
self.logger.error("Error reading bytes from fixed block: %s" % value.getErrorMessage()) | |
problem_detected = True | |
if problem_detected: | |
defer.returnValue(None) | |
# confirm that start of fixed block contains 'magic number' | |
magic = data[:2] | |
if magic not in VALID_MAGIC_NUMBERS: | |
self.logger.error("Unexpected 'magic number' %02x %02x", data[0], data[1]) | |
defer.returnValue(data) | |
@defer.inlineCallbacks | |
def _write_byte(self, address, value): | |
""" | |
Writes a byte, wrapped in specific structure, to the weather station | |
Returns a deferred that returns a boolean indicating the write success status | |
""" | |
buf = [WRITE_COMMAND_WORD, | |
address / 256, | |
address % 256, | |
END_MARK, | |
WRITE_COMMAND_WORD, | |
value, | |
0, | |
END_MARK] | |
write_result = yield self._write_data(buf) | |
# If the weather station reads the command properly it should | |
# replace the written data with 0xA5 characters. Check for | |
# this to confirm write data was received. | |
write_data_readback = yield self._read_data() | |
if write_data_readback: | |
for b in write_data_readback: | |
if b != SUCCESS_STATUS: | |
self.logger.error("Unexpected data found during write status check indicates that write failed") | |
defer.returnValue(False) | |
defer.returnValue(True) | |
else: | |
self.logger.error("No data available when attempting write status check") | |
defer.returnValue(False) | |
@defer.inlineCallbacks | |
def _write_command_data(self, data): | |
""" | |
Returns a deferred that fires with the write result upon detecting that | |
the data has been acknowledged by the weather station. | |
@param data: A list of tuples of (ptr, value) to write. | |
@type data: list(int) | |
""" | |
# Every write needs to include a write to the data changed address, | |
# which tells the weather station that a data write is finished and | |
# awaiting implementation. Stick that onto the end of the data list. | |
data_changed = (DATA_CHANGED_OFFSET, DATA_CHANGED_VALUE) | |
data.append(data_changed) | |
deferreds = [self._write_byte(ptr, value) for (ptr, value) in data] | |
write_results = yield defer.DeferredList(deferreds) | |
write_results_ok = True | |
for success, result in write_results: | |
if not success: | |
self.logger.error("Problem with write results in _write_command_data") | |
write_results_ok = False | |
if write_results_ok: | |
# Data has been written. Begin a task that periodically checks | |
# for confirmation that the written data has been read and | |
# acknowledged by the device. | |
d = defer.Deferred() | |
self._startWriteAcknowledgeTask(d) | |
defer.returnValue(d) | |
else: | |
defer.returnValue(False) | |
def _startWriteAcknowledgeTask(self, d): | |
""" | |
Starts the task that periodically runs the write acknowledge checking function. | |
""" | |
if self.writeAcknowledgeTaskRef is None: | |
self.logger.debug("Starting write acknowledge task") | |
self.writeAcknowledgeTaskRef = task.LoopingCall(self._writeAcknowledgeTask, d) | |
self.writeAcknowledgeTaskRef.start(1.0) | |
# schedule a timeout function to run in 5.0 seconds, if no write | |
# acknowledge has been detected. | |
self._writeAcknowledgeTaskTimeoutRef = reactor.callLater(5.0, self._writeAcknowledgeTaskTimeout, d) | |
else: | |
self.logger.error("Can't start another write acknowledge task because there is one already running!") | |
d.callback(False) | |
def _stopWriteAcknowledgeTask(self): | |
""" | |
Stops the task that periodically runs the write acknowledge checking function. | |
""" | |
self.logger.debug("Stopping write acknowledge task") | |
if self.writeAcknowledgeTaskRef and self.writeAcknowledgeTaskRef.running: | |
self.writeAcknowledgeTaskRef.stop() | |
self.writeAcknowledgeTaskRef = None | |
# cancel any scheduled timeout handlers | |
if self._writeAcknowledgeTaskTimeoutRef: | |
self._writeAcknowledgeTaskTimeoutRef.cancel() | |
self._writeAcknowledgeTaskTimeoutRef = None | |
def _writeAcknowledgeTaskTimeout(self, d): | |
""" | |
Cancels the task that periodically runs the write acknowledge checking function | |
as the timeout period in which we expect a write confirmation has expired. | |
""" | |
self.logger.debug("Write acknowledge timeout - canceling write acknowledge task") | |
self._stopWriteAcknowledgeTask() | |
# signal write acknowledge failure | |
d.callback(False) | |
@defer.inlineCallbacks | |
def _writeAcknowledgeTask(self, d): | |
""" | |
This function is scheduled to be called periodically after a write to check that | |
the weather station acknowledges the write data. It checks the data_changed field | |
that indicates the weather station has read the data written to it. This function | |
stops itself from being periodically called when it detects the weather station | |
acknowledges the written data. | |
""" | |
# Read a block (32 bytes) of data from the USB device. This block | |
# contains the data_changed field that we need to inspect for | |
# write acknowledgement. | |
fixed_block_data = yield self.read_settings_block(0x0020) | |
data_changed_value = fixed_block_data[DATA_CHANGED_OFFSET] | |
if data_changed_value == 0: | |
self._stopWriteAcknowledgeTask() | |
d.callback(True) | |
defer.returnValue(True) | |
class WeatherStation(pywws.WeatherStation.weather_station): | |
""" | |
A non-blocking version of the WeatherStation built on top of | |
the Twisted event driven networking framework. | |
This Weather Station functions as a service providing weather | |
data. The service can be started and stopped without having to | |
restart the software. | |
This Weather Station service is built on top of the pywws weather | |
station library. Extensive changes were required to achieve the | |
goal of a non-blocking, Twisted friendly, live weather monitor | |
design. | |
""" | |
reading_format = pywws.WeatherStation.weather_station.reading_format | |
fixed_format = pywws.WeatherStation.weather_station.fixed_format | |
data_start = pywws.WeatherStation.weather_station.data_start | |
reading_len = pywws.WeatherStation.weather_station.reading_len | |
# The following derived keys are added to all weather measurement | |
# requests to get_weather_data | |
derivedDataKeys = ['apparent_temp_in', | |
'apparent_temp_out', | |
'dew_point', | |
'invalid_keys', | |
'invalid_data', | |
'timestamp', | |
'wind_chill',] | |
def __init__(self, ws_type='1080'): | |
""" | |
Connect to weather station and prepare to read data. | |
""" | |
self.logger = logging.getLogger('WeatherStation') | |
self.device = None | |
self.rawSettingsData = None | |
self._data_block = None | |
self._data_pos = None | |
self.old_pos = None | |
self.ws_type = ws_type | |
self.settings_data = None | |
self.previous_results = None | |
self._synchroniseTaskRef = None | |
self._periodicUpdateTaskRef = None | |
self.rainToday = None | |
self.resetRainTaskRef = None | |
# defines all of the keys contained in the weather data returned | |
# from get_weather data, and in the current_data field of the | |
# current_results tuple returned from get_current_weather_data | |
# and to the live monitor callback handler. | |
self.weatherKeys = self.reading_format[self.ws_type].keys() | |
self.allWeatherKeys = self.weatherKeys + self.derivedDataKeys | |
def start(self): | |
""" | |
Start the Weather Station service by connecting to the USB device. | |
Return a deferred that returns a boolean indicating weather station | |
initialisation completed successfully. | |
""" | |
self.logger.info("Starting weather station") | |
d = self._initialise() | |
return d | |
def stop(self): | |
""" | |
Stop the Weather Station service by disconnecting from the USB device. | |
""" | |
self.logger.info("Stopping weather station") | |
self._stopSynchroniseTask() | |
self._stopPeriodicUpdateTask() | |
if self.resetRainTaskRef: | |
self.resetRainTaskRef.cancel() | |
self.device.close() | |
return defer.succeed(True) | |
@defer.inlineCallbacks | |
def _initialise(self): | |
""" | |
Perform a check to confirm weather station model settings are | |
supported by the weather station. This action also results in | |
the _fixed_data attribute being updated with the fixed data | |
block settings so that future fixed data block queries return | |
immediately (using cached data) unless unbuffered is set to True. | |
""" | |
self.device = UsbDevice() | |
if self.device.open(): | |
illuminance_value = yield self.get_settings_data(['max', 'illuminance', 'val']) | |
if illuminance_value: | |
if self.ws_type != '3080': | |
self.logger.warning('ws_type change %s -> %s', self.ws_type, '3080') | |
self.ws_type = '3080' | |
else: | |
if self.ws_type != '1080': | |
self.logger.warning('ws_type change %s -> %s', self.ws_type, '1080') | |
self.ws_type = '1080' | |
# update keys just in case the ws_type changes. | |
self.weatherKeys = self.reading_format[self.ws_type].keys() | |
self.allWeatherKeys = self.weatherKeys + self.derivedDataKeys | |
defer.returnValue(True) | |
else: | |
defer.returnValue(False) | |
def _processDecodedWeatherData(self, decoded_data): | |
""" | |
Inspect and process the weather data in the current data dict. | |
Occasionally communications with the remote sensor unit gets lost | |
and nothing gets stored in the current data block. Sometimes | |
just a specific sensor component does not report. For example | |
I have observed the UV and lux fields blank while all others are | |
fine. Accommodate these situations as best as possible. | |
* Detect invalid fields | |
* Add extra useful fields (timestamp, invalid_keys, etc) | |
* Add derived values calculated from measured values | |
""" | |
invalid_data_keys = [] | |
for k, v in decoded_data.items(): | |
if v is None: | |
invalid_data_keys.append(k) | |
# is all the data invalid or just a few keys? | |
if invalid_data_keys: | |
decoded_data_keys = decoded_data.keys() | |
decoded_data_keys.sort() | |
invalid_data_keys.sort() | |
decoded_data['invalid_keys'] = invalid_data_keys | |
decoded_data['invalid_data'] = decoded_data_keys == invalid_data_keys | |
else: | |
decoded_data['invalid_keys'] = [] | |
decoded_data['invalid_data'] = False | |
if decoded_data['invalid_keys']: | |
self.logger.info("Invalid data keys encountered - %s" % str(decoded_data['invalid_keys'])) | |
if decoded_data['invalid_data']: | |
self.logger.info("Invalid data update encountered") | |
# adjust rain to a daily value if performing live weather | |
if self.rainToday: | |
decoded_data['rain'] = decoded_data['rain'] - self.rainToday | |
# Add a timestamp to the data | |
decoded_data['timestamp'] = datetime.datetime.utcnow() | |
# Add derived measurement calculation noting that some calculations | |
# can not be performed if certain data fields are invalid. | |
not_available = 'N/A' | |
decoded_data['apparent_temp_out'] = not_available | |
decoded_data['apparent_temp_in'] = not_available | |
decoded_data['wind_chill'] = not_available | |
decoded_data['dew_point'] = not_available | |
# we overwrite the wind_dir enum with a string so don't overwrite to N/A here | |
if 'temp_out' not in decoded_data['invalid_keys'] and \ | |
'hum_out' not in decoded_data['invalid_keys'] and \ | |
'wind_ave' not in decoded_data['invalid_keys']: | |
decoded_data['apparent_temp_out'] = pywws.WeatherStation.apparent_temp(decoded_data['temp_out'], | |
decoded_data['hum_out'], | |
decoded_data['wind_ave']) | |
if 'temp_out' not in decoded_data['invalid_keys'] and \ | |
'hum_out' not in decoded_data['invalid_keys']: | |
decoded_data['dew_point'] = pywws.WeatherStation.dew_point(decoded_data['temp_out'], | |
decoded_data['hum_out']) | |
if 'temp_out' not in decoded_data['invalid_keys'] and \ | |
'wind_ave' not in decoded_data['invalid_keys']: | |
# Windchill is only defined for temperatures at or below 10 C | |
# and wind speeds above 4.8 kilometres per hour. Only report | |
# wind chill if appropriate. | |
if (decoded_data['temp_out'] <= 10.0) and (decoded_data['wind_ave'] <= 4.8): | |
decoded_data['wind_chill'] = pywws.WeatherStation.wind_chill(decoded_data['temp_out'], | |
decoded_data['wind_ave']) | |
if 'temp_in' not in decoded_data['invalid_keys'] and \ | |
'hum_in' not in decoded_data['invalid_keys']: | |
decoded_data['apparent_temp_in'] = pywws.WeatherStation.apparent_temp(decoded_data['temp_in'], | |
decoded_data['hum_in'], | |
0.0) | |
if 'wind_dir' not in decoded_data['invalid_keys']: | |
windDirectionList = pywws.WeatherStation.get_wind_dir_text() | |
windDirectionIndex = decoded_data['wind_dir'] | |
try: | |
# retrieve the wind direction description string by using | |
#the wind direction integer enumeration to index into a | |
# list of descriptions. | |
decoded_data['wind_dir'] = windDirectionList[windDirectionIndex] | |
except IndexError: | |
decoded_data['wind_dir'] = not_available | |
else: | |
decoded_data['wind_dir'] = not_available | |
def resetRainForNewDay(self, current_data=None): | |
""" | |
The rain value accumulates since the last head unit reset. | |
Take a sample to use as the reference for the day and schedule a | |
new reference sample to be taken tomorrow. | |
""" | |
if current_data is None: | |
current_data, current_pos, newDataAddress = self.get_current_weather_data() | |
if 'rain' not in current_data['invalid_keys']: | |
self.rainToday = current_data['rain'] | |
self.resetRainTaskRef = reactor.callLater(timeUntilTomorrow(), | |
self.resetRainForNewDay) | |
else: | |
# problem with rain value, try again soon | |
delay = 60.0 | |
self.resetRainTaskRef = reactor.callLater(delay, self.resetRainForNewDay) | |
self.logger.warning("Failed to reset rain value for new day - will retry in %s seconds" % delay) | |
@defer.inlineCallbacks | |
def get_raw_data(self, ptr, unbuffered=False): | |
""" | |
Get raw weather station data from the specified position in | |
the circular buffer. | |
If unbuffered is false then a cached value that was obtained | |
earlier may be returned.""" | |
if unbuffered: | |
self._data_pos = None | |
idx = ptr - (ptr % 0x20) | |
ptr -= idx | |
count = self.reading_len[self.ws_type] | |
# 0x20 (32) bytes is the default read block size. Determine | |
# the minimum number of reads to satisfy the request. | |
if ptr + count <= 0x20: | |
# reading doesn't straddle a block boundary | |
if self._data_pos != idx: | |
self._data_pos = idx | |
self._data_block = yield self.device.read_block(idx) | |
elif self._data_pos == idx + 0x20: | |
# reuse last block read | |
self._data_pos = idx | |
data = yield self.device.read_block(idx) | |
self._data_block = data + self._data_block[0:0x20] | |
elif self._data_pos != idx: | |
# read two blocks | |
self._data_pos = idx | |
data_read_results = yield defer.DeferredList([self.device.read_block(i) for i in [idx, idx + 0x20]]) | |
data = [] | |
for success, values in data_read_results: | |
data.extend(values) | |
self._data_block = data | |
elif len(self._data_block) <= 0x20: | |
# 'top up' current block | |
data = yield self.device.read_block(idx + 0x20) | |
self._data_block.extend(data) | |
defer.returnValue(self._data_block[ptr:ptr + count]) | |
@defer.inlineCallbacks | |
def get_raw_settings_data(self, unbuffered=False): | |
""" | |
Get the raw settings data. | |
""" | |
if unbuffered or not self.rawSettingsData: | |
raw_settings_data = yield self.device.read_settings_block() | |
self.rawSettingsData = raw_settings_data | |
defer.returnValue(self.rawSettingsData) | |
# Decoded data retrievers. | |
@defer.inlineCallbacks | |
def get_settings_data(self, keys=None, unbuffered=False): | |
""" | |
Get the decoded of settings data. | |
A subset of the entire data can be selected using keys. | |
""" | |
raw_settings_data = yield self.get_raw_settings_data(unbuffered) | |
decoded_settings_data = pywws.WeatherStation._decode(raw_settings_data, | |
self.fixed_format) | |
self.settings_data = decoded_settings_data | |
if keys: | |
for key in keys: | |
decoded_settings_data = decoded_settings_data[key] | |
defer.returnValue(decoded_settings_data) | |
@defer.inlineCallbacks | |
def get_weather_data(self, ptr, unbuffered=False): | |
""" | |
Returns decoded weather data read from the specified address in the circular buffer. | |
If unbuffered is false then a cached value that was obtained | |
earlier may be returned. | |
""" | |
raw_data = yield self.get_raw_data(ptr, unbuffered) | |
decoded_data = pywws.WeatherStation._decode(raw_data, | |
self.reading_format[self.ws_type]) | |
self._processDecodedWeatherData(decoded_data) | |
defer.returnValue(decoded_data) | |
@defer.inlineCallbacks | |
def current_pos(self): | |
""" | |
Returns a deferred that returns the circular buffer location where | |
current data is being written. | |
""" | |
current_pos = yield self.get_settings_data(['current_pos'], unbuffered=True) | |
defer.returnValue(current_pos) | |
@defer.inlineCallbacks | |
def get_current_weather_data(self): | |
""" | |
Return a deferred that returns a 3-tuple containing a dict of | |
current values, an integer address that the data was read | |
from and a boolean flag indicating if the position address is | |
new. | |
""" | |
self.logger.debug("getting current weather") | |
current_pos = yield self.current_pos() | |
newDataAddress = False | |
if current_pos != self.old_pos: | |
self.old_pos = current_pos | |
newDataAddress = True | |
current_data = yield self.get_weather_data(current_pos, unbuffered=True) | |
current_results = (current_data, current_pos, newDataAddress) | |
defer.returnValue(current_results) | |
# live weather functions | |
@defer.inlineCallbacks | |
def start_live_monitor(self, handler): | |
""" | |
The live polling period is every 48 seconds - which reflects how often | |
the data is actually updated in the memory area on the USB device. | |
There is no point querying for current data any faster than that. | |
Begin reading data every 2 seconds until a change is detected. This | |
signifies that new data has been updated into the current position. | |
""" | |
self.logger.info("Weather Station starting live monitor mode") | |
self.liveDataUpdateHandler = handler | |
fixed_block_data = yield self.get_settings_data() | |
read_period = fixed_block_data['read_period'] | |
current_data, current_pos, newDataAddress = yield self.get_current_weather_data() | |
delay = current_data['delay'] | |
self.resetRainForNewDay(current_data) | |
self.logger.debug("Read Period is %i minutes and %i minutes have elapsed since last data log" % (read_period, delay)) | |
next_log_time = datetime.datetime.utcnow() + datetime.timedelta(minutes=read_period) - datetime.timedelta(delay) | |
self.logger.debug("Expect next log entry to occur in %i minutes at %s" % (read_period - delay, next_log_time)) | |
self._startSynchroniseTask(2.0) | |
defer.returnValue(True) | |
def _startSynchroniseTask(self, interval, now=False): | |
""" | |
Start the synchronisation task | |
""" | |
self.logger.debug("Synchronisation task starting at %s second interval" % interval) | |
self._synchroniseTaskRef = task.LoopingCall(self._synchroniseTask) | |
self._synchroniseTaskRef.start(interval, now) | |
def _stopSynchroniseTask(self): | |
""" | |
Stop the synchronisation task | |
""" | |
if self._synchroniseTaskRef: | |
if self._synchroniseTaskRef.running: | |
self.logger.debug("Synchronisation task stopping") | |
self._synchroniseTaskRef.stop() | |
self._synchroniseTaskRef = None | |
@defer.inlineCallbacks | |
def _synchroniseTask(self): | |
""" | |
Retrieve current data and compare it to the previous data looking for a change | |
that signifies a data update. These should occur at 48 second intervals. | |
Upon detecting a data change indicating that a synchronise point has been | |
observed, start the periodic update task to run at 48 second intervals. | |
""" | |
self.logger.debug("Synchronisation task running") | |
current_results = yield self.get_current_weather_data() | |
current_data, current_pos, newDataAddress = current_results | |
if not current_data['invalid_data']: | |
if self.previous_results: | |
previous_data, previous_pos, previousNewAddr = self.previous_results | |
difference_detected = False | |
for key in previous_data: | |
# ignore the internally generated 'derived' fields from comparison | |
if key in self.weatherKeys: | |
if previous_data[key] != current_data[key]: | |
difference_detected = True | |
break | |
if difference_detected: | |
self.logger.info("Data update detected - synchronisation achieved") | |
self._stopSynchroniseTask() | |
self._startPeriodicUpdateTask() | |
self.liveDataUpdateHandler(current_results) | |
else: | |
self.previous_results = current_results | |
self.liveDataUpdateHandler(current_results) | |
else: | |
self.logger.info("Invalid data encountered - remote sensor communications lost ?") | |
defer.returnValue(True) | |
def _startPeriodicUpdateTask(self): | |
live_interval = 48.0 | |
self.logger.debug("Starting periodic update task at %s second interval" % live_interval) | |
self._periodicUpdateTaskRef = task.LoopingCall(self._periodicUpdateTask) | |
self._periodicUpdateTaskRef.start(live_interval, now=False) | |
def _stopPeriodicUpdateTask(self): | |
self.logger.debug("Stopping periodic update task") | |
if self._periodicUpdateTaskRef and self._periodicUpdateTaskRef.running: | |
self._periodicUpdateTaskRef.stop() | |
@defer.inlineCallbacks | |
def _periodicUpdateTask(self): | |
""" | |
Retrieve the latest data from the current data log position and | |
return it through the liveDataUpdateHandler method supplied to | |
the start_live_monitor function. | |
""" | |
self.logger.info("Periodic update task running") | |
current_results = yield self.get_current_weather_data() | |
self.liveDataUpdateHandler(current_results) | |
defer.returnValue(True) | |
if __name__ == "__main__": | |
# | |
# A simple Live weather demonstration | |
# | |
def live_weather_data_handler(update_data): | |
""" | |
Process a live weather update. | |
""" | |
current_data, current_pos, newDataAddress = update_data | |
print "\nTimestamp: %s" % current_data['timestamp'] | |
print "Measured Values:" | |
print " Abs Pressure: %s" % current_data['abs_pressure'] | |
print " Inside temperature: %sC" % current_data['temp_in'] | |
print " Outside temperature: %sC" % current_data['temp_out'] | |
print " Inside humidity: %s%%" % current_data['hum_in'] | |
print " Outside humidity: %s%%" % current_data['hum_out'] | |
print " UV Index: %s" % current_data['uv'] | |
print " Illuminance: %s" % current_data['illuminance'] | |
print " Rain (mm): %s" % current_data['rain'] | |
print " Wind Avg Speed: %s" % current_data['wind_ave'] | |
print " Wind dir: %s" % current_data['wind_dir'] | |
print " Wind Gust Speed: %s" % current_data['wind_gust'] | |
print "Derived Values:" | |
print " Inside apparent temperature: %sC" % current_data['apparent_temp_in'] | |
print " Outside apparent temperature: %sC" % current_data['apparent_temp_out'] | |
print " Dew Point: %s" % current_data['dew_point'] | |
print " Wind Chill: %s" % current_data['wind_chill'] | |
weatherStation = WeatherStation(ws_type='3080') | |
reactor.callWhenRunning(weatherStation.start) | |
reactor.callLater(3.0, weatherStation.start_live_monitor, live_weather_data_handler) | |
reactor.addSystemEventTrigger('before', 'shutdown', weatherStation.stop) | |
reactor.run() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Dear Claws
Kindly please tell me How to modify your weather_station.py to work with kind of 'generic' USB-HID.
the device just send comma delimeted values
Actualy, I have existing twisted script that listen to an arduino device, using serial.
Currently it receive data (from arduino) in format, i.e : BTN,2,0 or SWC,7,1
Where :
But I plan to replace this arduino with http://digistump.com/products/1
The new device is not USB-SERIAL, it's usb-hid
Sincerely
-bino-