Skip to content

Instantly share code, notes, and snippets.

@jkent
Last active December 14, 2015 17:49
Show Gist options
  • Save jkent/5125274 to your computer and use it in GitHub Desktop.
Save jkent/5125274 to your computer and use it in GitHub Desktop.
wrapper class for libftdi1 and mpsse i2c bus and device classes
# -*- coding: utf-8 -*-
# vim: ts=4 et sw=4 sts=4 ai
import ftdi1
NONE = ftdi1.NONE
ODD = ftdi1.ODD
EVEN = ftdi1.EVEN
MARK = ftdi1.MARK
SPACE = ftdi1.SPACE
STOP_BIT_1 = ftdi1.STOP_BIT_1
STOP_BIT_15 = ftdi1.STOP_BIT_15
STOP_BIT_2 = ftdi1.STOP_BIT_2
BITMODE_RESET = ftdi1.BITMODE_RESET
BITMODE_BITBANG = ftdi1.BITMODE_BITBANG
BITMODE_MPSSE = ftdi1.BITMODE_MPSSE
BITMODE_SYNCBB = ftdi1.BITMODE_SYNCBB
BITMODE_MCU = ftdi1.BITMODE_MCU
BITMODE_OPTO = ftdi1.BITMODE_OPTO
BITMODE_CBUS = ftdi1.BITMODE_CBUS
BITMODE_SYNCFF = ftdi1.BITMODE_SYNCFF
BITMODE_FT1284 = ftdi1.BITMODE_FT1284
INTERFACE_ANY = ftdi1.INTERFACE_ANY
INTERFACE_A = ftdi1.INTERFACE_A
INTERFACE_B = ftdi1.INTERFACE_B
INTERFACE_C = ftdi1.INTERFACE_C
INTERFACE_D = ftdi1.INTERFACE_D
class FtdiError(EnvironmentError):
def __init__(self, context, errno, strerror=None):
if not strerror:
strerror = ftdi1.get_error_string(context)
EnvironmentError.__init__(self, errno, strerror)
class Ftdi:
def __init__(self):
self._ctx = ftdi1.context()
result = ftdi1.init(self._ctx)
if result < 0:
raise FtdiError(self._ctx, result)
def __del__(self):
ftdi1.deinit(self._ctx)
def __getitem__(self, key):
return self._ctx.__getattr__(key)
def __setitem__(self, key, value):
self._ctx.__setattr__(key, value)
def open(self, vid, pid, description=None, serial=None, index=None):
if index == None:
result = ftdi1.usb_open_desc(self._ctx, vid, pid, description, serial)
else:
result = ftdi1.usb_open_desc_index(self._ctx, vid, pid, description, serial, index)
if result < 0:
raise FtdiError(self._ctx, result)
def open_string(self, s):
result = ftdi1.usb_open_string(self._ctx, s)
if result < 0:
raise FtdiError(self._ctx, result)
def reset(self):
result = ftdi1.usb_reset(self._ctx)
if result < 0:
raise FtdiError(self._ctx, result)
def purge_rx_buffer(self):
result = ftdi1.usb_purge_rx_buffer(self._ctx)
if result < 0:
raise FtdiError(self._ctx, result)
def purge_tx_buffer(self):
result = ftdi1.usb_purge_tx_buffer(self._ctx)
if result < 0:
raise FtdiError(self._ctx, result)
def purge_buffers(self):
result = ftdi1.usb_purge_buffers(self._ctx)
if result < 0:
raise FtdiError(self._ctx, result)
def close(self):
result = ftdi1.usb_close(self._ctx)
if result < 0:
raise FtdiError(self._ctx, result)
def set_interface(self, interface):
result = ftdi1.set_interface(self._ctx, interface)
if result < 0:
raise FtdiError(self._ctx, result)
def find_all(self, vid=0, pid=0):
devlist = ftdi1.device_list()
result = ftdi1.usb_find_all(self._ctx, devlist, vid, pid)
if result < 0:
raise FtdiError(self._ctx, result)
def set_baudrate(self, baudrate):
result = ftdi1.set_baudrate(self._ctx, baudrate)
if result < 0:
raise FtdiError(self._ctx, result)
def set_line_property(self, bits=8, stop_bits=STOP_BIT_1, parity=NONE, break_=False):
result = ftdi1.set_line_property2(self._ctx, bits, stop_bits, parity, break_)
if result < 0:
raise FtdiError(self._ctx, result)
def write_data(self, data):
result = ftdi1.write_data(self._ctx, data, len(data))
if result < 0:
raise FtdiError(self._ctx, result)
return result
def write_data_set_chunksize(self, chunksize):
result = ftdi1.write_data_set_chunksize(self._ctx, chunksize)
if result < 0:
raise FtdiError(self._ctx, result)
def write_data_get_chunksize(self):
result, chunksize = ftdi1.write_data_get_chunksize(self._ctx)
if result < 0:
raise FtdiError(self._ctx, result)
return chunksize
def read_data(self, length):
result, data = ftdi1.read_data(self._ctx, length)
if result < 0:
raise FtdiError(self._ctx, result)
return data[:result]
def read_data_set_chunksize(self, chunksize):
result = ftdi1.read_data_set_chunksize(self._ctx, chunksize)
if result < 0:
raise FtdiError(self._ctx, result)
def read_data_get_chunksize(self):
result, chunksize = ftdi1.read_data_get_chunksize(self._ctx)
if result < 0:
raise FtdiError(self._ctx, result)
return chunksize
def set_bitmode(self, mode, bitmask=0x00):
result = ftdi1.set_bitmode(self._ctx, bitmask, mode)
if result < 0:
raise FtdiError(self._ctx, result)
def disable_bitbang(self):
result = ftdi1.disable_bitbang(self._ctx)
if result < 0:
raise FtdiError(self._ctx, result)
def read_pins(self):
result, pins = ftdi1.read_pins(self._ctx)
if result < 0:
raise FtdiError(self._ctx, result)
return pins
def set_latency_timer(self, latency):
result = ftdi1.set_latency_timer(self._ctx, latency)
if result < 0:
raise FtdiError(self._ctx, result)
def get_latency_timer(self):
result, latency = ftdi1.get_latency_timer(self._ctx)
if result < 0:
raise FtdiError(self._ctx, result)
return ord(latency)
def poll_modem_status(self):
result, status = ftdi1.poll_modem_status(self._ctx)
if result < 0:
raise FtdiError(self._ctx, result)
return status
def set_flow_control(self, flowctrl):
result = ftdi1.setflowctrl(self._ctx, flowctrl)
if result < 0:
raise FtdiError(self._ctx, result)
def set_dtr(self, state):
result = ftdi1.setdtr(self._ctx, state)
if result < 0:
raise FtdiError(self._ctx, result)
def set_rts(self, state):
result = ftdi1.setrts(self._ctx, state)
if result < 0:
raise FtdiError(self._ctx, result)
def set_dtr_rts(self, dtr, rts):
result = ftdi1.setdtr_rts(self._ctx, dtr, rts)
if result < 0:
raise FtdiError(self._ctx, result)
def set_event_char(self, char=None):
enable = True
if char == None:
enable = False
char = 0
result = ftdi1.set_event_char(self._ctx, char, enable)
if result < 0:
raise FtdiError(self._ctx, result)
def set_error_char(self, char=None):
enable = True
if char == None:
enable = False
char = 0
result = ftdi1.set_error_char(self._ctx, char, enable)
if result < 0:
raise FtdiError(self._ctx, result)
def read_chipid(self):
result, chipid = ftdi1.read_chipid(self._ctx)
if result < 0:
raise FtdiError(self._ctx, result)
return chipid
@staticmethod
def get_library_version():
return ftdi1.get_library_version()
# -*- coding: utf-8 -*-
# vim: ts=4 et sw=4 sts=4 ai
import struct
import ftdi
class I2CError(EnvironmentError):
pass
class I2CBus:
def __init__(self, dev):
self.dev = dev
self.reset()
def reset(self):
self.out = ''
self.dev.reset()
self.dev.purge_rx_buffer()
self.dev.set_event_char()
self.dev.set_error_char()
self.dev.set_latency_timer(1)
self.dev.set_bitmode(ftdi.BITMODE_RESET)
self.dev.set_bitmode(ftdi.BITMODE_MPSSE)
# synchronize with MPSSE
self.out += '\xAA\xAB\x87'
self.flush()
data = ''
while not data.endswith('\xFA\xAA\xFA\xAB'):
read = self.dev.read_data(1024)
if not read:
raise I2CError('unable to synchronize MPSSE with 0xAA 0xAB')
data += read
self.out += '\x8A' # disable clock divide by 5
self.out += '\x97' # disable adaptive clocking
self.out += '\x8D' # enable 3 phase data clock
self.out += '\x80\x03\x03' # SDA:1 SCL:1
# SK frequency = 60MHz / ((1 + div) * 2)
self.out += '\x86' + struct.pack('<H', 299) # set TCK divisor for 100 kHz
self.out += '\x85' # disable TDI/TDO loopback
self.flush()
def start(self):
self.out += '\x80\x03\x03' # SDA:1 SCL:1
self.out += '\x80\x01\x03' * 20 # SDA:0 SCL:1
self.out += '\x80\x00\x03' # SDA:0 SCL:0
def stop(self):
self.out += '\x80\x01\x03' * 20 # SDA:0 SCL:1
self.out += '\x80\x03\x03' # SDA:1 SCL:1
self.out += '\x80\x03\x00' # SDA:Z SCL:Z
self.flush()
def write(self, byte):
if type(byte) == int:
byte = chr(byte)
self.out += '\x11\x00\x00' # clock one byte out, MSB first, on falling edge
self.out += byte
self.out += '\x80\x02\x01' # SDA:Z SCL:0
self.out += '\x22\x00' # clock one bit in on rising edge
self.out += '\x87' # send immediate
self.flush()
in_ = ''
while True:
in_ += self.dev.read_data(1)
if in_:
break
self.out +='\x80\x02\x03' # SDA:1 SCL:0
if ord(in_) & 0x01:
return False
return True
def read(self):
self.out += '\x80\x02\x01' # SDA:Z SCL:0
self.out += '\x24\x00\x00' # clock one byte in, MSB first, on falling edge
self.out += '\x87' # send immediate
self.flush()
in_ = ''
while True:
in_ += self.dev.read_data(1)
if in_:
break
return in_
def ack(self, ack=True):
self.out += '\x80\x03\x03' # SDA:1 SCL:1
if ack:
self.out += '\x12\x00\x00'
else:
self.out += '\x12\x00\x80'
def flush(self):
self.dev.write_data(self.out)
self.out = ''
class I2CDevice:
def __init__(self, bus, addr):
self.bus = bus
self.addr = addr
def _set_register(self, reg_addr):
self.bus.start()
if not self.bus.write(self.addr << 1):
raise I2CError('ack not received')
if not self.bus.write(reg_addr):
raise I2CError('ack not received')
def reg_read(self, reg_addr, length=1):
data = ''
self._set_register(reg_addr)
self.bus.start()
if not self.bus.write((self.addr << 1) + 1):
raise I2CError('ack not received')
for i in xrange(1, length+1):
data += self.bus.read()
self.bus.ack(i != length)
self.bus.stop()
return data
def reg_write(self, reg_addr, data):
self._set_register(reg_addr)
for byte in data:
if not self.bus.write(byte):
raise I2CError('ack not received')
self.bus.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment