Skip to content

Instantly share code, notes, and snippets.

@HoracioDos
Created April 29, 2019 23:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save HoracioDos/627de9e8e9cb3a85f470255bf2f47e7e to your computer and use it in GitHub Desktop.
Save HoracioDos/627de9e8e9cb3a85f470255bf2f47e7e to your computer and use it in GitHub Desktop.
Weewx MQTT extension with enable option
# $Id: mqtt.py 1784 2018-08-26 15:46:33Z mwall $
# Copyright 2013 Matthew Wall
"""
Upload data to MQTT server
This service requires the python bindings for mqtt:
pip install paho-mqtt
Minimal configuration:
[StdRESTful]
[[MQTT]]
server_url = mqtt://username:password@localhost:1883/
topic = weather
unit_system = METRIC
Use of the inputs map to customize name, format, or units:
[StdRESTful]
[[MQTT]]
...
unit_system = METRIC # default to metric
[[[inputs]]]
[[[[outTemp]]]]
name = inside_temperature # use a label other than outTemp
format = %.2f # two decimal places of precision
units = degree_F # convert outTemp to F, others in C
[[[[windSpeed]]]]
units = knot # convert the wind speed to knots
Use of TLS to encrypt connection to broker. The TLS options will be passed to
Paho client tls_set method. Refer to Paho client documentation for details:
https://eclipse.org/paho/clients/python/docs/
[StdRESTful]
[[MQTT]]
...
[[[tls]]]
# CA certificates file (mandatory)
ca_certs = /etc/ssl/certs/ca-certificates.crt
# PEM encoded client certificate file (optional)
certfile = /home/user/.ssh/id.crt
# private key file (optional)
keyfile = /home/user/.ssh/id.key
# Certificate requirements imposed on the broker (optional).
# Options are 'none', 'optional' or 'required'.
# Default is 'required'.
cert_reqs = required
# SSL/TLS protocol (optional).
# Options include sslv2, sslv3, sslv23, tls, tlsv1.
# Default is 'tlsv1'
# Not all options are supported by all systems.
tls_version = tlsv1
# Allowable encryption ciphers (optional).
# To specify multiple ciphers, delimit with commas and enclose
# in quotes.
#ciphers =
"""
import Queue
import paho.mqtt.client as mqtt
import sys
import syslog
import time
import urlparse
try:
import cjson as json
setattr(json, 'dumps', json.encode)
setattr(json, 'loads', json.decode)
except (ImportError, AttributeError):
try:
import simplejson as json
except ImportError:
import json
import weewx
import weewx.restx
import weewx.units
from weeutil.weeutil import to_bool, accumulateLeaves
VERSION = "0.18"

# Refuse to load on weewx older than 3.  The major version is compared
# numerically: a plain string comparison is lexicographic and would treat
# e.g. "10.0" as older than "3".
try:
    _weewx_too_old = int(weewx.__version__.split('.')[0]) < 3
except ValueError:
    # Major component is not a plain integer: fall back to the string test.
    _weewx_too_old = weewx.__version__ < "3"
if _weewx_too_old:
    raise weewx.UnsupportedFeature("weewx 3 is required, found %s" %
                                   weewx.__version__)
def logmsg(level, msg):
    """Write *msg* to syslog at priority *level*, prefixed 'restx: MQTT: '."""
    tagged = 'restx: MQTT: %s' % msg
    syslog.syslog(level, tagged)
def logdbg(msg):
    """Log *msg* at syslog debug priority."""
    logmsg(level=syslog.LOG_DEBUG, msg=msg)
def loginf(msg):
    """Log *msg* at syslog info priority."""
    logmsg(level=syslog.LOG_INFO, msg=msg)
def logerr(msg):
    """Log *msg* at syslog error priority."""
    logmsg(level=syslog.LOG_ERR, msg=msg)
def _compat(d, old_label, new_label):
if old_label in d and new_label not in d:
d.setdefault(new_label, d[old_label])
d.pop(old_label)
def _obfuscate_password(url):
parts = urlparse.urlparse(url)
if parts.password is not None:
# split out the host portion manually. We could use
# parts.hostname and parts.port, but then you'd have to check
# if either part is None. The hostname would also be lowercased.
host_info = parts.netloc.rpartition('@')[-1]
parts = parts._replace(netloc='{}:xxx@{}'.format(
parts.username, host_info))
url = parts.geturl()
return url
# Short replacements for the lengthier weewx unit names.  A value of None
# means no label is produced for that unit at all.
UNIT_REDUCTIONS = dict(
    # temperature
    degree_F='F',
    degree_C='C',
    # length
    inch='in',
    # speed
    mile_per_hour='mph',
    mile_per_hour2='mph',
    km_per_hour='kph',
    km_per_hour2='kph',
    meter_per_second='mps',
    meter_per_second2='mps',
    # radiation
    watt_per_meter_squared='Wpm2',
    # units that get no label
    degree_compass=None,
    uv_index=None,
    percent=None,
    unix_epoch=None,
)
# return the units label for an observation
def _get_units_label(obs, unit_system):
    """Return the label for the standard unit of *obs* in *unit_system*.

    The unit name reported by weewx is shortened through UNIT_REDUCTIONS
    when it has an entry there (which may be None, meaning "no label");
    otherwise the unit name itself is returned.
    """
    std_unit, _group = weewx.units.getStandardUnitType(unit_system, obs)
    if std_unit in UNIT_REDUCTIONS:
        return UNIT_REDUCTIONS[std_unit]
    return std_unit
# get the template for an observation based on the observation key
def _get_template(obs_key, overrides, append_units_label, unit_system):
tmpl_dict = dict()
if append_units_label:
label = _get_units_label(obs_key, unit_system)
if label is not None:
tmpl_dict['name'] = "%s_%s" % (obs_key, label)
for x in ['name', 'format', 'units']:
if x in overrides:
tmpl_dict[x] = overrides[x]
return tmpl_dict
class MQTT(weewx.restx.StdRESTbase):
    """weewx RESTful service that queues records for upload to an MQTT broker.

    The service itself only reads the configuration and binds to weewx
    events; the actual publishing is done by an MQTTThread fed through a
    queue.
    """

    def __init__(self, engine, config_dict):
        """This service recognizes standard restful options plus the following:

        Required parameters:

        server_url: URL of the broker, e.g., something of the form
          mqtt://username:password@localhost:1883/
          There is no default; without it nothing is uploaded.

        Optional parameters:

        enable: whether the extension is active.  When false, nothing is
          bound and no upload thread is started.
          Default is True

        unit_system: one of US, METRIC, or METRICWX
          Default is None; units will be those of data in the database

        topic: the MQTT topic under which to post
          Default is 'weather'

        append_units_label: should units label be appended to name
          Default is True

        augment_record: should the record be augmented with data from the
          'wx_binding' database before upload
          Default is True

        retain: value passed as the 'retain' flag when publishing
          Default is False

        aggregation: 'aggregate' publishes one JSON message under
          <topic>/loop, 'individual' publishes each observation under
          <topic>/<name>; both may be given.
          Default is 'individual,aggregate'

        binding: which weewx events trigger an upload: 'archive', 'loop',
          or both.
          Default is 'archive'

        obs_to_upload: Which observations to upload.  Possible values are
          none or all.  When none is specified, only items in the inputs list
          will be uploaded.  When all is specified, all observations will be
          uploaded, subject to overrides in the inputs list.
          Default is all

        inputs: dictionary of weewx observation names with optional upload
          name, format, and units
          Default is None

        tls: dictionary of TLS parameters used by the Paho client to establish
          a secure connection with the broker.
          Default is None
        """
        super(MQTT, self).__init__(engine, config_dict)
        loginf("service version is %s" % VERSION)
        try:
            mqtt_section = config_dict['StdRESTful']['MQTT']
            site_dict = accumulateLeaves(mqtt_section, max_level=1)
            # bare lookup on purpose: raises KeyError when the mandatory
            # option is missing
            site_dict['server_url']
        except KeyError as e:
            logerr("Data will not be uploaded: Missing option %s" % e)
            return

        # for backward compatibility: 'units' is now 'unit_system'
        _compat(site_dict, 'units', 'unit_system')

        site_dict.setdefault('topic', 'weather')
        site_dict.setdefault('append_units_label', True)
        site_dict.setdefault('augment_record', True)
        site_dict.setdefault('obs_to_upload', 'all')
        site_dict.setdefault('retain', False)
        site_dict.setdefault('aggregation', 'individual,aggregate')
        site_dict.setdefault('enable', True)

        site_dict['enable'] = to_bool(site_dict.get('enable'))
        if not site_dict['enable']:
            # nothing is bound and no thread is started when disabled
            logerr("Extension is not enabled")
            return

        usn = site_dict.get('unit_system', None)
        if usn is not None:
            site_dict['unit_system'] = weewx.units.unit_constants[usn]

        # sub-sections are not leaves, so they have to be copied explicitly
        if 'tls' in mqtt_section:
            site_dict['tls'] = dict(mqtt_section['tls'])
        if 'inputs' in mqtt_section:
            site_dict['inputs'] = dict(mqtt_section['inputs'])

        site_dict['append_units_label'] = to_bool(
            site_dict.get('append_units_label'))
        site_dict['augment_record'] = to_bool(site_dict.get('augment_record'))
        site_dict['retain'] = to_bool(site_dict.get('retain'))

        # 'binding' is consumed here; it is not a thread option
        binding = site_dict.pop('binding', 'archive')
        loginf("binding to %s" % binding)

        # if we are supposed to augment the record with data from weather
        # tables, then get the manager dict to do it.  there may be no weather
        # tables, so be prepared to fail.
        try:
            if site_dict.get('augment_record'):
                _manager_dict = weewx.manager.get_manager_dict_from_config(
                    config_dict, 'wx_binding')
                site_dict['manager_dict'] = _manager_dict
        except weewx.UnknownBinding:
            pass

        self.archive_queue = Queue.Queue()
        self.archive_thread = MQTTThread(self.archive_queue, **site_dict)
        self.archive_thread.start()

        if 'archive' in binding:
            self.bind(weewx.NEW_ARCHIVE_RECORD, self.new_archive_record)
        if 'loop' in binding:
            self.bind(weewx.NEW_LOOP_PACKET, self.new_loop_packet)

        if 'topic' in site_dict:
            loginf("topic is %s" % site_dict['topic'])
        if usn is not None:
            loginf("desired unit system is %s" % usn)
        loginf("data will be uploaded to %s" %
               _obfuscate_password(site_dict['server_url']))
        if 'tls' in site_dict:
            loginf("network encryption/authentication will be attempted")

    def new_archive_record(self, event):
        """Queue the archive record carried by *event* for upload."""
        self.archive_queue.put(event.record)

    def new_loop_packet(self, event):
        """Queue the loop packet carried by *event* for upload."""
        self.archive_queue.put(event.packet)
class TLSDefaults(object):
    """Lookup tables that translate TLS config strings into ssl constants.

    Attributes set on each instance:

    TLS_OPTIONS: names of the options accepted for the Paho TLS setup.
    CERT_REQ_OPTIONS: maps 'none', 'optional', 'required' to the matching
      ssl.CERT_* constant.
    TLS_VER_OPTIONS: maps a version label ('sslv2', 'sslv3', 'sslv23',
      'tlsv1', 'tlsv11', 'tlsv12', 'tls') to the matching ssl.PROTOCOL_*
      constant.  A label is only present when the running ssl module
      provides that constant.
    """

    def __init__(self):
        import ssl

        # Paho acceptable TLS options
        self.TLS_OPTIONS = [
            'ca_certs', 'certfile', 'keyfile',
            'cert_reqs', 'tls_version', 'ciphers'
        ]

        # map for Paho acceptable TLS cert request options
        self.CERT_REQ_OPTIONS = {
            'none': ssl.CERT_NONE,
            'optional': ssl.CERT_OPTIONAL,
            'required': ssl.CERT_REQUIRED
        }

        # Map for Paho acceptable TLS version options.  Which protocol
        # constants exist depends on the Python/OpenSSL install, so every
        # one of them is looked up defensively and skipped when absent.
        protocol_names = (
            ('sslv2', 'PROTOCOL_SSLv2'),
            ('sslv3', 'PROTOCOL_SSLv3'),
            ('sslv23', 'PROTOCOL_SSLv23'),
            ('tlsv1', 'PROTOCOL_TLSv1'),
            ('tlsv11', 'PROTOCOL_TLSv1_1'),
            ('tlsv12', 'PROTOCOL_TLSv1_2'),
            ('tls', 'PROTOCOL_TLS'),
        )
        self.TLS_VER_OPTIONS = dict()
        for label, attr_name in protocol_names:
            protocol = getattr(ssl, attr_name, None)
            if protocol is not None:
                self.TLS_VER_OPTIONS[label] = protocol
class MQTTThread(weewx.restx.RESTThread):
    """Upload thread that publishes queued records to an MQTT broker."""

    def __init__(self, queue, server_url,
                 topic='', unit_system=None, skip_upload=False,
                 augment_record=True, retain=False, aggregation='individual',
                 inputs=None, obs_to_upload='all', append_units_label=True,
                 manager_dict=None, tls=None,
                 post_interval=None, max_backlog=sys.maxint, stale=None,
                 log_success=True, log_failure=True,
                 timeout=60, max_tries=3, retry_wait=5, enable=True):
        """Store the upload settings and translate the TLS options.

        queue: queue from which records to publish are taken.
        server_url: broker URL, e.g. mqtt://username:password@localhost:1883/
        topic: base topic under which data is published.
        unit_system: if not None, records are converted to this unit
          system before upload.
        skip_upload: if true, records are processed but not published.
        augment_record: if true (and a database manager is available),
          the record is augmented from the database before upload.
        retain: value of the 'retain' flag used when publishing.
        aggregation: 'aggregate' publishes one JSON message under
          <topic>/loop; 'individual' publishes each observation under
          <topic>/<name>.  Both may be requested.
        inputs: per-observation overrides (name, format, units), keyed by
          observation name.  None means no overrides.
        obs_to_upload: 'all' uploads every observation in the record;
          anything else uploads only those listed in inputs.
        append_units_label: if true, a units label is appended to the
          generated upload names.
        tls: dict of TLS options; recognized ones are translated and later
          passed to the Paho client's tls_set.
        enable: stored on the instance; not otherwise used by this class.

        The remaining parameters are handed to the RESTThread base class
        unchanged.
        """
        super(MQTTThread, self).__init__(queue,
                                         protocol_name='MQTT',
                                         manager_dict=manager_dict,
                                         post_interval=post_interval,
                                         max_backlog=max_backlog,
                                         stale=stale,
                                         log_success=log_success,
                                         log_failure=log_failure,
                                         max_tries=max_tries,
                                         timeout=timeout,
                                         retry_wait=retry_wait)
        self.enable = enable
        self.server_url = server_url
        self.topic = topic
        self.upload_all = obs_to_upload.lower() == 'all'
        self.append_units_label = append_units_label
        self.tls_dict = {}
        if tls is not None:
            # we have TLS options so construct a dict to configure Paho TLS
            dflts = TLSDefaults()
            for opt in tls:
                if opt == 'cert_reqs':
                    # unrecognized values are dropped
                    if tls[opt] in dflts.CERT_REQ_OPTIONS:
                        self.tls_dict[opt] = dflts.CERT_REQ_OPTIONS.get(tls[opt])
                elif opt == 'tls_version':
                    # unrecognized values are dropped
                    if tls[opt] in dflts.TLS_VER_OPTIONS:
                        self.tls_dict[opt] = dflts.TLS_VER_OPTIONS.get(tls[opt])
                elif opt in dflts.TLS_OPTIONS:
                    self.tls_dict[opt] = tls[opt]
            logdbg("TLS parameters: %s" % self.tls_dict)
        # a fresh dict per instance instead of a shared mutable default
        self.inputs = {} if inputs is None else inputs
        self.unit_system = unit_system
        self.augment_record = augment_record
        self.retain = retain
        self.aggregation = aggregation
        self.templates = dict()
        self.skip_upload = skip_upload

    def filter_data(self, record):
        """Return a dict mapping upload names to formatted values.

        Only observations that have a template and whose value converts
        to float are included.  If both 'latitude' and 'longitude' end up
        in the result, a comma-joined 'position' entry is added as well.
        """
        # if uploading everything, we must check the upload variables list
        # every time since variables may come and go in a record.  use the
        # inputs to override any generic template generation.
        if self.upload_all:
            for obs in record:
                if obs not in self.templates:
                    self.templates[obs] = _get_template(
                        obs, self.inputs.get(obs, {}),
                        self.append_units_label, record['usUnits'])
        # otherwise, create the list of upload variables once, based on the
        # user-specified list of inputs.
        elif not self.templates:
            for obs in self.inputs:
                self.templates[obs] = _get_template(
                    obs, self.inputs[obs],
                    self.append_units_label, record['usUnits'])

        # loop through the templates, populating them with data from the record
        data = dict()
        for k in self.templates:
            try:
                v = float(record.get(k))
                name = self.templates[k].get('name', k)
                fmt = self.templates[k].get('format', '%s')
                to_units = self.templates[k].get('units')
                if to_units is not None:
                    (from_unit, from_group) = weewx.units.getStandardUnitType(
                        record['usUnits'], k)
                    from_t = (v, from_unit, from_group)
                    v = weewx.units.convert(from_t, to_units)[0]
                data[name] = fmt % v
            except (TypeError, ValueError):
                # missing (None) or non-numeric values are deliberately
                # left out of the upload
                continue

        # FIXME: generalize this
        if 'latitude' in data and 'longitude' in data:
            parts = [str(data['latitude']), str(data['longitude'])]
            if 'altitude_meter' in data:
                parts.append(str(data['altitude_meter']))
            elif 'altitude_foot' in data:
                parts.append(str(data['altitude_foot']))
            data['position'] = ','.join(parts)
        return data

    def process_record(self, record, dbm):
        """Convert, filter and publish one record.

        A new client connection is made for every attempt.  Socket errors
        are logged and retried, up to max_tries attempts with retry_wait
        seconds between them.

        Raises weewx.restx.FailedPost when every attempt has failed.
        """
        import socket

        if self.augment_record and dbm is not None:
            record = self.get_record(record, dbm)
        if self.unit_system is not None:
            record = weewx.units.to_std_system(record, self.unit_system)
        data = self.filter_data(record)
        if weewx.debug >= 2:
            logdbg("data: %s" % data)
        if self.skip_upload:
            loginf("skipping upload")
            return

        url = urlparse.urlparse(self.server_url)
        # a URL without an explicit port falls back to Paho's default port
        port = url.port if url.port is not None else 1883
        for attempt in range(self.max_tries):
            try:
                mc = mqtt.Client()
                if url.username is not None and url.password is not None:
                    mc.username_pw_set(url.username, url.password)
                # if we have TLS opts configure TLS on our broker connection
                if self.tls_dict:
                    mc.tls_set(**self.tls_dict)
                mc.connect(url.hostname, port)
                mc.loop_start()
                try:
                    self._publish(mc, data)
                finally:
                    # always stop the network loop and close the connection,
                    # otherwise each failed attempt would leave them behind
                    mc.loop_stop()
                    mc.disconnect()
                return
            except (socket.error, socket.timeout, socket.herror) as e:
                logdbg("Failed upload attempt %d: %s" % (attempt + 1, e))
            time.sleep(self.retry_wait)
        else:
            raise weewx.restx.FailedPost("Failed upload after %d tries" %
                                         (self.max_tries,))

    def _publish(self, mc, data):
        """Publish *data* through the connected client *mc*.

        Publish errors reported by the client are logged, not raised.
        """
        # 'in' rather than str.find so that this also works should the
        # option arrive as a list of words instead of a single string
        if 'aggregate' in self.aggregation:
            tpc = self.topic + '/loop'
            (res, mid) = mc.publish(tpc, json.dumps(data),
                                    retain=self.retain)
            if res != mqtt.MQTT_ERR_SUCCESS:
                logerr("publish failed for %s: %s" % (tpc, res))
        if 'individual' in self.aggregation:
            for key in data:
                tpc = self.topic + '/' + key
                (res, mid) = mc.publish(tpc, data[key],
                                        retain=self.retain)
                if res != mqtt.MQTT_ERR_SUCCESS:
                    logerr("publish failed for %s: %s" % (tpc, res))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment