Skip to content

Instantly share code, notes, and snippets.

@c4r-gists
Created March 15, 2019 12:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save c4r-gists/e7e1b060087b88f6d86b018f8d710e6f to your computer and use it in GitHub Desktop.
Save c4r-gists/e7e1b060087b88f6d86b018f8d710e6f to your computer and use it in GitHub Desktop.
Water level control on a Raspberry Pi using an HC-SR04 ultrasonic sensor
import sys
import traceback
import time
import json
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
import RPi.GPIO as GPIO
import cloud4rpi
from hcsr04sensor import sensor
# --- GPIO wiring. Pin numbers use BCM numbering (see gpio_mode / setmode below). ---
TRIGGER_PIN = 17  # HC-SR04 trigger pin
ECHO_PIN = 27  # HC-SR04 echo pin (original note: "13" -- presumably the physical header pin; TODO confirm)
PUMP_PIN = 4  # pump output driven by toggle_pump() (original note: "7" -- presumably the physical header pin; TODO confirm)

# --- Tuning ---
BOUNCE_DELTA = 0.5  # readings must differ by more than this (cm) to count as a level change
SEND_INTERVAL = 30  # sec; main() publishes at least this often even without a level change
MIN_SEND_INTERVAL = 1  # sec; send() never publishes more often than this
MIN_DISTANCE = 6  # cm; main() stops the pump when the reading drops below this
MAX_DISTANCE = 9  # cm; main() starts the pump above this; readings >= 2x this are rejected

# --- Values published in the 'Status' variable ---
STATUS_MSG_ERR = 'Sensor Error!'
STATUS_MSG_OK = 'Water Level OK'
STATUS_MSG_POURING = 'Water pouring'
STATUS_MSG_OVERFLOW = 'Overflow!'

# Text posted to the webhook when no valid distance reading is available.
ALERT_MSG = 'WARNING! dxPump is probably out of order...'

# Placeholders: replace with a real device token and webhook URL before running.
C4R_TOKEN = '__YOUR_DEVICE_TOKEN__'
HOOK_URL = '__CHANNEL_WEB_HOOK_URL__'

print('Connecting HCSR04 sensor...')
# Module-level side effect: the sensor object is created at import time.
# NOTE(review): temperature is presumably in degrees Celsius -- confirm
# against the hcsr04sensor documentation.
sens = sensor.Measurement(trig_pin=TRIGGER_PIN,
                          echo_pin=ECHO_PIN,
                          temperature=20,
                          unit='metric',
                          round_to=2,
                          gpio_mode=GPIO.BCM
                          )
GPIO.setmode(GPIO.BCM)

# --- Mutable module state shared by the functions below ---
sent_time = -1  # time.time() of the last publish, updated by send(); -1 = nothing sent yet
prev_distance = -1  # last distance recorded by main() after a send; -1 = none yet
pump_on = False  # current pump state, updated by toggle_pump()
disableAlerts = False  # set by notify() after a successful post; cleared by main() on a valid reading
def read_distance():
    """Take one distance reading (5 samples) from the HC-SR04 sensor.

    Returns:
        The distance in centimetres, or ``None`` when the sensor raised an
        error or the reading is at or beyond twice ``MAX_DISTANCE``.
    """
    try:
        raw_measurement = sens.raw_distance(sample_size=5)
        value = sens.distance_metric(raw_measurement)
        return value if value < (MAX_DISTANCE * 2) else None
    except Exception as err:
        # Deliberately best effort: a failed reading must not stop the control
        # loop. Report the cause instead of swallowing it silently so that
        # wiring or timeout problems can be diagnosed.
        print('Sensor read failed: %r' % (err,), file=sys.stderr)
        return None
def water_level_changed(prev, current):
    """Tell whether two readings differ by more than ``BOUNCE_DELTA``.

    Both readings are rounded to two decimals before being compared.
    """
    difference = round(prev, 2) - round(current, 2)
    return abs(difference) > BOUNCE_DELTA
def should_send(ts):
    """Return True once more than ``SEND_INTERVAL`` seconds have passed since *ts*."""
    elapsed = time.time() - ts
    return elapsed > SEND_INTERVAL
def toggle_pump(value):
    """Switch the pump output on or off and remember the new state.

    Args:
        value: Truthy to start pouring, falsy to stop.
    """
    global pump_on
    # Callers pass 0/1, but pump_on starts out as False and is published as
    # the 'bool'-typed PumpRelay variable, so keep it a real bool.
    pump_on = bool(value)
    print("[x] %s" % ('START' if pump_on else 'STOP'))
    GPIO.setup(PUMP_PIN, GPIO.OUT)
    GPIO.output(PUMP_PIN, pump_on)  # Start/Stop pouring
def calc_water_level_percent(dist):
    """Convert a distance reading into a fill percentage.

    ``MAX_DISTANCE`` maps to 0 % and ``MIN_DISTANCE`` to 100 %. A falsy
    *dist* (``None`` or 0) is treated as a distance of 0.
    """
    if not dist:
        dist = 0
    usable_range = MAX_DISTANCE - MIN_DISTANCE
    return (MAX_DISTANCE - dist) / usable_range * 100
# Upper bound for the webhook call so that a stalled server cannot block
# the pump control loop.
_NOTIFY_TIMEOUT_SEC = 10


def notify(msg):
    """Post *msg* as JSON to the webhook at ``HOOK_URL``.

    Does nothing while ``disableAlerts`` is set. After a successful post the
    flag is set, so further calls are suppressed until it is cleared
    elsewhere. Delivery failures are printed and otherwise ignored.

    Args:
        msg: Text sent as the ``text`` field of the JSON payload.
    """
    global disableAlerts
    if disableAlerts:
        return
    print('[x] Notification: ', msg)
    data = json.dumps({'text': msg}).encode('ascii')
    try:
        # Request() raises ValueError for a malformed URL (for example the
        # unconfigured placeholder), so it is built inside the try block.
        req = Request(HOOK_URL)
        with urlopen(req, data, timeout=_NOTIFY_TIMEOUT_SEC) as response:
            response.read()
        disableAlerts = True
    except HTTPError as e:
        print("Request failed: %d %s" % (e.code, e.reason))
    except URLError as e:
        print("Server connection failed: %s" % (e.reason,))
    except (OSError, ValueError) as e:
        # Read timeouts surface as OSError subclasses rather than URLError.
        print("Notification failed: %s" % (e,))
def send(cloud, variables, dist, error=False):
    """Update the cloud variables and publish them, rate limited.

    Args:
        cloud: Connected device object providing ``read_data`` and
            ``publish_data``.
        variables: The declared variables dict; its ``'value'`` entries
            are updated in place.
        dist: Latest distance reading, or ``None``.
        error: When true, the status is reported as a sensor error.

    Publishing is skipped unless more than ``MIN_SEND_INTERVAL`` seconds
    have passed since the previous publish.
    """
    global pump_on, sent_time

    level = calc_water_level_percent(dist)

    # Status priority: error > overflow > pouring > OK.
    if error:
        status = STATUS_MSG_ERR
    elif level > 110:
        status = STATUS_MSG_OVERFLOW
    elif pump_on:
        status = STATUS_MSG_POURING
    else:
        status = STATUS_MSG_OK

    variables['Distance']['value'] = dist
    variables['WaterLevel']['value'] = level
    variables['PumpRelay']['value'] = pump_on
    variables['Status']['value'] = status

    now = time.time()
    if now - sent_time > MIN_SEND_INTERVAL:
        cloud.publish_data(cloud.read_data())
        sent_time = now
def main():
    """Run the water-level control loop until an error or interrupt.

    Declares the cloud variables, then repeatedly reads the distance,
    switches the pump at the ``MIN_DISTANCE`` / ``MAX_DISTANCE`` thresholds
    and publishes readings. GPIO is always cleaned up on the way out.

    Raises:
        SystemExit: Always. The exit status is 1 if the loop stopped
            because of an exception, otherwise 0.
    """
    global prev_distance, disableAlerts

    variables = {
        'Distance': {
            'type': 'numeric',
        },
        'Status': {
            'type': 'string',
        },
        'PumpRelay': {
            'type': 'bool',
            'value': False
        },
        'WaterLevel': {
            'type': 'numeric',
        },
    }
    cloud = cloud4rpi.connect(C4R_TOKEN, 'mq.stage.cloud4rpi.io')
    cloud.declare(variables)
    cloud.publish_config()

    exit_code = 0
    try:
        print('Start reading distance...')
        n = 0
        while True:
            n += 1
            distance = read_distance()
            if distance is None:
                # No usable reading: alert and report an error status.
                notify(ALERT_MSG)
                send(cloud, variables, distance, True)
                continue

            print("#%d Distance = %.2f (cm)" % (n, distance))

            if distance < MIN_DISTANCE:
                toggle_pump(0)
                send(cloud, variables, distance)

            if distance > MAX_DISTANCE:
                toggle_pump(1)
                send(cloud, variables, distance)

            if should_send(sent_time) or water_level_changed(prev_distance, distance):
                send(cloud, variables, distance)
                prev_distance = distance

            # A valid reading re-enables alerts for the next failure.
            disableAlerts = False

    except Exception as e:
        print('FAILED:', e)
        traceback.print_exc()
        # Report the failure through the exit status instead of exiting 0.
        exit_code = 1

    finally:
        GPIO.cleanup()
        # Exiting from finally also turns Ctrl+C into a quiet exit.
        sys.exit(exit_code)
# Start the control loop only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment