Skip to content

Instantly share code, notes, and snippets.

@sourceperl
Last active December 23, 2022 16:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sourceperl/756fbbfb67df23e62dee5f263f0b48f4 to your computer and use it in GitHub Desktop.
Save sourceperl/756fbbfb67df23e62dee5f263f0b48f4 to your computer and use it in GitHub Desktop.
A MicroPython example of HTTP (JSON) data retrieval with handling of Wi-Fi connection errors on the Pico W board.
import uasyncio as aio
from machine import Pin
import network
import rp2
import errno
import urequests
import gc
from private_data import WIFI_SSID, WIFI_KEY
# global switch: when True the logger created in the __main__ section uses
# LVL_DEBUG as its threshold, otherwise LVL_INFO
MODE_DEBUG = False
# log levels, ordered by severity (a higher value is more severe);
# BasicLog prints a message only when its level is >= the logger threshold
LVL_ERROR = 40
LVL_WARNING = 30
LVL_INFO = 20
LVL_DEBUG = 10
# default level of BasicLog.log() when the caller gives none
LVL_NOTSET = 0
# some class
class BasicLog:
    """Tiny print-based logger with a severity threshold.

    A message is printed only when its level is greater than or equal to
    the threshold given at construction time.
    """

    def __init__(self, lvl=LVL_DEBUG):
        # threshold compared against the level of every message
        self.lvl = lvl

    def log(self, msg, lvl=LVL_NOTSET):
        """Print msg if lvl reaches the logger threshold."""
        if lvl >= self.lvl:
            print(msg)

    def _tagged(self, tag, msg, lvl):
        # shared formatter for the level helpers: "<TAG>:<msg>"
        self.log('{}:{}'.format(tag, msg), lvl=lvl)

    def debug(self, msg):
        """Log msg at LVL_DEBUG with a 'DEBUG:' prefix."""
        self._tagged('DEBUG', msg, LVL_DEBUG)

    def info(self, msg):
        """Log msg at LVL_INFO with an 'INFO:' prefix."""
        self._tagged('INFO', msg, LVL_INFO)

    def warning(self, msg):
        """Log msg at LVL_WARNING with a 'WARN:' prefix."""
        self._tagged('WARN', msg, LVL_WARNING)

    def error(self, msg):
        """Log msg at LVL_ERROR with an 'ERROR:' prefix."""
        self._tagged('ERROR', msg, LVL_ERROR)
# asyncio tasks
async def radio_task():
    """Keep the station interface connected, reconnecting when the link drops.

    Runs forever: (re)starts a connection, polls up to 10 times at one second
    intervals for an IP address, then polls the status every second until it
    is no longer STAT_GOT_IP, at which point the whole cycle starts again.
    """
    while True:
        # always start from a disconnected state before asking for a new link
        log.info('wlan connect')
        wlan.disconnect()
        wlan.connect(WIFI_SSID, WIFI_KEY)
        # bounded wait for the IP address
        log.info('waiting for connection')
        for connect_try in range(10):
            log.debug(f'{connect_try=}')
            if wlan.status() == network.STAT_GOT_IP:
                log.info(f'wlan connected, @IP={wlan.ifconfig()[0]}')
                break
            await aio.sleep_ms(1000)
        # stay here for as long as the link is up (skipped when the wait
        # above ran out without getting an address)
        while True:
            if wlan.status() != network.STAT_GOT_IP:
                break
            await aio.sleep_ms(1000)
        log.warning('wlan seem disconnected, restart connect loop')
async def get_task():
    """Fetch the ISS position as JSON every 5 seconds while wlan is connected.

    Runs forever. Nothing is requested while wlan reports it is not
    connected. Network errors (OSError) and undecodable response bodies
    (ValueError) are logged as warnings and never end the task. On an
    EINPROGRESS error the wlan is disconnected explicitly, so the status poll
    in radio_task sees the link as down and starts a new connection.
    """
    while True:
        try:
            if wlan.isconnected():
                log.info('get ISS data')
                # urequests is not written for asyncio, so this call blocks
                # the event loop (for at most the timeout delay)
                r = urequests.get('http://api.open-notify.org/iss-now.json', timeout=4.0)
                try:
                    log.info(f'ISS data: {r.json()}')
                finally:
                    # release the response (socket and cached body) on every
                    # path, including a JSON decoding failure
                    r.close()
                    gc.collect()
        except OSError as e:
            log.warning(f'error occur: {e}')
            if e.errno == errno.EINPROGRESS:
                wlan.disconnect()
        except ValueError as e:
            # body was not valid JSON: skip this sample, keep the task alive
            log.warning(f'invalid ISS data: {e}')
        await aio.sleep_ms(5000)
if __name__ == '__main__':
    # onboard LED pin configured as an output
    led = Pin('LED', Pin.OUT)
    # logger shared by the tasks, threshold selected by MODE_DEBUG
    if MODE_DEBUG:
        log = BasicLog(lvl=LVL_DEBUG)
    else:
        log = BasicLog(lvl=LVL_INFO)
    # radio: set the country code, then bring up the station interface
    rp2.country('FR')
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    # power management value that turns the wlan powersaving mode off
    wlan.config(pm=0xa11140)
    # register both tasks on the event loop and run it forever
    loop = aio.get_event_loop()
    loop.create_task(radio_task())
    loop.create_task(get_task())
    loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment