Skip to content

Instantly share code, notes, and snippets.

@rdlaner
Last active January 28, 2024 21:19
Show Gist options
  • Save rdlaner/ea6f84b36b5ce40274edcbf1984e1b59 to your computer and use it in GitHub Desktop.
Save rdlaner/ea6f84b36b5ce40274edcbf1984e1b59 to your computer and use it in GitHub Desktop.
wifi_and_ping
# Borrowed ping function from µPing:
#
# µPing (MicroPing) for MicroPython
# copyright (c) 2018 Shawwwn <shawwwn1@gmail.com>
# License: MIT
# Internet Checksum Algorithm
# Author: Olav Morken
# https://github.com/olavmrk/python-ping/blob/master/ping.py
# @data: bytes
import gc
import network
import time
ssid = <ENTER_SSID_HERE>
password = <ENTER_PASSWORD_HERE>
sta = network.WLAN(network.STA_IF)
sta.active(True)
def checksum(data):
if len(data) & 0x1: # Odd number of bytes
data += b'\0'
cs = 0
for pos in range(0, len(data), 2):
b1 = data[pos]
b2 = data[pos + 1]
cs += (b1 << 8) + b2
while cs >= 0x10000:
cs = (cs & 0xffff) + (cs >> 16)
cs = ~cs & 0xffff
return cs
def ping(host, count=4, timeout=5000, interval=10, quiet=False, size=64):
import utime
import uselect
import uctypes
import usocket
import ustruct
import urandom
# prepare packet
assert size >= 16, "pkt size too small"
pkt = b'Q'*size
pkt_desc = {
"type": uctypes.UINT8 | 0,
"code": uctypes.UINT8 | 1,
"checksum": uctypes.UINT16 | 2,
"id": uctypes.UINT16 | 4,
"seq": uctypes.INT16 | 6,
"timestamp": uctypes.UINT64 | 8,
} # packet header descriptor
h = uctypes.struct(uctypes.addressof(pkt), pkt_desc, uctypes.BIG_ENDIAN)
h.type = 8 # ICMP_ECHO_REQUEST
h.code = 0
h.checksum = 0
h.id = urandom.getrandbits(16)
h.seq = 1
# init socket
sock = usocket.socket(usocket.AF_INET, usocket.SOCK_RAW, 1)
sock.setblocking(0)
sock.settimeout(timeout/1000)
addr = usocket.getaddrinfo(host, 1)[0][-1][0] # ip address
sock.connect((addr, 1))
not quiet and print("PING %s (%s): %u data bytes" % (host, addr, len(pkt)))
seqs = list(range(1, count+1)) # [1,2,...,count]
c = 1
t = 0
n_trans = 0
n_recv = 0
finish = False
while t < timeout:
if t==interval and c<=count:
# send packet
h.checksum = 0
h.seq = c
h.timestamp = utime.ticks_us()
h.checksum = checksum(pkt)
if sock.send(pkt) == size:
n_trans += 1
t = 0 # reset timeout
else:
print(f"sock sent failed: {size}")
seqs.remove(c)
c += 1
# recv packet
while 1:
socks, _, _ = uselect.select([sock], [], [], 0)
if socks:
resp = socks[0].recv(4096)
resp_mv = memoryview(resp)
h2 = uctypes.struct(uctypes.addressof(resp_mv[20:]), pkt_desc, uctypes.BIG_ENDIAN)
# TODO: validate checksum (optional)
seq = h2.seq
if h2.type==0 and h2.id==h.id and (seq in seqs): # 0: ICMP_ECHO_REPLY
t_elasped = (utime.ticks_us()-h2.timestamp) / 1000
ttl = ustruct.unpack('!B', resp_mv[8:9])[0] # time-to-live
n_recv += 1
not quiet and print("%u bytes from %s: icmp_seq=%u, ttl=%u, time=%f ms" % (len(resp), addr, seq, ttl, t_elasped))
seqs.remove(seq)
if len(seqs) == 0:
finish = True
break
else:
break
if finish:
break
utime.sleep_ms(1)
t += 1
# close
sock.close()
ret = (n_trans, n_recv)
not quiet and print("%u packets transmitted, %u packets received" % (n_trans, n_recv))
return (n_trans, n_recv)
def wait_for(func, timeout_secs=20):
timeout_time = time.time() + timeout_secs
while not func():
if time.time() >= timeout_time:
raise RuntimeError("noop")
time.sleep_ms(10)
def disconnect():
print("Disconnecting...")
sta.disconnect()
try:
wait_for(lambda: not sta.isconnected())
except RuntimeError as exc:
raise RuntimeError("Timeout trying to disconnect Wifi") from exc
def connect():
max_connect_attempts = 3
success = True
if not sta.isconnected():
attempt = 1
while not sta.isconnected():
print(f"Connecting to AP: {ssid}")
networks = sta.scan()
# Each network is a tuple with the following data:
# (ssid, bssid, channel, RSSI, security, hidden)
networks = sorted(
networks, key=lambda net: net[3], reverse=True)
for net in networks:
print(f"ssid: {net[0]}, rssi: {net[3]}")
try:
sta.connect(ssid, password)
except (RuntimeError, OSError) as exc:
print(f"Could not connect to wifi AP: {ssid}. exc: {exc}")
try:
wait_for(sta.isconnected)
except RuntimeError as exc:
print(f"Timed out connecting to wifi AP: {ssid}. exc: {exc}")
if not sta.isconnected():
if attempt >= max_connect_attempts:
break
print("Retrying in 3 seconds...")
attempt += 1
time.sleep(3)
gc.collect()
if not sta.isconnected():
print(f"Failed to connect to Wifi after {max_connect_attempts} attempts!")
success = False
else:
print(f"Wifi is connected: {sta.ifconfig()}")
else:
print("Wifi is already connected")
disconnect()
connect()
ping('8.8.8.8', count=10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment