-
-
Save jurkovic-nikola/4db1383c4e073d21703d0e365285e28a to your computer and use it in GitHub Desktop.
init, default color, default speed
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""liquidctl drivers for Corsair Link System Hub. | |
Supported devices: | |
- Corsair Link System Hub | |
Copyright Nikola Jurkovic | |
SPDX-License-Identifier: GPL-3.0-or-later | |
""" | |
import dataclasses | |
import logging | |
import struct | |
from typing import Any | |
from liquidctl.driver.usb import UsbHidDriver | |
_LOGGER = logging.getLogger(__name__) | |
# Lengths | |
_BUFFER_WRITE_LENGTH = 513 | |
_BUFFER_READ_LENGTH = 512 | |
_READ_HEADER_SIZE = 3 | |
_WRITE_HEADER_SIZE = 4 | |
_MAXIMUM_BUFFER_PER_REQUEST = 508 | |
# Interface | |
_INTERFACE_NUMBER = 0 | |
# Commands | |
_CMD_OPEN_ENDPOINT = (0x0d, 0x01) | |
_CMD_OPEN_COLOR_ENDPOINT = (0x0d, 0x00) | |
_CMD_CLOSE_ENDPOINT = (0x05, 0x01, 0x01) | |
_CMD_GET_FIRMWARE = (0x02, 0x13) | |
_CMD_SOFTWARE_MODE = (0x01, 0x03, 0x00, 0x02) | |
_CMD_HARDWARE_MODE = (0x01, 0x03, 0x00, 0x01) | |
_CMD_WRITE = (0x06, 0x01) | |
_CMD_WRITE_COLOR = (0x06, 0x00) | |
_CMD_READ = (0x08, 0x01) | |
_CMD_GET_DEVICE_MODE = (0x01, 0x08, 0x01) | |
# Command modes | |
_MODE_GET_DEVICES = (0x36,) | |
_MODE_GET_TEMPERATURES = (0x21,) | |
_MODE_GET_SPEEDS = (0x17,) | |
_MODE_SET_SPEED = (0x18,) | |
_MODE_SET_COLOR = (0x22,) | |
# Command data types | |
_DATA_TYPE_GET_DEVICES = (0x21, 0x00) | |
_DATA_TYPE_GET_TEMPERATURES = (0x10, 0x00) | |
_DATA_TYPE_GET_SPEEDS = (0x25, 0x00) | |
_DATA_TYPE_SET_SPEED = (0x07, 0x00) | |
_DATA_TYPE_SET_COLOR = (0x12, 0x00) | |
# Endpoint types | |
_ENDPOINT_TYPE_DEFAULT = 0 | |
_ENDPOINT_TYPE_COLOR = 1 | |
# Color brightness | |
_COLOR_BRIGHTNESS = 1 # Range is from 0 to 1 | |
# Speed types | |
_SPEED_TYPE_PERCENT = 0 | |
_SPEED_TYPE_RPM = 1 | |
# Default values | |
_DEFAULT_FAN_PERCENT = 60 | |
_DEFAULT_PUMP_PERCENT = 70 | |
@dataclasses.dataclass | |
class LinkDevice: | |
channel_id: int | |
device_type: int | |
device_id: str | |
name: str | |
rpm: float | |
temperature: float | |
led_channels: int | |
contains_pump: bool | |
@dataclasses.dataclass | |
class DeviceData: | |
channel_id: int | |
status: int | |
value: float | |
class DeviceList: | |
def __init__(self, device_id: int, model: int, name: str, led_channels: int): | |
self.device_id = device_id | |
self.model = model | |
self.name = name | |
self.led_channels = led_channels | |
class RGB: | |
def __init__(self, r, g, b): | |
self.R = r | |
self.G = g | |
self.B = b | |
class HSL: | |
def __init__(self, h, s, ll): | |
self.H = h | |
self.S = s | |
self.L = ll | |
class Color: | |
def __init__(self, red, green, blue, brightness=1.0): | |
self.R = red | |
self.G = green | |
self.B = blue | |
self.Brightness = brightness | |
def to_hsl(color): | |
r_r = color.R | |
r_g = color.G | |
r_b = color.B | |
max_value = max(r_r, r_g, r_b) | |
min_value = min(r_r, r_g, r_b) | |
ll = (max_value + min_value) / 2 | |
delta = max_value - min_value | |
if delta == 0: | |
return HSL(0, 0, ll) | |
if ll < 0.5: | |
s = delta / (max_value + min_value) | |
else: | |
s = delta / (2 - max_value - min_value) | |
r2 = (((max_value - r_r) / 6) + (delta / 2)) / delta | |
g2 = (((max_value - r_g) / 6) + (delta / 2)) / delta | |
b2 = (((max_value - r_b) / 6) + (delta / 2)) / delta | |
if r_r == max_value: | |
h = b2 - g2 | |
elif r_g == max_value: | |
h = (1.0 / 3.0) + r2 - b2 | |
else: | |
h = (2.0 / 3.0) + g2 - r2 | |
if h < 0: | |
h += 1 | |
elif h > 1: | |
h -= 1 | |
return HSL(h, s, ll) | |
def hue_to_rgb(v1, v2, h): | |
if h < 0: | |
h += 1 | |
if h > 1: | |
h -= 1 | |
if 6 * h < 1: | |
return v1 + (v2 - v1) * 6 * h | |
if 2 * h < 1: | |
return v2 | |
if 3 * h < 2: | |
return v1 + (v2 - v1) * ((2.0 / 3.0) - h) * 6 | |
return v1 | |
def to_rgb(hsl): | |
h_h = hsl.H | |
h_s = hsl.S | |
h_l = hsl.L | |
if h_s == 0: | |
return Color(round(h_l), round(h_l), round(h_l)) | |
if h_l < 0.5: | |
v2 = h_l * (1 + h_s) | |
else: | |
v2 = (h_l + h_s) - (h_s * h_l) | |
v1 = 2 * h_l - v2 | |
r = hue_to_rgb(v1, v2, h_h + (1.0 / 3.0)) | |
g = hue_to_rgb(v1, v2, h_h) | |
b = hue_to_rgb(v1, v2, h_h - (1.0 / 3.0)) | |
return Color(round(r), round(g), round(b)) | |
def modify_brightness(color, brightness): | |
if brightness > 1: | |
brightness = 1 | |
elif brightness < 0: | |
brightness = 0 | |
hsl = to_hsl(color) | |
hsl.L *= brightness | |
return to_rgb(hsl) | |
def contains_pump(t: int) -> bool: | |
return t == 0x07 or t == 0x0c or t == 0x0e | |
def process_multi_chunk_packet(data, max_chunk_size): | |
result = [] | |
while len(data) > 0: | |
# Calculate the end index for the current chunk | |
end = max_chunk_size | |
if len(data) < max_chunk_size: | |
end = len(data) | |
# Get the current chunk to process | |
chunk = data[:end] | |
# Append the chunk to the result | |
result.append(chunk) | |
# If the current chunk size is less than max size, break the loop | |
if len(data) <= max_chunk_size: | |
break | |
# Move to the next chunk | |
data = data[end:] | |
return result | |
def set_speed(data, mode): | |
buffer = bytearray(len(data) * 4 + 1) | |
buffer[0] = len(data) | |
i = 1 | |
for channel, speed in data.items(): | |
v = 2 | |
buffer[i] = channel | |
buffer[i + 1] = mode # Either percent mode or RPM mode | |
for value in range(len(speed)): | |
buffer[i + v] = speed[value] | |
v += 1 | |
i += 4 # Move to the next place | |
return buffer | |
def set_color(data): | |
buffer = bytearray(len(data) * 3) | |
i = 0 | |
# We need to sort keys due to the nature of RGB. | |
# R G B needs to be applied in the same way it was created. | |
keys = sorted(data.keys()) | |
for k in keys: | |
buffer[i] = data[k][0] # R | |
buffer[i + 1] = data[k][1] # G | |
buffer[i + 2] = data[k][2] # B | |
i += 3 # Move to the next place | |
return buffer | |
def int_to_byte_array(num): | |
size = 2 | |
buffer = bytearray(size) | |
for i in range(size): | |
byt = (num >> (i * 8)) & 0xFF | |
buffer[i] = byt | |
return buffer | |
class LinkSystemHub(UsbHidDriver): | |
# Corsair iCUE LINK System Hub | |
# https://www3.corsair.com/software/CUE_V5/public/modules/windows/packages/cuepkg-metadata.json | |
# Download cue pkg and extract Meta folder for device Ids | |
deviceList = [ | |
DeviceList(0x01, 0x00, "QX Fan", 34), # Fan | |
DeviceList(0x13, 0x00, "RX Fan", 0), # Fan No LEDs | |
DeviceList(0x0f, 0x00, "RX RGB Fan", 8), # Fan | |
DeviceList(0x07, 0x02, "H150i", 20), # AIO Black | |
DeviceList(0x07, 0x05, "H150i", 20), # AIO White | |
DeviceList(0x07, 0x01, "H115i", 20), # AIO | |
DeviceList(0x07, 0x03, "H170i", 20), # AIO | |
DeviceList(0x07, 0x00, "H100i", 20), # AIO Black | |
DeviceList(0x07, 0x04, "H100i", 20), # AIO White | |
DeviceList(0x09, 0x00, "XC7 Elite", 24), # CPU Block Stealth Gray | |
DeviceList(0x09, 0x01, "XC7 Elite", 24), # CPU Block White | |
DeviceList(0x0d, 0x00, "XG7", 16), # GPU Block | |
DeviceList(0x0c, 0x00, "XD5 Elite", 22), # Pump reservoir Stealth Gray | |
DeviceList(0x0c, 0x01, "XD5 Elite", 22), # Pump reservoir White (?) | |
DeviceList(0x0e, 0x00, "XD5 Elite LCD", 22), # Pump reservoir Stealth Gray | |
DeviceList(0x0e, 0x01, "XD5 Elite LCD", 22) # Pump reservoir White (?) | |
] | |
_MATCHES = [ | |
# No clue what has_pump means at all, but meh... | |
(0x1b1c, 0x0c3f, 'Corsair iCUE LINK System Hub', {"has_pump": True}), | |
] | |
@classmethod | |
def probe(cls, handle, **kwargs): | |
if handle.hidinfo['interface_number'] != _INTERFACE_NUMBER: | |
return | |
yield from super().probe(handle, **kwargs) | |
def __init__(self, device, description, has_pump, **kwargs): | |
super().__init__(device, description, **kwargs) | |
self._has_pump = has_pump | |
def initialize(self, **kwargs): | |
# Get Firmware | |
fw = self.transfer(_CMD_GET_FIRMWARE, (), ()) | |
v1 = int(fw[4]) | |
v2 = int(fw[5]) | |
v3 = int.from_bytes(fw[6:8], byteorder='little', signed=False) | |
status = [ | |
('Firmware version', '{}.{}.{}'.format(v1, v2, v3), '') | |
] | |
# Activate software mode | |
# This will cause lights to go off, since a device expects command to set initial lights | |
_LOGGER.debug('Switching device to software mode...') | |
self.transfer(_CMD_SOFTWARE_MODE, (), ()) | |
# Get all devices | |
_LOGGER.debug('Initializing devices...') | |
res = self.read(_MODE_GET_DEVICES, _DATA_TYPE_GET_DEVICES) | |
devices = self.init_devices(res) | |
# Print devices | |
for device in devices: | |
val = devices[device] | |
status += [ | |
('Device', '{} - {} RPM - {} C'.format(val.name, val.rpm, val.temperature), '') | |
] | |
# Default color | |
_LOGGER.debug("Setting all devices to white color") | |
white = Color(255, 255, 255, _COLOR_BRIGHTNESS) # Tweak _COLOR_BRIGHTNESS for brightness, from 0.1 to 1 | |
color = modify_brightness(white, white.Brightness) | |
self.set_device_color(devices, color) | |
# Default speeds - percent | |
_LOGGER.debug("Setting all devices to 100% speed") | |
self.set_device_speed(devices, _DEFAULT_FAN_PERCENT, _SPEED_TYPE_PERCENT) | |
# Default speeds - RPM | |
# _LOGGER.debug("Setting all devices to 100% speed") | |
# self.set_device_speed(devices, 2000, _SPEED_TYPE_RPM) | |
return status | |
def write(self, endpoint, data_type, data, endpoint_type): | |
# Buffer | |
buffer = bytearray(len(data_type) + len(data) + _WRITE_HEADER_SIZE) | |
struct.pack_into('<H', buffer, 0, len(data) + 2) | |
buffer[_WRITE_HEADER_SIZE:_WRITE_HEADER_SIZE + len(data_type)] = data_type | |
buffer[_WRITE_HEADER_SIZE + len(data_type):] = data | |
try: | |
self.transfer(_CMD_CLOSE_ENDPOINT, endpoint, ()) | |
except Exception as e: | |
_LOGGER.error(f"Unable to close endpoint: {e}") | |
raise | |
if endpoint_type == _ENDPOINT_TYPE_DEFAULT: # Speed control | |
try: | |
self.transfer(_CMD_OPEN_ENDPOINT, endpoint, ()) | |
except Exception as e: | |
_LOGGER.error(f"Unable to open endpoint: {e}") | |
raise | |
try: | |
self.transfer(_CMD_WRITE, buffer, ()) | |
except Exception as e: | |
_LOGGER.error(f"Unable to write to a device: {e}") | |
raise | |
elif endpoint_type == _ENDPOINT_TYPE_COLOR: # RGB | |
try: | |
self.transfer(_CMD_OPEN_COLOR_ENDPOINT, endpoint, ()) | |
except Exception as e: | |
_LOGGER.error(f"Unable to open endpoint: {e}") | |
raise | |
write_color_ep = _CMD_WRITE_COLOR | |
color_ep = bytearray(write_color_ep) | |
# RGB packets are multi chunked | |
chunks = process_multi_chunk_packet(buffer, _MAXIMUM_BUFFER_PER_REQUEST) | |
for i, chunk in enumerate(chunks): | |
# Once a packet exceeds the maximum defined length, | |
# we need to increment to a next endpoint on a device. | |
# The packet is chunked due to the number of devices it can support, | |
# and each LED requires 3 bytes (R, G, B) | |
# QX Fan has 34 LED channels, multiply with 3 = 102 bytes for 1 device | |
# Initial endpoint is 0x06, next is 0x07, then 0x08... | |
color_ep[0] = color_ep[0] + i | |
try: | |
self.transfer(color_ep, chunk, ()) | |
except Exception as e: | |
_LOGGER.error(f"Unable to write to a device: {e}") | |
raise | |
try: | |
self.transfer(_CMD_CLOSE_ENDPOINT, endpoint, ()) | |
except Exception as e: | |
_LOGGER.error(f"Unable to close endpoint: {e}") | |
raise | |
# Read will read data from a device | |
def read(self, endpoint, data_type): | |
# Close specified endpoint | |
try: | |
self.transfer(_CMD_CLOSE_ENDPOINT, endpoint, ()) | |
except Exception as e: | |
_LOGGER.error(f"Unable to close endpoint: {e}") | |
raise | |
# Open endpoint | |
try: | |
self.transfer(_CMD_OPEN_ENDPOINT, endpoint, ()) | |
except Exception as e: | |
_LOGGER.error(f"Unable to open endpoint: {e}") | |
raise | |
# Read data from endpoint | |
try: | |
buffer = self.transfer(_CMD_READ, endpoint, data_type) | |
except Exception as e: | |
_LOGGER.error(f"Unable to read endpoint: {e}") | |
raise | |
# Close specified endpoint | |
try: | |
self.transfer(_CMD_CLOSE_ENDPOINT, endpoint, ()) | |
except Exception as e: | |
_LOGGER.error(f"Unable to close endpoint: {e}") | |
raise | |
return buffer | |
def transfer(self, endpoint, data, data_type): | |
buffer_w = bytearray(_BUFFER_WRITE_LENGTH) | |
buffer_w[2] = 0x01 | |
endpoint_header_position = slice(_READ_HEADER_SIZE, _READ_HEADER_SIZE + len(endpoint)) | |
buffer_w[endpoint_header_position] = endpoint | |
if len(data) > 0: | |
buffer_w[_READ_HEADER_SIZE + len(endpoint):_READ_HEADER_SIZE + len(endpoint) + len(data)] = data | |
buffer_r = bytearray(_BUFFER_READ_LENGTH) | |
# Send command to a device | |
try: | |
self.device.clear_enqueued_reports() | |
self.device.write(buffer_w) | |
except Exception as e: | |
_LOGGER.debug('Unable to write to a device. Exception: %s', e) | |
# Get data from a device | |
try: | |
buffer_r = self.device.read(_BUFFER_READ_LENGTH) | |
except Exception as e: | |
_LOGGER.error('Unable to read from a device. Exception: %s', e) | |
if len(data_type) == 2: | |
while buffer_r[4] != data_type[0]: | |
try: | |
buffer_r = self.device.read(_BUFFER_READ_LENGTH) | |
except Exception as e: | |
_LOGGER.error('Unable to read from a device. Exception: %s', e) | |
return bytes(buffer_r) | |
def get_device_data(self, mode, data_type): | |
res = self.read(mode, data_type) | |
# Number of devices | |
amount = res[6] | |
# Device data | |
sensor_data = res[7:] | |
# Empty list | |
data_list = {} | |
for i in range(amount): | |
current_sensor = sensor_data[i * 3:(i + 1) * 3] | |
status = current_sensor[0] | |
value = 0.0 | |
if status == 0x00: | |
# Temperature sensor | |
if mode == _MODE_GET_TEMPERATURES: | |
value = struct.unpack('<h', current_sensor[1:3])[0] | |
value = float(value) / 10.0 | |
else: | |
# RPM sensor | |
value = struct.unpack('<h', current_sensor[1:3])[0] | |
device_data = DeviceData( | |
channel_id=i, | |
status=status, | |
value=value | |
) | |
data_list[i] = device_data | |
return data_list | |
def init_devices(self, buffer): | |
channel = buffer[6] | |
index = buffer[7:] | |
position = 0 | |
devices = [] | |
# Loop through a byte array | |
for i in range(1, int(channel) + 1): | |
device_id_len = index[position + 7] | |
if device_id_len == 0: | |
# Unable to get a device id length, skip processing and move to the next | |
position += 8 | |
continue | |
# Get a type, model and device id | |
device_type_model = index[position:position + 8] | |
device_id = index[position + 8:position + 8 + int(device_id_len)] | |
# Build basic device info | |
hub_device = { | |
"ChannelId": i, | |
"DeviceId": device_id.decode(), | |
"DeviceType": device_type_model[2], | |
"DeviceModel": device_type_model[3] | |
} | |
# Append a device to array | |
devices.append(hub_device) | |
# Move to next position for read | |
position += 8 + int(device_id_len) | |
# Make simple key:value pair | |
device_list = {} | |
# Get device temperatures | |
temperatures = self.get_device_data(_MODE_GET_TEMPERATURES, _DATA_TYPE_GET_TEMPERATURES) | |
# Get device speeds | |
speeds = self.get_device_data(_MODE_GET_SPEEDS, _DATA_TYPE_GET_SPEEDS) | |
# Go through all devices | |
for device in devices: | |
# Find device definition | |
match = self.get_device(device['DeviceType'], device['DeviceModel']) | |
if match is None: | |
continue | |
# Build full stack of device info | |
hub_device_info = LinkDevice( | |
channel_id=device['ChannelId'], | |
device_type=device['DeviceType'], | |
device_id=device['DeviceId'], | |
name=match.name, | |
rpm=speeds[device['ChannelId']].value, | |
temperature=temperatures[device['ChannelId']].value, | |
led_channels=match.led_channels, | |
contains_pump=contains_pump(device['DeviceType']) | |
) | |
# All device to array | |
device_list[device['ChannelId']] = hub_device_info | |
# Development purposes | |
_LOGGER.debug('Device initialized. Device: %s', hub_device_info) | |
# Send it | |
return device_list | |
# Development purposes | |
def set_device_color(self, device_list, color): | |
m = 0 | |
buf = {} | |
# Go through all devices | |
for device in device_list: | |
val = device_list[device] | |
led_channels = val.led_channels | |
if led_channels > 0: | |
for i in range(led_channels): | |
# Build color buffer | |
buf[m] = [ | |
int(color.R), | |
int(color.G), | |
int(color.B), | |
] | |
m += 1 | |
# Finalize color buffer | |
buffer = set_color(buf) | |
# Send it | |
self.write( | |
_MODE_SET_COLOR, | |
_DATA_TYPE_SET_COLOR, | |
buffer, | |
_ENDPOINT_TYPE_COLOR | |
) | |
def set_device_speed(self, device_list, speed_value, speed_type): | |
# Basic checks for speed values | |
if speed_type == _SPEED_TYPE_PERCENT: | |
if speed_value > 100: | |
speed_value = 100 | |
if speed_value < 40: | |
speed_value = 40 | |
else: | |
if speed_value > 3000: | |
speed_value = 2400 | |
if speed_value < 400: | |
speed_value = 400 | |
speed = int_to_byte_array(speed_value) | |
for device in device_list: | |
val = device_list[device] | |
if val.contains_pump: | |
# Pump can only be controller via percent mode | |
speed = int_to_byte_array(_DEFAULT_PUMP_PERCENT) | |
channel_speeds = {val.channel_id: speed} | |
speed_type = 0 | |
else: | |
channel_speeds = {val.channel_id: speed} | |
buffer = set_speed(channel_speeds, speed_type) | |
self.write( | |
_MODE_SET_SPEED, | |
_DATA_TYPE_SET_SPEED, | |
buffer, | |
_ENDPOINT_TYPE_DEFAULT | |
) | |
def get_device(self, device_id: int, device_model: int) -> DeviceList | None | Any: | |
for device in self.deviceList: | |
if device.device_id == device_id and device.model == device_model: | |
return device | |
return None |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment