Skip to content

Instantly share code, notes, and snippets.

@rvantonder
Last active December 25, 2015 16:19
Show Gist options
  • Save rvantonder/7005376 to your computer and use it in GitHub Desktop.
Save rvantonder/7005376 to your computer and use it in GitHub Desktop.
Files:
GoodFET.py in verbose mode, and prints out the data that gets sent over serial
GoodFETMAXUSB.py with type-through
MAXUSBApp.py with extra method send_on_endpoint_key which calls write_bytes with a length of 1
USBKeyboard.py which calls send_on_endpoint_key in MAXUSBApp.py
Notes.txt - Description of the problem
Old.txt - How the old code sent key presses through
The problem is that when the new, refactored code (USBKeyboard.py, MAXUSBApp.py, Facedancer.py) is used to provide type-through functionality for the HID keyboard, keys are not typed out reliably at the other end. With the current code, sending a key transmits the following to the Facedancer, but 'a' is not reliably typed out at the other end - it simply does not show up and "gets lost".
Does NOT always work:
>>a
Facedancer Tx: 40 00 04 00 1b 00 00 04 # send 'a'
Facedancer Tx: 40 00 02 00 43 03 # write to byte counter register (3 bytes)
Facedancer Tx: 40 00 04 00 1b 00 00 00 # send key up (0)
Facedancer Tx: 40 00 02 00 43 03 # write to byte counter register (3 bytes)
Description: 40: App name
00: verb
04 00: length (little endian)
1b: register
00 00 04: Data (3 bytes), with 04 corresponding to 'a'
But, the following ALWAYS works (which is the way the old code (GoodFETMAXUSB, GoodFET.py) sent keys in type-through (see old.txt)):
>>a
Facedancer Tx: 40 00 02 00 1b 00 # write / send each byte separately
Facedancer Tx: 40 00 02 00 1b 00
Facedancer Tx: 40 00 02 00 1b 04 # 'a'
Facedancer Tx: 40 00 02 00 43 03 # write to byte counter register (3 bytes)
Facedancer Tx: 40 00 02 00 1b 00 # again write each byte separately
Facedancer Tx: 40 00 02 00 1b 00
Facedancer Tx: 40 00 02 00 1b 00 # key up (0)
Facedancer Tx: 40 00 02 00 43 03
Last note:
In the old code, the bytes are actually sent to register 1a (not 1b as above), and the ACK flag is not set when writing the byte counter register (43 above versus 42 in the original code). To make sure that this difference does not affect the results I'm seeing, I changed the new code to use register 1a and turned off ACKing, and found that changing these settings does not affect the results. That is, whether we use register 1a or 1b doesn't really matter, and whether we ACK or not also does not matter.
This only seems to come up with 'manual' type-through, where there is a delay between sending keys. When there is no delay and we have a hardcoded list of keys to send, it appears to work fine. My guess is that it has something to do with timing.
Typing 'a' in GoodFETMAXUSB.py
>>a
Tx: ( 0x40, 0x00, 0x0002 )
Data: ['0x1a', '0x0'] # register 1a
Rx: ( 0x40, 0x00, 0x0002 )
Tx: ( 0x40, 0x00, 0x0002 )
Data: ['0x1a', '0x0']
Rx: ( 0x40, 0x00, 0x0002 )
Tx: ( 0x40, 0x00, 0x0002 )
Data: ['0x1a', '0x4'] # 'a'
Rx: ( 0x40, 0x00, 0x0002 )
Tx: ( 0x40, 0x00, 0x0002 )
Data: ['0x42', '0x3']
Rx: ( 0x40, 0x00, 0x0002 )
Tx: ( 0x40, 0x00, 0x0002 )
Data: ['0x1a', '0x0']
Rx: ( 0x40, 0x00, 0x0002 )
Tx: ( 0x40, 0x00, 0x0002 )
Data: ['0x1a', '0x0']
Rx: ( 0x40, 0x00, 0x0002 )
Tx: ( 0x40, 0x00, 0x0002 )
Data: ['0x1a', '0x0']
Rx: ( 0x40, 0x00, 0x0002 )
Tx: ( 0x40, 0x00, 0x0002 )
Data: ['0x42', '0x3']
Rx: ( 0x40, 0x00, 0x0002 )
#!/usr/bin/env python
# GoodFET Client Library
#
# (C) 2009 Travis Goodspeed <travis at radiantmachines.com>
#
# This code is being rewritten and refactored. You've been warned!
import sys, time, string, cStringIO, struct, glob, os;
import sqlite3;
# struct format strings indexed by (byte count - 1); readcmd() uses this table
# to decode 0xFF/0xFE debug values.  There is no format for 3-byte values (None).
fmt = ("B", "<H", None, "<L")
def getClient(name="GoodFET"):
    """Return a new client object for the named target.

    Recognised names are "GoodFET"/"monitor", "cc"/"cc51", "cc2420"/"ccspi",
    "avr", "spi", "msp430" and "nrf".  For any other name a message is
    printed and the process exits.
    """
    import GoodFET, GoodFETCC, GoodFETAVR, GoodFETSPI, GoodFETMSP430, GoodFETNRF, GoodFETCCSPI
    # Aliases paired with a lazily evaluated constructor, checked in order.
    targets = (
        (("GoodFET", "monitor"), lambda: GoodFET.GoodFET()),
        (("cc", "cc51"), lambda: GoodFETCC.GoodFETCC()),
        (("cc2420", "ccspi"), lambda: GoodFETCCSPI.GoodFETCCSPI()),
        (("avr",), lambda: GoodFETAVR.GoodFETAVR()),
        (("spi",), lambda: GoodFETSPI.GoodFETSPI()),
        (("msp430",), lambda: GoodFETMSP430.GoodFETMSP430()),
        (("nrf",), lambda: GoodFETNRF.GoodFETNRF()),
    )
    for aliases, build in targets:
        if name in aliases:
            return build()
    print("Unsupported target: %s" % name)
    sys.exit(0)
class SymbolTable:
    """GoodFET Symbol Table

    Symbols live in an in-memory SQLite table.  The connection is a class
    attribute, so every SymbolTable instance shares the same table.
    """
    db = sqlite3.connect(":memory:")

    def __init__(self, *args, **kargs):
        """Create the symbols table if it does not exist; arguments are ignored."""
        self.db.execute("create table if not exists symbols(adr,name,memory,size,comment);")

    def get(self, name):
        """Return the address stored for *name*.

        When the name is not in the table (or the lookup itself fails), *name*
        is evaluated as a Python expression, which lets callers pass numeric
        literals such as "0x1234" in place of a symbol.
        """
        self.db.commit()
        cursor = self.db.cursor()
        try:
            cursor.execute("select adr,memory from symbols where name=?", (name,))
            row = cursor.fetchone()
        except sqlite3.Error:
            # Only database failures fall through to evaluation; a bare
            # except here would also swallow KeyboardInterrupt/SystemExit.
            row = None
        if row is not None:
            return row[0]
        # NOTE(review): eval() runs arbitrary code; only pass trusted names.
        return eval(name)

    def define(self, adr, name, comment="", memory="vn", size=16):
        """Insert a symbol row (address, name, memory, size, comment)."""
        self.db.execute("insert into symbols(adr,name,memory,size,comment)"
                        "values(?,?,?,?,?);",
                        (adr, name, memory, size, comment))
class GoodFETbtser:
"""py-bluez class for emulating py-serial."""
# Exposes write()/read() over an RFCOMM socket stored in self.sock.
def __init__(self,btaddr):
# Connect to btaddr on RFCOMM port 1.  When btaddr is None, "none" or
# "bluetooth", a device inquiry is run and the results are listed.
import bluetooth;
if btaddr==None or btaddr=="none" or btaddr=="bluetooth":
print "performing inquiry..."
nearby_devices = bluetooth.discover_devices(lookup_names = True)
print "found %d devices" % len(nearby_devices)
for addr, name in nearby_devices:
print " %s - '%s'" % (addr, name)
#TODO switch to wildcards.
if name=='FireFly-A6BD':
btaddr=addr;
if name=='RN42-A94A':
btaddr=addr;
# NOTE(review): indentation is lost in this copy; presumably the next two
# lines belong to the inquiry branch, so an inquiry always ends by exiting.
print "Please set $GOODFET to the address of your device.";
sys.exit();
print "Identified GoodFET at %s" % btaddr;
# Manually use the portnumber.
port=1;
print "Connecting to %s on port %i." % (btaddr, port);
sock=bluetooth.BluetoothSocket(bluetooth.RFCOMM);
self.sock=sock;
sock.connect((btaddr,port));
sock.settimeout(10); #IMPORTANT Must be patient.
##This is what we'd do for a normal reset.
#str="";
#while not str.endswith("goodfet.sf.net/"):
# str=self.read(64);
# print str;
# Instead, just return and hope for the best.
return;
def write(self,msg):
"""Send traffic."""
import time;
self.sock.send(msg);
#time.sleep(0.1);
return;
def read(self,length):
"""Read traffic."""
# Accumulates recv() results until exactly `length` bytes have arrived.
# NOTE(review): if recv() ever returns an empty string this loop never
# terminates; confirm how the socket reports a closed connection.
data="";
while len(data)<length:
data=data+self.sock.recv(length-len(data));
return data;
class GoodFET:
"""GoodFET Client Library"""
# Class-level defaults shared by all clients until overwritten per instance.
besilent=0; # when true, writecmd() returns [] without reading a reply
app=0; # application byte of the last reply parsed by readcmd()
verb=0; # verb byte of the last reply parsed by readcmd()
count=0; # payload length of the last reply parsed by readcmd()
data=""; # payload of the last reply; many methods also stage outgoing bytes here
verbose=True # when true, writecmd()/readcmd() print each Tx/Rx header
GLITCHAPP=0x71; # application number used by the glitch* methods
MONITORAPP=0x00; # application number used by the monitor* methods
symbols=SymbolTable(); # symbol table consulted by name2adr() and the *bysym methods
def __init__(self, *args, **kargs):
self.data=[0];
def getConsole(self):
    """Return a GoodFETConsole bound to this client."""
    import GoodFETConsole as console_module
    return console_module.GoodFETConsole(self)
def name2adr(self, name):
    """Look *name* up in the symbol table and return whatever it yields."""
    table = self.symbols
    return table.get(name)
def timeout(self):
    """Report a timeout on standard output."""
    print("timeout\n")
def serInit(self, port=None, timeout=2, attemptlimit=None):
    """Open a serial port of some kind.

    With no port given, $GOODFET is consulted.  The literal "bluetooth" or
    anything matching a colon-separated six-field address goes to btInit()
    (always with a timeout of 2); everything else goes to pyserInit().
    """
    import re
    if port is None:
        port = os.environ.get("GOODFET")
    looks_like_address = port is not None and re.match("..:..:..:..:..:..", port)
    if port == "bluetooth" or looks_like_address:
        self.btInit(port, 2, attemptlimit)
        return
    self.pyserInit(port, timeout, attemptlimit)
def btInit(self, port, timeout, attemptlimit):
    """Open a bluetooth port.

    Only *port* is used; *timeout* and *attemptlimit* are accepted so the
    signature parallels pyserInit().
    """
    link = GoodFETbtser(port)
    self.serialport = link
def pyserInit(self, port, timeout, attemptlimit):
"""Open the serial port"""
# Picks a port (argument, $GOODFET, well-known device globs, or an FTDI COM
# port on Windows), opens it, then resets the board and reads replies until
# the monitor's greeting is seen and the echo checks pass.
# NOTE(review): indentation is lost in this copy, so the nesting described in
# the comments below is inferred from the statement order; confirm.
# Make timeout None to wait forever, 0 for non-blocking mode.
import serial;
fixserial=False;
if os.name=='nt' and sys.version.find('64 bit')!=-1:
print "WARNING: PySerial requires a 32-bit Python build in Windows.";
# Port discovery: each step is tried only while port is still None.
if port is None and os.environ.get("GOODFET")!=None:
glob_list = glob.glob(os.environ.get("GOODFET"));
if len(glob_list) > 0:
port = glob_list[0];
else:
port = os.environ.get("GOODFET");
if port is None:
glob_list = glob.glob("/dev/tty.usbserial*");
if len(glob_list) > 0:
port = glob_list[0];
if port is None:
glob_list = glob.glob("/dev/ttyUSB*");
if len(glob_list) > 0:
port = glob_list[0];
if port is None:
glob_list = glob.glob("/dev/ttyU0");
if len(glob_list) > 0:
port = glob_list[0];
if port is None and os.name=='nt':
from scanwin32 import winScan;
scan=winScan();
for order,comport,desc,hwid in sorted(scan.comports()):
try:
if hwid.index('FTDI')==0:
port=comport;
#print "Using FTDI port %s" % port
except:
#Do nothing.
a=1;
# 115200 baud unless $platform or $board says arduino.
baud=115200;
if(os.environ.get("platform")=='arduino' or os.environ.get("board")=='arduino'):
baud=19200; #Slower, for now.
self.serialport = serial.Serial(
port,
#9600,
baud,
parity = serial.PARITY_NONE,
timeout=timeout
)
self.verb=0;
attempts=0;
connected=0;
# Keep resetting and reading until a reply with verb 0x7F carrying the
# greeting URL arrives, then verify the link with repeated echo tests.
while connected==0:
while self.verb!=0x7F or self.data!="http://goodfet.sf.net/":
#while self.data!="http://goodfet.sf.net/":
#print "'%s'!=\n'%s'" % (self.data,"http://goodfet.sf.net/");
if attemptlimit is not None and attempts >= attemptlimit:
return
elif attempts==2 and os.environ.get("board")!='telosb':
print "See the GoodFET FAQ about missing info flash.";
self.serialport.setTimeout(0.2);
elif attempts == 100:
print "Tried 100 times to connect and failed."
sys.stdout.write("Continuing to try forever.") # No newline
sys.stdout.flush()
self.verbose=True # Something isn't going right, give the user more info
elif attempts > 100 and attempts % 10 == 0:
sys.stdout.write('.')
sys.stdout.flush()
#self.serialport.flushInput()
#self.serialport.flushOutput()
# Board-specific reset, selected by $board.
#TelosB reset, prefer software to I2C SPST Switch.
if (os.environ.get("board")=='telosb'):
#print "TelosB Reset";
self.telosBReset();
elif (os.environ.get("board")=='z1'):
self.bslResetZ1(invokeBSL=0);
elif (os.environ.get("board")=='apimote1') or (os.environ.get("board")=='apimote'):
#Explicitly set RTS and DTR to halt board.
self.serialport.setRTS(1);
self.serialport.setDTR(1);
#RTS pin, not DTR is used for reset.
self.serialport.setRTS(0);
#print "Resetting Apimote not yet tested.";
else:
#Explicitly set RTS and DTR to halt board.
self.serialport.setRTS(1);
self.serialport.setDTR(1);
#Drop DTR, which is !RST, low to begin the app.
self.serialport.setDTR(0);
#self.serialport.write(chr(0x80));
#self.serialport.write(chr(0x80));
#self.serialport.write(chr(0x80));
#self.serialport.write(chr(0x80));
#self.serialport.flushInput()
#self.serialport.flushOutput()
#time.sleep(60);
attempts=attempts+1;
self.readcmd(); #Read the first command.
#print "Got %02x,%02x:'%s'" % (self.app,self.verb,self.data);
if self.verb!=0x7f:
#Retry again. This usually times out, but helps connect.
self.readcmd();
#print "Retry got %02x,%02x:'%s'" % (self.app,self.verb,self.data);
#Here we have a connection, but maybe not a good one.
#print "We have a connection."
connected=1;
if attempts >= 100:
print "" # Add a newline
olds=self.infostring();
clocking=self.monitorclocking();
# Echo test, up to 29 rounds; a failure clears `connected` so the outer
# loop resynchronises.
for foo in range(1,30):
if not self.monitorecho():
if self.verbose:
print "Comm error on %i try, resyncing out of %s." % (foo,
clocking);
connected=0;
break;
if self.verbose: print "Connected after %02i attempts." % attempts;
self.mon_connected();
self.serialport.setTimeout(12);
def serClose(self):
    """Close the underlying serial port object."""
    port = self.serialport
    port.close()
def telosSetSCL(self, level):
    """Drive RTS with the logical inverse of *level*."""
    inverted = not level
    self.serialport.setRTS(inverted)
def telosSetSDA(self, level):
    """Drive DTR with the logical inverse of *level*."""
    inverted = not level
    self.serialport.setDTR(inverted)
def telosI2CStart(self):
    """Issue the start sequence: SDA=1, SCL=1, then SDA=0."""
    for set_line, level in ((self.telosSetSDA, 1),
                            (self.telosSetSCL, 1),
                            (self.telosSetSDA, 0)):
        set_line(level)
def telosI2CStop(self):
    """Issue the stop sequence: SDA=0, SCL=1, then SDA=1."""
    for set_line, level in ((self.telosSetSDA, 0),
                            (self.telosSetSCL, 1),
                            (self.telosSetSDA, 1)):
        set_line(level)
def telosI2CWriteBit(self, bit):
    """Clock one bit out: SCL low, present *bit* on SDA, pulse SCL high then low."""
    clock = self.telosSetSCL
    clock(0)
    self.telosSetSDA(bit)
    time.sleep(2e-6)
    clock(1)
    time.sleep(1e-6)
    clock(0)
def telosI2CWriteByte(self, byte):
    """Send *byte* most-significant bit first, followed by a 0 "acknowledge" bit.

    Each data bit is passed on as the masked value (e.g. 0x80 or 0), not as 0/1.
    """
    for mask in (0x80, 0x40, 0x20, 0x10, 0x08, 0x04, 0x02, 0x01):
        self.telosI2CWriteBit(byte & mask)
    self.telosI2CWriteBit(0)  # "acknowledge"
def telosI2CWriteCmd(self, addr, cmdbyte):
    """Send start, the address byte (0x90 | addr<<1), *cmdbyte*, then stop."""
    address_byte = 0x90 | (addr << 1)
    self.telosI2CStart()
    for octet in (address_byte, cmdbyte):
        self.telosI2CWriteByte(octet)
    self.telosI2CStop()
def bslResetZ1(self, invokeBSL=0):
    '''
    Applies BSL entry sequence on RST/NMI and TEST/VPP pins
    Parameters:
    invokeBSL = 1: complete sequence (writes 0xFF to address 0xFF)
    invokeBSL = 0: only RST/NMI pin accessed (writes 0xFE to address 0xFF)
    Either way the write is bracketed by 0.1 s pauses.
    '''
    if invokeBSL:
        value = 0xFF
    else:
        value = 0xFE
    time.sleep(0.1)
    self.writepicROM(0xFF, value)
    time.sleep(0.1)
def writepicROM(self, address, data):
    '''Clock out the 8 address bits, a 0 bit, then the 8 data bits (MSB first).

    While the data bits go out, the inverse of each picROMclock() result is
    shifted into the return value.  A final clock of 0 ends the transfer.
    '''
    for shift in reversed(range(8)):
        self.picROMclock((address >> shift) & 0x01)
    self.picROMclock(0)
    received = 0
    for shift in reversed(range(8)):
        out_bit = (data >> shift) & 0x01
        if shift >= 1:
            sampled = self.picROMclock(out_bit)
        else:
            # The last data bit is clocked with slow explicitly requested.
            sampled = self.picROMclock(out_bit, True)
        received = (received << 1) + (not sampled)
    self.picROMclock(0, True)
    return received
def readpicROM(self, address):
    '''Clock out the 8 address bits and a 1 bit, then clock in 8 bits (MSB first).

    The last sampled bit is clocked back out once before returning the
    assembled value.
    '''
    for shift in reversed(range(8)):
        self.picROMclock((address >> shift) & 0x01)
    self.picROMclock(1)
    value = 0
    last_bit = 0
    for _ in range(8):
        last_bit = self.picROMclock(0)
        value = (value << 1) + last_bit
    self.picROMclock(last_bit)
    return value
#This seems more reliable when slowed.
def picROMclock(self, masterout, slow = True):
    """Put *masterout* on RTS, pulse DTR high then low, and return the CTS level.

    With *slow* true (the default) a 20 ms pause precedes the CTS read.
    """
    port = self.serialport
    port.setRTS(masterout)
    port.setDTR(1)
    port.setDTR(0)
    if slow:
        time.sleep(0.02)
    return port.getCTS()
def picROMfastclock(self, masterout):
    """Put *masterout* on RTS, pulse DTR, pause 20 ms, and return the CTS level."""
    port = self.serialport
    port.setRTS(masterout)
    for level in (1, 0):
        port.setDTR(level)
    time.sleep(0.02)
    return port.getCTS()
def telosBReset(self,invokeBSL=0):
# Reset a TelosB by sending a sequence of telosI2CWriteCmd(0, n) commands,
# then wait 250 ms and discard any pending serial input.
# With invokeBSL set, the six-command sequence 1,3,1,3,2,0 is sent.
# "BSL entry sequence at dedicated JTAG pins"
# rst !s0: 0 0 0 0 1 1
# tck !s1: 1 0 1 0 0 1
# s0|s1: 1 3 1 3 2 0
# "BSL entry sequence at shared JTAG pins"
# rst !s0: 0 0 0 0 1 1
# tck !s1: 0 1 0 1 1 0
# s0|s1: 3 1 3 1 0 2
if invokeBSL:
self.telosI2CWriteCmd(0,1)
self.telosI2CWriteCmd(0,3)
self.telosI2CWriteCmd(0,1)
self.telosI2CWriteCmd(0,3)
self.telosI2CWriteCmd(0,2)
self.telosI2CWriteCmd(0,0)
else:
self.telosI2CWriteCmd(0,3)
self.telosI2CWriteCmd(0,2)
# This line was not defined inside the else: block, not sure where it
# should be however
# NOTE(review): indentation is lost in this copy, so whether the next command
# runs on both paths or only after the else branch cannot be told; confirm.
self.telosI2CWriteCmd(0,0)
time.sleep(0.250) #give MSP430's oscillator time to stabilize
self.serialport.flushInput() #clear buffers
def getbuffer(self,size=0x1c00):
    """Send monitor verb 0xC2 with *size* as a little-endian 16-bit value.

    The first two bytes of the reply are then printed as the buffer size.

    Fixes over the previous version: the call goes through self.writecmd
    (the bare name raised NameError), the payload is passed with its length
    of 2 rather than in the count slot, the high byte is size>>8 (size>>16
    is always zero for a 16-bit size), and the reply characters are
    converted with ord() before hex formatting.
    """
    payload = [size & 0xFF, (size >> 8) & 0xFF]
    self.writecmd(0, 0xC2, len(payload), payload)
    print("Got %02x%02x buffer size." % (ord(self.data[1]), ord(self.data[0])))
def writecmd(self, app, verb, count=0, data=None):
    """Write a command and some data to the GoodFET.

    The frame is the app byte, the verb byte, a little-endian 16-bit
    *count*, then *count* bytes of *data*.  *data* may be a list of
    integer byte values or a string.

    Returns the reply from readcmd(), or [] when self.besilent is set.

    Unlike the previous version, a list passed as *data* is left
    untouched (its elements used to be overwritten with characters in
    place), the default is no longer a shared mutable list, and only a
    TypeError is ignored while printing the verbose dump.
    """
    if data is None:
        data = []
    # The header bytes are written one at a time, as before.
    self.serialport.write(chr(app))
    self.serialport.write(chr(verb))
    # little endian 16-bit length
    self.serialport.write(chr(count & 0xFF))
    self.serialport.write(chr(count >> 8))
    if self.verbose:
        print("Tx: ( 0x%02x, 0x%02x, 0x%04x )" % (app, verb, count))
        try:
            print("Data: %s" % ([hex(item) for item in data],))
        except TypeError:
            # String payloads cannot be passed to hex(); skip the dump.
            pass
    if count != 0:
        if isinstance(data, list):
            # Indexing by range(count) keeps the IndexError for a list that
            # is shorter than the declared count.
            outstr = ''.join([chr(data[i]) for i in range(count)])
        else:
            outstr = ''.join(data)
        self.serialport.write(outstr)
    if self.besilent:
        return []
    return self.readcmd()
def readcmd(self):
"""Read a reply from the GoodFET."""
# A reply is an app byte, a verb byte, a little-endian 16-bit count and then
# `count` payload bytes.  The parsed fields are stored in self.app, self.verb,
# self.count and self.data; self.data is also the return value.
# Replies with app 0xFF are debug messages: they are printed and reading
# continues with the next reply.
# NOTE(review): indentation is lost in this copy; the nesting described here
# is inferred from statement order.
while 1:#self.serialport.inWaiting(): # Loop while input data is available
try:
#print "Reading...";
self.app=ord(self.serialport.read(1));
#print "APP=%02x" % self.app;
self.verb=ord(self.serialport.read(1));
#Fixes an obscure bug in the TelosB.
if self.app==0x00:
while self.verb==0x00:
self.verb=ord(self.serialport.read(1));
#print "VERB=%02x" % self.verb;
self.count=(
ord(self.serialport.read(1))
+(ord(self.serialport.read(1))<<8)
);
if self.verbose:
print "Rx: ( 0x%02x, 0x%02x, 0x%04x )" % ( self.app, self.verb, self.count )
#Debugging string; print, but wait.
if self.app==0xFF:
if self.verb==0xFF:
print "# DEBUG %s" % self.serialport.read(self.count)
elif self.verb==0xFE:
# fmt is indexed by count-1; its third entry is None.
print "# DEBUG 0x%x" % struct.unpack(fmt[self.count-1], self.serialport.read(self.count))[0]
elif self.verb==0xFD:
#Do nothing, just wait so there's no timeout.
print "# NOP.";
sys.stdout.flush();
else:
self.data=self.serialport.read(self.count);
return self.data;
# ord() raises TypeError when read(1) returns an empty string, which is what
# the warning text below attributes to a timed-out read.
except TypeError:
if self.connected:
print "Warning: waiting for serial read timed out (most likely).";
#print "This shouldn't happen after syncing. Exiting for safety.";
#sys.exit(-1)
return self.data;
#Glitching stuff.
def glitchApp(self, app):
    """Glitch into a device by its application.

    Stages the low byte of *app* in self.data and sends it with verb 0x80
    of the glitch application.
    """
    staged = [app & 0xff]
    self.data = staged
    self.writecmd(self.GLITCHAPP, 0x80, 1, self.data)
def glitchVerb(self, app, verb, data):
    """Glitch during a transaction.

    Sends [app, verb] followed by *data* (None means no extra bytes) with
    verb 0x81 of the glitch application.
    """
    extra = [] if data == None else data
    self.data = [app & 0xff, verb & 0xFF] + extra
    self.writecmd(self.GLITCHAPP, 0x81, len(self.data), self.data)
def glitchstart(self):
    """Glitch into the AVR application.

    Issues glitchVerb() for verb 0x20 of self.APP with no payload.
    """
    start_verb = 0x20
    self.glitchVerb(self.APP, start_verb, None)
def glitchstarttime(self):
    """Measure the timer of the START verb.

    Returns whatever glitchTime() reports for verb 0x20 of self.APP.
    """
    start_verb = 0x20
    measured = self.glitchTime(self.APP, start_verb, None)
    return measured
def glitchTime(self, app, verb, data):
    """Time the execution of a verb.

    Sends [app, verb] plus *data* with glitch verb 0x82 and returns the
    first two reply bytes combined as a little-endian 16-bit number.
    """
    extra = [] if data == None else data
    self.data = [app & 0xff, verb & 0xFF] + extra
    print("Timing app %02x verb %02x." % (app, verb))
    self.writecmd(self.GLITCHAPP, 0x82, len(self.data), self.data)
    low, high = ord(self.data[0]), ord(self.data[1])
    elapsed = low + (high << 8)
    print("Timed to be %i." % elapsed)
    return elapsed
def glitchVoltages(self, low=0x0880, high=0x0fff):
    """Set glitching voltages. (0x0fff is max.)

    Both values are sent little-endian, low first, with glitch verb 0x90.
    """
    encoded = []
    for level in (low, high):
        encoded.append(level & 0xff)
        encoded.append((level >> 8) & 0xff)
    self.data = encoded
    self.writecmd(self.GLITCHAPP, 0x90, 4, self.data)
def glitchRate(self, count=0x0800):
    """Set glitching count period.

    *count* is sent as two little-endian bytes with glitch verb 0x91.
    """
    low_byte = count & 0xff
    high_byte = (count >> 8) & 0xff
    self.data = [low_byte, high_byte]
    self.writecmd(self.GLITCHAPP, 0x91, 2, self.data)
#Monitor stuff
def silent(self, s=0):
    """Transmissions halted when 1.

    Records *s* in self.besilent, reports it, and sends it to the monitor
    with verb 0xB0.
    """
    self.besilent = s
    print("besilent is %i" % self.besilent)
    payload = [s]
    self.writecmd(0, 0xB0, 1, payload)
# Connection flag: mon_connected() sets self.connected to 1 and readcmd()
# checks it before warning about a timed-out read.
# NOTE(review): indentation is lost in this copy; presumably this is a class
# attribute rather than a local of silent() -- confirm.
connected=0;
def mon_connected(self):
    """Announce to the monitor that the connection is good.

    Sets self.connected and sends verb 0xB1 with no payload.
    """
    self.connected = 1
    no_payload = []
    self.writecmd(0, 0xB1, 0, no_payload)
def out(self, byte):
    """Write a byte to P5OUT (monitor verb 0xA1)."""
    payload = [byte]
    self.writecmd(0, 0xA1, 1, payload)
def dir(self, byte):
    """Write a byte to P5DIR (monitor verb 0xA0)."""
    payload = [byte]
    self.writecmd(0, 0xA0, 1, payload)
def call(self, adr):
    """Call to an address.

    The low 16 bits of *adr* are sent little-endian with monitor verb 0x30.
    """
    low_byte = adr & 0xFF
    high_byte = (adr >> 8) & 0xFF
    self.writecmd(0, 0x30, 2, [low_byte, high_byte])
def execute(self, code):
    """Execute supplied code.

    *code* is handed to writecmd() with monitor verb 0x31.
    NOTE(review): the count is fixed at 2 rather than len(code); confirm
    whether that is intentional.
    """
    declared_length = 2  # len(code)
    self.writecmd(0, 0x31, declared_length, code)
def MONpeek8(self, address):
    """Read a byte of memory from the monitor.

    Sends the address (low byte, then address>>8) with verb 0x02 and
    returns the first reply byte as an integer.
    """
    request = [address & 0xff, address >> 8]
    self.data = request
    self.writecmd(0, 0x02, 2, self.data)
    first = self.data[0]
    return ord(first)
def MONpeek16(self, address):
    """Read a word of memory from the monitor.

    The byte at *address* is the low half, the byte at address+1 the high half.
    """
    low = self.MONpeek8(address)
    high = self.MONpeek8(address + 1)
    return low + (high << 8)
def peek(self, address):
    """Read a word of memory from the monitor.

    Combines two MONpeek8() reads, low byte first.
    """
    low = self.MONpeek8(address)
    high = self.MONpeek8(address + 1)
    return low + (high << 8)
def eeprompeek(self, address):
    """Report that EEPROM reads are unavailable; nothing is read or returned."""
    print("EEPROM peeking not supported for the monitor.")
def peekbysym(self, name):
    """Read a value by its symbol name.

    The name is resolved through self.symbols and read with peek8() using
    the memory label "data".
    """
    #TODO include memory in symbol.
    address = self.symbols.get(name)
    return self.peek8(address, "data")
def pokebysym(self, name, val):
    """Write a value by its symbol name.

    The name is resolved through self.symbols and written with pokebyte().
    """
    #TODO include memory in symbol.
    address = self.symbols.get(name)
    return self.pokebyte(address, val)
def pokebyte(self, address, value, memory="vn"):
    """Set a byte of memory by the monitor.

    Sends [address low, address>>8, value] with verb 0x03 and returns the
    first reply byte as an integer.  *memory* is not used here.
    """
    request = [address & 0xff, address >> 8, value]
    self.data = request
    self.writecmd(0, 0x03, 3, self.data)
    first = self.data[0]
    return ord(first)
def poke16(self, address, value):
    """Set a word of memory by the monitor; delegates to MONpoke16() and returns nothing."""
    write_word = self.MONpoke16
    write_word(address, value)
def MONpoke16(self, address, value):
    """Set a word of memory by the monitor.

    The low byte of *value* is written to *address* and the high byte to
    address+1, mirroring the byte order MONpeek16() reads back.  (The high
    byte used to be written to *address* as well, overwriting the low byte.)

    Returns *value* unchanged.
    """
    self.pokebyte(address, value & 0xFF)
    self.pokebyte(address + 1, (value >> 8) & 0xFF)
    return value
def setsecret(self, value):
    """Set a secret word for later retrieval. Used by glitcher.

    Not implemented for this target: two notices are printed and nothing is stored.
    """
    for notice in ("Secret setting is not yet suppored for this target.",
                   "Aborting."):
        print(notice)
def getsecret(self):
    """Get a secret word. Used by glitcher.

    Not implemented for this target: two notices are printed and the
    process exits.
    """
    for notice in ("Secret getting is not yet suppored for this target.",
                   "Aborting."):
        print(notice)
    sys.exit()
def dumpmem(self, begin, end):
    """Print "address value" for every second address from *begin* up to (not including) *end*."""
    for address in range(begin, end, 2):
        print("%04x %04x" % (address, self.MONpeek16(address)))
def monitor_ram_pattern(self):
    """Overwrite all of RAM with 0xBEEF.

    Sends monitor verb 0x90 with a zero count.
    """
    staged = self.data
    self.writecmd(0, 0x90, 0, staged)
    return
def monitor_ram_depth(self):
    """Determine how many bytes of RAM are unused by looking for 0xBEEF..

    Sends monitor verb 0x91 and returns the first two reply bytes as a
    little-endian 16-bit number.
    """
    self.writecmd(0, 0x91, 0, self.data)
    low, high = ord(self.data[0]), ord(self.data[1])
    return low + (high << 8)
#Baud rates.
# setBaud() takes an index into this list and sends that index to the FET;
# findbaud() tries every entry in turn.  115200 appears at both ends.
baudrates=[115200,
9600,
19200,
38400,
57600,
115200];
def setBaud(self, baud):
    """Change the baud rate. TODO fix this.

    *baud* is an index into self.baudrates.  The index is sent to the FET
    as a raw four-byte frame (0x00, 0x80, 1, baud); the host port is then
    switched to the matching rate and both serial buffers are flushed after
    a one second pause.
    """
    rates = self.baudrates
    self.data = [baud]
    print("Changing FET baud.")
    port = self.serialport
    for value in (0x00, 0x80, 1, baud):
        port.write(chr(value))
    print("Changed host baud.")
    port.setBaudrate(rates[baud])
    time.sleep(1)
    port.flushInput()
    port.flushOutput()
    print("Baud is now %i." % rates[baud])
    return
def readbyte(self):
    """Read one character from the serial port and return its ordinal."""
    raw = self.serialport.read(1)
    return ord(raw)
def findbaud(self):
# Try each rate in self.baudrates: switch the host port, flush both buffers,
# discard nine bytes, then print the next four bytes read.
# NOTE(review): indentation is lost in this copy; presumably everything below
# sits inside the outer loop and only readbyte() is in the inner one.
for r in self.baudrates:
print "\nTrying %i" % r;
self.serialport.setBaudrate(r);
#time.sleep(1);
self.serialport.flushInput()
self.serialport.flushOutput()
for i in range(1,10):
self.readbyte();
print "Read %02x %02x %02x %02x" % (
self.readbyte(),self.readbyte(),self.readbyte(),self.readbyte());
def monitortest(self):
"""Self-test several functions through the monitor."""
# Runs 3000 rounds of peek/poke/echo checks and prints a message for each
# failed check; nothing is returned.
# NOTE(review): indentation is lost in this copy; presumably every check
# through the echo test is inside the loop -- confirm.
print "Performing monitor self-test.";
self.monitorclocking();
for f in range(0,3000):
a=self.MONpeek16(0x0c00);
b=self.MONpeek16(0x0c02);
# Only `a` is checked; `b` is fetched and reported in the error message.
if a!=0x0c04 and a!=0x0c06:
print "ERROR Fetched %04x, %04x" % (a,b);
self.pokebyte(0x0021,0); #Drop LED
if self.MONpeek8(0x0021)!=0:
print "ERROR, P1OUT not cleared.";
self.pokebyte(0x0021,1); #Light LED
if not self.monitorecho():
print "Echo test failed.";
print "Self-test complete.";
self.monitorclocking();
def monitorecho(self):
    """Send a fixed sentence with monitor verb 0x81 and check that it comes back.

    Returns 1 when self.data equals the sentence afterwards, otherwise 0
    (after printing what was received, if verbose).
    """
    sentence = "The quick brown fox jumped over the lazy dog."
    self.writecmd(self.MONITORAPP, 0x81, len(sentence), sentence)
    if self.data == sentence:
        return 1
    if self.verbose:
        print("Comm error recognized by monitorecho(), got:\n%s" % self.data)
    return 0
def monitor_info(self):
    """Print the MCU identifier and the clocking value; always returns 1."""
    mcu = self.infostring()
    print("GoodFET with %s MCU" % mcu)
    clock = self.monitorclocking()
    print("Clocked at %s" % clock)
    return 1
def testleds(self):
    """Send monitor verb 0xD0 and report the count carried in the reply.

    The reply is expected to be a single character; anything ord() rejects
    is printed as-is instead.  Always returns 1.
    """
    print("Flashing LEDs")
    self.writecmd(self.MONITORAPP, 0xD0, 0, "")
    try:
        print("Flashed %d LED." % ord(self.data))
    except TypeError:
        # ord() raises TypeError for anything but a one-character string;
        # catching only that no longer hides unrelated failures or Ctrl-C.
        print("Unable to process response: %s" % (self.data,))
    return 1
def monitor_list_apps(self, full=False):
    """Print the firmware build date and the list of firmware apps.

    Sends monitor verb 0x82 with int(full) as its one-byte payload, then
    reads replies by hand: the first is printed as the build date and each
    following one as an app entry, until a reply with a zero count arrives.
    Always returns 1.
    """
    self.monitor_info()
    old_value = self.besilent
    self.besilent = True  # turn off automatic call to readcmd
    try:
        self.writecmd(self.MONITORAPP, 0x82, 1, [int(full)])
    finally:
        # Restore the flag even if the write fails, so the client is not
        # left silent afterwards.
        self.besilent = old_value
    # read the build date string
    self.readcmd()
    print("Build Date: %s" % self.data)
    print("Firmware apps:")
    while True:
        self.readcmd()
        if self.count == 0:
            break
        print(self.data)
    return 1
def monitorclocking(self):
    """Return the 16-bit clocking value formatted as a "0x%04x" string."""
    clock = self.monitorgetclock()
    return "0x%04x" % clock
def monitorsetclock(self, clock):
    """Set the clocking value by writing *clock* as a word to address 0x56."""
    clock_address = 0x56
    self.MONpoke16(clock_address, clock)
def monitorgetclock(self):
    """Get the clocking value.

    Returns the placeholder 0xDEAD when $platform or $board is "arduino";
    otherwise reads the word at address 0x56.
    """
    env = os.environ
    on_arduino = env.get("platform") == 'arduino' or env.get("board") == 'arduino'
    if on_arduino:
        return 0xDEAD
    #Check for MSP430 before peeking this.
    return self.MONpeek16(0x56)
# The following functions ought to be implemented in
# every client.
def infostring(self):
    """Return a short identifier for the attached MCU.

    "Arduino" when $platform or $board is "arduino"; otherwise the bytes at
    0xff0 and 0xff1 rendered as four hex digits.
    """
    env = os.environ
    if env.get("platform") == 'arduino' or env.get("board") == 'arduino':
        return "Arduino"
    first = self.MONpeek8(0xff0)
    second = self.MONpeek8(0xff1)
    return "%02x%02x" % (first, second)
def lock(self):
    """Placeholder: report that locking is unsupported."""
    print("Locking Unsupported.")
def erase(self):
    """Placeholder: report that erasure is unsupported."""
    print("Erasure Unsupported.")
def setup(self):
return;
def start(self):
    """Placeholder hook; does nothing in this class."""
    return None
def test(self):
    """Placeholder: print a notice that this is unimplemented."""
    print("Unimplemented.")
    return None
def status(self):
    """Placeholder: print a notice that this is unimplemented."""
    print("Unimplemented.")
    return None
def halt(self):
    """Placeholder: print a notice that this is unimplemented."""
    print("Unimplemented.")
    return None
def resume(self):
    """Placeholder: print a notice that this is unimplemented."""
    print("Unimplemented.")
    return None
def getpc(self):
    """Placeholder: print a notice and return the dummy value 0xdead."""
    print("Unimplemented.")
    dummy_pc = 0xdead
    return dummy_pc
def flash(self, file):
    """Flash an intel hex file to code memory.

    Not implemented here; only a notice is printed and *file* is ignored.
    """
    print("Flash not implemented.")
def dump(self, file, start=0, stop=0xffff):
    """Dump an intel hex file from code memory.

    Not implemented here; only a notice is printed and the arguments are ignored.
    """
    print("Dump not implemented.")
def peek32(self, address, memory="vn"):
    """Peek 32 bits: the word at *address* is the low half, the word at address+2 the high half."""
    low_word = self.peek16(address, memory)
    high_word = self.peek16(address + 2, memory)
    return low_word + (high_word << 16)
def peek16(self, address, memory="vn"):
    """Peek 16 bits of memory: the byte at *address* is low, the byte at address+1 is high."""
    low_byte = self.peek8(address, memory)
    high_byte = self.peek8(address + 1, memory)
    return low_byte + (high_byte << 8)
def peek8(self, address, memory="vn"):
    """Peek a byte of memory through the monitor; *memory* is ignored here."""
    value = self.MONpeek8(address)  # monitor
    return value
def peekblock(self, address, length, memory="vn"):
    """Return a block of data: a list of *length* bytes read one by one from *address* upward."""
    return [self.peek8(address + offset, memory) for offset in range(0, length)]
def pokeblock(self, address, bytes, memory="vn"):
    """Poke a block of a data into memory at an address.

    Each item of *bytes* is written with pokebyte() to consecutive addresses.
    """
    for offset, value in enumerate(bytes):
        self.pokebyte(address + offset, value, memory)
    return
def loadsymbols(self):
    """Load symbols from a file.

    Placeholder hook; does nothing in this class.
    """
    return None
#!/usr/bin/env python
# GoodFET Client Library for Maxim USB Chips.
#
# (C) 2012 Travis Goodspeed <travis at radiantmachines.com>
#
# This code is being rewritten and refactored. You've been warned!
import sys, time, string, cStringIO, struct, glob, os;
import warnings
from GoodFET import GoodFET;
warnings.warn(
"""This library will soon be deprecated in favor of the USB*.py libraries."""
)
#Handy registers.
rEP0FIFO=0
rEP1OUTFIFO=1
rEP2INFIFO=2
rEP3INFIFO=3
rSUDFIFO=4
rEP0BC=5
rEP1OUTBC=6
rEP2INBC=7
rEP3INBC=8
rEPSTALLS=9
rCLRTOGS=10
rEPIRQ=11
rEPIEN=12
rUSBIRQ=13
rUSBIEN=14
rUSBCTL=15
rCPUCTL=16
rPINCTL=17
rREVISION=18
rFNADDR=19
rIOPINS=20
rIOPINS1=20 #Same as rIOPINS
rIOPINS2=21
rHIRQ=25
rHIEN=26
rMODE=27
rPERADDR=28
rHCTL=29
rHXFR=30
rHRSL=31
#Host mode registers.
# These reuse register numbers already named in the list above (1, 2, 6, 7);
# rHIRQ is assigned 25 a second time.
rRCVFIFO =1
rSNDFIFO =2
rRCVBC =6
rSNDBC =7
rHIRQ =25
# R11 EPIRQ register bits
bmSUDAVIRQ =0x20
bmIN3BAVIRQ =0x10
bmIN2BAVIRQ =0x08
bmOUT1DAVIRQ= 0x04
bmOUT0DAVIRQ= 0x02
bmIN0BAVIRQ =0x01
# R12 EPIEN register bits
bmSUDAVIE =0x20
bmIN3BAVIE =0x10
bmIN2BAVIE =0x08
bmOUT1DAVIE =0x04
bmOUT0DAVIE =0x02
bmIN0BAVIE =0x01
# ************************
# Standard USB Requests
SR_GET_STATUS =0x00 # Get Status
SR_CLEAR_FEATURE =0x01 # Clear Feature
SR_RESERVED =0x02 # Reserved
SR_SET_FEATURE =0x03 # Set Feature
SR_SET_ADDRESS =0x05 # Set Address
SR_GET_DESCRIPTOR =0x06 # Get Descriptor
SR_SET_DESCRIPTOR =0x07 # Set Descriptor
SR_GET_CONFIGURATION =0x08 # Get Configuration
SR_SET_CONFIGURATION =0x09 # Set Configuration
SR_GET_INTERFACE =0x0a # Get Interface
SR_SET_INTERFACE =0x0b # Set Interface
# Get Descriptor codes
GD_DEVICE =0x01 # Get device descriptor: Device
GD_CONFIGURATION =0x02 # Get device descriptor: Configuration
GD_STRING =0x03 # Get device descriptor: String
GD_HID =0x21 # Get descriptor: HID
GD_REPORT =0x22 # Get descriptor: Report
# SETUP packet header offsets
bmRequestType =0
bRequest =1
wValueL =2
wValueH =3
wIndexL =4
wIndexH =5
wLengthL =6
wLengthH =7
# HID bRequest values
GET_REPORT =1
GET_IDLE =2
GET_PROTOCOL =3
SET_REPORT =9
SET_IDLE =0x0A
SET_PROTOCOL =0x0B
INPUT_REPORT =1
# PINCTL bits
bmEP3INAK =0x80
bmEP2INAK =0x40
bmEP1INAK =0x20
bmFDUPSPI =0x10
bmINTLEVEL =0x08
bmPOSINT =0x04
bmGPXB =0x02
bmGPXA =0x01
# rUSBCTL bits
bmHOSCSTEN =0x80
bmVBGATE =0x40
bmCHIPRES =0x20
bmPWRDOWN =0x10
bmCONNECT =0x08
bmSIGRWU =0x04
# USBIRQ bits
bmURESDNIRQ =0x80
bmVBUSIRQ =0x40
bmNOVBUSIRQ =0x20
bmSUSPIRQ =0x10
bmURESIRQ =0x08
bmBUSACTIRQ =0x04
bmRWUDNIRQ =0x02
bmOSCOKIRQ =0x01
# MODE bits
bmHOST =0x01
bmLOWSPEED =0x02
bmHUBPRE =0x04
bmSOFKAENAB =0x08
bmSEPIRQ =0x10
bmDELAYISO =0x20
bmDMPULLDN =0x40
bmDPPULLDN =0x80
# PERADDR/HCTL bits
bmBUSRST =0x01
bmFRMRST =0x02
bmSAMPLEBUS =0x04
bmSIGRSM =0x08
bmRCVTOG0 =0x10
bmRCVTOG1 =0x20
bmSNDTOG0 =0x40
bmSNDTOG1 =0x80
# rHXFR bits
# Host XFR token values for writing the HXFR register (R30).
# OR this bit field with the endpoint number in bits 3:0
tokSETUP =0x10 # HS=0, ISO=0, OUTNIN=0, SETUP=1
tokIN =0x00 # HS=0, ISO=0, OUTNIN=0, SETUP=0
tokOUT =0x20 # HS=0, ISO=0, OUTNIN=1, SETUP=0
tokINHS =0x80 # HS=1, ISO=0, OUTNIN=0, SETUP=0
tokOUTHS =0xA0 # HS=1, ISO=0, OUTNIN=1, SETUP=0
tokISOIN =0x40 # HS=0, ISO=1, OUTNIN=0, SETUP=0
tokISOOUT =0x60 # HS=0, ISO=1, OUTNIN=1, SETUP=0
# rRSL bits
bmRCVTOGRD =0x10
bmSNDTOGRD =0x20
bmKSTATUS =0x40
bmJSTATUS =0x80
# Host error result codes, the 4 LSB's in the HRSL register.
hrSUCCESS =0x00
hrBUSY =0x01
hrBADREQ =0x02
hrUNDEF =0x03
hrNAK =0x04
hrSTALL =0x05
hrTOGERR =0x06
hrWRONGPID =0x07
hrBADBC =0x08
hrPIDERR =0x09
hrPKTERR =0x0A
hrCRCERR =0x0B
hrKERR =0x0C
hrJERR =0x0D
hrTIMEOUT =0x0E
hrBABBLE =0x0F
# HIRQ bits
bmBUSEVENTIRQ =0x01 # indicates BUS Reset Done or BUS Resume
bmRWUIRQ =0x02
bmRCVDAVIRQ =0x04
bmSNDBAVIRQ =0x08
bmSUSDNIRQ =0x10
bmCONDETIRQ =0x20
bmFRAMEIRQ =0x40
bmHXFRDNIRQ =0x80
class GoodFETMAXUSB(GoodFET):
# Application number passed to writecmd() by every MAXUSB transaction below.
MAXUSBAPP=0x40;
# When true, the read helpers and do_OUT1() print what they see.
usbverbose=False;
def service_irqs(self):
    """Handle USB interrupt events.

    Reads EPIRQ and USBIRQ once, then dispatches on the EPIRQ bits in a
    fixed order: setup data, OUT1, IN3, IN2.  Only the setup and OUT1 bits
    are written back to clear them.
    """
    pending = self.rreg(rEPIRQ)
    usbirq = self.rreg(rUSBIRQ)  # read but not otherwise used
    #Are we being asked for setup data?
    if pending & bmSUDAVIRQ:
        self.wreg(rEPIRQ, bmSUDAVIRQ)  # clear the bit before servicing
        self.do_SETUP()
    if pending & bmOUT1DAVIRQ:
        self.do_OUT1()
        self.wreg(rEPIRQ, bmOUT1DAVIRQ)  # clear the bit *AFTER* servicing
    if pending & bmIN3BAVIRQ:
        self.do_IN3()
    if pending & bmIN2BAVIRQ:
        self.do_IN2()
def do_IN2(self):
    """Overload this.

    Called by service_irqs() when the IN2 bit is set; does nothing here.
    """
    pass
def do_IN3(self):
    """Overload this.

    Called by service_irqs() when the IN3 bit is set; does nothing here.
    """
    pass
def do_OUT1(self):
    """Overload this.

    The default only prints a notice, and only when usbverbose is set.
    """
    if not self.usbverbose:
        return
    print("Ignoring an OUT1 interrupt.")
def setup2str(self, SUD):
    """Converts the header of a setup packet to a string.

    *SUD* supplies at least eight characters; the three 16-bit fields are
    assembled little-endian.
    """
    octets = [ord(SUD[i]) for i in range(8)]

    def word(index):
        # Little-endian 16-bit value starting at octets[index].
        return octets[index] + (octets[index + 1] << 8)

    return "bmRequestType=0x%02x, bRequest=0x%02x, wValue=0x%04x, wIndex=0x%04x, wLength=0x%04x" % (
        octets[0], octets[1], word(2), word(4), word(6))
def MAXUSBsetup(self):
    """Move the FET into the MAXUSB application.

    The setup verb (0x10) is sent three times, the chip revision is
    printed, and PINCTL is written with 0x18.
    """
    for _ in range(3):
        self.writecmd(self.MAXUSBAPP, 0x10, 0, self.data)  # MAXUSB/SETUP
    revision = self.rreg(rREVISION)
    print("Connected to MAX342x Rev. %x" % (revision))
    self.wreg(rPINCTL, 0x18)  # Set duplex and negative INT level.
def MAXUSBtrans8(self, byte):
    """Read and write 8 bits by MAXUSB.

    Sends *byte* via MAXUSBtrans() and returns the first reply byte as an integer.
    """
    reply = self.MAXUSBtrans([byte])
    first = reply[0]
    return ord(first)
def MAXUSBtrans(self, data):
    """Exchange data by MAXUSB.

    Stages *data* in self.data, sends it with verb 0x00, and returns
    self.data as it stands afterwards.
    """
    self.data = data
    length = len(data)
    self.writecmd(self.MAXUSBAPP, 0x00, length, data)
    return self.data
def rreg(self, reg):
    """Peek 8 bits from a register.

    Sends [reg<<3, 0] and returns the second reply byte as an integer.
    """
    request = [reg << 3, 0]
    self.writecmd(self.MAXUSBAPP, 0x00, len(request), request)
    answer = self.data[1]
    return ord(answer)
def rregAS(self, reg):
    """Peek 8 bits from a register, setting AS.

    Same as rreg() but with bit 0 of the command byte set.
    """
    request = [(reg << 3) | 1, 0]
    self.writecmd(self.MAXUSBAPP, 0x00, len(request), request)
    answer = self.data[1]
    return ord(answer)
def wreg(self, reg, value):
    """Poke 8 bits into a register.

    Sends [(reg<<3)|2, value] and returns *value*.
    """
    command = (reg << 3) | 2
    request = [command, value]
    self.writecmd(self.MAXUSBAPP, 0x00, len(request), request)
    return value
def wregAS(self, reg, value):
    """Poke 8 bits into a register, setting AS.

    Sends [(reg<<3)|3, value] and returns *value*.
    """
    command = (reg << 3) | 3
    request = [command, value]
    self.writecmd(self.MAXUSBAPP, 0x00, len(request), request)
    return value
def readbytes(self, reg, length):
    """Peek some bytes from a register.

    Sends the command byte reg<<3 followed by *length* filler bytes
    (0, 1, 2, ...) and returns the reply with its first byte dropped.
    """
    request = [(reg << 3)] + list(range(0, length))
    self.writecmd(self.MAXUSBAPP, 0x00, len(request), request)
    payload = self.data[1:len(self.data)]
    # The hex rendering is built whether or not it ends up being printed.
    rendered = "".join([" %02x" % ord(ch) for ch in payload])
    if self.usbverbose:
        print("GET %02x==%s" % (reg, rendered))
    return payload
def readbytesAS(self,reg,length):
    """Peek some bytes from a register, acking prior transfer.

    Same exchange as a plain multi-byte read, but bit 0 of the command
    byte is set. Returns self.data with its first element dropped.
    """
    request=[(reg<<3)|1]+list(range(length))
    self.writecmd(self.MAXUSBAPP,0x00,len(request),request)
    payload=self.data[1:]
    hexdump="".join(" %02x"%ord(ch) for ch in payload)
    if self.usbverbose:
        print("GETAS %02x==%s" % (reg,hexdump))
    return payload
def fifo_ep3in_tx(self,data):
    """Sends the data out of EP3 in 64-byte chunks.

    Busy-waits on the IN3 buffer-available bit of rEPIRQ before the first
    chunk and again after every chunk.
    """
    def wait_until_free():
        # Spin until the IN3BAV interrupt bit reads as set.
        while not(self.rreg(rEPIRQ)&bmIN3BAVIRQ):
            pass
    wait_until_free()
    for start in range(0,len(data),64):
        #Send 64-byte chunks or the remainder.
        chunk=data[start:start+64]
        self.writebytes(rEP3INFIFO,chunk)
        self.wregAS(rEP3INBC,len(chunk))
        wait_until_free()
    return
def ctl_write_nd(self,request):
    """Control Write with no data stage.

    Loads `request` into the SUDFIFO, sends a SETUP token to EP0 and then
    an IN handshake token as the STATUS stage. Returns the first non-zero
    result code from send_packet(), or 0 when both stages succeed.
    """
    self.writebytes(rSUDFIFO,request)
    # Stage 1 is the SETUP token; stage 2 is the IN handshake that closes
    # the transfer. send_packet() takes care of retries for both.
    for token in (tokSETUP,tokINHS):
        status=self.send_packet(token,0)
        if status:
            return status
    return 0
def ctl_read(self,request):
    """Control read transfer, used in Host mode.

    Runs the three stages (SETUP, IN data, OUT status). The number of
    bytes to read is taken from request[6] and request[7] (low, high).
    Returns 0 on success, otherwise the failing stage's result code.
    The received data is left wherever IN_Transfer() puts it.
    """
    wanted=request[6]+256*request[7]
    #SETUP stage: load the FIFO and send the token to EP0.
    self.writebytes(rSUDFIFO,request)
    status=self.send_packet(tokSETUP,0)
    if status:
        print("Failed to get ACK on SETUP request in ctl_read().")
        return status
    #Data stage: the first data packet of a control transfer uses DATA1.
    self.wreg(rHCTL,bmRCVTOG1)
    status=self.IN_Transfer(0,wanted)
    if status:
        print("Failed on IN Transfer in ctl_read()")
        return status
    self.IN_nak_count=self.nak_count
    #Status stage.
    status=self.send_packet(tokOUTHS,0)
    if status:
        print("Failed on OUT Status stage in ctl_read()")
        return status
    return 0
xfrdata=[]; #Ugly variable used only by a few functions. FIXME
def IN_Transfer(self,endpoint,INbytes):
    """Does an IN transfer to an endpoint, used for Host mode.

    Repeatedly sends IN tokens and appends the received bytes to
    self.xfrdata until the device sends a short packet or the expected
    number of bytes has arrived. Sets self.last_transfer_size on completion.

    Returns the result code of the last send_packet() call: non-zero on
    failure, otherwise the value returned for the final packet.
    """
    xfrsize=INbytes
    xfrlen=0
    self.xfrdata=[]
    while True:
        resultcode=self.send_packet(tokIN,endpoint)  #NAKs are handled there.
        if resultcode:
            return resultcode
        pktsize=self.rreg(rRCVBC)  #Number of received bytes.
        #Very inefficient (one register read per byte); move this to C
        #if performance is needed.
        received=[self.rreg(rRCVFIFO) for _ in range(pktsize)]
        self.xfrdata=self.xfrdata+received
        #The expected length is replaced by the first received byte.
        #NOTE(review): presumably because callers read descriptors whose
        #first byte is their length -- confirm.
        #Guarded so a zero-length first packet no longer raises IndexError.
        if self.xfrdata:
            xfrsize=self.xfrdata[0]
        self.wreg(rHIRQ,bmRCVDAVIRQ)  #Clear IRQ
        xfrlen=xfrlen+pktsize
        #The transfer is complete if:
        # 1. The device sent a short packet, <maxPacketSize
        # 2. The expected number of bytes has been transferred.
        if (pktsize<self.maxPacketSize) or (xfrlen>=xfrsize):
            self.last_transfer_size=xfrlen
            return resultcode
RETRY_LIMIT=3;
NAK_LIMIT=300;
def send_packet(self,token,endpoint):
    """Send a packet to an endpoint as the Host, taking care of NAKs.
    Don't use this for device code.

    Relaunches the transfer while the result is hrNAK or hrTIMEOUT, until
    self.nak_count reaches NAK_LIMIT or self.retry_count reaches
    RETRY_LIMIT. Any other result code is returned immediately; when a
    limit is hit the last result code is returned.
    """
    self.retry_count=0
    self.nak_count=0
    while self.nak_count<self.NAK_LIMIT and self.retry_count<self.RETRY_LIMIT:
        self.wreg(rHXFR,(token|endpoint))  #Launch the transfer.
        while not (self.rreg(rHIRQ) & bmHXFRDNIRQ):
            pass  #Wait for the completion IRQ.
        self.wreg(rHIRQ,bmHXFRDNIRQ)  #Clear IRQ
        resultcode=self.rreg(rHRSL) & 0x0F
        if resultcode==hrNAK:
            self.nak_count+=1
            continue
        if resultcode==hrTIMEOUT:
            self.retry_count+=1
            continue
        #Neither NAK nor timeout: hand the result straight back.
        return resultcode
    return resultcode
def writebytes(self,reg,tosend):
    """Poke some bytes into a register.

    `tosend` may be a str or a list of ints; the command byte (reg<<3)|3
    is prepended in the matching type before the data is sent.
    """
    opcode=(reg<<3)|3
    if type(tosend)==str:
        data=chr(opcode)+tosend
        shown=tosend
    else:
        data=[opcode]+tosend
        shown="".join(" %02x"%value for value in tosend)
    if self.usbverbose:
        print("PUT %02x:=%s (0x%02x bytes)" % (reg,shown,len(data)))
    self.writecmd(self.MAXUSBAPP,0x00,len(data),data)
def usb_connect(self):
    """Connect the USB port by writing VBGATE|CONNECT to rUSBCTL."""
    #VBGATE: disconnect the D+ pullup if the host turns off VBUS.
    control=bmVBGATE|bmCONNECT
    self.wreg(rUSBCTL,control)
def usb_disconnect(self):
    """Disconnect the USB port by writing only VBGATE to rUSBCTL."""
    control=bmVBGATE
    self.wreg(rUSBCTL,control)
def STALL_EP0(self,SUD=None):
    """Stall for an unknown SETUP event.

    Prints a message (including the decoded setup header when SUD is
    given) and writes 0x23 to rEPSTALLS.
    """
    if SUD is None:
        message="Stalling EP0."
    else:
        message="Stalling EPO for %s" % self.setup2str(SUD)
    print(message)
    self.wreg(rEPSTALLS,0x23)  #All three stall bits.
def SETBIT(self,reg,val):
    """Set a bit in a register (read, OR in `val`, write back)."""
    current=self.rreg(reg)
    self.wreg(reg,current|val)
def vbus_on(self):
    """Turn on the target device by setting bit 0x08 in rIOPINS2."""
    pins=self.rreg(rIOPINS2)
    self.wreg(rIOPINS2,pins|0x08)
def vbus_off(self):
    """Turn off the target device's power by writing zero to rIOPINS2."""
    cleared=0x00
    self.wreg(rIOPINS2,cleared)
def reset_host(self):
    """Resets the chip into host mode.

    Writes bmCHIPRES to rUSBCTL (stops the oscillator) and then 0x00
    (restarts it).
    """
    for control in (bmCHIPRES,0x00):
        self.wreg(rUSBCTL,control)
    #FIXME: Why does the OSC line never settle?
    #The code works without waiting for bmOSCOKIRQ in rUSBIRQ, so no
    #wait for PLL stabilization is performed here.
class GoodFETMAXUSBHost(GoodFETMAXUSB):
"""This is a class for implemented a minimal USB host.
It's intended for fuzzing, rather than for daily use."""
def hostinit(self):
    """Initialize the MAX3421 as a USB Host.

    Connects, sets the pin control bits, resets the chip, then cycles
    VBUS off and back on with a 0.2 s pause in between.
    """
    self.usb_connect()
    print("Enabling host mode.")
    pin_mode=bmFDUPSPI|bmPOSINT
    self.wreg(rPINCTL,pin_mode)
    print("Resetting host.")
    self.reset_host()
    self.vbus_off()
    time.sleep(0.2)
    print("Powering host.")
    self.vbus_on()
    #hostrun() is not started here.
def hostrun(self):
    """Run as a minimal host and dump the config tables.

    Loops forever: wait for a device, pause 0.2 s, enumerate it, then
    wait for it to be disconnected.
    """
    while True:
        self.detect_device()
        time.sleep(0.2)
        self.enumerate_device()
        self.wait_for_disconnect()
def detect_device(self):
    """Waits for a device to be inserted and then returns.

    Polls the J/K status bits until one is set, then rewrites rMODE for a
    full-speed (J) or low-speed (K) device.
    """
    #Host mode with the 15K pulldown resistors on D+ and D-.
    host_mode=bmDPPULLDN|bmDMPULLDN|bmHOST
    self.wreg(rMODE,host_mode)
    #Clear connection detect IRQ.
    self.wreg(rHIRQ,bmCONDETIRQ)
    print("Waiting for a device connection.")
    busstate=0
    while busstate==0:
        self.wreg(rHCTL,bmSAMPLEBUS)  #Update JSTATUS and KSTATUS bits.
        busstate=self.rreg(rHRSL) & (bmJSTATUS|bmKSTATUS)
    if busstate==bmJSTATUS:
        print("Detected Full-Speed Device.")
        self.wreg(rMODE,(bmDPPULLDN|bmDMPULLDN|bmHOST|bmSOFKAENAB))
    elif busstate==bmKSTATUS:
        print("Detected Low-Speed Device.")
        self.wreg(rMODE,(bmDPPULLDN|bmDMPULLDN|bmHOST|bmLOWSPEED|bmSOFKAENAB))
    else:
        print("Not sure whether this is Full-Speed or Low-Speed.  Please investigate.")
def wait_for_disconnect(self):
    """Wait for a device to be disconnected.

    Busy-waits on the connection-detect IRQ, then rewrites rMODE and
    clears one bit in each of rIOPINS2 and rIOPINS1.
    """
    print("Waiting for a device disconnect.")
    self.wreg(rHIRQ,bmCONDETIRQ)  #Clear disconnect IRQ
    while not (self.rreg(rHIRQ) & bmCONDETIRQ):
        pass  #Wait for IRQ to change.
    #Turn off markers.
    self.wreg(rMODE,bmDPPULLDN|bmDMPULLDN|bmHOST)
    print("Device disconnected.")
    pins2=self.rreg(rIOPINS2)
    self.wreg(rIOPINS2,(pins2 & ~0x04))  #HL1_OFF
    pins1=self.rreg(rIOPINS1)
    self.wreg(rIOPINS1,(pins1 & ~0x02))  #HL4_OFF
def enumerate_device(self):
    """Enumerates a device on the present port.

    Sequence: bus reset, read the first 8 bytes of the device descriptor
    at address 0 to learn EP0's max packet size, bus reset again, assign
    address 7, then read the full 0x12-byte device descriptor and the
    manufacturer, product and serial strings.

    Results are stored in self.descriptor, self.VID, self.PID,
    self.manufacturer, self.product and self.serial, and printed with
    printstrings().
    """
    # Setup-packet templates (8 bytes each).
    Set_Address_to_7 = [0x00,0x05,0x07,0x00,0x00,0x00,0x00,0x00];
    Get_Descriptor_Device = [0x80,0x06,0x00,0x01,0x00,0x00,0x00,0x00]; #len filled in
    # NOTE(review): Get_Descriptor_Config is defined but never used below.
    Get_Descriptor_Config = [0x80,0x06,0x00,0x02,0x00,0x00,0x00,0x00];
    print "Issuing USB bus reset.";
    self.wreg(rHCTL,bmBUSRST);
    while self.rreg(rHCTL) & bmBUSRST:
        #Wait for reset to complete.
        pass;
    time.sleep(0.2);
    #Get the device descriptor.
    self.wreg(rPERADDR,0); #First request to address 0.
    self.maxPacketSize=8; #Only safe value for first check.
    Get_Descriptor_Device[6]=8; # wLengthL
    Get_Descriptor_Device[7]=0; # wLengthH
    print "Fetching 8 bytes of Device Descriptor.";
    # NOTE(review): the return code of ctl_read() is not checked here.
    self.ctl_read(Get_Descriptor_Device); # Get device descriptor into self.xfrdata;
    # Byte 7 of the descriptor is used as EP0's max packet size.
    self.maxPacketSize=self.xfrdata[7];
    print "EP0 maxPacketSize is %02i bytes." % self.maxPacketSize;
    # Issue another USB bus reset
    print "Resetting the bus again."
    self.wreg(rHCTL,bmBUSRST);
    while self.rreg(rHCTL) & bmBUSRST:
        #Wait for reset to complete.
        pass;
    time.sleep(0.2);
    # Set_Address to 7 (Note: this request goes to address 0, already set in PERADDR register).
    print "Setting address to 0x07";
    # HR holds the result code but is not inspected afterwards.
    HR = self.ctl_write_nd(Set_Address_to_7); # CTL-Write, no data stage
    #if(print_error(HR)) return;
    time.sleep(0.002); # Device gets 2 msec recovery time
    self.wreg(rPERADDR,7); # now all transfers go to addr 7
    #Get the device descriptor at the assigned address.
    Get_Descriptor_Device[6]=0x12; #Fill in real descriptor length.
    print "Fetching Device Descriptor."
    self.ctl_read(Get_Descriptor_Device); #Result in self.xfrdata;
    self.descriptor=self.xfrdata;
    # Vendor and product IDs are 16-bit values, low byte first.
    self.VID = self.xfrdata[8] + 256*self.xfrdata[9];
    self.PID = self.xfrdata[10]+ 256*self.xfrdata[11];
    # String-table indexes taken from bytes 14..16 of the descriptor.
    iMFG = self.xfrdata[14];
    iPROD = self.xfrdata[15];
    iSERIAL = self.xfrdata[16];
    self.manufacturer=self.getDescriptorString(iMFG);
    self.product=self.getDescriptorString(iPROD);
    self.serial=self.getDescriptorString(iSERIAL);
    self.printstrings();
def printstrings(self):
    """Print the vendor/product IDs and the three descriptor strings."""
    print("Vendor ID is %04x." % self.VID)
    print("Product ID is %04x." % self.PID)
    print("Manufacturer: %s" % self.manufacturer)
    print("Product: %s" % self.product)
    print("Serial: %s" % self.serial)
def getDescriptorString(self, index):
    """Grabs a string from the descriptor string table.

    Returns "MISSING STRING" for index 0, None when ctl_read() reports a
    failure, and otherwise the non-zero bytes of self.xfrdata from offset
    2 onward, converted with chr() and joined.
    """
    if index==0:
        return "MISSING STRING"
    # Get_Descriptor-String request with the string index at position 2.
    request=[0x80,0x06,index,0x03,0x00,0x00,0x40,0x00]
    if self.ctl_read(request):
        return None
    # Zero bytes are dropped, non-zero bytes are kept as characters.
    return "".join(chr(c) for c in self.xfrdata[2:] if c>0)
class GoodFETMAXUSBDevice(GoodFETMAXUSB):
def send_descriptor(self,SUD):
    """Send the USB descriptors based upon the setup data.

    The descriptor type is read from SUD[wValueH] and, for strings, the
    index from SUD[wValueL]. The reply is truncated to the requested
    length (wLength). Requests with no data to send, including HID
    descriptors and string indexes beyond self.strDesc, stall EP0.
    """
    desclen=0
    ddata=None
    reqlen=ord(SUD[wLengthL])+256*ord(SUD[wLengthH])  #16-bit length
    desctype=ord(SUD[wValueH])
    if desctype==GD_DEVICE:
        desclen=self.DD[0]
        ddata=self.DD
    elif desctype==GD_CONFIGURATION:
        desclen=self.CD[2]
        ddata=self.CD
    elif desctype==GD_STRING:
        index=ord(SUD[wValueL])
        #An index past the table used to raise IndexError; leaving
        #desclen at 0 makes such requests stall below instead.
        if index<len(self.strDesc):
            ddata=self.strDesc[index]
            desclen=ord(ddata[0])
    elif desctype==GD_HID:
        #Don't know how to do this yet.
        pass
    elif desctype==GD_REPORT:
        desclen=self.CD[25]
        ddata=self.RepD
    if desclen>0:
        #Reduce desclen if asked for fewer bytes.
        desclen=min(reqlen,desclen)
        #Send those bytes.
        self.writebytes(rEP0FIFO,ddata[0:desclen])
        self.wregAS(rEP0BC,desclen)
    else:
        print("Stalling in send_descriptor() for lack of handler for %02x." % desctype)
        self.STALL_EP0(SUD)
def set_configuration(self,SUD):
    """Set the configuration.

    For a non-zero configuration value the 0x10 bit is set in rUSBIEN;
    rFNADDR is then read with AS set in either case.
    """
    suspend_irq_enable=0x10
    chosen=ord(SUD[wValueL])
    if chosen>0:
        self.SETBIT(rUSBIEN,suspend_irq_enable)
    self.rregAS(rFNADDR)
class GoodFETMAXUSBHID(GoodFETMAXUSBDevice):
"""This is an example HID keyboard driver, loosely based on the
MAX3420 examples."""
def hidinit(self):
    """Initialize a USB HID device.

    Disconnects, reconnects, and then enters hidrun().
    """
    for step in (self.usb_disconnect,self.usb_connect,self.hidrun):
        step()
def hidrun(self):
    """Main loop of the USB HID emulator; services IRQs forever."""
    print("Starting a HID device.  This won't return.")
    while True:
        self.service_irqs()
def do_SETUP(self):
    """Handle USB Enumeration.

    Reads the 8-byte setup packet, records its bmRequestType in
    self.OsLastConfigType, resets self.typepos, and dispatches on bits
    0x60 of bmRequestType to the standard, class, or vendor handler.
    Any other value stalls EP0.
    """
    #Grab the SETUP packet from the buffer.
    SUD=self.readbytes(rSUDFIFO,8)
    print("Handling a setup packet of %s" % self.setup2str(SUD))
    request_type=ord(SUD[bmRequestType])
    self.OsLastConfigType=request_type
    self.typepos=0
    category=request_type&0x60
    if category==0x00:
        self.std_request(SUD)
    elif category==0x20:
        self.class_request(SUD)
    elif category==0x40:
        self.vendor_request(SUD)
    else:
        print("Unknown request type 0x%02x." % request_type)
        self.STALL_EP0(SUD)
def class_request(self,SUD):
    """Handle a class request; this implementation always stalls EP0."""
    notice="Stalling a class request."
    print(notice)
    self.STALL_EP0(SUD)
def vendor_request(self,SUD):
    """Handle a vendor request; this implementation always stalls EP0."""
    notice="Stalling a vendor request."
    print(notice)
    self.STALL_EP0(SUD)
def std_request(self,SUD):
    """Handles a standard setup request.

    Dispatches on SUD[bRequest]; unrecognized requests stall EP0.
    SR_SET_FEATURE is not handled.
    """
    request=ord(SUD[bRequest])
    if request==SR_GET_DESCRIPTOR:
        self.send_descriptor(SUD)
        return
    if request==SR_SET_CONFIGURATION:
        self.set_configuration(SUD)
        return
    if request==SR_GET_STATUS:
        self.get_status(SUD)
        return
    if request==SR_SET_ADDRESS:
        #Nothing to store here; just read rFNADDR with AS set.
        self.rregAS(rFNADDR)
        return
    if request==SR_GET_INTERFACE:
        self.get_interface(SUD)
        return
    print("Stalling Unknown standard setup request type %02x" % request)
    self.STALL_EP0(SUD)
def get_interface(self,SUD):
    """Handles a setup request for SR_GET_INTERFACE.

    For interface index 0 a single zero byte is queued on EP0 and the
    transfer is armed; any other index stalls EP0.
    """
    #The closing parenthesis used to sit after "==0", so ord() received a
    #bool and raised TypeError on every call.
    if ord(SUD[wIndexL])==0:
        self.wreg(rEP0FIFO,0)
        self.wregAS(rEP0BC,1)
    else:
        self.STALL_EP0(SUD)
OsLastConfigType=-1;
#Device Descriptor
DD=[0x12, # bLength = 18d
0x01, # bDescriptorType = Device (1)
0x00,0x01, # bcdUSB(L/H) USB spec rev (BCD)
0x00,0x00,0x00, # bDeviceClass, bDeviceSubClass, bDeviceProtocol
0x40, # bMaxPacketSize0 EP0 is 64 bytes
0x6A,0x0B, # idVendor(L/H)--Maxim is 0B6A
0x46,0x53, # idProduct(L/H)--5346
0x34,0x12, # bcdDevice--1234
1,2,3, # iManufacturer, iProduct, iSerialNumber
1];
#Configuration Descriptor
CD=[0x09, # bLength
0x02, # bDescriptorType = Config
0x22,0x00, # wTotalLength(L/H) = 34 bytes
0x01, # bNumInterfaces
0x01, # bConfigValue
0x00, # iConfiguration
0xE0, # bmAttributes. b7=1 b6=self-powered b5=RWU supported
0x01, # MaxPower is 2 ma
# INTERFACE Descriptor
0x09, # length = 9
0x04, # type = IF
0x00, # IF #0
0x00, # bAlternate Setting
0x01, # bNum Endpoints
0x03, # bInterfaceClass = HID
0x00,0x00, # bInterfaceSubClass, bInterfaceProtocol
0x00, # iInterface
# HID Descriptor--It's at CD[18]
0x09, # bLength
0x21, # bDescriptorType = HID
0x10,0x01, # bcdHID(L/H) Rev 1.1
0x00, # bCountryCode (none)
0x01, # bNumDescriptors (one report descriptor)
0x22, # bDescriptorType (report)
43,0, # CD[25]: wDescriptorLength(L/H) (report descriptor size is 43 bytes)
# Endpoint Descriptor
0x07, # bLength
0x05, # bDescriptorType (Endpoint)
0x83, # bEndpointAddress (EP3-IN)
0x03, # bmAttributes (interrupt)
64,0, # wMaxPacketSize (64)
10];
strDesc=[
# STRING descriptor 0--Language string
"\x04\x03\x09\x04",
# [
# 0x04, # bLength
# 0x03, # bDescriptorType = string
# 0x09,0x04 # wLANGID(L/H) = English-United States
# ],
# STRING descriptor 1--Manufacturer ID
"\x0c\x03M\x00a\x00x\x00i\x00m\x00",
# [
# 12, # bLength
# 0x03, # bDescriptorType = string
# 'M',0,'a',0,'x',0,'i',0,'m',0 # text in Unicode
# ],
# STRING descriptor 2 - Product ID
"\x18\x03M\x00A\x00X\x003\x004\x002\x000\x00E\x00 \x00E\x00n\x00u\x00m\x00 \x00C\x00o\x00d\x00e\x00",
# [ 24, # bLength
# 0x03, # bDescriptorType = string
# 'M',0,'A',0,'X',0,'3',0,'4',0,'2',0,'0',0,'E',0,' ',0,
# 'E',0,'n',0,'u',0,'m',0,' ',0,'C',0,'o',0,'d',0,'e',0
# ],
# STRING descriptor 3 - Serial Number ID
"\x14\x03S\x00/\x00N\x00 \x003\x004\x002\x000\x00E\x00"
# [ 20, # bLength
# 0x03, # bDescriptorType = string
# 'S',0,
# '/',0,
# 'N',0,
# ' ',0,
# '3',0,
# '4',0,
# '2',0,
# '0',0,
# 'E',0,
# ]
];
RepD=[
0x05,0x01, # Usage Page (generic desktop)
0x09,0x06, # Usage (keyboard)
0xA1,0x01, # Collection
0x05,0x07, # Usage Page 7 (keyboard/keypad)
0x19,0xE0, # Usage Minimum = 224
0x29,0xE7, # Usage Maximum = 231
0x15,0x00, # Logical Minimum = 0
0x25,0x01, # Logical Maximum = 1
0x75,0x01, # Report Size = 1
0x95,0x08, # Report Count = 8
0x81,0x02, # Input(Data,Variable,Absolute)
0x95,0x01, # Report Count = 1
0x75,0x08, # Report Size = 8
0x81,0x01, # Input(Constant)
0x19,0x00, # Usage Minimum = 0
0x29,0x65, # Usage Maximum = 101
0x15,0x00, # Logical Minimum = 0,
0x25,0x65, # Logical Maximum = 101
0x75,0x08, # Report Size = 8
0x95,0x01, # Report Count = 1
0x81,0x00, # Input(Data,Variable,Array)
0xC0]
def get_status(self,SUD):
    """Get the USB Setup Status.

    Replies with two bytes on EP0 depending on bmRequestType:
    0x80 (device) -> 0x03, 0x81 (interface) -> 0x00, and 0x82 (endpoint)
    with wIndexL 0x83 -> 0x01. The second byte is always zero. Anything
    else stalls EP0.
    """
    def reply(first):
        self.wreg(rEP0FIFO,first)
        self.wreg(rEP0FIFO,0x00)  #Second byte is always zero.
        self.wregAS(rEP0BC,2)     #Load byte count, arm transfer, and ack CTL.
    target=ord(SUD[bmRequestType])
    if target==0x80:
        reply(0x03)  #Enable RWU and self-powered
    elif target==0x81:
        reply(0x00)
    elif target==0x82 and ord(SUD[wIndexL])==0x83:
        reply(0x01)  #Stall EP3
    else:
        self.STALL_EP0(SUD)
typepos=0;
typestrings={
-1 : "Python does USB HID!\n", # Unidentified OS. This is the default typestring.
0x00 : "OSX Hosts don't recognize Maxim keyboards.\n", # We have to identify as an Apple keyboard to get arround the unknown keyboard error.
0xA1 : "Python does USB HID on Linux!\n",
0x81 : " Python does USB HID on Windows!\n", # Windows requires a bit of a delay. Maybe we can watch for a keyboard reset command?
}
def typestring(self):
    """Return the string to type for the last seen bmRequestType.

    Looks self.OsLastConfigType up in self.typestrings and falls back to
    the entry stored under -1 when there is no match.
    """
    known=self.typestrings
    key=self.OsLastConfigType
    #"in" replaces dict.has_key(), which no longer exists in Python 3.
    if key in known:
        return known[key]
    return known[-1]
# http://www.win.tue.nl/~aeb/linux/kbd/scancodes-14.html
# Escape=0x29 Backsp=0x2A Space=0x2C CapsLock=0x39 Menu=0x65
keymaps={
'en_US' :[ ' abcdefghijklmnopqrstuvwxyz1234567890\n\t -=[]\\\\;\'`,./',
''' 
  ''', # LeftCtrl
' ABCDEFGHIJKLMNOPQRSTUVWXYZ!@#$%^&*() {}?+||:"~<>?', # LeftShift
'', # LeftCtrl & LeftShift
' abc'], # LeftAlt
'Dvorak' :[ ' axje.uidchtnmbrl\'poygk,qf;1234567890\n\t []/=\\\\s-`wvz',
'''    
   ''', # LeftCtrl
' AXJE UIDCHTNMBRL"POYGK<QF:!@#$%^&*() {}?+||S_~WVZ', # LeftShift
'', # LeftCtrl & LeftShift
' axj'], # LeftAlt
}
layout='en_US';
def keymap(self):
    """Return the keymap table selected by self.layout."""
    selected=self.layout
    return self.keymaps[selected]
modifiers={
'None': 0b00000000,
'LeftCtrl': 0b00000001,
'LeftShift': 0b00000010,
'LeftAlt': 0b00000100,
'LeftGUI': 0b00001000,
'RightCtrl': 0b00010000,
'RightShift': 0b00100000,
'RightAlt': 0b01000000,
'RightGUI': 0b10000000
}
def asc2hid(self,ascii):
    """Translate ASCII to an USB keycode.

    Returns a (modifier, keycode) tuple. The modifier is the position of
    the keymap row containing the character and the keycode is the
    character's offset within that row. Non-str input and unmapped
    characters give (0,0); a space gives (0,0x2C).
    """
    if type(ascii) is not str:
        return (0,0)  # Send NoEvent if not passed a character
    if ascii==' ':
        return (0,0x2C)  # space
    for modifier,row in enumerate(self.keymap()):
        keycode=row.find(ascii)
        if keycode!=-1:
            return (modifier,keycode)
    return (0,0)
def type_IN3(self):
    """Type next letter in buffer.

    While self.typepos is inside the current typestring, one character is
    typed per call and typepos advances. Once the string is exhausted the
    method enters an interactive type-through loop: each line read from
    the prompt is typed character by character, and entering '0' leaves
    the loop and exits the process.
    """
    string=self.typestring();
    if self.typepos>=len(string):
        self.typeletter(0) #key up
        # Type-through mode: blocks on the prompt until '0' is entered.
        while 1:
            mydata = raw_input('>>')
            if mydata == '0':
                break
            else:
                for i in mydata:
                    self.typeletter(i)
                    # Key-up after every character.
                    self.typeletter(0)
        #self.typeletter(0); # Send NoEvent to indicate key-up
        exit(0);
        #self.typepos=0; # Repeat typestring forever!
        # This would be a great place to enable a typethrough mode so the host operator can control the target
    else:
        if self.usbverbose:
            # Echo the character being typed on the local terminal.
            sys.stdout.write(string[self.typepos]);
            sys.stdout.flush();
        self.typeletter(string[self.typepos]);
        self.typepos+=1;
    return;
def typeletter(self,key):
    """Type a letter on IN3.  Zero for keyup.

    A str key is translated with asc2hid(); any other value is sent as
    the raw key code with modifier 0. The three report bytes
    (modifier, 0, key) are written one register write at a time, followed
    by the byte count 3.
    """
    mod=0
    if type(key)==str:
        (mod,key)=self.asc2hid(key)
    for value in (mod,0,key):
        self.wreg(rEP3INFIFO,value)
    self.wreg(rEP3INBC,3)
def do_IN3(self):
    """Handle IN3 event.

    Does nothing until a setup packet has been seen (OsLastConfigType is
    no longer -1); after that each event types the next key.
    """
    #The interrupt flag is not cleared here; sending the reply does that.
    if self.OsLastConfigType == -1:
        return
    self.type_IN3()
# MAXUSBApp.py
#
# Contains class definition for MAXUSBApp.
import time
from util import *
from Facedancer import *
from USB import *
from USBDevice import USBDeviceRequest
class MAXUSBApp(FacedancerApp):
app_name = "MAXUSB"
app_num = 0x40
reg_ep0_fifo = 0x00
reg_ep1_out_fifo = 0x01
reg_ep2_in_fifo = 0x02
reg_ep3_in_fifo = 0x03
reg_setup_data_fifo = 0x04
reg_ep0_byte_count = 0x05
reg_ep1_out_byte_count = 0x06
reg_ep2_in_byte_count = 0x07
reg_ep3_in_byte_count = 0x08
reg_ep_stalls = 0x09
reg_clr_togs = 0x0a
reg_endpoint_irq = 0x0b
reg_endpoint_interrupt_enable = 0x0c
reg_usb_irq = 0x0d
reg_usb_interrupt_enable = 0x0e
reg_usb_control = 0x0f
reg_cpu_control = 0x10
reg_pin_control = 0x11
reg_revision = 0x12
reg_function_address = 0x13
reg_io_pins = 0x14
# bitmask values for reg_endpoint_irq = 0x0b
is_setup_data_avail = 0x20 # SUDAVIRQ
is_in3_buffer_avail = 0x10 # IN3BAVIRQ
is_in2_buffer_avail = 0x08 # IN2BAVIRQ
is_out1_data_avail = 0x04 # OUT1DAVIRQ
is_out0_data_avail = 0x02 # OUT0DAVIRQ
is_in0_buffer_avail = 0x01 # IN0BAVIRQ
# bitmask values for reg_usb_control = 0x0f
usb_control_vbgate = 0x40
usb_control_connect = 0x08
# bitmask values for reg_pin_control = 0x11
interrupt_level = 0x08
full_duplex = 0x10
def __init__(self, device, verbose=0):
    """Set up the MAXUSB app on `device` and configure the pin control register.

    Enables the app, prints the chip revision when verbose > 0, and then
    writes full_duplex | interrupt_level to the pin control register.
    """
    FacedancerApp.__init__(self, device, verbose)

    self.connected_device = None

    self.enable()

    if verbose > 0:
        print(self.app_name, "revision", self.read_register(self.reg_revision))

    # set duplex and negative INT level (from GoodFETMAXUSB.py)
    pin_mode = self.full_duplex | self.interrupt_level
    self.write_register(self.reg_pin_control, pin_mode)
def init_commands(self):
    """Create the reusable FacedancerCommand objects for this app.

    The register read/write commands start with empty data that is filled
    in per call; the enable command uses verb 0x10 and the ack command
    carries the single byte 0x01.
    """
    def command(verb, payload):
        return FacedancerCommand(self.app_num, verb, payload)

    self.read_register_cmd = command(0x00, b'')
    self.write_register_cmd = command(0x00, b'')
    self.enable_app_cmd = command(0x10, b'')
    self.ack_cmd = command(0x00, b'\x01')
def read_register(self, reg_num, ack=False):
    """Read one register and return its value.

    The command byte is reg_num << 3, with bit 0 set when `ack` is true.
    The value is taken from the second byte of the response.
    """
    if self.verbose > 1:
        print(self.app_name, "reading register 0x%02x" % reg_num)

    request = bytearray([ reg_num << 3, 0 ])
    if ack:
        request[0] |= 1
    self.read_register_cmd.data = request

    self.device.writecmd(self.read_register_cmd)
    value = self.device.readcmd().data[1]

    if self.verbose > 2:
        print(self.app_name, "read register 0x%02x has value 0x%02x" %
                (reg_num, value))

    return value
def write_register(self, reg_num, value, ack=False):
    """Write `value` to one register.

    The command byte is (reg_num << 3) | 2, with bit 0 also set when `ack`
    is true. The response is read and discarded.
    """
    if self.verbose > 2:
        print(self.app_name, "writing register 0x%02x with value 0x%02x" %
                (reg_num, value))

    request = bytearray([ (reg_num << 3) | 2, value ])
    if ack:
        request[0] |= 1
    self.write_register_cmd.data = request

    self.device.writecmd(self.write_register_cmd)
    self.device.readcmd()
def get_version(self):
    """Return the value of the revision register."""
    revision = self.read_register(self.reg_revision)
    return revision
def ack_status_stage(self):
    """Send the prebuilt ack command and discard the response."""
    if self.verbose > 5:
        print(self.app_name, "sending ack!")

    link = self.device
    link.writecmd(self.ack_cmd)
    link.readcmd()
def connect(self, usb_device):
    """Connect `usb_device` to the bus and remember it as the connected device.

    If the connect bit is already set in the USB control register, it is
    cleared first and the method sleeps 0.1 s before setting it again.
    """
    already_connected = (self.read_register(self.reg_usb_control)
                         & self.usb_control_connect)
    if already_connected:
        self.write_register(self.reg_usb_control, self.usb_control_vbgate)
        time.sleep(.1)

    control = self.usb_control_vbgate | self.usb_control_connect
    self.write_register(self.reg_usb_control, control)

    self.connected_device = usb_device

    if self.verbose > 0:
        print(self.app_name, "connected device", self.connected_device.name)
def disconnect(self):
    """Disconnect from the bus and forget the connected device.

    Writes only the vbgate bit to the USB control register, which clears
    the connect bit. The verbose message is printed only when a device is
    actually connected, so calling this with none attached no longer
    fails on `None.name`.
    """
    self.write_register(self.reg_usb_control, self.usb_control_vbgate)

    device = self.connected_device
    if self.verbose > 0 and device is not None:
        print(self.app_name, "disconnected device", device.name)

    self.connected_device = None
def clear_irq_bit(self, reg, bit):
    """Clear an IRQ bit by writing `bit` to register `reg`."""
    mask = bit
    self.write_register(reg, mask)
def read_bytes(self, reg, n):
    """Read `n` bytes from register `reg`.

    Sends the command byte reg << 3 followed by `n` zero bytes and returns
    the response data with its first byte dropped.
    """
    if self.verbose > 2:
        print(self.app_name, "reading", n, "bytes from register", reg)

    request = bytes([ (reg << 3) ] + ([0] * n))
    self.device.writecmd(FacedancerCommand(self.app_num, 0x00, request))

    resp = self.device.readcmd()
    payload = resp.data[1:]

    if self.verbose > 3:
        print(self.app_name, "read", len(resp.data) - 1, "bytes from register", reg)

    return payload
def write_bytes(self, reg, data):
    """Write `data` to register `reg` in a single command.

    The command byte is (reg << 3) | 3, i.e. the same write bit (2) and
    ack bit (1) that write_register() uses with ack=True. The response is
    read and discarded.
    """
    header = bytes([ (reg << 3) | 3 ])
    packet = header + data

    self.device.writecmd(FacedancerCommand(self.app_num, 0x00, packet))
    self.device.readcmd() # null response

    if self.verbose > 3:
        print(self.app_name, "wrote", len(packet) - 1, "bytes to register", reg)
# HACK: but given the limitations of the MAX chips, it seems necessary
def send_on_endpoint(self, ep_num, data):
    """Send `data` on IN endpoint 0, 2 or 3 in chunks of at most 64 bytes.

    Each chunk is written to the endpoint's FIFO register and followed by
    a write of its length to the matching byte count register with the
    ack flag set. An empty `data` still produces one zero-length write.

    Raises:
        ValueError: if `ep_num` is not 0, 2 or 3.
    """
    registers = {
        0: (self.reg_ep0_fifo, self.reg_ep0_byte_count),
        2: (self.reg_ep2_in_fifo, self.reg_ep2_in_byte_count),
        3: (self.reg_ep3_in_fifo, self.reg_ep3_in_byte_count),
    }
    if ep_num not in registers:
        raise ValueError('endpoint ' + str(ep_num) + ' not supported')
    fifo_reg, bc_reg = registers[ep_num]

    # Keep the full payload for logging; `data` is consumed below.
    payload = data

    # FIFO buffer is only 64 bytes, must loop
    while len(data) > 64:
        self.write_bytes(fifo_reg, data[:64])
        self.write_register(bc_reg, 64, ack=True)

        data = data[64:]

    self.write_bytes(fifo_reg, data)
    self.write_register(bc_reg, len(data), ack=True)

    if self.verbose > 1:
        # Previously only the final chunk was reported here.
        print(self.app_name, "wrote", bytes_as_hex(payload), "to endpoint",
                ep_num)
def send_on_endpoint_key(self, ep_num, data):
    """Send a keyboard report on endpoint 3, one FIFO write per byte.

    Two zero bytes are sent first, followed by each byte of `data`, every
    byte in its own write_bytes() call. The byte count register is then
    written with the number of bytes actually queued (3 for the usual
    single-byte `data`), with the ack flag set.

    `ep_num` is accepted for symmetry with send_on_endpoint() but is
    ignored: the endpoint 3 registers are always used.
    """
    fifo_reg = self.reg_ep3_in_fifo
    bc_reg = self.reg_ep3_in_byte_count

    report = b'\x00\x00' + bytes(data)

    # Slicing keeps each element a one-byte bytes object.
    for offset in range(len(report)):
        self.write_bytes(fifo_reg, report[offset:offset + 1])

    # The count used to be hard-coded to 3 regardless of len(data).
    self.write_register(bc_reg, len(report), ack=True)
# HACK: but given the limitations of the MAX chips, it seems necessary
def read_from_endpoint(self, ep_num):
    """Read pending data from endpoint 1.

    Returns b'' for any other endpoint number or when the endpoint 1 byte
    count register reads zero; otherwise returns that many bytes from the
    endpoint 1 FIFO.
    """
    if ep_num != 1:
        return b''

    pending = self.read_register(self.reg_ep1_out_byte_count)
    if pending == 0:
        return b''

    received = self.read_bytes(self.reg_ep1_out_fifo, pending)

    if self.verbose > 1:
        print(self.app_name, "read", bytes_as_hex(received), "from endpoint",
                ep_num)

    return received
def stall_ep0(self):
    """Stall endpoint 0 by writing 0x23 to the endpoint stalls register."""
    if self.verbose > 0:
        print(self.app_name, "stalling endpoint 0")

    stall_bits = 0x23
    self.write_register(self.reg_ep_stalls, stall_bits)
def service_irqs(self):
    """Poll the endpoint IRQ register forever and dispatch to the connected device.

    Each pass reads the endpoint IRQ register once and then, in order:
    handles a pending setup packet, handles data on endpoint 1, and
    notifies the device that the endpoint 2 and endpoint 3 IN buffers are
    available. This method never returns.
    """
    while True:
        irq = self.read_register(self.reg_endpoint_irq)
        if self.verbose > 3:
            print(self.app_name, "read endpoint irq: 0x%02x" % irq)
        if self.verbose > 2:
            # Only report bits other than the three buffer-available flags.
            if irq & ~ (self.is_in0_buffer_avail \
                    | self.is_in2_buffer_avail | self.is_in3_buffer_avail):
                print(self.app_name, "notable irq: 0x%02x" % irq)
        if irq & self.is_setup_data_avail:
            # The IRQ bit is cleared before the 8 setup bytes are read.
            self.clear_irq_bit(self.reg_endpoint_irq, self.is_setup_data_avail)
            b = self.read_bytes(self.reg_setup_data_fifo, 8)
            req = USBDeviceRequest(b)
            self.connected_device.handle_request(req)
        if irq & self.is_out1_data_avail:
            data = self.read_from_endpoint(1)
            if data:
                self.connected_device.handle_data_available(1, data)
            # Here the IRQ bit is cleared after the data has been read.
            self.clear_irq_bit(self.reg_endpoint_irq, self.is_out1_data_avail)
        if irq & self.is_in2_buffer_avail:
            self.connected_device.handle_buffer_available(2)
        if irq & self.is_in3_buffer_avail:
            self.connected_device.handle_buffer_available(3)
# USBKeyboard.py
#
# Contains class definitions to implement a USB keyboard.
from USB import *
from USBDevice import *
from USBConfiguration import *
from USBInterface import *
from USBEndpoint import *
class USBKeyboardInterface(USBInterface):
    """HID keyboard interface with an interactive type-through mode.

    After the first ten buffer-available events, every line read from the
    prompt is sent to the host as key presses on endpoint 3.
    """
    name = "USB keyboard interface"

    hid_descriptor = b'\x09\x21\x10\x01\x00\x01\x22\x2b\x00'
    report_descriptor = b'\x05\x01\x09\x06\xA1\x01\x05\x07\x19\xE0\x29\xE7\x15\x00\x25\x01\x75\x01\x95\x08\x81\x02\x95\x01\x75\x08\x81\x01\x19\x00\x29\x65\x15\x00\x25\x65\x75\x08\x95\x01\x81\x00\xC0'

    def __init__(self, verbose=0):
        """Build the interface with one interrupt IN endpoint (number 3)."""
        descriptors = {
                USB.desc_type_hid    : self.hid_descriptor,
                USB.desc_type_report : self.report_descriptor
        }

        endpoint = USBEndpoint(
                3,          # endpoint number
                USBEndpoint.direction_in,
                USBEndpoint.transfer_type_interrupt,
                USBEndpoint.sync_type_none,
                USBEndpoint.usage_type_data,
                16384,      # max packet size
                10,         # polling interval, see USB 2.0 spec Table 9-13
                self.handle_buffer_available    # handler function
        )

        # TODO: un-hardcode string index (last arg before "verbose")
        USBInterface.__init__(
                self,
                0,          # interface number
                0,          # alternate setting
                3,          # interface class
                0,          # subclass
                0,          # protocol
                0,          # string index
                verbose,
                [ endpoint ],
                descriptors
        )

        # "l<KEY UP>s<KEY UP><ENTER><KEY UP>"
        empty_preamble = [ 0x00 ] * 10
        text = [ 0x0f, 0x00, 0x16, 0x00, 0x28, 0x00 ]

        self.keys = [ chr(x) for x in empty_preamble + text ]

        # A character's index in key_mapping is its key code. The four
        # padding characters put 'a' at 0x04; they are written as a
        # multiplication so the count cannot be lost to whitespace edits.
        self.key_mapping = ' ' * 4 + 'abcdefghijklmnopqrstuvwxyz1234567890' # and so on

        self.counter = 0

    def handle_buffer_available(self):
        """Count buffer events; after the tenth, enter type-through mode.

        Type-through mode never returns: it reads lines from the prompt
        and types each mapped character followed by a key-up. Characters
        that are not in key_mapping are skipped.
        """
        self.counter += 1 # doing the same as the empty_preamble, waiting for setup

        if self.counter > 10: # once configuration is set, make type-through
            while 1:
                data = input('>>')
                for char in data:
                    code = self.key_mapping.find(char)
                    if code < 0:
                        # find() returns -1 for unmapped characters, and
                        # chr(-1) would raise ValueError.
                        continue
                    self.type_letter(chr(code))
                    self.type_letter(chr(0))

    def type_letter(self, letter, modifiers=0):
        """Send one key code (a one-character str) to the host.

        `modifiers` is accepted but currently not transmitted.
        """
        if self.verbose > 2:
            print(self.name, "sending keypress 0x%02x" % ord(letter))

        # The key code is handed over as a single byte; send_on_endpoint_key
        # writes the report one byte per FIFO write, which works as
        # intended. Sending the three report bytes bytes([0, 0, ord(letter)])
        # in one send_on_endpoint(3, ...) call does not.
        self.configuration.device.maxusb_app.send_on_endpoint_key(3, bytes([ord(letter)]))
class USBKeyboardDevice(USBDevice):
    """Emulated USB keyboard: one configuration holding one keyboard interface."""
    name = "USB keyboard device"

    def __init__(self, maxusb_app, verbose=0):
        """Create the device on `maxusb_app`.

        `verbose` is passed to both the device and its keyboard interface,
        so the interface's keypress logging follows the same setting.
        """
        config = USBConfiguration(
                1,                                          # index
                "Emulated Keyboard",                        # string desc
                [ USBKeyboardInterface(verbose) ]           # interfaces
        )

        USBDevice.__init__(
                self,
                maxusb_app,
                0,                      # device class
                0,                      # device subclass
                0,                      # protocol release number
                64,                     # max packet size for endpoint 0
                0x610b,                 # vendor id
                0x4653,                 # product id
                0x3412,                 # device revision
                "Maxim",                # manufacturer string
                "MAX3420E Enum Code",   # product string
                "S/N3420E",             # serial number string
                [ config ],
                verbose=verbose
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment