Created
May 21, 2023 18:15
-
-
Save browserdotsys/ef1b22c60c31d9c61e18cca30b3ce903 to your computer and use it in GitHub Desktop.
Small util to display JPEGs on a Corsair iCUE H150i ELITE LCD display
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import pyshark | |
from dataclasses import dataclass | |
import argparse | |
import hid | |
import time | |
VID = 0x1b1c # Corsair | |
PID = 0x0c39 # Corsair LCD Cap for Elite Capellix coolers | |
# Sample header: | |
# 02 05 40 00 01 00 f8 03" | |
@dataclass | |
class CorsairCommand: | |
"""Corsair Command""" | |
opcode: int # 0x02 | |
unknown1: int # 0x05 | |
unknown2: int # 0x40 | |
is_end: bool # 0x00 or 0x01 | |
part_num: int # 0x0000 - 0xffff, little endian | |
datalen: int # 0x0000 - 0xffff, little endian | |
data: bytes # datalen bytes + padding up to packet size | |
HEADER_SIZE = 8 | |
# constructor from bytes | |
@classmethod | |
def from_bytes(cls, data): | |
opcode = data[0] | |
unknown1 = data[1] | |
unknown2 = data[2] | |
is_end = data[3] == 0x01 | |
part_num = int.from_bytes(data[4:6], byteorder='little') | |
datalen = int.from_bytes(data[6:8], byteorder='little') | |
payload = data[cls.HEADER_SIZE:cls.HEADER_SIZE+datalen] | |
return cls( | |
opcode=opcode, | |
unknown1=unknown1, | |
unknown2=unknown2, | |
is_end=is_end, | |
part_num=part_num, | |
datalen=datalen, | |
data=payload | |
) | |
def to_bytes(self): | |
return bytes([ | |
self.opcode, | |
self.unknown1, | |
self.unknown2, | |
0x01 if self.is_end else 0x00, | |
]) + \ | |
self.part_num.to_bytes(2, byteorder='little') + \ | |
self.datalen.to_bytes(2, byteorder='little') + \ | |
self.data | |
@property | |
def is_start(self): | |
return self.part_num == 0 | |
@property | |
def header_size(self): | |
return self.HEADER_SIZE | |
@property | |
def size(self): | |
return self.header_size + self.datalen | |
def make_commands(data, opcode=0x02, max_len=1024): | |
"""Splits data into Corsair commands of max_len bytes""" | |
real_max_len = max_len - CorsairCommand.HEADER_SIZE | |
part_num = 0 | |
while data: | |
# packets must be padded if shorter than max_len | |
if len(data) < real_max_len: | |
padded_data = data + b'\x00' * (real_max_len - len(data)) | |
else: | |
padded_data = data[:real_max_len] | |
datalen = min(real_max_len, len(data)) | |
data = data[real_max_len:] | |
yield CorsairCommand( | |
opcode=opcode, | |
unknown1=0x05, | |
unknown2=0x40, | |
is_end=not bool(data), | |
part_num=part_num, | |
datalen=datalen, | |
data=padded_data, | |
) | |
part_num += 1 | |
def extract_jpegs(pcap_file, output_prefix='out_'): | |
"""Extracts JPEGs from a Corsair USB capture""" | |
filter = 'usbhid.data[0,1] == 02:05' | |
cap = pyshark.FileCapture(pcap_file, display_filter=filter) | |
current_jpeg = b'' | |
i = 0 | |
for p in cap: | |
cmd = CorsairCommand.from_bytes(p['DATA'].usbhid_data.binary_value) | |
assert cmd.opcode == 0x02 | |
if cmd.part_num == 0: | |
if current_jpeg: | |
print(f"Warning: JPEG {i} was not complete when JPEG {i+1} started") | |
current_jpeg = cmd.data | |
else: | |
current_jpeg += cmd.data | |
if cmd.is_end: | |
with open(f"{output_prefix}{i:08}.jpg", 'wb') as f: | |
f.write(current_jpeg) | |
print(f"Saved JPEG {i} to {output_prefix}{i:08}.jpg ({len(current_jpeg)} bytes)") | |
current_jpeg = b'' | |
i += 1 | |
if current_jpeg: | |
print(f"Warning: JPEG {i} was not complete when capture ended") | |
def animate_jpegs(jpeg_files, max_len=1024, delay=100, loop=False): | |
done = False | |
dev = hid.device() | |
dev.open(vendor_id=VID,product_id=PID) | |
while not done: | |
for jpeg_file in jpeg_files: | |
frame_start = time.time() | |
with open(jpeg_file, 'rb') as f: | |
data = f.read() | |
for cmd in make_commands(data, max_len=max_len): | |
dev.write(cmd.to_bytes()) | |
frame_end = time.time() | |
frame_time = frame_end - frame_start | |
if frame_time < delay / 1000: | |
time.sleep((delay / 1000) - frame_time) | |
if not loop: | |
done = True | |
dev.close() | |
def main(): | |
parser = argparse.ArgumentParser(description='Corsair LCD JPEG utility') | |
# subcommands | |
subparsers = parser.add_subparsers(dest='command') | |
# extract | |
extract_parser = subparsers.add_parser('extract', help='Extract JPEGs from a Corsair USB capture') | |
extract_parser.add_argument('pcap_file', help='Path to the pcap file') | |
extract_parser.add_argument('--output-prefix', default='out_', help='Prefix for output files') | |
# prepare | |
prepare_parser = subparsers.add_parser('prepare', help='Prepare a JPEG for upload to the LCD') | |
prepare_parser.add_argument('jpeg_file', help='Path to the JPEG file') | |
prepare_parser.add_argument('--max-len', type=int, default=1024, help='Maximum length of each command') | |
# animate | |
animate_parser = subparsers.add_parser('animate', help='Animate a series of JPEGs on the LCD') | |
animate_parser.add_argument('jpeg_files', nargs='+', help='Path to the JPEG files') | |
animate_parser.add_argument('--max-len', type=int, default=1024, help='Maximum length of each command') | |
animate_parser.add_argument('--delay', type=int, default=100, help='Delay between frames in milliseconds') | |
animate_parser.add_argument('--loop', action='store_true', help='Loop the animation') | |
args = parser.parse_args() | |
if args.command == 'extract': | |
extract_jpegs(args.pcap_file, args.output_prefix) | |
elif args.command == 'prepare': | |
with open(args.jpeg_file, 'rb') as f: | |
data = f.read() | |
for i,cmd in enumerate(make_commands(data, max_len=args.max_len)): | |
print(f'{i:04}: {cmd.to_bytes().hex()}') | |
elif args.command == 'animate': | |
animate_jpegs(args.jpeg_files, max_len=args.max_len, delay=args.delay, loop=args.loop) | |
else: | |
parser.print_help() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment