Skip to content

Instantly share code, notes, and snippets.

@XanderXAJ
Last active May 25, 2023 09:25
Show Gist options
  • Save XanderXAJ/c4854add7b17216ac553db5c5f94898b to your computer and use it in GitHub Desktop.
Save XanderXAJ/c4854add7b17216ac553db5c5f94898b to your computer and use it in GitHub Desktop.
Steam Deck Stock Fan Control Script 2023-05-25

These can be found on a Steam Deck at: /usr/share/jupiter-fan-control

#!/usr/bin/python -u
"""jupiter-fan-controller"""
import signal
import os
import sys
import subprocess
import time
import math
import yaml
from PID import PID
# quadratic function RPM = AT^2 + BT + C
class Quadratic():
'''quadratic function controller'''
def __init__(self, A, B, C, T_threshold = 0) -> None:
'''constructor'''
self.A = A
self.B = B
self.C = C
self.T_threshold = T_threshold
self.output = 0
def update(self, temp_input, _) -> int:
'''update output'''
if temp_input < self.T_threshold:
self.output = 0
else:
self.output = int(self.A * math.pow(temp_input, 2) + self.B * temp_input + self.C)
return self.output
class FeedForward():
'''RPM predicted by APU power is fed forward + PID output stage'''
def __init__(self, Kp, Ki, Kd, windup, winddown, a_ff, b_ff, temp_setpoint) -> None:
'''constructor'''
self.a_ff = a_ff
self.b_ff = b_ff
self.temp_setpoint = temp_setpoint
self.pid = PID(Kp, Ki, Kd)
self.pid.SetPoint = temp_setpoint
self.pid.setWindup(windup)
self.pid.setWinddown(winddown)
self.output = 0
def print_ff_state(self, ff_output, pid_output) -> str:
'''prints state variables of FF and PID, helpful for debug'''
print(f"FeedForward Controller - FF:{ff_output:.0f} PID: {-1 * self.pid.PTerm:.0f} {-1 * self.pid.Ki * self.pid.ITerm:.0f} {-1 * self.pid.Kd * self.pid.DTerm:.0f} = {pid_output:.0f}")
def get_ff_setpoint(self, power_input) -> int:
'''returns the feed forward portion of the controller output'''
rpm_setpoint = int(self.a_ff * power_input + self.b_ff)
return rpm_setpoint
def update(self, temp_input, power_input) -> int:
'''run controller to update output'''
pid_output = self.pid.update(temp_input)
ff_output = self.get_ff_setpoint(power_input)
self.output = int(pid_output + ff_output)
# self.print_ff_state(ff_output, pid_output)
return self.output
class FeedForwardMin():
'''FF with an additional min curve'''
def __init__(self, Kp, Ki, Kd, windup, winddown, a_ff, b_ff, temp_setpoint, a_min, b_min) -> None:
'''constructor'''
self.a_ff = a_ff
self.b_ff = b_ff
self.a_min = a_min
self.b_min = b_min
self.temp_setpoint = temp_setpoint
self.pid = PID(Kp, Ki, Kd)
self.pid.SetPoint = temp_setpoint
self.pid.setWindup(windup)
self.pid.setWinddown(winddown)
self.output = 0
def print_ff_state(self, ff_output, pid_output, min_setpoint) -> str:
'''prints state variables of FF and PID, helpful for debug'''
print(f"FeedForward Controller - Min:{min_setpoint} FF:{ff_output:.0f} PID:{-1 * self.pid.PTerm:.0f}/{-1 * self.pid.Ki * self.pid.ITerm:.0f}/{-1 * self.pid.Kd * self.pid.DTerm:.0f} = {ff_output + pid_output:.0f}")
def get_ff_setpoint(self, power_input) -> int:
'''returns the feed forward portion of the controller output'''
rpm_setpoint = int(self.a_ff * power_input + self.b_ff)
return rpm_setpoint
def get_min_setpoint(self, temp_input) -> int:
'''returns a minimum rpm speed for the given temperature'''
rpm_setpoint = int(self.a_min * temp_input + self.b_min)
return rpm_setpoint
def update(self, temp_input, power_input) -> int:
'''run controller to update output'''
pid_output = int(self.pid.update(temp_input))
ff_output = self.get_ff_setpoint(power_input)
min_setpoint = self.get_min_setpoint(temp_input)
self.output = max(min_setpoint,(pid_output + ff_output))
self.print_ff_state(ff_output, pid_output, min_setpoint)
return self.output
class FeedForwardQuad():
'''FF with an additional min curve'''
def __init__(self, a_quad, b_quad, c_quad, a_ff, b_ff) -> None:
'''constructor'''
self.a_ff = a_ff
self.b_ff = b_ff
# self.temp_setpoint = temp_setpoint
self.ff_deadzone = 300
self.ff_last_setpoint = 0
self.quad = Quadratic(a_quad, b_quad, c_quad)
self.output = 0
def print_ff_state(self, ff_output, quad_output):
'''prints state variables of FF and PID, helpful for debug'''
print(f"FeedForward Controller - Quad:{quad_output} FF:{ff_output:.0f}")
def get_ff_setpoint(self, power_input) -> int:
'''returns the feed forward portion of the controller output'''
rpm_setpoint = int(self.a_ff * power_input + self.b_ff)
if abs(rpm_setpoint - self.ff_last_setpoint) > self.ff_deadzone:
self.ff_last_setpoint = rpm_setpoint
return rpm_setpoint
return self.ff_last_setpoint
def update(self, temp_input, power_input) -> int:
'''run controller to update output'''
quad_output = int(self.quad.update(temp_input, None))
ff_output = self.get_ff_setpoint(power_input)
# min_setpoint = self.get_min_setpoint(temp_input)
self.output = quad_output + ff_output
self.print_ff_state(ff_output, quad_output)
return self.output
class Fan():
'''fan object controls all jupiter hwmon parameters'''
def __init__(self, fan_path, config) -> None:
'''constructor'''
self.fan_path = fan_path
self.charge_state_path = config["charge_state_path"]
self.min_speed = config["fan_min_speed"]
self.threshold_speed = config["fan_threshold_speed"]
self.max_speed = config["fan_max_speed"]
self.gain = config["fan_gain"]
self.ec_ramp_rate = config["ec_ramp_rate"]
self.fc_speed = 0
self.measured_speed = 0
self.charge_state = False
self.charge_min_speed = 2000
self.has_std_bios = self.bios_compatibility_check()
self.take_control_from_ec()
self.set_speed(3000)
@staticmethod
def bios_compatibility_check() -> bool:
"""returns True for bios versions >= 106, false for earlier versions"""
version = subprocess.check_output(["dmidecode", "-s", "bios-version"]) # b'F7A0104T06\n'
version = int(version[3:7])
if version >= 106:
return True
else:
return False
def take_control_from_ec(self) -> None:
'''take over fan control from ec mcu'''
if self.has_std_bios:
return
else:
with open(self.fan_path + "gain", 'w', encoding="utf8") as f:
f.write(str(self.gain))
with open(self.fan_path + "ramp_rate", 'w', encoding="utf8") as f:
f.write(str(self.ec_ramp_rate))
with open(self.fan_path + "recalculate", 'w', encoding="utf8") as f:
f.write(str(1))
def return_to_ec_control(self) -> None:
'''reset EC to generate fan values internally'''
if self.has_std_bios:
with open(self.fan_path + "fan1_target", 'w', encoding="utf8") as f:
f.write(str(int(0)))
else:
with open(self.fan_path + "gain", 'w', encoding="utf8") as f:
f.write(str(10))
with open(self.fan_path + "ramp_rate", 'w', encoding="utf8") as f:
f.write(str(20))
with open(self.fan_path + "recalculate", 'w', encoding="utf8") as f:
f.write(str(0))
def get_speed(self) -> int:
'''returns the measured (real) fan speed'''
with open(self.fan_path + "fan1_input", 'r', encoding="utf8") as f:
self.measured_speed = int(f.read().strip())
return self.measured_speed
def get_charge_state(self) -> bool:
'''updates min rpm depending on charge state'''
with open(self.charge_state_path, 'r', encoding="utf8") as f:
state = f.read().strip()
if state == "Charging":
self.charge_state = True
else:
self.charge_state = False
return self.charge_state
def set_speed(self, speed) -> None:
'''sets a new target speed'''
if speed > self.max_speed:
speed = self.max_speed
elif self.charge_state:
if speed < self.charge_min_speed:
speed = self.charge_min_speed
elif speed < self.threshold_speed:
speed = self.min_speed
self.fc_speed = speed
with open(self.fan_path + "fan1_target", 'w', encoding="utf8") as f:
f.write(str(int(self.fc_speed)))
class Device():
'''devices are sources of heat - CPU, GPU, etc.'''
def __init__(self, base_path, config, fan_max_speed, n_sample_avg) -> None:
'''constructor'''
self.sensor_path = get_full_path(base_path, config["hwmon_name"]) + config["sensor_name"]
self.sensor_path_input = self.sensor_path + "_input"
self.fan_max_speed = fan_max_speed
self.n_sample_avg = n_sample_avg
self.nice_name = config["nice_name"]
self.max_temp = config["max_temp"]
self.poll_reduction_multiple = config["poll_mult"]
self.n_poll_requests = 0
# try to pull critical temperature from the hwmon
try:
crit_temp = self.get_critical_temp()
if not 60 <= crit_temp <= 100:
raise Exception("critical temperature out of range")
self.max_temp = crit_temp
print(f'loaded critical temp from {self.nice_name} hwmon: {self.max_temp}')
except:
print(f'failed to load critical temp from {self.nice_name} hwmon, falling back to config')
self.temp_deadzone = config["temp_deadzone"]
self.temp = 0
self.control_temp = 0 # deadzone temp
self.control_output = 0 # controller output if > 0, max fan speed if max temp reached
self.buffer_full = False
self.control_temps = []
self.avg_control_temp = 0
# instantiate controller depending on type
self.type = config["type"]
if self.type == "pid":
self.controller = PID(float(config["Kp"]), float(config["Ki"]), float(config["Kd"]))
self.controller.SetPoint = config["T_setpoint"]
self.controller.setWindup(config["windup_limit"]) # windup limits the I term of the output
elif self.type == "quadratic":
self.controller = Quadratic(float(config["A"]), float(config["B"]), float(config["C"]), float(config["T_threshold"]))
elif self.type == "feedforward":
self.controller = FeedForward(float(config["Kp"]), float(config["Ki"]), float(config["Kd"]), int(config["windup"]), int(config["winddown"]), float(config["A_ff"]), float(config["B_ff"]), float(config["T_setpoint"]))
elif self.type == "ffmin":
self.controller = FeedForwardMin(float(config["Kp"]), float(config["Ki"]), float(config["Kd"]), int(config["windup"]), int(config["winddown"]), float(config["A_ff"]), float(config["B_ff"]), float(config["T_setpoint"]), float(config["A_min"]), float(config["B_min"]))
elif self.type == "ffquad":
self.controller = FeedForwardQuad(float(config["A_quad"]), float(config["B_quad"]), float(config["C_quad"]), float(config["A_ff"]), float(config["B_ff"]))
else:
print("error loading device controller \n")
exit(1)
def get_critical_temp(self) -> float:
'''returns the critical temperature of the device'''
with open(self.sensor_path + '_crit', 'r', encoding="utf8") as f:
return int(f.read().strip()) / 1000
def get_temp(self) -> float:
'''updates temperatures'''
self.n_poll_requests += 1
if self.n_poll_requests >= self.poll_reduction_multiple:
with open(self.sensor_path_input, 'r', encoding="utf8") as f:
self.temp = int(f.read().strip()) / 1000
# only update the control temp if it's outside temp_deadzone
if math.fabs(self.temp - self.control_temp) >= self.temp_deadzone:
self.control_temp = self.temp
self.n_poll_requests = 0
return self.control_temp
def get_avg_temp(self) -> float:
'''updates temperature list + generates average value'''
self.control_temps.append(self.get_temp())
if self.buffer_full:
self.control_temps.pop(0)
elif len(self.control_temps) >= self.n_sample_avg:
self.buffer_full = True
self.avg_control_temp = math.fsum(self.control_temps) / len(self.control_temps)
return self.avg_control_temp
def get_output(self, temp_input, power_input) -> int:
'''updates the device controller and returns bounded output'''
self.controller.update(temp_input, power_input)
self.control_output = max(self.controller.output, 0)
if(temp_input > self.max_temp):
self.control_output = self.fan_max_speed
return self.control_output
class Sensor():
'''sensor for measuring non-temperature values'''
def __init__(self, base_path, config) -> None:
self.sensor_path = get_full_path(base_path, config["hwmon_name"]) + config["sensor_name"]
self.nice_name = config["nice_name"]
self.n_sample_avg = config["n_sample_avg"]
self.value = 0
self.avg_value = 0
self.buffer_full = False
self.values = []
def get_value(self) -> float:
'''returns instantaneous value'''
with open(self.sensor_path, 'r', encoding='utf-8') as f:
self.value = int(f.read().strip()) / 1000000
return self.value
def get_avg_value(self) -> float:
'''returns average value'''
self.values.append(self.get_value())
if self.buffer_full:
self.values.pop(0)
elif len(self.values) >= self.n_sample_avg:
self.buffer_full = True
self.avg_value = math.fsum(self.values) / len(self.values)
return self.avg_value
def get_full_path(base_path, name) -> str:
'''helper function to find correct hwmon* path for a given device name'''
for directory in os.listdir(base_path):
full_path = base_path + directory + '/'
test_name = open(full_path + "name", encoding="utf8").read().strip()
if test_name == name:
return full_path
print(f"failed to find device {name}")
class FanController():
'''main FanController class'''
def __init__(self, config_file):
'''constructor'''
# read in config yaml file
with open(config_file, "r", encoding="utf8") as f:
try:
self.config = yaml.safe_load(f)
except yaml.YAMLError as exc:
print("error loading config file \n", exc)
exit(1)
# store global parameters
self.base_hwmon_path = self.config["base_hwmon_path"]
self.loop_interval = self.config["loop_interval"]
self.control_loop_ratio = self.config["control_loop_ratio"]
# initialize fan
fan_path = get_full_path(self.base_hwmon_path, self.config["fan_hwmon_name"])
self.fan = Fan(fan_path, self.config)
# initialize list of devices
self.devices = [ Device(self.base_hwmon_path, device_config, self.fan.max_speed, self.control_loop_ratio) for device_config in self.config["devices"] ]
# initialize APU power sensor
self.power_sensor = Sensor(self.base_hwmon_path, self.config["sensors"][0])
# exit handler
signal.signal(signal.SIGTERM, self.on_exit)
def print_single(self, source_name):
'''pretty print all device values, temp source, and output'''
for device in self.devices:
print(f"{device.nice_name}: {device.temp:.1f}/{device.control_output:.0f} ", end = '')
#print("{}: {} ".format(device.nice_name, device.temp), end = '')
print(f"{self.power_sensor.nice_name}: {self.power_sensor.value:.1f}/{self.power_sensor.avg_value:.1f} ", end = '')
print(f"Fan[{source_name}]: {int(self.fan.fc_speed)}/{self.fan.measured_speed}")
def loop_read_sensors(self):
'''internal loop to measure device temps and sensor value'''
start_time = time.time()
self.power_sensor.get_avg_value()
for device in self.devices:
device.get_avg_temp()
sleep_time = self.loop_interval - (time.time() - start_time)
if sleep_time > 0:
time.sleep(sleep_time)
def loop_control(self):
'''main control loop'''
print("jupiter-fan-control starting up ...")
while True:
fan_error = abs(self.fan.fc_speed - self.fan.get_speed())
if fan_error > 500:
self.fan.take_control_from_ec()
# read device temps and power sensor
for _ in range(self.control_loop_ratio):
self.loop_read_sensors()
# read charge state
self.fan.get_charge_state()
for device in self.devices:
device.get_output(device.avg_control_temp, self.power_sensor.avg_value)
max_output = max(device.control_output for device in self.devices)
self.fan.set_speed(max_output)
# find source name for the max control output
source_name = next(device for device in self.devices if device.control_output == max_output).nice_name
# print all values
self.print_single(source_name)
def on_exit(self, signum, frame):
'''exit handler'''
self.fan.return_to_ec_control()
print("returning fan to EC control loop")
exit()
# main
if __name__ == '__main__':
# specify config file path
CONFIG_FILE_PATH = "/usr/share/jupiter-fan-control/jupiter-fan-control-config.yaml"
controller = FanController(config_file = CONFIG_FILE_PATH)
args = sys.argv
if len(args) == 2:
command = args[1]
if command == "--run":
controller.loop_control()
# otherwise, exit cleanly
controller.on_exit(None, None)
# config file for juputer-fan-control.service
loop_interval: 0.2
control_loop_ratio: 5
base_hwmon_path: /sys/class/hwmon/
charge_state_path: /sys/class/power_supply/BAT1/status
fan_hwmon_name: jupiter
fan_min_speed: 10
fan_threshold_speed: 1500
fan_max_speed: 7300
fan_gain: 10
ec_ramp_rate: 10
sensors:
- hwmon_name: amdgpu
nice_name: P_APU
sensor_name: power1_average
n_sample_avg: 120
devices:
- hwmon_name: acpitz
nice_name: CPU
poll_mult: 1
max_temp: 90
temp_deadzone: 2
sensor_name: temp1
type: quadratic
A: 2.286
B: -188.6
C: 5457
T_threshold: 55
# quadratic fit {{55, 2000}, {80, 5000}, {90, 7000}}
- hwmon_name: amdgpu
nice_name: GPU
poll_mult: 1
max_temp: 90
temp_deadzone: 2
sensor_name: temp1
type: quadratic
A: 2.286
B: -188.6
C: 5457
T_threshold: 55
# quadratic fit {{55, 2000}, {80, 5000}, {90, 7000}}
- hwmon_name: nvme
nice_name: SSD
poll_mult: 20
max_temp: 90.0
temp_deadzone: 3
sensor_name: temp1
type: pid
T_setpoint: 80
Kp: 0
Ki: 20
Kd: 0
windup_limit: 3000
#!/usr/bin/python
#
# This file is part of IvPID.
# Copyright (C) 2015 Ivmech Mechatronics Ltd. <bilgi@ivmech.com>
#
# IvPID is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# IvPID is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# title :PID.py
# description :python pid controller
# author :Caner Durmusoglu
# date :20151218
# version :0.1
# notes :
# python_version :2.7
# ==============================================================================
"""Ivmech PID Controller is simple implementation of a Proportional-Integral-Derivative (PID) Controller in the Python Programming Language.
More information about PID Controller: http://en.wikipedia.org/wiki/PID_controller
"""
import time
class PID:
"""PID Controller
"""
def __init__(self, P=0.2, I=0.0, D=0.0, current_time=None):
self.Kp = P
self.Ki = I
self.Kd = D
self.current_time = current_time if current_time is not None else time.time()
self.last_time = self.current_time
self.clear()
def clear(self):
"""Clears PID computations and coefficients"""
self.SetPoint = 0.0
self.PTerm = 0.0
self.ITerm = 0.0
self.DTerm = 0.0
self.last_error = 0.0
# Windup Guard
self.int_error = 0.0
self.windup_guard = 20.0
self.winddown_guard = 20.0
self.output = 0.0
def update(self, temp_input, _=None, current_time=None):
"""Calculates PID value for given reference feedback
.. math::
u(t) = K_p e(t) + K_i \int_{0}^{t} e(t)dt + K_d {de}/{dt}
.. figure:: images/pid_1.png
:align: center
Test PID with Kp=1.2, Ki=1, Kd=0.001 (test_pid.py)
"""
error = self.SetPoint - temp_input
self.current_time = current_time if current_time is not None else time.time()
delta_time = self.current_time - self.last_time
delta_error = error - self.last_error
self.PTerm = self.Kp * error
self.ITerm += error * delta_time
if (self.ITerm < -self.windup_guard):
self.ITerm = -self.windup_guard
elif (self.ITerm > self.winddown_guard):
self.ITerm = self.winddown_guard
self.DTerm = 0.0
if(delta_time > 0):
self.DTerm = delta_error / delta_time
# Remember last time and last error for next calculation
self.last_time = self.current_time
self.last_error = error
self.output = -1 * (self.PTerm + (self.Ki * self.ITerm) + (self.Kd * self.DTerm))
return self.output
def setKp(self, proportional_gain):
"""Determines how aggressively the PID reacts to the current error with setting Proportional Gain"""
self.Kp = proportional_gain
def setKi(self, integral_gain):
"""Determines how aggressively the PID reacts to the current error with setting Integral Gain"""
self.Ki = integral_gain
def setKd(self, derivative_gain):
"""Determines how aggressively the PID reacts to the current error with setting Derivative Gain"""
self.Kd = derivative_gain
def setWindup(self, windup):
"""Integral windup, also known as integrator windup or reset windup,
refers to the situation in a PID feedback controller where
a large change in setpoint occurs (say a positive change)
and the integral terms accumulates a significant error
during the rise (windup), thus overshooting and continuing
to increase as this accumulated error is unwound
(offset by errors in the other direction).
The specific problem is the excess overshooting.
"""
self.windup_guard = windup
def setWinddown(self, winddown):
self.winddown_guard = winddown
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment