Skip to content

Instantly share code, notes, and snippets.

@sidwarkd
Created March 18, 2014 05:32
Show Gist options
  • Save sidwarkd/9614055 to your computer and use it in GitHub Desktop.
Save sidwarkd/9614055 to your computer and use it in GitHub Desktop.
Streaming temperature readings with Plot.ly, TMP106, and Raspberry Pi
import RPi.GPIO as GPIO
import spidev
from time import sleep, localtime, strftime, time
import sys
import smbus
class PiHardware(object):
    """Wrapper around the board peripherals used by this project.

    Covers the LED, switch, button and motion-sensor GPIO pins, the SPI
    display and the I2C temperature sensor. All GPIO pins use physical
    (``GPIO.BOARD``) numbering.
    """

    def __init__(self):
        """Configure the GPIO pins and open the SPI and I2C buses."""
        GPIO.setmode(GPIO.BOARD)
        # Setup the LED pin as an output and to have an initial
        # state of high which turns the LED off
        GPIO.setup(12, GPIO.OUT, initial=GPIO.HIGH)
        # Setup the switch pin as an input
        GPIO.setup(16, GPIO.IN)
        # Setup the button pin as an input
        GPIO.setup(18, GPIO.IN)
        # Setup spi module
        self.spi = spidev.SpiDev()
        self.spi.open(0, 0)
        # Setup i2c module
        self.i2c = smbus.SMBus(1)
        # Setup motion sensor
        GPIO.setup(22, GPIO.IN)

    def get_temp(self, address=0x48):
        """Read the I2C temperature sensor and return degrees Fahrenheit.

        address -- I2C address of the sensor; defaults to 0x48, the value
                   that used to be hard-coded here.
        """
        word = self.i2c.read_word_data(address, 0)
        # The low byte of the word carries the upper 8 bits of the 12-bit
        # reading and the top nibble of the high byte carries the lower 4.
        raw = ((word & 0x00FF) << 4) | ((word & 0xFF00) >> 12)
        # The reading is 12-bit two's complement (per the TMP10x register
        # format); without this step sub-zero temperatures would be reported
        # as large positive values.
        if raw & 0x800:
            raw -= 0x1000
        temp_c = raw * .0625  # 0.0625 degrees C per count
        return temp_c * 1.80 + 32.00

    def spi_send(self, data):
        """Send ``data`` over SPI.

        data -- a str (each character is sent as its ordinal), a list of
                ints, or a single int. Any other type is reported on stdout
                and nothing is transferred.
        """
        if isinstance(data, str):
            xfer_list = [ord(c) for c in data]
        elif isinstance(data, list):
            xfer_list = list(data)
        elif isinstance(data, int):
            xfer_list = [data]
        else:
            print("Unsupported type passed to spi_send. Must be str, int, or list")
            # Nothing sensible to send; skip the transfer rather than
            # handing the bus an empty list.
            return
        # Second argument is the transfer speed handed to spidev's xfer2.
        self.spi.xfer2(xfer_list, 250000)

    def clear_display(self):
        """Send the 0x76 command byte used to clear the display."""
        self.spi_send([0x76])

    def display_time(self):
        """Show the current local time as HHMM on the display."""
        t = strftime("%H%M", localtime())
        self.clear_display()
        self.spi_send(t)
        # NOTE(review): 0x77 is the same command used for the decimal point
        # in display_temp; 0x10 presumably lights the colon -- confirm.
        self.spi_send([0x77, 0x10])

    def display_temp(self, temp):
        """Show ``temp`` with one decimal followed by 'f', e.g. 72.5 -> "725f"."""
        # Display temp with one decimal of precision
        temp_str = "{:4.1f}f".format(round(temp, 1))
        # The decimal point is lit separately below, so drop it from the digits.
        display_val = temp_str.replace('.', '')
        self.clear_display()
        self.spi_send(display_val)
        # Turn on the decimal and the apostrophe
        self.spi_send([0x77, 0x22])

    def set_led(self, state):
        """Drive the LED pin; GPIO.HIGH turns the LED off (see __init__)."""
        GPIO.output(12, state)

    def cleanup(self):
        """Turn the LED off, release the GPIO pins, blank the display, close SPI."""
        GPIO.output(12, GPIO.HIGH)
        GPIO.cleanup()
        self.clear_display()
        self.spi.close()
#!/usr/bin/env python
from __future__ import division
import time
import RPi.GPIO as GPIO
import plotly
import datetime
import os
import sys
import json
import requests
from hardware import PiHardware
# Module-level flags.
# NOTE(review): LOGGER, FIRST_POST and spaces are not referenced anywhere
# else in the visible source -- confirm whether they are still needed.
LOGGER = True # if not True, won't log!
FIRST_POST = True
spaces = (" "*50)
# Init GPIO: physical (BOARD) pin numbering, with RPi.GPIO warnings turned off
GPIO.setmode(GPIO.BOARD)
GPIO.setwarnings(False)
# Shared hardware wrapper; streamdata() reads the I2C sensor through hw.get_temp()
hw = PiHardware()
def question(question, poss_ans):
    """Keep prompting until the user types one of the accepted answers.

    question -- prompt text handed to raw_input
    poss_ans -- collection of accepted answer strings

    Returns the accepted answer exactly as typed.
    """
    while True:
        reply = raw_input(question)
        if reply in poss_ans:
            return reply
        print(b.ERR+'Not a Valid Answer, please try again...'+b.END)
#Set Colors for the Prompt
class b:
    """ANSI escape sequences used to colour the terminal prompts."""
    HEADER = '\033[95m'
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    WARN = '\033[93m'
    ERR = '\033[91m'
    END = '\033[0m'

    def disable(self):
        """Blank every colour code on this instance.

        Only the instance is affected; the class-level constants (which the
        rest of the script reads as ``b.WARN`` etc.) keep their values.
        """
        self.HEADER = ''
        self.BLUE = ''
        self.GREEN = ''
        self.WARN = ''
        # The error colour is named ERR; previously only a non-existent
        # FAIL attribute was cleared, leaving ERR coloured.
        self.ERR = ''
        # Kept so code that read .FAIL after disable() still finds it.
        self.FAIL = ''
        self.END = ''
os.system('clear')
print(b.WARN+"========================================================")
print("Plot.ly + Raspberry Pi + TMP36 Temperature Visualization")
print("========================================================\n"+b.END)

# Questions: collect the Plotly credentials and the sensor/chart options
FILENAME = raw_input(b.BLUE+"Enter your Graph Name: \n>> "+b.END)
USERNAME = raw_input(b.BLUE+"Enter your Plotly Username: \n>> "+b.END)
API_KEY = raw_input(b.BLUE+"Enter your Plotly Api Key: \n>> "+b.END)
STREAM_TOKEN = raw_input(b.BLUE+"Enter your Plotly Stream Token: \n>> "+b.END)
SENSOR_TYPE = question(b.BLUE+"Are you using an analog sensor connected to the MCP3008 (m) or an I2C sensor (i)? \n>> "+b.END, ['m','i'])
I2C_ADDR = None
if SENSOR_TYPE == 'i':
    # Base 0 accepts the address in decimal ("72") or with a prefix such as
    # hex ("0x48"), which is how I2C addresses are usually written.
    # NOTE(review): I2C_ADDR is not read anywhere else in the visible source.
    I2C_ADDR = int(raw_input(b.BLUE+"What is the address of your sensor? \n>> "+b.END), 0)
TEMP_TYPE = question(b.BLUE+"Do you want to plot Celius or Farenheit? (c/f) \n>> "+b.END, ['c','f'])
BAR_SCATTER = question(b.BLUE+"Scatter or Bar Chart? (s/b) \n>> "+b.END, ['s','b'])
DELAY = int(raw_input(b.BLUE+"How frequently do you want to post data? (in seconds, minimum is 1) \n>> "+b.END))
if DELAY < 1:
    print(b.ERR+'You chose a frequency less than 1 seconds.... defaulting to 1 seconds'+b.END)
    DELAY = 1

# Set Temp Scale for legend
if TEMP_TYPE == 'c':
    TEMP_LEGEND = 'Temperature (C)'
elif TEMP_TYPE == 'f':
    TEMP_LEGEND = 'Temperature (F)'

# Map the one-letter answer onto the Plotly trace type
if BAR_SCATTER == 's':
    chart_type = 'scatter'
elif BAR_SCATTER == 'b':
    chart_type = 'bar'

# Set Layout
LAYOUT = {
    'title' : FILENAME,
    'showlegend' : True,
    'yaxis' : {
        'title' : TEMP_LEGEND
    }
}
# Function to read data from Analog Pin 0 from MCP3008 (don't need to edit)
def readadc(adcnum, clockpin, mosipin, misopin, cspin):
    """Bit-bang one single-ended conversion and return it as an int.

    adcnum   -- ADC channel, 0 through 7
    clockpin -- GPIO pin driving the ADC clock
    mosipin  -- GPIO pin carrying the command bits to the ADC
    misopin  -- GPIO pin the result bits are read from
    cspin    -- GPIO pin driving chip-select

    Returns the reading, or -1 when ``adcnum`` is outside 0..7.
    """
    if ((adcnum > 7) or (adcnum < 0)):
        return -1
    GPIO.output(cspin, True)
    GPIO.output(clockpin, False)  # start clock low
    GPIO.output(cspin, False)     # bring CS low

    commandout = adcnum
    commandout |= 0x18  # start bit + single-ended bit
    commandout <<= 3    # we only need to send 5 bits here
    for _ in range(5):
        # Present the current top bit on MOSI, then pulse the clock
        GPIO.output(mosipin, bool(commandout & 0x80))
        commandout <<= 1
        GPIO.output(clockpin, True)
        GPIO.output(clockpin, False)

    adcout = 0
    # read in one empty bit, one null bit and 10 ADC bits
    for _ in range(12):
        GPIO.output(clockpin, True)
        GPIO.output(clockpin, False)
        adcout <<= 1
        if GPIO.input(misopin):
            adcout |= 0x1

    GPIO.output(cspin, True)
    # Drop the trailing bit with an integer shift. The file enables true
    # division, so the previous "/= 2" produced a float and kept the bit
    # as a fractional .5 instead of discarding it.
    adcout >>= 1
    return adcout
# change these as desired - they're the pins connected from the
# SPI port on the ADC to the GPIO Pins on the Raspi
# NOTE(review): the script selects GPIO.BOARD (physical) numbering earlier,
# but 18/23/24/25 look like BCM numbers -- confirm against the wiring before
# using the MCP3008 option.
SPICLK = 18
SPIMISO = 23
SPIMOSI = 24
SPICS = 25

# set up the SPI interface pins (only needed for the MCP3008 sensor option)
if SENSOR_TYPE == 'm':
    GPIO.setup(SPIMOSI, GPIO.OUT)
    GPIO.setup(SPIMISO, GPIO.IN)
    GPIO.setup(SPICLK, GPIO.OUT)
    GPIO.setup(SPICS, GPIO.OUT)

# temperature sensor connected channel 0 of mcp3008
adcnum = 0

# init plotly: create (or extend) the graph with one empty streaming trace
py = plotly.plotly(USERNAME, API_KEY)
response = py.plot(
    [{
        'x': [],
        'y': [],
        'type': chart_type,
        'name': TEMP_LEGEND,
        'stream': {'token': STREAM_TOKEN, 'maxpoints': 1000},
    }],
    filename=FILENAME,
    fileopt='extend',
    layout=LAYOUT)
print(response[u'url'])

# Replace the prompt screen with a summary of the created graph
os.system('clear')
print(b.GREEN+"========================================================")
print("Successfully Posted to Plot.ly! Here is your Graph info:")
print("========================================================\n")
print(b.WARN+"Graph URL: "+response[u'url'])
print("Graph Title: "+response[u'filename']+'\n'+b.END)
print(b.GREEN+"========================================================\n"+b.END)
def movingAverage(newx, xarray, winlength):
    """Append ``newx`` to ``xarray`` and return the mean of the newest samples.

    newx      -- the new sample
    xarray    -- list of earlier samples; modified in place so that it
                 holds at most ``winlength`` entries afterwards
    winlength -- window size; values below 1 are treated as 1 so the
                 newest sample is always kept

    Returns the arithmetic mean of the samples left in ``xarray``.
    """
    xarray.append(newx)
    window = max(int(winlength), 1)
    # Trim everything older than the window in one step. Popping a single
    # element left an over-long buffer too long, and a window below 1
    # emptied the buffer and caused a division by zero.
    del xarray[:-window]
    return sum(xarray) / len(xarray)
def streamdata():
    """Endlessly yield newline-terminated JSON samples for the Plotly stream.

    Each sample is an object with ``x`` (the local timestamp) and ``y``
    (the smoothed temperature formatted to one decimal, as a string).
    After each sample is consumed the generator sleeps ``DELAY`` seconds
    and rewrites a status line on stdout.
    """
    TEMP_ARRAY = []
    WIN_LENGTH = 10  # Set to 1 to remove averaging or another value to optimize smoothing for use case
    while True:
        if SENSOR_TYPE == 'm':
            # read the analog pin (temperature sensor LM36)
            read_adc0 = readadc(adcnum, SPICLK, SPIMOSI, SPIMISO, SPICS)
            # convert analog reading to millivolts = ADC * ( 3300 / 1024 )
            millivolts = read_adc0 * (3300.0 / 1024.0)
            # 10 mv per degree
            temp_C = ((millivolts - 100.0) / 10.0) - 40.0
            # convert celsius to fahrenheit
            temp_F = (temp_C * 9.0 / 5.0) + 32
        else:
            # The I2C path reports Fahrenheit; derive Celsius from it
            temp_F = hw.get_temp()
            temp_C = (temp_F - 32.00) / 1.80

        # TEMP_TYPE is restricted to 'c' or 'f' by the prompt
        TEMP = temp_C if TEMP_TYPE == 'c' else temp_F

        # Take moving average to apply smoothing
        AVG_T = movingAverage(TEMP, TEMP_ARRAY, WIN_LENGTH)
        # show only one decimal place for temperature
        TEMP_READING = "%.1f" % AVG_T

        date_stamp = datetime.datetime.now()
        data = {
            'x': date_stamp.strftime('%Y-%m-%d %H:%M:%S.%f'),
            'y': TEMP_READING,
        }
        yield json.dumps(data)+'\n'
        time.sleep(DELAY)

        # Status line: backspaces move the cursor back so the next write
        # overwrites the previous text instead of scrolling.
        sys.stdout.write('STREAMING TO PLOTLY')
        sys.stdout.flush()
        sys.stdout.write("\b" * 30)
        sys.stdout.write('Temperature Reading: '+ TEMP_READING)
        sys.stdout.flush()
        sys.stdout.write("\b" * 30)
# Stream samples to Plotly until the generator stops or the request fails.
# The finally clause releases the GPIO pins, blanks the display and closes
# SPI even when the script is interrupted (e.g. Ctrl-C); the exception
# itself still propagates.
try:
    requests.post('http://stream.plot.ly',
                  data=streamdata(),
                  headers={'plotly-streamtoken': STREAM_TOKEN, 'connection': 'keep-alive'})
finally:
    hw.cleanup()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment