Skip to content

Instantly share code, notes, and snippets.

@RobThree
Last active May 30, 2016 00:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RobThree/c6b9a5dc16357eeb0e011d3ed9a281b0 to your computer and use it in GitHub Desktop.
Save RobThree/c6b9a5dc16357eeb0e011d3ed9a281b0 to your computer and use it in GitHub Desktop.
Python script that runs a web service that returns readings from DS18B20 temperature sensors connected to a Raspberry Pi as JSON data, either for all sensors or for a specific sensor
#!/usr/bin/env python
# You will need to have web.py installed. To install, run: pip install web.py
# How to install as service: http://blog.scphillips.com/posts/2013/07/getting-a-python-script-to-run-in-the-background-as-a-service-on-boot/
# Should the above page be offline; a mirror is at: http://archive.is/20160529210455/http://blog.scphillips.com/posts/2013/07/getting-a-python-script-to-run-in-the-background-as-a-service-on-boot/
# You can run this as:
# python temps.py
# This will run the service on port 8080. Optionally, you can specify a port:
# python temps.py 1234
# When running you can query http://localhost:8080/sensors for all sensor temperature readings or
# http://localhost:8080/sensor/<sensor-id> (e.g. http://localhost:8080/sensor/28-041600ffb5ff) for
# the temperature reading of that specific sensor.
import sys
import web
import json
import os
from threading import Thread
# Routing table handed to web.py: a flat tuple alternating between a URL
# pattern and the name of the handler class that serves it.
urls = (
    '/sensors', 'sensors',      # readings of every sensor
    '/sensor/(.+)', 'sensor',   # reading of one sensor; the captured id is passed to GET
)
class sensors:
    """web.py handler for ``/sensors``: report the reading of every listed sensor."""

    def GET(self):
        """Read all sensors concurrently and return the readings as JSON.

        Returns:
            A JSON object (as a string) mapping each sensor id to the value
            produced by ``SensorReader.Read`` for that sensor.
        """
        readings = {}
        # We read every sensor in its own thread; this speeds up the process massively.
        # The DS18B20 driver sets the IC in 'read mode', then waits ~750ms, then reads the
        # value; if we do this concurrently we don't have to wait num_sensors x sensor_delay
        # but only 1 x sensor_delay (+ slight overhead).
        # For 4 sensors we go from 3.5~4.0 seconds to 0.8~1.0 seconds.
        workers = [
            Thread(target=self.Retrieve, args=[sensor_id, readings])
            for sensor_id in SensorReader.ListSensors()
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()  # Wait until every reading has been collected

        body = json.dumps(readings)
        # Declare the payload type only once the body exists, so an error raised
        # above is still rendered with the error handler's own headers.
        web.header('Content-Type', 'application/json')
        return body

    def Retrieve(self, sensor, result):
        """Read a single sensor and merge its ``{id: value}`` entry into *result*.

        Args:
            sensor: Id of the sensor to read.
            result: Dict shared between the worker threads; updated in place.
        """
        result.update(SensorReader.Read(sensor))
class sensor:
    """web.py handler for ``/sensor/<sensor-id>``: report one sensor's reading."""

    def GET(self, id):
        """Return the reading of the sensor identified by *id* as JSON.

        Args:
            id: Sensor id captured from the request URL. (The name shadows the
                ``id`` builtin; it is kept so the handler signature is unchanged.)

        Returns:
            A JSON object (as a string) with the single entry produced by
            ``SensorReader.Read(id)``.
        """
        body = json.dumps(SensorReader.Read(id))
        # Declare the payload type only once the body exists, so an error raised
        # above is still rendered with the error handler's own headers.
        web.header('Content-Type', 'application/json')
        return body
class SensorReader:
    """Read temperature sensors exposed as files below ``SENSORROOT``.

    The class is only used as a namespace; both methods are static.
    """

    # Directory that holds one sub-directory per 1-Wire device.
    SENSORROOT = '/sys/bus/w1/devices/'

    @staticmethod
    def ListSensors():
        """Return the ids listed in the bus master's ``w1_master_slaves`` file.

        Returns:
            A list with one sensor id string per non-empty line of the file.

        Raises:
            IOError / OSError: If the bus master file cannot be opened.
        """
        path = SensorReader.SENSORROOT + 'w1_bus_master1/w1_master_slaves'
        # A context manager closes the handle deterministically instead of
        # leaving it to the garbage collector.
        with open(path) as f:
            lines = f.read().splitlines()
        # NOTE(review): the Linux w1 driver is understood to write the
        # placeholder line "not found." when no slave is attached; drop it
        # (and blank lines) so it is not reported as a sensor id.
        return [line for line in lines
                if line.strip() and line.strip() != 'not found.']

    @staticmethod
    def Read(id):
        """Read one sensor and return its value keyed by the sensor id.

        Args:
            id: Sensor id, i.e. the name of the sensor's directory below
                ``SENSORROOT``. (The name shadows the ``id`` builtin; it is
                kept so the method signature is unchanged.)

        Returns:
            ``{id: value}`` where value is the number after ``=`` on the second
            line of the sensor's ``w1_slave`` file divided by 1000, or
            ``{id: None}`` when the sensor does not exist, cannot be read,
            does not report ``YES`` on its first line, or returns malformed data.
        """
        failed = {id: None}  # Return null when not found or error

        # The id may come straight from the request URL: only accept a single
        # path component so it can never point outside SENSORROOT.
        if id in ('.', '..') or '\0' in id or os.path.basename(id) != id:
            return failed

        fname = SensorReader.SENSORROOT + id + "/w1_slave"
        try:
            # Opening directly (instead of checking isfile first) avoids a race
            # with a sensor that disappears between the check and the read.
            with open(fname, "r") as f:
                data = f.readlines()
        except (IOError, OSError):
            return failed

        # Make sure sensor is ready & OK: the first line must end in "YES" and
        # a second line carrying the value must be present.
        if len(data) < 2 or not data[0].strip().endswith("YES"):
            return failed

        try:
            return {id: float(data[1].split("=")[1]) / 1000.0}
        except (IndexError, ValueError):
            # No "=" on the value line, or the text after it is not a number.
            return failed
if __name__ == "__main__":
    # Build the web.py application from the routing table; handler class
    # names in `urls` are resolved against this module's globals.
    app = web.application(mapping=urls, fvars=globals())
    # NOTE(review): web.debugerror is web.py's debugging error page; confirm
    # that exposing it is acceptable wherever this service is reachable.
    app.internalerror = web.debugerror
    # Start serving (see the header comment for the optional port argument).
    app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment