Created May 8, 2018 16:18
Save houjun/65dbd75a004de940d081d10f4284bf04 to your computer and use it in GitHub Desktop.
TileDB concurrent write with MPI
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <sys/time.h>
#include <unistd.h>

#include "mpi.h"
#include <tiledb/tiledb.h>
int main(int argc, char** argv) { | |
int rank, size; | |
MPI_Init(NULL, NULL); | |
MPI_Comm_size(MPI_COMM_WORLD, &size); | |
MPI_Comm_rank(MPI_COMM_WORLD, &rank); | |
int x_len, y_len, x_tile_size, y_tile_size; | |
int my_x_len, my_y_len; | |
uint64_t dim_domain[] = {1, 1, 1, 1}; | |
uint64_t tile_extents[] = {2, 2}; | |
char array_name[128]; | |
tiledb_dimension_t* d1; | |
tiledb_dimension_t* d2; | |
tiledb_domain_t* domain; | |
tiledb_array_schema_t* array_schema; | |
int ret_val; | |
if (argc != 6) { | |
if (rank == 0) | |
printf("Usage: ./%s array_name x_len y_len x_tile_size y_tile_size\nexiting...\n", argv[0]); | |
MPI_Finalize(); | |
return 0; | |
} | |
x_len = atoi(argv[2]); | |
y_len = atoi(argv[3]); | |
x_tile_size = atoi(argv[4]); | |
y_tile_size = atoi(argv[5]); | |
sprintf(array_name, "%s_w%d_x%d_y%d_xt%d_yt%d", argv[1], size, x_len, y_len, x_tile_size, y_tile_size); | |
// Distribute workload evenly | |
my_x_len = x_len; | |
my_y_len = y_len / size; | |
if (rank == 0) { | |
printf("x_len=%d y_len=%d x_tile_size=%d y_tile_size=%d, my_x_len=%d, my_y_len=%d\n", | |
x_len, y_len, x_tile_size, y_tile_size, my_x_len, my_y_len); | |
fflush(stdout); | |
} | |
// Initialize context with the default configuration parameters | |
tiledb_ctx_t* ctx; | |
tiledb_ctx_create(&ctx, NULL); | |
tiledb_attribute_t* a1 = NULL; | |
if (rank == 0) { | |
// Create dimensions | |
dim_domain[0] = 1; | |
dim_domain[1] = x_len; | |
dim_domain[2] = 1; | |
dim_domain[3] = y_len; | |
tile_extents[0] = x_tile_size; | |
tile_extents[1] = y_tile_size; | |
tiledb_dimension_create(ctx, &d1, "d1", TILEDB_UINT64, &dim_domain[0], &tile_extents[0]); | |
tiledb_dimension_create(ctx, &d2, "d2", TILEDB_UINT64, &dim_domain[2], &tile_extents[1]); | |
// Create domain | |
tiledb_domain_create(ctx, &domain); | |
tiledb_domain_add_dimension(ctx, domain, d1); | |
tiledb_domain_add_dimension(ctx, domain, d2); | |
// Create attributes | |
tiledb_attribute_t* a1; | |
tiledb_attribute_create(ctx, &a1, "a1", TILEDB_INT32); | |
tiledb_attribute_set_compressor(ctx, a1, TILEDB_NO_COMPRESSION, -1); | |
tiledb_attribute_set_cell_val_num(ctx, a1, 1); | |
// Create array schema | |
tiledb_array_schema_create(ctx, &array_schema, TILEDB_DENSE); | |
tiledb_array_schema_set_cell_order(ctx, array_schema, TILEDB_ROW_MAJOR); | |
tiledb_array_schema_set_tile_order(ctx, array_schema, TILEDB_ROW_MAJOR); | |
tiledb_array_schema_set_domain(ctx, array_schema, domain); | |
tiledb_array_schema_add_attribute(ctx, array_schema, a1); | |
// Check array schema | |
if (tiledb_array_schema_check(ctx, array_schema) != TILEDB_OK) { | |
return -1; | |
} | |
// Create array | |
tiledb_array_create(ctx, array_name, array_schema); | |
} | |
MPI_Barrier(MPI_COMM_WORLD); | |
// Set attributes | |
const char* attributes[] = {"a1"}; | |
// Prepare cell buffers | |
int *buffer_a1 = (int*)calloc(sizeof(int), my_x_len*my_y_len); | |
void* buffers[] = {buffer_a1}; | |
uint64_t buffer_sizes[] = {sizeof(int)*my_x_len*my_y_len}; | |
// Set subarray | |
uint64_t subarray[4]; | |
subarray[0] = 1; | |
subarray[1] = (uint64_t)x_len; | |
subarray[2] = (uint64_t)(rank*my_y_len + 1); | |
subarray[3] = (uint64_t)((rank+1)*my_y_len); | |
int i; | |
for (i = 0; i < 4; i++) { | |
printf(" %lu", subarray[i]); | |
} | |
printf("\n"); | |
printf("my buffer size: %lu\n", buffer_sizes[0]); | |
fflush(stdout); | |
// Create query | |
tiledb_query_t* query; | |
tiledb_query_create(ctx, &query, array_name, TILEDB_WRITE); | |
tiledb_query_set_subarray(ctx, query, subarray); | |
tiledb_query_set_buffers(ctx, query, attributes, 1, buffers, buffer_sizes); | |
tiledb_query_set_layout(ctx, query, TILEDB_GLOBAL_ORDER); | |
// Timing | |
struct timeval t1, t2; | |
double elapsedTime; | |
MPI_Barrier(MPI_COMM_WORLD); | |
gettimeofday(&t1, NULL); | |
if (rank == 0) { | |
sleep(1); | |
} | |
// OK: Only 1 rank writes data at a given time | |
/* for (i = 0; i < size; i++) { */ | |
/* if (rank == i) { */ | |
/* ret_val = tiledb_query_submit(ctx, query); */ | |
/* if (TILEDB_OK != ret_val) { */ | |
/* printf("Rank %d: error with query submit!\n", rank); */ | |
/* fflush(stdout); */ | |
/* } */ | |
/* ret_val = tiledb_query_finalize(ctx, query); */ | |
/* if (TILEDB_OK != ret_val) { */ | |
/* printf("Rank %d: error with query finalize %d!\n", rank, ret_val); */ | |
/* fflush(stdout); */ | |
/* } */ | |
/* } */ | |
/* MPI_Barrier(MPI_COMM_WORLD); */ | |
/* } */ | |
// PROBLEMATIC with Lustre: All ranks writes concurrently, but only a subset of data is written (often only 1 rank succeed) | |
ret_val = tiledb_query_submit(ctx, query); | |
if (TILEDB_OK != ret_val) { | |
printf("Rank %d: error with query submit %d!\n", rank, ret_val); | |
fflush(stdout); | |
} | |
ret_val = tiledb_query_finalize(ctx, query); | |
if (TILEDB_OK != ret_val) { | |
printf("Rank %d: error with query finalize %d!\n", rank, ret_val); | |
fflush(stdout); | |
} | |
// PROBLEMATIC with Lustre: Write async, but program hangs after output directory does not exist error message. | |
/* // Begin async */ | |
/* tiledb_query_status_t status; */ | |
/* ret_val = tiledb_query_submit_async(ctx, query, NULL, NULL); */ | |
/* if (TILEDB_OK != ret_val) { */ | |
/* printf("Rank %d: error with query submit!\n", rank); */ | |
/* fflush(stdout); */ | |
/* } */ | |
/* do { */ | |
/* tiledb_query_get_status(ctx, query, &status); */ | |
/* } while (status != TILEDB_COMPLETED); */ | |
/* // Finalize query */ | |
/* ret_val = tiledb_query_finalize(ctx, query); */ | |
/* if (TILEDB_OK != ret_val) { */ | |
/* printf("Rank %d: error with query finalize %d!\n", rank, ret_val); */ | |
/* fflush(stdout); */ | |
/* } */ | |
/* // End async */ | |
MPI_Barrier(MPI_COMM_WORLD); | |
gettimeofday(&t2, NULL); | |
elapsedTime = (t2.tv_sec - t1.tv_sec) * 1000.0; // sec to ms | |
elapsedTime += (t2.tv_usec - t1.tv_usec) / 1000.0; // us to s | |
if (rank == 0) { | |
printf("Total time to write data is %.6f seconds.\n", elapsedTime/1000.0); | |
} | |
// Clean up | |
if (rank == 0) { | |
tiledb_attribute_free(ctx, &a1); | |
tiledb_dimension_free(ctx, &d1); | |
tiledb_dimension_free(ctx, &d2); | |
tiledb_domain_free(ctx, &domain); | |
tiledb_array_schema_free(ctx, &array_schema); | |
} | |
tiledb_query_free(ctx, &query); | |
tiledb_ctx_free(&ctx); | |
MPI_Finalize(); | |
return 0; | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.