@jerry73204
Created January 13, 2022 01:42
ETU Hadoop 2015 competition code using lock-free concurrent trie
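The gist bundles two files: config.h with the compile-time limits, and the main translation unit below it. The log format is not documented here, but judging from the PATTERN_ORDER / PATTERN_PLIST handling in worker(), each order record is expected to contain something like the following (a reconstruction from the parsing code, with made-up values, not an official spec):

    ... action?;act=order;uid=12345 ... plist=101,2,350,102,1,99; ...

i.e. a plist= list of <numeric product id>,<amount>,<price> triples separated by commas and terminated by a semicolon. The program sums amount * price per product id and reports the 20 largest totals.

/* ---- config.h ---- */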
#ifndef __CONFIG_H__
#define __CONFIG_H__
#define MAX_PATH_LEN 4096
#define MAX_PRODUCT_ID_LEN 256
#define MAX_NUM_PRODUCTS 65536
#define MAX_NUM_TRIE_NODES 1048576
#define MAX_MEM_SIZE 1656885703
#define MIN_READ_SIZE 8192
#define MAX_WRITE_SIZE 65536
#define TOP_N 20
#define OUTPUT_FILENAME ("Team23_Result.txt")
#endif
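/* ---- main program ---- */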
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <assert.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <Rembedded.h>
#include <Rdefines.h>
#include "config.h"
#define PATTERN_ORDER ("action?;act=order;uid=")
#define PATTERN_ORDER_LEN (sizeof(PATTERN_ORDER) - 1)
#define PATTERN_PLIST ("plist=")
#define PATTERN_PLIST_LEN (sizeof(PATTERN_PLIST) - 1)
#define CAST_ARGS(arg) ((struct arguments*) ((arg)))
#define DEFERENCE_PRODUCT_POINTER(ptr) (* (struct product**) ((ptr)))
/* struct describing arguments passed to worker threads */
struct arguments
{
char *chunk_begin;
char *chunk_end;
};
/* struct of each node on trie */
struct trie_node
{
int count;
int summary;
pthread_rwlock_t rwlock;
struct trie_node *next[10];
};
/* struct of elements in product array */
struct product
{
char product_id[MAX_PRODUCT_ID_LEN];
int *summary_ptr;
};
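/* global state shared by all threads */
/* mem_log_begin..mem_log_end is one large malloc'd buffer; main() appends raw stdin bytes at mem_log_end while the writer thread drains them to hadoop */
/* trie_nodes, products and products_pointers are fixed-size pools whose *_end pointers act as bump allocators advanced with atomic fetch-and-add, so worker threads claim slots without a lock; the per-node rwlocks only guard the trie links */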
char hdfs_path[MAX_PATH_LEN];
pthread_t tid_writer;
volatile int is_reading_done;
int num_threads;
char *mem_log_begin;
volatile char *mem_log_end;
size_t mem_log_size;
struct trie_node trie_nodes[MAX_NUM_TRIE_NODES];
struct trie_node *trie_nodes_end = trie_nodes;
struct trie_node *trie_head;
struct product products[MAX_NUM_PRODUCTS];
struct product *products_end = products;
struct product *products_pointers[MAX_NUM_PRODUCTS];
struct product **products_pointers_end = products_pointers;
struct product *top_n_products[TOP_N];
void
trie_node_init(struct trie_node *node)
{
pthread_rwlock_init(&node->rwlock, NULL);
}
void
trie_node_destroy(struct trie_node *node)
{
pthread_rwlock_destroy(&node->rwlock);
}
int compare_product(const void *ptr1, const void *ptr2)
{
return (*DEFERENCE_PRODUCT_POINTER(ptr2)->summary_ptr) - (*DEFERENCE_PRODUCT_POINTER(ptr1)->summary_ptr);
}
void
*writer(void *arg)
{
/* fork hadoop process */
int fd_pipe[2];
pipe(fd_pipe);
int pid = fork();
if (pid == 0)
{
int fd_null = open("/dev/null", O_WRONLY);
assert(fd_null >= 0);
close(fd_pipe[1]);
dup2(fd_pipe[0], STDIN_FILENO);
dup2(fd_null, STDOUT_FILENO);
execl("/usr/bin/hadoop", "/usr/bin/hadoop", "fs", "-put", "-f", "-", hdfs_path, NULL);
}
close(fd_pipe[0]);
char *curr_ptr = mem_log_begin;
/* read log from memory and write to hadoop in each loop */
while (1)
{
long long int bytes_left;
char *curr_end;
if (is_reading_done)
{
/* identical to curr_end = mem_log_end; */
/* the inline asm forces reading the mem_log_end value from memory instead of a register */
asm("mov %1, %0\n\t"
:"=r"(curr_end)
:"m"(mem_log_end)
:);
if (curr_end == curr_ptr)
{
close(fd_pipe[1]);
pthread_exit(NULL);
}
}
/* identical to curr_end = mem_log_end */
asm("mov %1, %0\n\t"
:"=r"(curr_end)
:"m"(mem_log_end)
:);
bytes_left = curr_end - curr_ptr;
assert(bytes_left >= 0);
if(bytes_left != 0)
{
ssize_t bytes_written = write(fd_pipe[1], curr_ptr, bytes_left);
if (bytes_written < 0)
{
fprintf(stderr, "fail to write to pipe.\n");
pthread_exit(NULL);
}
curr_ptr += bytes_written;
}
}
}
void
*worker(void *arg)
{
char *chunk_begin = CAST_ARGS(arg)->chunk_begin;
char *chunk_end = CAST_ARGS(arg)->chunk_end;
char *chunk_ptr = chunk_begin;
while (1)
{
chunk_ptr = strstr(chunk_ptr, PATTERN_ORDER);
if (chunk_ptr == NULL || chunk_ptr >= chunk_end)
break;
chunk_ptr = strstr(chunk_ptr, PATTERN_PLIST);
assert(chunk_ptr != NULL && chunk_ptr < chunk_end);
chunk_ptr += PATTERN_PLIST_LEN;
if (*chunk_ptr == ';')
continue;
while (1)
{
char *product_id_begin;
char *product_id_end;
int amount;
int price;
char *ptr_end;
struct trie_node *trie_ptr;
/* find corresponding node on trie */
trie_ptr = trie_head;
product_id_begin = chunk_ptr;
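/* walk the trie one decimal digit at a time: take a read lock on the current node to follow the child link; on a miss, drop it, take a write lock, re-check, and allocate the missing child from the global pool */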
for (; *chunk_ptr != ','; chunk_ptr++)
{
int index = *chunk_ptr - '0';
pthread_rwlock_rdlock(&trie_ptr->rwlock);
if (trie_ptr->next[index] != NULL)
{
struct trie_node *prev_node = trie_ptr;
trie_ptr = trie_ptr->next[index];
pthread_rwlock_unlock(&prev_node->rwlock);
}
else
{
pthread_rwlock_unlock(&trie_ptr->rwlock);
pthread_rwlock_wrlock(&trie_ptr->rwlock);
if (trie_ptr->next[index] == NULL)
{
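/* __sync_fetch_and_add on a pointer adds raw bytes (no pointer scaling), so adding sizeof(struct trie_node) claims exactly one slot from the static pool */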
struct trie_node *next_node = __sync_fetch_and_add(&trie_nodes_end, sizeof(struct trie_node));
assert(next_node - trie_nodes < MAX_NUM_TRIE_NODES);
trie_node_init(next_node);
trie_ptr->next[index] = next_node;
}
struct trie_node *prev_node = trie_ptr;
trie_ptr = trie_ptr->next[index];
pthread_rwlock_unlock(&prev_node->rwlock);
}
}
product_id_end = chunk_ptr;
chunk_ptr++;
/* parse amount */
amount = strtol(chunk_ptr, &ptr_end, 10);
assert(ptr_end > chunk_ptr);
assert( *ptr_end == ',');
chunk_ptr = ptr_end + 1;
/* parse price */
price = strtol(chunk_ptr, &ptr_end, 10);
assert(ptr_end > chunk_ptr);
assert(*ptr_end == ',' || *ptr_end == ';');
__sync_fetch_and_add(&trie_ptr->summary, amount * price);
if (!__sync_fetch_and_add(&trie_ptr->count, 1))
{
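/* first hit on this trie node (count was 0): claim a product record and a pointer slot from the global arrays with the same byte-wise fetch-and-add bump allocation */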
struct product *product_ptr = __sync_fetch_and_add(&products_end, sizeof(struct product));
int product_id_len = product_id_end - product_id_begin;
assert(product_id_len < MAX_PRODUCT_ID_LEN);
strncpy(product_ptr->product_id, product_id_begin, product_id_len);
product_ptr->product_id[product_id_len] = '\0';
product_ptr->summary_ptr = &trie_ptr->summary;
struct product **product_pointers_ptr = __sync_fetch_and_add(&products_pointers_end, sizeof(struct product*));
*product_pointers_ptr = product_ptr;
}
/* break if all products are processed */
if (*ptr_end == ';')
break;
chunk_ptr = ptr_end + 1;
}
}
pthread_exit(NULL);
}
int
main(int argc, char **argv)
{
assert(argc == 2);
strcpy(hdfs_path, argv[1]);
/* initialize large memory */
mem_log_begin = (char*) malloc(MAX_MEM_SIZE);
if (mem_log_begin == NULL)
{
fprintf(stderr, "cannot allocate memory.\n");
exit(errno);
}
mem_log_end = mem_log_begin;
/* start the hadoop writer thread, then read the whole log from stdin */
pthread_create(&tid_writer, NULL, writer, NULL);
while (1)
{
ssize_t bytes_read = read(STDIN_FILENO, *(char**) &mem_log_end, MIN_READ_SIZE);
if (bytes_read < 0)
{
fprintf(stderr, "fail to read from stdin.\n");
exit(errno);
}
if (bytes_read == 0)
{
mem_log_size = mem_log_end - mem_log_begin;
is_reading_done = 1;
break;
}
/* identical to mem_log_end += bytes_read */
/* read-modify-write on the memory operand, hence the "+m" constraint */
asm("add %1, %0\n\t"
:"+m"(mem_log_end)
:"r"(bytes_read)
:);
}
/* init trie */
trie_head = trie_nodes_end;
trie_nodes_end++;
trie_node_init(trie_head);
int num_threads;
int max_num_threads = sysconf(_SC_NPROCESSORS_ONLN);
if (max_num_threads < 0)
{
return errno;
}
size_t min_chunk_size = (mem_log_size - 1) / max_num_threads + 1;
struct arguments args[max_num_threads];
pthread_t thread_ids[max_num_threads];
char *ptr_begin;
char *ptr_end = mem_log_begin;
/* calc chunk range and start workers */
for (int thread_idx = 0; thread_idx < max_num_threads; thread_idx++)
{
ptr_begin = ptr_end;
ptr_end += min_chunk_size;
if (ptr_end >= mem_log_end)
{
ptr_end = *(char**) &mem_log_end;
}
else
{
ptr_end = strchr((char*) ptr_end, '\n');
assert(ptr_end != NULL);
ptr_end++;
}
args[thread_idx].chunk_begin = ptr_begin;
args[thread_idx].chunk_end = ptr_end;
pthread_create(thread_ids + thread_idx, NULL, worker, &args[thread_idx]);
if (ptr_end == mem_log_end)
{
num_threads = thread_idx + 1;
break;
}
}
/* join workers */
for (int thread_idx = 0; thread_idx < num_threads; thread_idx++)
{
pthread_join(thread_ids[thread_idx], NULL);
}
/* sort and find top 20 via R */
char *r_argv[3] = {argv[0], "--vanilla", "-q"};
Rf_initEmbeddedR(2, r_argv);
int n_products = products_pointers_end - products_pointers;
double *summaries = (double*) malloc(sizeof(double) * n_products);
int *indices = (int*) malloc(sizeof(int) * n_products);
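/* rsort_with_index() from R's C API sorts summaries into ascending order and permutes indices alongside; the values are negated so the first TOP_N indices end up at the largest revenues */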
for (int i = 0; i < n_products; i++)
{
indices[i] = i;
summaries[i] = (double) -(*products_pointers[i]->summary_ptr);
}
rsort_with_index(summaries, indices, n_products);
for (int i = 0; i < TOP_N && i < n_products; i++)
{
top_n_products[i] = products_pointers[indices[i]];
}
free(summaries);
free(indices);
Rf_endEmbeddedR(0);
/* write result */
FILE *file_out = fopen(OUTPUT_FILENAME, "w+");
if (file_out == NULL)
{
fprintf(stderr, "cannot write results.\n");
return errno;
}
int nth_largest = 1;
for (struct product **ptr = top_n_products, **end = top_n_products + TOP_N;
ptr != end && *ptr != NULL && nth_largest <= TOP_N;
ptr++, nth_largest++)
{
fprintf(file_out, "%02d,%s\n", nth_largest, (*ptr)->product_id);
}
fclose(file_out);
pthread_join(tid_writer, NULL);
free(mem_log_begin);
return EXIT_SUCCESS;
}
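To build and run this outside the original competition setup, something along these lines should work (a rough sketch, not the authors' build script; the exact flags depend on the local R installation, main.c stands in for whatever the main source file is actually named, and R_HOME must be set because the program embeds R via Rf_initEmbeddedR):

    gcc -O2 -pthread main.c $(R CMD config --cppflags) $(R CMD config --ldflags) -o team23
    export R_HOME=$(R RHOME)
    hadoop fs -cat /path/to/logs/* | ./team23 /hdfs/path/for/raw-log-copy

The single argument is the HDFS path to which the program mirrors everything it reads on stdin (via hadoop fs -put -f -); the top-20 result itself is written locally to Team23_Result.txt.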