Created
January 13, 2022 01:42
-
-
Save jerry73204/ef83fe875df810f123287a5a67b1a0d1 to your computer and use it in GitHub Desktop.
ETU Hadoop 2015 competition code using lock-free concurrent trie
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * config.h - compile-time limits and tunables used by the log processor.
 *
 * The include guard deliberately avoids a leading double underscore:
 * identifiers of that form are reserved for the C implementation.
 */
#ifndef CONFIG_H
#define CONFIG_H

/* Size in bytes of the buffer that stores the HDFS destination path. */
#define MAX_PATH_LEN 4096
/* Size in bytes of the product id buffer, terminating NUL included. */
#define MAX_PRODUCT_ID_LEN 256
/* Number of slots in the product table and in the product pointer table. */
#define MAX_NUM_PRODUCTS 65536
/* Number of nodes in the statically allocated trie node pool. */
#define MAX_NUM_TRIE_NODES 1048576
/* Size in bytes of the heap buffer that holds the whole input log. */
#define MAX_MEM_SIZE 1656885703
/* Byte count requested from stdin by each read() call. */
#define MIN_READ_SIZE 8192
/* NOTE(review): not referenced by the code visible alongside this header. */
#define MAX_WRITE_SIZE 65536
/* Number of best-selling products written to the result file. */
#define TOP_N 20
/* Name of the result file created in the current working directory. */
#define OUTPUT_FILENAME ("Team23_Result.txt")

#endif /* CONFIG_H */
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <unistd.h> | |
#include <fcntl.h> | |
#include <errno.h> | |
#include <assert.h> | |
#include <pthread.h> | |
#include <sys/types.h> | |
#include <sys/stat.h> | |
#include <sys/mman.h> | |
#include <Rembedded.h> | |
#include <Rdefines.h> | |
#include "config.h" | |
/* Text that marks an order record in the log. */
#define PATTERN_ORDER ("action?;act=order;uid=")
/* Length of PATTERN_ORDER without its terminating NUL. */
#define PATTERN_ORDER_LEN (sizeof(PATTERN_ORDER) - 1)
/* Text that introduces the product list of an order record. */
#define PATTERN_PLIST ("plist=")
/* Length of PATTERN_PLIST without its terminating NUL. */
#define PATTERN_PLIST_LEN (sizeof(PATTERN_PLIST) - 1)
/* View a thread start-routine argument as the struct arguments it points to. */
#define CAST_ARGS(arg) ((struct arguments *) (arg))
/*
 * Fetch the struct product pointer stored in the slot `ptr` points to.
 * The slot is read through a const-qualified pointer so the macro can be
 * applied to the `const void *` arguments of a qsort-style comparator
 * without casting the qualifier away.
 */
#define DEFERENCE_PRODUCT_POINTER(ptr) (*(struct product *const *) (ptr))
/*
 * Work item given to one worker thread: the slice of the in-memory log the
 * thread scans. A match that starts at or after chunk_end ends the scan.
 */
struct arguments {
    char *chunk_begin;  /* first byte of the slice */
    char *chunk_end;    /* one past the last byte of the slice */
};
/* struct of each node on trie */ | |
struct trie_node | |
{ | |
int count; | |
int summary; | |
pthread_rwlock_t rwlock; | |
struct trie_node *next[10]; | |
}; | |
/* struct of elements in product array */ | |
struct product | |
{ | |
char product_id[MAX_PRODUCT_ID_LEN]; | |
int *summary_ptr; | |
}; | |
char hdfs_path[MAX_PATH_LEN]; | |
pthread_t tid_writer; | |
volatile int is_reading_done; | |
int num_threads; | |
char *mem_log_begin; | |
volatile char *mem_log_end; | |
size_t mem_log_size; | |
struct trie_node trie_nodes[MAX_NUM_TRIE_NODES]; | |
struct trie_node *trie_nodes_end = trie_nodes; | |
struct trie_node *trie_head; | |
struct product products[MAX_NUM_PRODUCTS]; | |
struct product *products_end = products; | |
struct product *products_pointers[MAX_NUM_PRODUCTS]; | |
struct product **products_pointers_end = products_pointers; | |
struct product *top_n_products[TOP_N]; | |
void | |
trie_node_init(struct trie_node *node) | |
{ | |
pthread_rwlock_init(&node->rwlock, NULL); | |
} | |
void | |
trie_node_destroy(struct trie_node *node) | |
{ | |
pthread_rwlock_destroy(&node->rwlock); | |
} | |
int compare_product(const void *ptr1, const void *ptr2) | |
{ | |
return (*DEFERENCE_PRODUCT_POINTER(ptr2)->summary_ptr) - (*DEFERENCE_PRODUCT_POINTER(ptr1)->summary_ptr); | |
} | |
void | |
*writer(void *arg) | |
{ | |
/* fork hadoop process */ | |
int fd_pipe[2]; | |
pipe(fd_pipe); | |
int pid = fork(); | |
if (pid == 0) | |
{ | |
int fd_null = open("/dev/null", O_WRONLY); | |
assert(fd_null >= 0); | |
close(fd_pipe[1]); | |
dup2(fd_pipe[0], STDIN_FILENO); | |
dup2(fd_null, STDOUT_FILENO); | |
execl("/usr/bin/hadoop", "/usr/bin/hadoop", "fs", "-put", "-f", "-", hdfs_path, NULL); | |
} | |
close(fd_pipe[0]); | |
char *curr_ptr = mem_log_begin; | |
/* read log from memory and write to hadoop in each loop */ | |
while (1) | |
{ | |
long long int bytes_left; | |
char *curr_end; | |
if (is_reading_done) | |
{ | |
/* idetical to curr_end = mem_log_end; */ | |
/* this inline asm force reading mem_log_size value from memory instead of register */ | |
asm("mov %1, %0\n\t" | |
:"=r"(curr_end) | |
:"m"(mem_log_end) | |
:); | |
if (curr_end == curr_ptr) | |
{ | |
close(fd_pipe[1]); | |
pthread_exit(NULL); | |
} | |
} | |
/* identical to curr_end = mem_log_end */ | |
asm("mov %1, %0\n\t" | |
:"=r"(curr_end) | |
:"m"(mem_log_end) | |
:); | |
bytes_left = curr_end - curr_ptr; | |
assert(bytes_left >= 0); | |
if(bytes_left != 0) | |
{ | |
ssize_t bytes_written = write(fd_pipe[1], curr_ptr, bytes_left); | |
if (bytes_written < 0) | |
{ | |
fprintf(stderr, "fail to write to pipe.\n"); | |
pthread_exit(NULL); | |
} | |
curr_ptr += bytes_written; | |
} | |
} | |
} | |
void | |
*worker(void *arg) | |
{ | |
char *chunk_begin = CAST_ARGS(arg)->chunk_begin; | |
char *chunk_end = CAST_ARGS(arg)->chunk_end; | |
char *chunk_ptr = chunk_begin; | |
while (1) | |
{ | |
chunk_ptr = strstr(chunk_ptr, PATTERN_ORDER); | |
if (chunk_ptr == NULL || chunk_ptr >= chunk_end) | |
break; | |
chunk_ptr = strstr(chunk_ptr, PATTERN_PLIST); | |
assert(chunk_ptr != NULL && chunk_ptr < chunk_end); | |
chunk_ptr += PATTERN_PLIST_LEN; | |
if (*chunk_ptr == ';') | |
continue; | |
while (1) | |
{ | |
char *product_id_begin; | |
char *product_id_end; | |
int amount; | |
int price; | |
char *ptr_end; | |
struct trie_node *trie_ptr; | |
/* find corresponding node on trie */ | |
trie_ptr = trie_head; | |
product_id_begin = chunk_ptr; | |
for (; *chunk_ptr != ','; chunk_ptr++) | |
{ | |
int index = *chunk_ptr - '0'; | |
pthread_rwlock_rdlock(&trie_ptr->rwlock); | |
if (trie_ptr->next[index] != NULL) | |
{ | |
struct trie_node *prev_node = trie_ptr; | |
trie_ptr = trie_ptr->next[index]; | |
pthread_rwlock_unlock(&prev_node->rwlock); | |
} | |
else | |
{ | |
pthread_rwlock_unlock(&trie_ptr->rwlock); | |
pthread_rwlock_wrlock(&trie_ptr->rwlock); | |
if (trie_ptr->next[index] == NULL) | |
{ | |
struct trie_node *next_node = __sync_fetch_and_add(&trie_nodes_end, sizeof(struct trie_node)); | |
assert(next_node - trie_nodes < MAX_NUM_TRIE_NODES); | |
trie_node_init(next_node); | |
trie_ptr->next[index] = next_node; | |
} | |
struct trie_node *prev_node = trie_ptr; | |
trie_ptr = trie_ptr->next[index]; | |
pthread_rwlock_unlock(&prev_node->rwlock); | |
} | |
} | |
product_id_end = chunk_ptr; | |
chunk_ptr++; | |
/* parse amount */ | |
amount = strtol(chunk_ptr, &ptr_end, 10); | |
assert(ptr_end > chunk_ptr); | |
assert( *ptr_end == ','); | |
chunk_ptr = ptr_end + 1; | |
/* parse price */ | |
price = strtol(chunk_ptr, &ptr_end, 10); | |
assert(ptr_end > chunk_ptr); | |
assert(*ptr_end == ',' || *ptr_end == ';'); | |
__sync_fetch_and_add(&trie_ptr->summary, amount * price); | |
if (!__sync_fetch_and_add(&trie_ptr->count, 1)) | |
{ | |
struct product *product_ptr = __sync_fetch_and_add(&products_end, sizeof(struct product)); | |
int product_id_len = product_id_end - product_id_begin; | |
strncpy(product_ptr->product_id, product_id_begin, product_id_len); | |
product_ptr->product_id[product_id_len] = '\0'; | |
product_ptr->summary_ptr = &trie_ptr->summary; | |
struct product **product_pointers_ptr = __sync_fetch_and_add(&products_pointers_end, sizeof(struct product*)); | |
*product_pointers_ptr = product_ptr; | |
} | |
/* break if all products are processed */ | |
if (*ptr_end == ';') | |
break; | |
chunk_ptr = ptr_end + 1; | |
} | |
} | |
pthread_exit(NULL); | |
} | |
int | |
main(int argc, char **argv) | |
{ | |
assert(argc == 2); | |
strcpy(hdfs_path, argv[1]); | |
/* initialize large memory */ | |
mem_log_begin = (char*) malloc(MAX_MEM_SIZE); | |
if (mem_log_begin == NULL) | |
{ | |
fprintf(stderr, "cannot allocate memory.\n"); | |
exit(errno); | |
} | |
mem_log_end = mem_log_begin; | |
/* start stdout writer thread and read from stdin */ | |
pthread_create(&tid_writer, NULL, writer, NULL); | |
while (1) | |
{ | |
ssize_t bytes_read = read(STDIN_FILENO, *(char**) &mem_log_end, MIN_READ_SIZE); | |
if (bytes_read < 0) | |
{ | |
fprintf(stderr, "fail to read from stdin.\n"); | |
exit(errno); | |
} | |
if (bytes_read == 0) | |
{ | |
mem_log_size = mem_log_end - mem_log_begin; | |
is_reading_done = 1; | |
break; | |
} | |
/* identical to bytes_read = mem_log_end */ | |
asm("add %1, %0\t\n" | |
:"=m"(mem_log_end) | |
:"r"(bytes_read) | |
:); | |
} | |
/* init trie */ | |
trie_head = trie_nodes_end; | |
trie_nodes_end++; | |
trie_node_init(trie_head); | |
int num_threads; | |
int max_num_threads = sysconf(_SC_NPROCESSORS_ONLN); | |
if (max_num_threads < 0) | |
{ | |
return errno; | |
} | |
size_t min_chunk_size = (mem_log_size - 1) / max_num_threads + 1; | |
struct arguments args[max_num_threads]; | |
pthread_t thread_ids[max_num_threads]; | |
char *ptr_begin; | |
char *ptr_end = mem_log_begin; | |
/* calc chunk range and start workers */ | |
for (int thread_idx = 0; thread_idx < max_num_threads; thread_idx++) | |
{ | |
ptr_begin = ptr_end; | |
ptr_end += min_chunk_size; | |
if (ptr_end >= mem_log_end) | |
{ | |
ptr_end = *(char**) &mem_log_end; | |
} | |
else | |
{ | |
ptr_end = strchr((char*) ptr_end, '\n'); | |
assert(ptr_end != NULL); | |
ptr_end++; | |
} | |
args[thread_idx].chunk_begin = ptr_begin; | |
args[thread_idx].chunk_end = ptr_end; | |
pthread_create(thread_ids + thread_idx, NULL, worker, &args[thread_idx]); | |
if (ptr_end == mem_log_end) | |
{ | |
num_threads = thread_idx + 1; | |
break; | |
} | |
} | |
/* join workers */ | |
for (int thread_idx = 0; thread_idx < num_threads; thread_idx++) | |
{ | |
pthread_join(thread_ids[thread_idx], NULL); | |
} | |
/* sort and find top 20 via R */ | |
char *r_argv[3] = {argv[0], "--vanilla", "-q"}; | |
Rf_initEmbeddedR(2, r_argv); | |
int n_products = products_pointers_end - products_pointers; | |
double *summaries = (double*) malloc(sizeof(double) * n_products); | |
int *indices = (int*) malloc(sizeof(int) * n_products); | |
for (int i = 0; i < n_products; i++) | |
{ | |
indices[i] = i; | |
summaries[i] = (double) -(*products_pointers[i]->summary_ptr); | |
} | |
rsort_with_index(summaries, indices, n_products); | |
for (int i = 0; i < TOP_N; i++) | |
{ | |
top_n_products[i] = products_pointers[indices[i]]; | |
} | |
free(summaries); | |
free(indices); | |
Rf_endEmbeddedR(0); | |
/* write result */ | |
FILE *file_out = fopen(OUTPUT_FILENAME, "w+"); | |
if (file_out == NULL) | |
{ | |
fprintf(stderr, "cannot write results.\n"); | |
return errno; | |
} | |
int nth_largest = 1; | |
for (struct product **ptr = top_n_products, **end = top_n_products + TOP_N; | |
ptr != end && nth_largest <= TOP_N; | |
ptr++, nth_largest++) | |
{ | |
fprintf(file_out, "%02d,%s\n", nth_largest, (*ptr)->product_id); | |
} | |
fclose(file_out); | |
pthread_join(tid_writer, NULL); | |
free(mem_log_begin); | |
return EXIT_SUCCESS; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment