Skip to content

Instantly share code, notes, and snippets.

Last active April 22, 2023 13:18
Show Gist options
  • Save Neradoc/df66764bd235e84d1676b9dfa4cc5e59 to your computer and use it in GitHub Desktop.
Save Neradoc/df66764bd235e84d1676b9dfa4cc5e59 to your computer and use it in GitHub Desktop.
Circuitpython-PMW3901 driver, port from Pimoroni
import time
import struct
from adafruit_bus_device.spi_device import SPIDevice
__version__ = '0.1.0'
WAIT = -1
REG_ID = 0x00
REG_RESOLUTION = 0x4e # PAA5100 only
class PMW3901():
def __init__(self, spi, cs_pin):
self.device = SPIDevice(spi, cs_pin, baudrate=40_000)
self._write(REG_POWER_UP_RESET, 0x5a)
for offset in range(5):
self._read(REG_DATA_READY + offset)
product_id, revision = self.get_id()
if product_id != 0x49 or revision != 0x00:
raise RuntimeError("Invalid Product ID or Revision for PMW3901: 0x{:02x}/0x{:02x}".format(product_id, revision))
# print("Product ID: {}".format(ID.get_product_id()))
# print("Revision: {}".format(ID.get_revision_id()))
def get_id(self):
"""Get chip ID and revision from PMW3901."""
return self._read(REG_ID, 2)
def set_rotation(self, degrees=0):
"""Set orientation of PMW3901 in increments of 90 degrees.
:param degrees: rotation in multiple of 90 degrees
if degrees == 0:
self.set_orientation(invert_x=True, invert_y=True, swap_xy=True)
elif degrees == 90:
self.set_orientation(invert_x=False, invert_y=True, swap_xy=False)
elif degrees == 180:
self.set_orientation(invert_x=False, invert_y=False, swap_xy=True)
elif degrees == 270:
self.set_orientation(invert_x=True, invert_y=False, swap_xy=False)
raise TypeError("Degrees must be one of 0, 90, 180 or 270")
def set_orientation(self, invert_x=True, invert_y=True, swap_xy=True):
"""Set orientation of PMW3901 manually.
Swapping is performed before flipping.
:param invert_x: invert the X axis
:param invert_y: invert the Y axis
:param swap_xy: swap the X/Y axes
value = 0
if swap_xy:
value |= 0b10000000
if invert_y:
value |= 0b01000000
if invert_x:
value |= 0b00100000
self._write(REG_ORIENTATION, value)
def get_motion(self, timeout=5):
"""Get motion data from PMW3901 using burst read.
Reads 12 bytes sequentially from the PMW3901 and validates
motion data against the SQUAL and Shutter_Upper values.
Returns Delta X and Delta Y indicating 2d flow direction
and magnitude.
:param timeout: Timeout in seconds
in_buffer = bytearray(13)
out_buffer = bytearray(13)
out_buffer[0] = REG_MOTION_BURST
t_start = time.time()
while time.time() - t_start < timeout:
with self.device as bus:
bus.write_readinto(out_buffer, in_buffer)
(_, dr, obs,
x, y, quality,
raw_sum, raw_max, raw_min,
shutter_lower) = struct.unpack("<BBBhhBBBBBB", in_buffer)
if dr & 0b10000000 and not (quality < 0x19 and shutter_upper == 0x1f):
return x, y
raise RuntimeError("Timed out waiting for motion data.")
def get_motion_slow(self, timeout=5):
"""Get motion data from PMW3901.
Returns Delta X and Delta Y indicating 2d flow direction
and magnitude.
:param timeout: Timeout in seconds
t_start = time.time()
while time.time() - t_start < timeout:
data = self._read(REG_DATA_READY, 5)
dr, x, y = struct.unpack("<Bhh", bytearray(data))
if dr & 0b10000000:
return x, y
raise RuntimeError("Timed out waiting for motion data.")
def _write(self, register, value):
with self.device as bus:
bus.write(bytes([register | 0x80, value]))
# self.spi_dev.xfer2([register | 0x80, value])
def _read(self, register, length=1):
result = []
buffer = bytearray(2)
for x in range(length):
with self.device as bus:
bus.write_readinto(bytes([register + x, 0]), buffer)
if length == 1:
return result[0]
return result
def _bulk_write(self, data):
for x in range(0, len(data), 2):
register, value = data[x:x + 2]
if register == WAIT:
# print("Sleeping for: {:02d}ms".format(value))
time.sleep(value / 1000)
# print("Writing: {:02x} to {:02x}".format(register, value))
self._write(register, value)
def _secret_sauce(self):
"""Write the secret sauce registers.
Don't ask what these do, the datasheet refuses to explain.
They are some proprietary calibration magic.
0x7f, 0x00,
0x55, 0x01,
0x50, 0x07,
0x7f, 0x0e,
0x43, 0x10
if self._read(0x67) & 0b10000000:
self._write(0x48, 0x04)
self._write(0x48, 0x02)
0x7f, 0x00,
0x51, 0x7b,
0x50, 0x00,
0x55, 0x00,
0x7f, 0x0E
if self._read(0x73) == 0x00:
c1 = self._read(0x70)
c2 = self._read(0x71)
if c1 <= 28:
c1 += 14
if c1 > 28:
c1 += 11
c1 = max(0, min(0x3F, c1))
c2 = (c2 * 45) // 100
0x7f, 0x00,
0x61, 0xad,
0x51, 0x70,
0x7f, 0x0e
self._write(0x70, c1)
self._write(0x71, c2)
0x7f, 0x00,
0x61, 0xad,
0x7f, 0x03,
0x40, 0x00,
0x7f, 0x05,
0x41, 0xb3,
0x43, 0xf1,
0x45, 0x14,
0x5b, 0x32,
0x5f, 0x34,
0x7b, 0x08,
0x7f, 0x06,
0x44, 0x1b,
0x40, 0xbf,
0x4e, 0x3f,
0x7f, 0x08,
0x65, 0x20,
0x6a, 0x18,
0x7f, 0x09,
0x4f, 0xaf,
0x5f, 0x40,
0x48, 0x80,
0x49, 0x80,
0x57, 0x77,
0x60, 0x78,
0x61, 0x78,
0x62, 0x08,
0x63, 0x50,
0x7f, 0x0a,
0x45, 0x60,
0x7f, 0x00,
0x4d, 0x11,
0x55, 0x80,
0x74, 0x21,
0x75, 0x1f,
0x4a, 0x78,
0x4b, 0x78,
0x44, 0x08,
0x45, 0x50,
0x64, 0xff,
0x65, 0x1f,
0x7f, 0x14,
0x65, 0x67,
0x66, 0x08,
0x63, 0x70,
0x7f, 0x15,
0x48, 0x48,
0x7f, 0x07,
0x41, 0x0d,
0x43, 0x14,
0x4b, 0x0e,
0x45, 0x0f,
0x44, 0x42,
0x4c, 0x80,
0x7f, 0x10,
0x5b, 0x02,
0x7f, 0x07,
0x40, 0x41,
0x70, 0x00,
WAIT, 0x0A, # Sleep for 10ms
0x32, 0x44,
0x7f, 0x07,
0x40, 0x40,
0x7f, 0x06,
0x62, 0xf0,
0x63, 0x00,
0x7f, 0x0d,
0x48, 0xc0,
0x6f, 0xd5,
0x7f, 0x00,
0x5b, 0xa0,
0x4e, 0xa8,
0x5a, 0x50,
0x40, 0x80,
WAIT, 0xF0,
0x7f, 0x14, # Enable LED_N pulsing
0x6f, 0x1c,
0x7f, 0x00
def frame_capture(self, timeout=10.0):
"""Capture a raw data frame.
Warning: This is *very* slow and of limited usefulness.
0x7f, 0x07,
0x4c, 0x00,
0x7f, 0x08,
0x6a, 0x38,
0x7f, 0x00,
0x55, 0x04,
0x40, 0x80,
0x4d, 0x11,
WAIT, 0x0a,
0x7f, 0x00,
0x58, 0xff
t_start = time.time()
while True:
status = self._read(REG_RAWDATA_GRAB_STATUS)
if status & 0b11000000:
if time.time() - t_start > timeout:
raise RuntimeError("Frame capture init timed out")
self._write(REG_RAWDATA_GRAB, 0x00)
t_start = time.time()
raw_data = [0 for _ in range(RAW_DATA_LEN)]
x = 0
while True:
data = self._read(REG_RAWDATA_GRAB)
if data & 0b11000000 == 0b01000000: # Upper 6-bits
raw_data[x] &= ~0b11111100
raw_data[x] |= (data & 0b00111111) << 2 # Held in 5:0
if data & 0b11000000 == 0b10000000: # Lower 2-bits
raw_data[x] &= ~0b00000011
raw_data[x] |= (data & 0b00001100) >> 2 # Held in 3:2
x += 1
if x == RAW_DATA_LEN:
return raw_data
if time.time() - t_start > timeout:
raise RuntimeError("Raw data capture timeout, got {} values".format(x))
return None
class PAA5100(PMW3901):
def _secret_sauce(self):
"""Write the secret sauce registers for the PAA5100.
Don't ask what these do, we'd have to make you walk the plank.
These are some proprietary calibration magic.
I hate this as much as you do, dear reader.
0x7f, 0x00,
0x55, 0x01,
0x50, 0x07,
0x7f, 0x0e,
0x43, 0x10
if self._read(0x67) & 0b10000000:
self._write(0x48, 0x04)
self._write(0x48, 0x02)
0x7f, 0x00,
0x51, 0x7b,
0x50, 0x00,
0x55, 0x00,
0x7f, 0x0e
if self._read(0x73) == 0x00:
c1 = self._read(0x70)
c2 = self._read(0x71)
if c1 <= 28:
c1 += 14
if c1 > 28:
c1 += 11
c1 = max(0, min(0x3F, c1))
c2 = (c2 * 45) // 100
0x7f, 0x00,
0x61, 0xad,
0x51, 0x70,
0x7f, 0x0e
self._write(0x70, c1)
self._write(0x71, c2)
0x7f, 0x00,
0x61, 0xad,
0x7f, 0x03,
0x40, 0x00,
0x7f, 0x05,
0x41, 0xb3,
0x43, 0xf1,
0x45, 0x14,
0x5f, 0x34,
0x7b, 0x08,
0x5e, 0x34,
0x5b, 0x11,
0x6d, 0x11,
0x45, 0x17,
0x70, 0xe5,
0x71, 0xe5,
0x7f, 0x06,
0x44, 0x1b,
0x40, 0xbf,
0x4e, 0x3f,
0x7f, 0x08,
0x66, 0x44,
0x65, 0x20,
0x6a, 0x3a,
0x61, 0x05,
0x62, 0x05,
0x7f, 0x09,
0x4f, 0xaf,
0x5f, 0x40,
0x48, 0x80,
0x49, 0x80,
0x57, 0x77,
0x60, 0x78,
0x61, 0x78,
0x62, 0x08,
0x63, 0x50,
0x7f, 0x0a,
0x45, 0x60,
0x7f, 0x00,
0x4d, 0x11,
0x55, 0x80,
0x74, 0x21,
0x75, 0x1f,
0x4a, 0x78,
0x4b, 0x78,
0x44, 0x08,
0x45, 0x50,
0x64, 0xff,
0x65, 0x1f,
0x7f, 0x14,
0x65, 0x67,
0x66, 0x08,
0x63, 0x70,
0x6f, 0x1c,
0x7f, 0x15,
0x48, 0x48,
0x7f, 0x07,
0x41, 0x0d,
0x43, 0x14,
0x4b, 0x0e,
0x45, 0x0f,
0x44, 0x42,
0x4c, 0x80,
0x7f, 0x10,
0x5b, 0x02,
0x7f, 0x07,
0x40, 0x41,
WAIT, 0x0a, # Wait 10ms
0x7f, 0x00,
0x32, 0x00,
0x7f, 0x07,
0x40, 0x40,
0x7f, 0x06,
0x68, 0xf0,
0x69, 0x00,
0x7f, 0x0d,
0x48, 0xc0,
0x6f, 0xd5,
0x7f, 0x00,
0x5b, 0xa0,
0x4e, 0xa8,
0x5a, 0x90,
0x40, 0x80,
0x73, 0x1f,
WAIT, 0x0a, # Wait 10ms
0x73, 0x00
if __name__ == "__main__":
rotation = 0
import board
import busio
from digitalio import DigitalInOut
spi = board.SPI()
# spi = busio.SPI(board.GP2, board.GP3, board.GP4)
cs = DigitalInOut(board.A0)
flo = PMW3901(spi, cs)
tx = 0
ty = 0
while True:
x, y = flo.get_motion()
except RuntimeError:
tx += x
ty += y
print("Motion: {:03d} {:03d} x: {:03d} y {:03d}".format(x, y, tx, ty))
Copy link

Neradoc commented Apr 11, 2023

you have to change the spi definition to match your setup

    # spi = board.SPI()
    spi = busio.SPI(board.GP2, board.GP3, board.GP4)
    cs = DigitalInOut(board.GP5)

Copy link

Neradoc commented Apr 11, 2023

the big question is how to change:

    def _read(self, register, length=1):
        result = []
        for x in range(length):
            GPIO.output(self.spi_cs_gpio, 0)
            value = self.spi_dev.xfer2([register + x, 0])
            GPIO.output(self.spi_cs_gpio, 1)


    def _read(self, register, length=1):
        result = []
        buffer = bytearray(2) # ?????
        for x in range(length):
            with self.device as bus:
                bus.write_readinto( ????? )

if I understand correctly, this xfer2 call sends one byte as the read command, and then one empty byte, during which the device sends the response byte, which is why it reads value[1], due to how SPI full duplex works.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment