Skip to content

Instantly share code, notes, and snippets.

@RouquinBlanc
Created August 9, 2015 20:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RouquinBlanc/814e55e436674303a067 to your computer and use it in GitHub Desktop.
Save RouquinBlanc/814e55e436674303a067 to your computer and use it in GitHub Desktop.
Retrieving pressure and temperature from MPL3115A2 sensor and publishing to Domoticz via MQTT broker
"""
Script interacting with MPL3115A2 sensor via I2C and publishing the resulting
pressure and temperature to a Domoticz server, via MQTT broker.
"""
from smbus import SMBus
import paho.mqtt.publish as publish
import time
import json
ADDR = 0x60
CTRL_REG1 = 0x26
PT_DATA_CFG = 0x13
WHO_AM_I = 0x0C
class MPL3115A2:
"""Interact with MPL3115A2 I2C barometric module"""
def __init__(self, bus):
self.bus = SMBus(bus)
def check_myself(self):
i = self.read(WHO_AM_I)
return i == 0xc4
def readn(self, reg, data):
time.sleep(0.010)
attempt = 10
while attempt > 0:
try:
return self.bus.read_i2c_block_data(ADDR, reg, data)
except IOError:
time.sleep(0.5)
attempt -= 1
raise IOError("Failed 10 times to read")
def read(self, reg):
return self.readn(reg, 1)[0]
def status(self):
return self.read(0x00)
def write(self, reg, data):
self.bus.write_i2c_block_data(ADDR, reg, [data])
def set_event_flag(self):
self.write(PT_DATA_CFG, 0x07)
def toggle_one_shot(self):
setting = self.read(CTRL_REG1)
if (setting & 0x02) == 0:
self.write(CTRL_REG1, (setting | 0x02))
def wait_for_data(self):
self.toggle_one_shot()
status = dev.status()
while (status & 0x08) == 0:
status = dev.status()
time.sleep(0.5)
dev = MPL3115A2(0)
if not dev.check_myself():
print("Device not active.")
exit(1)
# Set oversample rate to 128
newSetting = dev.read(CTRL_REG1) | 0x38
dev.write(CTRL_REG1, newSetting)
# Enable event flags
dev.set_event_flag()
# Read sensor data
print("Waiting for data...")
dev.wait_for_data()
print("Reading sensor data...")
p_data = dev.readn(0x01, 3)
t_data = dev.readn(0x04, 2)
p_msb = p_data[0]
p_csb = p_data[1]
p_lsb = p_data[2]
t_msb = t_data[0]
t_lsb = t_data[1]
pressure = (p_msb << 10) | (p_csb << 2) | (p_lsb >> 6)
p_decimal = ((p_lsb & 0x30) >> 4)/4.0
pressure = (pressure+p_decimal)/100.0
celsius = t_msb + (t_lsb >> 4)/16.0
fahrenheit = (celsius * 9)/5 + 32
# Check quickly if temperature and pressure are meaningful
if (pressure < 900) or (pressure > 1050) or (celsius < 5) or (celsius > 45):
print("Meaningless values...")
exit(1)
print("Pressure and Temperature at "+time.strftime('%m/%d/%Y %H:%M:%S%z'))
print(str(pressure)+" hPa")
print(str(celsius)+" C")
print(str(fahrenheit)+" F")
print("Send data to Domoticz...")
msgs = [
{
'topic':"domoticz/in",
'payload':
json.dumps(
{"idx": 17, "nvalue": 0, "svalue": ("%.2f" % celsius)})},
{
'topic':"domoticz/in",
'payload':
json.dumps(
{"idx": 18, "nvalue": 0, "svalue": ("%.2f;0" % pressure)})}
]
publish.multiple(msgs, hostname="ds")
@RouquinBlanc
Copy link
Author

Publish an UV sensor (previously added with dummy hardware):

import paho.mqtt.publish as publish
import json

uv = 3.2

publish.single("domoticz/in", json.dumps({"idx": 33, "nvalue": 0, "svalue": ("%.1f;0" % uv)}), hostname="ds")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment