Skip to content

Instantly share code, notes, and snippets.

@kovrov
Created July 26, 2010 13:59
Show Gist options
  • Save kovrov/490580 to your computer and use it in GitHub Desktop.
Save kovrov/490580 to your computer and use it in GitHub Desktop.
priority_queue.d
import core.thread;
import core.sync.condition;
import core.sync.mutex;
import std.date;
const sleep_ticks = 10_000_000/ticksPerSecond;
class TaskQueue(T) : Thread
{
this()
{
super(&run);
this.cond_mutex = new Mutex;
this.cond = new Condition(this.cond_mutex);
}
void enqueue(T x)
{
synchronized (this.cond_mutex)
{
this.insert(x);
this.cond.notify();
}
}
void stop()
{
synchronized (this.cond_mutex)
{
this.done = true;
this.cond.notify();
}
}
private:
void run()
{
synchronized (this.cond_mutex)
while (!this.done)
{
if (this.heap.length < 2) // no tasks
{
cond.wait(); // unlocks cond_mutex
debug writefln("+++ woke up to do something?");
continue;
}
auto now = getUTCtime();
auto task = this.heap[1];
if (now < task.time)
{
cond.wait((task.time - now) * sleep_ticks); // unlocks cond_mutex
debug
{ now = getUTCtime();
if (now < task.time)
debug writefln("### woke up too soon! (%s.%s sec earlier)",
(task.time - now) / ticksPerSecond % 60,
(task.time - now) % ticksPerSecond);
}
continue;
}
task = this.pop();
auto timeout = task.update(now);
if (timeout != d_time_nan)
{
task.time = now + timeout;
this.insert(task);
}
}
}
/* binary heap stuff */
void insert(T x)
{
this.heap.length = this.heap.length + 1;
int index = this.heap.length - 1;
while (index > 1 && x < this.heap[index / 2])
{
this.heap[index] = this.heap[index / 2];
index /= 2;
}
this.heap[index] = x;
}
T pop()
{
assert (this.heap.length - 1 > 0);
T ret = this.heap[1];
T last = this.heap[1] = this.heap[$ - 1];
this.heap.length = this.heap.length - 1;
if (this.heap.length < 2) // no elements
return ret;
int node_index = 1;
while (node_index * 2 < this.heap.length) // until there is atleast one child
{
int child_index = node_index * 2;
if (child_index + 1 < this.heap.length && this.heap[child_index + 1] < this.heap[child_index])
child_index++;
if (this.heap[child_index] >= last)
break;
this.heap[node_index] = this.heap[child_index];
node_index = child_index;
}
this.heap[node_index] = last;
return ret;
}
bool done;
Condition cond;
Mutex cond_mutex;
T[] heap = [T.init,];
//invariant() { assert (this.heap.length > 0); }
}
alias d_time delegate(const ref d_time) Tasklet;
struct Task
{
d_time time; Tasklet update;
int opCmp(ref Task other) { return this.time > other.time ? 1 : this.time < other.time ? -1 : 0; }
}
struct Time
{
int hour, min, sec, ms;
this(ref d_time time)
{
ms = cast(int)(time%ticksPerSecond);
sec = cast(int)(time/ticksPerSecond%60);
min = cast(int)(time/ticksPerSecond/60%60);
hour = cast(int)(time/ticksPerSecond/60/60%24);
}
}
import std.conv;
import std.stdio;
void main()
{
auto task_mgr = new TaskQueue!(Task);
//task_mgr.priority = Thread.PRIORITY_MAX; // test
task_mgr.start();
scope (exit) task_mgr.stop();
int fps = 5;
d_time begin_time = getUTCtime();
d_time next_time = begin_time + ticksPerSecond/fps;
int rate = 0;
d_time task(const ref d_time current_time)
{
if (next_time - current_time < 0)
rate++;
else
rate = 0;
if (rate > 3)
{
task_mgr.stop();
writefln("failed on %s fps", fps);
}
if (begin_time + ticksPerSecond*5 < current_time)
{
writefln("going to the next level: %s fps", ++fps);
begin_time = current_time;
rate = 0;
}
next_time = current_time + ticksPerSecond/fps;
return ticksPerSecond/fps;
}
foreach (i; 0..100)
{
task_mgr.enqueue(Task(i+1, &task));
}
task_mgr.join();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment