Skip to content

Instantly share code, notes, and snippets.

@WillemJan
Created February 9, 2025 10:42
Show Gist options
  • Save WillemJan/2a036b751f5f24ba86e8772bc57f5ca7 to your computer and use it in GitHub Desktop.
Save WillemJan/2a036b751f5f24ba86e8772bc57f5ca7 to your computer and use it in GitHub Desktop.
import asyncio
import os
import sys
import machine
import time
import hashlib
import importlib
# Prefer MicroPython's urequests; fall back to the CPython "requests"
# package so the module can also be exercised on a desktop interpreter.
try:
    import urequests as requests  # MicroPython
except ImportError:
    import requests  # CPython (for testing)
# Default settings handed to OTAUpdater when this module runs as a script.
OTA_SERVER = "http://your-ota-server/firmware.py"  # URL the firmware is fetched from
FIRMWARE_FILE = "firmware.py"  # path of the installed firmware file
CHUNK_SIZE = 1 << 10  # bytes
UPDATE_CHECK_INTERVAL = 60 * 60  # seconds between periodic update checks
class OTAUpdater:
    """Handles Over-The-Air firmware updates.

    The updater compares a (currently simulated) server version with the
    ``__version__`` of the installed firmware module, downloads a replacement
    when the server is newer, swaps it into place while keeping a ``.old``
    backup, and resets the board.
    """

    def __init__(self, ota_server, firmware_file, chunk_size, update_interval):
        """Store the update configuration.

        Args:
            ota_server: URL the firmware is downloaded from.
            firmware_file: Path of the installed firmware file.
            chunk_size: Bytes read per step when hashing a file.
            update_interval: Seconds between periodic update checks.
        """
        self.ota_server = ota_server
        self.firmware_file = firmware_file
        self.chunk_size = chunk_size
        self.update_interval = update_interval
        self._new_firmware_file = "firmware_new.py"
        self._running = True
        self._last_update_check = 0

    @staticmethod
    def _version_key(version):
        """Turn a dotted version string into a tuple of ints for comparison.

        Comparing the raw strings is lexicographic ("1.0.10" < "1.0.2"), so
        each dot-separated piece is reduced to its leading digits. Trailing
        zero components are dropped so "1.0" and "1.0.0" compare equal.
        """
        parts = []
        for piece in str(version).split("."):
            number = 0
            for ch in piece.strip():
                if not ch.isdigit():
                    break
                number = number * 10 + int(ch)
            parts.append(number)
        while parts and parts[-1] == 0:
            parts.pop()
        return tuple(parts)

    def _hash_file(self, filename):
        """Return the SHA-256 of *filename* as a lowercase hex string.

        The file is read in ``chunk_size`` pieces to bound memory use, and the
        digest is hex-encoded with ``binascii.hexlify`` because MicroPython's
        hashlib objects do not implement ``hexdigest()``.
        """
        import binascii  # local import: not among the file-level imports

        step = getattr(self, "chunk_size", None)
        if not isinstance(step, int) or step <= 0:
            step = 1024
        digest = hashlib.sha256()
        with open(filename, "rb") as f:
            while True:
                chunk = f.read(step)
                if not chunk:
                    break
                digest.update(chunk)
        return binascii.hexlify(digest.digest()).decode()

    async def _http_get(self, url):
        """Perform an HTTP GET and return the response object.

        ``asyncio.to_thread`` is used when the runtime provides it; otherwise
        (e.g. MicroPython's asyncio) the request is issued directly, which
        blocks the event loop for its duration.
        """
        to_thread = getattr(asyncio, "to_thread", None)
        if to_thread is None:
            return requests.get(url)
        return await to_thread(requests.get, url)

    async def _download_firmware(self, url, filename, expected_hash=None):
        """Download *url* into *filename* and optionally verify its checksum.

        Args:
            url: Location of the firmware image.
            filename: Destination path for the downloaded bytes.
            expected_hash: Hex SHA-256 the file must match; skipped if falsy.

        Returns:
            True when the file was written (and matched the hash, if given),
            False on HTTP errors, checksum mismatch or any exception.
        """
        response = None
        try:
            print(f"Downloading firmware from {url}...")
            response = await self._http_get(url)
            if response.status_code != 200:
                print(f"HTTP Error: {response.status_code}")
                return False
            with open(filename, "wb") as f:
                f.write(response.content)
            print("Firmware downloaded.")
            if expected_hash and self._hash_file(filename) != expected_hash:
                print("Checksum mismatch! Corrupted download.")
                os.remove(filename)  # Cleanup
                return False
            return True
        except Exception as e:
            # One handler for network and file errors alike: urequests has no
            # ``exceptions`` namespace, so naming requests' exception classes
            # here would itself raise AttributeError on MicroPython.
            print(f"Download error: {e}")
            return False
        finally:
            if response is not None:
                try:
                    response.close()  # release the underlying socket
                except Exception:
                    pass

    async def _install_firmware(self, filename):
        """Replace the installed firmware with *filename*.

        The current firmware (if any) is first moved to ``<firmware>.old``.
        If moving the new file into place fails, the backup is moved back so
        the device is not left without firmware.

        Returns:
            True on success, False if the swap failed.
        """
        backup = f"{self.firmware_file}.old"
        backed_up = False
        try:
            print("Installing firmware...")
            try:
                os.stat(self.firmware_file)
            except OSError:
                pass  # No previous firmware
            else:
                try:
                    os.remove(backup)  # rename may refuse to overwrite
                except OSError:
                    pass
                os.rename(self.firmware_file, backup)
                backed_up = True
            os.rename(filename, self.firmware_file)
            print("Firmware installed successfully.")
            return True
        except OSError as e:
            print(f"Installation error: {e}")
            if backed_up:
                try:
                    os.rename(backup, self.firmware_file)  # roll back
                except OSError as rollback_error:
                    print(f"Rollback failed: {rollback_error}")
            return False

    async def _ota_update(self):
        """Check for an update and apply it if the server version is newer.

        Returns:
            True after a successful install (the board is reset first),
            False when up to date or when any step fails.
        """
        print("Checking for updates...")
        try:
            # Simulated version & hash check (replace with actual server request)
            server_version = "1.0.2"
            server_hash = "your_server_hash"
            module_name = self.firmware_file
            if module_name.endswith(".py"):
                module_name = module_name[:-3]
            try:
                # NOTE: importing the firmware module executes its top level.
                current_version = importlib.import_module(module_name).__version__
            except (AttributeError, ImportError):
                print("Could not determine current firmware version.")
            else:
                if self._version_key(server_version) <= self._version_key(current_version):
                    print("Firmware is up-to-date.")
                    return False
            if not await self._download_firmware(
                self.ota_server, self._new_firmware_file, server_hash
            ):
                return False
            if not await self._install_firmware(self._new_firmware_file):
                try:
                    os.remove(self._new_firmware_file)  # Cleanup
                except OSError:
                    pass  # nothing left to clean up
                print("Firmware installation failed.")
                return False
            print("Update successful! Rebooting...")
            machine.reset()
            return True
        except Exception as e:
            print(f"Update error: {e}")
            return False

    async def _run_firmware(self):
        """Main firmware loop with periodic OTA updates."""
        while self._running:
            print("Firmware running...")
            await asyncio.sleep(1)  # Main firmware logic
            now = time.time()
            if now - self._last_update_check >= self.update_interval:
                print("Performing periodic update check...")
                if await self._ota_update():
                    print("Rebooting after update...")
                    machine.reset()
                    return
                else:
                    print("No updates or update failed.")
                self._last_update_check = now
        print("Firmware stopped gracefully.")

    async def run(self):
        """Entry point: run the firmware loop, or bootstrap via OTA.

        The existence check is kept separate from the loop so that an OSError
        raised while the firmware is running is not mistaken for "no firmware
        installed".
        """
        try:
            try:
                os.stat(self.firmware_file)  # MicroPython file check
            except OSError:  # No firmware found
                firmware_present = False
            else:
                firmware_present = True

            if firmware_present:
                print(f"Firmware '{self.firmware_file}' found. Starting...")
                await self._run_firmware()
            else:
                print("No firmware found. Checking for updates...")
                if await self._ota_update():
                    print("Rebooting after initial update.")
                    machine.reset()
                else:
                    print("No firmware available and update failed.")
                    return  # Or provide fallback
        except Exception as e:
            print(f"Critical error in main loop: {e}")
            await self._run_firmware()  # Run fallback
# Script entry point: build the updater from the module-level configuration
# and drive it until it returns or fails.
if __name__ == "__main__":
    updater = OTAUpdater(OTA_SERVER, FIRMWARE_FILE, CHUNK_SIZE, UPDATE_CHECK_INTERVAL)
    try:
        asyncio.run(updater.run())
    except Exception as e:
        print(f"Error in main loop: {e}")
@WillemJan
Copy link
Author

import uasyncio as asyncio
import os
import sys
import machine
import time
import hashlib
import importlib
from espnow_comms import ESPNowComms

class OTAUpdater:
    """
    OTA updater class with integrated ESP‑NOW communication.
    Non‑leader nodes periodically broadcast chirps to help determine the swarm boundaries.
    """

    def __init__(self, ota_server, firmware_file, chunk_size=1024, update_interval=3600):
        """
        Store the update configuration and set up ESP‑NOW.

        Args:
            ota_server: URL the firmware is downloaded from.
            firmware_file: Path of the installed firmware file.
            chunk_size: Bytes read per step when hashing a file.
            update_interval: Seconds between periodic update checks.
        """
        self.ota_server = ota_server
        self.firmware_file = firmware_file
        self.chunk_size = chunk_size
        self.update_interval = update_interval
        self._new_firmware_file = "firmware_new.py"
        self._running = True
        self._last_update_check = 0
        self._tasks = []  # keeps background tasks referenced

        # Initialize ESP‑NOW communication
        self.espnow = ESPNowComms()
        self.device_id = machine.unique_id()  # Unique device ID (bytes)
        self.is_leader = False  # For now, every node starts as non‑leader

    @staticmethod
    def _version_key(version):
        """
        Turn a dotted version string into a tuple of ints for comparison.

        Comparing the raw strings is lexicographic ("1.0.10" < "1.0.2"), so
        each dot-separated piece is reduced to its leading digits. Trailing
        zero components are dropped so "1.0" and "1.0.0" compare equal.
        """
        parts = []
        for piece in str(version).split("."):
            number = 0
            for ch in piece.strip():
                if not ch.isdigit():
                    break
                number = number * 10 + int(ch)
            parts.append(number)
        while parts and parts[-1] == 0:
            parts.pop()
        return tuple(parts)

    def _device_id_text(self):
        """Return the device ID as hex text, or ``str()`` of it if it has no ``hex``."""
        device_id = self.device_id
        return device_id.hex() if hasattr(device_id, "hex") else str(device_id)

    def _hash_file(self, filename):
        """
        Return the SHA-256 of *filename* as a lowercase hex string.

        The file is read in ``chunk_size`` pieces to bound memory use, and the
        digest is hex-encoded with ``binascii.hexlify`` because MicroPython's
        hashlib objects do not implement ``hexdigest()``.
        """
        import binascii  # local import: not among the file-level imports

        step = getattr(self, "chunk_size", None)
        if not isinstance(step, int) or step <= 0:
            step = 1024
        digest = hashlib.sha256()
        with open(filename, "rb") as f:
            while True:
                chunk = f.read(step)
                if not chunk:
                    break
                digest.update(chunk)
        return binascii.hexlify(digest.digest()).decode()

    async def broadcast_chirp(self):
        """
        Broadcast a "chirp" message that announces the node's presence.
        Non‑leader nodes use chirps to let the swarm know they are alive.
        """
        chirp_message = {
            "type": "chirp",
            "device_id": self._device_id_text(),
            "timestamp": time.time(),
        }
        self.espnow.broadcast(chirp_message)
        print("Broadcasted chirp.")

    async def broadcast_firmware_info(self):
        """
        Broadcast current firmware info.
        Other nodes can use this to check if they are up‑to‑date.
        """
        firmware_version = "1.0.2"  # Replace with dynamic version if available.
        firmware_info = {
            "type": "firmware_info",
            "version": firmware_version,
            "device_id": self._device_id_text(),
        }
        self.espnow.broadcast(firmware_info)
        print("Broadcasted firmware info.")

    async def _download_firmware(self, url, filename, expected_hash=None):
        """
        Downloads firmware from the OTA server and optionally verifies it.

        ``asyncio.to_thread`` is used when the runtime provides it; otherwise
        (e.g. uasyncio) the request is issued directly and blocks the event
        loop for its duration.

        Returns:
            True when the file was written (and matched ``expected_hash`` if
            given), False on HTTP errors, checksum mismatch or any exception.
        """
        response = None
        try:
            try:
                import urequests as requests  # MicroPython's requests
            except ImportError:
                import requests  # CPython (for testing)
            print(f"Downloading firmware from {url}...")
            to_thread = getattr(asyncio, "to_thread", None)
            if to_thread is None:
                response = requests.get(url)
            else:
                response = await to_thread(requests.get, url)
            if response.status_code != 200:
                print(f"HTTP Error: {response.status_code}")
                return False

            with open(filename, "wb") as f:
                f.write(response.content)
            print("Firmware downloaded.")

            if expected_hash and self._hash_file(filename) != expected_hash:
                print("Checksum mismatch! Firmware download is corrupted.")
                os.remove(filename)
                return False
            return True

        except Exception as e:
            print(f"Download error: {e}")
            return False
        finally:
            if response is not None:
                try:
                    response.close()  # release the underlying socket
                except Exception:
                    pass

    async def _install_firmware(self, filename):
        """
        Installs the downloaded firmware.
        Backs up the existing firmware to ``<firmware>.old`` before replacing
        it, and moves the backup back if the replacement step fails.

        Returns:
            True on success, False if the swap failed.
        """
        backup = f"{self.firmware_file}.old"
        backed_up = False
        try:
            print("Installing firmware...")
            try:
                os.stat(self.firmware_file)
            except OSError:
                pass  # No previous firmware to back up
            else:
                try:
                    os.remove(backup)  # rename may refuse to overwrite
                except OSError:
                    pass
                os.rename(self.firmware_file, backup)
                backed_up = True
            os.rename(filename, self.firmware_file)
            print("Firmware installed successfully.")
            return True

        except OSError as e:
            print(f"Installation error: {e}")
            if backed_up:
                try:
                    os.rename(backup, self.firmware_file)  # roll back
                except OSError as rollback_error:
                    print(f"Rollback failed: {rollback_error}")
            return False

    async def _ota_update(self):
        """
        Checks for OTA updates by comparing the server's firmware version and hash.
        Downloads and installs new firmware if necessary.

        Returns:
            True after a successful install (the board is reset first),
            False when up to date or when any step fails.
        """
        print("Checking for OTA update...")
        try:
            # Simulated server version and hash; replace with real server logic.
            server_version = "1.0.2"
            server_hash = "your_server_hash"

            # Try to obtain current firmware version.
            try:
                module_name = self.firmware_file
                if module_name.endswith(".py"):
                    module_name = module_name[:-3]
                # NOTE: importing the firmware module executes its top level.
                current_module = importlib.import_module(module_name)
                current_version = getattr(current_module, "__version__", "0.0.0")
            except Exception:
                print("Could not determine current firmware version.")
            else:
                if self._version_key(server_version) <= self._version_key(current_version):
                    print("Firmware is up-to-date.")
                    return False

            # Download new firmware and install it.
            if not await self._download_firmware(
                self.ota_server, self._new_firmware_file, server_hash
            ):
                return False
            if not await self._install_firmware(self._new_firmware_file):
                try:
                    os.remove(self._new_firmware_file)
                except OSError:
                    pass  # nothing left to clean up
                print("Firmware installation failed.")
                return False
            print("OTA update successful! Rebooting...")
            machine.reset()
            return True

        except Exception as e:
            print(f"OTA update error: {e}")
            return False

    async def handle_incoming_messages(self):
        """
        Processes incoming ESP‑NOW messages.
        Handles various types: firmware requests, firmware info, chirps, etc.
        Messages that are not dicts are skipped so one malformed packet does
        not end the receive task.
        """
        async for mac, msg in self.espnow.receive():
            if not isinstance(msg, dict):
                print(f"Ignoring malformed message from {mac}.")
                continue
            msg_type = msg.get("type")
            if msg_type == "firmware_request":
                print(f"Received firmware request from {mac}.")
                await self.send_firmware_to_peer(mac)
            elif msg_type == "firmware_info":
                print(f"Received firmware info from {mac}: {msg}")
            elif msg_type == "chirp":
                print(f"Received chirp from {mac}: {msg}")
            # Extend here with additional message types (e.g., leader election)
            # For now, simply print the received messages.

    async def send_firmware_to_peer(self, peer_mac):
        """
        Sends firmware to a peer in small chunks via ESP‑NOW.
        Here, binary data is converted using 'latin1' to preserve bytes
        (a production system might use Base64 encoding).
        """
        # NOTE(review): the 250-byte chunk plus the dict wrapper may exceed the
        # transport's packet limit depending on how ESPNowComms serializes
        # messages, and MicroPython may not honour the 'latin1' argument —
        # confirm against ESPNowComms and the receiver before relying on this.
        try:
            with open(self.firmware_file, "rb") as f:
                while True:
                    chunk = f.read(250)  # Chunk size for ESP‑NOW packets
                    if not chunk:
                        break
                    data = {
                        "type": "firmware_chunk",
                        "data": chunk.decode("latin1"),
                    }
                    self.espnow.send(peer_mac, data)
                    await asyncio.sleep(0.1)  # Throttle sending to avoid congestion
            print(f"Firmware sent to peer {peer_mac}.")
        except Exception as e:
            print(f"Error sending firmware to peer: {e}")

    async def _run_firmware(self):
        """
        Main firmware loop:
          - Runs the core application logic.
          - Periodically checks for OTA updates.
          - Broadcasts chirps (if not leader) and firmware info.
        """
        while self._running:
            print("Firmware running...")
            await asyncio.sleep(1)  # Simulate main firmware logic

            # Periodic OTA update check.
            now = time.time()
            if now - self._last_update_check >= self.update_interval:
                print("Performing OTA update check...")
                if await self._ota_update():
                    machine.reset()
                    return
                self._last_update_check = now

            # Non‑leader nodes broadcast chirps to help bound the swarm.
            if not self.is_leader:
                await self.broadcast_chirp()

            # Broadcast firmware info to help peers compare versions.
            await self.broadcast_firmware_info()

    async def run(self):
        """
        Main entry point:
          - Starts a background task for incoming ESP‑NOW messages.
          - Checks for the firmware file, then runs the firmware loop.

        The existence check is kept separate from the loop so that an OSError
        raised while the firmware is running is not mistaken for a missing
        firmware file.
        """
        # Start handling incoming messages in the background; keep a reference
        # so the task object stays alive for the lifetime of the updater.
        self._tasks = [asyncio.create_task(self.handle_incoming_messages())]

        # Check if firmware file exists (MicroPython-friendly check).
        try:
            os.stat(self.firmware_file)
        except OSError:
            firmware_present = False
        else:
            firmware_present = True

        if firmware_present:
            print(f"Firmware '{self.firmware_file}' found. Starting firmware loop.")
            await self._run_firmware()
        else:
            print("No firmware found. Attempting OTA update...")
            if await self._ota_update():
                machine.reset()
            else:
                print("OTA update failed. Exiting.")

# Script entry point: build the updater with the default chunk size and
# update interval, then drive it until it returns or fails.
if __name__ == "__main__":
    updater = OTAUpdater("http://your-ota-server.com/firmware.py", "firmware.py")
    try:
        asyncio.run(updater.run())
    except Exception as e:
        print(f"Error in main loop: {e}")

@WillemJan
Copy link
Author

ota_updater.py

import uasyncio as asyncio
import os
import sys
import machine
import time
import hashlib
import importlib
import network
from espnow_comms import ESPNowComms

# OTA configuration.

OTA_SERVER = "http://your-ota-server.com/firmware.py"
FIRMWARE_FILE = "firmware.py"
CHUNK_SIZE = 1024
UPDATE_CHECK_INTERVAL = 3600

# Wi‑Fi AP configuration for signal strength measurement.

CENTRAL_AP_SSID = "CentralAP"  # Set to the SSID of your central Wi‑Fi access point.

class OTAUpdater:
    """
    OTA updater with integrated ESP‑NOW swarm management.

    Features include:
      - OTA firmware update via HTTP.
      - Asynchronous Wi‑Fi scanning for signal strength (for triangulation).
      - Gossip messages for firmware info, node join/leave, and periodic chirps.
      - A simple leader election based on relative signal strength.

    This design is inspired by recent MicroPython swarm research for robustness.
    """

    # Version string advertised in swarm messages.
    # Replace with a dynamic version if available.
    FIRMWARE_VERSION = "1.0.2"

    # Seconds after which a peer that has not been heard from is ignored
    # during leader election.
    PEER_TIMEOUT = 30

    def __init__(self, ota_server, firmware_file, chunk_size=1024, update_interval=3600):
        """
        Store the update configuration and initialise swarm state.

        Args:
            ota_server: URL the firmware is downloaded from.
            firmware_file: Path of the installed firmware file.
            chunk_size: Bytes read per step when hashing a file.
            update_interval: Seconds between periodic update checks.
        """
        self.ota_server = ota_server
        self.firmware_file = firmware_file
        self.chunk_size = chunk_size
        self.update_interval = update_interval
        self._new_firmware_file = "firmware_new.py"
        self._running = True
        self._last_update_check = 0
        self._tasks = []  # keeps background tasks referenced

        # Initialize ESP‑NOW communications.
        self.espnow = ESPNowComms()
        self.device_id = machine.unique_id()  # Unique device ID (bytes).
        self.device_id_str = (
            self.device_id.hex() if hasattr(self.device_id, "hex") else str(self.device_id)
        )
        self.is_leader = False  # All nodes start as non‑leader.

        # Swarm status: {device_id: {firmware_hash, firmware_version, signal_strength, last_seen, etc.}}
        self.swarm_status = {}

        # Current measured signal strength to the central AP (in dBm).
        self.current_signal_strength = -100

        # Record when this node joined.
        self.join_time = time.time()

    @staticmethod
    def _version_key(version):
        """
        Turn a dotted version string into a tuple of ints for comparison.

        Comparing the raw strings is lexicographic ("1.0.10" < "1.0.2"), so
        each dot-separated piece is reduced to its leading digits. Trailing
        zero components are dropped so "1.0" and "1.0.0" compare equal.
        """
        parts = []
        for piece in str(version).split("."):
            number = 0
            for ch in piece.strip():
                if not ch.isdigit():
                    break
                number = number * 10 + int(ch)
            parts.append(number)
        while parts and parts[-1] == 0:
            parts.pop()
        return tuple(parts)

    def _hash_file(self, filename):
        """
        Return the SHA-256 of *filename* as a lowercase hex string.

        The file is read in ``chunk_size`` pieces to bound memory use, and the
        digest is hex-encoded with ``binascii.hexlify`` because MicroPython's
        hashlib objects do not implement ``hexdigest()``.
        """
        import binascii  # local import: not among the file-level imports

        step = getattr(self, "chunk_size", None)
        if not isinstance(step, int) or step <= 0:
            step = 1024
        digest = hashlib.sha256()
        with open(filename, "rb") as f:
            while True:
                chunk = f.read(step)
                if not chunk:
                    break
                digest.update(chunk)
        return binascii.hexlify(digest.digest()).decode()

    def compute_firmware_hash(self):
        """
        Computes the SHA‑256 hash of the current firmware file.

        Returns:
            The hex digest, or the string "none" if the file cannot be hashed.
        """
        try:
            return self._hash_file(self.firmware_file)
        except Exception as e:
            print(f"Error computing firmware hash: {e}")
            return "none"

    def estimate_distance(self, rssi, tx_power=-40, path_loss_exponent=2.0):
        """
        Estimates distance (in meters) using the log‑distance path loss model.

        Formula: distance = 10 ^ ((tx_power - rssi) / (10 * n))

        Parameters:
          - rssi: Received Signal Strength Indicator (dBm).
          - tx_power: Transmit power (dBm). Default is -40 dBm.
          - path_loss_exponent: Environmental factor (typically 2 for free space, higher indoors).

        Returns:
            The estimated distance, or None if the computation raises
            (for example when ``path_loss_exponent`` is 0).
        """
        try:
            return 10 ** ((tx_power - rssi) / (10 * path_loss_exponent))
        except Exception as e:
            print(f"Error estimating distance: {e}")
            return None

    async def measure_signal_strength(self):
        """
        Periodically scans for the central Wi‑Fi AP and updates the node's signal strength.

        The strength is set to -100 when the AP is not present in a scan.
        """
        wlan = network.WLAN(network.STA_IF)
        wlan.active(True)
        while True:
            try:
                found = None
                # Scan rows start with (ssid, bssid, channel, RSSI, ...); index
                # the fields instead of unpacking so rows of a different
                # length do not raise.
                for net in wlan.scan():
                    ssid = net[0]
                    ssid_str = ssid.decode() if isinstance(ssid, bytes) else ssid
                    if ssid_str == CENTRAL_AP_SSID:
                        found = net[3]
                        break
                self.current_signal_strength = found if found is not None else -100
            except Exception as e:
                print(f"Signal strength measurement error: {e}")
            await asyncio.sleep(10)

    async def broadcast_join(self):
        """
        Broadcast a "node_join" message when a node joins the swarm.
        """
        join_message = {
            "type": "node_join",
            "device_id": self.device_id_str,
            "timestamp": time.time(),
            "firmware_hash": self.compute_firmware_hash(),
            "firmware_version": self.FIRMWARE_VERSION,
            "signal_strength": self.current_signal_strength,
            "uptime": time.ticks_ms(),
        }
        self.espnow.broadcast(join_message)
        print("Broadcasted node_join:", join_message)

    async def broadcast_leave(self):
        """
        Broadcast a "node_leave" message when a node is leaving the swarm.
        """
        leave_message = {
            "type": "node_leave",
            "device_id": self.device_id_str,
            "timestamp": time.time(),
            "uptime": time.ticks_ms(),
        }
        self.espnow.broadcast(leave_message)
        print("Broadcasted node_leave:", leave_message)

    async def broadcast_chirp(self):
        """
        Broadcast a chirp with current node status.
        Includes firmware info, measured RSSI, estimated distance, and uptime.
        """
        chirp_message = {
            "type": "chirp",
            "device_id": self.device_id_str,
            "timestamp": time.time(),
            "firmware_hash": self.compute_firmware_hash(),
            "firmware_version": self.FIRMWARE_VERSION,
            "signal_strength": self.current_signal_strength,
            "distance_estimate": self.estimate_distance(self.current_signal_strength),
            "uptime": time.ticks_ms(),
        }
        self.espnow.broadcast(chirp_message)
        print("Broadcasted chirp:", chirp_message)

    async def broadcast_firmware_info(self):
        """
        Broadcast firmware info (hash and version) so peers can compare firmware.
        """
        firmware_info = {
            "type": "firmware_info",
            "device_id": self.device_id_str,
            "firmware_hash": self.compute_firmware_hash(),
            "firmware_version": self.FIRMWARE_VERSION,
        }
        self.espnow.broadcast(firmware_info)
        print("Broadcasted firmware_info:", firmware_info)

    async def _download_firmware(self, url, filename, expected_hash=None):
        """
        Downloads firmware from the OTA server and optionally verifies it.

        ``asyncio.to_thread`` is used when the runtime provides it; otherwise
        (e.g. uasyncio) the request is issued directly and blocks the event
        loop for its duration.

        Returns:
            True when the file was written (and matched ``expected_hash`` if
            given), False on HTTP errors, checksum mismatch or any exception.
        """
        response = None
        try:
            try:
                import urequests as requests  # Use MicroPython's urequests.
            except ImportError:
                import requests  # CPython (for testing)
            print(f"Downloading firmware from {url} ...")
            to_thread = getattr(asyncio, "to_thread", None)
            if to_thread is None:
                response = requests.get(url)
            else:
                response = await to_thread(requests.get, url)
            if response.status_code != 200:
                print(f"HTTP error: {response.status_code}")
                return False

            with open(filename, "wb") as f:
                f.write(response.content)
            print("Firmware downloaded.")

            if expected_hash and self._hash_file(filename) != expected_hash:
                print("Checksum mismatch! Firmware download is corrupted.")
                os.remove(filename)
                return False
            return True

        except Exception as e:
            print(f"Download error: {e}")
            return False
        finally:
            if response is not None:
                try:
                    response.close()  # release the underlying socket
                except Exception:
                    pass

    async def _install_firmware(self, filename):
        """
        Installs the downloaded firmware.
        Backs up the existing firmware to ``<firmware>.old`` before replacing
        it, and moves the backup back if the replacement step fails.

        Returns:
            True on success, False if the swap failed.
        """
        backup = f"{self.firmware_file}.old"
        backed_up = False
        try:
            print("Installing firmware...")
            try:
                os.stat(self.firmware_file)
            except OSError:
                pass  # No previous firmware to back up
            else:
                try:
                    os.remove(backup)  # rename may refuse to overwrite
                except OSError:
                    pass
                os.rename(self.firmware_file, backup)
                backed_up = True
            os.rename(filename, self.firmware_file)
            print("Firmware installed successfully.")
            return True

        except OSError as e:
            print(f"Installation error: {e}")
            if backed_up:
                try:
                    os.rename(backup, self.firmware_file)  # roll back
                except OSError as rollback_error:
                    print(f"Rollback failed: {rollback_error}")
            return False

    async def _ota_update(self):
        """
        Checks for OTA updates by comparing firmware versions and hashes.
        Downloads and installs new firmware if necessary.

        Returns:
            True after a successful install (the board is reset first),
            False when up to date or when any step fails.
        """
        print("Checking for OTA update...")
        try:
            # Simulated server version and hash (replace with your server logic).
            server_version = "1.0.2"
            server_hash = "your_server_hash"

            try:
                module_name = self.firmware_file
                if module_name.endswith(".py"):
                    module_name = module_name[:-3]
                # NOTE: importing the firmware module executes its top level.
                current_module = importlib.import_module(module_name)
                current_version = getattr(current_module, "__version__", "0.0.0")
            except Exception:
                print("Could not determine current firmware version.")
            else:
                if self._version_key(server_version) <= self._version_key(current_version):
                    print("Firmware is up-to-date.")
                    return False

            if not await self._download_firmware(
                self.ota_server, self._new_firmware_file, server_hash
            ):
                return False
            if not await self._install_firmware(self._new_firmware_file):
                try:
                    os.remove(self._new_firmware_file)
                except OSError:
                    pass  # nothing left to clean up
                print("Firmware installation failed.")
                return False
            print("OTA update successful! Rebooting...")
            machine.reset()
            return True

        except Exception as e:
            print(f"OTA update error: {e}")
            return False

    def _record_peer(self, device_id, msg):
        """
        Merge *msg* into the stored status for *device_id*.

        Fields are merged rather than replaced so a "firmware_info" message
        (which carries no signal strength) does not erase what the last chirp
        reported. ``last_seen`` is stamped with this node's own clock.
        """
        entry = self.swarm_status.get(device_id)
        if entry is None:
            entry = {}
            self.swarm_status[device_id] = entry
        entry.update(msg)
        entry["last_seen"] = time.time()

    async def handle_incoming_messages(self):
        """
        Processes incoming ESP‑NOW messages.
        Updates local swarm status with node join/leave, chirp, and firmware info messages.
        Also responds to firmware requests. Messages that are not dicts, and
        status messages without a device_id, are skipped.
        """
        async for mac, msg in self.espnow.receive():
            if not isinstance(msg, dict):
                print(f"Ignoring malformed message from {mac}.")
                continue
            msg_type = msg.get("type")
            device_id = msg.get("device_id")
            if msg_type == "firmware_request":
                print(f"Firmware request received from {device_id}")
                await self.send_firmware_to_peer(mac)
                continue
            if device_id is None:
                continue  # cannot attribute a status update to a peer
            if msg_type == "node_join":
                print(f"Node join received from {device_id}: {msg}")
                self._record_peer(device_id, msg)
            elif msg_type == "node_leave":
                print(f"Node leave received from {device_id}: {msg}")
                self.swarm_status.pop(device_id, None)
            elif msg_type == "chirp":
                print(f"Chirp received from {device_id}: {msg}")
                self._record_peer(device_id, msg)
            elif msg_type == "firmware_info":
                print(f"Firmware info received from {device_id}: {msg}")
                self._record_peer(device_id, msg)
            # Extend with additional message types as needed.

    async def send_firmware_to_peer(self, peer_mac):
        """
        Sends firmware to a peer in small chunks via ESP‑NOW.
        (Here binary data is encoded with 'latin1' for JSON transport.)
        """
        # NOTE(review): the 250-byte chunk plus the dict wrapper may exceed the
        # transport's packet limit depending on how ESPNowComms serializes
        # messages, and MicroPython may not honour the 'latin1' argument —
        # confirm against ESPNowComms and the receiver before relying on this.
        try:
            with open(self.firmware_file, "rb") as f:
                while True:
                    chunk = f.read(250)
                    if not chunk:
                        break
                    data = {
                        "type": "firmware_chunk",
                        "data": chunk.decode("latin1"),
                    }
                    self.espnow.send(peer_mac, data)
                    await asyncio.sleep(0.1)
            print(f"Firmware sent to peer {peer_mac}.")
        except Exception as e:
            print(f"Error sending firmware to peer: {e}")

    def _select_leader(self, candidates):
        """
        Pick the leader among *candidates* ({device_id: info dict}).

        The highest ``signal_strength`` wins; missing or non-numeric values
        count as -999. Ties are broken by the larger device-id string so the
        result does not depend on dict order.

        Returns:
            (leader_id, best_strength); leader_id is None for no candidates.
        """
        leader_id = None
        best_key = None
        best_strength = -999
        for device, info in candidates.items():
            strength = info.get("signal_strength", -999)
            if isinstance(strength, bool) or not isinstance(strength, (int, float)):
                strength = -999
            key = (strength, str(device))
            if best_key is None or key > best_key:
                best_key = key
                best_strength = strength
                leader_id = device
        return leader_id, best_strength

    async def elect_leader(self):
        """
        Periodically examines swarm status and elects a leader.
        The candidate with the highest signal strength (i.e. closest to the central AP) is elected.
        Peers not heard from within PEER_TIMEOUT seconds are left out.
        """
        while True:
            self_id = self.device_id_str
            now = time.time()
            self_info = {
                "device_id": self_id,
                "firmware_hash": self.compute_firmware_hash(),
                "firmware_version": self.FIRMWARE_VERSION,
                "signal_strength": self.current_signal_strength,
                "last_seen": now,
                "uptime": time.ticks_ms(),
            }
            candidates = {self_id: self_info}
            # Include info from active peers.
            for device, info in self.swarm_status.items():
                # Prefer the locally stamped "last_seen"; fall back to the
                # sender-supplied "timestamp" when it is absent.
                last = info.get("last_seen", info.get("timestamp", now))
                if not isinstance(last, (int, float)):
                    continue
                if now - last < self.PEER_TIMEOUT:
                    candidates[device] = info

            leader_id, best_strength = self._select_leader(candidates)

            if leader_id == self_id:
                if not self.is_leader:
                    print("I have been elected as leader!")
                self.is_leader = True
            else:
                if self.is_leader:
                    print("I am no longer the leader.")
                self.is_leader = False

            print("Current leader:", leader_id, "with signal strength:", best_strength)
            await asyncio.sleep(30)

    async def _run_firmware(self):
        """
        Main firmware loop:
          - Simulates primary firmware functionality.
          - Periodically checks for OTA updates.
          - Broadcasts chirps and firmware info.
        """
        while self._running:
            print("Firmware running...")
            await asyncio.sleep(1)
            now = time.time()
            if now - self._last_update_check >= self.update_interval:
                print("Performing OTA update check...")
                if await self._ota_update():
                    machine.reset()
                    return
                self._last_update_check = now

            if not self.is_leader:
                await self.broadcast_chirp()
            await self.broadcast_firmware_info()

    async def run(self):
        """
        Main entry point:
          - Launches background tasks for incoming message handling,
            signal strength measurement, leader election, and join announcement.
          - Checks for firmware file and enters the main firmware loop.

        The existence check is kept separate from the loop so that an OSError
        raised while the firmware is running is not mistaken for a missing
        firmware file.
        """
        # Start background tasks; keep references so the task objects stay
        # alive for the lifetime of the updater.
        self._tasks = [
            asyncio.create_task(self.handle_incoming_messages()),
            asyncio.create_task(self.measure_signal_strength()),
            asyncio.create_task(self.elect_leader()),
        ]

        # Broadcast node join on startup.
        await self.broadcast_join()

        try:
            os.stat(self.firmware_file)
        except OSError:
            firmware_present = False
        else:
            firmware_present = True

        if firmware_present:
            print(f"Firmware '{self.firmware_file}' found. Starting firmware loop.")
            await self._run_firmware()
        else:
            print("No firmware found. Attempting OTA update...")
            if await self._ota_update():
                machine.reset()
            else:
                print("OTA update failed. Exiting.")

        # Optionally, on shutdown broadcast node leave.
        # await self.broadcast_leave()

# Script entry point: build the updater from the module-level configuration
# and drive it until it returns or fails.
if __name__ == "__main__":
    updater = OTAUpdater(OTA_SERVER, FIRMWARE_FILE, CHUNK_SIZE, UPDATE_CHECK_INTERVAL)
    try:
        asyncio.run(updater.run())
    except Exception as e:
        print(f"Error in main loop: {e}")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment