Skip to content

Instantly share code, notes, and snippets.

@karthick18
Created May 2, 2011 08:16
Show Gist options
  • Save karthick18/951299 to your computer and use it in GitHub Desktop.
Save karthick18/951299 to your computer and use it in GitHub Desktop.
A parallel sort that currently uses just one intermediate input file and one intermediate output file to merge the sort results
# Build and exercise prsort.
CC := gcc
CFLAGS := -Wall -g -std=c99
LD_LIBS := -lpthread
ARCH := $(shell uname)
BLOCK_SIZE := 1m
# Linux builds also link librt and use dd's upper-case "1M" block-size suffix.
ifeq ("$(strip $(ARCH))", "Linux")
LD_LIBS += -lrt
BLOCK_SIZE := 1M
endif
SRCS := prsort.c
OBJS := $(SRCS:%.c=%.o)
TARGET := prsort
# Size of the generated input file, in BLOCK_SIZE units.
SIZE := 4
INPUT_FILE := random.dat
# Per-thread sort buffer in KB (second argument of prsort).
BUFFER_SIZE := 1024

all: $(TARGET)

.PHONY: all run create_file clean

# Build the binary (previously `make run` did not) and regenerate the input first.
run: $(TARGET) create_file
	./$(TARGET) $(INPUT_FILE) $(BUFFER_SIZE)

# Phony, so the random input file is rewritten on every invocation.
create_file:
	dd if=/dev/urandom of=$(INPUT_FILE) bs=$(BLOCK_SIZE) count=$(SIZE)

$(TARGET): $(OBJS)
	$(CC) -o $@ $^ $(LD_LIBS)

%.o: %.c
	$(CC) -c $(CFLAGS) -o $@ $<

clean:
	rm -f $(OBJS) $(TARGET) *~
/*
* A parallel sort that currently uses only one intermediate input file and one intermediate output
* file to merge the sort results from multiple threads in each pass.
* Memory use is roughly the per-thread buffer size (given in KB) multiplied by the number of CPUs,
* plus extra buffers of the same size used while merging.
*
* To compile:
* gcc -o prsort prsort.c -Wall -g -std=c99 -pedantic -lpthread -lrt
*
* To run or test, create a 16 MB input file:
* dd if=/dev/urandom of=random.dat bs=1M count=16
*
* Then run ./prsort random.dat 2048 to sort it with a 2 MB buffer per thread.
*/
#define _GNU_SOURCE
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <fcntl.h>
#include <pthread.h>
#include <sched.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h>
/*
 * File-offset type used by struct sort_run: loff_t on Linux, off_t elsewhere.
 */
#ifdef __linux__
#define offset_t loff_t
#else
#define offset_t off_t
#endif
/*
 * One sorted run: either an in-memory chunk sorted by a worker thread, or the
 * merge result that merge_runs() alternates between two files on disk.
 */
struct sort_run
{
    void *items;                               /* item buffer */
    int length;                                /* bytes currently valid in items */
    int size;                                  /* bytes per item */
    int buf_size;                              /* chunk size used for file reads/writes */
    int (*cmp)(const void *, const void *);    /* item ordering */
    int input_fd, output_fd;                   /* -1 when not open */
    offset_t offset;                           /* not referenced by the code shown here */
    char input_file[40], output_file[40];      /* merge file names ("" when unused) */
};
#ifdef CLOCK_MONOTONIC
/* Store the current CLOCK_MONOTONIC reading, in microseconds, into *start. */
static __inline__ void __time(unsigned long long *start)
{
    struct timespec now;
    unsigned long long usecs;
    clock_gettime(CLOCK_MONOTONIC, &now);
    usecs = (unsigned long long)now.tv_sec * 1000000LL;
    usecs += now.tv_nsec / 1000;
    *start = usecs;
}
#else
/* Fallback when CLOCK_MONOTONIC is unavailable: gettimeofday() in microseconds. */
static __inline__ void __time(unsigned long long *start)
{
    struct timeval now;
    unsigned long long usecs;
    gettimeofday(&now, NULL);
    usecs = (unsigned long long)now.tv_sec * 1000000LL;
    usecs += now.tv_usec;
    *start = usecs;
}
#endif
/* Exchange the s bytes at a with the s bytes at b; nothing to do when they alias. */
static __inline__ void __swap(char *a, char *b, int s)
{
    if(a == b)
        return;
    while(s-- > 0)
    {
        char held = *a;
        *a++ = *b;
        *b++ = held;
    }
}
/*
 * qsort-style comparator for unsigned int keys: returns <0, 0 or >0 as *a is
 * less than, equal to or greater than *b.
 *
 * Uses (x > y) - (x < y) instead of x - y. The unsigned difference wraps, and
 * converted to int it has the wrong sign whenever the keys differ by more than
 * INT_MAX, which broke the ordering for random 32-bit data.
 */
static int int_cmp(const void *a, const void *b)
{
    unsigned int x = *(const unsigned int *)a;
    unsigned int y = *(const unsigned int *)b;
    return (x > y) - (x < y);
}
/*
 * Lomuto partition of elements [left, right] around the element at index pivot.
 * The pivot is parked at index right. Every element comparing <= it is moved
 * to the front of the range, and the pivot is then dropped into the slot
 * after them. Returns the pivot's final index.
 */
static int __partition(void *base, int size, int left, int right, int pivot,
                       int (*cmp)(const void *, const void *))
{
    char *elems = base;
    char *last = elems + right * size;
    int store = left;
    __swap(elems + pivot * size, last, size);
    for(int scan = left; scan < right; ++scan)
    {
        char *candidate = elems + scan * size;
        if(cmp(candidate, last) > 0)
            continue;
        __swap(elems + store * size, candidate, size);
        store++;
    }
    __swap(elems + store * size, last, size);
    return store;
}
/* Bubble sort of the inclusive index range [left, right] using cmp; always returns 0. */
static int __bubble_sort(void *base, int size, int left, int right, int (*cmp)(const void *, const void *))
{
    char *arr = base;
    for(int pass = 0; pass <= right - left; ++pass)
    {
        /* after this pass the largest remaining element sits at index right - pass */
        int last = right - 1 - pass;
        for(int k = left; k <= last; ++k)
        {
            char *lhs = arr + k * size;
            char *rhs = lhs + size;
            if(cmp(lhs, rhs) > 0)
                __swap(lhs, rhs, size);
        }
    }
    return 0;
}
/*
 * Quicksort of the inclusive index range [left, right] of base (elements of
 * `size` bytes, ordered by cmp). Ranges of fewer than 6 elements are finished
 * with __bubble_sort. Always returns 0.
 *
 * The pivot returned by __partition is already in its final slot, so it is
 * excluded from both sub-ranges (previously [new_pivot, right] was re-sorted).
 * Only the smaller side is sorted recursively. The larger side is handled by
 * the loop, which bounds the recursion depth at O(log n) instead of O(n).
 */
static int do_qsort(void *base, int size, int left, int right, int (*cmp)(const void *, const void *))
{
    while(right > left)
    {
        if(right - left < 5)
            return __bubble_sort(base, size, left, right, cmp);
        /* same midpoint as (left+right)>>1, without the possible int overflow */
        int pivot = left + ((right - left) >> 1);
        int new_pivot = __partition(base, size, left, right, pivot, cmp);
        if(new_pivot - left < right - new_pivot)
        {
            do_qsort(base, size, left, new_pivot - 1, cmp);
            left = new_pivot + 1;
        }
        else
        {
            do_qsort(base, size, new_pivot + 1, right, cmp);
            right = new_pivot - 1;
        }
    }
    return 0;
}
/* Sort nelem elements of `size` bytes at base in place using cmp; returns do_qsort's status. */
static int __qsort(void *base, int nelem, int size, int (*cmp)(const void *, const void *))
{
    return do_qsort(base, size, 0, nelem - 1, cmp);
}
/* Base of the run array allocated in main(); used only to print a run's index. */
static struct sort_run *g_runs;

/*
 * pthread entry point: sort one in-memory run (arg is a struct sort_run *)
 * with __qsort and report the outcome. Returns the __qsort status cast to void *.
 */
static void *parallel_sort(void *arg)
{
    struct sort_run *job = arg;
    int count = job->length / job->size;
    int status = __qsort(job->items, count, job->size, job->cmp);
    const char *verdict = status ? "Unsuccessfully" : "Successfully";
    printf("Sorting done for run [%d] with [%d] items [%s]\n",
           (int)(job - g_runs), count, verdict);
    return (void *)(unsigned long)status;
}
/*
 * Check that the items in run->items are in non-decreasing order under run->cmp.
 * Each out-of-order item is reported on stderr and trips assert(0). Returns 1
 * if any violation was seen, 0 otherwise.
 */
static int verify_sort(struct sort_run *run)
{
    const int stride = run->size;
    const int count = run->length / stride;
    char *base = run->items;
    int failed = 0;
    for(int idx = 1; idx < count; ++idx)
    {
        char *here = base + stride * idx;
        if(run->cmp(here, here - stride) >= 0)
            continue;
        failed = 1;
        fprintf(stderr, "Sort error for item [%p] at index [%d]\n", (void*)here, idx);
        assert(0);
    }
    return failed;
}
static int verify_sort_file(struct sort_run *run)
{
char *buf ;
int bytes;
struct sort_run output_run = {.cmp = run->cmp, .size = run->size, .buf_size = run->buf_size };
if(!run->output_file[0])
return -1;
if(run->output_fd < 0)
{
run->output_fd = open(run->output_file, O_RDONLY);
assert(run->output_fd >= 0);
}
else lseek(run->output_fd, 0, SEEK_SET);
buf = calloc(1, run->buf_size);
assert(buf != NULL);
output_run.items = buf;
int cur_run = 0;
while( cur_run++, (bytes = read(run->output_fd, buf, run->buf_size)) > 0 )
{
output_run.length = bytes;
printf("Verifying sort run [%d] for file [%s]...\n", cur_run, run->output_file);
verify_sort(&output_run);
}
free(buf);
return 0;
}
/*
 * Write all len bytes of buf to fd, retrying on short writes and EINTR.
 * Returns 0 on success, -1 on failure (errno is set by write() on error).
 */
static int write_fully(int fd, const void *buf, size_t len)
{
    const char *p = buf;
    while(len > 0)
    {
        ssize_t n = write(fd, p, len);
        if(n <= 0)
        {
            if(n < 0 && errno == EINTR)
                continue;
            return -1;
        }
        p += n;
        len -= (size_t)n;
    }
    return 0;
}

/*
 * Flush the first `bytes` bytes of the merge output buffer to fd. A failed
 * write would leave the merged file truncated, so it is fatal.
 */
static void flush_merge_output(int fd, const char *buf, size_t bytes)
{
    if(write_fully(fd, buf, bytes) < 0)
    {
        perror("write:");
        exit(127);
    }
}

/*
 * Merge `num_runs` sorted in-memory runs into one sorted output file. When
 * *ret_output_run is already set, the previous pass's result is merged in too.
 *
 * The first call allocates *ret_output_run, which writes to "bar". Each later
 * call swaps that run's input/output file names, streams the previous result
 * back in as one more input, and rewrites the other file.
 *
 * runs:       in-memory runs; size, cmp and buf_size are taken from runs[0].
 * num_runs:   number of entries in runs.
 * total_runs: optional; incremented by the number of runs merged.
 * Returns the output run (also stored in *ret_output_run), or *ret_output_run
 * unchanged when there is nothing to merge.
 */
static struct sort_run *merge_runs(struct sort_run *runs, int num_runs,
                                   struct sort_run **ret_output_run, int *total_runs)
{
    struct sort_run *output_run = *ret_output_run;
    static char input_file[] = "foo";
    static char output_file[] = "bar";
    int i;

    /*
     * Nothing to merge. This is checked before the VLAs below are declared,
     * because a zero-length VLA is undefined behaviour.
     * A single run is no longer handed back as-is. The caller would then pass
     * the runs array in as its output run on the next call (one-CPU machine,
     * several chunks) and fail to reopen its empty file names.
     */
    if(!runs || num_runs <= 0)
        return output_run;

    int (*run_cmp)(const void *, const void *) = runs->cmp;
    int run_item_size = runs->size;
    int buf_size = runs->buf_size;
    int total = num_runs + (output_run ? 1 : 0);
    int run_index_map[total];
    struct sort_run *run_array[total];
    int slot = 0;
    int active_runs = 0;
    int cur_output_index = 0;
    /* whole items per buffer (at least one), so file I/O never splits an item */
    int items_per_buf = buf_size / run_item_size > 0 ? buf_size / run_item_size : 1;
    int io_size = items_per_buf * run_item_size;
    char *output_buffer;

    if(!output_run)
    {
        *ret_output_run = output_run = calloc(1, sizeof(*output_run));
        assert(output_run != NULL);
        output_run->size = run_item_size;
        output_run->cmp = run_cmp;
        output_run->buf_size = buf_size;
        output_run->input_fd = -1;
        snprintf(output_run->output_file, sizeof(output_run->output_file), "%s", output_file);
        snprintf(output_run->input_file, sizeof(output_run->input_file), "%s", input_file);
    }
    else
    {
        /*
         * The previous pass's output becomes this pass's input: swap the file
         * names and load its first chunk.
         */
        char previous_input[sizeof(output_run->input_file)];
        close(output_run->output_fd);
        output_run->output_fd = -1;
        snprintf(previous_input, sizeof(previous_input), "%s", output_run->input_file);
        snprintf(output_run->input_file, sizeof(output_run->input_file), "%s", output_run->output_file);
        snprintf(output_run->output_file, sizeof(output_run->output_file), "%s", previous_input);
        output_run->input_fd = open(output_run->input_file, O_RDONLY);
        assert(output_run->input_fd >= 0);
        output_run->items = calloc(1, io_size);
        assert(output_run->items != NULL);
        output_run->length = read(output_run->input_fd, output_run->items, io_size);
        if(output_run->length < 0)
            output_run->length = 0;
        /* kept in slot 0 so that, as before, ties are resolved in its favour */
        run_array[slot++] = output_run;
    }
    for(i = 0; i < num_runs; ++i)
        run_array[slot++] = runs + i;
    assert(slot == total);
    /*
     * Only runs holding at least one whole item take part. Counting an empty
     * run as active kept the loop below running with no item to pick.
     */
    for(i = 0; i < total; ++i)
    {
        run_index_map[i] = 0;
        if(run_array[i]->length / run_item_size > 0)
            ++active_runs;
    }

    output_run->output_fd = open(output_run->output_file, O_RDWR | O_CREAT | O_TRUNC, 0644);
    assert(output_run->output_fd >= 0);
    output_buffer = calloc(items_per_buf, run_item_size);
    assert(output_buffer != NULL);

    while(active_runs > 0)
    {
        int min_run = -1;
        char *min_item = NULL;
        for(i = 0; i < total; ++i)
        {
            if(run_index_map[i] >= run_array[i]->length / run_item_size)
                continue;
            char *cur_item = (char *)run_array[i]->items + run_item_size * run_index_map[i];
            if(!min_item || run_cmp(cur_item, min_item) < 0)
            {
                min_item = cur_item;
                min_run = i;
            }
        }
        if(!min_item)
            break; /* defensive: no run has an item left */
        memcpy(output_buffer + cur_output_index * run_item_size, min_item, run_item_size);
        run_index_map[min_run]++;
        /* flush only the filled bytes, never padding past the last whole item */
        if(++cur_output_index >= items_per_buf)
        {
            flush_merge_output(output_run->output_fd, output_buffer,
                               (size_t)cur_output_index * run_item_size);
            cur_output_index = 0;
        }
        /* refill the run just consumed from; only file-backed runs can refill */
        struct sort_run *src = run_array[min_run];
        if(run_index_map[min_run] >= src->length / run_item_size)
        {
            int length = 0;
            if(src->input_fd >= 0)
                length = read(src->input_fd, src->items, io_size);
            if(length / run_item_size > 0)
            {
                src->length = length;
                run_index_map[min_run] = 0;
            }
            else
            {
                --active_runs;
            }
        }
    }
    /*
     * Final flush.
     */
    if(cur_output_index > 0)
    {
        flush_merge_output(output_run->output_fd, output_buffer,
                           (size_t)cur_output_index * run_item_size);
        fsync(output_run->output_fd);
    }
    if(output_run->input_fd >= 0)
    {
        free(output_run->items);
        output_run->items = NULL;
        close(output_run->input_fd);
        output_run->input_fd = -1;
    }
    free(output_buffer);
    if(total_runs) *total_runs += total;
    return output_run;
}
#ifdef DEBUG
/*
 * Debug helper: print each item of an in-memory run as an unsigned int, one
 * "[index] = [value]" line per item.
 * Items are stepped by run->size, matching how nentries is computed; the old
 * code stepped by sizeof(int). Values are copied out with memcpy instead of
 * being read through a cast pointer.
 */
static void dump_result(struct sort_run *run)
{
    int i;
    int nentries = run->length/run->size;
    const char *result = run->items;
    /* never copy more than one item, so items smaller than an int are not over-read */
    size_t copy = (size_t)run->size < sizeof(unsigned int) ? (size_t)run->size : sizeof(unsigned int);
    for(i = 0; i < nentries; ++i)
    {
        unsigned int ele = 0;
        memcpy(&ele, result + (size_t)i * run->size, copy);
        printf("[%d] = [%u]\n", i, ele);
    }
}
#else
static __inline__ void dump_result(struct sort_run *run) { (void)run; }
#endif
/*
 * Benchmark helper: read `size` bytes from the start of fd into memory, sort
 * them as ints with the C library's qsort() and int_cmp, check the result with
 * verify_sort(), and print the elapsed time. Always returns 0.
 * The format specifiers now match their arguments (%zu for size_t, %llu for
 * unsigned long long); the old %ld/%lld were undefined behaviour.
 */
static int check_libc_qsort(int fd, size_t size)
{
    struct sort_run output_run = {0};
    size_t offset = 0;
    ssize_t bytes;
    unsigned long long start = 0, end = 0;
    char *buf;
    if(size == 0)
        return 0;
    buf = calloc(1, size);
    assert(buf);
    lseek(fd, 0, SEEK_SET);
    while(offset < size && (bytes = read(fd, buf + offset, size - offset)) > 0)
        offset += (size_t)bytes;
    output_run.length = size;
    output_run.items = buf;
    output_run.size = sizeof(int);
    output_run.cmp = int_cmp;
    __time(&start);
    qsort(buf, size/sizeof(int), sizeof(int), int_cmp);
    __time(&end);
    verify_sort(&output_run);
    printf("LIBC qsort time for [%zu] items = [%llu] usecs\n", size/sizeof(int), end - start);
    free(buf);
    return 0;
}
/*
 * Number of CPUs to use for sort threads; always >= 1.
 * On Linux the affinity mask of this process is used. The mask is zeroed first
 * and the call is checked, because the old code counted an uninitialised set
 * on failure. Otherwise, or on failure, the number of online processors from
 * sysconf() is used, and 4 (the old hard-coded value) is the last resort.
 */
static int get_cpu_count(void)
{
    int count = 0;
#if defined(__linux__) && defined(CPU_COUNT)
    cpu_set_t set;
    CPU_ZERO(&set);
    if(sched_getaffinity(0, sizeof(set), &set) == 0)
        count = CPU_COUNT(&set);
#endif
#ifdef _SC_NPROCESSORS_ONLN
    if(count < 1)
    {
        long online = sysconf(_SC_NPROCESSORS_ONLN);
        if(online > 0)
            count = (int)online;
    }
#endif
    if(count < 1)
        count = 4;
    return count;
}
/*
 * prsort entry point: prsort <filename> <buf_size_in_kb>
 *
 * Reads the file in chunks of buf_size bytes, one chunk per CPU per pass. Each
 * pass's chunks are sorted in parallel as unsigned ints and merged with the
 * result of earlier passes by merge_runs(). The merged file is verified at the
 * end.
 * Exits with 127 on usage, open or fstat errors. Returns 1 if a chunk cannot
 * be read, otherwise 0.
 */
int main(int argc, char **argv)
{
    int num_threads;
    pthread_t *tids;
    char filename[0xff+1];
    int buf_size;
    long buf_kb;
    char *endp = NULL;
    int fd;
    struct stat statbuf;
    size_t size;
    int i;
    int rc = 0;
    if(argc != 3)
    {
        fprintf(stderr, "Insufficient args...\n");
        fprintf(stderr, "%s filename buf_size_in_kb\n", argv[0]);
        exit(127);
    }
    filename[0] = 0;
    strncat(filename, argv[1], sizeof(filename)-1);
    /*
     * Accept only a positive KB count whose byte value fits in an int. An empty
     * string or 0 used to give buf_size == 0, which made the chunking loop
     * below spin forever (size never shrinks). Large or negative values
     * overflowed the << 10.
     */
    errno = 0;
    buf_kb = strtol(argv[2], &endp, 10);
    if(endp == argv[2] || *endp || errno == ERANGE || buf_kb <= 0 || buf_kb > (INT_MAX >> 10))
    {
        fprintf(stderr, "Invalid format for buf size [%s]\n", argv[2]);
        exit(127);
    }
    buf_size = (int)(buf_kb << 10); /* KB -> bytes */
    fd = open(filename, O_RDONLY);
    if(fd < 0)
    {
        perror("open:");
        exit(127);
    }
    if(fstat(fd, &statbuf))
    {
        perror("fstat:");
        exit(127);
    }
    size = statbuf.st_size;
    if((size_t)buf_size > size)
        buf_size = (int)size;
    /*
     * Allocate threads based on the current cpu count for real parallelization.
     */
    num_threads = get_cpu_count();
    assert(num_threads >= 1);
    struct sort_run *runs = calloc(num_threads, sizeof(*runs));
    assert(runs != NULL);
    g_runs = runs;
    tids = calloc(num_threads, sizeof(*tids));
    assert(tids != NULL);
    unsigned long long start = 0, end = 0;
    struct sort_run *output_run = NULL;
    __time(&start);
    while(size > 0)
    {
        /* load up to one chunk per thread for this pass */
        for(i = 0; i < num_threads && size > 0; ++i)
        {
            int length = buf_size;
            if(size < (size_t)buf_size) length = (int)size;
            size -= length;
            if(length != runs[i].length)
            {
                free(runs[i].items);
                runs[i].length = length;
                runs[i].items = calloc(1, length);
            }
            assert(runs[i].items != NULL);
            runs[i].input_fd = runs[i].output_fd = -1;
            runs[i].size = sizeof(int);
            runs[i].buf_size = buf_size;
            runs[i].cmp = int_cmp;
            if(read(fd, runs[i].items, length) != length)
            {
                perror("read:");
                rc = 1;
                goto out;
            }
        }
        /*
         * Start all the threads at once with the sort load; a single chunk is
         * sorted on this thread.
         */
        if(i > 1)
        {
            for(int j = 0; j < i; ++j)
            {
                int err = pthread_create(tids + j, NULL, parallel_sort, runs+j);
                assert(err == 0);
                (void)err;
            }
            for(int j = 0; j < i; ++j)
                pthread_join(tids[j], NULL);
        }
        else
        {
            parallel_sort(runs);
        }
        output_run = merge_runs(runs, i, &output_run, NULL);
    }
    __time(&end);
    printf("Time taken to sort [%ld] items = [%llu] usecs\n", (long int) (statbuf.st_size/sizeof(int)), end-start);
out:
    /*
     * If merge_runs() handed back the runs array itself, it is released once,
     * below, through output_run. This is checked after the label so the error
     * path cannot free it twice.
     */
    if(output_run == runs) runs = NULL;
    if(runs)
    {
        for(i = 0; i < num_threads; ++i)
        {
            free(runs[i].items);
            runs[i].items = NULL;
        }
        free(runs);
    }
    if(output_run)
    {
        free(output_run->items);
        output_run->items = NULL;
        verify_sort_file(output_run);
        if(output_run->output_fd >= 0)
            close(output_run->output_fd);
        free(output_run);
    }
    free(tids);
    //check_libc_qsort(fd, statbuf.st_size);
    close(fd);
    return rc;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment