Skip to content

Instantly share code, notes, and snippets.

@bfritscher
Created October 14, 2016 08:46
Show Gist options
  • Save bfritscher/d1a0a68c3123a166caed912d35f2b38c to your computer and use it in GitHub Desktop.
Save bfritscher/d1a0a68c3123a166caed912d35f2b38c to your computer and use it in GitHub Desktop.
YL-38 / YL-69 moisture sensor Arduino nanpy script that collects data, uploads it to a custom API, and runs a small HTTP server to trigger manual updates
#!/usr/bin/python3
from nanpy.arduinotree import ArduinoTree
from nanpy.serialmanager import SerialManager
from http.server import BaseHTTPRequestHandler, HTTPServer
import http.client, urllib.parse
import time
import datetime
import numpy
import logging
import threading
# Append timestamped INFO-and-above messages to sensor.log.
logging.basicConfig(format='%(asctime)s %(message)s', filename='sensor.log', filemode='a', level=logging.INFO)
# Default pause, in seconds (passed to time.sleep), between two automatic
# measurement cycles of SensorThread.
SLEEP = 900
# Number of analog reads taken per pin by measure(); the median is kept.
NB_READINGS = 50
# TCP port the manual-trigger HTTP server listens on.
PORT = 80
# Pins put in write_mode(1) by connect() and driven to 1/0 by
# power_up()/power_down() -- presumably the sensor power lines; confirm wiring.
DIGITAL_PINS = ['D13', 'D12', 'D8', 'D7', 'D4', 'D2']
# Pins put in write_mode(0) by connect() and read by measure().
ANALOG_PINS = ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']
# Held by loop() for a whole measurement cycle so the background thread and
# the HTTP handler never run a cycle at the same time.
sensor_io = threading.BoundedSemaphore(value=1)
def connect():
    """Open the serial link and return an ArduinoTree with its pins configured.

    Every pin in DIGITAL_PINS receives write_mode(1), then every pin in
    ANALOG_PINS receives write_mode(0) (presumably OUTPUT and INPUT
    respectively -- confirm against the nanpy firmware).
    """
    serial_link = SerialManager()
    serial_link.open()
    board = ArduinoTree(connection=serial_link)
    # Digital pins first, then analog pins, same order as before.
    for mode, names in ((1, DIGITAL_PINS), (0, ANALOG_PINS)):
        for name in names:
            board.pin.get(name).write_mode(mode)
    return board
def measure(pin, board=None):
    """Return the median of NB_READINGS analog reads taken from *pin*.

    Args:
        pin: Name of the analog pin to read, e.g. 'A0'.
        board: Object exposing ``pin.get(name).read_analog_value()``.
            When omitted, the module-level ``arduino`` (created in the
            ``__main__`` block) is used, which is what this function
            previously did implicitly.

    Returns:
        The numpy median of the readings. The value is also logged at
        INFO level together with the pin name.
    """
    if board is None:
        # Backward-compatible fallback to the global created at start-up.
        board = arduino
    readings = [board.pin.get(pin).read_analog_value() for _ in range(NB_READINGS)]
    value = numpy.median(readings)
    logging.info('%s %s', pin, value)
    return value
def power_up(arduino):
    """Write digital value 1 to every pin listed in DIGITAL_PINS."""
    for name in DIGITAL_PINS:
        arduino.pin.get(name).write_digital_value(1)
def power_down(arduino):
    """Write digital value 0 to every pin listed in DIGITAL_PINS."""
    for name in DIGITAL_PINS:
        arduino.pin.get(name).write_digital_value(0)
def loop(arduino):
    """Run one full measurement cycle and return the list of readings.

    The cycle drives the digital pins high, waits two seconds, measures
    every pin in ANALOG_PINS, uploads the values with send_data() and
    drives the digital pins low again.

    The whole cycle runs while holding ``sensor_io`` so that the
    background thread and the HTTP handler cannot interleave cycles.
    The semaphore is released and the pins are driven low even if a step
    raises; previously an exception left the semaphore held forever,
    which blocked every later cycle.

    Args:
        arduino: Board object handed to power_up() and power_down().

    Returns:
        One measurement per entry of ANALOG_PINS, in the same order.

    Raises:
        Whatever power_up(), measure() or power_down() raise; the
        exception propagates after the cleanup has run.
    """
    with sensor_io:
        try:
            power_up(arduino)
            # Presumably gives the sensors time to settle after power-on.
            time.sleep(2)
            measures = [measure(pin) for pin in ANALOG_PINS]
            send_data(measures)
        finally:
            power_down(arduino)
    return measures
def send_data(measures):
    """POST the measurements to the InfluxDB write endpoint (best effort).

    Each measurement becomes one line of the form
    ``plant_hum,plant=<n> value=<v>`` where ``<n>`` is the 1-based position
    in *measures* and ``<v>`` is the value formatted with ``%d``.

    Network and HTTP protocol failures are logged and swallowed so that a
    failed upload never aborts the measurement cycle. A non-success HTTP
    status is logged as well instead of being silently ignored.

    Args:
        measures: Iterable of numeric readings.

    Returns:
        None.
    """
    payload = "".join(
        "plant_hum,plant=%d value=%d\n" % (index, val)
        for index, val in enumerate(measures, start=1)
    )
    headers = {
        'authorization': "Basic BASE64_HASH",
        'cache-control': "no-cache",
    }
    # A timeout keeps a stalled upload from blocking the caller forever;
    # loop() calls this while holding the sensor semaphore.
    conn = http.client.HTTPSConnection("API_HOST", timeout=30)
    try:
        conn.request("POST", "/influxdb/write?db=plants", payload, headers)
        response = conn.getresponse()
        # Drain the body so the connection is left in a clean state.
        response.read()
        if response.status >= 300:
            logging.info("error %s %s %s", payload, response.status, response.reason)
    except (OSError, http.client.HTTPException):
        # OSError covers DNS, socket, TLS and timeout failures.
        logging.exception("error %s %s", payload, "connection failed")
    finally:
        conn.close()
def init(arduino):
    """Pulse each pin of DIGITAL_PINS high for one second, one after another.

    Prints ``<pin> on`` while a pin is high and ``INIT DONE`` once every
    pin has been pulsed.
    """
    def _pulse(name):
        # High, announce, hold for a second, then low again.
        arduino.pin.get(name).write_digital_value(1)
        print(name, 'on')
        time.sleep(1)
        arduino.pin.get(name).write_digital_value(0)

    for name in DIGITAL_PINS:
        _pulse(name)
    print('INIT DONE')
class SensorThread(threading.Thread):
    """Thread that repeats the measurement cycle forever with a pause in between."""

    def __init__(self, arduino, sleep=SLEEP):
        """Remember the board and the pause (seconds) between two cycles."""
        super().__init__()
        self.arduino = arduino
        self.sleep = sleep

    def run(self):
        """Call loop() on the stored board, sleep, and repeat without end."""
        while True:
            loop(self.arduino)
            time.sleep(self.sleep)
if __name__ == "__main__":
    # Connect to the board, run the start-up pin check, then start the
    # periodic collection in a background thread.
    arduino = connect()
    init(arduino)
    t = SensorThread(arduino)
    t.start()

    # allow for manual data retrieval/trigger
    class MyHTTPServer_RequestHandler(BaseHTTPRequestHandler):
        """Serve ``GET /scan``: run one measurement cycle and return the values."""

        def do_GET(self):
            """Answer /scan with comma-separated readings, anything else with 404."""
            if self.path != "/scan":
                # Previously unknown paths got no response at all.
                self.send_error(404)
                return
            # Measure before sending any header so a failure can still be
            # reported with a proper status instead of a 200 with no body.
            try:
                measures = loop(arduino)
            except Exception:
                logging.exception("manual scan failed")
                self.send_error(500)
                return
            body = ",".join("%s" % x for x in measures).encode("utf-8")
            self.send_response(200)
            self.send_header("Content-type", "text/plain")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

    httpd = HTTPServer(('', PORT), MyHTTPServer_RequestHandler)
    httpd.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment