Skip to content

Instantly share code, notes, and snippets.

@tannewt
Forked from jepler/_canio.pyi
Last active August 19, 2020 19:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tannewt/89ee082de9f5a98675df901cdcfbe951 to your computer and use it in GitHub Desktop.
Save tannewt/89ee082de9f5a98675df901cdcfbe951 to your computer and use it in GitHub Desktop.
class Filter:
    """Describes which message addresses a receiver should accept."""

    def __init__(self, address: int, *, extended: bool = False, mask: Optional[int] = None):
        """Construct a Filter with the given properties.

        If mask is not None, then the filter is for any sender which matches all
        the nonzero bits in mask. Otherwise, it matches exactly the given address.

        If extended is true then only extended addresses are matched, otherwise
        only standard addresses are matched.

        :param address: the address to match
        :param extended: match extended addresses instead of standard ones
        :param mask: optional bitmask selecting which address bits must match
        """
        # XXX the rtr flag is omitted from the initial implementation. Micropython does this:
        #     If rtr is false (the default), the filter matches normal frames. Otherwise, it
        #     matches only frames with the rtr bit set. rtr stands for "Remote Transmission
        #     Request", and solicits the receiving device to send requested data.
        #
        # Instead, we will receive all address-matching rtr messages all the time
        # and add a way to change this later.
        #
        # XXX
        # STM32F405 supports two kinds of filters:
        #  - lists of addresses
        #  - bitmask matching of addresses
        # The RTR filter is part of the individual filters. It would take two
        # rules to accept RTR and non-RTR frames.
        #
        # SAM E54 supports three kinds of filters:
        #  - lists of addresses
        #  - bitmask matching of addresses
        #  - ranges of addresses
        # Additionally, each rule can state whether to accept/reject
        # and a final "accept non-matching frames" rule is applied.
        # The RTR filter is an overall filter, and cannot be used to reject non-RTR
        # frames.
        #
        # MCP2515 / MCP2518 filters are different yet again, but support masks.
        # Interestingly they also support filtering on the first 16 bits of
        # messages with standard identifiers, calling it "DeviceNet".
        #
        # This format of Filter is more geared to the STM32F405 since I wrote it
        # from the Micropython documentation and does not align with the SAM E54
        # especially due to how RTR reception is handled.
        ...

    address: int
    mask: Optional[int]
    extended: bool
    # rtr: bool
class Mode(Enum):
    """Enumerates the selectable modes: normal, loopback, silent and loopback-silent."""

    NORMAL = auto()
    LOOPBACK = auto()
    SILENT = auto()
    LOOPBACK_SILENT = auto()
# XXX I don't fully understand what these states indicate but they are standard
# terms in CANbus tech, not just something micropython made up (except
# ERROR_WARNING, which they did make up as far as I can tell) reference:
# http://port.de/cgi-bin/CAN/CanFaqErrors
class State(Enum):
    """Error/bus states a controller can report.

    TEC and REC below are the transmit and receive error counts.
    """

    STOPPED = auto()
    """The controller is completely off and reset"""

    ERROR_ACTIVE = auto()
    """The controller is on and the TEC and REC are both less than 128. In this
    state, the device takes part fully in bus communication and signals an error
    by transmission of an active error frame."""

    # MicroPython has ERROR_ACTIVE for TEC/REC both less than 96, and then when
    # the bus is "close to" erroring, it has
    # ERROR_WARNING = auto()
    # """The controller is on and the TEC and REC are both less than 128. In this
    # state, the device takes part fully in bus communication and signals an error
    # by transmission of an active error frame. Functionally, ERROR_ACTIVE and
    # ERROR_WARNING correspond to the same bus behavior. However, software may
    # wish to change its behavior when the bus state is 'close to' entering the
    # ERROR_PASSIVE state"""

    ERROR_PASSIVE = auto()
    """The controller is on but one of the TEC or REC is greater than 127. In
    this state, the device sends a passive error frame when it detects an error."""

    BUS_OFF = auto()
    """The Transmit Error Count has exceeded 255 and the device has disconnected
    from the bus."""
class Message:
    """A single message: an identifier, an RTR flag and a data payload."""

    # Does a message know where it came from?
    id_: int
    rtr: bool
    data: bytearray
class Queue:
    """Receive queue handed out by ``Controller.receive``.

    Supports explicit reads, use as a context manager (so ``deinit`` is called
    automatically) and iteration over incoming messages.
    """

    def in_waiting(self) -> bool:
        """Return True if a message has been received and is waiting, False otherwise."""
        # NOTE(review): the earlier wording said "If the fifo number is out of
        # range, a RangeError is raised", but this method takes no fifo argument
        # and Python has no built-in RangeError -- confirm whether any error
        # can be raised here.
        ...

    def read(self) -> Optional[Message]:
        """Read a message.

        If no message was waiting after the timeout passed, None is returned.
        """
        ...

    def readinto(self, message: Message) -> bool:
        """Read a message from the FIFO into the preallocated Message.

        The contents of ``message`` will be overwritten. The docs of
        ``Controller.receive`` state that False is returned on timeout.
        """
        ...

    def deinit(self) -> None:
        """Release the queue."""
        ...

    def __enter__(self) -> "Queue":
        """Enter the context manager."""
        # return self
        ...

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Leave the context manager.

        The three arguments are the ones the ``with`` statement always passes.
        """
        # to automatically call deinit
        ...

    def __iter__(self):
        """Return the iterator for this queue."""
        # Usually returns self
        ...

    def __next__(self) -> Optional[Message]:
        """Return the next message."""
        # returns the next message or None if timed out
        ...

    # Food for thought: https://docs.python.org/3/library/queue.html
    # https://docs.python.org/3/library/functions.html#iter
    # https://github.com/adafruit/circuitpython/blob/main/shared-bindings/_bleio/ScanResults.c#L49
class Controller:
    """A CAN bus controller: configuration, error statistics, receive and send."""

    def __init__(
        self,
        TX: microcontroller.Pin,
        RX: microcontroller.Pin,
        *,
        baudrate: int = 1000000,
        extframe: bool = False,  # True to use 29 bit identifiers, False to use 11-bit identifiers
        sample_point: float = 0.875,  # When to sample within bit time (0.0-1.0)
        auto_restart: bool = False,  # Whether to restart communications after entering bus-off state
        # NOTE(review): this was written ``mode = CanMode``, a name that is not
        # defined; the enum in this file is ``Mode``. NORMAL is assumed to be
        # the intended default -- confirm.
        mode: Mode = Mode.NORMAL,
    ):
        """Construct a CAN object.

        :param TX: the pin to transmit on
        :param RX: the pin to receive on
        :param baudrate: the bus bit rate
        :param extframe: True to use 29 bit identifiers, False to use 11-bit identifiers
        :param sample_point: when to sample within bit time (0.0-1.0)
        :param auto_restart: whether to restart communications after entering bus-off state
        :param mode: the ``Mode`` to operate in
        """
        ...

    state: State
    """The status of the hardware (read-only)"""

    transmit_error_count: int
    """The number of transmit errors (read-only). Increased for a detected
    transmission error, decreased for successful transmission. Limited to the
    range from 0 to 255 inclusive. Also called TEC."""

    receive_error_count: int
    """The number of receive errors (read-only). Increased for a detected
    reception error, decreased for successful reception. Limited to the range
    from 0 to 255 inclusive. Also called REC."""

    error_warning_state_count: int
    """The number of times the controller entered the Error Warning state.
    This number wraps around to 0 after an implementation-defined number of errors."""

    error_passive_state_count: int
    """The number of times the controller entered the Error Passive state.
    This number wraps around to 0 after an implementation-defined number of errors."""

    bus_off_state_count: int
    """The number of times the controller entered the Bus Off state.
    This number wraps around to 0 after an implementation-defined number of errors."""

    # pending_tx_count: int
    # """The number of messages waiting to be transmitted."""

    # timeout: float
    # """The receive timeout in seconds"""
    # # Is this a global setting for the controller or local to _canio? If local, we can put it on read() explicitly.

    def receive(self, filters: Sequence[Filter], *, timeout: float = 10) -> Queue:
        """Start receiving messages that match any one of the filters.

        If the hardware cannot support all the requested filters, a ValueError
        is raised. Note that generally there are some number of hardware filters
        shared among all fifos.

        An empty filter list causes all messages to be discarded.

        Timeout dictates how long readinto, read and next() will block. The last
        two will return None on timeout and readinto() will return False.
        """
        ...

    # I'd like to be able to do:
    #     for message in bus.receive():
    #         ...
    # But there's no way for the Queue to always know when the loop is done and
    # it could get leaked.
    #
    # Could use a context:
    #     with bus.receive() as queue:
    #         for message in queue:
    #             ...

    def send(self, message: Message) -> None:
        """Send a message on the bus with the given data and id.

        If the message could not be sent due to a full fifo or a bus error
        condition, RuntimeError is raised.
        """
        ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment