Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Structured concurrency and Lua (part 1)

Structured concurrency and Lua (part 1)

John Belmonte, 2022-Sep

I've started writing a toy structured concurrency implementation for the Lua programming language. Some motivations:

  • use it as a simple introduction to structured concurrency from the perspective of Lua (this article)
  • learn the fundamental properties of structured concurrency and how to implement them
  • share code that could become the starting point for a real Lua library and framework

So what is structured concurrency? For now, I'll just say that it's a programming paradigm that makes managing concurrency (arguably the hardest problem of computer science) an order of magnitude easier in many contexts. It achieves this in ways that seem subtle to us-- clearly so, since its utility didn't reach critical mass until around 20181 (just as control structures like functions, if, and while weren't introduced to languages until long after the first computer program). Rather than start with a long explanation, however, I'll jump into a very simple example of structured concurrency in Lua. From there, we'll build up the rationale, examples, and implementation in stages-- hopefully as a series of articles.

a first look

trio = require('trio')

function await_main()
    -- two parallel tasks that sleep for 2 and 3 seconds, respectively
    do
        local nursery <close> = trio.open_nursery()
        nursery.start_soon(function()
            print('child 1 start')
            trio.await_sleep(2)
            print('child 1 end')
        end)
        nursery.start_soon(function()
            print('child 2 start')
            trio.await_sleep(3)
            print('child 2 end')
        end)
        print('waiting for child tasks')
    end
    print('done')
end

trio.run(await_main)  -- launch the async event loop

A "task" is an independent thread of execution, and this example runs three of them concurrently: one main and two spawned tasks. For Lua, tasks are implemented with cooperative multitasking via coroutines. However, rather than use the coroutine API directly, we're using a higher level API that applies a hierarchy to the set of tasks. The lifetime of any spawned task is constrained by the containing code block, called a nursery. The concluding "done" won't be printed until the nursery block, and hence both of its child tasks, have completed successfully. Here is the program output:

$ time lua example_1.lua
waiting for child tasks
child 2 start
child 1 start
child 1 end
child 2 end
done

real    0m3.126s
user    0m0.043s
sys     0m0.079s

Since the child tasks run in parallel, and we're waiting for the slowest one to complete, the expected wall time is about 3 seconds, as confirmed here. "User" time is nearly zero, since the CPU doesn't do any actual work (sleep is implemented by the appropriate OS call rather than a busy-wait). Also interesting is that the "waiting for child tasks" message is printed before the children begin. This reflects the semantics of the nursery's start_soon() method which, as its name suggests, does not immediately execute the given function. Rather, the task is queued-- to be picked up once the parent task yields execution.

It may appear, at first, that structured concurrency is merely fork/join, which has existed for a long time. However, structured concurrency is ensuring that 1) all tasks have well-defined parents, siblings, and children, and 2) the task hierarchy mirrors the structure of the code. This has profound implications for managing concurrency. For example, when a task is cancelled or has an error, the hierarchy suggests a natural way to clean up (cancel all descendants). Moreover, since the task and code structure mirror each other, the code that spawned a task will never be out of scope when the child exits. Exceptions and return values can thus be propagated to the caller, lending to natural use of the host language's built-in control structures (especially exception handling). In the future, we'll delve into these and more aspects of structured concurrency.

At the end of the example code is an administrative detail: the run() function, which is handed another function to execute as the root task. This is the bridge into the structured concurrency world, launching an event loop.

A last thing to mention for this example is the use of Lua's relatively new feature of "to-be-closed" variables. They allow the scope of each nursery to be carefully delineated-- important since that, in turn, defines our task hierarchy. Additionally, the structured concurrency implementation will rely on these timely finalizers to achieve expected behavior regarding exceptions, cancellations, etc. (Naming convention: any function prefixed by open_ expects its return value to be held in a <close> variable.)

worry-free encapsulation

In a real program, we wouldn't write so much boilerplate to create some background tasks. There might be a utility like await_all(), which is useful when you have a heterogeneous set of tasks to run, just for their side effects. The implementation:

function await_all(...)
    local nursery <close> = trio.open_nursery()
    for _, f in ipairs({...}) do
        nursery.start_soon(f)
    end
end

Then await_main() of the original example reduces to:

function await_main()
    await_all(
        function() trio.await_sleep(2) end,
        function() trio.await_sleep(3) end))
    print('done')
end

Note that the caller of await_all() does not need to deal with nurseries or to-be-closed variables. Moreover, with structured concurrency, he can be confident that tasks (again, Lua coroutines) will never leak out of such a call. The concurrency details are completely encapsulated by the function.

In the future, we'll cover how timeouts and cancellation can easily be applied to such calls (or any block of async code), despite the encapsulation, and without ad-hoc function parameters or forethought from library authors.

context switching is always explicit

Though it's not a strict requirement for structured concurrency, we'd like the points of context switching to always be explicit and readily visible in our programs. This keeps local code easy to reason about, and can reduce the amount of explicit locking needed.

While some languages have function coloring, such as async/await qualifiers, which identifies declarations and calls that can yield execution, Lua does not. Rather, we rely on naming convention: any function prefixed by await_ is awaitable, or expected to yield execution to other tasks. (These are also referred to as asynchronous, or async, functions.)

Putting this together: code between await_ calls can be treated as atomic, relative to other running tasks. For example, let's say there is websocket handler that deals with a discrete set of messages, and we'd like to maintain a count of how many times each message is seen. The information is global state, shared by an arbitrary number of concurrent connections running the same message loop. Yet, by reading the code, it's clear that the bookkeeping is safely atomic:

count_by_message = {}

function await_websocket_echo_handler(websocket)
    while true do
        local msg = websocket.await_receive_message()

        if count_by_message[msg] == nil then
            count_by_message[msg] = 0
        end
        count_by_message[msg] = count_by_message[msg] + 1

        websocket.await_send_message(msg)
    end
end

A corollary: the fewer the number awaitable calls, the easier a program is to reason about. As a consequence, API's try to minimize the number of awaitable functions (relative to synchronous functions), as it helps reduce the cognitive load of using the API. An example is the nursery API itself. Since start_soon() is synchronous, it's possible to orchestrate the launch of a set of interrelated tasks, atomically. Otherwise, there would be a context switch each time a task was queued, and it would be necessary to consider what program state might change before queueing the next task.

When writing your own functions, there are two important rules to honor:

  1. any function that calls an await_ function is itself awaitable, and should be named appropriately
  2. if a function is named as awaitable, it should be yielding execution regardless of the code path taken. For example, if there is a short-circuit case at the start of the function, at least call trio.await_sleep(0) before returning. Otherwise, it's easy for users of the function to mistakenly starve the scheduler with a busy loop.

Besides awaitable calls, remember that having a nursery go out of scope will also yield execution, assuming there are pending child tasks. While this is a more subtle context switch, it's still evident from the code's structure.

can you do more than sleep?

So far we've seen one "blocking" API call, or explicit way for a task to yield execution: await_sleep(). While programs with concurrency often sleep, they also need to do other things asynchronously, such as file or network I/O, as well as calling into any API itself using file or network I/O.

Typically, a structured concurrency library (or any library implementing an asynchronous event loop) will offer basic async file and network I/O that is compatible with the framework. That is the easy part. The daunting challenge becomes (re)implementing popular prototcols and client/server libraries on top of that API.

While async I/O isn't implemented yet, another common awaitable is: events. These are single-use signals with a synchronous "set" method. It's the primary means of communicating between tasks, and a building bock for higher-level synchronization primitives. For (a very contrived) example:

function await_event_example()
    do
        -- two child tasks wait for an event
        local nursery <close> = trio.open_nursery()
        local event = trio.Event()
        nursery.start_soon(function()
            print('child 1 waiting')
            event.await()
            print('child 1 got event')
        end)
        nursery.start_soon(function()
            print('child 2 waiting')
            event.await()
            print('child 2 got event')
        end)
        trio.await_sleep(1)
        event.set()
    end
    print('done')
end
$ time lua example_2.lua
child 2 waiting
child 1 waiting
child 1 got event
child 2 got event
done

real    0m1.118s
user    0m0.042s
sys     0m0.066s

implementation

Along with this article, I'm sharing a toy implementation of structured concurrency on Lua that just barely implements the examples here. Refer to the source code for the significant caveats (in the form of TODO's).

The module name, API, and implementation are inspired by Python Trio, one of the first structured concurrency libraries.

The module requires Lua 5.4.3 or newer, and fractional time() and thread sleep() functions which are not in the standard Lua build. See the source for placeholder Unix implementations using popen(), which may need to be revised for your OS.

As already mentioned, tasks are implemented with Lua coroutines. The event loop uses the return value of coroutine.resume() to control task scheduling. For example, to implement await_sleep(), the task yields the desired duration. To pause until manually rescheduled later (for example, within the implementation of Event), the task yields a sentinel value. As the library is built up, this coroutine protocol will be expanded to support propagating exceptions, cancellation, blocking on I/O events, etc.

next up: exceptions

To demonstrate a limitation of the current implementation, let's try raising an exception from a child task:

trio = require('trio')

function await_error_example()
    -- child task raises an error
    do
        local nursery <close> = trio.open_nursery()
        nursery.start_soon(function()
            trio.await_sleep(1)
            error('oops')
        end)
    end
    print('done')
end

trio.run(await_error_example)
$ lua example_3.lua
lua: ./trio.lua:140: example_3.lua:9: oops
stack traceback:
	[C]: in function 'assert'
	./trio.lua:140: in function 'trio.run'
	example_3.lua:15: in main chunk
	[C]: in ?

This is not yet meeting the potential of structured concurrency, as the exception does not propagate into await_error_example() (which spawned the error-raising task), and rather originates from the implementation's event loop. We'll tackle this in the next article!


continue reading: Structured concurrency and Lua (part 2)


article © 2022 John Belmonte, all rights reserved

Footnotes

  1. While it's argued that the core concept of structured concurrency has existed in some libraries and languages prior to the recent boom, these lacked significant parts of the story for making concurrency manageable. One is powerful, ergonomic control of timeouts and cancellation-- itself dependent on structured concurrency. Another is striving for a program that follows structured concurrency as a whole, including direct and transient dependencies. Nathaniel J. Smith was first to piece this all together, with a set of articles, and corresponding Python Trio concurrency library.

trio = require('trio')
function await_main()
-- two parallel tasks that sleep for 2 and 3 seconds, respectively
do
local nursery <close> = trio.open_nursery()
nursery.start_soon(function()
print('child 1 start')
trio.await_sleep(2)
print('child 1 end')
end)
nursery.start_soon(function()
print('child 2 start')
trio.await_sleep(3)
print('child 2 end')
end)
print('waiting for child tasks')
end
print('done')
end
trio.run(await_main) -- launch the async event loop
trio = require('trio')
function await_event_example()
do
-- two child tasks wait for an event
local nursery <close> = trio.open_nursery()
local event = trio.Event()
nursery.start_soon(function()
print('child 1 waiting')
event.await()
print('child 1 got event')
end)
nursery.start_soon(function()
print('child 2 waiting')
event.await()
print('child 2 got event')
end)
trio.await_sleep(1)
event.set()
end
print('done')
end
trio.run(await_event_example)
trio = require('trio')
function await_error_example()
-- child task raises an error
do
local nursery <close> = trio.open_nursery()
nursery.start_soon(function()
trio.await_sleep(1)
error('oops')
end)
end
print('done')
end
trio.run(await_error_example)
-- trio - toy structured concurrency implementation based on Python Trio
--
-- requirements:
-- * Lua >= 5.4.3 (with "yield from __close" support)
-- * fractional _time() and _sleep() - see below for placeholder Unix
-- implementations using popen().
--
-- API naming conventions:
-- * functions that may yield are prefixed with "await_"
-- * functions that require storing the result in a to-be-closed variable
-- are prefixed with "open_"
--
-- TODO: nested nurseries
-- TODO: error handling
-- TODO: cancel scopes
-- TODO: lua_trio compatible networking, file I/O, etc. (See cosock for ideas
-- on adapting luasocket.)
local function _sleep(seconds)
os.execute('sleep ' .. tonumber(seconds))
end
-- returns fractional time in seconds from an arbitrary reference point
-- (This implementation requires `date` from GNU coreutils.)
local function _time()
local f = assert(io.popen('date +%s%3N', 'r'))
local s = assert(f:read('*a'))
f:close()
return tonumber(s) / 1000
end
local function _debug_print(...)
if false then
print(...)
end
end
local _TASK_PAUSE = -1
local trio = nil
trio = {
-- TODO: encapsulate into EventLoop class created by run()
_tasks = {}, -- list sorted by wait_until, nearest time last
_paused_tasks = {}, -- set
_tasks_by_coro = {}, -- map
-- TODO: optional task name
_new_task = function(f, parent_nursery, wait_until)
local task = {
coro=coroutine.create(f),
parent_nursery=parent_nursery,
wait_until=wait_until,
child_nursuries={},
_done = trio.Event()
}
trio._tasks_by_coro[task.coro] = task
return task
end,
_schedule_task = function(task)
-- TODO: find insertion point by binary search
local tasks = trio._tasks
for i = #tasks, 1, -1 do
if tasks[i].wait_until >= task.wait_until then
table.insert(tasks, i+1, task)
return
end
end
table.insert(tasks, 1, task)
end,
-- TODO: support abort
await_task_rescheduled = function()
coroutine.yield(_TASK_PAUSE)
end,
reschedule = function(task)
assert(trio._paused_tasks[task])
trio._paused_tasks[task] = nil
task.wait_until = 0
trio._schedule_task(task)
end,
Event = function()
local _is_set = false
local _waiting_tasks = {}
return {
set = function()
if not _is_set then
_is_set = true
-- reschedule tasks
for _, task in ipairs(_waiting_tasks) do
trio.reschedule(task)
end
end
end,
is_set = function()
return _is_set
end,
await = function()
if not _is_set then
table.insert(_waiting_tasks, trio.current_task())
trio.await_task_rescheduled()
end
end
}
end,
current_time = _time,
current_task = function()
return trio._tasks_by_coro[coroutine.running()]
end,
await_sleep = function(seconds)
assert(seconds >= 0)
coroutine.yield(seconds)
end,
run = function(f)
do
local main_task_nursery <close> = trio.open_nursery()
trio._schedule_task(trio._new_task(f, main_task_nursery, 0))
local tasks = trio._tasks
while #tasks > 0 do
local wait_delta = tasks[#tasks].wait_until - trio.current_time()
if wait_delta > 0 then
_debug_print('run: sleep until next event', wait_delta)
_sleep(wait_delta)
end
-- collect batch of tasks which are ready to run
local current_time = trio.current_time()
local tasks_to_run = {}
while #tasks > 0 and tasks[#tasks].wait_until - current_time <= 0 do
table.insert(tasks_to_run, table.remove(tasks))
end
-- run the ready tasks
-- TODO: shuffle execution to avoid reliance on implementation details
for _, task in ipairs(tasks_to_run) do
local coro = task.coro
local _, time_to_sleep = assert(coroutine.resume(coro))
if coroutine.status(coro) == 'dead' then
_debug_print('run: retire', coro)
task.parent_nursery.child_tasks[task] = nil
trio._tasks_by_coro[coro] = nil
task._done.set()
elseif time_to_sleep == _TASK_PAUSE then
_debug_print('run: pausing', coro)
trio._paused_tasks[task] = true
else
task.wait_until = trio.current_time() + time_to_sleep
_debug_print('run:', coro, 'scheduled sleep for', time_to_sleep)
trio._schedule_task(task)
end
end
end
end
end,
open_nursery = function()
-- TODO: needs to attach to active event loop somehow. Global map
-- of Lua thread to event loop?
local this
this = setmetatable({
-- TODO: track parent_task
child_tasks = {}, -- set
start_soon = function(f)
local task = trio._new_task(f, this, 0)
trio._schedule_task(task)
this.child_tasks[task] = true
end
}, {
__close = function(a)
_debug_print('__close nursery', a)
-- block until all child tasks done
_debug_print(' running tasks:')
for task, _ in pairs(this.child_tasks) do
_debug_print(' ', task)
task._done.await()
end
end
})
return this
end
}
return trio

article

Copyright 2022 John Belmonte, All rights reserved

code and examples

Copyright 2022 John Belmonte

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

@denisdemaisbr
Copy link

denisdemaisbr commented Sep 9, 2022

congratulations!

@fsantanna
Copy link

fsantanna commented Sep 15, 2022

Very nice, thanks.

I only disagree with this line:
"since it wasn't really discovered and applied until the late 2010's"

Esterel (structured concurrency) and Statecharts (hierarchical state machines) are around since the early 80s, with many industrial uses and derived (mostly academic) languages.

@belm0
Copy link
Author

belm0 commented Sep 15, 2022

I only disagree with this line: "since it wasn't really discovered and applied until the late 2010's"

Esterel (structured concurrency) and Statecharts (hierarchical state machines) are around since the early 80s, with many industrial uses and derived (mostly academic) languages.

@fsantanna Thank you-- I had briefly peeked at the article you posted on the SC forum, but I'll take a closer look.

@warmist
Copy link

warmist commented Sep 20, 2022

If i understood this correctly this needs lua >=5.4.4, not 5.4.3

@belm0
Copy link
Author

belm0 commented Sep 20, 2022

If i understood this correctly this needs lua >=5.4.4, not 5.4.3

@warmist I've just confirmed that 5.4.3 is sufficient, are you seeing a problem?

From the mailing list:

Actually, Lua 5.4.3 already supported "yield from __close".

-- Roberto

@warmist
Copy link

warmist commented Sep 20, 2022

Ah i see now. The lib itself is using 5.4.3 and the examples are using new local thing <close> syntax from 5.4.4

@belm0
Copy link
Author

belm0 commented Sep 21, 2022

Ah i see now. The lib itself is using 5.4.3 and the examples are using new local thing <close> syntax from 5.4.4

@warmist that's not the case-- <close> was introduced in 5.4.0. This code happens to need >= 5.4.3 because it fixed a detail about yielding from finalizers. Perhaps you are confusing 5.4.3 with 5.3.x?

@warmist
Copy link

warmist commented Sep 21, 2022

@belm0 yup you are right. Read the changelog 3 times for lua and still thought it said 5.4.4 when it was 5.4... Sometimes brain just dotn wokr

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment