MicroPython on ESP32: MQTT and DS18B20 temperature sensor full example
def connect():
    """
    Joins the configured Wi-Fi network using the station interface.

    Does nothing but print the network configuration if the interface is
    already connected; otherwise blocks until the connection is reported.
    """
    import network

    sta_if = network.WLAN(network.STA_IF)
    if not sta_if.isconnected():
        print('connecting to network...')
        # the interface has to be activated before it can connect
        sta_if.active(True)
        sta_if.connect('<YOUR SSID>', '<YOUR PASSWORD>')
        while not sta_if.isconnected():
            # wait here until the interface reports a connection
            pass
    print('network config:', sta_if.ifconfig())
def no_debug():
    """
    Turns off the vendor O/S debugging messages.
    """
    import esp

    # this can be run from the REPL as well
    esp.osdebug(None)
import time
from ds18x20 import DS18X20
from machine import Pin
from onewire import OneWire
class TemperatureSensor:
    """
    Represents a Temperature sensor
    """

    def __init__(self, pin):
        """
        Finds address of single DS18B20 on bus specified by `pin`

        :param pin: 1-Wire bus pin
        :type pin: int
        :raises Exception: If the bus scan finds no device
        """
        self.ds = DS18X20(OneWire(Pin(pin)))
        addrs = self.ds.scan()
        if not addrs:
            raise Exception('no DS18B20 found at bus on pin %d' % pin)
        # save what should be the only address found
        self.addr = addrs.pop()

    def read_temp(self, fahrenheit=True):
        """
        Reads temperature from a single DS18X20

        :param fahrenheit: Whether or not to return value in Fahrenheit
        :type fahrenheit: bool
        :return: Temperature
        :rtype: float
        """
        # a conversion has to be started, and given time to complete,
        # before the value is read back from the sensor
        self.ds.convert_temp()
        time.sleep_ms(750)
        temp = self.ds.read_temp(self.addr)
        if fahrenheit:
            return self.c_to_f(temp)
        return temp

    @staticmethod
    def c_to_f(c):
        """
        Converts Celsius to Fahrenheit

        :param c: Temperature in Celsius
        :type c: float
        :return: Temperature in Fahrenheit
        :rtype: float
        """
        return (c * 1.8) + 32
import json
import time
from temperature import TemperatureSensor
from umqtt.robust import MQTTClient
class TemperatureClient:
    """
    Represents an MQTT client which publishes temperature data on an interval
    """

    def __init__(self, client_id, server, pin, fahrenheit=True, topic=None,
                 **kwargs):
        """
        Instantiates a TemperatureSensor and MQTTClient; connects to the
        MQTT broker.
        Arguments `server` and `client_id` are required.

        :param client_id: Unique MQTT client ID
        :type client_id: str
        :param server: MQTT broker domain name / IP
        :type server: str
        :param pin: 1-Wire bus pin
        :type pin: int
        :param fahrenheit: Whether or not to publish temperature in Fahrenheit
        :type fahrenheit: bool
        :param topic: Topic to publish temperature on
        :type topic: str
        :param kwargs: Arguments for MQTTClient constructor
        """
        self.sensor = TemperatureSensor(pin)
        self.client = MQTTClient(client_id, server, **kwargs)
        # the client has to be connected before anything can be published
        self.client.connect()
        if not topic:
            # no topic given; derive one from the client ID
            self.topic = 'devices/%s/temperature/degrees' % (client_id,)
        else:
            self.topic = topic
        self.fahrenheit = bool(fahrenheit)

    def publishTemperature(self):
        """
        Reads the current temperature and publishes a JSON payload on the
        configured topic, e.g., `{"unit": "F", "degrees": 72.5}`
        """
        t = self.sensor.read_temp(self.fahrenheit)
        payload = dict(degrees=t)
        # the unit reported must match the unit the reading was taken in
        if self.fahrenheit:
            payload['unit'] = 'F'
        else:
            payload['unit'] = 'C'
        self.client.publish(self.topic, json.dumps(payload))

    def start(self, interval=60):
        """
        Begins to publish temperature data on an interval (in seconds).
        This function will not exit! Consider using deep sleep instead.

        :param interval: How often to publish temperature data (60s default)
        :type interval: int
        """
        while True:
            self.publishTemperature()
            time.sleep(interval)

