Skip to content

Instantly share code, notes, and snippets.

@houjun
Created May 8, 2018 16:18
Show Gist options
  • Save houjun/65dbd75a004de940d081d10f4284bf04 to your computer and use it in GitHub Desktop.
Save houjun/65dbd75a004de940d081d10f4284bf04 to your computer and use it in GitHub Desktop.
TileDB concurrent write with MPI
#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <sys/time.h>
#include <unistd.h>

#include <tiledb/tiledb.h>
#include "mpi.h"
int main(int argc, char** argv) {
int rank, size;
MPI_Init(NULL, NULL);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
int x_len, y_len, x_tile_size, y_tile_size;
int my_x_len, my_y_len;
uint64_t dim_domain[] = {1, 1, 1, 1};
uint64_t tile_extents[] = {2, 2};
char array_name[128];
tiledb_dimension_t* d1;
tiledb_dimension_t* d2;
tiledb_domain_t* domain;
tiledb_array_schema_t* array_schema;
int ret_val;
if (argc != 6) {
if (rank == 0)
printf("Usage: ./%s array_name x_len y_len x_tile_size y_tile_size\nexiting...\n", argv[0]);
MPI_Finalize();
return 0;
}
x_len = atoi(argv[2]);
y_len = atoi(argv[3]);
x_tile_size = atoi(argv[4]);
y_tile_size = atoi(argv[5]);
sprintf(array_name, "%s_w%d_x%d_y%d_xt%d_yt%d", argv[1], size, x_len, y_len, x_tile_size, y_tile_size);
// Distribute workload evenly
my_x_len = x_len;
my_y_len = y_len / size;
if (rank == 0) {
printf("x_len=%d y_len=%d x_tile_size=%d y_tile_size=%d, my_x_len=%d, my_y_len=%d\n",
x_len, y_len, x_tile_size, y_tile_size, my_x_len, my_y_len);
fflush(stdout);
}
// Initialize context with the default configuration parameters
tiledb_ctx_t* ctx;
tiledb_ctx_create(&ctx, NULL);
tiledb_attribute_t* a1 = NULL;
if (rank == 0) {
// Create dimensions
dim_domain[0] = 1;
dim_domain[1] = x_len;
dim_domain[2] = 1;
dim_domain[3] = y_len;
tile_extents[0] = x_tile_size;
tile_extents[1] = y_tile_size;
tiledb_dimension_create(ctx, &d1, "d1", TILEDB_UINT64, &dim_domain[0], &tile_extents[0]);
tiledb_dimension_create(ctx, &d2, "d2", TILEDB_UINT64, &dim_domain[2], &tile_extents[1]);
// Create domain
tiledb_domain_create(ctx, &domain);
tiledb_domain_add_dimension(ctx, domain, d1);
tiledb_domain_add_dimension(ctx, domain, d2);
// Create attributes
tiledb_attribute_t* a1;
tiledb_attribute_create(ctx, &a1, "a1", TILEDB_INT32);
tiledb_attribute_set_compressor(ctx, a1, TILEDB_NO_COMPRESSION, -1);
tiledb_attribute_set_cell_val_num(ctx, a1, 1);
// Create array schema
tiledb_array_schema_create(ctx, &array_schema, TILEDB_DENSE);
tiledb_array_schema_set_cell_order(ctx, array_schema, TILEDB_ROW_MAJOR);
tiledb_array_schema_set_tile_order(ctx, array_schema, TILEDB_ROW_MAJOR);
tiledb_array_schema_set_domain(ctx, array_schema, domain);
tiledb_array_schema_add_attribute(ctx, array_schema, a1);
// Check array schema
if (tiledb_array_schema_check(ctx, array_schema) != TILEDB_OK) {
return -1;
}
// Create array
tiledb_array_create(ctx, array_name, array_schema);
}
MPI_Barrier(MPI_COMM_WORLD);
// Set attributes
const char* attributes[] = {"a1"};
// Prepare cell buffers
int *buffer_a1 = (int*)calloc(sizeof(int), my_x_len*my_y_len);
void* buffers[] = {buffer_a1};
uint64_t buffer_sizes[] = {sizeof(int)*my_x_len*my_y_len};
// Set subarray
uint64_t subarray[4];
subarray[0] = 1;
subarray[1] = (uint64_t)x_len;
subarray[2] = (uint64_t)(rank*my_y_len + 1);
subarray[3] = (uint64_t)((rank+1)*my_y_len);
int i;
for (i = 0; i < 4; i++) {
printf(" %lu", subarray[i]);
}
printf("\n");
printf("my buffer size: %lu\n", buffer_sizes[0]);
fflush(stdout);
// Create query
tiledb_query_t* query;
tiledb_query_create(ctx, &query, array_name, TILEDB_WRITE);
tiledb_query_set_subarray(ctx, query, subarray);
tiledb_query_set_buffers(ctx, query, attributes, 1, buffers, buffer_sizes);
tiledb_query_set_layout(ctx, query, TILEDB_GLOBAL_ORDER);
// Timing
struct timeval t1, t2;
double elapsedTime;
MPI_Barrier(MPI_COMM_WORLD);
gettimeofday(&t1, NULL);
if (rank == 0) {
sleep(1);
}
// OK: Only 1 rank writes data at a given time
/* for (i = 0; i < size; i++) { */
/* if (rank == i) { */
/* ret_val = tiledb_query_submit(ctx, query); */
/* if (TILEDB_OK != ret_val) { */
/* printf("Rank %d: error with query submit!\n", rank); */
/* fflush(stdout); */
/* } */
/* ret_val = tiledb_query_finalize(ctx, query); */
/* if (TILEDB_OK != ret_val) { */
/* printf("Rank %d: error with query finalize %d!\n", rank, ret_val); */
/* fflush(stdout); */
/* } */
/* } */
/* MPI_Barrier(MPI_COMM_WORLD); */
/* } */
// PROBLEMATIC with Lustre: All ranks writes concurrently, but only a subset of data is written (often only 1 rank succeed)
ret_val = tiledb_query_submit(ctx, query);
if (TILEDB_OK != ret_val) {
printf("Rank %d: error with query submit %d!\n", rank, ret_val);
fflush(stdout);
}
ret_val = tiledb_query_finalize(ctx, query);
if (TILEDB_OK != ret_val) {
printf("Rank %d: error with query finalize %d!\n", rank, ret_val);
fflush(stdout);
}
// PROBLEMATIC with Lustre: Write async, but program hangs after output directory does not exist error message.
/* // Begin async */
/* tiledb_query_status_t status; */
/* ret_val = tiledb_query_submit_async(ctx, query, NULL, NULL); */
/* if (TILEDB_OK != ret_val) { */
/* printf("Rank %d: error with query submit!\n", rank); */
/* fflush(stdout); */
/* } */
/* do { */
/* tiledb_query_get_status(ctx, query, &status); */
/* } while (status != TILEDB_COMPLETED); */
/* // Finalize query */
/* ret_val = tiledb_query_finalize(ctx, query); */
/* if (TILEDB_OK != ret_val) { */
/* printf("Rank %d: error with query finalize %d!\n", rank, ret_val); */
/* fflush(stdout); */
/* } */
/* // End async */
MPI_Barrier(MPI_COMM_WORLD);
gettimeofday(&t2, NULL);
elapsedTime = (t2.tv_sec - t1.tv_sec) * 1000.0; // sec to ms
elapsedTime += (t2.tv_usec - t1.tv_usec) / 1000.0; // us to s
if (rank == 0) {
printf("Total time to write data is %.6f seconds.\n", elapsedTime/1000.0);
}
// Clean up
if (rank == 0) {
tiledb_attribute_free(ctx, &a1);
tiledb_dimension_free(ctx, &d1);
tiledb_dimension_free(ctx, &d2);
tiledb_domain_free(ctx, &domain);
tiledb_array_schema_free(ctx, &array_schema);
}
tiledb_query_free(ctx, &query);
tiledb_ctx_free(&ctx);
MPI_Finalize();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment