Skip to content

Instantly share code, notes, and snippets.

@refugeesus
Last active January 15, 2018 23:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save refugeesus/12be57de055f294d2d68a6188f7d102e to your computer and use it in GitHub Desktop.
Save refugeesus/12be57de055f294d2d68a6188f7d102e to your computer and use it in GitHub Desktop.
from helium_client import Helium
from pylepton import Lepton
import numpy as np
import math
import time
from matplotlib import pyplot as plt
from Adafruit_AMG88xx import Adafruit_AMG88xx
from scipy.interpolate import griddata
import cv2
import Adafruit_TMP.TMP006 as TMP006
import json
import logging
# Module-level setup: runs on import and talks to real hardware, so the
# statement order below matters (connect before creating the channel, channel
# before fetching its config object).
# Helium client bound to the serial device /dev/serial0.
helium = Helium("/dev/serial0")
helium.connect()
# Channel used both for outgoing payloads (send_to_channel) and for the
# remotely managed configuration read in poll_inv.
channel = helium.create_channel("googleiot-test")
config = channel.config()
# Driver objects for the AMG88xx pixel sensor (used by ge) and the TMP006
# temperature sensor (used by tmp).
# NOTE(review): tmp_sensor.begin() is never called in this file -- confirm
# the TMP006 driver does not require it before reading.
ge_sensor = Adafruit_AMG88xx()
tmp_sensor = TMP006.TMP006()
# Everything at DEBUG and above is written to /tmp/myapp.log.
logging.basicConfig(filename='/tmp/myapp.log', level=logging.DEBUG,
format='%(asctime)s %(levelname)s %(name)s %(message)s')
logger=logging.getLogger(__name__)
# Last known configuration values, kept as strings.  poll_inv overwrites
# them from the channel config; the main loop reads them on every pass.
states = {
# Passed through int() and used as the sleep time between main-loop passes.
"interval": "2",
# The three flags below enable a sensor when equal to "1"; any other value
# makes the main loop report "off" for that sensor.
"tmp": "0",
"ge": "0",
"fl": "0",
}
def poll_inv():
    """Refresh the module-level ``states`` dict from the channel config.

    ``config.poll_invalidate(retries=1)`` is called first; when it returns a
    truthy value nothing is refreshed.  Otherwise each of ``channel.interval``,
    ``channel.tmp``, ``channel.ge`` and ``channel.fl`` is read with
    ``config.get``, stored in ``states`` under the short name, written back
    with ``config.set`` and printed.

    A failure while handling one key is printed and logged, and the remaining
    keys are still processed.  Nothing is returned; ``states`` is updated in
    place.
    """
    # NOTE(review): confirm against the helium-client documentation whether a
    # truthy result from poll_invalidate means "configuration is stale"; if
    # so, this condition is inverted.  The original logic is kept as-is.
    if config.poll_invalidate(retries=1):
        print("config correct")
        return

    for name in ("interval", "tmp", "ge", "fl"):
        key = "channel." + name
        try:
            states[name] = config.get(key)
            config.set(key, states[name])
            print(states[name])
        except Exception as e:
            # Best effort: one unreadable key must not block the others.
            print("No data error: ", str(e))
            logger.error(e)
def tmp():
    """Read the object and die temperatures from ``tmp_sensor``.

    Both readings are printed in Celsius and Fahrenheit and then returned.

    Returns:
        dict: ``{"tmp": <object temp in C>, "dtmp": <die temp in C>}``.
    """
    object_c = tmp_sensor.readObjTempC()
    sensor_c = tmp_sensor.readDieTempC()
    report = (
        ('Object temperature: {0:0.3F}*C / {1:0.3F}*F', object_c),
        ('Die temperature: {0:0.3F}*C / {1:0.3F}*F', sensor_c),
    )
    for template, celsius in report:
        print(template.format(celsius, c_to_f(celsius)))
    # Only the Celsius values go into the payload.
    return {"tmp": object_c, "dtmp": sensor_c}
def ge():
    """Count blobs in an interpolated frame from ``ge_sensor``.

    The 64 pixel readings are scaled by their maximum, laid out on an 8x8
    grid, interpolated to 32x32, saved to ``color_img.jpg``, read back as
    grayscale, inverted and passed to OpenCV's simple blob detector.  The
    centre of every detected blob is printed.

    Returns:
        dict: ``{"ge": "<n>"}`` where ``<n>`` is the number of detected
        blobs, as a string.
    """
    # Read pixels and scale them by the largest reading.
    pixels = ge_sensor.readPixels()
    pixmax = max(pixels)
    if pixmax:
        # Skip the scaling when the maximum is 0 to avoid dividing by zero.
        pixels = [value / pixmax for value in pixels]

    # (row, column) coordinates of the 64 pixels on the 8x8 grid.
    points = [divmod(ix, 8) for ix in range(64)]
    grid_x, grid_y = np.mgrid[0:7:32j, 0:7:32j]

    # Bicubic interpolation of the 8x8 grid to make a 32x32 grid.
    bicubic = griddata(points, pixels, (grid_x, grid_y), method='cubic')
    image = np.reshape(bicubic, (32, 32))

    # Round-trip through an image file: the detector runs on the grayscale
    # version of the saved picture, not on the raw interpolated values.
    plt.imsave('color_img.jpg', image)
    img = cv2.imread("color_img.jpg", cv2.IMREAD_GRAYSCALE)
    img = cv2.bitwise_not(img)

    # Setup SimpleBlobDetector parameters.
    params = cv2.SimpleBlobDetector_Params()
    # Thresholds
    params.minThreshold = 10
    params.maxThreshold = 255
    # Filter by area.
    params.filterByArea = True
    params.minArea = 5
    # Filter by circularity.
    params.filterByCircularity = True
    params.minCircularity = 0.1
    # Convexity and inertia filters are switched off; their minimum values
    # are still set so that they apply if the filters are enabled later.
    params.filterByConvexity = False
    params.minConvexity = 0.87
    params.filterByInertia = False
    params.minInertiaRatio = 0.01

    # OpenCV 3+ builds the detector through a factory function, OpenCV 2.x
    # through the class constructor; support both.
    create_detector = getattr(cv2, "SimpleBlobDetector_create", None)
    if create_detector is None:
        create_detector = cv2.SimpleBlobDetector
    detector = create_detector(params)

    # Detect blobs and report the centre of each one.
    keypoints = detector.detect(img)
    for keypoint in keypoints:
        x, y = keypoint.pt[0], keypoint.pt[1]
        print(x, y)

    payload = {"ge": str(len(keypoints))}
    return payload
def capture(flip_v = False, device = "/dev/spidev0.0"):
    """Grab one frame from the Lepton camera as an 8-bit array.

    Args:
        flip_v: When true, the frame is flipped in place with
            ``cv2.flip(..., 0, ...)`` before scaling.
        device: Device path handed to ``Lepton``.

    Returns:
        The captured frame, stretched to the 0..65535 range, shifted right
        by 8 bits and converted with ``np.uint8``.
    """
    with Lepton(device) as camera:
        # capture() returns a pair; only the first element is used here.
        frame = camera.capture()[0]

    if flip_v:
        cv2.flip(frame, 0, frame)

    # Stretch to the full 16-bit range, then keep only the high byte.
    cv2.normalize(frame, frame, 0, 65535, cv2.NORM_MINMAX)
    np.right_shift(frame, 8, frame)
    return np.uint8(frame)
def flir():
    """Count blobs in a frame captured by ``capture()``.

    The frame is written to ``flir.png``, read back as grayscale, inverted
    and passed to OpenCV's simple blob detector.  The centre of every
    detected blob is printed.

    Returns:
        dict: ``{"fl": "<n>"}`` where ``<n>`` is the number of detected
        blobs, as a string.
    """
    image = capture()
    cv2.imwrite("flir.png", image)
    # NOTE(review): the reason for this one-second pause is not evident from
    # this file; it is kept so the loop timing stays the same.
    time.sleep(1)
    img = cv2.imread("flir.png", cv2.IMREAD_GRAYSCALE)
    img = cv2.bitwise_not(img)

    # Setup SimpleBlobDetector parameters.
    params = cv2.SimpleBlobDetector_Params()
    # Thresholds
    params.minThreshold = 10
    params.maxThreshold = 255
    # Filter by area.
    params.filterByArea = True
    params.minArea = 100
    # Filter by circularity.
    params.filterByCircularity = True
    params.minCircularity = 0.1
    # Convexity and inertia filters are switched off; their minimum values
    # are still set so that they apply if the filters are enabled later.
    params.filterByConvexity = False
    params.minConvexity = 0.87
    params.filterByInertia = False
    params.minInertiaRatio = 0.01

    # OpenCV 3+ builds the detector through a factory function, OpenCV 2.x
    # through the class constructor; support both.
    create_detector = getattr(cv2, "SimpleBlobDetector_create", None)
    if create_detector is None:
        create_detector = cv2.SimpleBlobDetector
    detector = create_detector(params)

    # Detect blobs and report the centre of each one.
    keypoints = detector.detect(img)
    for keypoint in keypoints:
        x, y = keypoint.pt[0], keypoint.pt[1]
        print(str(x), str(y))

    payload = {"fl": str(len(keypoints))}
    return payload
def c_to_f(c):
    """Convert a temperature from degrees Celsius to degrees Fahrenheit."""
    ninefold = c * 9.0
    return ninefold / 5.0 + 32.0
def send_to_channel(payload):
    """Send ``payload`` on the module-level ``channel``, best effort.

    Any exception raised by ``channel.send`` is printed and logged instead
    of being propagated, so a failed send does not stop the caller.

    Args:
        payload: Value handed unchanged to ``channel.send``.
    """
    try:
        channel.send(payload)
    except Exception as e:
        # print() does not interpolate logging-style arguments, so the
        # message has to be formatted explicitly.
        print("Channel send error %s" % str(e))
        logger.error(e)
def merge_dict(x,y,z):
    """Return a new dict holding the entries of ``x``, ``y`` and ``z``.

    The inputs are left untouched.  When a key occurs more than once, the
    value from the later argument wins.
    """
    merged = x.copy()
    for later in (y, z):
        merged.update(later)
    return merged
if __name__ == '__main__':
    # Main loop: refresh the configuration, read every enabled sensor, send
    # the combined readings as one JSON document, then wait for the
    # configured interval.
    while True:
        poll_inv()

        # A sensor is read only while its flag is the string "1"; otherwise
        # its payload entry is reported as "off".
        tmp_pl = tmp() if states["tmp"] == "1" else {"tmp": "off"}
        ge_pl = ge() if states["ge"] == "1" else {"ge": "off"}
        fl_pl = flir() if states["fl"] == "1" else {"fl": "off"}

        payload = json.dumps(merge_dict(tmp_pl, ge_pl, fl_pl))
        send_to_channel(payload)

        # The interval comes from the remote channel configuration, so it
        # may be missing or non-numeric; fall back to 2 seconds (the initial
        # value in ``states``) instead of letting the loop die.
        try:
            delay = int(states["interval"])
        except (TypeError, ValueError) as e:
            logger.error(e)
            delay = 2
        # time.sleep rejects negative values.
        time.sleep(max(delay, 0))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment