Run
./main 0
on one host first (it creates the test file), then run
./main 1
and
./main 2
on two other hosts.
#include <errno.h>
#include <memory.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <vector>
/* Function-like helpers. NOTE: each argument is evaluated twice, so do not
 * pass expressions with side effects (e.g. min(i++, j)). */
#define min(a,b) (((a) < (b)) ? (a) : (b))
#define max(a,b) (((a) > (b)) ? (a) : (b))

/* Path of the shared file every process reads and writes. */
#define TESTFILE "/moosefs2/test.txt"
/* Total number of bytes the test file is meant to hold. */
#define ALL_SIZE 1000000002
/* Number of bytes written per block (the final block is shorter). */
#define SIZE_OF_BLOCK 130000000

using namespace std;
/* Argument block handed to writer_routine() through pthread_create(). */
struct Writter_struct
{
    /* Identifier of the writer; writer_routine() uses it to pick which
     * blocks of the file this writer is responsible for. */
    int threadId;
};
void* writer_routine(void *arg) | |
{ | |
Writter_struct *par = (Writter_struct *)arg; | |
int nthread=5; | |
int threadId=par->threadId; | |
printf("%d: start writting file...\n",threadId); | |
fflush(stdout); | |
size_t all_size=ALL_SIZE; | |
size_t size_of_block=SIZE_OF_BLOCK; | |
size_t number_of_blocks=1+(size_t)(all_size/size_of_block); | |
size_t size_of_last_block=all_size-(number_of_blocks-1)*number_of_blocks; | |
char* fill_block=new char[max(size_of_block,size_of_last_block)]; | |
memset(fill_block,0,sizeof(char)*max(size_of_block,size_of_last_block)); | |
char* fill_block_test=new char[max(size_of_block,size_of_last_block)]; | |
memset(fill_block_test,0,sizeof(char)*max(size_of_block,size_of_last_block)); | |
printf("filling block...\n"); | |
fflush(stdout); | |
char *p_c=fill_block; | |
for(int i=0;i<max(size_of_block,size_of_last_block);i++,p_c++) | |
{ | |
*p_c='A'; | |
} | |
printf("block was filled!\n"); | |
fflush(stdout); | |
FILE* fd=fopen(TESTFILE,"r+"); | |
for(size_t block_id=0;block_id<number_of_blocks;block_id++) | |
{ | |
int celoe=((int)(block_id))/nthread; | |
int ost=block_id-celoe*nthread; | |
if(ost!=threadId)continue; | |
printf("%d: writting block %lu from %lu\n",threadId,block_id,number_of_blocks); | |
fflush(stdout); | |
size_t size_to_write=size_of_block; | |
if(block_id==(number_of_blocks-1)) | |
size_to_write=size_of_last_block; | |
size_t shift_in_file=block_id*size_of_block; | |
fseek(fd,shift_in_file, SEEK_SET); | |
size_t xxx=fwrite (fill_block,1,size_to_write,fd); | |
if(xxx!=size_to_write) | |
{ | |
printf("E1:bad writting of block %lu\n",block_id); | |
fflush(stdout); | |
return 0; | |
} | |
if (fsync(fileno(fd)) != 0) | |
{ | |
perror("fsync() error\n"); | |
fflush(stdout); | |
} | |
else | |
{ | |
printf("fsync() success\n"); | |
fflush(stdout); | |
} | |
fseek(fd,shift_in_file, SEEK_SET); | |
fread (fill_block_test,1,size_to_write,fd); | |
char* p_fill_block_test=fill_block_test; | |
char* p_fill_block=fill_block; | |
for(int i=0;i<(int)size_to_write;i++,p_fill_block_test++,p_fill_block++) | |
{ | |
if(*p_fill_block_test!='A') | |
{ | |
printf("E2:bad writting block %lu :write=%d read=%d\n",block_id,(int)*p_fill_block,(int)*p_fill_block_test); | |
fflush(stdout); | |
break; | |
} | |
} | |
} | |
if(fill_block!=NULL) | |
{ | |
delete[] fill_block; | |
fill_block=NULL; | |
} | |
if(fill_block_test!=NULL) | |
{ | |
delete[] fill_block_test; | |
fill_block_test=NULL; | |
} | |
printf("%d:Closing file...\n",threadId); | |
fflush(stdout); | |
fflush(fd); | |
fsync(fileno(fd)); | |
fclose(fd); | |
printf("%d:file was written!\n",threadId); | |
fflush(stdout); | |
return 0; | |
} | |
int main(int argc,char* argv[]) | |
{ | |
printf("Hellow! argc=%d str=%s\n",argc,argv[1]); | |
fflush(stdout); | |
if(*argv[1]=='0') | |
{ | |
printf("create file...\n"); | |
fflush(stdout); | |
size_t all_size=ALL_SIZE; | |
FILE* fd=fopen(TESTFILE,"wb"); | |
fseek(fd,all_size,SEEK_SET); | |
fflush(fd); | |
fsync(fileno(fd)); | |
fclose(fd); | |
printf("file was created!\n"); | |
fflush(stdout); | |
while(1) { | |
} | |
return 0; | |
} | |
int rank=atoi(argv[1]); | |
printf("rank=%d\n",rank); | |
fflush(stdout); | |
int nthread=1; | |
std::vector<Writter_struct> thread_params; | |
std::vector<pthread_t> threads; | |
thread_params.resize(nthread); | |
for (int i = 0; i < nthread; i++) | |
{ | |
thread_params[i].threadId=rank; | |
} | |
threads.resize(nthread); | |
for (int i = 0; i < nthread; i++) | |
{ | |
printf("starting writter %d\n",i); | |
fflush(stdout); | |
int ret = pthread_create(&threads[i], NULL, | |
writer_routine, &thread_params[i]); | |
if (ret != 0) { | |
for (int j = 0; j < i; j++) | |
pthread_join(threads[j], NULL); | |
return 0; | |
} | |
} | |
for (int i = 0; i < threads.size(); i++) | |
pthread_join(threads[i], NULL); | |
return 0; | |
} |
# Builds the test program $(PROGRAM) from $(SOURCE).
CC = g++
PROGRAM = main
#CFLAGS = -g `pkg-config --cflags gtk+-x11-2.0` `pkg-config --cflags gtkextra-2.0`
#LDFLAGS = `pkg-config --libs gtk+-x11-2.0` `pkg-config --libs gtkextra-2.0`
CFLAGS = -g #`pkg-config --cflags gtk+-2.0`
LDFLAGS = -lpthread -lrt#`pkg-config --libs gtk+-2.0`
SOURCE = main.cc
OBJS = main.o

# "all" and "clean" are commands, not files.
.PHONY: all clean

# The source file is main.cc, so a .cc.o rule is required for $(CFLAGS) to be
# applied; with only the .cpp.o rule, make falls back to its built-in rule.
.cc.o:
	$(CC) -c $(CFLAGS) $(INCLUDES) -o $*.o $<
.cpp.o:
	$(CC) -c $(CFLAGS) $(INCLUDES) -o $*.o $<
###############################################################################
all: $(PROGRAM)
$(PROGRAM): $(OBJS)
	$(CC) -o $(PROGRAM) $(OBJS) $(LDFLAGS)
clean:
	rm -f *.o core *~ *%