Last active
July 2, 2021 06:11
-
-
Save spacemanspiff2007/33ba627a3c800ce6b999d6d9f1c2db18 to your computer and use it in GitHub Desktop.
micropython interface
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from uasyncio import create_task, sleep_ms | |
class InterfaceBase:
    # Tracks the connection state of a network interface and runs registered
    # coroutine callbacks whenever that state changes.
    #
    # `interface` only needs to provide `isconnected()`. Subclasses override
    # `do_connect` / `do_disconnect` with the interface specific logic.

    def __init__(self, interface, auto_reconnect: bool = False):
        self._if = interface

        self.first_connect = True   # True until the first connect() attempt has finished
        self.is_connected = False
        self.is_connecting = False  # True while connect() is awaiting do_connect()
        self.has_connected = False  # True once the interface was seen connected

        self.auto_reconnect = auto_reconnect    # still wip

        self._task = None       # task that currently runs the callbacks
        self._last_objs = None  # callback list that was scheduled last
        self._cb_con = []
        self._cb_discon = []

    # Use this to register a callback that gets called when the interface changes to connected
    def on_connect(self, coro):
        if coro in self._cb_con:
            raise ValueError('Coro already registered!')
        self._cb_con.append(coro)

    # Use this to register a callback that gets called when the interface changes to disconnected
    def on_disconnect(self, coro):
        if coro in self._cb_discon:
            raise ValueError('Coro already registered!')
        self._cb_discon.append(coro)

    # override this for interface logic
    async def do_connect(self):
        pass

    # override this for interface logic
    async def do_disconnect(self):
        pass

    # possibility for custom error handler
    def on_error(self, err: Exception):
        print('Error: {} {}'.format(err, type(err)))

    # Await the callbacks one after another; a failing callback is reported
    # through on_error and does not stop the remaining ones.
    async def _run_cbs(self, objs: list):
        for coro in objs:
            try:
                await coro()
            except Exception as e:
                self.on_error(e)
        self._task = None

    # Schedule the callbacks in `objs` as a task.
    # Scheduling the same list twice in a row is ignored, so every state
    # change is reported only once. A still running callback task for the
    # previous state is cancelled first.
    def _create_task(self, objs: list):
        if self._last_objs is objs:
            return None
        self._last_objs = objs

        if self._task is not None:
            self._task.cancel()
        self._task = create_task(self._run_cbs(objs))

    # Disconnect the interface; errors from do_disconnect go to on_error.
    # The disconnect callbacks only run if the interface really is down.
    async def disconnect(self):
        self.is_connected = False
        try:
            await self.do_disconnect()
        except Exception as e:
            self.on_error(e)

        if not self._if.isconnected():
            self._create_task(self._cb_discon)

    # Connect the interface; does nothing if a connect is already running.
    # Errors from do_connect go to on_error. The connect callbacks run if the
    # interface reports connected afterwards.
    async def connect(self):
        if self.is_connecting:
            return None
        self.is_connecting = True

        err = None
        try:
            await self.do_connect()
        except Exception as e:
            err = e
        finally:
            # The finally block also runs when the task gets cancelled while
            # waiting in do_connect. Without it is_connecting would stay True
            # and block every further connect() / check_connected() call.
            self.first_connect = False
            self.is_connecting = False

        if err is not None:
            self.on_error(err)

        if self._if.isconnected():
            self.is_connected = True
            self.has_connected = True
            self._create_task(self._cb_con)

    # Compare the interface state with the tracked state, run the matching
    # callbacks on a change and return the current state.
    # Always returns False while a connect is in progress.
    def check_connected(self) -> bool:
        if self.is_connecting:
            return False

        was = self.is_connected
        now = self._if.isconnected()

        if not now:
            self.is_connected = False
            if was:
                self._create_task(self._cb_discon)
                # option to automatically reconnect
                if self.auto_reconnect:
                    create_task(self.connect())
        elif not was:
            # The interface came up without connect() noticing it. Track the
            # new state, otherwise this method keeps returning False and a
            # later connection loss would never be detected.
            self.is_connected = True
            self.has_connected = True
            self._create_task(self._cb_con)

        return self.is_connected

    # Poll the connection state every 500 ms for `secs` seconds.
    # Returns False as soon as the state changes, True if it never changed.
    async def check_stable(self, secs: int) -> bool:
        # int() so that a fractional number of seconds works, too
        for _ in range(int(secs * 2)):
            was = self.is_connected
            self.check_connected()
            if was != self.is_connected:
                return False
            await sleep_ms(500)
        return True
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import machine | |
import network | |
from uasyncio import create_task, sleep, sleep_ms | |
class IfWifi(InterfaceBase):
    # WLAN station interface built on InterfaceBase.
    # NOTE(review): CONFIG_WIFI, _CON_TIME_LAN and _show_con are not defined in
    # the visible part of this module - presumably provided elsewhere; confirm.

    def __init__(self):
        super().__init__(network.WLAN(network.STA_IF), auto_reconnect=True)
        self.on_disconnect(self._on_disconnect)

    async def _on_disconnect(self):
        create_task(self.connect())     # This will automatically try to reconnect

    # Connect to the configured access point.
    # Returns True if the interface is connected, False if the attempt failed
    # or did not finish within _CON_TIME_LAN.
    async def do_connect(self):
        # None of the visible module imports brings `time` into scope,
        # so import it here to avoid a NameError.
        from time import time

        _ssid = CONFIG_WIFI['ssid']
        _pass = CONFIG_WIFI['pass']

        if self._if.isconnected():
            return True

        started = time()
        if not self._if.active():
            self._if.active(True)

        # List the visible networks only on the very first attempt
        if self.first_connect:
            print('Scanning for', '"' + _ssid + '":')
            for wifi in self._if.scan():
                wifi_name = wifi[0].decode()
                print(' - {} ({}dBm){}'.format(wifi_name, wifi[3], " <--" if wifi_name == _ssid else ""))

        self._if.connect(_ssid, _pass)
        # Yield to other tasks while the connection attempt is running
        while self._if.status() == network.STAT_CONNECTING and time() - started < _CON_TIME_LAN:
            await sleep_ms(200)

        if self._if.isconnected():
            _show_con('WLAN', *self._if.ifconfig())
            return True

        print('Wifi failed!')
        self._if.active(False)
        return False

    async def do_disconnect(self):
        pass
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from uasyncio import run

from interface_wifi import IfWifi

WIFI = IfWifi()


async def disconnected():
    print('disconnected')


async def connected():
    print('connected')


# subscribe callbacks through base class
WIFI.on_connect(connected)
WIFI.on_disconnect(disconnected)


# `await` is only valid inside a coroutine, so the calls live in main()
async def main():
    # interface that can be used by the code
    await WIFI.connect()
    await WIFI.disconnect()
    WIFI.check_connected()  # regular method that returns a bool - must not be awaited

    # vars that make additional logic easier
    WIFI.is_connected
    WIFI.has_connected
    WIFI.first_connect
    WIFI.is_connecting


run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment