Last active
June 2, 2024 21:27
-
-
Save anecdata/456524f8e38c207931afd0dedf6bec89 to your computer and use it in GitHub Desktop.
CircuitPython with 7 "radios" using Connection Manager
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# SPDX-FileCopyrightText: 2024 anecdata
# SPDX-License-Identifier: MIT
import os
import ssl
import time
import traceback

import board
import busio
import digitalio
import wifi

import adafruit_connection_manager
import adafruit_requests
from adafruit_esp32spi.adafruit_esp32spi import ESP_SPIcontrol
from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K
# Endpoints fetched through every radio on each pass: one plain-HTTP and one
# HTTPS URL.
URLS = (
    "http://wifitest.adafruit.com/testwifi/index.html",
    "https://httpbin.org/get",
)

# Human-readable names for the ESP32SPI status codes. The script indexes this
# list with `radio.status`, so the position of each name is significant.
esp_status = [
    "WL_IDLE_STATUS",
    "WL_NO_SSID_AVAIL",
    "WL_SCAN_COMPLETED",
    "WL_CONNECTED",
    "WL_CONNECT_FAILED",
    "WL_CONNECTION_LOST",
    "WL_DISCONNECTED",
    "WL_AP_LISTENING",
    "WL_AP_CONNECTED",
    "WL_AP_FAILED",
]
# wifi (native)
def connect_native(radio):
    """Keep trying to join the configured network, then return the radio's IPv4 address.

    The SSID and password are read from the ``WIFI_SSID`` and ``WIFI_PASSWORD``
    environment settings. Any exception raised by a connection attempt is
    printed and the attempt is repeated until ``radio.connected`` is true.

    :param radio: object exposing ``connected``, ``connect(ssid, password)``
        and ``ipv4_address`` (the native ``wifi.radio`` in this script).
    :return: the value of ``radio.ipv4_address`` once connected.
    """
    # The credentials do not change between attempts, so look them up once.
    ssid = os.getenv("WIFI_SSID")
    password = os.getenv("WIFI_PASSWORD")
    while not radio.connected:
        try:
            radio.connect(ssid, password)
        except Exception as ex:
            # Best effort: report the failure and try again.
            traceback.print_exception(ex, ex, ex.__traceback__)
    return radio.ipv4_address
# WIZnet
# One (chip-select, reset) pin pair per WIZnet ethernet module; the
# initialisation loop reads index 0 as CS and index 1 as reset.
wiz_pins = (
    (
        digitalio.DigitalInOut(board.D10),
        digitalio.DigitalInOut(board.D5),
    ),
    (
        digitalio.DigitalInOut(board.D9),
        digitalio.DigitalInOut(board.TX),
    ),
    (
        digitalio.DigitalInOut(board.D6),
        digitalio.DigitalInOut(board.RX),
    ),
)
def connect_eth(radio):
    """Return the ethernet interface's current IP address as formatted text.

    No connection attempt is made here; the address already held by the
    interface is simply formatted.

    :param radio: object exposing ``ip_address`` and ``pretty_ip(ip)``
        (a ``WIZNET5K`` instance in this script).
    :return: whatever ``radio.pretty_ip`` returns for ``radio.ip_address``.
    """
    return radio.pretty_ip(radio.ip_address)
# ESP32SPI
# One (chip-select, reset, ready) pin triple per ESP32 co-processor; the
# initialisation loop reads index 0 as CS, index 1 as reset and index 2 as
# ready.
esp_pins = (
    (
        digitalio.DigitalInOut(board.A0),
        digitalio.DigitalInOut(board.A1),
        digitalio.DigitalInOut(board.A2),
    ),
    (
        digitalio.DigitalInOut(board.A3),
        digitalio.DigitalInOut(board.A4),
        digitalio.DigitalInOut(board.A5),
    ),
    (
        digitalio.DigitalInOut(board.D13),
        digitalio.DigitalInOut(board.D12),
        digitalio.DigitalInOut(board.D11),
    ),
)
def connect_esp(radio):
    """Keep trying to join the configured network, then return the radio's IP as text.

    The SSID and password are read from the ``WIFI_SSID`` and ``WIFI_PASSWORD``
    environment settings. Any exception raised by a connection attempt is
    printed and the attempt is repeated until ``radio.is_connected`` is true.

    :param radio: object exposing ``is_connected``, ``connect_AP(ssid, password)``,
        ``ip_address`` and ``pretty_ip(ip)`` (an ``ESP_SPIcontrol`` instance in
        this script).
    :return: whatever ``radio.pretty_ip`` returns for ``radio.ip_address``.
    """
    # The credentials do not change between attempts, so look them up once.
    ssid = os.getenv("WIFI_SSID")
    password = os.getenv("WIFI_PASSWORD")
    while not radio.is_connected:
        try:
            radio.connect_AP(ssid, password)
        except Exception as ex:
            # Best effort: report the failure and try again.
            traceback.print_exception(ex, ex, ex.__traceback__)
    return radio.pretty_ip(radio.ip_address)
# General
def connect(radio):
    """Ensure ``radio`` is connected and return its IP address.

    Dispatches on the radio's class name to the matching helper:
    ``Radio`` -> ``connect_native``, ``WIZNET5K`` -> ``connect_eth``,
    ``ESP_SPIcontrol`` -> ``connect_esp``.

    :param radio: one of the radio objects created by this script.
    :return: the value returned by the selected helper.
    :raises ValueError: if the radio's class name is not one of the three
        handled above (previously this surfaced as an ``UnboundLocalError``).
    """
    kind = radio.__class__.__name__
    if kind == "Radio":
        return connect_native(radio)
    if kind == "WIZNET5K":
        return connect_eth(radio)
    if kind == "ESP_SPIcontrol":
        return connect_esp(radio)
    raise ValueError(f"Unsupported radio class: {kind}")
time.sleep(3)  # wait for serial

# Parallel lists: index i in each list belongs to radios[i].
radios = []
pools = []
ssl_contexts = []
requestss = []
connection_managers = []

# Single SPI bus object passed to every WIZnet and ESP32SPI driver below.
spi = board.SPI()

# wifi (native)
print("Initializing native wifi...")
radios.append(wifi.radio)
# WIZnet
print("Initializing ethernets...")
# Iterate the pin table itself; the original looped over `wiz_cs_pins`, a name
# that is never defined.
for eth, (cs, rst) in enumerate(wiz_pins):
    print(f'#{eth}')
    mac = f'de:ad:be:ef:fe:{(251 + eth):02x}'  # avoid DHCP collisions
    # Retry until the driver initialises and reports an address.
    while True:
        try:
            radio = WIZNET5K(spi, cs, rst, mac=mac, debug=False)
            print(f'{mac} {radio.pretty_ip(radio.ip_address)}')
            radios.append(radio)
            break
        except Exception as ex:
            traceback.print_exception(ex, ex, ex.__traceback__)
# ESP32SPI
print("Initializing wifi co-processors...")
for esp, (esp32_cs, esp32_reset, esp32_ready) in enumerate(esp_pins):
    print(f'#{esp}')
    # Retry until the co-processor answers the MAC / firmware / status queries.
    while True:
        # Cleared on every attempt so the handler below never resets a radio
        # left over from an earlier iteration when the constructor itself fails.
        radio = None
        try:
            radio = ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset, debug=0)
            mac = ":".join("%02X" % byte for byte in radio.MAC_address)
            print(f'{mac} {radio.firmware_version.decode("utf-8")} {esp_status[radio.status]}')
            radios.append(radio)
            break
        except Exception as ex:
            traceback.print_exception(ex, ex, ex.__traceback__)
            # NOTE(review): the original indentation was lost; the reset and
            # pause are assumed to belong to this failure path - confirm.
            if radio is not None:
                radio.reset()
            time.sleep(1)
# set up the socketpools and requests
print("Set up socketpools and requests...")
for radio in radios:
    # Each radio gets its own socket pool, SSL context, requests session and
    # connection manager, appended in step so the lists stay index-aligned
    # with `radios`.
    pool = adafruit_connection_manager.get_radio_socketpool(radio)
    ssl_context = adafruit_connection_manager.get_radio_ssl_context(radio)
    pools.append(pool)
    ssl_contexts.append(ssl_context)
    requestss.append(adafruit_requests.Session(pool, ssl_context))
    connection_managers.append(adafruit_connection_manager.get_connection_manager(pool))
# requests on all the radios
while True:
    print(f'{"="*25}')
    for radio_num, radio in enumerate(radios):
        print(f'{"-"*25}')
        for url in URLS:
            print(f"Fetching from {url} via {radio.__class__.__name__} #{radio_num} {pools[radio_num]} {connection_managers[radio_num]}")
            # (Re)connect if needed and show the address used for this request.
            print(f"{connect(radio)}")
            try:
                with requestss[radio_num].get(url) as resp:
                    print(f'{resp.status_code} {resp.reason.decode()}')
            except Exception as ex:
                # A failed request must not stop the remaining radios/URLs.
                traceback.print_exception(ex, ex, ex.__traceback__)
    # NOTE(review): the original indentation was lost; the pause is assumed to
    # run once per full pass over all radios - confirm against the author's gist.
    time.sleep(5)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The key to using more than one Airlift, or more than one WIZnet, is the following PRs:
adafruit/Adafruit_CircuitPython_ESP32SPI#198
adafruit/Adafruit_CircuitPython_Wiznet5k#159
adafruit/Adafruit_CircuitPython_ConnectionManager#11
The first two convert the ESP_SPIcontrol and WIZNET objects so they can have multiple instances. Then each device can have its own socket pool rather than one socket pool per device type. The third PR integrates those into ConnectionManager, which gives a simple API to manage multiple sources and sinks of sockets.
The original motivation, and primary use case, for centralizing management of multiple sinks of sockets is code that uses more than one socket-using library. For example, Requests and MQTT previously would have each managed their own sockets, and circumstances could arise where the device prematurely ran out of sockets.