Skip to content

Instantly share code, notes, and snippets.

@anecdata
Last active May 3, 2024 14:54
Show Gist options
  • Save anecdata/456524f8e38c207931afd0dedf6bec89 to your computer and use it in GitHub Desktop.
Save anecdata/456524f8e38c207931afd0dedf6bec89 to your computer and use it in GitHub Desktop.
CircuitPython ConnectionManager with 7 "radios"
# SPDX-FileCopyrightText: 2024 anecdata
# SPDX-License-Identifier: MIT
import time
import os
import traceback
import board
import busio
import digitalio
import wifi
import ssl
import adafruit_connection_manager
import adafruit_requests
from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K
from adafruit_esp32spi.adafruit_esp32spi import ESP_SPIcontrol
# Endpoints requested through every radio on each pass (one http, one https).
URLS = (
    "http://wifitest.adafruit.com/testwifi/index.html",
    "https://httpbin.org/get",
)
# Printable names for ESP32SPI status codes; the list is indexed directly by
# the numeric value read from an ESP32SPI radio's ``status``.
esp_status = [
    "WL_IDLE_STATUS",
    "WL_NO_SSID_AVAIL",
    "WL_SCAN_COMPLETED",
    "WL_CONNECTED",
    "WL_CONNECT_FAILED",
    "WL_CONNECTION_LOST",
    "WL_DISCONNECTED",
    "WL_AP_LISTENING",
    "WL_AP_CONNECTED",
    "WL_AP_FAILED",
]
# wifi (native)
def connect_native(radio):
    """Connect the native wifi radio and return its IPv4 address.

    Keeps calling ``radio.connect`` with the ``WIFI_SSID`` / ``WIFI_PASSWORD``
    settings until ``radio.connected`` is true. A failed attempt is reported
    by printing its traceback and then retried.

    Returns:
        The value of ``radio.ipv4_address`` once the radio reports connected.
    """
    # The credentials do not change between attempts, so look them up once.
    ssid = os.getenv("WIFI_SSID")
    password = os.getenv("WIFI_PASSWORD")
    while not radio.connected:
        try:
            radio.connect(ssid, password)
        except Exception as ex:
            # Best effort: show what went wrong, then try again.
            traceback.print_exception(ex, ex, ex.__traceback__)
    return radio.ipv4_address
# WIZnet
# One (chip-select, reset) DigitalInOut pair per WIZnet board.
wiz_pins = tuple(
    (digitalio.DigitalInOut(cs_pin), digitalio.DigitalInOut(rst_pin))
    for cs_pin, rst_pin in (
        (board.D10, board.D5),
        (board.D9, board.TX),
        (board.D6, board.RX),
    )
)
def connect_eth(radio):
    """Return the WIZnet radio's current IP address as a printable string.

    No connect step is performed here: the address is read from
    ``radio.ip_address`` and formatted with ``radio.pretty_ip``.
    """
    return radio.pretty_ip(radio.ip_address)
# ESP32SPI
# One (chip-select, reset, ready) DigitalInOut triple per ESP32SPI co-processor.
esp_pins = tuple(
    tuple(digitalio.DigitalInOut(pin) for pin in pin_group)
    for pin_group in (
        (board.A0, board.A1, board.A2),
        (board.A3, board.A4, board.A5),
        (board.D13, board.D12, board.D11),
    )
)
def connect_esp(radio):
    """Connect an ESP32SPI radio to the access point and return its IP string.

    Keeps calling ``radio.connect_AP`` with the ``WIFI_SSID`` /
    ``WIFI_PASSWORD`` settings until ``radio.is_connected`` is true. A failed
    attempt is reported by printing its traceback and then retried.

    Returns:
        ``radio.ip_address`` formatted by ``radio.pretty_ip``.
    """
    # The credentials do not change between attempts, so look them up once.
    ssid = os.getenv("WIFI_SSID")
    password = os.getenv("WIFI_PASSWORD")
    while not radio.is_connected:
        try:
            radio.connect_AP(ssid, password)
        except Exception as ex:
            # Best effort: show what went wrong, then try again.
            traceback.print_exception(ex, ex, ex.__traceback__)
    return radio.pretty_ip(radio.ip_address)
# General
def connect(radio):
    """Make sure *radio* is connected and return its IP address.

    Dispatches on the radio's class name to the matching helper:
    ``Radio`` -> ``connect_native``, ``WIZNET5K`` -> ``connect_eth``,
    ``ESP_SPIcontrol`` -> ``connect_esp``.

    Returns:
        Whatever the selected helper returns for the radio's address.

    Raises:
        TypeError: if the radio's class name is none of the three above
            (previously this fell through to an unbound local variable).
    """
    kind = radio.__class__.__name__
    if kind == "Radio":
        return connect_native(radio)
    if kind == "WIZNET5K":
        return connect_eth(radio)
    if kind == "ESP_SPIcontrol":
        return connect_esp(radio)
    raise TypeError(f"unsupported radio type: {kind}")
time.sleep(3)  # wait for serial

# Parallel lists: the same index in each one refers to the same radio.
radios, pools, ssl_contexts, requestss, connection_managers = [], [], [], [], []
spi = board.SPI()

# wifi (native)
print('Initializing native wifi...')
radios.append(wifi.radio)
# WIZnet
print('Initializing ethernets...')
# Iterate the pin table itself; the loop bound used to name an undefined
# variable, which raised NameError before any board was initialized.
for eth, (cs, rst) in enumerate(wiz_pins):
    print(f'#{eth}')
    mac = f'de:ad:be:ef:fe:{(251 + eth):02x}'  # avoid DHCP collisions
    while True:  # keep retrying until this board comes up
        try:
            radio = WIZNET5K(spi, cs, rst, mac=mac, debug=False)
            print(f'{mac} {radio.pretty_ip(radio.ip_address)}')
            radios.append(radio)
            break
        except Exception as ex:
            traceback.print_exception(ex, ex, ex.__traceback__)
# ESP32SPI
print('Initializing wifi co-processors...')
for esp, (esp32_cs, esp32_reset, esp32_ready) in enumerate(esp_pins):
    print(f'#{esp}')
    while True:  # keep retrying until this co-processor answers
        # Cleared on every attempt so that, if the constructor itself fails,
        # the handler below does not reset a radio left over from an earlier
        # iteration.
        radio = None
        try:
            radio = ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset, debug=False)
            mac = ":".join("%02X" % byte for byte in radio.MAC_address)
            print(f'{mac} {radio.firmware_version.decode("utf-8")} {esp_status[radio.status]}')
            radios.append(radio)
            break
        except Exception as ex:
            traceback.print_exception(ex, ex, ex.__traceback__)
            if radio is not None:
                radio.reset()
            time.sleep(1)
# set up the socketpools and requests
print('Set up socketpools and requests...')
# Entries are appended in radio order, so each list stays index-aligned with
# ``radios``.
for radio in radios:
    pool = adafruit_connection_manager.get_radio_socketpool(radio)
    ssl_context = adafruit_connection_manager.get_radio_ssl_context(radio)
    pools.append(pool)
    ssl_contexts.append(ssl_context)
    requestss.append(adafruit_requests.Session(pool, ssl_context))
    connection_managers.append(adafruit_connection_manager.get_connection_manager(pool))
# requests on all the radios
while True:
    print(f'{"="*25}')
    for radio_num, radio in enumerate(radios):
        print(f'{"-"*25}')
        for url in URLS:
            print(f"Fetching from {url} via {radio.__class__.__name__} #{radio_num} {pools[radio_num]} {connection_managers[radio_num]}")
            # (Re)connect first and show the address this radio is using.
            print(f"{connect(radio)}")
            try:
                with requestss[radio_num].get(url) as resp:
                    print(f'{resp.status_code} {resp.reason.decode()}')
            except Exception as ex:
                # A failed request is reported, then the loop moves on.
                traceback.print_exception(ex, ex, ex.__traceback__)
            # NOTE(review): the original nesting of this pause could not be
            # recovered; it is applied after every request here - confirm.
            time.sleep(5)
@anecdata
Copy link
Author

anecdata commented Apr 29, 2024

Obviously an extreme example (and finicky due to wiring, power, and density of radios):
"I don't know who would do this in the real world"

But having multiple network links on a microcontroller does have various use cases. Some examples:

  • need more sockets than the hardware-constrained socket count of a typical MCU
  • ESP-NOW running continuously on a channel, but also need an Access point (AP) or a separate network socket client or server of some kind on another channel or on ethernet
  • wifi Monitor mode running continuously (one channel or channel-hopping), but also have an AP or a separate network socket client or server of some kind on another channel or on ethernet
  • a wifi AP running continuously on one channel, but also need an ethernet to connect to the local network, or a wifi Station that can adapt to the channel of the local network
  • dual-homing or multihoming, for reliability or for access to different networks' services

7_radios

@anecdata
Copy link
Author

Thanks to adafruit/circuitpython#8954
& adafruit/Adafruit_CircuitPython_Wiznet5k#157, Ethernet now supports TLS on boards that have native ssl.

@anecdata
Copy link
Author

anecdata commented Apr 30, 2024

The key to using more than one Airlift, or more than one WIZnet, are these PRs:
adafruit/Adafruit_CircuitPython_ESP32SPI#198
adafruit/Adafruit_CircuitPython_Wiznet5k#159
adafruit/Adafruit_CircuitPython_ConnectionManager#11
The first two convert the ESP_SPIcontrol and WIZNET objects so they can have multiple instances. Then each device can have its own socket pool rather than one socket pool per device type. The third PR integrates those into ConnectionManager, which gives a simple API to manage multiple sources and sinks of sockets.

The original motivation, and primary use case, for centralizing management of multiple sinks of sockets is code that uses more than one socket-using library. For example, Requests and MQTT previously would have each managed their own sockets, and circumstances could arise where the device prematurely ran out of sockets.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment