Skip to content

Instantly share code, notes, and snippets.

@anecdata
Last active April 26, 2024 18:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anecdata/8e8db7d325df23f438e039b0ec8f615b to your computer and use it in GitHub Desktop.
Save anecdata/8e8db7d325df23f438e039b0ec8f615b to your computer and use it in GitHub Desktop.
Connection Manager: multiple devices of the same radio type (Connection Manager PR#11)
import time
import os
import traceback
import board
import busio
import digitalio
import os
import adafruit_connection_manager
import adafruit_requests
from adafruit_esp32spi.adafruit_esp32spi import ESP_SPIcontrol
import adafruit_esp32spi.adafruit_esp32spi_socketpool as socketpool
# Endpoints fetched on every pass of the main loop: one http:// URL and one
# https:// URL.
URLS = (
    "http://wifitest.adafruit.com/testwifi/index.html",
    "https://httpbin.org/get",
)

# TWO Airlift FeatherWings (not stacked)
# One inner list per AirLift.  Within a list the pins are stored in the order
# (chip select, reset, ready), which is the order the setup loop indexes them.
pins = [
    [digitalio.DigitalInOut(pin) for pin in wing]
    for wing in (
        (board.A0, board.A1, board.A2),
        (board.A3, board.A4, board.A5),
    )
]
def connect_esp(radio):
    """Block until ``radio`` reports a connection, then return its IP as text.

    While ``radio.is_connected`` is false, ``radio.connect_AP`` is called with
    the ``WIFI_SSID`` and ``WIFI_PASSWORD`` values obtained from ``os.getenv``.
    A failed attempt has its traceback printed and is retried immediately, so
    this function does not return until the radio is connected.

    Args:
        radio: object exposing ``is_connected``, ``connect_AP(ssid, password)``,
            ``ip_address`` and ``pretty_ip(ip)`` (here an ``ESP_SPIcontrol``).

    Returns:
        The result of ``radio.pretty_ip(radio.ip_address)``.
    """
    # Look the credentials up once instead of on every retry.
    ssid = os.getenv("WIFI_SSID")
    password = os.getenv("WIFI_PASSWORD")

    while not radio.is_connected:
        try:
            radio.connect_AP(ssid, password)
        except Exception as ex:
            # Deliberately broad: any failure is reported and the join is
            # attempted again rather than aborting the script.
            traceback.print_exception(ex, ex, ex.__traceback__)

    return radio.pretty_ip(radio.ip_address)
# A single SPI bus object is handed to every radio; each radio is told apart
# by its own chip-select / ready / reset pins taken from `pins`.
spi = board.SPI()

# Parallel lists: entry i of each list belongs to the radio built from pins[i].
radios = []
pools = []
ssl_contexts = []
requestss = []
connection_managers = []

# Each entry of `pins` is stored as (cs, reset, ready); note that
# ESP_SPIcontrol is passed them in the order cs, ready, reset.
for esp32_cs, esp32_reset, esp32_ready in pins:
    radio = ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset, debug=False)
    radios.append(radio)

    pool = adafruit_connection_manager.get_radio_socketpool(radio)  # only the first radio is seen at the router
    #pool = socketpool.SocketPool(radio)  # both radios are seen at the router
    pools.append(pool)

    ssl_context = adafruit_connection_manager.get_radio_ssl_context(radio)
    ssl_contexts.append(ssl_context)

    # One requests session per radio, built on that radio's pool and context.
    requestss.append(adafruit_requests.Session(pool, ssl_context))
    #
    connection_managers.append(adafruit_connection_manager.get_connection_manager(pool))
# Main loop: on every pass, each radio fetches every URL in URLS and the
# status line of the response is printed next to the radio, its connection
# manager and its IP address.
while True:
    print("=" * 25)
    for radio, manager, session in zip(radios, connection_managers, requestss):
        for url in URLS:
            print("-" * 25)
            # connect_esp() (re)joins the network if needed and returns the
            # radio's IP address as a string.
            ipv4_str = connect_esp(radio)
            print(f"Fetching from {url} via ESP32SPI {radio} {manager} {ipv4_str} ", end="")
            # The context manager closes the response once the status is printed.
            with session.get(url) as resp:
                print(f'{resp.status_code} {resp.reason.decode()}')
    # NOTE(review): the source's indentation was lost; this pause is assumed to
    # run once per full pass over all radios and URLs -- confirm.
    time.sleep(5)
@anecdata
Copy link
Author

esp32-s3 + esp32spi*2

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment