Created
February 9, 2025 10:42
-
-
Save WillemJan/2a036b751f5f24ba86e8772bc57f5ca7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import os | |
import sys | |
import machine | |
import time | |
import hashlib | |
import importlib | |
try: | |
import urequests as requests # MicroPython | |
except ImportError: | |
import requests # CPython (for testing) | |
# Configuration
# URL the firmware image is downloaded from.
OTA_SERVER = "http://your-ota-server/firmware.py"
# Name of the installed firmware file on the device.
FIRMWARE_FILE = "firmware.py"
# Passed to OTAUpdater as chunk_size.
CHUNK_SIZE = 1024
# Seconds between periodic update checks (compared against time.time() deltas).
UPDATE_CHECK_INTERVAL = 3600
class OTAUpdater:
    """Handles Over-The-Air firmware updates.

    The updater downloads a new firmware file over HTTP, optionally verifies
    its SHA-256 checksum, swaps it in for the current firmware (keeping a
    ``.old`` backup) and resets the board.

    Args:
        ota_server: URL the firmware file is downloaded from.
        firmware_file: Path of the installed firmware file.
        chunk_size: Number of bytes read at a time when hashing a file.
        update_interval: Seconds between periodic update checks.
    """

    def __init__(self, ota_server, firmware_file, chunk_size, update_interval):
        self.ota_server = ota_server
        self.firmware_file = firmware_file
        self.chunk_size = chunk_size
        self.update_interval = update_interval
        self._new_firmware_file = "firmware_new.py"
        self._running = True
        self._last_update_check = 0

    @staticmethod
    def _parse_version(version):
        """Turn a dotted version string into a tuple of ints for comparison.

        Plain string comparison orders "1.0.10" before "1.0.2"; comparing the
        parsed tuples does not. Non-numeric suffixes of a component are
        ignored and trailing zero components are dropped so that "1.0" and
        "1.0.0" compare equal.
        """
        parts = []
        for piece in str(version).split("."):
            digits = ""
            for ch in piece:
                if not ch.isdigit():
                    break
                digits += ch
            parts.append(int(digits) if digits else 0)
        while parts and parts[-1] == 0:
            parts.pop()
        return tuple(parts)

    def _file_sha256(self, path):
        """Return the hex SHA-256 digest of the file at *path*.

        The file is hashed in ``chunk_size`` pieces to keep memory use low,
        and the hex string is built with ``binascii.hexlify`` because
        MicroPython's hashlib does not implement ``hexdigest()``.

        Raises:
            OSError: if the file cannot be opened or read.
        """
        import binascii

        block = self.chunk_size if self.chunk_size and self.chunk_size > 0 else 1024
        hasher = hashlib.sha256()
        with open(path, "rb") as f:
            while True:
                chunk = f.read(block)
                if not chunk:
                    break
                hasher.update(chunk)
        return binascii.hexlify(hasher.digest()).decode()

    async def _download_firmware(self, url, filename, expected_hash=None):
        """Download firmware to *filename* and optionally verify its checksum.

        Args:
            url: Location of the firmware file.
            filename: Local path the download is written to.
            expected_hash: Hex SHA-256 digest to verify against, or None to
                skip verification.

        Returns:
            True when the file was downloaded (and verified, if requested),
            False on any HTTP, network, I/O or checksum failure.
        """
        # urequests has no `exceptions` namespace; fall back to OSError so the
        # except clause below can always be evaluated.
        network_error = getattr(
            getattr(requests, "exceptions", None), "RequestException", OSError
        )
        response = None
        try:
            print(f"Downloading firmware from {url}...")
            to_thread = getattr(asyncio, "to_thread", None)
            if to_thread is not None:
                response = await to_thread(requests.get, url)
            else:
                # No thread offloading available (e.g. MicroPython): the
                # request blocks the event loop until it completes.
                response = requests.get(url)
            if response.status_code != 200:
                print(f"HTTP Error: {response.status_code}")
                return False
            with open(filename, "wb") as f:
                f.write(response.content)
            print("Firmware downloaded.")
            if expected_hash and self._file_sha256(filename) != expected_hash:
                print("Checksum mismatch! Corrupted download.")
                os.remove(filename)  # Cleanup
                return False
            return True
        except network_error as e:
            print(f"Network error: {e}")
            return False
        except Exception as e:
            print(f"Download error: {e}")
            return False
        finally:
            # Always release the underlying socket.
            if response is not None:
                try:
                    response.close()
                except (AttributeError, OSError):
                    pass  # Best effort: nothing more can be done here.

    async def _install_firmware(self, filename):
        """Replace the current firmware with the file at *filename*.

        The current firmware is kept as ``<firmware_file>.old``. If moving the
        new file into place fails, the backup is restored so the device is
        not left without firmware.

        Returns:
            True on success, False if the installation failed.
        """
        print("Installing firmware...")
        backup = f"{self.firmware_file}.old"
        try:
            os.stat(filename)  # Fail early before touching the current firmware.
        except OSError as e:
            print(f"Installation error: {e}")
            return False
        try:
            # Some filesystems refuse to rename onto an existing file.
            os.remove(backup)
        except OSError:
            pass  # No stale backup
        had_previous = True
        try:
            os.rename(self.firmware_file, backup)
        except OSError:
            had_previous = False  # No previous firmware
        try:
            os.rename(filename, self.firmware_file)
        except OSError as e:
            print(f"Installation error: {e}")
            if had_previous:
                try:
                    os.rename(backup, self.firmware_file)
                except OSError as restore_error:
                    print(f"Could not restore previous firmware: {restore_error}")
            return False
        print("Firmware installed successfully.")
        return True

    async def _ota_update(self):
        """Check for an update and apply it if needed.

        Returns:
            True when new firmware was installed (after requesting a reset),
            False when the firmware is up to date or the update failed.
        """
        print("Checking for updates...")
        try:
            # Simulated version & hash check (replace with actual server request)
            server_version = "1.0.2"
            server_hash = "your_server_hash"
            try:
                module_name = self.firmware_file
                if module_name.endswith(".py"):
                    module_name = module_name[:-3]
                current_version = importlib.import_module(module_name).__version__
                if self._parse_version(server_version) <= self._parse_version(
                    current_version
                ):
                    print("Firmware is up-to-date.")
                    return False
            except (AttributeError, ImportError):
                # Unknown local version: fall through and attempt the update.
                print("Could not determine current firmware version.")
            if not await self._download_firmware(
                self.ota_server, self._new_firmware_file, server_hash
            ):
                return False
            if await self._install_firmware(self._new_firmware_file):
                print("Update successful! Rebooting...")
                machine.reset()
                return True
            try:
                os.remove(self._new_firmware_file)  # Cleanup
            except OSError:
                pass  # Download already gone
            print("Firmware installation failed.")
            return False
        except Exception as e:
            print(f"Update error: {e}")
            return False

    async def _run_firmware(self):
        """Main firmware loop with periodic OTA update checks."""
        while self._running:
            print("Firmware running...")
            await asyncio.sleep(1)  # Main firmware logic
            now = time.time()
            if now - self._last_update_check >= self.update_interval:
                print("Performing periodic update check...")
                if await self._ota_update():
                    print("Rebooting after update...")
                    machine.reset()
                    return
                print("No updates or update failed.")
                self._last_update_check = now
        print("Firmware stopped gracefully.")

    async def run(self):
        """Entry point: check for installed firmware and run the main loop.

        When no firmware file exists an update is attempted instead. Any
        unexpected error is reported and the firmware loop is started as a
        fallback.
        """
        try:
            try:
                os.stat(self.firmware_file)  # MicroPython file check
            except OSError:  # No firmware found
                # Only the stat call is guarded, so an OSError raised later by
                # the firmware loop is not misreported as a missing file.
                print("No firmware found. Checking for updates...")
                if await self._ota_update():
                    print("Rebooting after initial update.")
                    machine.reset()
                else:
                    print("No firmware available and update failed.")
                return  # Or provide fallback
            print(f"Firmware '{self.firmware_file}' found. Starting...")
            await self._run_firmware()
        except Exception as e:
            print(f"Critical error in main loop: {e}")
            await self._run_firmware()  # Run fallback
# Script entry point: build the updater from the module-level configuration
# and drive it with the asyncio event loop.
if __name__ == "__main__":
    updater = OTAUpdater(OTA_SERVER, FIRMWARE_FILE, CHUNK_SIZE, UPDATE_CHECK_INTERVAL)
    try:
        asyncio.run(updater.run())
    except Exception as e:
        # Last-resort boundary: report instead of crashing with a traceback.
        print(f"Error in main loop: {e}")
ota_updater.py
import uasyncio as asyncio
import os
import sys
import machine
import time
import hashlib
import importlib
import network
from espnow_comms import ESPNowComms
# OTA configuration.
OTA_SERVER = "http://your-ota-server.com/firmware.py"
FIRMWARE_FILE = "firmware.py"
CHUNK_SIZE = 1024
UPDATE_CHECK_INTERVAL = 3600  # Seconds between OTA update checks.

# Wi‑Fi AP configuration for signal strength measurement.
CENTRAL_AP_SSID = "CentralAP"  # Set to the SSID of your central Wi‑Fi access point.
class OTAUpdater:
    """
    OTA updater with integrated ESP-NOW swarm management.

    Features include:
    - OTA firmware update via HTTP.
    - Asynchronous Wi-Fi scanning for signal strength (for triangulation).
    - Gossip messages for firmware info, node join/leave, and periodic chirps.
    - A simple gossip leader election based on relative signal strength.

    Args:
        ota_server: URL the firmware file is downloaded from.
        firmware_file: Path of the installed firmware file.
        chunk_size: Number of bytes read at a time when hashing the firmware.
        update_interval: Seconds between periodic OTA update checks.
    """

    # Firmware version advertised to peers. Replace with a dynamic value if
    # one is available.
    FIRMWARE_VERSION = "1.0.2"
    # Seconds without any message after which a peer is dropped from the swarm.
    PEER_TIMEOUT = 30

    def __init__(self, ota_server, firmware_file, chunk_size=1024, update_interval=3600):
        self.ota_server = ota_server
        self.firmware_file = firmware_file
        self.chunk_size = chunk_size
        self.update_interval = update_interval
        self._new_firmware_file = "firmware_new.py"
        self._running = True
        self._last_update_check = 0
        # References to background tasks so they are not garbage collected.
        self._tasks = []
        # Initialize ESP-NOW communications.
        self.espnow = ESPNowComms()
        self.device_id = machine.unique_id()  # Unique device ID (bytes).
        if hasattr(self.device_id, "hex"):
            self.device_id_str = self.device_id.hex()
        else:
            self.device_id_str = str(self.device_id)
        self.is_leader = False  # All nodes start as non-leader.
        # Swarm status: {device_id: merged fields of the peer's messages plus
        # a locally stamped "last_seen"}.
        self.swarm_status = {}
        # Current measured signal strength to the central AP (in dBm).
        self.current_signal_strength = -100
        # Record when this node joined.
        self.join_time = time.time()

    @staticmethod
    def _parse_version(version):
        """
        Turns a dotted version string into a tuple of ints for comparison.
        Plain string comparison orders "1.0.10" before "1.0.2"; tuples do not.
        Non-numeric suffixes are ignored and trailing zeros are dropped so
        that "1.0" and "1.0.0" compare equal.
        """
        parts = []
        for piece in str(version).split("."):
            digits = ""
            for ch in piece:
                if not ch.isdigit():
                    break
                digits += ch
            parts.append(int(digits) if digits else 0)
        while parts and parts[-1] == 0:
            parts.pop()
        return tuple(parts)

    def _file_sha256(self, path):
        """
        Returns the hex SHA-256 digest of the file at *path*.
        The file is read in chunk_size pieces to limit memory use, and the hex
        string is built with binascii.hexlify because MicroPython's hashlib
        does not implement hexdigest(). Raises OSError if the file is unreadable.
        """
        import binascii

        block = self.chunk_size if self.chunk_size and self.chunk_size > 0 else 1024
        hasher = hashlib.sha256()
        with open(path, "rb") as f:
            while True:
                chunk = f.read(block)
                if not chunk:
                    break
                hasher.update(chunk)
        return binascii.hexlify(hasher.digest()).decode()

    def compute_firmware_hash(self):
        """
        Computes the SHA-256 hash of the current firmware file.
        Returns the hex digest, or the string "none" if it cannot be computed.
        """
        try:
            return self._file_sha256(self.firmware_file)
        except Exception as e:
            # Best effort: callers broadcast the result and must not crash.
            print(f"Error computing firmware hash: {e}")
            return "none"

    def estimate_distance(self, rssi, tx_power=-40, path_loss_exponent=2.0):
        """
        Estimates distance (in meters) using the log-distance path loss model.
        Formula: distance = 10 ^ ((tx_power - rssi) / (10 * n))
        Parameters:
        - rssi: Received Signal Strength Indicator (dBm).
        - tx_power: Transmit power (dBm). Default is -40 dBm.
        - path_loss_exponent: Environmental factor (typically 2 for free space, higher indoors).
        Returns None when the inputs cannot be evaluated (e.g. rssi is None or
        the exponent is zero).
        """
        try:
            return 10 ** ((tx_power - rssi) / (10 * path_loss_exponent))
        except (TypeError, ZeroDivisionError, OverflowError) as e:
            print(f"Error estimating distance: {e}")
            return None

    async def measure_signal_strength(self):
        """
        Periodically scans for the central Wi-Fi AP and updates the node's signal strength.
        The strength falls back to -100 dBm when the AP is not seen in a scan.
        """
        wlan = network.WLAN(network.STA_IF)
        wlan.active(True)
        while True:
            try:
                found = None
                # Scan entries are tuples starting (ssid, bssid, channel, RSSI, ...);
                # index access tolerates ports that append fewer trailing fields.
                for net in wlan.scan():
                    ssid = net[0]
                    if isinstance(ssid, bytes):
                        ssid = ssid.decode()
                    if ssid == CENTRAL_AP_SSID:
                        found = net[3]
                        break
                self.current_signal_strength = -100 if found is None else found
            except Exception as e:
                print(f"Signal strength measurement error: {e}")
            await asyncio.sleep(10)

    async def broadcast_join(self):
        """
        Broadcast a "node_join" message when a node joins the swarm.
        """
        join_message = {
            "type": "node_join",
            "device_id": self.device_id_str,
            "timestamp": time.time(),
            "firmware_hash": self.compute_firmware_hash(),
            "firmware_version": self.FIRMWARE_VERSION,
            "signal_strength": self.current_signal_strength,
            "uptime": time.ticks_ms(),
        }
        self.espnow.broadcast(join_message)
        print("Broadcasted node_join:", join_message)

    async def broadcast_leave(self):
        """
        Broadcast a "node_leave" message when a node is leaving the swarm.
        """
        leave_message = {
            "type": "node_leave",
            "device_id": self.device_id_str,
            "timestamp": time.time(),
            "uptime": time.ticks_ms(),
        }
        self.espnow.broadcast(leave_message)
        print("Broadcasted node_leave:", leave_message)

    async def broadcast_chirp(self):
        """
        Broadcast a chirp with current node status.
        Includes firmware info, measured RSSI, estimated distance, and uptime.
        """
        chirp_message = {
            "type": "chirp",
            "device_id": self.device_id_str,
            "timestamp": time.time(),
            "firmware_hash": self.compute_firmware_hash(),
            "firmware_version": self.FIRMWARE_VERSION,
            "signal_strength": self.current_signal_strength,
            "distance_estimate": self.estimate_distance(self.current_signal_strength),
            "uptime": time.ticks_ms(),
        }
        self.espnow.broadcast(chirp_message)
        print("Broadcasted chirp:", chirp_message)

    async def broadcast_firmware_info(self):
        """
        Broadcast firmware info (hash and version) so peers can compare firmware.
        """
        firmware_info = {
            "type": "firmware_info",
            "device_id": self.device_id_str,
            "firmware_hash": self.compute_firmware_hash(),
            "firmware_version": self.FIRMWARE_VERSION,
        }
        self.espnow.broadcast(firmware_info)
        print("Broadcasted firmware_info:", firmware_info)

    async def _download_firmware(self, url, filename, expected_hash=None):
        """
        Downloads firmware from the OTA server and optionally verifies it.
        Returns True when the file was written (and matched expected_hash, if
        given), False on any HTTP, network, I/O or checksum failure.
        """
        response = None
        try:
            try:
                import urequests as requests  # Use MicroPython's urequests.
            except ImportError:
                import requests  # CPython fallback.
            print(f"Downloading firmware from {url} ...")
            to_thread = getattr(asyncio, "to_thread", None)
            if to_thread is not None:
                response = await to_thread(requests.get, url)
            else:
                # No thread offloading available (e.g. MicroPython): the
                # request blocks the event loop until it completes.
                response = requests.get(url)
            if response.status_code != 200:
                print(f"HTTP error: {response.status_code}")
                return False
            with open(filename, "wb") as f:
                f.write(response.content)
            print("Firmware downloaded.")
            if expected_hash and self._file_sha256(filename) != expected_hash:
                print("Checksum mismatch! Firmware download is corrupted.")
                os.remove(filename)
                return False
            return True
        except Exception as e:
            print(f"Download error: {e}")
            return False
        finally:
            # Always release the underlying socket.
            if response is not None:
                try:
                    response.close()
                except (AttributeError, OSError):
                    pass  # Best effort: nothing more can be done here.

    async def _install_firmware(self, filename):
        """
        Installs the downloaded firmware.
        Backs up the existing firmware as "<firmware_file>.old" before replacing
        it, and restores that backup if moving the new file into place fails.
        Returns True on success, False otherwise.
        """
        print("Installing firmware...")
        backup = f"{self.firmware_file}.old"
        try:
            os.stat(filename)  # Fail early before touching the current firmware.
        except OSError as e:
            print(f"Installation error: {e}")
            return False
        try:
            # Some filesystems refuse to rename onto an existing file.
            os.remove(backup)
        except OSError:
            pass  # No stale backup.
        had_previous = True
        try:
            os.rename(self.firmware_file, backup)
        except OSError:
            had_previous = False  # No previous firmware.
        try:
            os.rename(filename, self.firmware_file)
        except OSError as e:
            print(f"Installation error: {e}")
            if had_previous:
                try:
                    os.rename(backup, self.firmware_file)
                except OSError as restore_error:
                    print(f"Could not restore previous firmware: {restore_error}")
            return False
        print("Firmware installed successfully.")
        return True

    async def _ota_update(self):
        """
        Checks for OTA updates by comparing firmware versions and hashes.
        Downloads and installs new firmware if necessary.
        Returns True when new firmware was installed (after requesting a
        reset), False when up to date or when the update failed.
        """
        print("Checking for OTA update...")
        try:
            # Simulated server version and hash (replace with your server logic).
            server_version = "1.0.2"
            server_hash = "your_server_hash"
            try:
                module_name = self.firmware_file
                if module_name.endswith(".py"):
                    module_name = module_name[:-3]
                current_module = importlib.import_module(module_name)
                current_version = getattr(current_module, "__version__", "0.0.0")
                if self._parse_version(server_version) <= self._parse_version(
                    current_version
                ):
                    print("Firmware is up-to-date.")
                    return False
            except Exception:
                # Unknown local version: fall through and attempt the update.
                print("Could not determine current firmware version.")
            if not await self._download_firmware(
                self.ota_server, self._new_firmware_file, server_hash
            ):
                return False
            if await self._install_firmware(self._new_firmware_file):
                print("OTA update successful! Rebooting...")
                machine.reset()
                return True
            try:
                os.remove(self._new_firmware_file)
            except OSError:
                pass  # Download already gone.
            print("Firmware installation failed.")
            return False
        except Exception as e:
            print(f"OTA update error: {e}")
            return False

    def _record_peer(self, device_id, msg):
        """
        Merges *msg* into the stored status of peer *device_id*.
        Merging (instead of replacing) keeps fields such as signal_strength
        when a later message type does not carry them. "last_seen" is stamped
        with the local clock so freshness checks do not depend on the peer's
        clock being synchronized.
        """
        if device_id is None:
            return
        entry = dict(self.swarm_status.get(device_id, {}))
        entry.update(msg)
        entry["last_seen"] = time.time()
        self.swarm_status[device_id] = entry

    async def handle_incoming_messages(self):
        """
        Processes incoming ESP-NOW messages.
        Updates local swarm status with node join/leave, chirp, and firmware info messages.
        Also responds to firmware requests.
        """
        async for mac, msg in self.espnow.receive():
            if not isinstance(msg, dict):
                # A malformed message must not kill the receive task.
                print(f"Ignoring malformed message from {mac}: {msg}")
                continue
            msg_type = msg.get("type")
            device_id = msg.get("device_id")
            if msg_type == "node_join":
                print(f"Node join received from {device_id}: {msg}")
                self._record_peer(device_id, msg)
            elif msg_type == "node_leave":
                print(f"Node leave received from {device_id}: {msg}")
                self.swarm_status.pop(device_id, None)
            elif msg_type == "chirp":
                print(f"Chirp received from {device_id}: {msg}")
                self._record_peer(device_id, msg)
            elif msg_type == "firmware_info":
                print(f"Firmware info received from {device_id}: {msg}")
                self._record_peer(device_id, msg)
            elif msg_type == "firmware_request":
                print(f"Firmware request received from {device_id}")
                await self.send_firmware_to_peer(mac)
            # Extend with additional message types as needed.

    async def send_firmware_to_peer(self, peer_mac):
        """
        Sends firmware to a peer in small chunks via ESP-NOW.
        (Here binary data is encoded with 'latin1' for JSON transport.)
        """
        # NOTE(review): ESP-NOW payloads are limited to 250 bytes, so a
        # 250-byte chunk plus the message envelope may not fit in one frame --
        # confirm how ESPNowComms.send serializes or fragments messages.
        try:
            with open(self.firmware_file, "rb") as f:
                while True:
                    chunk = f.read(250)
                    if not chunk:
                        break
                    data = {
                        "type": "firmware_chunk",
                        "data": chunk.decode("latin1"),
                    }
                    self.espnow.send(peer_mac, data)
                    await asyncio.sleep(0.1)  # Pace the transfer.
            print(f"Firmware sent to peer {peer_mac}.")
        except Exception as e:
            print(f"Error sending firmware to peer: {e}")

    @staticmethod
    def _pick_leader(candidates):
        """
        Returns (leader_id, strength) for the candidate with the highest
        signal strength. Missing or non-numeric strengths count as -999.
        Ties go to the smallest device id so that all nodes holding the same
        candidate set agree on the same leader.
        """
        leader_id = None
        best_strength = -999
        for device in sorted(candidates, key=str):
            strength = candidates[device].get("signal_strength", -999)
            if not isinstance(strength, (int, float)):
                strength = -999
            if leader_id is None or strength > best_strength:
                best_strength = strength
                leader_id = device
        return leader_id, best_strength

    async def elect_leader(self):
        """
        Periodically examines swarm status and elects a leader.
        The candidate with the highest signal strength (i.e. closest to the central AP) is elected.
        Peers not heard from within PEER_TIMEOUT seconds are dropped first.
        """
        while True:
            now = time.time()
            self_id = self.device_id_str
            candidates = {
                self_id: {
                    "device_id": self_id,
                    "firmware_hash": self.compute_firmware_hash(),
                    "firmware_version": self.FIRMWARE_VERSION,
                    "signal_strength": self.current_signal_strength,
                    "last_seen": now,
                    "uptime": time.ticks_ms(),
                }
            }
            # Include info from active peers; forget the ones that went quiet.
            for device, info in list(self.swarm_status.items()):
                if device == self_id:
                    continue  # Our own echo must not replace live local data.
                last = info.get("last_seen", info.get("timestamp", now))
                if now - last < self.PEER_TIMEOUT:
                    candidates[device] = info
                else:
                    del self.swarm_status[device]
            leader_id, best_strength = self._pick_leader(candidates)
            if leader_id == self_id:
                if not self.is_leader:
                    print("I have been elected as leader!")
                self.is_leader = True
            else:
                if self.is_leader:
                    print("I am no longer the leader.")
                self.is_leader = False
            print("Current leader:", leader_id, "with signal strength:", best_strength)
            await asyncio.sleep(30)

    async def _run_firmware(self):
        """
        Main firmware loop:
        - Simulates primary firmware functionality.
        - Periodically checks for OTA updates.
        - Broadcasts chirps (non-leaders only) and firmware info.
        """
        while self._running:
            print("Firmware running...")
            await asyncio.sleep(1)
            now = time.time()
            if now - self._last_update_check >= self.update_interval:
                print("Performing OTA update check...")
                if await self._ota_update():
                    machine.reset()
                    return
                self._last_update_check = now
            if not self.is_leader:
                await self.broadcast_chirp()
            await self.broadcast_firmware_info()

    async def run(self):
        """
        Main entry point:
        - Launches background tasks for incoming message handling,
          signal strength measurement, and leader election.
        - Announces this node to the swarm.
        - Checks for the firmware file and enters the main firmware loop, or
          attempts an OTA update when no firmware is installed.
        """
        # Start background tasks and keep references to them.
        self._tasks = [
            asyncio.create_task(self.handle_incoming_messages()),
            asyncio.create_task(self.measure_signal_strength()),
            asyncio.create_task(self.elect_leader()),
        ]
        # Broadcast node join on startup.
        await self.broadcast_join()
        try:
            os.stat(self.firmware_file)
        except OSError:
            # Only the stat call is guarded, so an OSError raised later by the
            # firmware loop is not misreported as a missing file.
            print("No firmware found. Attempting OTA update...")
            if await self._ota_update():
                machine.reset()
            else:
                print("OTA update failed. Exiting.")
            return
        print(f"Firmware '{self.firmware_file}' found. Starting firmware loop.")
        await self._run_firmware()
        # broadcast_leave() is available to announce shutdown; it is
        # intentionally not called here.
# Script entry point: build the updater from the module-level configuration
# and drive it with the asyncio event loop.
if __name__ == "__main__":
    updater = OTAUpdater(OTA_SERVER, FIRMWARE_FILE, CHUNK_SIZE, UPDATE_CHECK_INTERVAL)
    try:
        asyncio.run(updater.run())
    except Exception as e:
        # Last-resort boundary: report instead of crashing with a traceback.
        print(f"Error in main loop: {e}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
import uasyncio as asyncio
import os
import sys
import machine
import time
import hashlib
import importlib
from espnow_comms import ESPNowComms
class OTAUpdater:
    """
    OTA updater class with integrated ESP‑NOW communication.
    Non‑leader nodes periodically broadcast chirps to help determine the swarm boundaries.
    """
    # NOTE(review): only the class docstring is present in this excerpt; the
    # constructor and the run() method used by the entry point are not shown.
# Script entry point: build the updater for the OTA server URL and firmware
# file, then drive it with the asyncio event loop.
if __name__ == "__main__":
    updater = OTAUpdater("http://your-ota-server.com/firmware.py", "firmware.py")
    try:
        asyncio.run(updater.run())
    except Exception as e:
        # Last-resort boundary: report instead of crashing with a traceback.
        print(f"Error in main loop: {e}")