Skip to content

Instantly share code, notes, and snippets.

@syndicut
Last active December 29, 2015 07:59
Show Gist options
  • Save syndicut/7640476 to your computer and use it in GitHub Desktop.
Save syndicut/7640476 to your computer and use it in GitHub Desktop.
Problem with parallel writes to moosefs from multiple nodes

Run

./main 0

on one host (it creates the test file), then run

./main 1

and

./main 2

on two other hosts, one command per host.

#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include <unistd.h>
#include <pthread.h>
#include <vector>
// Function-like min/max macros. Each argument is substituted textually and
// may be evaluated twice, so do not pass expressions with side effects.
// NOTE(review): these names shadow std::min/std::max for any code that
// appears after this point in the translation unit.
#define min(a,b) (((a) < (b)) ? (a) : (b))
#define max(a,b) (((a) > (b)) ? (a) : (b))
// Path of the shared test file opened by both the creator and the writers.
// Presumably located on the MooseFS mount under test - confirm on the hosts.
#define TESTFILE "/moosefs2/test.txt"
// Total number of bytes the test file is meant to span.
#define ALL_SIZE 1000000002
// Number of bytes in one full block of the test file.
#define SIZE_OF_BLOCK 130000000
// Pulls all of namespace std into the global namespace for this file.
using namespace std;
/**
 * Argument record handed to a writer thread through the `void*` parameter
 * of the thread entry point.
 */
struct Writter_struct
{
    /// Identifier of the writer; compared against the block index to decide
    /// which blocks this writer handles.
    int threadId;
};
void* writer_routine(void *arg)
{
Writter_struct *par = (Writter_struct *)arg;
int nthread=5;
int threadId=par->threadId;
printf("%d: start writting file...\n",threadId);
fflush(stdout);
size_t all_size=ALL_SIZE;
size_t size_of_block=SIZE_OF_BLOCK;
size_t number_of_blocks=1+(size_t)(all_size/size_of_block);
size_t size_of_last_block=all_size-(number_of_blocks-1)*number_of_blocks;
char* fill_block=new char[max(size_of_block,size_of_last_block)];
memset(fill_block,0,sizeof(char)*max(size_of_block,size_of_last_block));
char* fill_block_test=new char[max(size_of_block,size_of_last_block)];
memset(fill_block_test,0,sizeof(char)*max(size_of_block,size_of_last_block));
printf("filling block...\n");
fflush(stdout);
char *p_c=fill_block;
for(int i=0;i<max(size_of_block,size_of_last_block);i++,p_c++)
{
*p_c='A';
}
printf("block was filled!\n");
fflush(stdout);
FILE* fd=fopen(TESTFILE,"r+");
for(size_t block_id=0;block_id<number_of_blocks;block_id++)
{
int celoe=((int)(block_id))/nthread;
int ost=block_id-celoe*nthread;
if(ost!=threadId)continue;
printf("%d: writting block %lu from %lu\n",threadId,block_id,number_of_blocks);
fflush(stdout);
size_t size_to_write=size_of_block;
if(block_id==(number_of_blocks-1))
size_to_write=size_of_last_block;
size_t shift_in_file=block_id*size_of_block;
fseek(fd,shift_in_file, SEEK_SET);
size_t xxx=fwrite (fill_block,1,size_to_write,fd);
if(xxx!=size_to_write)
{
printf("E1:bad writting of block %lu\n",block_id);
fflush(stdout);
return 0;
}
if (fsync(fileno(fd)) != 0)
{
perror("fsync() error\n");
fflush(stdout);
}
else
{
printf("fsync() success\n");
fflush(stdout);
}
fseek(fd,shift_in_file, SEEK_SET);
fread (fill_block_test,1,size_to_write,fd);
char* p_fill_block_test=fill_block_test;
char* p_fill_block=fill_block;
for(int i=0;i<(int)size_to_write;i++,p_fill_block_test++,p_fill_block++)
{
if(*p_fill_block_test!='A')
{
printf("E2:bad writting block %lu :write=%d read=%d\n",block_id,(int)*p_fill_block,(int)*p_fill_block_test);
fflush(stdout);
break;
}
}
}
if(fill_block!=NULL)
{
delete[] fill_block;
fill_block=NULL;
}
if(fill_block_test!=NULL)
{
delete[] fill_block_test;
fill_block_test=NULL;
}
printf("%d:Closing file...\n",threadId);
fflush(stdout);
fflush(fd);
fsync(fileno(fd));
fclose(fd);
printf("%d:file was written!\n",threadId);
fflush(stdout);
return 0;
}
int main(int argc,char* argv[])
{
printf("Hellow! argc=%d str=%s\n",argc,argv[1]);
fflush(stdout);
if(*argv[1]=='0')
{
printf("create file...\n");
fflush(stdout);
size_t all_size=ALL_SIZE;
FILE* fd=fopen(TESTFILE,"wb");
fseek(fd,all_size,SEEK_SET);
fflush(fd);
fsync(fileno(fd));
fclose(fd);
printf("file was created!\n");
fflush(stdout);
while(1) {
}
return 0;
}
int rank=atoi(argv[1]);
printf("rank=%d\n",rank);
fflush(stdout);
int nthread=1;
std::vector<Writter_struct> thread_params;
std::vector<pthread_t> threads;
thread_params.resize(nthread);
for (int i = 0; i < nthread; i++)
{
thread_params[i].threadId=rank;
}
threads.resize(nthread);
for (int i = 0; i < nthread; i++)
{
printf("starting writter %d\n",i);
fflush(stdout);
int ret = pthread_create(&threads[i], NULL,
writer_routine, &thread_params[i]);
if (ret != 0) {
for (int j = 0; j < i; j++)
pthread_join(threads[j], NULL);
return 0;
}
}
for (int i = 0; i < threads.size(); i++)
pthread_join(threads[i], NULL);
return 0;
}
# Builds the `main` test program from main.cc.
CC = g++
PROGRAM = main
#CFLAGS = -g `pkg-config --cflags gtk+-x11-2.0` `pkg-config --cflags gtkextra-2.0`
#LDFLAGS = `pkg-config --libs gtk+-x11-2.0` `pkg-config --libs gtkextra-2.0`
CFLAGS = -g #`pkg-config --cflags gtk+-2.0`
LDFLAGS = -lpthread -lrt#`pkg-config --libs gtk+-2.0`
SOURCE = main.cc
OBJS = main.o
# The source file is main.cc, so the suffix rule must be .cc.o; the former
# .cpp.o rule never matched and CFLAGS (-g) was silently ignored.
.cc.o:
	$(CC) -c $(CFLAGS) $(INCLUDES) -o $*.o $<
###############################################################################
# all and clean do not name files, so they must always run.
.PHONY: all clean
all: $(PROGRAM)
$(PROGRAM): $(OBJS)
	$(CC) -o $(PROGRAM) $(OBJS) $(LDFLAGS)
clean:
	rm -f *.o core *~ *%
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment