Skip to content

Instantly share code, notes, and snippets.

@jepler
Created October 8, 2020 15:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jepler/5a1cdaac7dd8709e54861802dce53d89 to your computer and use it in GitHub Desktop.
Save jepler/5a1cdaac7dd8709e54861802dce53d89 to your computer and use it in GitHub Desktop.
import ulab
import sys
import struct
import time
import neopixel_write
import board
import digitalio
neopixel = digitalio.DigitalInOut(board.NEOPIXEL)
neopixel.direction = digitalio.Direction.OUTPUT
class Deadline:
def __init__(self, relative=None, relative_ns=None):
self.deadline = time.monotonic_ns()
self.reset(relative, relative_ns)
def reset(self, relative=None, relative_ns=None):
if relative_ns is None:
relative_ns = round(relative * 1e9)
self.deadline += relative_ns
@property
def expired(self):
return time.monotonic_ns() > self.deadline
def print_msg(m):
if m is None:
print(m)
elif isinstance(m, Message):
print ("MSG", hex(m.id), m.extended, ":".join("%02x" % d for d in m.data))
else:
print ("RTR", hex(m.id), m.extended, m.length)
baudrate = 1_000_000
if hasattr(board, 'CAN_TX'):
from canio import CAN, Message, BusState
if hasattr(board, 'CAN_STANDBY'):
standby = digitalio.DigitalInOut(board.CAN_STANDBY)
standby.switch_to_output(False)
c = CAN(rx=board.CAN_RX, tx=board.CAN_TX, baudrate=baudrate)
else:
from adafruit_mcp2515.canio import Message, BusState
from adafruit_mcp2515 import MCP2515 as CAN
cs = digitalio.DigitalInOut(board.D5)
cs.switch_to_output()
c = CAN(board.SPI(), cs, baudrate=2*baudrate) ## my board has an 8MHz xtal
pixeldata = bytearray(6)
neopixel_write.neopixel_write(neopixel, pixeldata)
all_r0c1 = []
all_r6c0 = []
counts = ulab.zeros(64, dtype=ulab.int16)
l = c.listen(timeout=1)
m = Message(id=0x408, data=b'', extended=True)
d = Deadline(relative=4)
while True:
if d.expired:
mcount = max(counts)
counts = counts / mcount
for i in range(0, 64, 8):
print(" ".join("% .3f" % ci for ci in counts[i:i+8]))
all_r0c1.append(counts[1])
all_r6c0.append(counts[48])
print()
#print(f"r0c1 avg={ulab.numerical.sum(all_r0c1)} std dev={ulab.numerical.std(all_r0c1})")
print(f"r0c1 std={ulab.numerical.std(all_r0c1)}", end=' ')
print(f"avg={ulab.numerical.sum(all_r0c1) / (sum(all_r0c1) or 1)}")
print(f"r6c0 std={ulab.numerical.std(all_r6c0)}", end=' ')
print(f"avg={ulab.numerical.sum(all_r6c0) / (sum(all_r6c0) or 1)}")
#print(f"r6c0 avg={ulab.numerical.sum(all_r6c0)} std dev={ulab.numerical.std(all_r6c0})")
counts = ulab.zeros(64, dtype=ulab.int16)
d.reset(5)
m = l.receive()
if not m:
continue
data = m.data
for i in range(len(data) * 8):
if data[i // 8] & (1<< (i%8)):
counts[i] += 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment