Skip to content

Instantly share code, notes, and snippets.

@bfritscher
Created July 30, 2016 09:43
Show Gist options
  • Save bfritscher/ba95398d48fd78edbfa13ebccb1ec291 to your computer and use it in GitHub Desktop.
Save bfritscher/ba95398d48fd78edbfa13ebccb1ec291 to your computer and use it in GitHub Desktop.
YL-38 / YL-69 moisture sensor: Arduino nanpy script that collects readings and uploads them to ThingSpeak
#!/usr/bin/python3
from nanpy.arduinotree import ArduinoTree
from nanpy.serialmanager import SerialManager
import http.client, urllib.parse
import time
import datetime
import numpy
# API key sent as the 'key' form field to api.thingspeak.com by send_data().
# Empty here; it has to be filled in before uploads can be attributed to a channel.
API_KEY = ''
# Seconds the main loop sleeps between two measurement cycles.
SLEEP = 300
# Number of analog samples taken per pin; measure() reports their median.
NB_READINGS = 50
# Pins set to mode 1 in connect() and driven high/low by power_up()/power_down()/init().
DIGITAL_PINS = ['D13', 'D12', 'D8', 'D7', 'D4', 'D2']
# Pins set to mode 0 in connect() and sampled by measure(), in this order.
ANALOG_PINS = ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']
def connect():
    """Open the serial link to the board and return a configured ArduinoTree.

    Every pin listed in DIGITAL_PINS is switched to mode 1 and every pin
    listed in ANALOG_PINS to mode 0 before the tree is returned.
    """
    serial_link = SerialManager()
    serial_link.open()
    board = ArduinoTree(connection=serial_link)

    # Plain loops: these calls are made for their side effect only.
    for pin_name in DIGITAL_PINS:
        board.pin.get(pin_name).write_mode(1)  # presumably OUTPUT -- confirm against nanpy
    for pin_name in ANALOG_PINS:
        board.pin.get(pin_name).write_mode(0)  # presumably INPUT -- confirm against nanpy

    return board
def measure(pin):
    """Return the median of NB_READINGS analog samples taken from *pin*.

    The timestamp, pin name and median are also printed on one line.

    NOTE: this reads the module-level name ``arduino``, which is only
    assigned by the ``__main__`` block; it is not passed in as an argument.
    """
    samples = []
    for _ in range(NB_READINGS):
        samples.append(arduino.pin.get(pin).read_analog_value())

    median = numpy.median(samples)
    print(datetime.datetime.now().isoformat(), pin, median)
    return median
def power_up(arduino):
    """Write digital value 1 to every pin in DIGITAL_PINS, in list order."""
    for pin_name in DIGITAL_PINS:
        arduino.pin.get(pin_name).write_digital_value(1)
def power_down(arduino):
    """Write digital value 0 to every pin in DIGITAL_PINS, in list order."""
    for pin_name in DIGITAL_PINS:
        arduino.pin.get(pin_name).write_digital_value(0)
def loop(arduino):
    """Run one measurement cycle: power up, sample, upload, power down.

    The digital pins are driven high, the code waits two seconds, every pin
    in ANALOG_PINS is measured and the results are handed to send_data().

    The digital pins are always driven low again, even when measuring or
    uploading raises, so a failure cannot leave them energised. Any
    exception still propagates to the caller after the power-down.
    """
    power_up(arduino)
    try:
        # Wait before sampling (presumably to let the sensors settle).
        time.sleep(2)
        measures = [measure(pin) for pin in ANALOG_PINS]
        send_data(measures)
    finally:
        power_down(arduino)
# All fields are sent in a single update request
# (original author's open question: does the ThingSpeak API only support
# sending everything at once?).
def send_data(measures):
    """POST *measures* to api.thingspeak.com/update as field1..fieldN.

    The i-th value (1-based) is sent as form field ``field<i>`` together
    with API_KEY under ``key``. Upload is best effort: network and HTTP
    protocol errors are reported with a printed message and otherwise
    ignored, so a failed upload does not stop the caller.

    Args:
        measures: sequence of values to upload, in field order.
    """
    params = {
        'field%d' % index: value
        for index, value in enumerate(measures, start=1)
    }
    params['key'] = API_KEY
    body = urllib.parse.urlencode(params)
    headers = {"Content-type": "application/x-www-form-urlencoded","Accept": "text/plain"}
    # A timeout keeps a stalled connection from blocking the caller forever.
    conn = http.client.HTTPConnection("api.thingspeak.com:80", timeout=30)
    try:
        conn.request("POST", "/update", body, headers)
        response = conn.getresponse()
        # Drain the body so the exchange completes; its content is not used.
        response.read()
    except (http.client.HTTPException, OSError):
        # Only connection/protocol failures are swallowed; KeyboardInterrupt
        # and programming errors propagate.
        print("connection failed")
    finally:
        # Release the socket on both the success and the failure path.
        conn.close()
def init(arduino):
    """Pulse each pin in DIGITAL_PINS high for one second, one pin at a time.

    Pins are handled in list order; each is written 1, held for one second,
    then written 0 before the next pin is touched.
    """
    def drive(pin_name, level):
        # Look the pin up on every write, exactly as the call sites did.
        arduino.pin.get(pin_name).write_digital_value(level)

    for pin_name in DIGITAL_PINS:
        drive(pin_name, 1)
        time.sleep(1)
        drive(pin_name, 0)
if __name__ == "__main__":
    # `arduino` has to stay a module-level name: measure() reads it as a
    # global instead of receiving it as a parameter.
    arduino = connect()
    init(arduino)

    # Measure and upload forever, pausing SLEEP seconds between cycles.
    while True:
        loop(arduino)
        time.sleep(SLEEP)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment