Skip to content

Instantly share code, notes, and snippets.

@jasper-lyons
Created March 5, 2020 12:23
Show Gist options
  • Save jasper-lyons/7b9ee6a682f3efb8460e0ddc760e6f3e to your computer and use it in GitHub Desktop.
Save jasper-lyons/7b9ee6a682f3efb8460e0ddc760e6f3e to your computer and use it in GitHub Desktop.
Lightweight channels, fibers and a task scheduler in Lua
-- This is just me working through this blog post:
-- https://www.wingolog.org/archives/2018/05/16/lightweight-concurrency-in-lua
--
-- My goal was just to explore how one can make concurrency in Lua more expressive without
-- compiling your own C extensions. Eventually I want to explore using such a system to build
-- a concurrent socket server, but I suspect that will need C. Can't escape the blocking.
-- Minimal run queue: callables are appended with Tasks:schedule and executed
-- in insertion order by Tasks:run.
local Tasks = { queue = {} }

-- Append `task` (a callable taking no arguments) to the pending queue.
function Tasks:schedule(task)
  local pending = self.queue
  pending[#pending + 1] = task
end

-- Run every task that was pending when run() was called.
-- The queue is swapped for a fresh table first, so anything scheduled while
-- the batch executes lands in the new queue and waits for the next run().
function Tasks:run()
  local batch = self.queue
  self.queue = {}
  for position = 1, #batch do
    batch[position]()
  end
end
-- Demo: queue three printing tasks, then drain the queue once.
-- Output, in order: hello, task, world.
for _, word in ipairs({ 'hello', 'task', 'world' }) do
  Tasks:schedule(function ()
    print(word)
  end)
end
Tasks:run()
-- Cooperative fibers: coroutines whose resumption is driven by the Tasks queue.
-- `current` is the coroutine that Fibers:resume is running right now, or nil
-- when no fiber is being resumed.
local Fibers = {
  current = nil
}

-- Wrap `func` in a coroutine and schedule its first resume on Tasks.
-- The fiber does not start until the task queue is run.
function Fibers:spawn(func)
  local fiber = coroutine.create(func)
  Tasks:schedule(function ()
    Fibers:resume(fiber)
  end)
end

-- Resume `fiber`, forwarding any extra arguments to coroutine.resume (they
-- become the return values of the coroutine.yield the fiber is parked on).
-- An error raised inside the fiber is printed, not propagated.
function Fibers:resume(fiber, ...)
  -- Remember who was running: when a fiber resumes another fiber directly,
  -- `current` must point back at the outer fiber afterwards, otherwise the
  -- outer fiber could no longer suspend itself. From the scheduler `previous`
  -- is nil, which matches the old unconditional reset.
  local previous = Fibers.current
  Fibers.current = fiber
  local ok, err = coroutine.resume(fiber, ...)
  Fibers.current = previous
  if not ok then
    print('Error running fiber: ' .. tostring(err))
  end
end

-- Park the running fiber.
-- `reschedule` receives the current fiber and is responsible for arranging a
-- later Fibers:resume of it. Returns whatever arguments that resume passes.
-- Raises an error when no fiber is running.
function Fibers:suspend(reschedule)
  local fiber = Fibers.current
  -- Fail before `reschedule` runs: otherwise a resume of a nil fiber would be
  -- queued and blow up later, far away from the faulty call site.
  assert(fiber ~= nil, 'Fibers:suspend must be called from inside a running fiber')
  reschedule(fiber)
  return coroutine.yield()
end

-- Give other tasks a turn: the current fiber is put back on the Tasks queue
-- and continues on a later Tasks:run.
function Fibers:yield()
  return Fibers:suspend(function (fiber)
    Tasks:schedule(function ()
      Fibers:resume(fiber)
    end)
  end)
end
-- Demo: the first fiber yields before printing, so its output is deferred to
-- the second pass over the task queue. Output, in order: fiber, world, hello.
Fibers:spawn(function ()
  Fibers:yield()
  print('hello')
end)
for _, word in ipairs({ 'fiber', 'world' }) do
  Fibers:spawn(function ()
    print(word)
  end)
end
-- Two passes: one to start every fiber, one to finish the fiber that yielded.
for _ = 1, 2 do
  Tasks:run()
end
-- A parked fiber together with a flag recording whether it is still waiting
-- to be woken.
local Suspension = {}

-- Create a suspension for `fiber`; it starts out waiting.
function Suspension:new(fiber)
  self.__index = self
  return setmetatable({ fiber = fiber, waiting = true }, self)
end

-- True until complete() has been called.
function Suspension:isWaiting()
  return self.waiting
end

-- Mark the suspension as done and schedule the fiber to be resumed with
-- `wrap` and `value` as resume arguments.
-- Asserts that the suspension has not been completed before.
function Suspension:complete(wrap, value)
  assert(self.waiting)
  self.waiting = false
  local function wake()
    Fibers:resume(self.fiber, wrap, value)
  end
  Tasks:schedule(wake)
end
-- Bundle of the three callbacks that make up one way of carrying out an
-- operation: `try`, `block` and `wrap`.
local Implementation = {}

-- Build an implementation record from its three callbacks.
function Implementation:new(try, block, wrap)
  self.__index = self
  local record = { try = try, block = block, wrap = wrap }
  return setmetatable(record, self)
end
-- An operation is a list of implementations; performing it either completes
-- immediately through one of them or parks the calling fiber until one of
-- them completes it.
local Operation = {}

-- Create an operation over the given list of implementations.
function Operation:new(implementations)
  self.__index = self
  return setmetatable({ implementations = implementations }, self)
end

-- Carry out the operation and return the wrapped result.
function Operation:perform()
  -- First pass: take the first implementation whose try() reports success.
  for _, candidate in ipairs(self.implementations) do
    local ready, result = candidate.try()
    if ready then
      return candidate.wrap(result)
    end
  end

  -- Nothing was ready: register one shared suspension with every
  -- implementation, then park until somebody completes it.
  local function register(fiber)
    local suspension = Suspension:new(fiber)
    for _, candidate in ipairs(self.implementations) do
      candidate.block(suspension, candidate.wrap)
    end
  end

  -- The resume supplies the wrap function to apply and the raw value.
  local wrap, value = Fibers:suspend(register)
  return wrap(value)
end

-- Return a new operation whose results are additionally passed through
-- `wrapper`; the try and block callbacks are reused unchanged.
function Operation:wrap(wrapper)
  local wrapped = {}
  for position, implementation in ipairs(self.implementations) do
    local function wrapResult(value)
      return wrapper(implementation.wrap(value))
    end
    wrapped[position] = Implementation:new(
      implementation.try,
      implementation.block,
      wrapResult
    )
  end
  return Operation:new(wrapped)
end
-- FIFO queue that keeps its elements in the array part of the instance table.
local Queue = {}

-- Create an empty queue.
function Queue:new()
  self.__index = self
  return setmetatable({}, self)
end

-- True when the queue holds no elements.
function Queue:isEmpty()
  return #self < 1
end

-- Return the oldest element without removing it (nil when empty).
function Queue:peek()
  local oldest = self[1]
  return oldest
end

-- Remove and return the oldest element.
function Queue:pop()
  return table.remove(self, 1)
end

-- Append `value` at the back of the queue.
function Queue:push(value)
  table.insert(self, value)
end

-- Drop every element for which `predicate(element)` is truthy.
-- Elements are visited front to back, each exactly once.
function Queue:removeStale(predicate)
  local position = 1
  while position <= #self do
    local keep = not predicate(self[position])
    if keep then
      position = position + 1
    else
      -- Removal shifts the following elements down, so stay on this slot.
      table.remove(self, position)
    end
  end
end
-- Channel connecting putters and getters. A value is handed over only when a
-- counterpart is available: a put completes at once if a getter is already
-- queued, otherwise the putter is queued (and vice versa for get).
--   getQueue entries: { suspension, wrap }
--   putQueue entries: { suspension, wrap, value }
local Channel = {}

-- True when a queued entry's suspension has already been completed, so the
-- entry must no longer be served. (Operation:perform registers one suspension
-- with every implementation, so it can be completed through another queue.)
-- `waiting` is a plain boolean field on a suspension; the accessor method is
-- isWaiting(), which is what must be called here.
local function isStale(entry)
  return not entry.suspension:isWaiting()
end

-- Create a channel with empty getter and putter queues.
function Channel:new()
  local base = { getQueue = Queue:new(), putQueue = Queue:new() }
  setmetatable(base, self)
  self.__index = self
  return base
end

-- Build the operation that sends `value` on this channel.
function Channel:putFactory(value)
  return Operation:new({
    Implementation:new(
      -- try: hand the value straight to the oldest live getter, if any.
      function ()
        self.getQueue:removeStale(isStale)
        if self.getQueue:isEmpty() then
          return false, nil
        else
          local remote = self.getQueue:pop()
          remote.suspension:complete(remote.wrap, value)
          return true, nil
        end
      end,
      -- block: no getter is ready, so queue this putter with its value.
      function (suspension, wrap)
        self.putQueue:removeStale(isStale)
        self.putQueue:push({
          suspension = suspension,
          wrap = wrap,
          value = value
        })
      end,
      -- wrap: results pass through unchanged.
      function (result)
        return result
      end
    )
  })
end

-- Build the operation that receives a value from this channel.
function Channel:getFactory()
  return Operation:new({
    Implementation:new(
      -- try: take the value of the oldest live putter and wake that putter
      -- (it is resumed with its own wrap and no value).
      function ()
        self.putQueue:removeStale(isStale)
        if self.putQueue:isEmpty() then
          return false, nil
        else
          local remote = self.putQueue:pop()
          remote.suspension:complete(remote.wrap)
          return true, remote.value
        end
      end,
      -- block: no putter is ready, so queue this getter.
      function (suspension, wrap)
        self.getQueue:removeStale(isStale)
        self.getQueue:push({
          suspension = suspension,
          wrap = wrap
        })
      end,
      -- wrap: results pass through unchanged.
      function (result)
        return result
      end
    )
  })
end

-- Send `value`; parks the calling fiber until a getter takes it.
function Channel:put(value)
  return self:putFactory(value):perform()
end

-- Receive the next value; parks the calling fiber until a putter supplies one.
function Channel:get()
  return self:getFactory():perform()
end
-- Return a channel fed by a freshly spawned fiber that puts `from`,
-- `from + 1`, `from + 2`, ... on it without end.
function numbers(from)
  local sink = Channel:new()
  local nextValue = from

  local function produce()
    repeat
      sink:put(nextValue)
      nextValue = nextValue + 1
    until false
  end

  Fibers:spawn(produce)
  return sink
end
-- Driver: a consumer fiber prints eleven values read from a number channel,
-- then raises the flag that stops the scheduling loop.
local done = false

local function consume()
  local source = numbers(0)
  for _ = 0, 10 do
    print(source:get())
  end
  done = true
end

Fibers:spawn(consume)

-- Keep draining the task queue until the consumer has finished.
repeat
  Tasks:run()
until done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment