Created
October 8, 2020 15:43
-
-
Save jepler/5a1cdaac7dd8709e54861802dce53d89 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ulab | |
import sys | |
import struct | |
import time | |
import neopixel_write | |
import board | |
import digitalio | |
neopixel = digitalio.DigitalInOut(board.NEOPIXEL) | |
neopixel.direction = digitalio.Direction.OUTPUT | |
class Deadline: | |
def __init__(self, relative=None, relative_ns=None): | |
self.deadline = time.monotonic_ns() | |
self.reset(relative, relative_ns) | |
def reset(self, relative=None, relative_ns=None): | |
if relative_ns is None: | |
relative_ns = round(relative * 1e9) | |
self.deadline += relative_ns | |
@property | |
def expired(self): | |
return time.monotonic_ns() > self.deadline | |
def print_msg(m): | |
if m is None: | |
print(m) | |
elif isinstance(m, Message): | |
print ("MSG", hex(m.id), m.extended, ":".join("%02x" % d for d in m.data)) | |
else: | |
print ("RTR", hex(m.id), m.extended, m.length) | |
baudrate = 1_000_000 | |
if hasattr(board, 'CAN_TX'): | |
from canio import CAN, Message, BusState | |
if hasattr(board, 'CAN_STANDBY'): | |
standby = digitalio.DigitalInOut(board.CAN_STANDBY) | |
standby.switch_to_output(False) | |
c = CAN(rx=board.CAN_RX, tx=board.CAN_TX, baudrate=baudrate) | |
else: | |
from adafruit_mcp2515.canio import Message, BusState | |
from adafruit_mcp2515 import MCP2515 as CAN | |
cs = digitalio.DigitalInOut(board.D5) | |
cs.switch_to_output() | |
c = CAN(board.SPI(), cs, baudrate=2*baudrate) ## my board has an 8MHz xtal | |
pixeldata = bytearray(6) | |
neopixel_write.neopixel_write(neopixel, pixeldata) | |
all_r0c1 = [] | |
all_r6c0 = [] | |
counts = ulab.zeros(64, dtype=ulab.int16) | |
l = c.listen(timeout=1) | |
m = Message(id=0x408, data=b'', extended=True) | |
d = Deadline(relative=4) | |
while True: | |
if d.expired: | |
mcount = max(counts) | |
counts = counts / mcount | |
for i in range(0, 64, 8): | |
print(" ".join("% .3f" % ci for ci in counts[i:i+8])) | |
all_r0c1.append(counts[1]) | |
all_r6c0.append(counts[48]) | |
print() | |
#print(f"r0c1 avg={ulab.numerical.sum(all_r0c1)} std dev={ulab.numerical.std(all_r0c1})") | |
print(f"r0c1 std={ulab.numerical.std(all_r0c1)}", end=' ') | |
print(f"avg={ulab.numerical.sum(all_r0c1) / (sum(all_r0c1) or 1)}") | |
print(f"r6c0 std={ulab.numerical.std(all_r6c0)}", end=' ') | |
print(f"avg={ulab.numerical.sum(all_r6c0) / (sum(all_r6c0) or 1)}") | |
#print(f"r6c0 avg={ulab.numerical.sum(all_r6c0)} std dev={ulab.numerical.std(all_r6c0})") | |
counts = ulab.zeros(64, dtype=ulab.int16) | |
d.reset(5) | |
m = l.receive() | |
if not m: | |
continue | |
data = m.data | |
for i in range(len(data) * 8): | |
if data[i // 8] & (1<< (i%8)): | |
counts[i] += 1 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment