Skip to content

Instantly share code, notes, and snippets.

@spacemanspiff2007
Last active July 2, 2021 06:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save spacemanspiff2007/33ba627a3c800ce6b999d6d9f1c2db18 to your computer and use it in GitHub Desktop.
Save spacemanspiff2007/33ba627a3c800ce6b999d6d9f1c2db18 to your computer and use it in GitHub Desktop.
MicroPython interface
from uasyncio import create_task, sleep_ms
class InterfaceBase:
    """Track the connection state of a network interface and notify listeners.

    Wraps an interface object that provides ``isconnected()`` and schedules the
    registered coroutine callbacks whenever the state changes to connected or
    disconnected. Subclasses supply the actual interface logic by overriding
    ``do_connect`` and ``do_disconnect``.
    """

    def __init__(self, interface, auto_reconnect: bool = False):
        """Store the interface and reset all state flags.

        :param interface: object providing ``isconnected()``
        :param auto_reconnect: when True, ``check_connected`` schedules
            ``connect()`` after it detects a lost connection (still wip)
        """
        self._if = interface

        self.first_connect = True    # stays True until the first connect() attempt has finished
        self.is_connected = False
        self.is_connecting = False   # True while connect() is running
        self.has_connected = False   # True once a connection has been observed
        self.auto_reconnect = auto_reconnect  # still wip

        self._task = None        # task that currently runs a callback list
        self._last_objs = None   # callback list that was scheduled last
        self._cb_con = []
        self._cb_discon = []

    # Use this to register a callback that gets called when the interface changes to connected
    def on_connect(self, coro):
        """Register ``coro`` (called without arguments and awaited) as connect callback.

        :raises ValueError: if ``coro`` is already registered
        """
        if coro in self._cb_con:
            raise ValueError('Coro already registered!')
        self._cb_con.append(coro)

    # Use this to register a callback that gets called when the interface changes to disconnected
    def on_disconnect(self, coro):
        """Register ``coro`` (called without arguments and awaited) as disconnect callback.

        :raises ValueError: if ``coro`` is already registered
        """
        if coro in self._cb_discon:
            raise ValueError('Coro already registered!')
        self._cb_discon.append(coro)

    # override this for interface logic
    async def do_connect(self):
        """Bring the interface up. The base implementation does nothing."""
        pass

    # override this for interface logic
    async def do_disconnect(self):
        """Take the interface down. The base implementation does nothing."""
        pass

    # possibility for custom error handler
    def on_error(self, err: Exception):
        """Report an exception raised by a callback or by the interface logic."""
        print('Error: {} {}'.format(err, type(err)))

    async def _run_cbs(self, objs: list):
        """Await every callback in ``objs`` in order, then clear the task reference.

        An exception raised by a callback is handed to ``on_error`` so that the
        remaining callbacks still run.
        """
        for coro in objs:
            try:
                await coro()
            except Exception as e:
                self.on_error(e)
        self._task = None

    def _create_task(self, objs: list):
        """Schedule the callbacks in ``objs`` unless that same list was scheduled last.

        The identity check prevents the same state change from being reported
        twice in a row. A callback task that is still pending is cancelled
        before the new one is created.
        """
        if self._last_objs is objs:
            return None
        self._last_objs = objs

        if self._task is not None:
            self._task.cancel()
        self._task = create_task(self._run_cbs(objs))

    async def disconnect(self):
        """Run ``do_disconnect`` and schedule the disconnect callbacks.

        Exceptions from ``do_disconnect`` are passed to ``on_error``. The
        callbacks are only scheduled if the interface reports that it is no
        longer connected.
        """
        self.is_connected = False
        try:
            await self.do_disconnect()
        except Exception as e:
            self.on_error(e)

        if not self._if.isconnected():
            self._create_task(self._cb_discon)

    async def connect(self):
        """Run ``do_connect`` and schedule the connect callbacks on success.

        Returns immediately if another ``connect()`` call is still running.
        An exception from ``do_connect`` is reported through ``on_error`` only
        after the state flags have been reset.
        """
        if self.is_connecting:
            return None
        self.is_connecting = True

        err = None
        try:
            await self.do_connect()
        except Exception as e:
            err = e

        self.first_connect = False
        self.is_connecting = False

        if err is not None:
            self.on_error(err)

        if self._if.isconnected():
            self.is_connected = True
            self.has_connected = True
            self._create_task(self._cb_con)

    def check_connected(self) -> bool:
        """Synchronise ``is_connected`` with the interface and report changes.

        Schedules the disconnect callbacks (and a reconnect if
        ``auto_reconnect`` is set) when the connection was lost, and the
        connect callbacks when the interface came up on its own.

        :return: the updated ``is_connected`` flag; always False while
            ``connect()`` is still running
        """
        if self.is_connecting:
            return False

        was = self.is_connected
        now = self._if.isconnected()
        if not now:
            self.is_connected = False

        if was and not now:
            self._create_task(self._cb_discon)
            # option to automatically reconnect
            if self.auto_reconnect:
                create_task(self.connect())
        elif now and not was:
            # Record the transition as well: without this the flag would stay
            # False while the interface is up and a later drop would never be
            # detected because ``was`` would still be False.
            self.is_connected = True
            self.has_connected = True
            self._create_task(self._cb_con)
        return self.is_connected

    async def check_stable(self, secs: int) -> bool:
        """Poll the connection state every 500 ms for ``secs`` seconds.

        :return: False as soon as a poll changes ``is_connected``,
            True if the state never changed
        """
        for _ in range(secs * 2):
            was = self.is_connected
            self.check_connected()
            now = self.is_connected
            if was != now:
                return False
            await sleep_ms(500)
        return True
import machine
import network
from uasyncio import create_task, sleep, sleep_ms
class IfWifi(InterfaceBase):
    """Station-mode WLAN interface built on ``InterfaceBase``."""

    def __init__(self):
        """Create the station interface and register the reconnect callback."""
        wlan = network.WLAN(network.STA_IF)
        super().__init__(wlan, auto_reconnect=True)
        self.on_disconnect(self._on_disconnect)

    async def _on_disconnect(self):
        """Disconnect callback that schedules a new connection attempt."""
        # This will automatically try to reconnect
        create_task(self.connect())

    async def do_connect(self):
        """Connect to the network configured in ``CONFIG_WIFI``.

        Lists the visible networks on the first attempt, starts the connection
        and polls the interface status every 200 ms until it is no longer
        connecting or ``_CON_TIME_LAN`` has elapsed.

        :return: True if the interface is connected, otherwise False (the
            interface is deactivated in that case)
        """
        ssid = CONFIG_WIFI['ssid']
        password = CONFIG_WIFI['pass']

        wlan = self._if
        if wlan.isconnected():
            return True

        t_start = time()
        if not wlan.active():
            wlan.active(True)

        if self.first_connect:
            print('Scanning for', '"' + ssid + '":')
            for entry in wlan.scan():
                name = entry[0].decode()
                # mark the network we are going to join
                if name == ssid:
                    marker = " <--"
                else:
                    marker = ""
                print(' - {} ({}dBm){}'.format(name, entry[3], marker))

        wlan.connect(ssid, password)
        while True:
            if wlan.status() != network.STAT_CONNECTING:
                break
            if time() - t_start >= _CON_TIME_LAN:
                break
            await sleep_ms(200)

        if not wlan.isconnected():
            print('Wifi failed!')
            wlan.active(False)
            return False

        _show_con('WLAN', *wlan.ifconfig())
        return True

    async def do_disconnect(self):
        """Nothing to do when disconnecting."""
        return None
from interface_wifi import IfWifi
# Module-level Wi-Fi interface instance used by the example below
WIFI = IfWifi()
async def disconnected():
    """Example disconnect callback: print a short notice."""
    print('disconnected')
async def connected():
    """Example connect callback: print a short notice."""
    print('connected')
# subscribe callbacks through base class
WIFI.on_connect(connected)
WIFI.on_disconnect(disconnected)
# interface that can be used by the code
await WIFI.connect()
await WIFI.disconnect()
await WIFI.check_connected()
# vars that make additional logic easier
WIFI.is_connected
WIFI.has_connected
WIFI.first_connect
WIFI.is_connecting
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment