Skip to content

Instantly share code, notes, and snippets.

@jamincollins
Created May 8, 2023 20:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jamincollins/938db75038a8fadbf4a58546649c9a4e to your computer and use it in GitHub Desktop.
Save jamincollins/938db75038a8fadbf4a58546649c9a4e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import RPi.GPIO as GPIO
import os
import socket
import sys
import time
from threading import Thread
class Fan:
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
rpm = 0
t = time.time()
def __init__(self, name, tach_pin, pwm_pin, pulse=2):
self.name = name
self.tach = tach_pin
self.pulse = pulse
self.__setup_pwm(pwm_pin)
self.__setup_tach()
def __setup_tach(self):
# setup tach settings
GPIO.setup(self.tach, GPIO.IN, pull_up_down=GPIO.PUD_UP) # pull up to 3.3v
# add event to detect
GPIO.add_event_detect(self.tach, GPIO.FALLING, self.__fell)
def __fell(self, pin):
dt = time.time() - self.t
if dt < 0.0005: return # Reject spuriously short pulses
freq = 1 / dt
self.rpm = (freq / self.pulse) * 60
self.t = time.time()
def __setup_pwm(self, pwm_pin):
# setup the pwm settings
self.pwm = GPIO.setup(pwm_pin, GPIO.OUT)
# setting the frequency - see Notcua's PWM specification whitepaper
# - https://noctua.at/pub/media/wysiwyg/Noctua_PWM_specifications_white_paper.pdf
# - frequency specified on page 6 (25kHz)
self.pwm = GPIO.PWM(pwm_pin, 25)
self.pwm.start(100)
# kick off thread monitoring pwm socket
t = Thread(target=self.__monitor_pwm_socket)
t.start()
def __monitor_pwm_socket(self):
# create thread and listening socket
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.setblocking(True)
try:
os.remove(f"./{self.name}")
except OSError:
pass
s.bind(f"./{self.name}")
while True:
s.listen(1)
conn, addr = s.accept()
with conn:
while True:
# this should block waiting for data
data = conn.recv(1024)
print(data)
if not data:
break
#TODO: check to see that data is int between 0 and 100
duty_cycle = int(data.decode())
print(duty_cycle)
try:
self.set_duty_cycle(duty_cycle)
except ValueError:
pass
def set_duty_cycle(self, duty_cycle):
self.pwm.ChangeDutyCycle(duty_cycle)
def __str__(self):
return f"name: {self.name}, tach: {self.tach}, pulse: {self.pulse}, rpm: {self.rpm}"
WAIT_TIME = 1
FANS = [
Fan("fan1", tach_pin=9, pwm_pin=7),
Fan("fan2", tach_pin=10, pwm_pin=8),
Fan("fan3", tach_pin=22, pwm_pin=25),
Fan("fan4", tach_pin=27, pwm_pin=24),
Fan("fan5", tach_pin=17, pwm_pin=23),
]
try:
while True:
for fan in FANS:
if fan.rpm > 0:
print(fan)
time.sleep(WAIT_TIME)
except KeyboardInterrupt: # trap a CTRL+C keyboard interrupt
GPIO.cleanup() # resets all GPIO ports used by this function
# vim: ts=4 sw=4 expandtab
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment