Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Platform-agnostic thread pool for Haxe / OpenFL
package com.blazingmammothgames.util;
#if neko
import neko.vm.Thread;
import neko.vm.Mutex;
#elseif cpp
import cpp.vm.Thread;
import cpp.vm.Mutex;
#end
#if (neko || cpp)
private class PoolThread
{
private var thread:Thread;
private var task:Dynamic->Dynamic;
private var mutex:Mutex;
public var started:Bool;
private var _done:Bool;
public var done(get, never):Bool;
private function get_done():Bool
{
mutex.acquire();
var d:Bool = _done;
mutex.release();
return d;
}
private var _result:Dynamic;
public var result(get, never):Dynamic;
private function get_result():Dynamic
{
mutex.acquire();
var r:Dynamic = _result;
mutex.release();
return r;
}
public function new()
{
mutex = new Mutex();
}
public function start(task:Dynamic->Dynamic, arg:Dynamic):Void
{
this.task = task;
started = true;
_done = false;
thread = Thread.create(doWork);
thread.sendMessage(arg);
}
private function doWork():Void
{
var arg:Dynamic = Thread.readMessage(true);
var ret:Dynamic = task(arg);
mutex.acquire();
_result = ret;
_done = true;
mutex.release();
}
}
#end
typedef Task =
{
var id:Int;
var task:Dynamic->Dynamic;
var done:Bool;
var arg:Dynamic;
#if (neko || cpp)
var thread:PoolThread;
#end
var onFinish:Dynamic->Void;
}
/**
* ...
* @author Kenton Hamaluik
*/
class ThreadPool
{
#if (neko || cpp)
private var numThreads:Int = 1;
private var threads:Array<PoolThread>;
#end
private var tasks:Array<Task>;
private var nextID:Int = 0;
public function new(numThreads:Int)
{
tasks = new Array <Task> ();
#if (neko || cpp)
this.numThreads = numThreads;
threads = new Array<PoolThread>();
for (i in 0...this.numThreads)
{
threads.push(new PoolThread());
}
#end
}
public function addTask(task:Dynamic->Dynamic, arg:Dynamic, onFinish:Dynamic->Void):Void
{
tasks.push( { id: nextID, task: task, done: false, arg: arg, #if (neko || cpp) thread: null, #end onFinish: onFinish } );
nextID++;
}
#if (neko || cpp)
private function allTasksAreDone():Bool
{
for (task in tasks)
if (!task.done)
return false;
return true;
}
private function getNextFreeThread():PoolThread
{
for (thread in threads)
if (!thread.started)
return thread;
return null;
}
#end
public function blockRunAllTasks():Void
{
#if (neko || cpp)
while (!allTasksAreDone())
{
// get a free thread
var thread:PoolThread = getNextFreeThread();
// but if it doesn't exist, try again
if (thread == null)
continue;
for (task in tasks)
{
// skip any tasks that are done
if (task.done)
continue;
// if this task is currently being run, see if it's done yet
if (task.thread != null && task.thread.started)
{
if (task.thread.done)
{
// yay, it finished!
task.done = true;
// reset the thread
task.thread.started = false;
// call the on finish function
if (task.onFinish != null)
task.onFinish(task.thread.result);
}
continue;
}
// ok, we have a task that needs running
// and a thread to run it
// combine forces!
task.thread = thread;
thread.start(task.task, task.arg);
// break to try to assign the next thread
break;
}
}
#else
for (task in tasks)
{
if (task.onFinish != null)
task.onFinish(task.task(task.arg));
}
#end
// clear the old tasks
tasks = new Array<Task>();
}
}
package ;
// ...
import com.blazingmammothgames.util.ThreadPool;
// ...
// this will create a thread pool with 8 threads on neko and cpp platforms
// on all other platforms, no threads will be created
// and the pool will use the main thread
var threadPool:ThreadPool = new ThreadPool(8);
// add a task that will take a while to complete
threadPool.addTask(function(x:Dynamic):Int {
var li:Int = 0;
for (i in 0...10)
{
li += i;
for(n in 0...10000) {}
}
return li;
}, null, onFinish);
// add a task that returns right away
threadPool.addTask(function(x:Dynamic):String {
return "herp derp";
}, null, onFinish);
// this is a blocking call that will run all the tasks
// across the pool's threads
// or just in the main thread if not on neko or cpp
threadPool.blockRunAllTasks();
// ...
// report the results of the above tasks
private function onFinish(x:Dynamic):Void
{
trace(x);
}
// on neko or cpp, this will output:
// herp derp
// 45
// because the "herp derp" task will finish much sooner
// than the fibonnaci task when they're running in parallel
// on all other platforms, this will output:
// 45
// herp derp
// because the tasks will be executed in the order they're added
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.