Skip to content

Instantly share code, notes, and snippets.

@claws claws/
Last active Apr 22, 2016

What would you like to do?
Publish Australian weather observations to MQTT
#!/usr/bin/env python
Chris Laws 2013
Publish Australian weather observations over MQTT.
This script periodically retrieves weather observations from the
Australia Bureau of Meteorology. The source data comes from a file
that has the designator IDY03021 - which represents the National
Meteorological Operations Centre. This file contains observation
data for all Australian states and territories.
The information is parsed into station data for each state and
published on topics constructed from state name and station name.
An example is: bomau/South Australia/Adelaide
import datetime
import json
import logging
from MQTT import MQTTProtocol
from twisted.application.service import Service
from twisted.internet import reactor, defer
from twisted.internet.protocol import ClientCreator
from twisted.internet.task import LoopingCall
from twisted.python.log import PythonLoggingObserver
from twisted.web.client import getPage
# There are currently two public MQTT test servers.
mqtt_broker = ""
#mqtt_broker = ""
observations_url = ''
# Attempting to comply with BoM secondary distribution guildelines. These
# items will accompany any data published by this applicaiton.
copyright_statement = "Copyright Commonwealth of Australia , Bureau of Meteorology (ABN 92 637 533 532)"
data_source = ''
NA = 'N/A'
CALM = 'Calm'
Precis_Expansion = {'----': NA,
'Cldy': 'Cloudy',
'Clr': 'Clear',
'Drzl': 'Drizzle',
'Fine': 'Fine',
'Haze': 'Haze',
'Fog': 'Foggy',
'Ocst': 'Overcast',
'Rain': 'Rain',
'Smke': 'Smoky',
'Snow': 'Snow',
'Wndy': 'Windy'}
WIND = 'wind'
NAME = 'name'
CLOUD_COVER = 'cloud_cover'
DAY = 'day'
HOUR = 'hour'
LATITUDE = 'latitude'
LONGITUDE = 'longitude'
PRECIS = 'precis'
PRESSURE = 'pressure_hpa'
RAIN = 'rain_mm'
RELATIVE_HUMIDITY = 'rel_humidity'
TEMPERATURE = 'temperature'
TEMPERATURE_MAX = 'temperature_max'
TEMPERATURE_MIN = 'temperature_min'
UPDATED = 'updated'
VISIBILITY = 'visibility_km'
WIND_DIRECTION = 'wind_direction'
WIND_SPEED = 'wind_speed_kmh'
# Sometimes weather stations are offline. In these cases a set of
# fields contain default data indicating no data is available.
# Define a set of fields which are expected to contain data that
# changes from update to update. We can use this to filter out
# stations that contain no data.
# The following dict is used to crack each station line. The dict
# contains the field name and a start and stop index. The characters
# between these markers will get extracted to represent the value for
# the named field.
FIELDS = {NAME: (0, 10),
LATITUDE: (11, 16),
LONGITUDE: (17, 22),
DAY: (23, 26),
HOUR: (26, 30),
VISIBILITY: (31, 33),
CLOUD_COVER: (35, 36),
WIND: (37, 44),
TEMPERATURE: (44, 47),
PRESSURE: (51, 58),
RAIN: (59, 65),
PRECIS: (67, 71),
class MQTTPublisher(MQTTProtocol):
pingPeriod = 60
def __init__(self, client_id, onBrokerConnected):
self.client_id = client_id
self.onBrokerConnected = onBrokerConnected
def connectionMade(self):
''' Physical connection made to broker, now perform protocol connection ''''Connected to MQTT Broker')
self.connect(self.client_id, keepalive=self.pingPeriod * 1000)
reactor.callLater(self.pingPeriod, self.pingreq)
def connectionLost(self, reason):
''' '''"Disconnected from MQTT Broker: %s" % reason)
def pingrespReceived(self):'Ping received from MQTT broker')
reactor.callLater(self.pingPeriod, self.pingreq)
def connackReceived(self, status):
if status == 0:
else:'Connection to MQTT broker failed')
class BomauObservationsClient(object):
The observations client allows a user to obtain BoM observations for a
specified URL. The client can operate in two modes.
The first mode simply retrieves the BoM observations whenever the
Client's get_observations method is called.
The second mode will keep the client's observations attribute
up to date by running a periodic task that retrieves the latest observation.
The update routine used in this mode is optimized to make the fewest
update requests as possible to maintain current data. It inspects the first
response and determines the appropriate time to begin the periodic
observations retrieval such that the minimum number of requests are
made to keep the observations up to date.
def __init__(self, observation_url):
self.observation_url = observation_url
# A reference to the task that periodically requests an
# observation update. This is needed so the task can be
# stopped later.
self.periodicRetrievalTask = None
self.mqttConnection = None
# This list is used to temporarily hold the list of publish
# messages
self.publishQueue = []
def start(self):
''' Start the weather observation MQTT publisher ''''BoM Observation Client starting')
clientCreator = ClientCreator(reactor,
self.onMQTTBrokerCommunicationEstablished)'Creating MQTT client')
self.mqttConnection = yield clientCreator.connectTCP(mqtt_broker, 1883)
def stop(self):
Stop monitoring sensors in and around the home environment
"""'BoM Observation Client stopping')
if self.mqttConnection:
if self.periodicRetrievalTask:
def onMQTTBrokerCommunicationEstablished(self):
Upon connection to MQTT Broker begin periodic weather
observation retrievals.
def retrieveObservations(self):
Retrieve the latest BoM observation and store it
observations = yield self.get_observations()
if observations:"BoM observations for %i regions retrieved successfully" % len(observations))
# convert observations into MQTT format messages and add them
# into a queue for publishing.
for state in observations:
# for this demonstration only publish stations for the
# state of South Australia.
if state in ['South Australia']:"%s has %i stations" %(state, len(observations[state])))
for station in observations[state]:
# for this demonstration only publish the Adelaide
# station data.
if station == 'Adelaide':
payload = json.dumps(observations[state][station])
topic = 'bomau/%s/%s' % (state, station)
self.publishQueue.append((topic, payload))
if self.publishQueue:"Beginning to publish %i updates to MQTT Broker" % len(self.publishQueue))
def get_observations(self):
Retrieve the latest observations from the BOM in JSON format.
Returns a deferred that will eventually return an Observation
object with attributes populated from parsing the JSON update.
@return: A deferred that returns an Observations object
@rtype: defer.Deferred
logging.debug("Requesting new observation data from: %s" % self.observation_url)
pageHtml = yield getPage(self.observation_url)
log.debug("Retrieved new observation data")
# observations is a dict of state name keys holding dicts
# as values. Each state holds a dict of station names with
# a dict value containing observation data.
observations = {}
lines = pageHtml.split('\n')
start = lines.index('<pre style="font: Courier;">')
end = lines.index('</pre>')
lines = lines[start+1:end]
# Search through the lines (from bottom to top so we encounter
# the Antarctica section first which has time in UTC) to find
# the UTC time that this update was recorded so a timer can
# be set to perform the next retrieval.
for line in reversed(lines):
if 'Updated: ' in line:
line = line[1:-1]
line = line.replace('Updated: ', '')
line = line.split('(')[0]
update_time_utc = line.strip()
# Eg. Mon Apr 29 04:10:24 UTC 2013 (0410 UTC)
format = "%a %b %d %H:%M:%S UTC %Y"
updated_utc = datetime.datetime.strptime(update_time_utc,
# Add five minutes to the next measurement update time to
# allow the data to be updated on the BoM website.
next_update_time = updated_utc + datetime.timedelta(minutes=65)
now_utc = datetime.datetime.utcnow()
time_until_next_update = next_update_time - now_utc"Next observation retrieval will occur after delay of: %s" % time_until_next_update)
delay_in_seconds = time_until_next_update.total_seconds()
self.periodicRetrievalTask = reactor.callLater(delay_in_seconds,
# recombine lines so they can be split on region (state)
# boundaries.
data = '\n'.join(lines)
state_sections = data.split('\nIDY')[1:]
print "detected %i state sections" % len(state_sections)
for state_section in state_sections:
lines = state_section.split('\n')[1:]
state = None
station = None
for line in lines:
if True in [line.startswith(x) for x in ['+', '|']]:
# this is a header banner line
if 'Weather Observations : ' in line:
state = line[1:-1]
state = state.replace('Weather Observations : ', '')
state = state.strip()
observations[state] = {}
# initialise station data to contain some mandatory fields
d = {'updated' : update_time_utc,
'copyright': copyright_statement,
'source': data_source}
for field, (start, stop) in FIELDS.items():
value = line[start:stop]
value = value.strip()
# The wind field is always separated into
# two fields representing direction and speed.
if value.startswith('-') and value.endswith('-'):
# This field has no data
if field == WIND:
d[field] = NA
# This field has data
if field == PRECIS:
value = Precis_Expansion.get(value, value)
d[field] = value
elif field == WIND:
if value == CALM:
d[WIND_SPEED] = '0'
wind_dir, wind_speed_kt = value.split('/')
if wind_dir == '---':
wind_dir = NA
if wind_speed_kt == "---":
wind_speed_kmh = NA
# convert kt to km/h
wind_speed_kmh = int(wind_speed_kt) * 0.539956803456
wind_speed_kmh = "%.2f" % wind_speed_kmh
d[WIND_DIRECTION] = wind_dir
d[WIND_SPEED] = wind_speed_kmh
elif field == NAME:
if value.endswith('AP') or value.endswith('Ap'):
value = value[:-2]
if not value.endswith(' '):
value += ' '
value += 'Airport'
station = value
d[field] = value
# Only add station data if there was some useful data.
station_data = [d[k] != NA for k in CHANGING_DATA_FIELDS]
station_data_available = True in station_data
if station_data_available:
# Stations are sometimes listed twice.
if station in observations[state]:
identical_day = observations[state][station][DAY] == d[DAY]
identical_hour = observations[state][station][HOUR] == d[HOUR]
if identical_hour and identical_hour:
# Blend the values if the times are
# identical. Only take values from
# the second instance if existing
# value is 'N/A'
for k, v in observations[state][station].items():
if v == NA:
observations[state][station][k] = d[k]
elif d[DAY] != observations[state][station][DAY]:
# If the days do not match then take the
# most recent day which is generally the
# larger number, expect at month rollover.
if (d[DAY] > observations[state][station][DAY]) or (d[DAY] == 1):
# This is the end of month rollover or the
# end of night rollover. Overwrite the earlier
# data with more up to date data.
observations[state][station] = d
observations[state][station] = d
log.debug("Ignoring %s - no data available" % station)
log.debug("%s contained %i stations" % (state, len(observations[state])))
except Exception, ex:
log.error("Unable to retrieve observations data:")
def publishUpdates(self):
Publish updates to MQTT Broker.
Publish one message fromthe queue at a time. Reschedule
calls back to this function until all messages are sent.
This approach gives some time back to the reactor to handle
other things.
if self.publishQueue:
topic, message = self.publishQueue.pop()
self.mqttConnection.publish(topic, message)
if self.publishQueue:
reactor.callLater(0.1, self.publishUpdates)
else:"Completed publishing observation updates.")
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)-15s: %(levelname)s - %(filename)s:%(lineno)d - %(message)s',
log = logging.getLogger(__name__)
# Send twisted log output to common logger too
bomau_weather = BomauObservationsClient(observations_url)
#!/usr/bin/env python
Chris Laws 2013
This script subscribes for MQTT publications of weather observations
for Adelaide, South Australia.
import logging
from MQTT import MQTTProtocol
from twisted.internet import reactor
from twisted.internet.protocol import ClientCreator
from twisted.python.log import PythonLoggingObserver
class MQTTListener(MQTTProtocol):
pingPeriod = 60 # seconds
def connectionMade(self):'Connected to MQTT Broker')
self.connect("BomAuSub", keepalive=self.pingPeriod * 1000)
reactor.callLater(self.pingPeriod, self.pingreq)
def pingrespReceived(self):'Ping response received from MQTT broker')
reactor.callLater(self.pingPeriod, self.pingreq)
def connackReceived(self, status):
if status == 0:
topic = "bomau/South Australia/Adelaide"
self.subscribe(topic)"Subscribed for topic: %s" % topic)
log.error('Connection to MQTT broker failed')
def publishReceived(self, topic, message, qos, dup, retain, messageId):
# Received a publish on an output topic'Update received. Topic: %s, Message: %s' % (topic, message))
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)-15s: %(levelname)s - %(filename)s:%(lineno)d - %(message)s',
log = logging.getLogger(__name__)
# Send twisted log output to common logger too
# There are currently two public MQTT test servers.
mqtt_broker = ""
#mqtt_broker = ""
clientCreator = ClientCreator(reactor,
MQTTListener)'Creating MQTT client')
reactor.callWhenRunning(clientCreator.connectTCP, mqtt_broker, 1883)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.