These can be found on a Steam Deck at: /usr/share/jupiter-fan-control
Last active
May 25, 2023 09:25
-
-
Save XanderXAJ/c4854add7b17216ac553db5c5f94898b to your computer and use it in GitHub Desktop.
Steam Deck Stock Fan Control Script 2023-05-25
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python -u | |
"""jupiter-fan-controller""" | |
import signal | |
import os | |
import sys | |
import subprocess | |
import time | |
import math | |
import yaml | |
from PID import PID | |
# quadratic function RPM = AT^2 + BT + C | |
class Quadratic(): | |
'''quadratic function controller''' | |
def __init__(self, A, B, C, T_threshold = 0) -> None: | |
'''constructor''' | |
self.A = A | |
self.B = B | |
self.C = C | |
self.T_threshold = T_threshold | |
self.output = 0 | |
def update(self, temp_input, _) -> int: | |
'''update output''' | |
if temp_input < self.T_threshold: | |
self.output = 0 | |
else: | |
self.output = int(self.A * math.pow(temp_input, 2) + self.B * temp_input + self.C) | |
return self.output | |
class FeedForward(): | |
'''RPM predicted by APU power is fed forward + PID output stage''' | |
def __init__(self, Kp, Ki, Kd, windup, winddown, a_ff, b_ff, temp_setpoint) -> None: | |
'''constructor''' | |
self.a_ff = a_ff | |
self.b_ff = b_ff | |
self.temp_setpoint = temp_setpoint | |
self.pid = PID(Kp, Ki, Kd) | |
self.pid.SetPoint = temp_setpoint | |
self.pid.setWindup(windup) | |
self.pid.setWinddown(winddown) | |
self.output = 0 | |
def print_ff_state(self, ff_output, pid_output) -> str: | |
'''prints state variables of FF and PID, helpful for debug''' | |
print(f"FeedForward Controller - FF:{ff_output:.0f} PID: {-1 * self.pid.PTerm:.0f} {-1 * self.pid.Ki * self.pid.ITerm:.0f} {-1 * self.pid.Kd * self.pid.DTerm:.0f} = {pid_output:.0f}") | |
def get_ff_setpoint(self, power_input) -> int: | |
'''returns the feed forward portion of the controller output''' | |
rpm_setpoint = int(self.a_ff * power_input + self.b_ff) | |
return rpm_setpoint | |
def update(self, temp_input, power_input) -> int: | |
'''run controller to update output''' | |
pid_output = self.pid.update(temp_input) | |
ff_output = self.get_ff_setpoint(power_input) | |
self.output = int(pid_output + ff_output) | |
# self.print_ff_state(ff_output, pid_output) | |
return self.output | |
class FeedForwardMin(): | |
'''FF with an additional min curve''' | |
def __init__(self, Kp, Ki, Kd, windup, winddown, a_ff, b_ff, temp_setpoint, a_min, b_min) -> None: | |
'''constructor''' | |
self.a_ff = a_ff | |
self.b_ff = b_ff | |
self.a_min = a_min | |
self.b_min = b_min | |
self.temp_setpoint = temp_setpoint | |
self.pid = PID(Kp, Ki, Kd) | |
self.pid.SetPoint = temp_setpoint | |
self.pid.setWindup(windup) | |
self.pid.setWinddown(winddown) | |
self.output = 0 | |
def print_ff_state(self, ff_output, pid_output, min_setpoint) -> str: | |
'''prints state variables of FF and PID, helpful for debug''' | |
print(f"FeedForward Controller - Min:{min_setpoint} FF:{ff_output:.0f} PID:{-1 * self.pid.PTerm:.0f}/{-1 * self.pid.Ki * self.pid.ITerm:.0f}/{-1 * self.pid.Kd * self.pid.DTerm:.0f} = {ff_output + pid_output:.0f}") | |
def get_ff_setpoint(self, power_input) -> int: | |
'''returns the feed forward portion of the controller output''' | |
rpm_setpoint = int(self.a_ff * power_input + self.b_ff) | |
return rpm_setpoint | |
def get_min_setpoint(self, temp_input) -> int: | |
'''returns a minimum rpm speed for the given temperature''' | |
rpm_setpoint = int(self.a_min * temp_input + self.b_min) | |
return rpm_setpoint | |
def update(self, temp_input, power_input) -> int: | |
'''run controller to update output''' | |
pid_output = int(self.pid.update(temp_input)) | |
ff_output = self.get_ff_setpoint(power_input) | |
min_setpoint = self.get_min_setpoint(temp_input) | |
self.output = max(min_setpoint,(pid_output + ff_output)) | |
self.print_ff_state(ff_output, pid_output, min_setpoint) | |
return self.output | |
class FeedForwardQuad(): | |
'''FF with an additional min curve''' | |
def __init__(self, a_quad, b_quad, c_quad, a_ff, b_ff) -> None: | |
'''constructor''' | |
self.a_ff = a_ff | |
self.b_ff = b_ff | |
# self.temp_setpoint = temp_setpoint | |
self.ff_deadzone = 300 | |
self.ff_last_setpoint = 0 | |
self.quad = Quadratic(a_quad, b_quad, c_quad) | |
self.output = 0 | |
def print_ff_state(self, ff_output, quad_output): | |
'''prints state variables of FF and PID, helpful for debug''' | |
print(f"FeedForward Controller - Quad:{quad_output} FF:{ff_output:.0f}") | |
def get_ff_setpoint(self, power_input) -> int: | |
'''returns the feed forward portion of the controller output''' | |
rpm_setpoint = int(self.a_ff * power_input + self.b_ff) | |
if abs(rpm_setpoint - self.ff_last_setpoint) > self.ff_deadzone: | |
self.ff_last_setpoint = rpm_setpoint | |
return rpm_setpoint | |
return self.ff_last_setpoint | |
def update(self, temp_input, power_input) -> int: | |
'''run controller to update output''' | |
quad_output = int(self.quad.update(temp_input, None)) | |
ff_output = self.get_ff_setpoint(power_input) | |
# min_setpoint = self.get_min_setpoint(temp_input) | |
self.output = quad_output + ff_output | |
self.print_ff_state(ff_output, quad_output) | |
return self.output | |
class Fan(): | |
'''fan object controls all jupiter hwmon parameters''' | |
def __init__(self, fan_path, config) -> None: | |
'''constructor''' | |
self.fan_path = fan_path | |
self.charge_state_path = config["charge_state_path"] | |
self.min_speed = config["fan_min_speed"] | |
self.threshold_speed = config["fan_threshold_speed"] | |
self.max_speed = config["fan_max_speed"] | |
self.gain = config["fan_gain"] | |
self.ec_ramp_rate = config["ec_ramp_rate"] | |
self.fc_speed = 0 | |
self.measured_speed = 0 | |
self.charge_state = False | |
self.charge_min_speed = 2000 | |
self.has_std_bios = self.bios_compatibility_check() | |
self.take_control_from_ec() | |
self.set_speed(3000) | |
@staticmethod | |
def bios_compatibility_check() -> bool: | |
"""returns True for bios versions >= 106, false for earlier versions""" | |
version = subprocess.check_output(["dmidecode", "-s", "bios-version"]) # b'F7A0104T06\n' | |
version = int(version[3:7]) | |
if version >= 106: | |
return True | |
else: | |
return False | |
def take_control_from_ec(self) -> None: | |
'''take over fan control from ec mcu''' | |
if self.has_std_bios: | |
return | |
else: | |
with open(self.fan_path + "gain", 'w', encoding="utf8") as f: | |
f.write(str(self.gain)) | |
with open(self.fan_path + "ramp_rate", 'w', encoding="utf8") as f: | |
f.write(str(self.ec_ramp_rate)) | |
with open(self.fan_path + "recalculate", 'w', encoding="utf8") as f: | |
f.write(str(1)) | |
def return_to_ec_control(self) -> None: | |
'''reset EC to generate fan values internally''' | |
if self.has_std_bios: | |
with open(self.fan_path + "fan1_target", 'w', encoding="utf8") as f: | |
f.write(str(int(0))) | |
else: | |
with open(self.fan_path + "gain", 'w', encoding="utf8") as f: | |
f.write(str(10)) | |
with open(self.fan_path + "ramp_rate", 'w', encoding="utf8") as f: | |
f.write(str(20)) | |
with open(self.fan_path + "recalculate", 'w', encoding="utf8") as f: | |
f.write(str(0)) | |
def get_speed(self) -> int: | |
'''returns the measured (real) fan speed''' | |
with open(self.fan_path + "fan1_input", 'r', encoding="utf8") as f: | |
self.measured_speed = int(f.read().strip()) | |
return self.measured_speed | |
def get_charge_state(self) -> bool: | |
'''updates min rpm depending on charge state''' | |
with open(self.charge_state_path, 'r', encoding="utf8") as f: | |
state = f.read().strip() | |
if state == "Charging": | |
self.charge_state = True | |
else: | |
self.charge_state = False | |
return self.charge_state | |
def set_speed(self, speed) -> None: | |
'''sets a new target speed''' | |
if speed > self.max_speed: | |
speed = self.max_speed | |
elif self.charge_state: | |
if speed < self.charge_min_speed: | |
speed = self.charge_min_speed | |
elif speed < self.threshold_speed: | |
speed = self.min_speed | |
self.fc_speed = speed | |
with open(self.fan_path + "fan1_target", 'w', encoding="utf8") as f: | |
f.write(str(int(self.fc_speed))) | |
class Device(): | |
'''devices are sources of heat - CPU, GPU, etc.''' | |
def __init__(self, base_path, config, fan_max_speed, n_sample_avg) -> None: | |
'''constructor''' | |
self.sensor_path = get_full_path(base_path, config["hwmon_name"]) + config["sensor_name"] | |
self.sensor_path_input = self.sensor_path + "_input" | |
self.fan_max_speed = fan_max_speed | |
self.n_sample_avg = n_sample_avg | |
self.nice_name = config["nice_name"] | |
self.max_temp = config["max_temp"] | |
self.poll_reduction_multiple = config["poll_mult"] | |
self.n_poll_requests = 0 | |
# try to pull critical temperature from the hwmon | |
try: | |
crit_temp = self.get_critical_temp() | |
if not 60 <= crit_temp <= 100: | |
raise Exception("critical temperature out of range") | |
self.max_temp = crit_temp | |
print(f'loaded critical temp from {self.nice_name} hwmon: {self.max_temp}') | |
except: | |
print(f'failed to load critical temp from {self.nice_name} hwmon, falling back to config') | |
self.temp_deadzone = config["temp_deadzone"] | |
self.temp = 0 | |
self.control_temp = 0 # deadzone temp | |
self.control_output = 0 # controller output if > 0, max fan speed if max temp reached | |
self.buffer_full = False | |
self.control_temps = [] | |
self.avg_control_temp = 0 | |
# instantiate controller depending on type | |
self.type = config["type"] | |
if self.type == "pid": | |
self.controller = PID(float(config["Kp"]), float(config["Ki"]), float(config["Kd"])) | |
self.controller.SetPoint = config["T_setpoint"] | |
self.controller.setWindup(config["windup_limit"]) # windup limits the I term of the output | |
elif self.type == "quadratic": | |
self.controller = Quadratic(float(config["A"]), float(config["B"]), float(config["C"]), float(config["T_threshold"])) | |
elif self.type == "feedforward": | |
self.controller = FeedForward(float(config["Kp"]), float(config["Ki"]), float(config["Kd"]), int(config["windup"]), int(config["winddown"]), float(config["A_ff"]), float(config["B_ff"]), float(config["T_setpoint"])) | |
elif self.type == "ffmin": | |
self.controller = FeedForwardMin(float(config["Kp"]), float(config["Ki"]), float(config["Kd"]), int(config["windup"]), int(config["winddown"]), float(config["A_ff"]), float(config["B_ff"]), float(config["T_setpoint"]), float(config["A_min"]), float(config["B_min"])) | |
elif self.type == "ffquad": | |
self.controller = FeedForwardQuad(float(config["A_quad"]), float(config["B_quad"]), float(config["C_quad"]), float(config["A_ff"]), float(config["B_ff"])) | |
else: | |
print("error loading device controller \n") | |
exit(1) | |
def get_critical_temp(self) -> float: | |
'''returns the critical temperature of the device''' | |
with open(self.sensor_path + '_crit', 'r', encoding="utf8") as f: | |
return int(f.read().strip()) / 1000 | |
def get_temp(self) -> float: | |
'''updates temperatures''' | |
self.n_poll_requests += 1 | |
if self.n_poll_requests >= self.poll_reduction_multiple: | |
with open(self.sensor_path_input, 'r', encoding="utf8") as f: | |
self.temp = int(f.read().strip()) / 1000 | |
# only update the control temp if it's outside temp_deadzone | |
if math.fabs(self.temp - self.control_temp) >= self.temp_deadzone: | |
self.control_temp = self.temp | |
self.n_poll_requests = 0 | |
return self.control_temp | |
def get_avg_temp(self) -> float: | |
'''updates temperature list + generates average value''' | |
self.control_temps.append(self.get_temp()) | |
if self.buffer_full: | |
self.control_temps.pop(0) | |
elif len(self.control_temps) >= self.n_sample_avg: | |
self.buffer_full = True | |
self.avg_control_temp = math.fsum(self.control_temps) / len(self.control_temps) | |
return self.avg_control_temp | |
def get_output(self, temp_input, power_input) -> int: | |
'''updates the device controller and returns bounded output''' | |
self.controller.update(temp_input, power_input) | |
self.control_output = max(self.controller.output, 0) | |
if(temp_input > self.max_temp): | |
self.control_output = self.fan_max_speed | |
return self.control_output | |
class Sensor(): | |
'''sensor for measuring non-temperature values''' | |
def __init__(self, base_path, config) -> None: | |
self.sensor_path = get_full_path(base_path, config["hwmon_name"]) + config["sensor_name"] | |
self.nice_name = config["nice_name"] | |
self.n_sample_avg = config["n_sample_avg"] | |
self.value = 0 | |
self.avg_value = 0 | |
self.buffer_full = False | |
self.values = [] | |
def get_value(self) -> float: | |
'''returns instantaneous value''' | |
with open(self.sensor_path, 'r', encoding='utf-8') as f: | |
self.value = int(f.read().strip()) / 1000000 | |
return self.value | |
def get_avg_value(self) -> float: | |
'''returns average value''' | |
self.values.append(self.get_value()) | |
if self.buffer_full: | |
self.values.pop(0) | |
elif len(self.values) >= self.n_sample_avg: | |
self.buffer_full = True | |
self.avg_value = math.fsum(self.values) / len(self.values) | |
return self.avg_value | |
def get_full_path(base_path, name) -> str: | |
'''helper function to find correct hwmon* path for a given device name''' | |
for directory in os.listdir(base_path): | |
full_path = base_path + directory + '/' | |
test_name = open(full_path + "name", encoding="utf8").read().strip() | |
if test_name == name: | |
return full_path | |
print(f"failed to find device {name}") | |
class FanController(): | |
'''main FanController class''' | |
def __init__(self, config_file): | |
'''constructor''' | |
# read in config yaml file | |
with open(config_file, "r", encoding="utf8") as f: | |
try: | |
self.config = yaml.safe_load(f) | |
except yaml.YAMLError as exc: | |
print("error loading config file \n", exc) | |
exit(1) | |
# store global parameters | |
self.base_hwmon_path = self.config["base_hwmon_path"] | |
self.loop_interval = self.config["loop_interval"] | |
self.control_loop_ratio = self.config["control_loop_ratio"] | |
# initialize fan | |
fan_path = get_full_path(self.base_hwmon_path, self.config["fan_hwmon_name"]) | |
self.fan = Fan(fan_path, self.config) | |
# initialize list of devices | |
self.devices = [ Device(self.base_hwmon_path, device_config, self.fan.max_speed, self.control_loop_ratio) for device_config in self.config["devices"] ] | |
# initialize APU power sensor | |
self.power_sensor = Sensor(self.base_hwmon_path, self.config["sensors"][0]) | |
# exit handler | |
signal.signal(signal.SIGTERM, self.on_exit) | |
def print_single(self, source_name): | |
'''pretty print all device values, temp source, and output''' | |
for device in self.devices: | |
print(f"{device.nice_name}: {device.temp:.1f}/{device.control_output:.0f} ", end = '') | |
#print("{}: {} ".format(device.nice_name, device.temp), end = '') | |
print(f"{self.power_sensor.nice_name}: {self.power_sensor.value:.1f}/{self.power_sensor.avg_value:.1f} ", end = '') | |
print(f"Fan[{source_name}]: {int(self.fan.fc_speed)}/{self.fan.measured_speed}") | |
def loop_read_sensors(self): | |
'''internal loop to measure device temps and sensor value''' | |
start_time = time.time() | |
self.power_sensor.get_avg_value() | |
for device in self.devices: | |
device.get_avg_temp() | |
sleep_time = self.loop_interval - (time.time() - start_time) | |
if sleep_time > 0: | |
time.sleep(sleep_time) | |
def loop_control(self): | |
'''main control loop''' | |
print("jupiter-fan-control starting up ...") | |
while True: | |
fan_error = abs(self.fan.fc_speed - self.fan.get_speed()) | |
if fan_error > 500: | |
self.fan.take_control_from_ec() | |
# read device temps and power sensor | |
for _ in range(self.control_loop_ratio): | |
self.loop_read_sensors() | |
# read charge state | |
self.fan.get_charge_state() | |
for device in self.devices: | |
device.get_output(device.avg_control_temp, self.power_sensor.avg_value) | |
max_output = max(device.control_output for device in self.devices) | |
self.fan.set_speed(max_output) | |
# find source name for the max control output | |
source_name = next(device for device in self.devices if device.control_output == max_output).nice_name | |
# print all values | |
self.print_single(source_name) | |
def on_exit(self, signum, frame): | |
'''exit handler''' | |
self.fan.return_to_ec_control() | |
print("returning fan to EC control loop") | |
exit() | |
# main | |
if __name__ == '__main__': | |
# specify config file path | |
CONFIG_FILE_PATH = "/usr/share/jupiter-fan-control/jupiter-fan-control-config.yaml" | |
controller = FanController(config_file = CONFIG_FILE_PATH) | |
args = sys.argv | |
if len(args) == 2: | |
command = args[1] | |
if command == "--run": | |
controller.loop_control() | |
# otherwise, exit cleanly | |
controller.on_exit(None, None) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# config file for juputer-fan-control.service | |
loop_interval: 0.2 | |
control_loop_ratio: 5 | |
base_hwmon_path: /sys/class/hwmon/ | |
charge_state_path: /sys/class/power_supply/BAT1/status | |
fan_hwmon_name: jupiter | |
fan_min_speed: 10 | |
fan_threshold_speed: 1500 | |
fan_max_speed: 7300 | |
fan_gain: 10 | |
ec_ramp_rate: 10 | |
sensors: | |
- hwmon_name: amdgpu | |
nice_name: P_APU | |
sensor_name: power1_average | |
n_sample_avg: 120 | |
devices: | |
- hwmon_name: acpitz | |
nice_name: CPU | |
poll_mult: 1 | |
max_temp: 90 | |
temp_deadzone: 2 | |
sensor_name: temp1 | |
type: quadratic | |
A: 2.286 | |
B: -188.6 | |
C: 5457 | |
T_threshold: 55 | |
# quadratic fit {{55, 2000}, {80, 5000}, {90, 7000}} | |
- hwmon_name: amdgpu | |
nice_name: GPU | |
poll_mult: 1 | |
max_temp: 90 | |
temp_deadzone: 2 | |
sensor_name: temp1 | |
type: quadratic | |
A: 2.286 | |
B: -188.6 | |
C: 5457 | |
T_threshold: 55 | |
# quadratic fit {{55, 2000}, {80, 5000}, {90, 7000}} | |
- hwmon_name: nvme | |
nice_name: SSD | |
poll_mult: 20 | |
max_temp: 90.0 | |
temp_deadzone: 3 | |
sensor_name: temp1 | |
type: pid | |
T_setpoint: 80 | |
Kp: 0 | |
Ki: 20 | |
Kd: 0 | |
windup_limit: 3000 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# | |
# This file is part of IvPID. | |
# Copyright (C) 2015 Ivmech Mechatronics Ltd. <bilgi@ivmech.com> | |
# | |
# IvPID is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# IvPID is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <http://www.gnu.org/licenses/>. | |
# title :PID.py | |
# description :python pid controller | |
# author :Caner Durmusoglu | |
# date :20151218 | |
# version :0.1 | |
# notes : | |
# python_version :2.7 | |
# ============================================================================== | |
"""Ivmech PID Controller is simple implementation of a Proportional-Integral-Derivative (PID) Controller in the Python Programming Language. | |
More information about PID Controller: http://en.wikipedia.org/wiki/PID_controller | |
""" | |
import time | |
class PID: | |
"""PID Controller | |
""" | |
def __init__(self, P=0.2, I=0.0, D=0.0, current_time=None): | |
self.Kp = P | |
self.Ki = I | |
self.Kd = D | |
self.current_time = current_time if current_time is not None else time.time() | |
self.last_time = self.current_time | |
self.clear() | |
def clear(self): | |
"""Clears PID computations and coefficients""" | |
self.SetPoint = 0.0 | |
self.PTerm = 0.0 | |
self.ITerm = 0.0 | |
self.DTerm = 0.0 | |
self.last_error = 0.0 | |
# Windup Guard | |
self.int_error = 0.0 | |
self.windup_guard = 20.0 | |
self.winddown_guard = 20.0 | |
self.output = 0.0 | |
def update(self, temp_input, _=None, current_time=None): | |
"""Calculates PID value for given reference feedback | |
.. math:: | |
u(t) = K_p e(t) + K_i \int_{0}^{t} e(t)dt + K_d {de}/{dt} | |
.. figure:: images/pid_1.png | |
:align: center | |
Test PID with Kp=1.2, Ki=1, Kd=0.001 (test_pid.py) | |
""" | |
error = self.SetPoint - temp_input | |
self.current_time = current_time if current_time is not None else time.time() | |
delta_time = self.current_time - self.last_time | |
delta_error = error - self.last_error | |
self.PTerm = self.Kp * error | |
self.ITerm += error * delta_time | |
if (self.ITerm < -self.windup_guard): | |
self.ITerm = -self.windup_guard | |
elif (self.ITerm > self.winddown_guard): | |
self.ITerm = self.winddown_guard | |
self.DTerm = 0.0 | |
if(delta_time > 0): | |
self.DTerm = delta_error / delta_time | |
# Remember last time and last error for next calculation | |
self.last_time = self.current_time | |
self.last_error = error | |
self.output = -1 * (self.PTerm + (self.Ki * self.ITerm) + (self.Kd * self.DTerm)) | |
return self.output | |
def setKp(self, proportional_gain): | |
"""Determines how aggressively the PID reacts to the current error with setting Proportional Gain""" | |
self.Kp = proportional_gain | |
def setKi(self, integral_gain): | |
"""Determines how aggressively the PID reacts to the current error with setting Integral Gain""" | |
self.Ki = integral_gain | |
def setKd(self, derivative_gain): | |
"""Determines how aggressively the PID reacts to the current error with setting Derivative Gain""" | |
self.Kd = derivative_gain | |
def setWindup(self, windup): | |
"""Integral windup, also known as integrator windup or reset windup, | |
refers to the situation in a PID feedback controller where | |
a large change in setpoint occurs (say a positive change) | |
and the integral terms accumulates a significant error | |
during the rise (windup), thus overshooting and continuing | |
to increase as this accumulated error is unwound | |
(offset by errors in the other direction). | |
The specific problem is the excess overshooting. | |
""" | |
self.windup_guard = windup | |
def setWinddown(self, winddown): | |
self.winddown_guard = winddown |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment