-
-
Save jurkovic-nikola/45790cfc2f615d32670bc95b029bdd8e to your computer and use it in GitHub Desktop.
Link System Hub device info, speed, temps and firmware
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""liquidctl drivers for Corsair Link System Hub. | |
Supported devices: | |
- Corsair Link System Hub | |
Copyright Nikola Jurkovic | |
SPDX-License-Identifier: GPL-3.0-or-later | |
""" | |
import dataclasses | |
import logging | |
import struct | |
from typing import Any | |
from liquidctl.driver.usb import UsbHidDriver | |
_LOGGER = logging.getLogger(__name__)

# --- Buffer geometry -------------------------------------------------------
_BUFFER_WRITE_LENGTH = 513  # size of the buffer built for every outgoing report
_BUFFER_READ_LENGTH = 512  # number of bytes requested on every read
_READ_HEADER_SIZE = 3  # offset at which transfer() places the command bytes
_WRITE_HEADER_SIZE = 4  # NOTE(review): not referenced by the visible driver code

# --- HID interface the driver attaches to (checked in probe()) -------------
_INTERFACE_NUMBER = 0

# --- Commands (written right after the report header) ----------------------
_CMD_OPEN_ENDPOINT = (0x0d, 0x01)
_CMD_OPEN_COLOR_ENDPOINT = (0x0d, 0x00)
_CMD_CLOSE_ENDPOINT = (0x05, 0x01, 0x01)
_CMD_GET_FIRMWARE = (0x02, 0x13)
_CMD_SOFTWARE_MODE = (0x01, 0x03, 0x00, 0x02)
_CMD_HARDWARE_MODE = (0x01, 0x03, 0x00, 0x01)
_CMD_WRITE = (0x06, 0x01)
_CMD_WRITE_COLOR = (0x06, 0x00)
_CMD_READ = (0x08, 0x01)
_CMD_GET_DEVICE_MODE = (0x01, 0x08, 0x01)

# --- Command modes (passed to read() as the endpoint to open/close) --------
_MODE_GET_DEVICES = (0x36,)
_MODE_GET_TEMPERATURES = (0x21,)
_MODE_GET_SPEEDS = (0x17,)
_MODE_SET_SPEED = (0x18,)
_MODE_SET_COLOR = (0x22,)

# --- Data types (first byte is matched against byte 4 of a response) -------
_DATA_TYPE_GET_DEVICES = (0x21, 0x00)
_DATA_TYPE_GET_TEMPERATURES = (0x10, 0x00)
_DATA_TYPE_GET_SPEEDS = (0x25, 0x00)
_DATA_TYPE_SET_SPEED = (0x07, 0x00)
_DATA_TYPE_SET_COLOR = (0x12, 0x00)
@dataclasses.dataclass
class LinkDevice:
    """Full description of one device attached to the hub.

    Instances are assembled in ``LinkSystemHub.init_devices`` by combining a
    record of the hub's device table with the matching catalogue entry and
    the latest speed and temperature readings.
    """

    # 1-based position of the device while walking the hub's device table
    channel_id: int
    # Raw device type byte taken from the device record
    device_type: int
    # Device identifier decoded from the device record
    device_id: str
    # Human readable name taken from the matching catalogue entry
    name: str
    # Speed reading for this channel
    rpm: float
    # Temperature reading for this channel
    temperature: float
    # Number of LED channels taken from the matching catalogue entry
    led_channels: int
    # Result of contains_pump() for the device type
    contains_pump: bool
@dataclasses.dataclass
class DeviceData:
    """One decoded sensor entry (speed or temperature) reported by the hub."""

    # Index of the entry inside the sensor response
    channel_id: int
    # First byte of the entry; a value is only decoded when it is 0x00
    status: int
    # Decoded reading, left at 0.0 when the status byte is not 0x00
    value: float
@dataclasses.dataclass(frozen=True)
class DeviceList:
    """Catalogue entry describing one known device type/model combination.

    Declared as a frozen dataclass so entries get a readable ``repr``,
    value-based equality and hashing, like the other record types in this
    module. The constructor still accepts the same four arguments, both
    positionally and by keyword. Entries are meant to be static lookup data
    and cannot be modified after creation.
    """

    # Device type byte reported by the hub (compared in get_device())
    device_id: int
    # Device model byte reported by the hub (compared in get_device())
    model: int
    # Human readable device name
    name: str
    # Number of LED channels of the device
    led_channels: int
def contains_pump(t: int) -> bool:
    """Tell whether device type ``t`` is one of the pump-equipped types.

    The recognised type bytes are 0x07, 0x0c and 0x0e, which the device
    catalogue in ``LinkSystemHub.deviceList`` uses for the AIO coolers and
    the XD5 pump reservoirs.
    """
    return t in (0x07, 0x0c, 0x0e)
class LinkSystemHub(UsbHidDriver):
    """Driver for the Corsair iCUE LINK System Hub (USB 1b1c:0c3f).

    The hub is driven through fixed-size HID reports. Simple commands go
    through ``transfer``; data is fetched with ``read``, which closes, opens,
    reads and closes again the endpoint that holds the requested data.
    """

    # Corsair iCUE LINK System Hub
    # https://www3.corsair.com/software/CUE_V5/public/modules/windows/packages/cuepkg-metadata.json
    # Download cuepkg and extract Meta folder for device Ids
    deviceList = [
        DeviceList(0x01, 0x00, "QX Fan", 34),  # Fan
        DeviceList(0x13, 0x00, "RX Fan", 0),  # Fan No LEDs
        DeviceList(0x0f, 0x00, "RX RGB Fan", 8),  # Fan
        DeviceList(0x07, 0x02, "H150i", 20),  # AIO Black
        DeviceList(0x07, 0x05, "H150i", 20),  # AIO White
        DeviceList(0x07, 0x01, "H115i", 20),  # AIO
        DeviceList(0x07, 0x03, "H170i", 20),  # AIO
        DeviceList(0x07, 0x00, "H100i", 20),  # AIO Black
        DeviceList(0x07, 0x04, "H100i", 20),  # AIO White
        DeviceList(0x09, 0x00, "XC7 Elite", 24),  # CPU Block Stealth Gray
        DeviceList(0x09, 0x01, "XC7 Elite", 24),  # CPU Block White
        DeviceList(0x0d, 0x00, "XG7", 16),  # GPU Block
        DeviceList(0x0c, 0x00, "XD5 Elite", 22),  # Pump reservoir Stealth Gray
        DeviceList(0x0c, 0x01, "XD5 Elite", 22),  # Pump reservoir White (?)
        DeviceList(0x0e, 0x00, "XD5 Elite LCD", 22),  # Pump reservoir Stealth Gray
        DeviceList(0x0e, 0x01, "XD5 Elite LCD", 22),  # Pump reservoir White (?)
    ]

    _MATCHES = [
        # NOTE(review): has_pump is stored by __init__ but never read by the
        # code in this class - confirm whether it is still needed.
        (0x1b1c, 0x0c3f, 'Corsair iCUE LINK System Hub', {"has_pump": True}),
    ]

    # Maximum number of additional reports read while waiting for a response
    # that carries the expected data type; prevents an endless wait.
    _MAX_TYPED_READS = 32

    @classmethod
    def probe(cls, handle, **kwargs):
        """Yield drivers only for handles on HID interface ``_INTERFACE_NUMBER``."""
        if handle.hidinfo['interface_number'] != _INTERFACE_NUMBER:
            return
        yield from super().probe(handle, **kwargs)

    def __init__(self, device, description, has_pump, **kwargs):
        """Store ``has_pump`` and forward everything else to the base driver."""
        super().__init__(device, description, **kwargs)
        self._has_pump = has_pump

    def initialize(self, **kwargs):
        """Read the firmware version, enter software mode and list devices.

        Returns:
            A list of ``(key, value, unit)`` tuples: one 'Firmware version'
            entry followed by one 'Device' entry per recognised device.

        Raises:
            Any exception raised by ``transfer`` or ``read``.
        """
        # Get Firmware: bytes 4 and 5 are single-byte fields, bytes 6-7 are a
        # little-endian 16-bit field
        fw = self.transfer(_CMD_GET_FIRMWARE, (), ())
        major, minor = fw[4], fw[5]
        patch = int.from_bytes(fw[6:8], byteorder='little', signed=False)
        status = [('Firmware version', f'{major}.{minor}.{patch}', '')]

        # Activate software mode
        # This will cause lights to go off, since a device expects command to set initial lights
        _LOGGER.debug('Switching device to software mode...')
        self.transfer(_CMD_SOFTWARE_MODE, (), ())

        # Get all devices
        _LOGGER.debug('Initializing devices...')
        res = self.read(_MODE_GET_DEVICES, _DATA_TYPE_GET_DEVICES)
        devices = self.init_devices(res)

        status.extend(
            ('Device', f'{dev.name} - {dev.rpm} RPM - {dev.temperature} C', '')
            for dev in devices.values()
        )
        return status

    def read(self, endpoint, data_type):
        """Read the data stored behind ``endpoint``.

        The endpoint is closed (in case it was left open), opened, read and
        closed again. The final close also runs when the read step fails, so
        a failed read does not leave the endpoint open.

        Args:
            endpoint: endpoint ("mode") bytes, e.g. ``_MODE_GET_DEVICES``.
            data_type: two-byte data type expected in the response.

        Returns:
            The raw response of the read command as ``bytes``.

        Raises:
            Any exception raised by ``transfer``; it is logged first.
        """
        self._endpoint_command(_CMD_CLOSE_ENDPOINT, endpoint, (), 'close')
        self._endpoint_command(_CMD_OPEN_ENDPOINT, endpoint, (), 'open')
        try:
            buffer = self._endpoint_command(_CMD_READ, endpoint, data_type, 'read')
        finally:
            self._endpoint_command(_CMD_CLOSE_ENDPOINT, endpoint, (), 'close')
        return buffer

    def _endpoint_command(self, command, endpoint, data_type, action):
        """Run one endpoint command, logging ``action`` before re-raising a failure."""
        try:
            return self.transfer(command, endpoint, data_type)
        except Exception as e:
            _LOGGER.error("Unable to %s endpoint: %s", action, e)
            raise

    def transfer(self, endpoint, data, data_type):
        """Send one command report and return the device's response.

        Args:
            endpoint: command bytes, placed at offset ``_READ_HEADER_SIZE`` of
                the outgoing buffer (despite its name, callers pass one of
                the ``_CMD_*`` tuples here).
            data: bytes placed right after the command (may be empty).
            data_type: when it has exactly two items, reports are read until
                byte 4 of a response equals ``data_type[0]``.

        Returns:
            The response report as ``bytes``.

        Raises:
            ValueError: the command and data do not fit into the buffer.
            TimeoutError: no response with the expected data type arrived
                within ``_MAX_TYPED_READS`` additional reads.
            Exception: any error raised while writing to or reading from the
                device is logged and re-raised, so callers never parse a
                response that was not actually received.
        """
        payload = bytes(endpoint) + bytes(data)
        payload_end = _READ_HEADER_SIZE + len(payload)
        if payload_end > _BUFFER_WRITE_LENGTH:
            raise ValueError('command and data do not fit into the write buffer')

        buffer_w = bytearray(_BUFFER_WRITE_LENGTH)
        buffer_w[2] = 0x01
        buffer_w[_READ_HEADER_SIZE:payload_end] = payload

        # Send command to a device
        try:
            self.device.clear_enqueued_reports()
            self.device.write(buffer_w)
        except Exception as e:
            _LOGGER.error('Unable to write to a device. Exception: %s', e)
            raise

        # Get data from a device
        buffer_r = self._read_report()

        if len(data_type) == 2:
            expected = data_type[0]
            extra_reads = 0
            # Skip reports that are too short or carry another data type,
            # but give up after a bounded number of reads
            while len(buffer_r) <= 4 or buffer_r[4] != expected:
                if extra_reads >= self._MAX_TYPED_READS:
                    raise TimeoutError(
                        f'device did not return data type {expected:#04x} '
                        f'after {extra_reads} additional reads'
                    )
                buffer_r = self._read_report()
                extra_reads += 1

        return bytes(buffer_r)

    def _read_report(self):
        """Read one report from the device, logging a failure before re-raising."""
        try:
            return self.device.read(_BUFFER_READ_LENGTH)
        except Exception as e:
            _LOGGER.error('Unable to read from a device. Exception: %s', e)
            raise

    def get_device_data(self, mode, data_type):
        """Read and decode the sensor entries stored behind ``mode``.

        Byte 6 of the response is the number of entries; entries start at
        byte 7 and take three bytes each: a status byte followed by a signed
        little-endian 16-bit value. A value is decoded only when the status
        byte is 0x00. For ``_MODE_GET_TEMPERATURES`` the value is divided by
        10, otherwise it is used as is.

        Returns:
            A dict mapping the entry index to a ``DeviceData``. Entries that
            would extend past the end of the response are not included.
        """
        res = self.read(mode, data_type)
        if len(res) < 7:
            _LOGGER.debug('Sensor response too short: %d bytes', len(res))
            return {}

        # Number of devices
        amount = res[6]
        is_temperature = mode == _MODE_GET_TEMPERATURES

        readings = {}
        for i in range(amount):
            offset = 7 + i * 3
            if offset + 3 > len(res):
                # The announced count exceeds the data actually present
                _LOGGER.debug('Sensor response truncated at entry %d of %d', i, amount)
                break
            status = res[offset]
            value = 0.0
            if status == 0x00:
                raw = struct.unpack_from('<h', res, offset + 1)[0]
                value = raw / 10.0 if is_temperature else raw
            readings[i] = DeviceData(channel_id=i, status=status, value=value)
        return readings

    @staticmethod
    def _parse_device_table(buffer):
        """Decode the device table into ``(channel, type, model, id)`` tuples.

        Byte 6 of ``buffer`` is the number of channels; records start at byte
        7. Each record has an 8-byte header (type at offset 2, model at
        offset 3, id length at offset 7) followed by the id itself. Channels
        whose id length is 0 are skipped. Parsing stops at the first record
        that does not fit into the buffer.
        """
        if len(buffer) < 7:
            return []

        channels = buffer[6]
        records = buffer[7:]
        position = 0
        devices = []
        for channel_id in range(1, channels + 1):
            header = records[position:position + 8]
            if len(header) < 8:
                _LOGGER.debug('Device table truncated at channel %d', channel_id)
                break
            position += 8

            id_length = header[7]
            if id_length == 0:
                # Unable to get a device id length, move to the next channel
                continue

            raw_id = records[position:position + id_length]
            if len(raw_id) < id_length:
                _LOGGER.debug('Device id truncated at channel %d', channel_id)
                break
            position += id_length

            devices.append(
                (channel_id, header[2], header[3], raw_id.decode(errors='replace'))
            )
        return devices

    def init_devices(self, buffer):
        """Build the ``LinkDevice`` entries for all recognised devices.

        Args:
            buffer: response of the ``_MODE_GET_DEVICES`` read.

        Returns:
            A dict mapping the channel id to a ``LinkDevice``. Devices without
            an entry in ``deviceList`` are left out. A channel without a speed
            or temperature entry gets 0.0 for the missing value.
        """
        devices = self._parse_device_table(buffer)

        # Get device temperatures
        temperatures = self.get_device_data(_MODE_GET_TEMPERATURES, _DATA_TYPE_GET_TEMPERATURES)
        # Get device speeds
        speeds = self.get_device_data(_MODE_GET_SPEEDS, _DATA_TYPE_GET_SPEEDS)

        device_list = {}
        for channel_id, device_type, device_model, device_id in devices:
            # Find device definition
            match = self.get_device(device_type, device_model)
            if match is None:
                _LOGGER.debug(
                    'Skipping unknown device. Type: %s, model: %s', device_type, device_model
                )
                continue

            speed = speeds.get(channel_id)
            temperature = temperatures.get(channel_id)

            # Build full stack of device info
            hub_device_info = LinkDevice(
                channel_id=channel_id,
                device_type=device_type,
                device_id=device_id,
                name=match.name,
                rpm=speed.value if speed is not None else 0.0,
                temperature=temperature.value if temperature is not None else 0.0,
                led_channels=match.led_channels,
                contains_pump=contains_pump(device_type),
            )
            device_list[channel_id] = hub_device_info

            # Development purposes
            _LOGGER.debug('Device initialized. Device: %s', hub_device_info)
        return device_list

    # The return annotation is quoted so it is not evaluated when the method
    # is defined (the ``X | None`` form needs Python 3.10+ when evaluated).
    def get_device(self, device_id: int, device_model: int) -> "DeviceList | None":
        """Return the catalogue entry for a device type and model, or ``None``.

        Args:
            device_id: device type byte, compared with ``DeviceList.device_id``.
            device_model: device model byte, compared with ``DeviceList.model``.
        """
        return next(
            (
                entry for entry in self.deviceList
                if entry.device_id == device_id and entry.model == device_model
            ),
            None,
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment