-
-
Save anecdata/456524f8e38c207931afd0dedf6bec89 to your computer and use it in GitHub Desktop.
# SPDX-FileCopyrightText: 2024 anecdata | |
# SPDX-License-Identifier: MIT | |
import time | |
import os | |
import traceback | |
import board | |
import busio | |
import digitalio | |
import wifi | |
import ssl | |
import adafruit_connection_manager | |
import adafruit_requests | |
from adafruit_wiznet5k.adafruit_wiznet5k import WIZNET5K | |
from adafruit_esp32spi.adafruit_esp32spi import ESP_SPIcontrol | |
# Endpoints fetched through every radio on each pass: one plain-HTTP URL
# and one HTTPS URL.
_HTTP_TEST_URL = "http://wifitest.adafruit.com/testwifi/index.html"
_HTTPS_TEST_URL = "https://httpbin.org/get"
URLS = (_HTTP_TEST_URL, _HTTPS_TEST_URL)
# Human-readable names for the ESP32SPI connection status; the list is
# looked up by position using the integer read from ``radio.status``.
esp_status = [
    "WL_IDLE_STATUS",  # 0
    "WL_NO_SSID_AVAIL",  # 1
    "WL_SCAN_COMPLETED",  # 2
    "WL_CONNECTED",  # 3
    "WL_CONNECT_FAILED",  # 4
    "WL_CONNECTION_LOST",  # 5
    "WL_DISCONNECTED",  # 6
    "WL_AP_LISTENING",  # 7
    "WL_AP_CONNECTED",  # 8
    "WL_AP_FAILED",  # 9
]
# wifi (native) | |
def connect_native(radio):
    """Join Wi-Fi with the native radio and return its IPv4 address.

    The SSID and password are read with ``os.getenv`` from ``WIFI_SSID`` and
    ``WIFI_PASSWORD``.  Attempts repeat until ``radio.connected`` is true; a
    failed attempt prints its traceback and is retried.
    """
    while True:
        if radio.connected:
            return radio.ipv4_address
        try:
            ssid = os.getenv("WIFI_SSID")
            password = os.getenv("WIFI_PASSWORD")
            radio.connect(ssid, password)
        except Exception as ex:
            # Best effort: report the failure and try again.
            traceback.print_exception(ex, ex, ex.__traceback__)
# WIZnet | |
# One (chip-select, reset) DigitalInOut pair per WIZnet board, in the order
# the boards are initialized.
wiz_pins = tuple(
    (digitalio.DigitalInOut(cs_pin), digitalio.DigitalInOut(rst_pin))
    for cs_pin, rst_pin in (
        (board.D10, board.D5),
        (board.D9, board.TX),
        (board.D6, board.RX),
    )
)
def connect_eth(radio):
    """Return the WIZnet interface's IP address as a printable string.

    No connection step happens here: the address is read from
    ``radio.ip_address`` and formatted with ``radio.pretty_ip``.
    """
    to_text = radio.pretty_ip
    return to_text(radio.ip_address)
# ESP32SPI | |
# One (chip-select, reset, ready) DigitalInOut triple per ESP32SPI
# co-processor, in the order the co-processors are initialized.
esp_pins = tuple(
    (
        digitalio.DigitalInOut(cs_pin),
        digitalio.DigitalInOut(reset_pin),
        digitalio.DigitalInOut(ready_pin),
    )
    for cs_pin, reset_pin, ready_pin in (
        (board.A0, board.A1, board.A2),
        (board.A3, board.A4, board.A5),
        (board.D13, board.D12, board.D11),
    )
)
def connect_esp(radio):
    """Join Wi-Fi through an ESP32SPI co-processor and return its IP string.

    The SSID and password are read with ``os.getenv`` from ``WIFI_SSID`` and
    ``WIFI_PASSWORD``.  ``radio.connect_AP`` is retried until
    ``radio.is_connected`` is true; each failure prints its traceback.

    Returns:
        ``radio.ip_address`` formatted by ``radio.pretty_ip``.
    """
    while not radio.is_connected:
        try:
            radio.connect_AP(os.getenv("WIFI_SSID"), os.getenv("WIFI_PASSWORD"))
        except Exception as ex:
            # Best effort: report the failure and try again.
            traceback.print_exception(ex, ex, ex.__traceback__)
    return radio.pretty_ip(radio.ip_address)
# General | |
def connect(radio):
    """Ensure ``radio`` is connected and return its IPv4 address.

    Dispatches on the radio's class name:

    * ``"Radio"`` -> ``connect_native``
    * ``"WIZNET5K"`` -> ``connect_eth``
    * ``"ESP_SPIcontrol"`` -> ``connect_esp``

    Raises:
        TypeError: if the radio's class is none of the above.
    """
    kind = radio.__class__.__name__
    if kind == "Radio":
        return connect_native(radio)
    if kind == "WIZNET5K":
        return connect_eth(radio)
    if kind == "ESP_SPIcontrol":
        return connect_esp(radio)
    # Fail with a clear message instead of falling through to an unbound
    # local variable.
    raise TypeError(f"unsupported radio type: {kind}")
time.sleep(3)  # wait for serial

# Parallel lists: position i in each list belongs to the same radio.
radios = []
pools = []
ssl_contexts = []
requestss = []
connection_managers = []

# Single SPI bus object handed to every WIZnet and ESP32SPI device below.
spi = board.SPI()

# wifi (native)
print("Initializing native wifi...")
radios.append(wifi.radio)
# WIZnet
# Bring up one WIZNET5K per (chip-select, reset) pair in ``wiz_pins``,
# retrying each board until its constructor succeeds.
print("Initializing ethernets...")
for eth, (cs, rst) in enumerate(wiz_pins):
    print(f"#{eth}")
    mac = f"de:ad:be:ef:fe:{(251 + eth):02x}"  # avoid DHCP collisions
    while True:
        try:
            radio = WIZNET5K(spi, cs, rst, mac=mac, debug=False)
            print(f"{mac} {radio.pretty_ip(radio.ip_address)}")
            radios.append(radio)
            break
        except Exception as ex:
            # Report the failure and retry the same board.
            traceback.print_exception(ex, ex, ex.__traceback__)
# ESP32SPI
# Bring up one ESP_SPIcontrol per (chip-select, reset, ready) triple in
# ``esp_pins``, retrying each co-processor until it reports its MAC address,
# firmware version and status.
print("Initializing wifi co-processors...")
for esp, (esp32_cs, esp32_reset, esp32_ready) in enumerate(esp_pins):
    print(f"#{esp}")
    while True:
        # Cleared on every attempt so a constructor failure can never reset
        # a radio object left over from an earlier device.
        radio = None
        try:
            radio = ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset, debug=False)
            mac = ":".join("%02X" % byte for byte in radio.MAC_address)
            print(f'{mac} {radio.firmware_version.decode("utf-8")} {esp_status[radio.status]}')
            radios.append(radio)
            break
        except Exception as ex:
            traceback.print_exception(ex, ex, ex.__traceback__)
            if radio is not None:
                radio.reset()
            time.sleep(1)
# set up the socketpools and requests
# For each radio: fetch its socket pool and SSL context, build a requests
# Session on top of them, and record the pool's connection manager.
print("Set up socketpools and requests...")
for radio_num, net_radio in enumerate(radios):
    pool = adafruit_connection_manager.get_radio_socketpool(net_radio)
    pools.append(pool)
    ssl_context = adafruit_connection_manager.get_radio_ssl_context(net_radio)
    ssl_contexts.append(ssl_context)
    requestss.append(adafruit_requests.Session(pool, ssl_context))
    connection_managers.append(adafruit_connection_manager.get_connection_manager(pool))
# requests on all the radios
# Forever: for every radio, make sure it is connected, then fetch each URL
# through that radio's own Session and print the HTTP status line.
while True:
    print("=" * 25)
    for radio_num, net_radio in enumerate(radios):
        print("-" * 25)
        session = requestss[radio_num]
        for url in URLS:
            print(f"Fetching from {url} via {net_radio.__class__.__name__} #{radio_num} {pools[radio_num]} {connection_managers[radio_num]}")
            print(f"{connect(net_radio)}")
            try:
                with session.get(url) as resp:
                    print(f'{resp.status_code} {resp.reason.decode()}')
            except Exception as ex:
                # A failed fetch is reported; the loop moves on to the next URL.
                traceback.print_exception(ex, ex, ex.__traceback__)
    # NOTE(review): nesting was reconstructed; the pause is taken once per
    # full pass over all radios — confirm against the intended behavior.
    time.sleep(5)
Thanks to adafruit/circuitpython#8954 and adafruit/Adafruit_CircuitPython_Wiznet5k#157, Ethernet now supports TLS on boards that have native ssl.
The keys to using more than one Airlift, or more than one WIZnet, are these PRs:
adafruit/Adafruit_CircuitPython_ESP32SPI#198
adafruit/Adafruit_CircuitPython_Wiznet5k#159
adafruit/Adafruit_CircuitPython_ConnectionManager#11
The first two convert the ESP_SPIcontrol and WIZNET objects so they can have multiple instances. Then each device can have its own socket pool rather than one socket pool per device type. The third PR integrates those into ConnectionManager, which gives a simple API to manage multiple sources and sinks of sockets.
The original motivation, and primary use case, for centralizing management of multiple sinks of sockets is code that uses more than one socket-using library. For example, Requests and MQTT previously would have each managed their own sockets, and circumstances could arise where the device prematurely ran out of sockets.
Obviously an extreme example (and finicky due to wiring, power, and density of radios):
"I don't know who would do this in the real world"
But having multiple network links on a microcontroller does have various use cases. Some examples:
wifi
Monitor mode running continuously (one channel or channel-hopping), while also having an AP or a separate network socket client or server of some kind on another channel or on Ethernet