Created
January 8, 2019 05:38
-
-
Save wgwoods/35edf4c87d94d5f18a4bf087a19088cc to your computer and use it in GitHub Desktop.
Some cruddy python code using pyusb to talk to a GK64 keyboard's control interface
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# gk64.py - exploration stuff using pyusb to talk to a GK6x keyboard | |
# | |
# Copyright (c) 2019 Will Woods <w@wizard.zone> | |
# | |
# You shouldn't be using this, because it's horrible, but if you are, | |
# consider it licensed as GPLv2+. Also, I'm sorry. | |
# | |
# If you just wanna send random commands and see what happens, try: | |
# sudo python3 gk64.py [cmd] [subcmd] | |
# For interactive "exploration" I recommend: | |
# sudo ipython3 -i gk64.py | |
import time | |
import struct | |
import usb.core | |
import usb.util | |
from usb.core import USBError | |
from collections import namedtuple | |
# unoptimized, translated from http://mdfs.net/Info/Comp/Comms/CRC16.htm | |
def crc16(data, poly=0x1021, iv=0x0000, xorf=0x0000):
    """Compute a bit-at-a-time, MSB-first 16-bit CRC of ``data``.

    Args:
        data: Any value accepted by ``bytearray()`` (bytes, bytearray,
            iterable of ints in 0..255).
        poly: Generator polynomial (without the implicit x^16 term).
        iv: Initial value of the CRC register.
        xorf: Value XORed into the final register before returning.

    Returns:
        The resulting CRC as an int.
    """
    register = int(iv)
    for octet in bytearray(data):
        # Feed the next byte into the high half of the register.
        register ^= octet << 8
        for _bit in range(8):
            shifted = register << 1
            if shifted & 0x10000:
                # A 1 fell off the top: apply the polynomial and drop bit 16.
                register = (shifted ^ poly) & 0xffff
            else:
                register = shifted
    return (register & 0xffff) ^ xorf
def crc16_usb(data, iv=0xffff):
    """Return crc16() of ``data`` with poly 0x8005 and a final XOR of 0xffff.

    Args:
        data: Bytes-like input passed straight to crc16().
        iv: Initial register value (default 0xffff).  Previously this
            argument was accepted but ignored; it is now honoured.

    Returns:
        The CRC as an int.

    NOTE(review): the standard "CRC-16/USB" is bit-reflected, whereas crc16()
    processes bits MSB-first, so this may not match other USB CRC tools --
    confirm before relying on it.
    """
    return crc16(data, poly=0x8005, iv=iv, xorf=0xffff)
def mycrc16(data, iv=0xffff):
    """Return the packet checksum: crc16() with poly 0x1021 and no final XOR.

    With the default ``iv`` of 0xffff this is the CRC-16/CCITT-FALSE
    parameter set described in the packet-structure notes in this file.

    Args:
        data: Bytes-like input passed straight to crc16().
        iv: Initial register value (default 0xffff).  Previously this
            argument was accepted but ignored; it is now honoured.

    Returns:
        The CRC as an int.
    """
    return crc16(data, poly=0x1021, iv=iv, xorf=0x0000)
def hexdump_line(data):
    """Format up to 16 bytes as one hexdump row: hex columns, then ASCII.

    Only the first 16 items of ``data`` are used.  The row has two groups of
    eight hex bytes followed by two groups of eight printable characters;
    bytes outside 0x20..0x7e are shown as '.'.

    Args:
        data: A bytes-like sequence whose items are ints (bytes, bytearray,
            array of unsigned bytes).

    Returns:
        The formatted row as a str (no trailing newline).
    """
    linedata = data[:16]
    # Pad missing bytes with a blank cell that is as wide as a real "%02x"
    # cell (two characters) so the ASCII column stays aligned on short rows.
    hexbytes = ["%02x" % b for b in linedata] + (["  "] * (16 - len(linedata)))
    printable = ''.join(chr(b) if 0x20 <= b < 0x7f else '.' for b in linedata)
    return '{} {} {} {}'.format(' '.join(hexbytes[:8]),
                                ' '.join(hexbytes[8:]),
                                printable[:8],
                                printable[8:])
# USB Packet Structure: | |
# | |
# Data is usually sent to endpoint 4, and the device answers on endpoint 3. | |
# In firmware update mode (see below), send to endpoint 2 and get answers on 1. | |
# | |
# Outgoing and incoming packets are always 64 (0x40) bytes long, and have roughly
# the same structure. Example outgoing packet data: | |
# | |
# 01 01 00 00 00 00 74 1b 00 00 00 00 00 00 00 00 ......t. ........ | |
# 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ........ ........ | |
# 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ........ ........ | |
# 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ........ ........ | |
# | |
# And the reply: | |
# | |
# 01 01 01 00 00 00 35 25 01 39 10 02 09 01 00 00 ......5% .9...... | |
# 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ........ ........ | |
# 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ........ ........ | |
# 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ........ ........ | |
# | |
# The structure is as follows: | |
# * 8 byte header, then up to 56 (0x38) bytes of data (padded with zeros) | |
# * Command header: 01 01 00 00 00 00 74 1b | |
# * Byte 0: Command | |
# * Byte 1: Subcommand | |
# * Byte 2-3: Offset (used for uploading firmware in chunks) | |
# * Byte 4: padding? (always 00..) | |
# * Byte 5: Size of payload (max 0x38) | |
# * Byte 6-7: checksum | |
# * CRC16/CCITT-FALSE: little-endian, polynomial 0x1021, IV 0xFFFF | |
# * Calculated over the whole 64-byte packet, with checksum = 00 00 | |
# * Byte 8-63: Payload data, padded with 00s to 64 bytes total | |
# * Reply header: 01 01 01 00 00 00 35 25 | |
# * Byte 0: Command | |
# * Byte 1: Subcommand | |
# * Byte 2: Result - 01 for success, 00 otherwise | |
# * Byte 3-5: unused/padding (always 00..) | |
# * Byte 6-7: checksum, as above | |
# * Byte 8-63: payload (padded to 64 bytes long with 0x00's) | |
# | |
# You'll note that the Reply doesn't seem to tell you how much data it's | |
# sending you, which makes interpreting the reply a little trickier.. | |
# Wire layout shared by command and reply packets (little-endian, 64 bytes):
# two 1-byte fields, one 16-bit field, two 1-byte fields, the 16-bit checksum,
# then a 56-byte payload.
PacketStruct = struct.Struct("<BBHBBH56s")


class PacketMixin:
    """Packing, checksum and hexdump helpers for packet namedtuples.

    Intended to be mixed into a namedtuple whose fields line up with
    ``PacketStruct`` and which has a ``checksum`` field.
    """

    @classmethod
    def _unpack(cls, buf):
        """Build a packet of type ``cls`` from a 64-byte buffer."""
        fields = PacketStruct.unpack(buf)
        return cls(*fields)

    def _pack(self):
        """Serialize this packet to its 64-byte wire form."""
        fields = tuple(self)
        return PacketStruct.pack(*fields)

    def _calculate_checksum(self):
        """Return the checksum of this packet, computed with checksum = 0."""
        zeroed = self._replace(checksum=0x0000)
        return mycrc16(zeroed._pack())

    def _replace_checksum(self):
        """Return a copy of this packet carrying a freshly computed checksum."""
        fresh = self._calculate_checksum()
        return self._replace(checksum=fresh)

    def _checksum_ok(self):
        """Return True when the stored checksum matches the computed one."""
        expected = self._calculate_checksum()
        return self.checksum == expected

    def _hexdump(self):
        """Return the packed packet as four 16-byte hexdump rows."""
        raw = self._pack()
        rows = []
        for start in range(0, 0x40, 0x10):
            rows.append(hexdump_line(raw[start:start + 0x10]))
        return '\n'.join(rows)
# Fields of an outgoing packet, in wire order (see PacketStruct).
CommandPacketTuple = namedtuple(
    "CommandPacketTuple",
    ["cmd", "subcmd", "offset", "pad1", "length", "checksum", "data"],
)


class CommandPacket(CommandPacketTuple, PacketMixin):
    """Outgoing (host -> keyboard) packet with the PacketMixin helpers."""
# Fields of an incoming packet, in wire order (see PacketStruct).
ReplyPacketTuple = namedtuple(
    "ReplyPacketTuple",
    ["cmd", "subcmd", "result", "pad1", "pad2", "checksum", "data"],
)


class ReplyPacket(ReplyPacketTuple, PacketMixin):
    """Incoming (keyboard -> host) packet with the PacketMixin helpers."""
class GK64(object):
    """Handle for a GK64 keyboard's control interface, via pyusb.

    Attributes:
        dev: The pyusb device, or None when no matching device was found.
        cmd_in / cmd_out: Endpoints used to read replies / write commands.
        fwid: Firmware id string cached by get_fwid(), or None.
    """

    # USB ids this class looks for.
    idVendor = 0x1ea7
    GK64Product = 0x0907      # normal mode: control endpoints on interface 1
    CDBootProduct = 0x0905    # "CDBoot" mode: control endpoints on interface 0
    WeltrendVendor = 0x040b   # fallback vendor id tried when idVendor is absent

    def __init__(self, bus=None, address=None):
        """Locate the keyboard and set up its control endpoints.

        Args:
            bus: Optional USB bus number to restrict the search to.
            address: Optional USB device address to restrict the search to.

        The bus/address filter is applied only when BOTH are given;
        otherwise the first device with a known vendor id is used.  If
        nothing is found, ``self.dev`` stays None.
        """
        self.dev = None
        self.cmd_in = None
        self.cmd_out = None
        self.fwid = None
        if bus is None or address is None:
            self.find_dev()
        else:
            # Previously these arguments were accepted but nothing was
            # looked up at all when both were supplied.
            self.find_dev(bus=bus, address=address)

    def find_dev(self, bus=None, address=None):
        """(Re)discover the device and claim its command endpoints.

        Args:
            bus: Optional USB bus number the device must be on.
            address: Optional USB address the device must have.

        Returns:
            True if a device was found and its endpoints stored, else False.
        """
        match = {}
        if bus is not None:
            match["bus"] = bus
        if address is not None:
            match["address"] = address
        dev = usb.core.find(idVendor=self.idVendor, **match)
        if dev is None:
            dev = usb.core.find(idVendor=self.WeltrendVendor, **match)
        self.dev = dev
        if dev is None:
            # Don't leave endpoints of a vanished device lying around.
            self.cmd_in = None
            self.cmd_out = None
            return False
        if dev.idProduct == self.GK64Product:
            iface = 1
        elif dev.idProduct == self.CDBootProduct:
            iface = 0
        else:  # TODO: look for the first interface that has an IN endpoint
            iface = 0
        if dev.is_kernel_driver_active(iface):
            dev.detach_kernel_driver(iface)
        # NOTE(review): assumes the interface exposes exactly two endpoints,
        # IN first and OUT second -- confirm against the device descriptors.
        self.cmd_in, self.cmd_out = dev[0][iface, 0].endpoints()
        return True

    def sendcmd(self, cmd, subcmd, offset=0, length=0, data=None):
        """Send one command packet and return the device's reply.

        Args:
            cmd: Command byte.
            subcmd: Subcommand byte.
            offset: 16-bit offset field of the header.
            length: Payload-size byte of the header (not derived from data).
            data: Payload of at most 0x38 bytes; zero-filled when empty/None.

        Returns:
            The reply as a ReplyPacket.

        Raises:
            ValueError: if ``data`` is longer than 0x38 bytes (struct would
                otherwise silently truncate it).
            USBError: on USB failures, or with errno ENODEV when no device
                is attached to this object.
        """
        if not data:
            data = bytearray(0x38)
        elif len(data) > 0x38:
            raise ValueError(
                "payload is {} bytes; at most 56 (0x38) fit in a packet".format(len(data)))
        if self.dev is None:
            import errno
            # Report a missing device the same way a vanished one is
            # reported, so callers' USBError handling covers both cases.
            raise USBError("No such device (keyboard not found)", errno=errno.ENODEV)
        pkt = CommandPacket(cmd, subcmd, offset, 0, length, 0, data)._replace_checksum()
        self.dev.write(self.cmd_out, pkt._pack())
        return ReplyPacket._unpack(self.dev.read(self.cmd_in, 0x40))

    def get_fwid(self):
        """Query command 01:01 and cache the formatted firmware id.

        Returns:
            The firmware id string; if the reply's result is not 1 the
            previously cached value (possibly None) is returned unchanged.
        """
        r = self.sendcmd(1, 1)
        if r.result == 1:
            self.fwid = "{r[3]:02x}-{r[2]:02x}{r[1]:02x}-{r[0]:02x}-V{r[5]:d}.{r[4]:d}".format(r=r.data)
        return self.fwid

    def enter_cdboot_mode(self):
        """Send command 03:02, wait briefly, then rediscover the device."""
        self.sendcmd(3, 2)
        time.sleep(0.5)  # give the device time to re-enumerate
        self.find_dev()

    def exit_cdboot_mode(self):
        """Send command 03:01, wait briefly, then rediscover the device."""
        self.sendcmd(3, 1)
        time.sleep(0.5)  # give the device time to re-enumerate
        self.find_dev()
def wait_for_dev():
    """Block until a keyboard is found and answers command 01:02.

    Polls every 0.2 seconds, printing a dot per failed attempt.  USB errors
    during an attempt are treated as "not ready yet" and retried.

    Returns:
        A GK64 object whose device accepted the probe command.
    """
    print("Looking for device...", flush=True, end='')
    keyboard = None
    ready = False
    while not ready:
        try:
            keyboard = GK64()
            if keyboard.dev is not None:
                # Only accept the device once it actually answers a command.
                keyboard.sendcmd(1, 2)
                ready = True
        except USBError:
            pass
        if not ready:
            time.sleep(0.2)
            print(".", flush=True, end='')
    print(" found {dev.idVendor:04x}:{dev.idProduct:04x} at {dev.bus}.{dev.address}".format(dev=keyboard.dev))
    return keyboard
def probe_loop():
    '''
    Yeah, this is gross, but this is just for my personal experimentation..

    Try command:subcommand pairs (command 1..255, subcommand 1..254, minus
    the commands listed in skip_a), print what the keyboard answers, and
    finally print a dict mapping (cmd, subcmd) to the ReplyPacket or the
    USBError it produced.  When the follow-up liveness check also fails, the
    entry becomes a [result, error] list.
    '''
    # Local import: use the platform's errno values instead of the
    # Linux-only literals 110 (ETIMEDOUT) and 19 (ENODEV).
    import errno

    kbd = wait_for_dev()
    results = dict()
    skip_a = [2, 3, 5, 6, 7]
    # NOTE: 3:1, 3:2, and 3:3 all seem to reset the system.. but 4 doesn't?
    timeoutcount = 0
    for a in range(1, 256):
        for b in range(1, 255):
            # Checked per subcommand because skip_a grows while we iterate.
            if a in skip_a:
                continue
            print("Trying command {:02x}:{:02x}: ".format(a, b), end='', flush=True)
            reply = None
            result = None
            # Send command and save reply (or exception)
            try:
                reply = kbd.sendcmd(a, b)
                result = reply
                timeoutcount = 0
                if any(reply.data):
                    print("reply OK, data:", reply._hexdump())
                elif reply.result == 1:
                    print("reply OK")
                elif reply.result == 0:
                    print("reply NAK")
                else:
                    print("unhandled reply:", reply._hexdump())
            except USBError as err:
                print(err)
                result = err
                if err.errno == errno.ETIMEDOUT:
                    timeoutcount += 1
            results[a, b] = result
            # Three timeouts in a row: give up on this command number and
            # wait for the device to come back.
            if timeoutcount == 3:
                timeoutcount = 0
                skip_a.append(a)
                kbd = wait_for_dev()
            # Check if the device is still available..
            try:
                kbd.sendcmd(1, 2)
            except USBError as err:
                # Record that this command led to an error.  The combined
                # value must be stored back; assigning only the local (as
                # before) silently dropped the follow-up error.
                result = [result, err]
                results[a, b] = result
                # Wait a moment to recover
                # And reconnect if needed
                if err.errno == errno.ENODEV:  # No such device
                    time.sleep(0.2)
                    kbd.find_dev()
            # If we got a good result, show the user
            if reply and (reply.result or any(reply.data)):
                print(reply._hexdump())
    print(results)
# Command-line entry point: `gk64.py CMD SUBCMD` sends one command (decimal
# integers) and prints the reply as a hexdump.  With fewer than two
# arguments nothing is sent, which keeps `ipython3 -i gk64.py` usable.
if __name__ == '__main__':
    import sys
    if len(sys.argv) > 2:
        try:
            cmd = int(sys.argv[1])
            sub = int(sys.argv[2])
        except ValueError:
            print("error: invalid integer in args {}".format(sys.argv[1:3]))
            raise SystemExit(2)
        try:
            kbd = GK64()
            if kbd.dev is None:
                # Without this check the next call fails with an opaque
                # AttributeError on None.
                print("error: no GK64 keyboard found")
                raise SystemExit(1)
            print(kbd.sendcmd(cmd, sub)._hexdump())
        except KeyboardInterrupt:
            raise SystemExit(1)
        except USBError as e:
            print(e)
            # e.errno may be None; SystemExit(None) would exit with status 0
            # and report success, so fall back to a generic failure code.
            raise SystemExit(e.errno or 1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Do you have a write up on what you've found?