Skip to content

Instantly share code, notes, and snippets.

@Neradoc
Last active April 22, 2023 13:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Neradoc/df66764bd235e84d1676b9dfa4cc5e59 to your computer and use it in GitHub Desktop.
Save Neradoc/df66764bd235e84d1676b9dfa4cc5e59 to your computer and use it in GitHub Desktop.
Circuitpython-PMW3901 driver, port from Pimoroni https://github.com/pimoroni/pmw3901-python
import time
import struct
from adafruit_bus_device.spi_device import SPIDevice
__version__ = '0.1.0'
WAIT = -1
BG_CS_FRONT_BCM = 7
BG_CS_BACK_BCM = 8
REG_ID = 0x00
REG_DATA_READY = 0x02
REG_MOTION_BURST = 0x16
REG_POWER_UP_RESET = 0x3a
REG_ORIENTATION = 0x5b
REG_RESOLUTION = 0x4e # PAA5100 only
REG_RAWDATA_GRAB = 0x58
REG_RAWDATA_GRAB_STATUS = 0x59
class PMW3901():
def __init__(self, spi, cs_pin):
self.device = SPIDevice(spi, cs_pin, baudrate=40_000)
time.sleep(0.05)
self._write(REG_POWER_UP_RESET, 0x5a)
time.sleep(0.02)
for offset in range(5):
self._read(REG_DATA_READY + offset)
self._secret_sauce()
product_id, revision = self.get_id()
if product_id != 0x49 or revision != 0x00:
raise RuntimeError("Invalid Product ID or Revision for PMW3901: 0x{:02x}/0x{:02x}".format(product_id, revision))
# print("Product ID: {}".format(ID.get_product_id()))
# print("Revision: {}".format(ID.get_revision_id()))
def get_id(self):
"""Get chip ID and revision from PMW3901."""
return self._read(REG_ID, 2)
def set_rotation(self, degrees=0):
"""Set orientation of PMW3901 in increments of 90 degrees.
:param degrees: rotation in multiple of 90 degrees
"""
if degrees == 0:
self.set_orientation(invert_x=True, invert_y=True, swap_xy=True)
elif degrees == 90:
self.set_orientation(invert_x=False, invert_y=True, swap_xy=False)
elif degrees == 180:
self.set_orientation(invert_x=False, invert_y=False, swap_xy=True)
elif degrees == 270:
self.set_orientation(invert_x=True, invert_y=False, swap_xy=False)
else:
raise TypeError("Degrees must be one of 0, 90, 180 or 270")
def set_orientation(self, invert_x=True, invert_y=True, swap_xy=True):
"""Set orientation of PMW3901 manually.
Swapping is performed before flipping.
:param invert_x: invert the X axis
:param invert_y: invert the Y axis
:param swap_xy: swap the X/Y axes
"""
value = 0
if swap_xy:
value |= 0b10000000
if invert_y:
value |= 0b01000000
if invert_x:
value |= 0b00100000
self._write(REG_ORIENTATION, value)
def get_motion(self, timeout=5):
"""Get motion data from PMW3901 using burst read.
Reads 12 bytes sequentially from the PMW3901 and validates
motion data against the SQUAL and Shutter_Upper values.
Returns Delta X and Delta Y indicating 2d flow direction
and magnitude.
:param timeout: Timeout in seconds
"""
in_buffer = bytearray(13)
out_buffer = bytearray(13)
out_buffer[0] = REG_MOTION_BURST
t_start = time.time()
while time.time() - t_start < timeout:
with self.device as bus:
bus.write_readinto(out_buffer, in_buffer)
(_, dr, obs,
x, y, quality,
raw_sum, raw_max, raw_min,
shutter_upper,
shutter_lower) = struct.unpack("<BBBhhBBBBBB", in_buffer)
if dr & 0b10000000 and not (quality < 0x19 and shutter_upper == 0x1f):
return x, y
time.sleep(0.01)
raise RuntimeError("Timed out waiting for motion data.")
def get_motion_slow(self, timeout=5):
"""Get motion data from PMW3901.
Returns Delta X and Delta Y indicating 2d flow direction
and magnitude.
:param timeout: Timeout in seconds
"""
t_start = time.time()
while time.time() - t_start < timeout:
data = self._read(REG_DATA_READY, 5)
dr, x, y = struct.unpack("<Bhh", bytearray(data))
if dr & 0b10000000:
return x, y
time.sleep(0.001)
raise RuntimeError("Timed out waiting for motion data.")
def _write(self, register, value):
with self.device as bus:
bus.write(bytes([register | 0x80, value]))
# self.spi_dev.xfer2([register | 0x80, value])
def _read(self, register, length=1):
result = []
buffer = bytearray(2)
for x in range(length):
with self.device as bus:
bus.write_readinto(bytes([register + x, 0]), buffer)
result.append(buffer[1])
if length == 1:
return result[0]
else:
return result
def _bulk_write(self, data):
for x in range(0, len(data), 2):
register, value = data[x:x + 2]
if register == WAIT:
# print("Sleeping for: {:02d}ms".format(value))
time.sleep(value / 1000)
else:
# print("Writing: {:02x} to {:02x}".format(register, value))
self._write(register, value)
def _secret_sauce(self):
"""Write the secret sauce registers.
Don't ask what these do, the datasheet refuses to explain.
They are some proprietary calibration magic.
"""
self._bulk_write([
0x7f, 0x00,
0x55, 0x01,
0x50, 0x07,
0x7f, 0x0e,
0x43, 0x10
])
if self._read(0x67) & 0b10000000:
self._write(0x48, 0x04)
else:
self._write(0x48, 0x02)
self._bulk_write([
0x7f, 0x00,
0x51, 0x7b,
0x50, 0x00,
0x55, 0x00,
0x7f, 0x0E
])
if self._read(0x73) == 0x00:
c1 = self._read(0x70)
c2 = self._read(0x71)
if c1 <= 28:
c1 += 14
if c1 > 28:
c1 += 11
c1 = max(0, min(0x3F, c1))
c2 = (c2 * 45) // 100
self._bulk_write([
0x7f, 0x00,
0x61, 0xad,
0x51, 0x70,
0x7f, 0x0e
])
self._write(0x70, c1)
self._write(0x71, c2)
self._bulk_write([
0x7f, 0x00,
0x61, 0xad,
0x7f, 0x03,
0x40, 0x00,
0x7f, 0x05,
0x41, 0xb3,
0x43, 0xf1,
0x45, 0x14,
0x5b, 0x32,
0x5f, 0x34,
0x7b, 0x08,
0x7f, 0x06,
0x44, 0x1b,
0x40, 0xbf,
0x4e, 0x3f,
0x7f, 0x08,
0x65, 0x20,
0x6a, 0x18,
0x7f, 0x09,
0x4f, 0xaf,
0x5f, 0x40,
0x48, 0x80,
0x49, 0x80,
0x57, 0x77,
0x60, 0x78,
0x61, 0x78,
0x62, 0x08,
0x63, 0x50,
0x7f, 0x0a,
0x45, 0x60,
0x7f, 0x00,
0x4d, 0x11,
0x55, 0x80,
0x74, 0x21,
0x75, 0x1f,
0x4a, 0x78,
0x4b, 0x78,
0x44, 0x08,
0x45, 0x50,
0x64, 0xff,
0x65, 0x1f,
0x7f, 0x14,
0x65, 0x67,
0x66, 0x08,
0x63, 0x70,
0x7f, 0x15,
0x48, 0x48,
0x7f, 0x07,
0x41, 0x0d,
0x43, 0x14,
0x4b, 0x0e,
0x45, 0x0f,
0x44, 0x42,
0x4c, 0x80,
0x7f, 0x10,
0x5b, 0x02,
0x7f, 0x07,
0x40, 0x41,
0x70, 0x00,
WAIT, 0x0A, # Sleep for 10ms
0x32, 0x44,
0x7f, 0x07,
0x40, 0x40,
0x7f, 0x06,
0x62, 0xf0,
0x63, 0x00,
0x7f, 0x0d,
0x48, 0xc0,
0x6f, 0xd5,
0x7f, 0x00,
0x5b, 0xa0,
0x4e, 0xa8,
0x5a, 0x50,
0x40, 0x80,
WAIT, 0xF0,
0x7f, 0x14, # Enable LED_N pulsing
0x6f, 0x1c,
0x7f, 0x00
])
def frame_capture(self, timeout=10.0):
"""Capture a raw data frame.
Warning: This is *very* slow and of limited usefulness.
"""
self._bulk_write([
0x7f, 0x07,
0x4c, 0x00,
0x7f, 0x08,
0x6a, 0x38,
0x7f, 0x00,
0x55, 0x04,
0x40, 0x80,
0x4d, 0x11,
WAIT, 0x0a,
0x7f, 0x00,
0x58, 0xff
])
t_start = time.time()
while True:
status = self._read(REG_RAWDATA_GRAB_STATUS)
if status & 0b11000000:
break
if time.time() - t_start > timeout:
raise RuntimeError("Frame capture init timed out")
self._write(REG_RAWDATA_GRAB, 0x00)
RAW_DATA_LEN = 1225
t_start = time.time()
raw_data = [0 for _ in range(RAW_DATA_LEN)]
x = 0
while True:
data = self._read(REG_RAWDATA_GRAB)
if data & 0b11000000 == 0b01000000: # Upper 6-bits
raw_data[x] &= ~0b11111100
raw_data[x] |= (data & 0b00111111) << 2 # Held in 5:0
if data & 0b11000000 == 0b10000000: # Lower 2-bits
raw_data[x] &= ~0b00000011
raw_data[x] |= (data & 0b00001100) >> 2 # Held in 3:2
x += 1
if x == RAW_DATA_LEN:
return raw_data
if time.time() - t_start > timeout:
raise RuntimeError("Raw data capture timeout, got {} values".format(x))
return None
class PAA5100(PMW3901):
def _secret_sauce(self):
"""Write the secret sauce registers for the PAA5100.
Don't ask what these do, we'd have to make you walk the plank.
These are some proprietary calibration magic.
I hate this as much as you do, dear reader.
"""
self._bulk_write([
0x7f, 0x00,
0x55, 0x01,
0x50, 0x07,
0x7f, 0x0e,
0x43, 0x10
])
if self._read(0x67) & 0b10000000:
self._write(0x48, 0x04)
else:
self._write(0x48, 0x02)
self._bulk_write([
0x7f, 0x00,
0x51, 0x7b,
0x50, 0x00,
0x55, 0x00,
0x7f, 0x0e
])
if self._read(0x73) == 0x00:
c1 = self._read(0x70)
c2 = self._read(0x71)
if c1 <= 28:
c1 += 14
if c1 > 28:
c1 += 11
c1 = max(0, min(0x3F, c1))
c2 = (c2 * 45) // 100
self._bulk_write([
0x7f, 0x00,
0x61, 0xad,
0x51, 0x70,
0x7f, 0x0e
])
self._write(0x70, c1)
self._write(0x71, c2)
self._bulk_write([
0x7f, 0x00,
0x61, 0xad,
0x7f, 0x03,
0x40, 0x00,
0x7f, 0x05,
0x41, 0xb3,
0x43, 0xf1,
0x45, 0x14,
0x5f, 0x34,
0x7b, 0x08,
0x5e, 0x34,
0x5b, 0x11,
0x6d, 0x11,
0x45, 0x17,
0x70, 0xe5,
0x71, 0xe5,
0x7f, 0x06,
0x44, 0x1b,
0x40, 0xbf,
0x4e, 0x3f,
0x7f, 0x08,
0x66, 0x44,
0x65, 0x20,
0x6a, 0x3a,
0x61, 0x05,
0x62, 0x05,
0x7f, 0x09,
0x4f, 0xaf,
0x5f, 0x40,
0x48, 0x80,
0x49, 0x80,
0x57, 0x77,
0x60, 0x78,
0x61, 0x78,
0x62, 0x08,
0x63, 0x50,
0x7f, 0x0a,
0x45, 0x60,
0x7f, 0x00,
0x4d, 0x11,
0x55, 0x80,
0x74, 0x21,
0x75, 0x1f,
0x4a, 0x78,
0x4b, 0x78,
0x44, 0x08,
0x45, 0x50,
0x64, 0xff,
0x65, 0x1f,
0x7f, 0x14,
0x65, 0x67,
0x66, 0x08,
0x63, 0x70,
0x6f, 0x1c,
0x7f, 0x15,
0x48, 0x48,
0x7f, 0x07,
0x41, 0x0d,
0x43, 0x14,
0x4b, 0x0e,
0x45, 0x0f,
0x44, 0x42,
0x4c, 0x80,
0x7f, 0x10,
0x5b, 0x02,
0x7f, 0x07,
0x40, 0x41,
WAIT, 0x0a, # Wait 10ms
0x7f, 0x00,
0x32, 0x00,
0x7f, 0x07,
0x40, 0x40,
0x7f, 0x06,
0x68, 0xf0,
0x69, 0x00,
0x7f, 0x0d,
0x48, 0xc0,
0x6f, 0xd5,
0x7f, 0x00,
0x5b, 0xa0,
0x4e, 0xa8,
0x5a, 0x90,
0x40, 0x80,
0x73, 0x1f,
WAIT, 0x0a, # Wait 10ms
0x73, 0x00
])
if __name__ == "__main__":
rotation = 0
import board
import busio
from digitalio import DigitalInOut
spi = board.SPI()
# spi = busio.SPI(board.GP2, board.GP3, board.GP4)
cs = DigitalInOut(board.A0)
flo = PMW3901(spi, cs)
flo.set_rotation(rotation)
tx = 0
ty = 0
while True:
try:
x, y = flo.get_motion()
except RuntimeError:
continue
tx += x
ty += y
print("Motion: {:03d} {:03d} x: {:03d} y {:03d}".format(x, y, tx, ty))
time.sleep(0.01)
@Neradoc
Copy link
Author

Neradoc commented Apr 11, 2023

you have to change the spi definition to match your setup

    # spi = board.SPI()
    spi = busio.SPI(board.GP2, board.GP3, board.GP4)
    cs = DigitalInOut(board.GP5)

@Neradoc
Copy link
Author

Neradoc commented Apr 11, 2023

the big question is how to change:

    def _read(self, register, length=1):
        result = []
        for x in range(length):
            GPIO.output(self.spi_cs_gpio, 0)
            value = self.spi_dev.xfer2([register + x, 0])
            GPIO.output(self.spi_cs_gpio, 1)
            result.append(value[1])

into:

    def _read(self, register, length=1):
        result = []
        buffer = bytearray(2) # ?????
        for x in range(length):
            with self.device as bus:
                bus.write_readinto( ????? )
            result.append(buffer[1])

if I understand correctly, this xfer2 call sends one byte as the read command, and then one empty byte, during which the device sends the response byte, which is why it reads value[1], due to how SPI full duplex works.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment