Skip to content

Instantly share code, notes, and snippets.

@binji
Last active April 25, 2016 04:13
Show Gist options
  • Save binji/16b9bc4a1e875b6a64663ff797456a5b to your computer and use it in GitHub Desktop.
Save binji/16b9bc4a1e875b6a64663ff797456a5b to your computer and use it in GitHub Desktop.
simple thread pool w/ bump allocator
doing stuff...
doing stuff...
chunk 0: used: 50000 size: 65536
chunk 1: used: 37020 size: 65536
chunk 2: used: 51000 size: 65536
chunk 3: used: 44000 size: 65536
chunk 4: used: 58000 size: 65536
chunk 5: used: 63000 size: 65536
chunk 6: used: 41000 size: 65536
chunk 7: used: 56000 size: 65536
TOTAL: used: 400020 size: 524288
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
// Compile-time switch for trace output: set LOG to a non-zero value to make
// LOGF print.
#define LOG 0
#if LOG
// Tracing enabled: LOGF forwards its printf-style arguments to stderr.
#define LOGF(...) fprintf(stderr, __VA_ARGS__)
#else
// Tracing disabled: LOGF expands to a no-op expression; its arguments are
// dropped by the preprocessor and never evaluated.
#define LOGF(...) (void)0
#endif
// A fixed-size byte buffer that hands out memory front-to-back by bumping a
// pointer. Individual allocations are never released; the whole buffer is
// freed when the chunk is destroyed. Chunks can be linked into a singly linked
// list through |next|.
struct Chunk {
  // Allocates a backing buffer of |size| bytes. Nothing is handed out yet.
  explicit Chunk(size_t size)
      : data(new uint8_t[size]),
        current(data),
        end(data + size),
        next(nullptr) {}

  ~Chunk() { delete[] data; }

  // The chunk owns |data|; a member-wise copy would delete[] the same buffer
  // twice, so copying is disabled.
  Chunk(const Chunk&) = delete;
  Chunk& operator=(const Chunk&) = delete;

  // Returns a pointer to |size| fresh bytes, or nullptr when fewer than |size|
  // bytes remain in this chunk. No alignment is applied: the returned pointer
  // is simply the current bump position.
  void* Alloc(size_t size) {
    // |current| never moves past |end|, so the difference is non-negative and
    // the conversion to size_t is lossless.
    const size_t remaining = static_cast<size_t>(end - current);
    if (remaining < size)
      return nullptr;
    void* result = current;
    current += size;
    return result;
  }

  uint8_t* data;     // Start of the owned buffer.
  uint8_t* current;  // Next byte to hand out.
  uint8_t* end;      // One past the last byte of the buffer.
  Chunk* next;       // Next chunk in the owning list, or nullptr.
};
const size_t kDefaultChunkSize = 64 * 1024;
struct Allocator {
Allocator() : first(nullptr), last(nullptr), chunk_count(0) {}
~Allocator() {
while (first) {
Chunk* next = first->next;
delete first;
first = next;
}
}
void* Alloc(size_t size) {
void* result = nullptr;
if (first) {
result = first->Alloc(size);
}
if (!result) {
Chunk* chunk = new Chunk(std::max(size, kDefaultChunkSize));
chunk_count++;
if (first) {
chunk->next = first;
} else {
last = chunk;
}
first = chunk;
result = chunk->Alloc(size);
assert(result);
}
return result;
}
void StealFirstAndPrepend(Allocator* their) {
Chunk* stolen = their->first;
if (!stolen) {
return;
}
their->first = stolen->next;
if (!their->first) {
their->last = nullptr;
}
stolen->next = first;
if (!first) {
last = stolen;
}
first = stolen;
chunk_count++;
their->chunk_count--;
}
void StealAllAndAppend(Allocator* their) {
if (!their->first) {
// nothing to steal
return;
}
if (last) {
last->next = their->first;
} else {
first = their->first;
}
last = their->last;
their->first = their->last = nullptr;
chunk_count += their->chunk_count;
their->chunk_count = 0;
}
void Dump() {
size_t total_used = 0;
size_t total_size = 0;
int i = 0;
for (Chunk* chunk = first; chunk; chunk = chunk->next, ++i) {
size_t chunk_used = chunk->current - chunk->data;
size_t chunk_size = chunk->end - chunk->data;
fprintf(stderr, "chunk %d: used: %zd size: %zd\n", i, chunk_used,
chunk_size);
total_used += chunk_used;
total_size += chunk_size;
}
fprintf(stderr, "TOTAL: used: %zd size: %zd\n", total_used, total_size);
}
Chunk* first;
Chunk* last;
int chunk_count;
};
// Abstract unit of work. ThreadPool::Run calls Before() right before a task is
// queued and After() once it has come back from a worker; Thread::Run calls
// Run() in between.
struct Task {
  virtual ~Task() = default;

  // Optional hook; does nothing unless overridden.
  virtual void Before() {}

  // The work itself; every concrete task must provide it.
  virtual void Run() = 0;

  // Optional hook; does nothing unless overridden.
  virtual void After() {}
};

// Owning handle used to pass tasks between queues.
using TaskPtr = std::unique_ptr<Task>;
struct ThreadPool;

// Wraps one std::thread that executes Thread::Run(pool). The destructor joins
// the thread, so destroying a Thread blocks until Run() has returned.
struct Thread {
  explicit Thread(ThreadPool* pool) : thread([this, pool] { Run(pool); }) {}
  ~Thread() { thread.join(); }

  // Thread body; defined after ThreadPool because it needs the full type.
  void Run(ThreadPool* pool);

  std::thread thread;
};

// Owning handle for a heap-allocated Thread.
using ThreadPtr = std::unique_ptr<Thread>;
// FIFO of tasks guarded by a mutex, with a blocking Dequeue() and an exit flag
// that releases every waiter.
struct TaskQueue {
  TaskQueue() : exiting(false) {}

  // Appends |task| to the queue and wakes one thread blocked in Dequeue().
  void Enqueue(TaskPtr task) {
    std::unique_lock<std::mutex> guard(mut);
    tasks.push(std::move(task));
    has_task.notify_one();
  }

  // Blocks until a task is available or Exit() has been called. Returns the
  // oldest task, or nullptr once the queue is exiting (even if tasks are still
  // queued).
  TaskPtr Dequeue() {
    std::unique_lock<std::mutex> guard(mut);
    while (!exiting && tasks.empty()) {
      has_task.wait(guard);
    }
    TaskPtr next;
    if (!exiting) {
      next = std::move(tasks.front());
      tasks.pop();
    }
    return next;
  }

  // Sets the exit flag and wakes every thread blocked in Dequeue().
  void Exit() {
    std::unique_lock<std::mutex> guard(mut);
    exiting = true;
    has_task.notify_all();
  }

 private:
  std::mutex mut;                     // Guards |tasks| and |exiting|.
  std::condition_variable has_task;   // Signalled on Enqueue() and Exit().
  std::queue<TaskPtr> tasks;
  bool exiting;
};
struct ThreadPool {
explicit ThreadPool(int count) {
for (int i = 0; i < count; ++i) {
threads.emplace_back(new Thread(this));
}
}
~ThreadPool() {
tasks_to_run.Exit();
}
template <typename Container>
void Run(Container& tasks) {
const size_t kMaxRunning = threads.size() * 2;
size_t total = tasks.size();
size_t started = 0;
size_t finished = 0;
while (started < kMaxRunning) {
TaskPtr task = std::move(tasks[started++]);
task->Before();
tasks_to_run.Enqueue(std::move(task));
}
while (finished < total) {
TaskPtr task = done_tasks.Dequeue();
task->After();
finished++;
if (started < total) {
TaskPtr task = std::move(tasks[started++]);
task->Before();
tasks_to_run.Enqueue(std::move(task));
}
}
tasks.clear();
}
private:
TaskQueue tasks_to_run;
TaskQueue done_tasks;
std::vector<ThreadPtr> threads;
friend struct Thread;
};
// Worker loop: takes tasks from the pool's run queue, executes them, and hands
// each finished task to the pool's done queue. Stops as soon as Dequeue()
// returns a null task.
void Thread::Run(ThreadPool* pool) {
  while (TaskPtr job = pool->tasks_to_run.Dequeue()) {
    job->Run();
    pool->done_tasks.Enqueue(std::move(job));
  }
}
// Placeholder record holding a single integer; MyTask::Run overwrites |dummy|
// with the result of its busy work.
struct Function {
  int dummy;

  explicit Function(int dummy) : dummy{dummy} {}
};
// Bundles the Function records with the allocator whose chunks ModuleTask
// instances take in Before() and give back in After().
struct Module {
// Chunk list that tasks steal from and return to.
Allocator allocator;
// The function records; MyTask stores raw pointers to these elements.
std::vector<Function> functions;
};
struct ModuleTask : Task {
explicit ModuleTask(Module* module) : module(module) {}
virtual void Before() { allocator.StealFirstAndPrepend(&module->allocator); }
virtual void After() {
module->allocator.StealFirstAndPrepend(&allocator);
module->allocator.StealAllAndAppend(&allocator);
}
Module* module;
Allocator allocator;
};
// Naive doubly recursive Fibonacci with fib(0) == fib(1) == 1; any n below 2
// (including negative values) yields 1. Used as CPU busy work by MyTask::Run.
int fib(int n) {
  return (n < 2) ? 1 : fib(n - 1) + fib(n - 2);
}
struct MyTask : ModuleTask {
MyTask(Module* module, Function* f) : ModuleTask(module), f(f) {
id = s_counter++;
}
virtual void Run() {
LOGF("%d: started\n", id);
// silly allocations
LOGF("%d: allocating 1000 bytes\n", id);
allocator.Alloc(1000);
// busy work
f->dummy = fib(id % 40);
LOGF("%d: finished\n", id);
}
static int s_counter;
int id;
Function* f;
};
int MyTask::s_counter = 0;
// Creates one MyTask per function of |module| and runs them all on |pool|,
// returning once the pool has finished every task.
void DoSomeStuff(ThreadPool* pool, Module* module) {
  fprintf(stderr, "doing stuff...\n");

  std::vector<Function>& functions = module->functions;
  std::vector<TaskPtr> pending;
  for (size_t i = 0; i < functions.size(); ++i) {
    pending.push_back(TaskPtr(new MyTask(module, &functions[i])));
  }

  pool->Run(pending);
}
// Demo driver: builds a module with 200 functions, runs the task batch twice
// on a four-thread pool, then prints the module allocator's chunk usage.
int main() {
  const int kNumFunctions = 200;
  const int kPasses = 2;

  ThreadPool pool(4);
  Module module;

  // One small allocation up front, so the module allocator already has a chunk
  // before the first batch runs.
  module.allocator.Alloc(20);

  for (int id = 0; id != kNumFunctions; ++id) {
    module.functions.emplace_back(id);
  }

  for (int pass = 0; pass != kPasses; ++pass) {
    DoSomeStuff(&pool, &module);
  }

  module.allocator.Dump();
  return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment