Skip to content

Instantly share code, notes, and snippets.

@daed
Last active November 6, 2023 01:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daed/daa06401f54d1eed004e0946696d20fe to your computer and use it in GitHub Desktop.
Save daed/daa06401f54d1eed004e0946696d20fe to your computer and use it in GitHub Desktop.
micropython uasyncio and concurrency

A study of concurrency

Brad Arnett

Assumptions

The following are technical assumptions that are made for the context of this document. The information in this document might not be valid outside of the below:

  • The target microcontroller is the Raspberry Pi Pico W
  • You're using the Micropython version 1.20 or later

Preface

Concurrency is hard because computers like doing just one thing at a time. This is an inconvenience to humans, who want them to do everything all at once, but struggle to put that in terms that are straightforward for a machine to understand. This is especially bad in circumstances like this project, where the correct answer is "do everything you can all at once, but especially this one part over here because it needs to be realtime. Also do it on a limited instruction set." :(

Rationale

Stated Purpose

For the purposes of my LED project (or any networked IoT type device) there are inevitably going to be many concurrent operations that must be handled at once. This must be carefully managed when using low-cost SoCs that do not include full OSs with process scheduling support baked in. I believe this is a problem that may be able to be solved for general purposes with a single pattern.

Hardware capabilities

The Raspberry Pi Pico W is a wifi-enabled version of the Pi Pico. The Pico is powered by a dual-core Arm Cortex M0+ processor up to 133 MHz.

Putting the 'W' at the end of the name will add an on-board single-band 2.4GHz wireless interface (802.11n) using the Infineon CYW43439.

The CLK is shared with VSYS monitor, so only when there isn’t an SPI transaction in progress can VSYS be read via the ADC. The Infineon CYW43439 DIN/DOUT and IRQ all share one pin on the RP2040. Only when an SPI transaction isn’t in progress is it suitable to check for IRQs. The interface typically runs at 33MHz. [2]

asyncio on rp2

The below is from [1] and has been edited for conciseness. This section talks about the async module for micropython before it was integrated into the core feature set. As far as I can tell, it remains relevant.

There is no common GIL. This means that under some conditions Python built in objects can be corrupted.

In the code sample there is a risk of the uasyncio task reading the dict at the same moment as it is being written. Updating a dictionary data entry is atomic: there is no risk of corrupt data being read. In the code sample a lock is only required if mutual consistency of the three values is essential.

In the absence of a GIL some operations on built-in objects are not thread safe. For example adding or deleting items in a dict. This extends to global variables because these are implemented as a dict. See Globals.

The observations in 1.3 re user defined data structures and uasyncio interfacing apply:

This protection does not extend to user defined data structures. The fact that a dictionary won't be corrupted by concurrent access does not imply that its contents will be mutually consistent. In the code sample in section 1, if the application needs mutual consistency between the dictionary values, a lock is needed to ensure that a read cannot be scheduled while an update is in progress.

The above means that, for example, calling uasyncio.create_task from a thread is unsafe as it can destroy the mutual consistency of uasyncio data structures.

Code running on a thread other than that running uasyncio may block for as long as necessary (an application of threading is to handle blocking calls in a way that allows uasyncio to continue running).

Code running on a core other than that running uasyncio may block for as long as necessary.

A tale of two loops

In the LED project (and of broader use-cases for the platform itself), there are two main operational loops that need to happen simulaneously. The first is a general operation loop that will be run through (u)asyncio. This handles most all of the systems level processes including the following:

  • Network interrupts
  • Button handling
  • DNS queries / mDNS advertising
  • Configuration management
  • HTTP server command handling
    • and supporting tasks thereof
  • Appplication initalization and lifecycle management
  • Any other trivial detail

Meanwhile the second loop must be a near-realtime loop dedicated to the application logic itself. This loop is said to require being 'near-realtime' because, at least in the case of driving LED chains, fancy effects will appear stuttery or delayed if they have to time-slice with all of the 'platform' features listed above. The goal here is said to be 'near-realtime' because this loop will time-slice for concurrency as well, but only among application-specific operations.

I guess in a particular way you could maybe call that a "priority thread"?

Gotchas

The following error is thrown when trying to start more than one _thread at a time on an Pico. This illustrates the essential need to be able to schedule and run many complex tasks across both cores.

Traceback (most recent call last):
  File "main.py", line 98, in <module>
  File "main.py", line 95, in main
  File "asyncio/core.py", line 1, in run
  File "asyncio/core.py", line 1, in run_until_complete
  File "asyncio/core.py", line 1, in run_until_complete
  File "main.py", line 63, in amain
OSError: core1 in use

Implementation

Code

import _thread
import time
import asyncio
import gc

class AppLoopManager:
    """
    Manages an event loop on a separate thread, allowing functions to be added to the loop for
    regular execution. Uses a 'thread 1' to run the loop functions independently of the main thread.
    """
    def __init__(self):
        self.workers = []
        self.gens = []
        self.started = False

    def create_worker(self, worker_cls):
        try:
            self.gens.append(worker_cls.loop())
            self.workers.append(worker_cls)
        except Exception as e:
            print(f"{e} thrown during create_worker()")
        
    def start_thread(self):
        """Starts the event loop in a new thread."""
        print("Starting event loop thread")
        self.started = True
        _thread.start_new_thread(self._loop, ())

    def stop_thread(self):
        """Stops the event loop thread."""
        print("Stopping event loop thread")
        self.started = False
        _thread.exit()

    def _loop(self):
        """The internal loop that runs all functions added to the event loop."""
        print("Event loop thread running")
        while self.started:
            for gen in self.gens:
                try:
                    next(gen)
                    time.sleep(0.001)  # Small sleep to prevent CPU hogging
                except StopIteration:
                    print("DEBUG: StopIteration thrown")
                except Exception as e:
                    print(f"Exception in event loop function: {e}")
            gc.collect()

class AppWorker:
    """
    Represents a worker that executes an external function within its own internal loop.
    Designed to perform work as a part of the EventLoopManager's loop.
    """
    def __init__(self):
        self.external_func = None
        self.function_args = None

    def set_external_func(self, external_func, function_args=None):
        """Sets the external function to be called within the internal loop."""
        self.function_args = function_args
        self.external_func = external_func

    def loop(self):
        """The generator function that calls the external function in a loop."""
        while True:
            if self.external_func:
                self.external_func(self, self.function_args)
            yield  # Yield control to allow other functions in the event loop to run


class AppTask:
    """
    A class representing a task to be run within the Worker's internal loop.
    Contains an example function that simulates CPU work and keeps count of its invocations.
    """
    counter = 0

    def run(self, caller, args):
        """Simulates task execution."""
        time.sleep(1)  # Simulates work being done
        self.counter += 1
        print(f"Worker {args[0]}, {caller}: Run count {self.counter}")


async def say_after(delay, what):
    """An asynchronous function that prints a message after a delay."""
    await asyncio.sleep(delay)
    print(what)


async def main_async_loop():
    """Runs the main asynchronous loop with asyncio tasks."""
    print("Starting asyncio main loop")
    while True:
        tasks = await asyncio.gather(
            say_after(5, "Main loop work: 5 seconds"),
            say_after(2, "Main loop work: 2 seconds")
        )


def main():
    """
    Sets up and starts the entire application, including the event loop manager
    and the asynchronous loop.
    """

    # Initialize the event loop manager
    manager = AppLoopManager()
    manager.start_thread()

    # Create task runners
    task_runner_1 = AppTask()
    task_runner_2 = AppTask()

    manager.create_worker(AppWorker())
    manager.create_worker(AppWorker())

    manager.workers[0].set_external_func(task_runner_1.run, ("Worker 0",))
    manager.workers[1].set_external_func(task_runner_2.run, ("Worker 1",))

    # Start the asyncio loop on the main thread
    asyncio.run(main_async_loop())

if __name__ == "__main__":
    main()

Example output

MPY: soft reboot
Starting event loop thread
Event loop thread running
Starting asyncio main loop
Worker Worker 1, <AppWorker object at 20016630>: Run count 1
Main loop work: 2 seconds
Worker Worker 0, <AppWorker object at 20018bb0>: Run count 1
Worker Worker 1, <AppWorker object at 20016630>: Run count 2
Worker Worker 0, <AppWorker object at 20018bb0>: Run count 2
Main loop work: 5 seconds
Worker Worker 1, <AppWorker object at 20016630>: Run count 3
Worker Worker 0, <AppWorker object at 20018bb0>: Run count 3
Main loop work: 2 seconds
Worker Worker 1, <AppWorker object at 20016630>: Run count 4
Worker Worker 0, <AppWorker object at 20018bb0>: Run count 4
Worker Worker 1, <AppWorker object at 20016630>: Run count 5
Main loop work: 5 seconds
Worker Worker 0, <AppWorker object at 20018bb0>: Run count 5
Worker Worker 1, <AppWorker object at 20016630>: Run count 6
Main loop work: 2 seconds
Worker Worker 0, <AppWorker object at 20018bb0>: Run count 6
Worker Worker 1, <AppWorker object at 20016630>: Run count 7
Worker Worker 0, <AppWorker object at 20018bb0>: Run count 7
Main loop work: 5 seconds
Worker Worker 1, <AppWorker object at 20016630>: Run count 8
Worker Worker 0, <AppWorker object at 20018bb0>: Run count 8
Main loop work: 2 seconds

Reference

[1] https://github.com/peterhinch/micropython-async/blob/master/v3/docs/THREADING.md

[2] https://www.raspberrypi.com/documentation/microcontrollers/raspberry-pi-pico.html

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment