Created
August 9, 2015 20:29
-
-
Save RouquinBlanc/814e55e436674303a067 to your computer and use it in GitHub Desktop.
Retrieving pressure and temperature from MPL3115A2 sensor and publishing to Domoticz via MQTT broker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Script interacting with an MPL3115A2 sensor over I2C and publishing the
resulting pressure and temperature to a Domoticz server via an MQTT broker.
"""
import json
import sys
import time

import paho.mqtt.publish as publish
from smbus import SMBus
# I2C address and register map of the MPL3115A2 (see the NXP datasheet).
ADDR = 0x60          # I2C slave address of the sensor
CTRL_REG1 = 0x26     # control register 1: oversampling ratio, one-shot trigger
PT_DATA_CFG = 0x13   # data event flag configuration register
WHO_AM_I = 0x0C      # device identification register

# Private helpers naming the literal values used by the class below.
_STATUS_REG = 0x00       # status register polled for "data ready"
_DEVICE_ID = 0xC4        # value WHO_AM_I must return for this sensor
_ONE_SHOT_BIT = 0x02     # CTRL_REG1 bit that triggers a single measurement
_DATA_READY_BIT = 0x08   # status bit set once new pressure/temperature exists
_ALL_EVENT_FLAGS = 0x07  # PT_DATA_CFG value enabling every data event flag


class MPL3115A2:
    """Interact with MPL3115A2 I2C barometric module.

    Thin wrapper around an ``SMBus`` handle offering register reads (retried
    on ``IOError``), single-byte register writes and helpers to trigger and
    wait for a one-shot measurement.
    """

    def __init__(self, bus):
        """Open the I2C bus identified by *bus* through ``SMBus(bus)``."""
        self.bus = SMBus(bus)

    def check_myself(self):
        """Return True when WHO_AM_I reports the expected device id (0xC4)."""
        return self.read(WHO_AM_I) == _DEVICE_ID

    def readn(self, reg, data, retries=10, retry_delay=0.5):
        """Read a block of bytes starting at register *reg*.

        Args:
            reg: Register address to start reading from.
            data: Number of bytes to read (the name is kept for backward
                compatibility with existing callers).
            retries: How many read attempts to make before giving up.
            retry_delay: Seconds to sleep after each failed attempt.

        Returns:
            The list of byte values returned by the bus.

        Raises:
            IOError: If every attempt failed.
        """
        # Short pause before every read, as in the original implementation;
        # presumably gives the bus/sensor time to settle.
        time.sleep(0.010)
        for _ in range(retries):
            try:
                return self.bus.read_i2c_block_data(ADDR, reg, data)
            except IOError:
                time.sleep(retry_delay)
        raise IOError("Failed %d times to read" % retries)

    def read(self, reg):
        """Read and return the single byte stored in register *reg*."""
        return self.readn(reg, 1)[0]

    def status(self):
        """Return the current value of the status register (0x00)."""
        return self.read(_STATUS_REG)

    def write(self, reg, data):
        """Write the single byte *data* to register *reg*."""
        self.bus.write_i2c_block_data(ADDR, reg, [data])

    def set_event_flag(self):
        """Enable the data event flags by writing 0x07 to PT_DATA_CFG."""
        self.write(PT_DATA_CFG, _ALL_EVENT_FLAGS)

    def toggle_one_shot(self):
        """Set the one-shot bit in CTRL_REG1 unless it is already set."""
        setting = self.read(CTRL_REG1)
        if (setting & _ONE_SHOT_BIT) == 0:
            self.write(CTRL_REG1, setting | _ONE_SHOT_BIT)

    def wait_for_data(self):
        """Trigger a one-shot measurement and block until data is ready.

        Polls the status register of *this* instance (not a module-level
        global) every 0.5 s until the data-ready bit (0x08) is set.
        """
        self.toggle_one_shot()
        while (self.status() & _DATA_READY_BIT) == 0:
            # Sleep only between polls, never after a successful one.
            time.sleep(0.5)
# --- Script body: take one measurement and publish it to Domoticz ---------

dev = MPL3115A2(0)
if not dev.check_myself():
    print("Device not active.")
    # sys.exit rather than the interactive-only `exit` helper from `site`.
    sys.exit(1)

# Set oversample rate to 128
new_setting = dev.read(CTRL_REG1) | 0x38
dev.write(CTRL_REG1, new_setting)

# Enable event flags
dev.set_event_flag()

# Read sensor data
print("Waiting for data...")
dev.wait_for_data()

print("Reading sensor data...")
p_msb, p_csb, p_lsb = dev.readn(0x01, 3)   # three pressure bytes from 0x01
t_msb, t_lsb = dev.readn(0x04, 2)          # two temperature bytes from 0x04

# Pressure: the upper 18 bits are the integer part, the next 2 bits
# (bits 5-4 of the LSB) are quarters; dividing by 100 yields hPa.
pressure = (p_msb << 10) | (p_csb << 2) | (p_lsb >> 6)
p_decimal = ((p_lsb & 0x30) >> 4) / 4.0
pressure = (pressure + p_decimal) / 100.0

# Temperature: the MSB is a two's-complement integer part, the upper nibble
# of the LSB holds sixteenths of a degree. Sign-extend the MSB so sub-zero
# readings decode as negative values instead of 200+ degrees.
if t_msb > 127:
    t_msb -= 256
celsius = t_msb + (t_lsb >> 4) / 16.0
fahrenheit = (celsius * 9) / 5 + 32

# Check quickly if temperature and pressure are meaningful
if (pressure < 900) or (pressure > 1050) or (celsius < 5) or (celsius > 45):
    print("Meaningless values...")
    sys.exit(1)

print("Pressure and Temperature at "+time.strftime('%m/%d/%Y %H:%M:%S%z'))
print(str(pressure)+" hPa")
print(str(celsius)+" C")
print(str(fahrenheit)+" F")

print("Send data to Domoticz...")
# One message per Domoticz device: idx 17 receives the temperature,
# idx 18 the pressure (with a trailing ";0" field in its svalue).
msgs = [
    {
        'topic': "domoticz/in",
        'payload': json.dumps(
            {"idx": 17, "nvalue": 0, "svalue": ("%.2f" % celsius)}),
    },
    {
        'topic': "domoticz/in",
        'payload': json.dumps(
            {"idx": 18, "nvalue": 0, "svalue": ("%.2f;0" % pressure)}),
    },
]
publish.multiple(msgs, hostname="ds")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Publish a UV sensor (previously added with dummy hardware):