Skip to content

Instantly share code, notes, and snippets.

@aegis1980
Last active November 29, 2019 06:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aegis1980/7c30f1644befed59455f2b87e542c581 to your computer and use it in GitHub Desktop.
Save aegis1980/7c30f1644befed59455f2b87e542c581 to your computer and use it in GitHub Desktop.
For using a (hall sensor) flow sensor connected to RPi4 GPIO pins.
import RPi.GPIO as GPIO
import time
import sys
import threading
from typing import List, Type
import math
class FlowSensor:
    """Pulse-counting flow meter for a hall-sensor flow sensor on a GPIO pin.

    Falling edges on ``pin`` are counted by :meth:`count_pulse`. Once at
    least ``sample_interval`` seconds have passed since the current counting
    window opened, the next pulse closes the window and appends a
    :class:`FlowSensor.FlowRate` sample to ``flow_rates``. Samples are only
    produced when a pulse arrives, so nothing is appended while no pulses
    are seen.

    ``start()`` does not configure the pin; call :meth:`init_gpio` first.
    """

    class FlowRate:
        """One flow-rate sample produced by :meth:`FlowSensor.count_pulse`."""

        def __init__(self):
            self.value = 0  # computed flow rate (L/min per the calibration)
            self.time = -1  # time.time() stamp of the pulse that closed the window

    def __init__(self, pin=23, calibration_factor=6, calibration_offset=8, sample_interval: float = 1):
        """Create a sensor reader; no GPIO access happens here.

        Args:
            pin: GPIO channel (BCM numbering, see :meth:`init_gpio`).
            calibration_factor: Divisor applied to the pulse frequency (Hz).
            calibration_offset: Constant added after the division.
            sample_interval: Minimum length of a counting window, in seconds.

        Raises:
            ValueError: If ``calibration_factor`` is zero (it is used as a
                divisor on every completed window).
        """
        if calibration_factor == 0:
            raise ValueError("calibration_factor must be non-zero")
        # Start time of the current counting window; negative means that no
        # window has been opened yet.
        self.t: float = -1
        self._Dt = sample_interval
        self.flow_rates: List["FlowSensor.FlowRate"] = []
        self.pin = pin
        self._hz_count = 0  # pulses counted in the current window
        self.thread = None  # worker thread created by start()
        self.running = False
        self._stop_event = threading.Event()  # wakes the worker on stop()
        self.factor = calibration_factor
        self.offset = calibration_offset

    def init_gpio(self):
        """Select BCM pin numbering and set the pin up as a pulled-up input."""
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

    def count_pulse(self, channel):
        """Edge-detect callback: count one pulse and emit a sample when due.

        Args:
            channel: GPIO channel that triggered the callback (unused).
        """
        now = time.time()
        if self.t < 0:
            # First pulse since construction/start: there is no window to
            # close yet, so just open one. Without this guard the very first
            # pulse would produce a bogus sample spanning the time since the
            # epoch.
            self.t = now
            self._hz_count = 1
            return
        elapsed = now - self.t
        # `elapsed > 0` protects the division when sample_interval is 0 and
        # two pulses share the same timestamp.
        if elapsed >= self._Dt and elapsed > 0:
            sample = FlowSensor.FlowRate()
            # Pulses per second over the time that actually elapsed, so the
            # result does not depend on the configured sample interval.
            frequency = self._hz_count / elapsed
            # NOTE(review): some sensor datasheets state the calibration as
            # F = factor * Q - offset, i.e. Q = (F + offset) / factor. The
            # original additive form is kept here - confirm against the
            # datasheet of the sensor in use.
            sample.value = frequency / self.factor + self.offset  # L/min
            sample.time = now
            self.flow_rates.append(sample)
            self._hz_count = 0
            self.t = now
        # The pulse that closes a window is counted in the next one.
        self._hz_count += 1

    def start(self, reset=True):
        """Start counting pulses on a background daemon thread.

        If a previous worker is still alive it is stopped first, so edge
        detection is never registered twice on the same pin.

        Args:
            reset: When true, discard previously collected samples first.
        """
        print ("Starting flow meter")
        if self.thread is not None and self.thread.is_alive():
            self.stop()
        if reset:
            self.reset()
        # Pulses are not observed while stopped, so always begin a fresh
        # counting window.
        self.t = -1
        self._hz_count = 0
        # A new event per run, so a late-exiting old worker can never be
        # revived by this start.
        stop_event = threading.Event()
        self._stop_event = stop_event
        self.running = True

        def thread_function(name):
            try:
                GPIO.add_event_detect(self.pin, GPIO.FALLING, callback=self.count_pulse)
            except Exception:
                # Detection never started; do not leave the sensor marked
                # as running.
                self.running = False
                raise
            try:
                # Event.wait returns True as soon as stop() sets the event,
                # so stopping does not have to wait out a full interval.
                # `self.running` is still honoured for callers that clear it
                # directly.
                while self.running and not stop_event.wait(0.9999 * self._Dt):
                    pass
            finally:
                print('thread stopped')
                GPIO.remove_event_detect(self.pin)

        self.thread = threading.Thread(target=thread_function, args=(1,), daemon=True)
        self.thread.start()

    def stop(self):
        """Stop counting and wait until edge detection has been removed."""
        self.running = False
        self._stop_event.set()
        worker = self.thread
        # Joining guarantees the pin is free again before a following
        # start(); never join the current thread (would deadlock).
        if worker is not None and worker.is_alive() and worker is not threading.current_thread():
            worker.join()

    def reset(self):
        """Discard all collected flow-rate samples."""
        self.flow_rates = []
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment