Skip to content

Instantly share code, notes, and snippets.

@jepler
Created August 19, 2020 17:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save jepler/e1fc3705242794e91d7b7c28c7422844 to your computer and use it in GitHub Desktop.
Save jepler/e1fc3705242794e91d7b7c28c7422844 to your computer and use it in GitHub Desktop.
class CanFilter:
    """Describe which sender addresses a receive fifo should accept.

    The constructor arguments are stored on the instance under the same
    names (``address``, ``mask``, ``extended``).
    """

    def __init__(self, address: int, *, extended: bool = False, mask: Optional[int] = None):
        """Construct a CanFilter with the given properties.

        If mask is not None, then the filter is for any sender which matches all
        the nonzero bits in mask. Otherwise, it matches exactly the given address.

        If extended is true then only extended addresses are matched, otherwise
        only standard addresses are matched.
        """
        # Keep the arguments on the instance so that the attributes declared
        # at class level below actually exist on every filter object.
        self.address = address
        self.mask = mask
        self.extended = extended

    # XXX the rtr flag is omitted from the initial implementation. MicroPython does this:
    # If rtr is false (the default), the filter matches normal frames. Otherwise, it
    # matches only frames with the rtr bit set. rtr stands for "Remote Transmission
    # Request", and solicits the receiving device to send requested data.
    #
    # Instead, we will receive all address-matching rtr messages all the time
    # and add a way to change this later.
    #
    # XXX
    # STM32F405 supports two kinds of filters:
    # - lists of addresses
    # - bitmask matching of addresses
    # The RTR filter is part of the individual filters. It would take two
    # rules to accept RTR and non-RTR frames.
    #
    # SAM E54 supports three kinds of filters:
    # - lists of addresses
    # - bitmask matching of addresses
    # - ranges of addresses
    # Additionally, each rule can state whether to accept/reject
    # and a final "accept non-matching frames" rule is applied
    # The RTR filter is an overall filter, and cannot be used to reject non-RTR
    # frames.
    #
    # MCP2515 / MCP2518 filters are different yet again, but support masks.
    # Interestingly they also support filtering on the first 16 bits of
    # messages with standard identifiers, calling it "DeviceNet"
    #
    # This format of CanFilter is more geared to the STM32F405 since I wrote it
    # from the MicroPython documentation and does not align with the SAM E54
    # especially due to how RTR reception is handled.
    address: int
    mask: Optional[int]
    extended: bool
    #rtr: bool
class CanMode(Enum):
    """Operating modes that can be passed as the ``mode`` argument of ``CAN``."""

    # NOTE(review): this file does not describe what each mode does; the names
    # match the usual CAN controller test modes -- confirm against the
    # hardware reference before documenting them further.
    NORMAL = auto()
    LOOPBACK = auto()
    SILENT = auto()
    LOOPBACK_SILENT = auto()
# XXX I don't fully understand what these states indicate but they are standard
# terms in CANbus tech, not just something micropython made up (except
# ERROR_WARNING, which they did make up as far as I can tell) reference:
# http://port.de/cgi-bin/CAN/CanFaqErrors
class CanState(Enum):
    """Bus/controller states, described in terms of the error counters.

    TEC and REC are the Transmit Error Count and Receive Error Count.
    """

    STOPPED = auto()
    """The controller is completely off and reset"""

    ERROR_ACTIVE = auto()
    """The controller is on and the TEC and REC are both less than 128.

    In this state, the device takes part fully in bus communication and
    signals an error by transmission of an active error frame.
    """

    # MicroPython has ERROR_ACTIVE for TEC/REC both less than 96, and then
    # when the bus is "close to" erroring, it has
    # ERROR_WARNING = auto()
    # """The controller is on and the TEC and REC are both less than 128. In
    # this state, the device takes part fully in bus communication and signals
    # an error by transmission of an active error frame. Functionally,
    # ERROR_ACTIVE and ERROR_WARNING correspond to the same bus behavior.
    # However, software may wish to change its behavior when the bus state is
    # 'close to' entering the ERROR_PASSIVE state"""

    ERROR_PASSIVE = auto()
    """The controller is on but one of the TEC or REC is greater than 127.

    In this state, the device sends a passive error frame when it detects an
    error.
    """

    BUS_OFF = auto()
    """The Transmit Error Count has exceeded 255 and the device has disconnected from the bus."""
class CAN:
    """CAN bus controller interface.

    This is an API sketch: every method body is a ``...`` placeholder and the
    attributes are declared with annotations only.
    """

    # NOTE(review): several docstrings below name "RangeError", which is not a
    # built-in Python exception -- confirm the intended exception type.

    def __init__(
        self,
        TX: microcontroller.Pin,
        RX: microcontroller.Pin,
        *,
        baudrate: int = 1000000,
        extframe: bool = False,
        sample_point: float = 0.875,
        auto_restart: bool = False,
        mode: CanMode = CanMode.NORMAL,
    ) -> None:
        """Construct a CAN object.

        Args:
            TX: Pin for the transmit signal (inferred from the name).
            RX: Pin for the receive signal (inferred from the name).
            baudrate: Bus bit rate; unit is presumably bits per second.
            extframe: True to use 29 bit identifiers, False to use 11-bit
                identifiers.
            sample_point: When to sample within bit time (0.0-1.0).
            auto_restart: Whether to restart communications after entering
                bus-off state.
            mode: Operating mode, one of ``CanMode``. The default used to be
                the ``CanMode`` class itself, which is not a valid mode; it is
                now ``CanMode.NORMAL``.
        """
        ...

    state: CanState
    """The status of the hardware (read-only)"""

    fifo_count: int
    """The number of receive fifos supported by the hardware (read-only)

    The number of receive fifos depends on the underlying hardware. Both
    STM32F405 and SAM E54 have 2 RX fifos.
    """

    ### XXX there is a single transmit fifo on STM32F405 and SAM E54

    transmit_error_count: int
    """The number of transmit errors (read-only).

    Increased for a detected transmission error, decreased for successful
    transmission. Limited to the range from 0 to 255 inclusive. Also called
    TEC.
    """

    receive_error_count: int
    """The number of receive errors (read-only).

    Increased for a detected reception error, decreased for successful
    reception. Limited to the range from 0 to 255 inclusive. Also called REC.
    """

    error_warning_state_count: int
    """The number of times the controller entered the Error Warning state.

    This number wraps around to 0 after an implementation-defined number of
    errors.
    """

    error_passive_state_count: int
    """The number of times the controller entered the Error Passive state.

    This number wraps around to 0 after an implementation-defined number of
    errors.
    """

    bus_off_state_count: int
    """The number of times the controller entered the Bus Off state.

    This number wraps around to 0 after an implementation-defined number of
    errors.
    """

    pending_tx_count: int
    """The number of messages waiting to be transmitted."""

    timeout: float
    """The receive timeout in seconds"""

    def setfilters(self, filters: Sequence[CanFilter], fifo: int = 0) -> None:
        """Sets the filters for the given fifo

        If the hardware cannot support all the requested filters, a ValueError
        is raised. Note that generally there are some number of hardware
        filters shared among all fifos.

        An empty filter list causes all messages to be discarded.

        If the fifo number is out of range, a RangeError is raised.
        """
        ...

    def in_waiting(self, fifo: int = 0) -> bool:
        """Returns True if a packet has been received on fifo, or False otherwise

        If the fifo number is out of range, a RangeError is raised.
        """
        ...

    # Sequence takes a single type parameter, so the three element types are
    # listed in the docstring rather than in the annotation.
    def read(self, *, fifo: int = 0) -> Optional[Sequence[object]]:
        """Read a message. The elements of the returned list are:

        * The id of the message
        * A bool that is true if the message is an RTR message
        * An array containing the data

        If no message was waiting after the timeout passed, None is returned.

        If the fifo number is out of range, a RangeError is raised.
        """
        ...

    def readinto(self, buffer: List[object], *, fifo: int = 0) -> bool:
        """Read a message from the FIFO into the preallocated buffer.

        If buffer already has 3 elements and the third element is a writable
        buffer of an appropriate size, this function works without allocating
        memory.

        The elements in the buffer are set to:

        * The id of the message
        * A bool that is true if the message is an RTR message
        * An array containing the data

        (the same as the values in the sequence returned by CAN.read())

        If no message was waiting after the timeout passed, False is returned
        (True is presumably returned when a message was read -- confirm).

        If the bytearray is not large enough to accommodate the received data,
        raises IndexError.

        If the fifo number is out of range, a RangeError is raised.
        """
        ...

    def write(self, id: int, data: bytearray, *, rtr: bool = False) -> None:
        """Send a message on the bus with the given data and id.

        The rtr flag presumably marks the outgoing message as an RTR (Remote
        Transmission Request) message; this is not spelled out here -- confirm.

        If the message could not be sent due to a full fifo or a bus error
        condition, RuntimeError is raised.
        """
        ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment