Skip to content

Instantly share code, notes, and snippets.

@Clemapfel
Last active April 1, 2024 22:36
Show Gist options
  • Save Clemapfel/657c02770d5cd77e87a8a25f18a7669e to your computer and use it in GitHub Desktop.
Save Clemapfel/657c02770d5cd77e87a8a25f18a7669e to your computer and use it in GitHub Desktop.
ThreadPool in love2D
-- licensed MIT, created by https://github.com/clemapfel
-- create the shared global `rt` namespace unless an earlier file already did
if type(rt) == "nil" then
    rt = {}
end

--- @class rt.ThreadPool
--- @brief table that carries the thread pool constructor and all of its methods
rt.ThreadPool = {}
--- @brief create a thread pool; no thread exists until `startup` is called
--- @param n_threads Number number of worker threads, default: 1 (the sign is ignored)
--- @return rt.ThreadPool
function rt.ThreadPool:new(n_threads)
    local thread_count = n_threads
    if thread_count == nil then
        thread_count = 1
    end

    -- method lookups on the instance fall through to the class table
    local pool = setmetatable({}, { __index = rt.ThreadPool })
    pool._threads = {}                                -- filled by `startup`, one entry per worker
    pool._futures = {}                                -- pending futures, keyed by message id
    pool._n_threads = math.abs(thread_count)
    pool._main_to_worker = love.thread.newChannel()   -- tasks, pushed by main, read by workers
    pool._worker_to_main = love.thread.newChannel()   -- results, pushed by workers, read by `update`
    return pool
end
-- make the class table callable: `rt.ThreadPool(n)` forwards to `rt.ThreadPool.new`,
-- which then receives the class table as `self` and `n` as `n_threads`
do
    local class_meta = {}
    class_meta.__call = rt.ThreadPool.new
    setmetatable(rt.ThreadPool, class_meta)
end
--- @class rt.MessageType
--- @brief identifiers of all messages exchanged between main and the workers, every name maps to itself
rt.MessageType = {
    -- control: tells a worker to leave its loop
    KILL = "KILL",

    -- requests, pushed by main
    PRINT = "PRINT",
    LOAD_AUDIO = "LOAD_AUDIO",

    -- replies, pushed by a worker once the matching request is handled
    PRINT_DONE = "PRINT_DONE",
    AUDIO_DONE = "AUDIO_DONE"
}
--- @class rt.ThreadPool.Message
--- @brief build the plain table that is pushed through a channel to tell the receiving side what to do
--- @param type String message type, see rt.MessageType
--- @param id Number id of the future this message belongs to
--- @param data any payload of the message
--- @return Table with the fields `type`, `id` and `data`
function rt.ThreadPool.Message(type, id, data)
    local message = {}
    message.type = type
    message.id = id
    message.data = data
    return message
end
--- @class rt.ThreadPool.Future
--- @brief placeholder handed out for every request; `rt.ThreadPool:update` fills in `result` and sets `is_delivered` once the answer has arrived
--- @param id Number id shared with the request message
--- @return Table future with `is_ready` and `get_result` methods
function rt.ThreadPool.Future(id)
    local future = {
        id = id,
        is_delivered = false
    }
    future.result = nil -- stays nil until the result is delivered

    --- @return Boolean true once the result has been delivered
    function future:is_ready()
        return self.is_delivered
    end

    --- @return any the delivered result, nil while it is still pending
    function future:get_result()
        return self.result
    end

    return future
end
--- running counter used to hand out future ids, private
rt.ThreadPool.FUTURE_ID = 0

--- source code of the worker script, read once when this file is loaded
do
    local worker_source = love.filesystem.read("common/thread_pool_worker.lua")
    rt.ThreadPool._thread_source = worker_source
    -- without the worker source no thread can be created, so fail right away
    if worker_source == nil then
        error("In thread_pool: path to thread pool worker code is invalid, change the file location in thread_pool.lua, line 64")
    end
end
--- @brief allocate all worker threads and start them, they keep running until they receive a KILL message
--- Calling this more than once is safe: a slot whose thread is still running is
--- left untouched. Previously a second call replaced the entries in
--- `self._threads` while the old workers kept running, so `shutdown` could no
--- longer stop or wait for them.
function rt.ThreadPool:startup()
    for i = 1, self._n_threads do
        local slot = self._threads[i]
        if slot == nil or not slot.thread:isRunning() then
            local worker = love.thread.newThread(rt.ThreadPool._thread_source)
            self._threads[i] = { thread = worker }

            -- the worker script reads these four values, in this order, from `...`
            worker:start(
                self._main_to_worker, -- main_to_worker
                self._worker_to_main, -- worker_to_main
                rt.MessageType,       -- rt.MessageType
                i                     -- THREAD_ID
            )
        end
    end
end
--- @brief [internal] put one request onto the shared main-to-worker queue and register a future for it
--- @param type String message type, see rt.MessageType
--- @param data any payload forwarded to the worker
--- @return rt.ThreadPool.Future future that `update` fills in once the reply arrives
function rt.ThreadPool:_send_message(type, data)
    -- ids come from a counter on the class table, so they are shared by all pools
    local next_id = rt.ThreadPool.FUTURE_ID + 1
    rt.ThreadPool.FUTURE_ID = next_id

    local request = rt.ThreadPool.Message(type, next_id, data)
    local pending = rt.ThreadPool.Future(next_id)

    self._main_to_worker:push(request)
    self._futures[next_id] = pending
    return pending
end
--- @brief queue a PRINT request: a worker prints `message` to the console and echoes it back as the result
--- @param message any value to print thread-side
--- @return rt.ThreadPool.Future<String>
function rt.ThreadPool:request_debug_print(message)
    local request_type = rt.MessageType.PRINT
    local future = self:_send_message(request_type, message)
    return future
end
--- @brief queue a LOAD_AUDIO request: a worker loads the file at the given path thread-side
--- @param path_to_audio String path of the audio file
--- @return rt.ThreadPool.Future<love.SoundData>
function rt.ThreadPool:request_load_sound_data(path_to_audio)
    local request_type = rt.MessageType.LOAD_AUDIO
    local future = self:_send_message(request_type, path_to_audio)
    return future
end
--- @brief flush the worker-to-main queue and distribute the results among the pending futures
--- @param delta Number frame time, currently unused
--- @return Table list of the futures that became ready during this call
function rt.ThreadPool:update(delta)
    local delivered = {}
    local inbox = self._worker_to_main

    while inbox:getCount() > 0 do
        local message = inbox:pop()

        -- a reply that is not a table, or whose id matches no pending future,
        -- is dropped instead of crashing on a nil index
        local future = nil
        if type(message) == "table" then
            future = self._futures[message.id]
        end

        if future ~= nil then
            future.result = message.data
            future.is_delivered = true
            delivered[#delivered + 1] = future
            -- the future is resolved, the pool no longer needs to track it
            self._futures[message.id] = nil
        end
    end

    return delivered
end
--- @brief safely shutdown threadpool: queues one KILL message per thread, then blocks until every thread has returned
function rt.ThreadPool:shutdown()
    -- a worker leaves its loop after reading a single KILL, so one message per thread is pushed
    local remaining = #self._threads
    while remaining > 0 do
        local kill_message = {
            id = -1,
            type = rt.MessageType.KILL,
            data = nil
        }
        self._main_to_worker:push(kill_message)
        remaining = remaining - 1
    end

    for _, entry in pairs(self._threads) do
        entry.thread:wait()
    end
end
--- @brief shutdown threadpool as fast as possible, requests that are still queued are discarded
--- `Thread:kill` was removed from LÖVE in 0.9.0, the same release that added
--- channels, so calling it here raised "attempt to call method 'kill'".
--- A thread cannot be terminated from the outside anymore. Instead the queue
--- is emptied so that the next message every worker reads is KILL; a task a
--- worker is already executing still runs to completion.
--- Futures that belong to discarded requests never become ready.
function rt.ThreadPool:force_shutdown()
    -- drop everything that has not been picked up by a worker yet
    self._main_to_worker:clear()

    -- only threads that are still alive can consume a KILL message
    for _, t in pairs(self._threads) do
        if t.thread:isRunning() then
            self._main_to_worker:push({
                type = rt.MessageType.KILL,
                data = nil,
                id = -1
            })
        end
    end

    for _, t in pairs(self._threads) do
        t.thread:wait()
    end
end
require "love.image"
require "love.audio"
require "love.sound"
require "love.timer"
-- values handed from main during :start(), in this order; kept local so the
-- worker does not create globals by accident
local main_to_worker, worker_to_main, message_types, THREAD_ID = ...
local rt = { MessageType = message_types }

-- main loop: block until a message arrives, handle it, repeat until KILL
while true do
    -- retrieve message and handle it
    local message = main_to_worker:demand()
    local message_type = message.type

    if message_type == rt.MessageType.KILL then
        -- Message Type #1: KILL, leaves the loop so the thread ends
        break
    elseif message_type == rt.MessageType.PRINT then
        -- Message Type #2: PRINT, prints to the console from the thread, useful for debugging
        print("Thread #" .. tostring(THREAD_ID) .. " prints: " .. tostring(message.data))
        worker_to_main:push({
            id = message.id,
            type = rt.MessageType.PRINT_DONE,
            data = message.data
        })
    elseif message_type == rt.MessageType.LOAD_AUDIO then
        -- Message Type #3: LOAD_AUDIO, receives a path as message.data, loads it
        -- into memory, then sends the resulting sound data back to main
        local res = love.sound.newSoundData(message.data)
        worker_to_main:push({
            id = message.id,
            type = rt.MessageType.AUDIO_DONE,
            data = res
        })
    elseif message_type == "TODO" then
        -- Message Type #4: TODO add your own message type and thread-side behavior here
        -- todo
    else
        -- tostring keeps this diagnostic intact when the type is nil or not a
        -- string; a bare concatenation would raise a different error instead
        error("In rt.ThreadPool.ThreadWorker: unhandled message type `" .. tostring(message_type) .. "`")
    end
end
require "thread_pool"
--- @brief set up the window, start the thread pool and queue the test tasks
love.load = function()
    love.window.setMode(1920 / 2, 1080 / 2, {
        vsync = 1,
        msaa = 8,
        stencil = true,
        resizable = true
    })
    love.window.setTitle("ThreadPool Test")

    -- tasks to run multi-threaded: print 4 messages
    local test_messages = {
        "First Message",
        "Second Message",
        "Third Message",
        "Fourth Message",
    }

    -- and load one music file
    local test_audio = {
        "assets/music/test_music_03.mp3"
    }

    -- this is where the results will be stored
    -- `futures` and `thread_pool` stay global because love.update reads them
    futures = {}

    -- create the thread pool
    thread_pool = rt.ThreadPool(8)
    thread_pool:startup()

    -- queue tasks for the thread pool; ipairs guarantees they are queued in
    -- list order, which pairs does not promise
    for _, message in ipairs(test_messages) do
        table.insert(futures, thread_pool:request_debug_print(message))
    end

    for _, path in ipairs(test_audio) do
        table.insert(futures, thread_pool:request_load_sound_data(path))
    end
end
--- @brief per-frame callback: collect finished futures, quit once all of them are done
love.update = function()
    local delta = love.timer.getDelta()

    -- one or more times per frame, the threadpool needs to be updated
    -- this will set the values of our `futures`, if their value becomes available
    local ready_futures = thread_pool:update(delta)

    -- we can use the futures stored in `futures`, but `update` also returns
    -- all futures that became ready during this call
    for _, future in ipairs(ready_futures) do
        assert(future:is_ready())
        print("Main Received Future #" .. tostring(future.id) .. ":", future:get_result(), "\n")
    end

    -- if all futures are done, safely shutdown thread pool, then love
    local is_done = true
    for _, future in ipairs(futures) do
        if not future:is_ready() then
            is_done = false
            break
        end
    end

    if is_done then
        -- `println` does not exist in Lua or LÖVE, calling it raised an error here
        print("succesfully received all futures")
        -- stop the workers before quitting, as the comment above promises
        thread_pool:shutdown()
        love.event.quit()
    end
end
@Clemapfel
Copy link
Author

This thread pool uses the "future" pattern. When you request the thread pool to do work, for example like in main.lua line 38:

table.insert(futures, thread_pool:request_load_sound_data(path))

The function will return an empty Future object. Each frame, after ThreadPool:update has been called, the future may have been filled with the value returned by the thread pool. To check whether it is done yet, use Future:is_ready(); once that returns true, Future:get_result() will return the result.

@Clemapfel
Copy link
Author

Clemapfel commented Apr 1, 2024

As an example of how to extend the functionality, we will add a function that lets a worker thread read a file into a string thread-side.

We first add a new message type (thread_pool.lua)

--- @class rt.MessageType
--- @brief identifiers of all messages exchanged between main and the workers, every name maps to itself
rt.MessageType = {
    KILL = "KILL",
    LOAD_AUDIO = "LOAD_AUDIO",
    AUDIO_DONE = "AUDIO_DONE",
    PRINT = "PRINT",
    PRINT_DONE = "PRINT_DONE", -- this comma is required before the new entries
    READ_FILE = "READ_FILE", -- new
    READ_FILE_DONE = "READ_FILE_DONE" -- new
}

We then add a function to ThreadPool to request this behavior easily

--- @brief queue a READ_FILE request for the given path
--- @param path String path of the file to read
--- @return rt.ThreadPool.Future future returned by `_send_message`
function rt.ThreadPool:request_read_file(path)
    local request_type = rt.MessageType.READ_FILE
    local future = self:_send_message(request_type, path)
    return future
end

Then, in thread_pool_worker.lua, we add a new branch to the message handling that handles our new READ_FILE request message

-- new branch for the worker's message handling: handles a READ_FILE request
elseif message.type == rt.MessageType.READ_FILE then
   local res = love.filesystem.read(message.data) -- path was forwarded as data
   -- send the result of the finished task to main, tagged with the id of the request
   worker_to_main:push({
      id = message.id,
      type = rt.MessageType.READ_FILE_DONE, 
      data = res
   })

That's it, now calling thread_pool:request_read_file("path_to_file.txt") will read the file into a string thread-side, then return a future whose get_result will return the read string when it is ready.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment