Skip to content

Instantly share code, notes, and snippets.

@mrjohannchang
Created March 3, 2013 14:52
Show Gist options
  • Save mrjohannchang/5076386 to your computer and use it in GitHub Desktop.
Save mrjohannchang/5076386 to your computer and use it in GitHub Desktop.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <unistd.h>
#include <assert.h>
#include <sys/time.h>
#include <pthread.h>
#include <queue>
#include <vector>
#include <deque>
using namespace std;
enum OutputOpt {
OOPT_NONE,
OOPT_MEMCPY,
OOPT_4BYTES,
OOPT_DUFF,
};
const int N = 30;
const OutputOpt output_opt = OOPT_DUFF;
/* mutable parameters */
bool flag_output_num;
int maxlen = N;
int job_size = 14;
double time_limit = 10.0;
#if 0
unsigned int thread_num = 4;
int memory_size = 1048576 * 100;
#else
// for quanta s2q
unsigned int thread_num = 32;
int memory_size = 1048576 * 800;
#endif
pthread_mutex_t g_lock;
int worker_num;
int job_num;
int out_num;
double now(void)
{
struct timeval tnow;
double tval;
gettimeofday(&tnow,0);
tval=tnow.tv_sec+tnow.tv_usec/1e6;
return tval;
}
//#define TRACE() Tracer tx(__LINE__)
#define TRACE()
#define LINES 500
struct Tracer {
static double time[LINES];
static int count[LINES];
int line;
double t0;
Tracer(int line_) : line(line_) {
t0 = now();
}
~Tracer() {
time[line] += now() - t0;
count[line]++;
}
static void dump() {
for (int i = 0; i < LINES; i++)
if (count[i])
fprintf(stderr, "line %d, count %d, t=%f, avg=%f\n",
i, count[i], time[i], time[i]/count[i]);
}
};
double Tracer::time[LINES];
int Tracer::count[LINES];
struct Out {
/* 5^7 * N(=30) = 2.3M */
/* +1 for \n, +4 for 4 bytes op */
static const unsigned cap = 78125 * (N+1) + 1 + 4;
int id;
unsigned int outlen;
char *outbuf;
int found;
Out() {
outlen = 0;
found = 0;
outbuf = (char*) malloc(cap);
}
};
struct Job {
int id;
int len;
int mid;
int end;
char result[N+1+4];
int a;
Out *out;
};
class JobQueue {
queue<Job*> q;
pthread_mutex_t mutex;
pthread_cond_t cond_w;
pthread_cond_t cond_r;
unsigned int limit;
bool done;
public:
JobQueue(unsigned int limit_)
:q(), limit(limit_) {
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond_r, NULL);
pthread_cond_init(&cond_w, NULL);
done = false;
}
Job* get() {
TRACE();
Job *result;
pthread_mutex_lock(&mutex);
while (q.size() == 0) {
if (done) {
pthread_mutex_unlock(&mutex);
return NULL;
}
pthread_cond_wait(&cond_r, &mutex);
}
result = q.front();
q.pop();
pthread_cond_signal(&cond_w);
pthread_mutex_unlock(&mutex);
return result;
}
void no_more_job() {
TRACE();
pthread_mutex_lock(&mutex);
done = true;
pthread_cond_broadcast(&cond_r);
pthread_mutex_unlock(&mutex);
}
void push(Job* job) {
TRACE();
pthread_mutex_lock(&mutex);
while (q.size() == limit) {
pthread_cond_wait(&cond_w, &mutex);
}
q.push(job);
pthread_cond_signal(&cond_r);
pthread_mutex_unlock(&mutex);
}
};
JobQueue *job_queue;
class OutputList {
pthread_mutex_t mutex;
pthread_cond_t cond_r;
pthread_cond_t cond_w;
pthread_cond_t cond_s;
deque<Out*> list;
vector<Out*> free_list;
int next_id;
public:
OutputList(int size) {
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond_r, NULL);
pthread_cond_init(&cond_w, NULL);
pthread_cond_init(&cond_s, NULL);
next_id = 0;
for (int i = 0; i < size; i++) {
list.push_back(NULL);
free_list.push_back(new Out);
}
}
Out* get_to_fill() {
TRACE();
pthread_mutex_lock(&mutex);
while (free_list.empty())
pthread_cond_wait(&cond_w, &mutex);
Out *r = free_list.back();
free_list.pop_back();
pthread_mutex_unlock(&mutex);
return r;
}
Out* get_to_output() {
TRACE();
pthread_mutex_lock(&mutex);
while (list.front() == NULL) {
pthread_mutex_lock(&g_lock);
if (worker_num == 0) {
pthread_mutex_unlock(&mutex);
pthread_mutex_unlock(&g_lock);
return NULL;
}
pthread_mutex_unlock(&g_lock);
pthread_cond_wait(&cond_r, &mutex);
}
Out* r = list.front();
list.pop_front();
list.push_back(NULL);
next_id++;
pthread_mutex_unlock(&mutex);
return r;
}
void put(Out *out) {
TRACE();
pthread_mutex_lock(&mutex);
while (out->id - next_id >= (int)list.size()) {
pthread_cond_wait(&cond_s, &mutex);
}
assert(0 <= out->id - next_id);
assert(out->id - next_id < list.size());
list[out->id - next_id] = out;
pthread_cond_signal(&cond_s);
pthread_cond_signal(&cond_r);
pthread_mutex_unlock(&mutex);
}
void free(Out *out) {
TRACE();
pthread_mutex_lock(&mutex);
free_list.push_back(out);
pthread_cond_signal(&cond_w);
pthread_mutex_unlock(&mutex);
}
void notify() {
pthread_cond_signal(&cond_r);
}
};
OutputList *out_list;
const char cand[] = "01689";
const char cand_mid[] = "018";
const char cand2[] = "01986";
void output(struct Out *out, struct Job *job)
{
if (output_opt == OOPT_NONE) {
} else if (output_opt == OOPT_4BYTES) {
uint32_t *p = (uint32_t*)(out->outbuf + out->outlen);
uint32_t *q = (uint32_t*)job->result;
while (*q) {
*p++ = *q++;
}
} else if (output_opt == OOPT_DUFF) {
uint32_t *p = (uint32_t*)(out->outbuf + out->outlen);
uint32_t *q = (uint32_t*)job->result;
switch (job->len >> 2) {
case 7: *p++ = *q++;
case 6: *p++ = *q++;
case 5: *p++ = *q++;
case 4: *p++ = *q++;
case 3: *p++ = *q++;
case 2: *p++ = *q++;
case 1: *p++ = *q++;
case 0: *p++ = *q++;
}
} else if (output_opt == OOPT_MEMCPY) {
memcpy(out->outbuf+out->outlen, job->result, job->len+1);
}
out->outlen += job->len+1;
out->found++;
assert(out->outlen < out->cap);
}
void run(struct Job *job, int a)
{
int i = a == 0 ? 1 : 0;
if (a == job->end) {
output(job->out, job);
} else {
if (a == job->mid) {
for (; cand_mid[i]; i++) {
job->result[a] = cand_mid[i];
run(job, a+1);
}
} else {
for (; cand[i]; i++) {
job->result[a] = cand[i];
job->result[job->len - a - 1] = cand2[i];
run(job, a+1);
}
}
}
}
void gen(struct Job *job, int a)
{
if (a == job->end) {
Job *new_job = new Job();
new_job->id = job_num++;
new_job->len = job->len;
memcpy(new_job->result, job->result, sizeof(job->result));
new_job->end = job->len / 2 + job->len % 2;
new_job->mid = job->len / 2;
new_job->a = a;
job_queue->push(new_job);
} else {
int i = a == 0 ? 1 : 0;
for (; cand[i]; i++) {
job->result[a] = cand[i];
job->result[job->len - a - 1] = cand2[i];
gen(job, a+1);
}
}
}
void generate_job()
{
Job job;
for (int len = 1; len <= maxlen; len++) {
memset(&job, 0, sizeof(job));
job.id = -1;
job.len = len;
job.result[len] = '\n';
job.end = (len - job_size + 1) / 2;
if (job.end < 0)
job.end = 0;
gen(&job, 0);
}
}
void* outputer(void *ctx)
{
(void) ctx;
int found = 1;
double t0 = now();
write(1, "0\n", 2);
while(1) {
Out *out = out_list->get_to_output();
if (out == NULL)
break;
assert(out->id == out_num);
out_num++;
found += out->found;
write(1, out->outbuf, out->outlen);
double t1 = now();
if (t1 - t0 > time_limit)
break;
out->outlen = 0;
out->found = 0;
out_list->free(out);
}
//fprintf(stderr, "job num %d\n", job_num);
if (flag_output_num)
fprintf(stderr, "%d\n", found);
Tracer::dump();
exit(0);
return NULL;
}
void* run_worker(void *ctx)
{
(void) ctx;
while (1) {
Out *out = out_list->get_to_fill();
Job *job = job_queue->get();
if (job == NULL) {
pthread_mutex_lock(&g_lock);
worker_num--;
out_list->notify();
pthread_mutex_unlock(&g_lock);
break;
}
job->out = out;
job->out->id = job->id;
assert(job->id == job->out->id);
run(job, job->a);
out_list->put(job->out);
delete job;
}
return NULL;
}
int min(int a, int b) { return a < b ? a : b; }
struct worker_t {
pthread_t thread;
};
int main(int argc, char *argv[])
{
int ch;
struct worker_t *thread_pool;
pthread_t output_thread;
while ((ch = getopt(argc, argv, "n:cC:t:j:T:m:")) != -1) {
switch (ch) {
case 'n':
maxlen = atoi(optarg);
break;
case 'c':
flag_output_num = true;
break;
case 't':
thread_num = atoi(optarg);
break;
case 'j':
job_size = atoi(optarg);
break;
case 'T':
time_limit = atof(optarg);
break;
case 'm':
memory_size = atoi(optarg) * 1048576;
break;
default:
fprintf(stderr, "unknown option '%c'\n",
ch);
}
}
worker_num = thread_num;
thread_pool = (worker_t*) malloc(sizeof(struct worker_t) * thread_num);
out_list = new OutputList(
memory_size / Out::cap
//thread_num * 20);
);
job_queue = new JobQueue(thread_num * 200);
for (unsigned int i = 0; i < thread_num; i++) {
pthread_create(&thread_pool[i].thread, NULL, run_worker, NULL);
}
pthread_create(&output_thread, NULL, outputer, NULL);
generate_job();
job_queue->no_more_job();
pthread_join(output_thread, NULL);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment