Last active
December 25, 2015 16:19
Save rvantonder/7005376 to your computer and use it in GitHub Desktop.
Files:
GoodFET.py in verbose mode, which prints the data that gets sent over serial
GoodFETMAXUSB.py with type-through
MAXUSBApp.py with an extra method, send_on_endpoint_key, which calls write_bytes with a length of 1
USBKeyboard.py, which calls send_on_endpoint_key in MAXUSBApp.py
Notes.txt - Description of the problem
Old.txt - How the old code sent key presses
The problem: when the new refactored code (USBKeyboard, MAXUSBApp.py, Facedancer.py) provides type-through functionality for the HID keyboard, keys are not typed reliably at the other end. With the current code, sending a key transmits the following to the Facedancer, but 'a' does not reliably show up at the other end; it simply gets lost.
Does NOT always work:
>>a
Facedancer Tx: 40 00 04 00 1b 00 00 04   # send 'a'
Facedancer Tx: 40 00 02 00 43 03         # write to byte count register (3 bytes)
Facedancer Tx: 40 00 04 00 1b 00 00 00   # send key up (0)
Facedancer Tx: 40 00 02 00 43 03         # write to byte count register (3 bytes)
Description (for the first line, 40 00 04 00 1b 00 00 04):
40: app name
00: verb
04 00: length (little endian)
1b: register
00 00 04: data, with 04 corresponding to 'a'
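The field layout can be checked against a logged line with a short decoder. This is a minimal sketch, not part of the gist: the function name decode_tx_frame is hypothetical, and it assumes the first payload byte is the register byte and the rest is data, which is how the traces in these notes read.

```python
import struct

def decode_tx_frame(hex_string):
    """Split one logged 'Facedancer Tx' line into its fields.

    Hypothetical helper: assumes a 4-byte header (app, verb,
    little-endian 16-bit length) followed by the payload.
    """
    raw = bytes.fromhex(hex_string.replace(" ", ""))
    app, verb, length = struct.unpack("<BBH", raw[:4])
    payload = raw[4:4 + length]
    return {
        "app": app,
        "verb": verb,
        "length": length,
        "register": payload[0],    # first payload byte: register byte
        "data": list(payload[1:])  # remaining bytes: data written to it
    }

frame = decode_tx_frame("40 00 04 00 1b 00 00 04")
print(frame)
```

For the 'a' line, the data field comes out as three bytes with 04 last, which matches the key-up line where the same position holds 00.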
However, the following ALWAYS works. This is how the old code (GoodFETMAXUSB.py, GoodFET.py) sent keys in type-through (see Old.txt):
>>a
Facedancer Tx: 40 00 02 00 1b 00   # write / send each byte separately
Facedancer Tx: 40 00 02 00 1b 00
Facedancer Tx: 40 00 02 00 1b 04   # 'a'
Facedancer Tx: 40 00 02 00 43 03   # write to byte count register (3 bytes)
Facedancer Tx: 40 00 02 00 1b 00   # again write each byte separately
Facedancer Tx: 40 00 02 00 1b 00
Facedancer Tx: 40 00 02 00 1b 00   # key up (0)
Facedancer Tx: 40 00 02 00 43 03
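The two transmit styles can be reproduced offline to compare them byte for byte. The sketch below is an illustration only: frame, batched, per_byte and as_hex are hypothetical names, nothing is sent over serial, and the app and verb bytes are simply copied from the traces above.

```python
import struct

APP = 0x40   # app byte as seen in the traces
VERB = 0x00  # verb byte as seen in the traces

def frame(payload):
    """Header (app, verb, little-endian length) followed by the payload."""
    return struct.pack("<BBH", APP, VERB, len(payload)) + bytes(payload)

def batched(fifo, count_reg, report):
    """New style: the whole report in one frame, then the byte count."""
    return [frame([fifo] + list(report)), frame([count_reg, len(report)])]

def per_byte(fifo, count_reg, report):
    """Old style: one frame per report byte, then the byte count."""
    frames = [frame([fifo, b]) for b in report]
    frames.append(frame([count_reg, len(report)]))
    return frames

def as_hex(frames):
    return [" ".join("%02x" % b for b in f) for f in frames]

key_a = [0x00, 0x00, 0x04]  # report bytes for 'a', taken from the trace
for line in as_hex(batched(0x1b, 0x43, key_a)):
    print("batched :", line)
for line in as_hex(per_byte(0x1b, 0x43, key_a)):
    print("per-byte:", line)
```

Both styles carry the same register and data bytes; they differ only in how many serial frames (and therefore how many round trips) are used per report.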
Last note:
In the old code, the bytes are actually written to register 1a (not 1b as above), and the ACK flag is not set for the byte count register (42 in the original code versus 43 above). To make sure this difference does not affect the results I'm seeing, I changed the new code to use register 1a and turned off ACKing. The results did not change: it does not matter whether we use register 1a or 1b, or whether we ACK or not.
This only seems to come up with 'manual' type-through, where there is a delay between keys. When there is no delay and we send a hardcoded list of keys, it appears to work fine. My guess is that it has something to do with timing.
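The 1a/1b and 42/43 pairs can be related to the register numbers defined in GoodFETMAXUSB.py (rEP3INFIFO=3, rEP3INBC=8). The sketch below assumes the MAX3421E SPI command-byte layout is register number in bits 7:3, write direction in bit 1 and ACKSTAT in bit 0; that layout is an assumption not stated in this gist, and max3421_command is a hypothetical helper name.

```python
# Register numbers as listed in GoodFETMAXUSB.py.
rEP3INFIFO = 3
rEP3INBC = 8

def max3421_command(register, write=True, ack=False):
    """Build a command byte under the ASSUMED layout:
    bits 7:3 = register, bit 1 = direction (1 = write), bit 0 = ACKSTAT."""
    return (register << 3) | (0x02 if write else 0x00) | (0x01 if ack else 0x00)

for name, reg in (("rEP3INFIFO", rEP3INFIFO), ("rEP3INBC", rEP3INBC)):
    print("%-10s no-ack=0x%02x ack=0x%02x" % (
        name,
        max3421_command(reg, ack=False),
        max3421_command(reg, ack=True)))
```

Under that assumption, 1a and 1b would both address the endpoint 3 IN FIFO and 42 and 43 would both address its byte count register, with only the ACK bit differing.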
Typing 'a' in GoodFETMAXUSB.py
>>a
Tx: ( 0x40, 0x00, 0x0002 )
Data: ['0x1a', '0x0']   # register 1a
Rx: ( 0x40, 0x00, 0x0002 )
Tx: ( 0x40, 0x00, 0x0002 )
Data: ['0x1a', '0x0']
Rx: ( 0x40, 0x00, 0x0002 )
Tx: ( 0x40, 0x00, 0x0002 )
Data: ['0x1a', '0x4']   # 'a'
Rx: ( 0x40, 0x00, 0x0002 )
Tx: ( 0x40, 0x00, 0x0002 )
Data: ['0x42', '0x3']
Rx: ( 0x40, 0x00, 0x0002 )
Tx: ( 0x40, 0x00, 0x0002 )
Data: ['0x1a', '0x0']
Rx: ( 0x40, 0x00, 0x0002 )
Tx: ( 0x40, 0x00, 0x0002 )
Data: ['0x1a', '0x0']
Rx: ( 0x40, 0x00, 0x0002 )
Tx: ( 0x40, 0x00, 0x0002 )
Data: ['0x1a', '0x0']
Rx: ( 0x40, 0x00, 0x0002 )
Tx: ( 0x40, 0x00, 0x0002 )
Data: ['0x42', '0x3']
Rx: ( 0x40, 0x00, 0x0002 )
#!/usr/bin/env python | |
# GoodFET Client Library | |
# | |
# (C) 2009 Travis Goodspeed <travis at radiantmachines.com> | |
# | |
# This code is being rewritten and refactored. You've been warned! | |
import sys, time, string, cStringIO, struct, glob, os; | |
import sqlite3; | |
fmt = ("B", "<H", None, "<L") | |
def getClient(name="GoodFET"): | |
import GoodFET, GoodFETCC, GoodFETAVR, GoodFETSPI, GoodFETMSP430, GoodFETNRF, GoodFETCCSPI; | |
if(name=="GoodFET" or name=="monitor"): return GoodFET.GoodFET(); | |
elif name=="cc" or name=="cc51": return GoodFETCC.GoodFETCC(); | |
elif name=="cc2420" or name=="ccspi": return GoodFETCCSPI.GoodFETCCSPI(); | |
elif name=="avr": return GoodFETAVR.GoodFETAVR(); | |
elif name=="spi": return GoodFETSPI.GoodFETSPI(); | |
elif name=="msp430": return GoodFETMSP430.GoodFETMSP430(); | |
elif name=="nrf": return GoodFETNRF.GoodFETNRF(); | |
print "Unsupported target: %s" % name; | |
sys.exit(0); | |
class SymbolTable: | |
"""GoodFET Symbol Table""" | |
db=sqlite3.connect(":memory:"); | |
def __init__(self, *args, **kargs): | |
self.db.execute("create table if not exists symbols(adr,name,memory,size,comment);"); | |
def get(self,name): | |
self.db.commit(); | |
c=self.db.cursor(); | |
try: | |
c.execute("select adr,memory from symbols where name=?",(name,)); | |
for row in c: | |
#print "Found it."; | |
sys.stdout.flush(); | |
return row[0]; | |
#print "No dice."; | |
except:# sqlite3.OperationalError: | |
#print "SQL error."; | |
return eval(name); | |
return eval(name); | |
def define(self,adr,name,comment="",memory="vn",size=16): | |
self.db.execute("insert into symbols(adr,name,memory,size,comment)" | |
"values(?,?,?,?,?);", ( | |
adr,name,memory,size,comment)); | |
#print "Set %s=%s." % (name,adr); | |
class GoodFETbtser: | |
"""py-bluez class for emulating py-serial.""" | |
def __init__(self,btaddr): | |
import bluetooth; | |
if btaddr==None or btaddr=="none" or btaddr=="bluetooth": | |
print "performing inquiry..." | |
nearby_devices = bluetooth.discover_devices(lookup_names = True) | |
print "found %d devices" % len(nearby_devices) | |
for addr, name in nearby_devices: | |
print " %s - '%s'" % (addr, name) | |
#TODO switch to wildcards. | |
if name=='FireFly-A6BD': | |
btaddr=addr; | |
if name=='RN42-A94A': | |
btaddr=addr; | |
print "Please set $GOODFET to the address of your device."; | |
sys.exit(); | |
print "Identified GoodFET at %s" % btaddr; | |
# Manually use the portnumber. | |
port=1; | |
print "Connecting to %s on port %i." % (btaddr, port); | |
sock=bluetooth.BluetoothSocket(bluetooth.RFCOMM); | |
self.sock=sock; | |
sock.connect((btaddr,port)); | |
sock.settimeout(10); #IMPORTANT Must be patient. | |
##This is what we'd do for a normal reset. | |
#str=""; | |
#while not str.endswith("goodfet.sf.net/"): | |
# str=self.read(64); | |
# print str; | |
# Instead, just return and hope for the best. | |
return; | |
def write(self,msg): | |
"""Send traffic.""" | |
import time; | |
self.sock.send(msg); | |
#time.sleep(0.1); | |
return; | |
def read(self,length): | |
"""Read traffic.""" | |
data=""; | |
while len(data)<length: | |
data=data+self.sock.recv(length-len(data)); | |
return data; | |
class GoodFET: | |
"""GoodFET Client Library""" | |
besilent=0; | |
app=0; | |
verb=0; | |
count=0; | |
data=""; | |
verbose=True | |
GLITCHAPP=0x71; | |
MONITORAPP=0x00; | |
symbols=SymbolTable(); | |
def __init__(self, *args, **kargs): | |
self.data=[0]; | |
def getConsole(self): | |
from GoodFETConsole import GoodFETConsole; | |
return GoodFETConsole(self); | |
def name2adr(self,name): | |
return self.symbols.get(name); | |
def timeout(self): | |
print "timeout\n"; | |
def serInit(self, port=None, timeout=2, attemptlimit=None): | |
"""Open a serial port of some kind.""" | |
import re; | |
if port==None: | |
port=os.environ.get("GOODFET"); | |
if port=="bluetooth" or (port is not None and re.match("..:..:..:..:..:..",port)): | |
self.btInit(port,2,attemptlimit); | |
else: | |
self.pyserInit(port,timeout,attemptlimit); | |
def btInit(self, port, timeout, attemptlimit): | |
"""Open a bluetooth port."""; | |
#self.verbose=True; #For debugging BT. | |
self.serialport=GoodFETbtser(port); | |
def pyserInit(self, port, timeout, attemptlimit): | |
"""Open the serial port""" | |
# Make timeout None to wait forever, 0 for non-blocking mode. | |
import serial; | |
fixserial=False; | |
if os.name=='nt' and sys.version.find('64 bit')!=-1: | |
print "WARNING: PySerial requires a 32-bit Python build in Windows."; | |
if port is None and os.environ.get("GOODFET")!=None: | |
glob_list = glob.glob(os.environ.get("GOODFET")); | |
if len(glob_list) > 0: | |
port = glob_list[0]; | |
else: | |
port = os.environ.get("GOODFET"); | |
if port is None: | |
glob_list = glob.glob("/dev/tty.usbserial*"); | |
if len(glob_list) > 0: | |
port = glob_list[0]; | |
if port is None: | |
glob_list = glob.glob("/dev/ttyUSB*"); | |
if len(glob_list) > 0: | |
port = glob_list[0]; | |
if port is None: | |
glob_list = glob.glob("/dev/ttyU0"); | |
if len(glob_list) > 0: | |
port = glob_list[0]; | |
if port is None and os.name=='nt': | |
from scanwin32 import winScan; | |
scan=winScan(); | |
for order,comport,desc,hwid in sorted(scan.comports()): | |
try: | |
if hwid.index('FTDI')==0: | |
port=comport; | |
#print "Using FTDI port %s" % port | |
except: | |
#Do nothing. | |
a=1; | |
baud=115200; | |
if(os.environ.get("platform")=='arduino' or os.environ.get("board")=='arduino'): | |
baud=19200; #Slower, for now. | |
self.serialport = serial.Serial( | |
port, | |
#9600, | |
baud, | |
parity = serial.PARITY_NONE, | |
timeout=timeout | |
) | |
self.verb=0; | |
attempts=0; | |
connected=0; | |
while connected==0: | |
while self.verb!=0x7F or self.data!="http://goodfet.sf.net/": | |
#while self.data!="http://goodfet.sf.net/": | |
#print "'%s'!=\n'%s'" % (self.data,"http://goodfet.sf.net/"); | |
if attemptlimit is not None and attempts >= attemptlimit: | |
return | |
elif attempts==2 and os.environ.get("board")!='telosb': | |
print "See the GoodFET FAQ about missing info flash."; | |
self.serialport.setTimeout(0.2); | |
elif attempts == 100: | |
print "Tried 100 times to connect and failed." | |
sys.stdout.write("Continuing to try forever.") # No newline | |
sys.stdout.flush() | |
self.verbose=True # Something isn't going right, give the user more info | |
elif attempts > 100 and attempts % 10 == 0: | |
sys.stdout.write('.') | |
sys.stdout.flush() | |
#self.serialport.flushInput() | |
#self.serialport.flushOutput() | |
#TelosB reset, prefer software to I2C SPST Switch. | |
if (os.environ.get("board")=='telosb'): | |
#print "TelosB Reset"; | |
self.telosBReset(); | |
elif (os.environ.get("board")=='z1'): | |
self.bslResetZ1(invokeBSL=0); | |
elif (os.environ.get("board")=='apimote1') or (os.environ.get("board")=='apimote'): | |
#Explicitly set RTS and DTR to halt board. | |
self.serialport.setRTS(1); | |
self.serialport.setDTR(1); | |
#RTS pin, not DTR is used for reset. | |
self.serialport.setRTS(0); | |
#print "Resetting Apimote not yet tested."; | |
else: | |
#Explicitly set RTS and DTR to halt board. | |
self.serialport.setRTS(1); | |
self.serialport.setDTR(1); | |
#Drop DTR, which is !RST, low to begin the app. | |
self.serialport.setDTR(0); | |
#self.serialport.write(chr(0x80)); | |
#self.serialport.write(chr(0x80)); | |
#self.serialport.write(chr(0x80)); | |
#self.serialport.write(chr(0x80)); | |
#self.serialport.flushInput() | |
#self.serialport.flushOutput() | |
#time.sleep(60); | |
attempts=attempts+1; | |
self.readcmd(); #Read the first command. | |
#print "Got %02x,%02x:'%s'" % (self.app,self.verb,self.data); | |
if self.verb!=0x7f: | |
#Retry again. This usually times out, but helps connect. | |
self.readcmd(); | |
#print "Retry got %02x,%02x:'%s'" % (self.app,self.verb,self.data); | |
#Here we have a connection, but maybe not a good one. | |
#print "We have a connection." | |
connected=1; | |
if attempts >= 100: | |
print "" # Add a newline | |
olds=self.infostring(); | |
clocking=self.monitorclocking(); | |
for foo in range(1,30): | |
if not self.monitorecho(): | |
if self.verbose: | |
print "Comm error on %i try, resyncing out of %s." % (foo, | |
clocking); | |
connected=0; | |
break; | |
if self.verbose: print "Connected after %02i attempts." % attempts; | |
self.mon_connected(); | |
self.serialport.setTimeout(12); | |
def serClose(self): | |
self.serialport.close(); | |
def telosSetSCL(self, level): | |
self.serialport.setRTS(not level) | |
def telosSetSDA(self, level): | |
self.serialport.setDTR(not level) | |
def telosI2CStart(self): | |
self.telosSetSDA(1) | |
self.telosSetSCL(1) | |
self.telosSetSDA(0) | |
def telosI2CStop(self): | |
self.telosSetSDA(0) | |
self.telosSetSCL(1) | |
self.telosSetSDA(1) | |
def telosI2CWriteBit(self, bit): | |
self.telosSetSCL(0) | |
self.telosSetSDA(bit) | |
time.sleep(2e-6) | |
self.telosSetSCL(1) | |
time.sleep(1e-6) | |
self.telosSetSCL(0) | |
def telosI2CWriteByte(self, byte): | |
self.telosI2CWriteBit( byte & 0x80 ); | |
self.telosI2CWriteBit( byte & 0x40 ); | |
self.telosI2CWriteBit( byte & 0x20 ); | |
self.telosI2CWriteBit( byte & 0x10 ); | |
self.telosI2CWriteBit( byte & 0x08 ); | |
self.telosI2CWriteBit( byte & 0x04 ); | |
self.telosI2CWriteBit( byte & 0x02 ); | |
self.telosI2CWriteBit( byte & 0x01 ); | |
self.telosI2CWriteBit( 0 ); # "acknowledge" | |
def telosI2CWriteCmd(self, addr, cmdbyte): | |
self.telosI2CStart() | |
self.telosI2CWriteByte( 0x90 | (addr << 1) ) | |
self.telosI2CWriteByte( cmdbyte ) | |
self.telosI2CStop() | |
def bslResetZ1(self, invokeBSL=0): | |
''' | |
Applies BSL entry sequence on RST/NMI and TEST/VPP pins | |
Parameters: | |
invokeBSL = 1: complete sequence | |
invokeBSL = 0: only RST/NMI pin accessed | |
By now only BSL mode is accessed | |
''' | |
#if DEBUG > 1: sys.stderr.write("* bslReset(invokeBSL=%s)\n" % invokeBSL) | |
if invokeBSL: | |
#sys.stderr.write("in Z1 bsl reset...\n") | |
time.sleep(0.1) | |
self.writepicROM(0xFF, 0xFF) | |
time.sleep(0.1) | |
#sys.stderr.write("z1 bsl reset done...\n") | |
else: | |
#sys.stderr.write("in Z1 reset...\n") | |
time.sleep(0.1) | |
self.writepicROM(0xFF, 0xFE) | |
time.sleep(0.1) | |
#sys.stderr.write("z1 reset done...\n") | |
def writepicROM(self, address, data): | |
''' Writes data to @address''' | |
for i in range(7,-1,-1): | |
self.picROMclock((address >> i) & 0x01) | |
self.picROMclock(0) | |
recbuf = 0 | |
for i in range(7,-1,-1): | |
s = ((data >> i) & 0x01) | |
#print s | |
if i < 1: | |
r = not self.picROMclock(s, True) | |
else: | |
r = not self.picROMclock(s) | |
recbuf = (recbuf << 1) + r | |
self.picROMclock(0, True) | |
#k = 1 | |
#while not self.serial.getCTS(): | |
# pass | |
#time.sleep(0.1) | |
return recbuf | |
def readpicROM(self, address): | |
''' reads a byte from @address''' | |
for i in range(7,-1,-1): | |
self.picROMclock((address >> i) & 0x01) | |
self.picROMclock(1) | |
recbuf = 0 | |
r = 0 | |
for i in range(7,-1,-1): | |
r = self.picROMclock(0) | |
recbuf = (recbuf << 1) + r | |
self.picROMclock(r) | |
#time.sleep(0.1) | |
return recbuf | |
#This seems more reliable when slowed. | |
def picROMclock(self, masterout, slow = True): | |
#print "setting masterout to "+str(masterout) | |
self.serialport.setRTS(masterout) | |
self.serialport.setDTR(1) | |
#time.sleep(0.02) | |
self.serialport.setDTR(0) | |
if slow: | |
time.sleep(0.02) | |
return self.serialport.getCTS() | |
def picROMfastclock(self, masterout): | |
#print "setting masterout to "+str(masterout) | |
self.serialport.setRTS(masterout) | |
self.serialport.setDTR(1) | |
self.serialport.setDTR(0) | |
time.sleep(0.02) | |
return self.serialport.getCTS() | |
def telosBReset(self,invokeBSL=0): | |
# "BSL entry sequence at dedicated JTAG pins" | |
# rst !s0: 0 0 0 0 1 1 | |
# tck !s1: 1 0 1 0 0 1 | |
# s0|s1: 1 3 1 3 2 0 | |
# "BSL entry sequence at shared JTAG pins" | |
# rst !s0: 0 0 0 0 1 1 | |
# tck !s1: 0 1 0 1 1 0 | |
# s0|s1: 3 1 3 1 0 2 | |
if invokeBSL: | |
self.telosI2CWriteCmd(0,1) | |
self.telosI2CWriteCmd(0,3) | |
self.telosI2CWriteCmd(0,1) | |
self.telosI2CWriteCmd(0,3) | |
self.telosI2CWriteCmd(0,2) | |
self.telosI2CWriteCmd(0,0) | |
else: | |
self.telosI2CWriteCmd(0,3) | |
self.telosI2CWriteCmd(0,2) | |
# This line was not defined inside the else: block, not sure where it | |
# should be however | |
self.telosI2CWriteCmd(0,0) | |
time.sleep(0.250) #give MSP430's oscillator time to stabilize | |
self.serialport.flushInput() #clear buffers | |
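    def getbuffer(self,size=0x1c00):
        # Size goes out as a little-endian 16-bit value: low byte, then high byte.
        self.writecmd(0,0xC2,2,[size&0xFF,(size>>8)&0xFF]);
        # The reply is a string, so convert each character before formatting.
        print "Got %02x%02x buffer size." % (ord(self.data[1]),ord(self.data[0]));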
def writecmd(self, app, verb, count=0, data=[]): | |
"""Write a command and some data to the GoodFET.""" | |
self.serialport.write(chr(app)); | |
self.serialport.write(chr(verb)); | |
#if data!=None: | |
# count=len(data); #Initial count ignored. | |
#print "TX %02x %02x %04x" % (app,verb,count); | |
#little endian 16-bit length | |
self.serialport.write(chr(count&0xFF)); | |
self.serialport.write(chr(count>>8)); | |
if self.verbose: | |
print "Tx: ( 0x%02x, 0x%02x, 0x%04x )" % ( app, verb, count ) | |
try: | |
print "Data:",map(hex, data) | |
except: | |
pass | |
#print "count=%02x, len(data)=%04x" % (count,len(data)); | |
if count!=0: | |
if(isinstance(data,list)): | |
for i in range(0,count): | |
#print "Converting %02x at %i" % (data[i],i) | |
data[i]=chr(data[i]); | |
#print type(data); | |
outstr=''.join(data); | |
self.serialport.write(outstr); | |
if not self.besilent: | |
return self.readcmd() | |
else: | |
return [] | |
def readcmd(self): | |
"""Read a reply from the GoodFET.""" | |
while 1:#self.serialport.inWaiting(): # Loop while input data is available | |
try: | |
#print "Reading..."; | |
self.app=ord(self.serialport.read(1)); | |
#print "APP=%02x" % self.app; | |
self.verb=ord(self.serialport.read(1)); | |
#Fixes an obscure bug in the TelosB. | |
if self.app==0x00: | |
while self.verb==0x00: | |
self.verb=ord(self.serialport.read(1)); | |
#print "VERB=%02x" % self.verb; | |
self.count=( | |
ord(self.serialport.read(1)) | |
+(ord(self.serialport.read(1))<<8) | |
); | |
if self.verbose: | |
print "Rx: ( 0x%02x, 0x%02x, 0x%04x )" % ( self.app, self.verb, self.count ) | |
#Debugging string; print, but wait. | |
if self.app==0xFF: | |
if self.verb==0xFF: | |
print "# DEBUG %s" % self.serialport.read(self.count) | |
elif self.verb==0xFE: | |
print "# DEBUG 0x%x" % struct.unpack(fmt[self.count-1], self.serialport.read(self.count))[0] | |
elif self.verb==0xFD: | |
#Do nothing, just wait so there's no timeout. | |
print "# NOP."; | |
sys.stdout.flush(); | |
else: | |
self.data=self.serialport.read(self.count); | |
return self.data; | |
except TypeError: | |
if self.connected: | |
print "Warning: waiting for serial read timed out (most likely)."; | |
#print "This shouldn't happen after syncing. Exiting for safety."; | |
#sys.exit(-1) | |
return self.data; | |
#Glitching stuff. | |
def glitchApp(self,app): | |
"""Glitch into a device by its application.""" | |
self.data=[app&0xff]; | |
self.writecmd(self.GLITCHAPP,0x80,1,self.data); | |
#return ord(self.data[0]); | |
def glitchVerb(self,app,verb,data): | |
"""Glitch during a transaction.""" | |
if data==None: data=[]; | |
self.data=[app&0xff, verb&0xFF]+data; | |
self.writecmd(self.GLITCHAPP,0x81,len(self.data),self.data); | |
#return ord(self.data[0]); | |
def glitchstart(self): | |
"""Glitch into the AVR application.""" | |
self.glitchVerb(self.APP,0x20,None); | |
def glitchstarttime(self): | |
"""Measure the timer of the START verb.""" | |
return self.glitchTime(self.APP,0x20,None); | |
def glitchTime(self,app,verb,data): | |
"""Time the execution of a verb.""" | |
if data==None: data=[]; | |
self.data=[app&0xff, verb&0xFF]+data; | |
print "Timing app %02x verb %02x." % (app,verb); | |
self.writecmd(self.GLITCHAPP,0x82,len(self.data),self.data); | |
time=ord(self.data[0])+(ord(self.data[1])<<8); | |
print "Timed to be %i." % time; | |
return time; | |
def glitchVoltages(self,low=0x0880, high=0x0fff): | |
"""Set glitching voltages. (0x0fff is max.)""" | |
self.data=[low&0xff, (low>>8)&0xff, | |
high&0xff, (high>>8)&0xff]; | |
self.writecmd(self.GLITCHAPP,0x90,4,self.data); | |
#return ord(self.data[0]); | |
def glitchRate(self,count=0x0800): | |
"""Set glitching count period.""" | |
self.data=[count&0xff, (count>>8)&0xff]; | |
self.writecmd(self.GLITCHAPP,0x91,2, | |
self.data); | |
#return ord(self.data[0]); | |
#Monitor stuff | |
def silent(self,s=0): | |
"""Transmissions halted when 1.""" | |
self.besilent=s; | |
print "besilent is %i" % self.besilent; | |
self.writecmd(0,0xB0,1,[s]); | |
connected=0; | |
def mon_connected(self): | |
"""Announce to the monitor that the connection is good.""" | |
self.connected=1; | |
self.writecmd(0,0xB1,0,[]); | |
def out(self,byte): | |
"""Write a byte to P5OUT.""" | |
self.writecmd(0,0xA1,1,[byte]); | |
def dir(self,byte): | |
"""Write a byte to P5DIR.""" | |
self.writecmd(0,0xA0,1,[byte]); | |
def call(self,adr): | |
"""Call to an address.""" | |
self.writecmd(0,0x30,2, | |
[adr&0xFF,(adr>>8)&0xFF]); | |
def execute(self,code): | |
"""Execute supplied code.""" | |
self.writecmd(0,0x31,2,#len(code), | |
code); | |
def MONpeek8(self,address): | |
"""Read a byte of memory from the monitor.""" | |
self.data=[address&0xff,address>>8]; | |
self.writecmd(0,0x02,2,self.data); | |
#self.readcmd(); | |
return ord(self.data[0]); | |
def MONpeek16(self,address): | |
"""Read a word of memory from the monitor.""" | |
return self.MONpeek8(address)+(self.MONpeek8(address+1)<<8); | |
def peek(self,address): | |
"""Read a word of memory from the monitor.""" | |
return self.MONpeek8(address)+(self.MONpeek8(address+1)<<8); | |
def eeprompeek(self,address): | |
"""Read a word of memory from the monitor.""" | |
print "EEPROM peeking not supported for the monitor."; | |
#return self.MONpeek8(address)+(self.MONpeek8(address+1)<<8); | |
def peekbysym(self,name): | |
"""Read a value by its symbol name.""" | |
#TODO include memory in symbol. | |
reg=self.symbols.get(name); | |
return self.peek8(reg,"data"); | |
def pokebysym(self,name,val): | |
"""Write a value by its symbol name.""" | |
#TODO include memory in symbol. | |
reg=self.symbols.get(name); | |
return self.pokebyte(reg,val); | |
def pokebyte(self,address,value,memory="vn"): | |
"""Set a byte of memory by the monitor.""" | |
self.data=[address&0xff,address>>8,value]; | |
self.writecmd(0,0x03,3,self.data); | |
return ord(self.data[0]); | |
def poke16(self,address,value): | |
"""Set a word of memory by the monitor.""" | |
self.MONpoke16(address,value); | |
    def MONpoke16(self,address,value):
        """Set a word of memory by the monitor."""
        self.pokebyte(address,value&0xFF);         #Low byte first.
        self.pokebyte(address+1,(value>>8)&0xFF);  #High byte at the next address.
        return value;
    def setsecret(self,value):
        """Set a secret word for later retrieval. Used by glitcher."""
        #self.eeprompoke(0,value);
        #self.eeprompoke(1,value);
        print "Secret setting is not yet supported for this target.";
        print "Aborting.";
    def getsecret(self):
        """Get a secret word. Used by glitcher."""
        #self.eeprompeek(0);
        print "Secret getting is not yet supported for this target.";
        print "Aborting.";
        sys.exit();
def dumpmem(self,begin,end): | |
i=begin; | |
while i<end: | |
print "%04x %04x" % (i, self.MONpeek16(i)); | |
i+=2; | |
def monitor_ram_pattern(self): | |
"""Overwrite all of RAM with 0xBEEF.""" | |
self.writecmd(0,0x90,0,self.data); | |
return; | |
def monitor_ram_depth(self): | |
"""Determine how many bytes of RAM are unused by looking for 0xBEEF..""" | |
self.writecmd(0,0x91,0,self.data); | |
return ord(self.data[0])+(ord(self.data[1])<<8); | |
#Baud rates. | |
baudrates=[115200, | |
9600, | |
19200, | |
38400, | |
57600, | |
115200]; | |
def setBaud(self,baud): | |
"""Change the baud rate. TODO fix this.""" | |
rates=self.baudrates; | |
self.data=[baud]; | |
print "Changing FET baud." | |
self.serialport.write(chr(0x00)); | |
self.serialport.write(chr(0x80)); | |
self.serialport.write(chr(1)); | |
self.serialport.write(chr(baud)); | |
print "Changed host baud." | |
self.serialport.setBaudrate(rates[baud]); | |
time.sleep(1); | |
self.serialport.flushInput() | |
self.serialport.flushOutput() | |
print "Baud is now %i." % rates[baud]; | |
return; | |
def readbyte(self): | |
return ord(self.serialport.read(1)); | |
def findbaud(self): | |
for r in self.baudrates: | |
print "\nTrying %i" % r; | |
self.serialport.setBaudrate(r); | |
#time.sleep(1); | |
self.serialport.flushInput() | |
self.serialport.flushOutput() | |
for i in range(1,10): | |
self.readbyte(); | |
print "Read %02x %02x %02x %02x" % ( | |
self.readbyte(),self.readbyte(),self.readbyte(),self.readbyte()); | |
def monitortest(self): | |
"""Self-test several functions through the monitor.""" | |
print "Performing monitor self-test."; | |
self.monitorclocking(); | |
for f in range(0,3000): | |
a=self.MONpeek16(0x0c00); | |
b=self.MONpeek16(0x0c02); | |
if a!=0x0c04 and a!=0x0c06: | |
print "ERROR Fetched %04x, %04x" % (a,b); | |
self.pokebyte(0x0021,0); #Drop LED | |
if self.MONpeek8(0x0021)!=0: | |
print "ERROR, P1OUT not cleared."; | |
self.pokebyte(0x0021,1); #Light LED | |
if not self.monitorecho(): | |
print "Echo test failed."; | |
print "Self-test complete."; | |
self.monitorclocking(); | |
def monitorecho(self): | |
data="The quick brown fox jumped over the lazy dog."; | |
self.writecmd(self.MONITORAPP,0x81,len(data),data); | |
if self.data!=data: | |
if self.verbose: | |
print "Comm error recognized by monitorecho(), got:\n%s" % self.data; | |
return 0; | |
return 1; | |
def monitor_info(self): | |
print "GoodFET with %s MCU" % self.infostring(); | |
print "Clocked at %s" % self.monitorclocking(); | |
return 1; | |
def testleds(self): | |
print "Flashing LEDs" | |
self.writecmd(self.MONITORAPP,0xD0,0,"") | |
try: | |
print "Flashed %d LED." % ord(self.data) | |
except: | |
print "Unable to process response:", self.data | |
return 1 | |
def monitor_list_apps(self, full=False): | |
self.monitor_info() | |
old_value = self.besilent | |
self.besilent = True # turn off automatic call to readcmd | |
self.writecmd(self.MONITORAPP, 0x82, 1, [int(full)]); | |
self.besilent = old_value | |
# read the build date string | |
self.readcmd() | |
print "Build Date: %s" % self.data | |
print "Firmware apps:" | |
while True: | |
self.readcmd() | |
if self.count == 0: | |
break | |
print self.data | |
return 1; | |
def monitorclocking(self): | |
"""Return the 16-bit clocking value.""" | |
return "0x%04x" % self.monitorgetclock(); | |
def monitorsetclock(self,clock): | |
"""Set the clocking value.""" | |
self.MONpoke16(0x56, clock); | |
def monitorgetclock(self): | |
"""Get the clocking value.""" | |
if(os.environ.get("platform")=='arduino' or os.environ.get("board")=='arduino'): | |
return 0xDEAD; | |
#Check for MSP430 before peeking this. | |
return self.MONpeek16(0x56); | |
# The following functions ought to be implemented in | |
# every client. | |
def infostring(self): | |
if(os.environ.get("platform")=='arduino' or os.environ.get("board")=='arduino'): | |
return "Arduino"; | |
else: | |
a=self.MONpeek8(0xff0); | |
b=self.MONpeek8(0xff1); | |
return "%02x%02x" % (a,b); | |
def lock(self): | |
print "Locking Unsupported."; | |
def erase(self): | |
print "Erasure Unsupported."; | |
def setup(self): | |
return; | |
def start(self): | |
return; | |
def test(self): | |
print "Unimplemented."; | |
return; | |
def status(self): | |
print "Unimplemented."; | |
return; | |
def halt(self): | |
print "Unimplemented."; | |
return; | |
def resume(self): | |
print "Unimplemented."; | |
return; | |
def getpc(self): | |
print "Unimplemented."; | |
return 0xdead; | |
def flash(self,file): | |
"""Flash an intel hex file to code memory.""" | |
print "Flash not implemented."; | |
def dump(self,file,start=0,stop=0xffff): | |
"""Dump an intel hex file from code memory.""" | |
print "Dump not implemented."; | |
def peek32(self,address, memory="vn"): | |
"""Peek 32 bits.""" | |
return (self.peek16(address,memory)+ | |
(self.peek16(address+2,memory)<<16)); | |
def peek16(self,address, memory="vn"): | |
"""Peek 16 bits of memory.""" | |
return (self.peek8(address,memory)+ | |
(self.peek8(address+1,memory)<<8)); | |
def peek8(self,address, memory="vn"): | |
"""Peek a byte of memory.""" | |
return self.MONpeek8(address); #monitor | |
def peekblock(self,address,length,memory="vn"): | |
"""Return a block of data.""" | |
data=range(0,length); | |
for foo in range(0,length): | |
data[foo]=self.peek8(address+foo,memory); | |
return data; | |
def pokeblock(self,address,bytes,memory="vn"): | |
"""Poke a block of a data into memory at an address.""" | |
for foo in bytes: | |
self.pokebyte(address,foo,memory); | |
address=address+1; | |
return; | |
def loadsymbols(self): | |
"""Load symbols from a file.""" | |
return; |
#!/usr/bin/env python | |
# GoodFET Client Library for Maxim USB Chips. | |
# | |
# (C) 2012 Travis Goodspeed <travis at radiantmachines.com> | |
# | |
# This code is being rewritten and refactored. You've been warned! | |
import sys, time, string, cStringIO, struct, glob, os; | |
import warnings | |
from GoodFET import GoodFET; | |
warnings.warn( | |
"""This library will soon be deprecated in favor of the USB*.py libraries.""" | |
) | |
#Handy registers. | |
rEP0FIFO=0 | |
rEP1OUTFIFO=1 | |
rEP2INFIFO=2 | |
rEP3INFIFO=3 | |
rSUDFIFO=4 | |
rEP0BC=5 | |
rEP1OUTBC=6 | |
rEP2INBC=7 | |
rEP3INBC=8 | |
rEPSTALLS=9 | |
rCLRTOGS=10 | |
rEPIRQ=11 | |
rEPIEN=12 | |
rUSBIRQ=13 | |
rUSBIEN=14 | |
rUSBCTL=15 | |
rCPUCTL=16 | |
rPINCTL=17 | |
rREVISION=18 | |
rFNADDR=19 | |
rIOPINS=20 | |
rIOPINS1=20 #Same as rIOPINS | |
rIOPINS2=21 | |
rHIRQ=25 | |
rHIEN=26 | |
rMODE=27 | |
rPERADDR=28 | |
rHCTL=29 | |
rHXFR=30 | |
rHRSL=31 | |
#Host mode registers. | |
rRCVFIFO =1 | |
rSNDFIFO =2 | |
rRCVBC =6 | |
rSNDBC =7 | |
rHIRQ =25 | |
# R11 EPIRQ register bits | |
bmSUDAVIRQ =0x20 | |
bmIN3BAVIRQ =0x10 | |
bmIN2BAVIRQ =0x08 | |
bmOUT1DAVIRQ= 0x04 | |
bmOUT0DAVIRQ= 0x02 | |
bmIN0BAVIRQ =0x01 | |
# R12 EPIEN register bits | |
bmSUDAVIE =0x20 | |
bmIN3BAVIE =0x10 | |
bmIN2BAVIE =0x08 | |
bmOUT1DAVIE =0x04 | |
bmOUT0DAVIE =0x02 | |
bmIN0BAVIE =0x01 | |
# ************************ | |
# Standard USB Requests | |
SR_GET_STATUS =0x00 # Get Status | |
SR_CLEAR_FEATURE =0x01 # Clear Feature | |
SR_RESERVED =0x02 # Reserved | |
SR_SET_FEATURE =0x03 # Set Feature | |
SR_SET_ADDRESS =0x05 # Set Address | |
SR_GET_DESCRIPTOR =0x06 # Get Descriptor | |
SR_SET_DESCRIPTOR =0x07 # Set Descriptor | |
SR_GET_CONFIGURATION =0x08 # Get Configuration | |
SR_SET_CONFIGURATION =0x09 # Set Configuration | |
SR_GET_INTERFACE =0x0a # Get Interface | |
SR_SET_INTERFACE =0x0b # Set Interface | |
# Get Descriptor codes | |
GD_DEVICE =0x01 # Get device descriptor: Device | |
GD_CONFIGURATION =0x02 # Get device descriptor: Configuration | |
GD_STRING =0x03 # Get device descriptor: String | |
GD_HID =0x21 # Get descriptor: HID | |
GD_REPORT =0x22 # Get descriptor: Report | |
# SETUP packet header offsets | |
bmRequestType =0 | |
bRequest =1 | |
wValueL =2 | |
wValueH =3 | |
wIndexL =4 | |
wIndexH =5 | |
wLengthL =6 | |
wLengthH =7 | |
# HID bRequest values | |
GET_REPORT =1 | |
GET_IDLE =2 | |
GET_PROTOCOL =3 | |
SET_REPORT =9 | |
SET_IDLE =0x0A | |
SET_PROTOCOL =0x0B | |
INPUT_REPORT =1 | |
# PINCTL bits | |
bmEP3INAK =0x80 | |
bmEP2INAK =0x40 | |
bmEP1INAK =0x20 | |
bmFDUPSPI =0x10 | |
bmINTLEVEL =0x08 | |
bmPOSINT =0x04 | |
bmGPXB =0x02 | |
bmGPXA =0x01 | |
# rUSBCTL bits | |
bmHOSCSTEN =0x80 | |
bmVBGATE =0x40 | |
bmCHIPRES =0x20 | |
bmPWRDOWN =0x10 | |
bmCONNECT =0x08 | |
bmSIGRWU =0x04 | |
# USBIRQ bits | |
bmURESDNIRQ =0x80 | |
bmVBUSIRQ =0x40 | |
bmNOVBUSIRQ =0x20 | |
bmSUSPIRQ =0x10 | |
bmURESIRQ =0x08 | |
bmBUSACTIRQ =0x04 | |
bmRWUDNIRQ =0x02 | |
bmOSCOKIRQ =0x01 | |
# MODE bits | |
bmHOST =0x01 | |
bmLOWSPEED =0x02 | |
bmHUBPRE =0x04 | |
bmSOFKAENAB =0x08 | |
bmSEPIRQ =0x10 | |
bmDELAYISO =0x20 | |
bmDMPULLDN =0x40 | |
bmDPPULLDN =0x80 | |
# PERADDR/HCTL bits | |
bmBUSRST =0x01 | |
bmFRMRST =0x02 | |
bmSAMPLEBUS =0x04 | |
bmSIGRSM =0x08 | |
bmRCVTOG0 =0x10 | |
bmRCVTOG1 =0x20 | |
bmSNDTOG0 =0x40 | |
bmSNDTOG1 =0x80 | |
# rHXFR bits | |
# Host XFR token values for writing the HXFR register (R30). | |
# OR this bit field with the endpoint number in bits 3:0 | |
tokSETUP =0x10 # HS=0, ISO=0, OUTNIN=0, SETUP=1 | |
tokIN =0x00 # HS=0, ISO=0, OUTNIN=0, SETUP=0 | |
tokOUT =0x20 # HS=0, ISO=0, OUTNIN=1, SETUP=0 | |
tokINHS =0x80 # HS=1, ISO=0, OUTNIN=0, SETUP=0 | |
tokOUTHS =0xA0 # HS=1, ISO=0, OUTNIN=1, SETUP=0 | |
tokISOIN =0x40 # HS=0, ISO=1, OUTNIN=0, SETUP=0 | |
tokISOOUT =0x60 # HS=0, ISO=1, OUTNIN=1, SETUP=0 | |
# rHRSL bits | |
bmRCVTOGRD =0x10 | |
bmSNDTOGRD =0x20 | |
bmKSTATUS =0x40 | |
bmJSTATUS =0x80 | |
# Host error result codes, the 4 LSB's in the HRSL register. | |
hrSUCCESS =0x00 | |
hrBUSY =0x01 | |
hrBADREQ =0x02 | |
hrUNDEF =0x03 | |
hrNAK =0x04 | |
hrSTALL =0x05 | |
hrTOGERR =0x06 | |
hrWRONGPID =0x07 | |
hrBADBC =0x08 | |
hrPIDERR =0x09 | |
hrPKTERR =0x0A | |
hrCRCERR =0x0B | |
hrKERR =0x0C | |
hrJERR =0x0D | |
hrTIMEOUT =0x0E | |
hrBABBLE =0x0F | |
# HIRQ bits | |
bmBUSEVENTIRQ =0x01 # indicates BUS Reset Done or BUS Resume | |
bmRWUIRQ =0x02 | |
bmRCVDAVIRQ =0x04 | |
bmSNDBAVIRQ =0x08 | |
bmSUSDNIRQ =0x10 | |
bmCONDETIRQ =0x20 | |
bmFRAMEIRQ =0x40 | |
bmHXFRDNIRQ =0x80 | |
class GoodFETMAXUSB(GoodFET): | |
MAXUSBAPP=0x40; | |
usbverbose=False; | |
def service_irqs(self): | |
"""Handle USB interrupt events.""" | |
epirq=self.rreg(rEPIRQ); | |
usbirq=self.rreg(rUSBIRQ); | |
#Are we being asked for setup data? | |
if(epirq&bmSUDAVIRQ): #Setup Data Requested | |
self.wreg(rEPIRQ,bmSUDAVIRQ); #Clear the bit | |
self.do_SETUP(); | |
if(epirq&bmOUT1DAVIRQ): #OUT1-OUT packet | |
self.do_OUT1(); | |
self.wreg(rEPIRQ,bmOUT1DAVIRQ); #Clear the bit *AFTER* servicing. | |
if(epirq&bmIN3BAVIRQ): #IN3-IN packet | |
self.do_IN3(); | |
#self.wreg(rEPIRQ,bmIN3BAVIRQ); #Clear the bit | |
if(epirq&bmIN2BAVIRQ): #IN2 packet | |
self.do_IN2(); | |
#self.wreg(rEPIRQ,bmIN2BAVIRQ); #Clear the bit | |
#else: | |
# print "No idea how to service this IRQ: %02x" % epirq; | |
def do_IN2(self): | |
"""Overload this.""" | |
def do_IN3(self): | |
"""Overload this.""" | |
def do_OUT1(self): | |
"""Overload this.""" | |
if self.usbverbose: print "Ignoring an OUT1 interrupt."; | |
def setup2str(self,SUD): | |
"""Converts the header of a setup packet to a string.""" | |
return "bmRequestType=0x%02x, bRequest=0x%02x, wValue=0x%04x, wIndex=0x%04x, wLength=0x%04x" % ( | |
ord(SUD[0]), ord(SUD[1]), | |
ord(SUD[2])+(ord(SUD[3])<<8), | |
ord(SUD[4])+(ord(SUD[5])<<8), | |
ord(SUD[6])+(ord(SUD[7])<<8) | |
); | |
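The field layout that setup2str() prints can also be decoded in one step. The following is a minimal Python 3 sketch, not part of the original library: `parse_setup` is a hypothetical helper name, and it assumes the 8-byte SETUP header arrives as a `bytes` object rather than the Python 2 `str` used above.

```python
import struct


def parse_setup(sud: bytes) -> dict:
    """Decode the 8-byte SETUP header into named fields (little-endian)."""
    if len(sud) != 8:
        raise ValueError("a SETUP packet header is exactly 8 bytes")
    # B = 1 byte, H = 2 bytes; "<" selects little-endian, as on the USB wire.
    bm_request_type, b_request, w_value, w_index, w_length = struct.unpack("<BBHHH", sud)
    return {
        "bmRequestType": bm_request_type,
        "bRequest": b_request,
        "wValue": w_value,
        "wIndex": w_index,
        "wLength": w_length,
    }


# Get_Descriptor(Device) asking for 0x12 bytes, as built in enumerate_device().
fields = parse_setup(bytes([0x80, 0x06, 0x00, 0x01, 0x00, 0x00, 0x12, 0x00]))
print(", ".join("%s=0x%x" % item for item in fields.items()))
```

The high byte of wValue (0x01 here) is the descriptor type that send_descriptor() switches on.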
def MAXUSBsetup(self): | |
"""Move the FET into the MAXUSB application.""" | |
self.writecmd(self.MAXUSBAPP,0x10,0,self.data); #MAXUSB/SETUP | |
self.writecmd(self.MAXUSBAPP,0x10,0,self.data); #MAXUSB/SETUP | |
self.writecmd(self.MAXUSBAPP,0x10,0,self.data); #MAXUSB/SETUP | |
print "Connected to MAX342x Rev. %x" % (self.rreg(rREVISION)); | |
self.wreg(rPINCTL,0x18); #Set duplex and negative INT level. | |
def MAXUSBtrans8(self,byte): | |
"""Read and write 8 bits by MAXUSB.""" | |
data=self.MAXUSBtrans([byte]); | |
return ord(data[0]); | |
def MAXUSBtrans(self,data): | |
"""Exchange data by MAXUSB.""" | |
self.data=data; | |
self.writecmd(self.MAXUSBAPP,0x00,len(data),data); | |
return self.data; | |
def rreg(self,reg): | |
"""Peek 8 bits from a register.""" | |
data=[reg<<3,0]; | |
self.writecmd(self.MAXUSBAPP,0x00,len(data),data); | |
return ord(self.data[1]); | |
def rregAS(self,reg): | |
"""Peek 8 bits from a register, setting AS.""" | |
data=[(reg<<3)|1,0]; | |
self.writecmd(self.MAXUSBAPP,0x00,len(data),data); | |
return ord(self.data[1]); | |
def wreg(self,reg,value): | |
"""Poke 8 bits into a register.""" | |
data=[(reg<<3)|2,value]; | |
self.writecmd(self.MAXUSBAPP,0x00,len(data),data); | |
return value; | |
def wregAS(self,reg,value): | |
"""Poke 8 bits into a register, setting AS.""" | |
data=[(reg<<3)|3,value]; | |
self.writecmd(self.MAXUSBAPP,0x00,len(data),data); | |
return value; | |
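The four accessors rreg, rregAS, wreg and wregAS differ only in the low bits of the first byte they send: the register number is shifted left by three, bit 1 selects a write, and bit 0 sets AS. A minimal Python 3 sketch of that encoding follows; `max_command_byte` is a hypothetical name used only for illustration.

```python
def max_command_byte(reg: int, write: bool = False, ackstat: bool = False) -> int:
    """Build the first byte of a MAXUSB register access.

    Mirrors the expressions used above:
      rreg   -> reg<<3
      rregAS -> (reg<<3)|1
      wreg   -> (reg<<3)|2
      wregAS -> (reg<<3)|3
    """
    if not 0 <= reg <= 31:
        raise ValueError("register number must fit in 5 bits")
    return (reg << 3) | (0x02 if write else 0x00) | (0x01 if ackstat else 0x00)


# Register 3 is rEP3INFIFO, register 8 is rEP3INBC.
print("0x%02x" % max_command_byte(3, write=True))                 # wreg
print("0x%02x" % max_command_byte(3, write=True, ackstat=True))   # wregAS
print("0x%02x" % max_command_byte(8, write=True, ackstat=True))   # wregAS
```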
def readbytes(self,reg,length): | |
"""Peek some bytes from a register.""" | |
data=[(reg<<3)]+range(0,length); | |
self.writecmd(self.MAXUSBAPP,0x00,len(data),data); | |
toret=self.data[1:len(self.data)]; | |
ashex=""; | |
for foo in toret: | |
ashex=ashex+(" %02x"%ord(foo)); | |
if self.usbverbose: print "GET %02x==%s" % (reg,ashex); | |
return toret; | |
def readbytesAS(self,reg,length): | |
"""Peek some bytes from a register, acking prior transfer.""" | |
data=[(reg<<3)|1]+range(0,length); | |
self.writecmd(self.MAXUSBAPP,0x00,len(data),data); | |
toret=self.data[1:len(self.data)]; | |
ashex=""; | |
for foo in toret: | |
ashex=ashex+(" %02x"%ord(foo)); | |
if self.usbverbose: print "GETAS %02x==%s" % (reg,ashex); | |
return toret; | |
def fifo_ep3in_tx(self,data): | |
"""Sends the data out of EP3 in 64-byte chunks.""" | |
#Wait for the buffer to be free before starting. | |
while not(self.rreg(rEPIRQ)&bmIN3BAVIRQ): pass; | |
count=len(data); | |
pos=0; | |
while count>0: | |
#Send 64-byte chunks or the remainder. | |
c=min(count,64); | |
self.writebytes(rEP3INFIFO, | |
data[pos:pos+c]); | |
self.wregAS(rEP3INBC,c); | |
count=count-c; | |
pos=pos+c; | |
#Wait for the buffer to be free before continuing. | |
while not(self.rreg(rEPIRQ)&bmIN3BAVIRQ): pass; | |
return; | |
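The chunking in fifo_ep3in_tx() can be looked at separately from the hardware access. This Python 3 sketch only computes the slices; `split_for_fifo` is a hypothetical helper, and the waits on IN3BAVIRQ and the byte-count writes are deliberately left out.

```python
def split_for_fifo(data: bytes, fifo_size: int = 64) -> list:
    """Split data into FIFO-sized chunks; the last chunk holds the remainder."""
    if fifo_size <= 0:
        raise ValueError("fifo_size must be positive")
    return [data[pos:pos + fifo_size] for pos in range(0, len(data), fifo_size)]


chunks = split_for_fifo(bytes(150))
print([len(chunk) for chunk in chunks])
```

Each chunk corresponds to one writebytes() call followed by one byte-count write. As in the loop above, empty input yields no chunks at all.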
def ctl_write_nd(self,request): | |
"""Control Write with no data stage. Assumes PERADDR is set | |
and the SUDFIFO contains the 8 setup bytes. Returns with | |
result code = HRSLT[3:0] (HRSL register). If there is an | |
error, the 4MSBits of the returned value indicate the stage 1 | |
or 2.""" | |
# 1. Send the SETUP token and 8 setup bytes. | |
# Should ACK immediately. | |
self.writebytes(rSUDFIFO,request); | |
resultcode=self.send_packet(tokSETUP,0); #SETUP packet to EP0. | |
if resultcode: return resultcode; | |
# 2. No data stage, so the last operation is to send an IN | |
# token to the peripheral as the STATUS (handshake) stage of | |
# this control transfer. We should get NAK or the DATA1 PID. | |
# When we get the DATA1 PID the 3421 automatically | |
# sends the closing ACK. | |
resultcode=self.send_packet(tokINHS,0); #Function takes care of retries. | |
if resultcode: return resultcode; | |
return 0; | |
def ctl_read(self,request): | |
"""Control read transfer, used in Host mode.""" | |
resultcode=0; | |
bytes_to_read=request[6]+256*request[7]; | |
##SETUP packet | |
self.writebytes(rSUDFIFO,request); #Load the FIFO | |
resultcode=self.send_packet(tokSETUP,0); #SETUP packet to EP0 | |
if resultcode: | |
print "Failed to get ACK on SETUP request in ctl_read()." | |
return resultcode; | |
self.wreg(rHCTL,bmRCVTOG1); #FIRST data packet in CTL transfer uses DATA1 toggle. | |
resultcode=self.IN_Transfer(0,bytes_to_read); | |
if resultcode: | |
print "Failed on IN Transfer in ctl_read()"; | |
return resultcode; | |
self.IN_nak_count=self.nak_count; | |
#The OUT status stage. | |
resultcode=self.send_packet(tokOUTHS,0); | |
if resultcode: | |
print "Failed on OUT Status stage in ctl_read()"; | |
return resultcode; | |
return 0; #Success | |
xfrdata=[]; #Ugly variable used only by a few functions. FIXME | |
def IN_Transfer(self,endpoint,INbytes): | |
"""Does an IN transfer to an endpoint, used for Host mode.""" | |
xfrsize=INbytes; | |
xfrlen=0; | |
self.xfrdata=[]; | |
while 1: | |
resultcode=self.send_packet(tokIN,endpoint); #IN packet to EP. NAKS taken care of. | |
if resultcode: return resultcode; | |
pktsize=self.rreg(rRCVBC); #Number of RXed bytes. | |
#Very inefficient, move this to C if performance is needed. | |
for j in range(0,pktsize): | |
self.xfrdata=self.xfrdata+[self.rreg(rRCVFIFO)]; | |
xfrsize=self.xfrdata[0]; | |
self.wreg(rHIRQ,bmRCVDAVIRQ); #Clear IRQ | |
xfrlen=xfrlen+pktsize; #Add byte count to total transfer length. | |
#print "%i / %i" % (xfrlen,xfrsize) | |
#Packet is complete if: | |
# 1. The device sent a short packet, <maxPacketSize | |
# 2. INbytes have been transferred. | |
if (pktsize<self.maxPacketSize) or (xfrlen>=xfrsize): | |
self.last_transfer_size=xfrlen; | |
ashex=""; | |
for foo in self.xfrdata: | |
ashex=ashex+(" %02x"%foo); | |
#print "INPACKET EP%i==%s (0x%02x bytes remain)" % (endpoint,ashex,xfrsize); | |
return resultcode; | |
RETRY_LIMIT=3; | |
NAK_LIMIT=300; | |
def send_packet(self,token,endpoint): | |
"""Send a packet to an endpoint as the Host, taking care of NAKs. | |
Don't use this for device code.""" | |
self.retry_count=0; | |
self.nak_count=0; | |
#Repeat until NAK_LIMIT or RETRY_LIMIT is reached. | |
while self.nak_count<self.NAK_LIMIT and self.retry_count<self.RETRY_LIMIT: | |
self.wreg(rHXFR,(token|endpoint)); #launch the transfer | |
while not (self.rreg(rHIRQ) & bmHXFRDNIRQ): | |
# wait for the completion IRQ | |
pass; | |
self.wreg(rHIRQ,bmHXFRDNIRQ); #Clear IRQ | |
resultcode = (self.rreg(rHRSL) & 0x0F); # get the result | |
if (resultcode==hrNAK): | |
self.nak_count=self.nak_count+1; | |
elif (resultcode==hrTIMEOUT): | |
self.retry_count=self.retry_count+1; | |
else: | |
#Success! | |
return resultcode; | |
return resultcode; | |
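The retry policy of send_packet() can be exercised without a MAX3421 by passing in the transfer as a callable. This is a Python 3 sketch under that assumption: `send_with_limits` and `launch` are hypothetical names, and the result codes repeat hrSUCCESS, hrNAK and hrTIMEOUT from the constants above.

```python
HR_SUCCESS = 0x00
HR_NAK = 0x04
HR_TIMEOUT = 0x0E


def send_with_limits(launch, nak_limit=300, retry_limit=3):
    """Call launch() until it returns something other than NAK or TIMEOUT,
    or until either limit is reached. Returns (result, naks, retries)."""
    naks = 0
    retries = 0
    result = None  # stays None only if a limit of zero prevents any attempt
    while naks < nak_limit and retries < retry_limit:
        result = launch()
        if result == HR_NAK:
            naks += 1
        elif result == HR_TIMEOUT:
            retries += 1
        else:
            break  # success or a hard error: hand it back to the caller
    return result, naks, retries


# A fake peripheral that NAKs twice before accepting the packet.
replies = iter([HR_NAK, HR_NAK, HR_SUCCESS])
outcome = send_with_limits(lambda: next(replies))
print(outcome)
```

As in send_packet(), any code other than NAK or TIMEOUT ends the loop immediately, so STALL and similar errors are never retried.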
def writebytes(self,reg,tosend): | |
"""Poke some bytes into a register.""" | |
data=""; | |
if type(tosend)==str: | |
data=chr((reg<<3)|3)+tosend; | |
if self.usbverbose: print "PUT %02x:=%s (0x%02x bytes)" % (reg,tosend,len(data)) | |
else: | |
data=[(reg<<3)|3]+tosend; | |
ashex=""; | |
for foo in tosend: | |
ashex=ashex+(" %02x"%foo); | |
if self.usbverbose: print "PUT %02x:=%s (0x%02x bytes)" % (reg,ashex,len(data)) | |
self.writecmd(self.MAXUSBAPP,0x00,len(data),data); | |
def usb_connect(self): | |
"""Connect the USB port.""" | |
#disconnect D+ pullup if host turns off VBUS | |
self.wreg(rUSBCTL,bmVBGATE|bmCONNECT); | |
def usb_disconnect(self): | |
"""Disconnect the USB port.""" | |
self.wreg(rUSBCTL,bmVBGATE); | |
def STALL_EP0(self,SUD=None): | |
"""Stall for an unknown SETUP event.""" | |
if SUD==None: | |
print "Stalling EP0."; | |
else: | |
print "Stalling EP0 for %s" % self.setup2str(SUD); | |
self.wreg(rEPSTALLS,0x23); #All three stall bits. | |
def SETBIT(self,reg,val): | |
"""Set a bit in a register.""" | |
self.wreg(reg,self.rreg(reg)|val); | |
def vbus_on(self): | |
"""Turn on the target device.""" | |
self.wreg(rIOPINS2,(self.rreg(rIOPINS2)|0x08)); | |
def vbus_off(self): | |
"""Turn off the target device's power.""" | |
self.wreg(rIOPINS2,0x00); | |
def reset_host(self): | |
"""Resets the chip into host mode.""" | |
self.wreg(rUSBCTL,bmCHIPRES); #Stop the oscillator. | |
self.wreg(rUSBCTL,0x00); #restart it. | |
#FIXME: Why does the OSC line never settle? | |
#Code works without it. | |
#print "Waiting for PLL to stabilize."; | |
#while self.rreg(rUSBIRQ)&bmOSCOKIRQ: | |
# #Hang until the PLL stabilizes. | |
# pass; | |
#print "Stable."; | |
class GoodFETMAXUSBHost(GoodFETMAXUSB): | |
"""This is a class for implementing a minimal USB host. | |
It's intended for fuzzing, rather than for daily use.""" | |
def hostinit(self): | |
"""Initialize the MAX3421 as a USB Host.""" | |
self.usb_connect(); | |
print "Enabling host mode."; | |
self.wreg(rPINCTL,(bmFDUPSPI|bmPOSINT)); | |
print "Resetting host."; | |
self.reset_host(); | |
self.vbus_off(); | |
time.sleep(0.2); | |
print "Powering host."; | |
self.vbus_on(); | |
#self.hostrun(); | |
def hostrun(self): | |
"""Run as a minimal host and dump the config tables.""" | |
while 1: | |
self.detect_device(); | |
time.sleep(0.2); | |
self.enumerate_device(); | |
self.wait_for_disconnect(); | |
def detect_device(self): | |
"""Waits for a device to be inserted and then returns.""" | |
busstate=0; | |
#Activate host mode and turn on 15K pulldown resistors on D+ and D-. | |
self.wreg(rMODE,(bmDPPULLDN|bmDMPULLDN|bmHOST)); | |
#Clear connection detect IRQ. | |
self.wreg(rHIRQ,bmCONDETIRQ); | |
print "Waiting for a device connection."; | |
while busstate==0: | |
self.wreg(rHCTL,bmSAMPLEBUS); #Update JSTATUS and KSTATUS bits. | |
busstate=self.rreg(rHRSL) & (bmJSTATUS|bmKSTATUS); | |
if busstate==bmJSTATUS: | |
print "Detected Full-Speed Device."; | |
self.wreg(rMODE,(bmDPPULLDN|bmDMPULLDN|bmHOST|bmSOFKAENAB)); | |
elif busstate==bmKSTATUS: | |
print "Detected Low-Speed Device."; | |
self.wreg(rMODE,(bmDPPULLDN|bmDMPULLDN|bmHOST|bmLOWSPEED|bmSOFKAENAB)); | |
else: | |
print "Not sure whether this is Full-Speed or Low-Speed. Please investigate."; | |
def wait_for_disconnect(self): | |
"""Wait for a device to be disconnected.""" | |
print "Waiting for a device disconnect."; | |
self.wreg(rHIRQ,bmCONDETIRQ); #Clear disconnect IRQ | |
while not (self.rreg(rHIRQ) & bmCONDETIRQ): | |
#Wait for IRQ to change. | |
pass; | |
#Turn off markers. | |
self.wreg(rMODE,bmDPPULLDN|bmDMPULLDN|bmHOST); | |
print "Device disconnected."; | |
self.wreg(rIOPINS2,(self.rreg(rIOPINS2) & ~0x04)); #HL1_OFF | |
self.wreg(rIOPINS1,(self.rreg(rIOPINS1) & ~0x02)); #HL4_OFF | |
def enumerate_device(self): | |
"""Enumerates a device on the present port.""" | |
Set_Address_to_7 = [0x00,0x05,0x07,0x00,0x00,0x00,0x00,0x00]; | |
Get_Descriptor_Device = [0x80,0x06,0x00,0x01,0x00,0x00,0x00,0x00]; #len filled in | |
Get_Descriptor_Config = [0x80,0x06,0x00,0x02,0x00,0x00,0x00,0x00]; | |
print "Issuing USB bus reset."; | |
self.wreg(rHCTL,bmBUSRST); | |
while self.rreg(rHCTL) & bmBUSRST: | |
#Wait for reset to complete. | |
pass; | |
time.sleep(0.2); | |
#Get the device descriptor. | |
self.wreg(rPERADDR,0); #First request to address 0. | |
self.maxPacketSize=8; #Only safe value for first check. | |
Get_Descriptor_Device[6]=8; # wLengthL | |
Get_Descriptor_Device[7]=0; # wLengthH | |
print "Fetching 8 bytes of Device Descriptor."; | |
self.ctl_read(Get_Descriptor_Device); # Get device descriptor into self.xfrdata; | |
self.maxPacketSize=self.xfrdata[7]; | |
print "EP0 maxPacketSize is %02i bytes." % self.maxPacketSize; | |
# Issue another USB bus reset | |
print "Resetting the bus again." | |
self.wreg(rHCTL,bmBUSRST); | |
while self.rreg(rHCTL) & bmBUSRST: | |
#Wait for reset to complete. | |
pass; | |
time.sleep(0.2); | |
# Set_Address to 7 (Note: this request goes to address 0, already set in PERADDR register). | |
print "Setting address to 0x07"; | |
HR = self.ctl_write_nd(Set_Address_to_7); # CTL-Write, no data stage | |
#if(print_error(HR)) return; | |
time.sleep(0.002); # Device gets 2 msec recovery time | |
self.wreg(rPERADDR,7); # now all transfers go to addr 7 | |
#Get the device descriptor at the assigned address. | |
Get_Descriptor_Device[6]=0x12; #Fill in real descriptor length. | |
print "Fetching Device Descriptor." | |
self.ctl_read(Get_Descriptor_Device); #Result in self.xfrdata; | |
self.descriptor=self.xfrdata; | |
self.VID = self.xfrdata[8] + 256*self.xfrdata[9]; | |
self.PID = self.xfrdata[10]+ 256*self.xfrdata[11]; | |
iMFG = self.xfrdata[14]; | |
iPROD = self.xfrdata[15]; | |
iSERIAL = self.xfrdata[16]; | |
self.manufacturer=self.getDescriptorString(iMFG); | |
self.product=self.getDescriptorString(iPROD); | |
self.serial=self.getDescriptorString(iSERIAL); | |
self.printstrings(); | |
def printstrings(self): | |
print "Vendor ID is %04x." % self.VID; | |
print "Product ID is %04x." % self.PID; | |
print "Manufacturer: %s" % self.manufacturer; | |
print "Product: %s" % self.product; | |
print "Serial: %s" % self.serial; | |
def getDescriptorString(self, index): | |
"""Grabs a string from the descriptor string table.""" | |
# Get_Descriptor-String template. Code fills in idx at str[2]. | |
Get_Descriptor_String = [0x80,0x06,index,0x03,0x00,0x00,0x40,0x00]; | |
if index==0: return "MISSING STRING"; | |
status=self.ctl_read(Get_Descriptor_String); | |
if status: return None; | |
#Since we've got a string | |
toret=""; | |
for c in self.xfrdata[2:len(self.xfrdata)]: | |
if c>0: toret=toret+chr(c); | |
return toret; | |
class GoodFETMAXUSBDevice(GoodFETMAXUSB): | |
def send_descriptor(self,SUD): | |
"""Send the USB descriptors based upon the setup data.""" | |
desclen=0; | |
reqlen=ord(SUD[wLengthL])+256*ord(SUD[wLengthH]); #16-bit length | |
desctype=ord(SUD[wValueH]); | |
if desctype==GD_DEVICE: | |
desclen=self.DD[0]; | |
ddata=self.DD; | |
elif desctype==GD_CONFIGURATION: | |
desclen=self.CD[2]; | |
ddata=self.CD; | |
elif desctype==GD_STRING: | |
desclen=ord(self.strDesc[ord(SUD[wValueL])][0]); | |
ddata=self.strDesc[ord(SUD[wValueL])]; | |
elif desctype==GD_HID: | |
#Don't know how to do this yet. | |
pass; | |
elif desctype==GD_REPORT: | |
desclen=self.CD[25]; | |
ddata=self.RepD; | |
#TODO Configuration, String, Hid, and Report | |
if desclen>0: | |
#Reduce desclen if asked for fewer bytes. | |
desclen=min(reqlen,desclen); | |
#Send those bytes. | |
self.writebytes(rEP0FIFO,ddata[0:desclen]); | |
self.wregAS(rEP0BC,desclen); | |
else: | |
print "Stalling in send_descriptor() for lack of handler for %02x." % desctype; | |
self.STALL_EP0(SUD); | |
def set_configuration(self,SUD): | |
"""Set the configuration.""" | |
bmSUSPIE=0x10; | |
configval=ord(SUD[wValueL]); | |
if(configval>0): | |
self.SETBIT(rUSBIEN,bmSUSPIE); | |
self.rregAS(rFNADDR); | |
class GoodFETMAXUSBHID(GoodFETMAXUSBDevice): | |
"""This is an example HID keyboard driver, loosely based on the | |
MAX3420 examples.""" | |
def hidinit(self): | |
"""Initialize a USB HID device.""" | |
self.usb_disconnect(); | |
self.usb_connect(); | |
self.hidrun(); | |
def hidrun(self): | |
"""Main loop of the USB HID emulator.""" | |
print "Starting a HID device. This won't return."; | |
while 1: | |
self.service_irqs(); | |
def do_SETUP(self): | |
"""Handle USB Enumeration""" | |
#Grab the SETUP packet from the buffer. | |
SUD=self.readbytes(rSUDFIFO,8); | |
#Parse the SETUP packet | |
print "Handling a setup packet of %s" % self.setup2str(SUD); | |
self.OsLastConfigType=ord(SUD[bmRequestType]); | |
self.typepos=0; | |
setuptype=(ord(SUD[bmRequestType])&0x60); | |
if setuptype==0x00: | |
self.std_request(SUD); | |
elif setuptype==0x20: | |
self.class_request(SUD); | |
elif setuptype==0x40: | |
self.vendor_request(SUD); | |
else: | |
print "Unknown request type 0x%02x." % ord(SUD[bmRequestType]) | |
self.STALL_EP0(SUD); | |
def class_request(self,SUD): | |
"""Handle a class request.""" | |
print "Stalling a class request."; | |
self.STALL_EP0(SUD); | |
def vendor_request(self,SUD): | |
print "Stalling a vendor request."; | |
self.STALL_EP0(SUD); | |
def std_request(self,SUD): | |
"""Handles a standard setup request.""" | |
setuptype=ord(SUD[bRequest]); | |
if setuptype==SR_GET_DESCRIPTOR: self.send_descriptor(SUD); | |
#elif setuptype==SR_SET_FEATURE: | |
# self.rregAS(rFNADDR); | |
# # self.feature(1); | |
elif setuptype==SR_SET_CONFIGURATION: self.set_configuration(SUD); | |
elif setuptype==SR_GET_STATUS: self.get_status(SUD); | |
elif setuptype==SR_SET_ADDRESS: self.rregAS(rFNADDR); | |
elif setuptype==SR_GET_INTERFACE: self.get_interface(SUD); | |
else: | |
print "Stalling Unknown standard setup request type %02x" % setuptype; | |
self.STALL_EP0(SUD); | |
def get_interface(self,SUD): | |
"""Handles a setup request for SR_GET_INTERFACE.""" | |
if ord(SUD[wIndexL])==0: | |
self.wreg(rEP0FIFO,0); | |
self.wregAS(rEP0BC,1); | |
else: | |
self.STALL_EP0(SUD); | |
OsLastConfigType=-1; | |
#Device Descriptor | |
DD=[0x12, # bLength = 18d | |
0x01, # bDescriptorType = Device (1) | |
0x00,0x01, # bcdUSB(L/H) USB spec rev (BCD) | |
0x00,0x00,0x00, # bDeviceClass, bDeviceSubClass, bDeviceProtocol | |
0x40, # bMaxPacketSize0 EP0 is 64 bytes | |
0x6A,0x0B, # idVendor(L/H)--Maxim is 0B6A | |
0x46,0x53, # idProduct(L/H)--5346 | |
0x34,0x12, # bcdDevice--1234 | |
1,2,3, # iManufacturer, iProduct, iSerialNumber | |
1]; | |
#Configuration Descriptor | |
CD=[0x09, # bLength | |
0x02, # bDescriptorType = Config | |
0x22,0x00, # wTotalLength(L/H) = 34 bytes | |
0x01, # bNumInterfaces | |
0x01, # bConfigValue | |
0x00, # iConfiguration | |
0xE0, # bmAttributes. b7=1 b6=self-powered b5=RWU supported | |
0x01, # MaxPower is 2 mA | |
# INTERFACE Descriptor | |
0x09, # length = 9 | |
0x04, # type = IF | |
0x00, # IF #0 | |
0x00, # bAlternate Setting | |
0x01, # bNum Endpoints | |
0x03, # bInterfaceClass = HID | |
0x00,0x00, # bInterfaceSubClass, bInterfaceProtocol | |
0x00, # iInterface | |
# HID Descriptor--It's at CD[18] | |
0x09, # bLength | |
0x21, # bDescriptorType = HID | |
0x10,0x01, # bcdHID(L/H) Rev 1.1 | |
0x00, # bCountryCode (none) | |
0x01, # bNumDescriptors (one report descriptor) | |
0x22, # bDescriptorType (report) | |
43,0, # CD[25]: wDescriptorLength(L/H) (report descriptor size is 43 bytes) | |
# Endpoint Descriptor | |
0x07, # bLength | |
0x05, # bDescriptorType (Endpoint) | |
0x83, # bEndpointAddress (EP3-IN) | |
0x03, # bmAttributes (interrupt) | |
64,0, # wMaxPacketSize (64) | |
10]; | |
strDesc=[ | |
# STRING descriptor 0--Language string | |
"\x04\x03\x09\x04", | |
# [ | |
# 0x04, # bLength | |
# 0x03, # bDescriptorType = string | |
# 0x09,0x04 # wLANGID(L/H) = English-United States | |
# ], | |
# STRING descriptor 1--Manufacturer ID | |
"\x0c\x03M\x00a\x00x\x00i\x00m\x00", | |
# [ | |
# 12, # bLength | |
# 0x03, # bDescriptorType = string | |
# 'M',0,'a',0,'x',0,'i',0,'m',0 # text in Unicode | |
# ], | |
# STRING descriptor 2 - Product ID | |
"\x18\x03M\x00A\x00X\x003\x004\x002\x000\x00E\x00 \x00E\x00n\x00u\x00m\x00 \x00C\x00o\x00d\x00e\x00", | |
# [ 24, # bLength | |
# 0x03, # bDescriptorType = string | |
# 'M',0,'A',0,'X',0,'3',0,'4',0,'2',0,'0',0,'E',0,' ',0, | |
# 'E',0,'n',0,'u',0,'m',0,' ',0,'C',0,'o',0,'d',0,'e',0 | |
# ], | |
# STRING descriptor 3 - Serial Number ID | |
"\x14\x03S\x00/\x00N\x00 \x003\x004\x002\x000\x00E\x00" | |
# [ 20, # bLength | |
# 0x03, # bDescriptorType = string | |
# 'S',0, | |
# '/',0, | |
# 'N',0, | |
# ' ',0, | |
# '3',0, | |
# '4',0, | |
# '2',0, | |
# '0',0, | |
# 'E',0, | |
# ] | |
]; | |
RepD=[ | |
0x05,0x01, # Usage Page (generic desktop) | |
0x09,0x06, # Usage (keyboard) | |
0xA1,0x01, # Collection | |
0x05,0x07, # Usage Page 7 (keyboard/keypad) | |
0x19,0xE0, # Usage Minimum = 224 | |
0x29,0xE7, # Usage Maximum = 231 | |
0x15,0x00, # Logical Minimum = 0 | |
0x25,0x01, # Logical Maximum = 1 | |
0x75,0x01, # Report Size = 1 | |
0x95,0x08, # Report Count = 8 | |
0x81,0x02, # Input(Data,Variable,Absolute) | |
0x95,0x01, # Report Count = 1 | |
0x75,0x08, # Report Size = 8 | |
0x81,0x01, # Input(Constant) | |
0x19,0x00, # Usage Minimum = 0 | |
0x29,0x65, # Usage Maximum = 101 | |
0x15,0x00, # Logical Minimum = 0, | |
0x25,0x65, # Logical Maximum = 101 | |
0x75,0x08, # Report Size = 8 | |
0x95,0x01, # Report Count = 1 | |
0x81,0x00, # Input(Data,Array,Absolute) | |
0xC0] | |
def get_status(self,SUD): | |
"""Get the USB Setup Status.""" | |
testbyte=ord(SUD[bmRequestType]) | |
#Toward Device | |
if testbyte==0x80: | |
self.wreg(rEP0FIFO,0x03); #Enable RWU and self-powered | |
self.wreg(rEP0FIFO,0x00); #Second byte is always zero. | |
self.wregAS(rEP0BC,2); #Load byte count, arm transfer, and ack CTL. | |
#Toward Interface | |
elif testbyte==0x81: | |
self.wreg(rEP0FIFO,0x00); | |
self.wreg(rEP0FIFO,0x00); #Second byte is always zero. | |
self.wregAS(rEP0BC,2); | |
#Toward Endpoint | |
elif testbyte==0x82: | |
if(ord(SUD[wIndexL])==0x83): | |
self.wreg(rEP0FIFO,0x01); #Stall EP3 | |
self.wreg(rEP0FIFO,0x00); #Second byte is always zero. | |
self.wregAS(rEP0BC,2); | |
else: | |
self.STALL_EP0(SUD); | |
else: | |
self.STALL_EP0(SUD); | |
typepos=0; | |
typestrings={ | |
-1 : "Python does USB HID!\n", # Unidentified OS. This is the default typestring. | |
0x00 : "OSX Hosts don't recognize Maxim keyboards.\n", # We have to identify as an Apple keyboard to get around the unknown keyboard error. | |
0xA1 : "Python does USB HID on Linux!\n", | |
0x81 : " Python does USB HID on Windows!\n", # Windows requires a bit of a delay. Maybe we can watch for a keyboard reset command? | |
} | |
def typestring(self): | |
if self.typestrings.has_key(self.OsLastConfigType): | |
return self.typestrings[self.OsLastConfigType]; | |
else: | |
return self.typestrings[-1]; | |
# http://www.win.tue.nl/~aeb/linux/kbd/scancodes-14.html | |
# Escape=0x29 Backsp=0x2A Space=0x2C CapsLock=0x39 Menu=0x65 | |
keymaps={ | |
'en_US' :[ ' abcdefghijklmnopqrstuvwxyz1234567890\n\t -=[]\\\\;\'`,./', | |
''' | |
''', # LeftCtrl | |
' ABCDEFGHIJKLMNOPQRSTUVWXYZ!@#$%^&*() {}?+||:"~<>?', # LeftShift | |
'', # LeftCtrl & LeftShift | |
' abc'], # LeftAlt | |
'Dvorak' :[ ' axje.uidchtnmbrl\'poygk,qf;1234567890\n\t []/=\\\\s-`wvz', | |
''' | |
''', # LeftCtrl | |
' AXJE UIDCHTNMBRL"POYGK<QF:!@#$%^&*() {}?+||S_~WVZ', # LeftShift | |
'', # LeftCtrl & LeftShift | |
' axj'], # LeftAlt | |
} | |
layout='en_US'; | |
def keymap(self): | |
return self.keymaps[self.layout]; | |
modifiers={ | |
'None': 0b00000000, | |
'LeftCtrl': 0b00000001, | |
'LeftShift': 0b00000010, | |
'LeftAlt': 0b00000100, | |
'LeftGUI': 0b00001000, | |
'RightCtrl': 0b00010000, | |
'RightShift': 0b00100000, | |
'RightAlt': 0b01000000, | |
'RightGUI': 0b10000000 | |
} | |
def asc2hid(self,ascii): | |
"""Translate ASCII to a USB keycode.""" | |
if type(ascii)!=str: | |
return (0,0); # Send NoEvent if not passed a character | |
if ascii==' ': | |
return (0,0x2C); # space | |
for modset in self.keymap(): | |
keycode=modset.find(ascii); | |
if keycode != -1: | |
modifier = self.keymap().index(modset) | |
return (modifier, keycode); | |
return (0,0); | |
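asc2hid() relies on the position of a character inside the keymap string being its HID usage code, and on the index of the matching string being the modifier byte. The same mapping can be written as an explicit table. This is a Python 3 sketch covering only letters, digits, Enter and space; `KEYCODES` and `keyboard_report` are hypothetical names, the usage codes are the standard HID keyboard values, and the report layout (modifier, zero byte, keycode) follows typeletter().

```python
import string

LEFT_SHIFT = 0b00000010  # same bit as 'LeftShift' in the modifiers table


def build_keycodes() -> dict:
    """Map a character to (modifier, HID usage code)."""
    table = {}
    for offset, ch in enumerate(string.ascii_lowercase):
        table[ch] = (0x00, 0x04 + offset)            # 'a' is usage 0x04
        table[ch.upper()] = (LEFT_SHIFT, 0x04 + offset)
    for offset, ch in enumerate("1234567890"):
        table[ch] = (0x00, 0x1E + offset)            # '1' is 0x1E, '0' is 0x27
    table["\n"] = (0x00, 0x28)                       # Enter
    table[" "] = (0x00, 0x2C)                        # Space
    return table


KEYCODES = build_keycodes()


def keyboard_report(ch) -> bytes:
    """Three-byte report; unknown input becomes NoEvent (all zeros),
    which also serves as the key-up report."""
    modifier, keycode = KEYCODES.get(ch, (0x00, 0x00))
    return bytes([modifier, 0x00, keycode])


print(keyboard_report("a").hex(), keyboard_report("A").hex(), keyboard_report(0).hex())
```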
def type_IN3(self): | |
"""Type next letter in buffer.""" | |
string=self.typestring(); | |
if self.typepos>=len(string): | |
self.typeletter(0) #key up | |
while 1: | |
mydata = raw_input('>>') | |
if mydata == '0': | |
break | |
else: | |
for i in mydata: | |
self.typeletter(i) | |
self.typeletter(0) | |
#self.typeletter(0); # Send NoEvent to indicate key-up | |
exit(0); | |
#self.typepos=0; # Repeat typestring forever! | |
# This would be a great place to enable a typethrough mode so the host operator can control the target | |
else: | |
if self.usbverbose: | |
sys.stdout.write(string[self.typepos]); | |
sys.stdout.flush(); | |
self.typeletter(string[self.typepos]); | |
self.typepos+=1; | |
return; | |
def typeletter(self,key): | |
"""Type a letter on IN3. Zero for keyup.""" | |
mod=0; | |
if type(key)==str: | |
(mod, key) = self.asc2hid(key); | |
self.wreg(rEP3INFIFO,mod); | |
self.wreg(rEP3INFIFO,0); | |
self.wreg(rEP3INFIFO,key); | |
self.wreg(rEP3INBC,3); | |
def do_IN3(self): | |
"""Handle IN3 event.""" | |
#Don't bother clearing interrupt flag, that's done by sending the reply. | |
if self.OsLastConfigType != -1: # Wait for some configuration before stuffing keycodes down the pipe | |
self.type_IN3(); |
# MAXUSBApp.py | |
# | |
# Contains class definition for MAXUSBApp. | |
import time | |
from util import * | |
from Facedancer import * | |
from USB import * | |
from USBDevice import USBDeviceRequest | |
class MAXUSBApp(FacedancerApp): | |
app_name = "MAXUSB" | |
app_num = 0x40 | |
reg_ep0_fifo = 0x00 | |
reg_ep1_out_fifo = 0x01 | |
reg_ep2_in_fifo = 0x02 | |
reg_ep3_in_fifo = 0x03 | |
reg_setup_data_fifo = 0x04 | |
reg_ep0_byte_count = 0x05 | |
reg_ep1_out_byte_count = 0x06 | |
reg_ep2_in_byte_count = 0x07 | |
reg_ep3_in_byte_count = 0x08 | |
reg_ep_stalls = 0x09 | |
reg_clr_togs = 0x0a | |
reg_endpoint_irq = 0x0b | |
reg_endpoint_interrupt_enable = 0x0c | |
reg_usb_irq = 0x0d | |
reg_usb_interrupt_enable = 0x0e | |
reg_usb_control = 0x0f | |
reg_cpu_control = 0x10 | |
reg_pin_control = 0x11 | |
reg_revision = 0x12 | |
reg_function_address = 0x13 | |
reg_io_pins = 0x14 | |
# bitmask values for reg_endpoint_irq = 0x0b | |
is_setup_data_avail = 0x20 # SUDAVIRQ | |
is_in3_buffer_avail = 0x10 # IN3BAVIRQ | |
is_in2_buffer_avail = 0x08 # IN2BAVIRQ | |
is_out1_data_avail = 0x04 # OUT1DAVIRQ | |
is_out0_data_avail = 0x02 # OUT0DAVIRQ | |
is_in0_buffer_avail = 0x01 # IN0BAVIRQ | |
# bitmask values for reg_usb_control = 0x0f | |
usb_control_vbgate = 0x40 | |
usb_control_connect = 0x08 | |
# bitmask values for reg_pin_control = 0x11 | |
interrupt_level = 0x08 | |
full_duplex = 0x10 | |
def __init__(self, device, verbose=0): | |
FacedancerApp.__init__(self, device, verbose) | |
self.connected_device = None | |
self.enable() | |
if verbose > 0: | |
rev = self.read_register(self.reg_revision) | |
print(self.app_name, "revision", rev) | |
# set duplex and negative INT level (from GoodFETMAXUSB.py) | |
self.write_register(self.reg_pin_control, | |
self.full_duplex | self.interrupt_level) | |
def init_commands(self): | |
self.read_register_cmd = FacedancerCommand(self.app_num, 0x00, b'') | |
self.write_register_cmd = FacedancerCommand(self.app_num, 0x00, b'') | |
self.enable_app_cmd = FacedancerCommand(self.app_num, 0x10, b'') | |
self.ack_cmd = FacedancerCommand(self.app_num, 0x00, b'\x01') | |
def read_register(self, reg_num, ack=False): | |
if self.verbose > 1: | |
print(self.app_name, "reading register 0x%02x" % reg_num) | |
self.read_register_cmd.data = bytearray([ reg_num << 3, 0 ]) | |
if ack: | |
self.read_register_cmd.data[0] |= 1 | |
self.device.writecmd(self.read_register_cmd) | |
resp = self.device.readcmd() | |
if self.verbose > 2: | |
print(self.app_name, "read register 0x%02x has value 0x%02x" % | |
(reg_num, resp.data[1])) | |
return resp.data[1] | |
def write_register(self, reg_num, value, ack=False): | |
if self.verbose > 2: | |
print(self.app_name, "writing register 0x%02x with value 0x%02x" % | |
(reg_num, value)) | |
self.write_register_cmd.data = bytearray([ (reg_num << 3) | 2, value ]) | |
if ack: | |
self.write_register_cmd.data[0] |= 1 | |
self.device.writecmd(self.write_register_cmd) | |
self.device.readcmd() | |
def get_version(self): | |
return self.read_register(self.reg_revision) | |
def ack_status_stage(self): | |
if self.verbose > 5: | |
print(self.app_name, "sending ack!") | |
self.device.writecmd(self.ack_cmd) | |
self.device.readcmd() | |
    def connect(self, usb_device):
        if self.read_register(self.reg_usb_control) & self.usb_control_connect:
            self.write_register(self.reg_usb_control, self.usb_control_vbgate)
            time.sleep(.1)

        self.write_register(self.reg_usb_control, self.usb_control_vbgate |
                self.usb_control_connect)

        self.connected_device = usb_device

        if self.verbose > 0:
            print(self.app_name, "connected device", self.connected_device.name)

    def disconnect(self):
        self.write_register(self.reg_usb_control, self.usb_control_vbgate)

        if self.verbose > 0:
            print(self.app_name, "disconnected device", self.connected_device.name)
        self.connected_device = None

    def clear_irq_bit(self, reg, bit):
        self.write_register(reg, bit)

    def read_bytes(self, reg, n):
        if self.verbose > 2:
            print(self.app_name, "reading", n, "bytes from register", reg)

        data = bytes([ (reg << 3) ] + ([0] * n))
        cmd = FacedancerCommand(self.app_num, 0x00, data)

        self.device.writecmd(cmd)
        resp = self.device.readcmd()

        if self.verbose > 3:
            print(self.app_name, "read", len(resp.data) - 1, "bytes from register", reg)

        return resp.data[1:]

    def write_bytes(self, reg, data):
        # Command byte: register number in the upper five bits, then the
        # write bit (0x2) and the ack bit (0x1), both set here. For the EP3 IN
        # FIFO this gives 0x1b; with only the write bit set it would be 0x1a.
        data = bytes([ (reg << 3) | 3 ]) + data
        cmd = FacedancerCommand(self.app_num, 0x00, data)

        self.device.writecmd(cmd)
        self.device.readcmd() # null response

        if self.verbose > 3:
            print(self.app_name, "wrote", len(data) - 1, "bytes to register", reg)

    # HACK: but given the limitations of the MAX chips, it seems necessary
    def send_on_endpoint(self, ep_num, data):
        if ep_num == 0:
            fifo_reg = self.reg_ep0_fifo
            bc_reg = self.reg_ep0_byte_count
        elif ep_num == 2:
            fifo_reg = self.reg_ep2_in_fifo
            bc_reg = self.reg_ep2_in_byte_count
        elif ep_num == 3:
            fifo_reg = self.reg_ep3_in_fifo
            bc_reg = self.reg_ep3_in_byte_count
        else:
            raise ValueError('endpoint ' + str(ep_num) + ' not supported')

        # FIFO buffer is only 64 bytes, must loop
        while len(data) > 64:
            self.write_bytes(fifo_reg, data[:64])
            self.write_register(bc_reg, 64, ack=True)

            data = data[64:]

        self.write_bytes(fifo_reg, data)
        self.write_register(bc_reg, len(data), ack=True)

        if self.verbose > 1:
            print(self.app_name, "wrote", bytes_as_hex(data), "to endpoint",
                    ep_num)

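    def send_on_endpoint_key(self, ep_num, data):
        # Type-through workaround: ep_num is ignored and the report always goes
        # out on endpoint 3. Each byte of the 3-byte HID report is written to
        # the FIFO in its own command, the way the old GoodFET code sent keys.
        fifo_reg = self.reg_ep3_in_fifo
        bc_reg = self.reg_ep3_in_byte_count

        self.write_bytes(fifo_reg, b'\x00')  # modifier byte
        self.write_bytes(fifo_reg, b'\x00')  # reserved byte
        self.write_bytes(fifo_reg, data)     # key code (caller passes one byte)
        self.write_register(bc_reg, 3, ack=True)
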
    # HACK: but given the limitations of the MAX chips, it seems necessary
    def read_from_endpoint(self, ep_num):
        if ep_num != 1:
            return b''

        byte_count = self.read_register(self.reg_ep1_out_byte_count)
        if byte_count == 0:
            return b''

        data = self.read_bytes(self.reg_ep1_out_fifo, byte_count)

        if self.verbose > 1:
            print(self.app_name, "read", bytes_as_hex(data), "from endpoint",
                    ep_num)

        return data

    def stall_ep0(self):
        if self.verbose > 0:
            print(self.app_name, "stalling endpoint 0")

        self.write_register(self.reg_ep_stalls, 0x23)

    def service_irqs(self):
        while True:
            irq = self.read_register(self.reg_endpoint_irq)

            if self.verbose > 3:
                print(self.app_name, "read endpoint irq: 0x%02x" % irq)

            if self.verbose > 2:
                if irq & ~ (self.is_in0_buffer_avail \
                        | self.is_in2_buffer_avail | self.is_in3_buffer_avail):
                    print(self.app_name, "notable irq: 0x%02x" % irq)

            if irq & self.is_setup_data_avail:
                self.clear_irq_bit(self.reg_endpoint_irq, self.is_setup_data_avail)

                b = self.read_bytes(self.reg_setup_data_fifo, 8)
                req = USBDeviceRequest(b)
                self.connected_device.handle_request(req)

            if irq & self.is_out1_data_avail:
                data = self.read_from_endpoint(1)
                if data:
                    self.connected_device.handle_data_available(1, data)
                self.clear_irq_bit(self.reg_endpoint_irq, self.is_out1_data_avail)

            if irq & self.is_in2_buffer_avail:
                self.connected_device.handle_buffer_available(2)

            if irq & self.is_in3_buffer_avail:
                self.connected_device.handle_buffer_available(3)
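
The two send paths above differ only in how the 3-byte report is split into FIFO writes. The sketch below rebuilds the serial frames for both paths so they can be compared with the traces in Notes.txt. It is a standalone illustration, not part of MAXUSBApp.py: the helper names are hypothetical, and the register numbers 3 and 8 are assumptions inferred from the command bytes 0x1b and 0x43 in the trace.

```python
import struct

# Values read off the trace in Notes.txt. The constant names are hypothetical.
APP_MAXUSB = 0x40            # "40: App name"
VERB = 0x00                  # "00: verb"
REG_EP3_IN_FIFO = 3          # assumption: (3 << 3) | 3 == 0x1b
REG_EP3_IN_BYTE_COUNT = 8    # assumption: (8 << 3) | 3 == 0x43


def frame(payload):
    """Prefix a payload with app, verb and a little-endian 16-bit length."""
    header = bytes([APP_MAXUSB, VERB]) + struct.pack("<H", len(payload))
    return header + bytes(payload)


def fifo_write(reg, data):
    """Same command byte as write_bytes: write and ack bits both set."""
    return frame(bytes([(reg << 3) | 3]) + data)


def register_write(reg, value, ack=False):
    """Same command byte as write_register: write bit, optional ack bit."""
    cmd = (reg << 3) | 2
    if ack:
        cmd |= 1
    return frame(bytes([cmd, value]))


def frames_bulk(report):
    """send_on_endpoint style: one FIFO write carrying the whole report."""
    return [fifo_write(REG_EP3_IN_FIFO, report),
            register_write(REG_EP3_IN_BYTE_COUNT, len(report), ack=True)]


def frames_per_byte(report):
    """send_on_endpoint_key style: one FIFO write per report byte."""
    frames = [fifo_write(REG_EP3_IN_FIFO, bytes([b])) for b in report]
    frames.append(register_write(REG_EP3_IN_BYTE_COUNT, len(report), ack=True))
    return frames


def as_hex(data):
    return " ".join("%02x" % b for b in data)


if __name__ == "__main__":
    report = bytes([0x00, 0x00, 0x04])   # modifiers, reserved, 'a'
    for label, frames in (("bulk", frames_bulk(report)),
                          ("per byte", frames_per_byte(report))):
        print(label)
        for f in frames:
            print("  Facedancer Tx:", as_hex(f))
```

Running it prints both frame sequences for an 'a' key press. The bulk variant should match the "Does NOT always work" trace, and the per-byte variant should start with the frame shown for the old code.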
# USBKeyboard.py
#
# Contains class definitions to implement a USB keyboard.

from USB import *
from USBDevice import *
from USBConfiguration import *
from USBInterface import *
from USBEndpoint import *

class USBKeyboardInterface(USBInterface):
    name = "USB keyboard interface"

    hid_descriptor = b'\x09\x21\x10\x01\x00\x01\x22\x2b\x00'
    report_descriptor = b'\x05\x01\x09\x06\xA1\x01\x05\x07\x19\xE0\x29\xE7\x15\x00\x25\x01\x75\x01\x95\x08\x81\x02\x95\x01\x75\x08\x81\x01\x19\x00\x29\x65\x15\x00\x25\x65\x75\x08\x95\x01\x81\x00\xC0'

    def __init__(self, verbose=0):
        descriptors = {
                USB.desc_type_hid    : self.hid_descriptor,
                USB.desc_type_report : self.report_descriptor
        }

        endpoint = USBEndpoint(
                3,          # endpoint number
                USBEndpoint.direction_in,
                USBEndpoint.transfer_type_interrupt,
                USBEndpoint.sync_type_none,
                USBEndpoint.usage_type_data,
                16384,      # max packet size
                10,         # polling interval, see USB 2.0 spec Table 9-13
                self.handle_buffer_available    # handler function
        )

        # TODO: un-hardcode string index (last arg before "verbose")
        USBInterface.__init__(
                self,
                0,          # interface number
                0,          # alternate setting
                3,          # interface class
                0,          # subclass
                0,          # protocol
                0,          # string index
                verbose,
                [ endpoint ],
                descriptors
        )

        # "l<KEY UP>s<KEY UP><ENTER><KEY UP>"
        empty_preamble = [ 0x00 ] * 10
        text = [ 0x0f, 0x00, 0x16, 0x00, 0x28, 0x00 ]

        self.keys = [ chr(x) for x in empty_preamble + text ]
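        # The index of a character in this string is its HID key code: four
        # padding characters put 'a' at index 4 (0x04) and '1' at index 30 (0x1e).
        self.key_mapping = ' ' * 4 + 'abcdefghijklmnopqrstuvwxyz1234567890' # and so on
        self.counter = 0
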
    def handle_buffer_available(self):
        self.counter += 1 # doing the same as the empty_preamble, waiting for setup

        if self.counter > 10: # once configuration is set, make type-through
            while 1:
                data = input('>>')
                if len(data) > 0:
                    for i in data:
                        self.type_letter(chr(self.key_mapping.find(i)))
                        self.type_letter(chr(0))

        # if not self.keys:
        #     return

        # letter = self.keys.pop(0)
        # self.type_letter(letter)

    def type_letter(self, letter, modifiers=0):
        data = bytes([ 0, 0, ord(letter) ])

        if self.verbose > 2:
            print(self.name, "sending keypress 0x%02x" % ord(letter))

        # THIS METHOD SENDS ONE BYTE PER FIFO WRITE and works as intended
        self.configuration.device.maxusb_app.send_on_endpoint_key(3, bytes([ord(letter)]))

        # THIS METHOD SENDS 3 BYTES IN ONE FIFO WRITE and does not work as
        # intended (data is bytes([0, 0, ord(letter)]))
        # self.configuration.device.maxusb_app.send_on_endpoint(3, data)

class USBKeyboardDevice(USBDevice):
    name = "USB keyboard device"

    def __init__(self, maxusb_app, verbose=0):
        config = USBConfiguration(
                1,                                          # index
                "Emulated Keyboard",                        # string desc
                [ USBKeyboardInterface() ]                  # interfaces
        )

        USBDevice.__init__(
                self,
                maxusb_app,
                0,                      # device class
                0,                      # device subclass
                0,                      # protocol release number
                64,                     # max packet size for endpoint 0
                0x610b,                 # vendor id
                0x4653,                 # product id
                0x3412,                 # device revision
                "Maxim",                # manufacturer string
                "MAX3420E Enum Code",   # product string
                "S/N3420E",             # serial number string
                [ config ],
                verbose=verbose
        )