Created
May 8, 2023 20:47
-
-
Save jamincollins/938db75038a8fadbf4a58546649c9a4e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import RPi.GPIO as GPIO | |
import os | |
import socket | |
import sys | |
import time | |
from threading import Thread | |
class Fan:
    """A PWM-controlled fan with a tachometer input, driven through RPi.GPIO.

    Each instance configures one PWM output pin and one tachometer input pin,
    starts the PWM output at 100% duty cycle, and spawns a background thread
    that listens on a Unix stream socket named ``./<name>`` for duty-cycle
    requests (whitespace-separated integers between 0 and 100).

    The latest speed estimate is exposed as ``rpm``; it is recomputed on every
    falling edge seen on the tachometer pin.
    """

    # Pin numbering mode and warning suppression are configured once, when
    # the class body is executed.
    GPIO.setmode(GPIO.BCM)
    GPIO.setwarnings(False)

    # Frequency handed to GPIO.PWM when the caller does not override it.
    # NOTE(review): the Noctua PWM white paper referenced by the original
    # author specifies 25 kHz (page 6), but this code has always passed 25.
    # The historical value is kept as the default so behaviour is unchanged;
    # confirm whether 25000 was intended and whether the PWM backend can
    # sustain it.
    #   https://noctua.at/pub/media/wysiwyg/Noctua_PWM_specifications_white_paper.pdf
    DEFAULT_PWM_FREQ = 25

    # Tachometer edges closer together than this many seconds are ignored.
    MIN_PULSE_INTERVAL = 0.0005

    rpm = 0  # most recent speed estimate, in revolutions per minute
    t = time.time()  # timestamp of the last accepted tachometer edge

    def __init__(self, name, tach_pin, pwm_pin, pulse=2, pwm_freq=None):
        """Configure the pins for one fan and start its control socket.

        Args:
            name: Label for the fan; also the file name of its control socket.
            tach_pin: Pin number of the tachometer input.
            pwm_pin: Pin number of the PWM output.
            pulse: Tachometer pulses per revolution, used to turn the pulse
                frequency into RPM.
            pwm_freq: PWM frequency passed to ``GPIO.PWM``. ``None`` selects
                ``DEFAULT_PWM_FREQ``.
        """
        self.name = name
        self.tach = tach_pin
        self.pulse = pulse
        if pwm_freq is None:
            pwm_freq = self.DEFAULT_PWM_FREQ
        self.__setup_pwm(pwm_pin, pwm_freq)
        self.__setup_tach()

    def __setup_tach(self):
        """Configure the tachometer pin and register the falling-edge callback."""
        # Input with the internal pull-up enabled (pull up to 3.3v).
        GPIO.setup(self.tach, GPIO.IN, pull_up_down=GPIO.PUD_UP)
        GPIO.add_event_detect(self.tach, GPIO.FALLING, self.__fell)

    def __fell(self, pin):
        """Update ``rpm`` from the time elapsed since the previous edge.

        Args:
            pin: Pin number supplied by the edge-detect callback (unused).
        """
        # Read the clock once so the stored timestamp is the same instant the
        # interval was measured against; otherwise the time spent in this
        # callback would leak into the next interval.
        now = time.time()
        dt = now - self.t
        if dt < self.MIN_PULSE_INTERVAL:
            # Reject spuriously short pulses. This also covers zero or
            # negative intervals, so the division below is safe.
            return
        freq = 1 / dt
        self.rpm = (freq / self.pulse) * 60
        self.t = now

    def __setup_pwm(self, pwm_pin, pwm_freq=None):
        """Start PWM output at 100% duty cycle and launch the socket monitor.

        Args:
            pwm_pin: Pin number of the PWM output.
            pwm_freq: PWM frequency passed to ``GPIO.PWM``. ``None`` selects
                ``DEFAULT_PWM_FREQ``.
        """
        if pwm_freq is None:
            pwm_freq = self.DEFAULT_PWM_FREQ
        GPIO.setup(pwm_pin, GPIO.OUT)
        self.pwm = GPIO.PWM(pwm_pin, pwm_freq)
        self.pwm.start(100)
        # Daemon thread: the monitor blocks forever in accept()/recv(), and a
        # non-daemon thread would keep the interpreter alive after the main
        # thread has finished.
        monitor = Thread(target=self.__monitor_pwm_socket, daemon=True)
        monitor.start()

    def __monitor_pwm_socket(self):
        """Serve duty-cycle requests on the Unix socket ``./<name>`` forever.

        Clients are handled one at a time. A malformed request or a failing
        connection is reported on stderr and does not stop the loop.
        """
        path = f"./{self.name}"
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
            server.setblocking(True)
            # Best effort: drop a socket file left behind by a previous run so
            # bind() does not fail. If nothing can be removed, bind() decides.
            try:
                os.remove(path)
            except OSError:
                pass
            server.bind(path)
            server.listen(1)
            while True:
                conn, _addr = server.accept()
                with conn:
                    self.__serve_client(conn)

    def __serve_client(self, conn):
        """Read requests from one connected client until it disconnects."""
        while True:
            try:
                # this should block waiting for data
                data = conn.recv(1024)
            except OSError as err:
                print(f"{self.name}: connection error: {err}", file=sys.stderr)
                return
            print(data)
            if not data:
                return
            self.__apply_request(data)

    def __apply_request(self, data):
        """Apply every duty-cycle value contained in one received chunk.

        Args:
            data: Raw bytes read from the client. Values are separated by
                whitespace, so several requests delivered in a single read
                are each applied in order.
        """
        try:
            text = data.decode()
        except UnicodeDecodeError as err:
            print(f"{self.name}: ignoring undecodable request: {err}",
                  file=sys.stderr)
            return
        for token in text.split():
            try:
                duty_cycle = self.__parse_duty_cycle(token)
                print(duty_cycle)
                self.set_duty_cycle(duty_cycle)
            except ValueError as err:
                print(f"{self.name}: ignoring request {token!r}: {err}",
                      file=sys.stderr)

    @staticmethod
    def __parse_duty_cycle(token):
        """Convert ``token`` to an int and check that it lies in 0..100.

        Raises:
            ValueError: If ``token`` is not an integer or is out of range.
        """
        value = int(token)
        if not 0 <= value <= 100:
            raise ValueError(f"duty cycle must be between 0 and 100, got {value}")
        return value

    def set_duty_cycle(self, duty_cycle):
        """Forward ``duty_cycle`` to the underlying PWM object.

        No validation is done here; the value is handed to
        ``ChangeDutyCycle`` unchanged.
        """
        self.pwm.ChangeDutyCycle(duty_cycle)

    def __str__(self):
        """Return a one-line summary of the fan's settings and current RPM."""
        return f"name: {self.name}, tach: {self.tach}, pulse: {self.pulse}, rpm: {self.rpm}"
# Seconds to sleep between two passes of the RPM report loop.
WAIT_TIME = 1

# One Fan per attached fan. Constructing a Fan configures its pins and starts
# its control-socket thread immediately.
FANS = [
    Fan("fan1", tach_pin=9, pwm_pin=7),
    Fan("fan2", tach_pin=10, pwm_pin=8),
    Fan("fan3", tach_pin=22, pwm_pin=25),
    Fan("fan4", tach_pin=27, pwm_pin=24),
    Fan("fan5", tach_pin=17, pwm_pin=23),
]

try:
    while True:
        # Only fans that have reported a tachometer reading are printed.
        for fan in FANS:
            if fan.rpm > 0:
                print(fan)
        time.sleep(WAIT_TIME)
except KeyboardInterrupt:
    # CTRL+C is the normal way to stop the script; exit quietly.
    pass
finally:
    # Release the GPIO pins on every exit path, not only on CTRL+C, so an
    # unexpected error in the loop does not leave the pins configured.
    GPIO.cleanup()
# vim: ts=4 sw=4 expandtab |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment