Created
June 27, 2019 22:58
-
-
Save manodeep/b8eb323f11d57165508bbe546748a944 to your computer and use it in GitHub Desktop.
OpenMP parallel reads of 1-D arrays from HDF5 files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h>

#include <omp.h>

#include "hdf5.h"
#include "sglib.h"
/* | |
This is a small code that demonstrates how to read an HDF5 array (i.e., 1-D array) | |
in an OpenMP parallel manner. | |
Note, that reading in parallel is only directly supported by the threadsafe build of the | |
hdf5 library. The code will automatically detect if the library is threadsafe; and if not, | |
then this code will serialise all the reads via an OpenMP lock mechanism. | |
*/ | |
/* Global OpenMP lock used to serialise HDF5 calls when the library build is
 * not threadsafe (initialised and destroyed in main; taken/released in
 * read_partial_array). */
omp_lock_t lock;
/*
 * Check an HDF5 status/handle; if it is negative, print the supplied
 * printf-style message to stderr and return `return_value` from the
 * *enclosing function*.
 * BUGFIX: the original returned `status` and never expanded `return_value`,
 * which also masked undefined identifiers passed by one caller.
 */
#define CHECK_STATUS_AND_RETURN_ON_FAIL(status, return_value, ...)      \
    do {                                                                \
        if((status) < 0) {                                              \
            fprintf(stderr, __VA_ARGS__);                               \
            return (return_value);                                      \
        }                                                               \
    } while (0)

/*
 * Create a 1-D dataspace and dataset named `field_name` in `file_id`, write
 * `data` (dims[0] elements of HDF5 type `h5_dtype`), then close both handles.
 * On any HDF5 failure, returns from the enclosing function with the
 * (negative) failing id/status cast to int32_t.
 * BUGFIX: error paths referenced undefined `dataspace_id`/`dataset_id`,
 * "#field_name" inside a string literal was never stringified (now %s), and
 * the macro silently clobbered a caller-scope `status` variable.
 */
#define CREATE_AND_WRITE_DATASET(file_id, field_name, dims, h5_dtype, data) do { \
        hid_t macro_dataspace_id = H5Screate_simple(1, dims, NULL);     \
        CHECK_STATUS_AND_RETURN_ON_FAIL(macro_dataspace_id, (int32_t) macro_dataspace_id, \
                                        "Could not create a dataspace for field %s.\n" \
                                        "The dimensions of the dataspace was %d\n", field_name, (int32_t) dims[0]); \
        hid_t macro_dataset_id = H5Dcreate2(file_id, field_name, h5_dtype, macro_dataspace_id, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT); \
        CHECK_STATUS_AND_RETURN_ON_FAIL(macro_dataset_id, (int32_t) macro_dataset_id, \
                                        "Could not create a dataset for field %s.\n" \
                                        "The dimensions of the dataset was %d\nThe file id was %d\n.", \
                                        field_name, (int32_t) dims[0], (int32_t) file_id); \
        herr_t macro_status = H5Dwrite(macro_dataset_id, h5_dtype, H5S_ALL, H5S_ALL, H5P_DEFAULT, data); \
        CHECK_STATUS_AND_RETURN_ON_FAIL(macro_status, (int32_t) macro_status, \
                                        "Failed to write a dataset for field %s.\n" \
                                        "The dimensions of the dataset was %d\nThe file ID was %d\n." \
                                        "The dataset ID was %d.", field_name, (int32_t) dims[0], (int32_t) file_id, \
                                        (int32_t) macro_dataset_id); \
        macro_status = H5Dclose(macro_dataset_id); \
        CHECK_STATUS_AND_RETURN_ON_FAIL(macro_status, (int32_t) macro_status, \
                                        "Failed to close the dataset for field %s.\n" \
                                        "The dimensions of the dataset was %d\nThe file ID was %d\n." \
                                        "The dataset ID was %d.", field_name, (int32_t) dims[0], (int32_t) file_id, \
                                        (int32_t) macro_dataset_id); \
        macro_status = H5Sclose(macro_dataspace_id); \
        CHECK_STATUS_AND_RETURN_ON_FAIL(macro_status, (int32_t) macro_status, \
                                        "Failed to close the dataspace for field %s.\n" \
                                        "The dimensions of the dataset was %d\nThe file ID was %d\n." \
                                        "The dataspace ID was %d.", field_name, (int32_t) dims[0], (int32_t) file_id, \
                                        (int32_t) macro_dataspace_id); \
    } while (0)
/*
 * Read `count[0]` elements starting at `offset[0]` of the 1-D dataset
 * `dataset_name` in the open file `fd` into `buffer` (memory type `h5_dtype`).
 * On any HDF5 failure, every handle opened so far is closed and the enclosing
 * function returns EXIT_FAILURE.
 * BUGFIX: H5Eprint() was handed a dataset id where an error-stack id belongs
 * (now H5E_DEFAULT), error paths leaked the dataset/filespace handles, the
 * memspace was never checked, and "#dataset_name" inside string literals was
 * never stringified (now %s).
 */
#define READ_PARTIAL_1D_ARRAY(fd, dataset_name, offset, stride, count, block, h5_dtype, buffer) \
    do { \
        hid_t macro_dataset_id = H5Dopen(fd, dataset_name, H5P_DEFAULT); \
        if (macro_dataset_id < 0) { \
            fprintf(stderr, "Error encountered when trying to open up dataset %s\n", dataset_name); \
            H5Eprint(H5E_DEFAULT, stderr); \
            return EXIT_FAILURE; \
        } \
        hid_t macro_filespace = H5Dget_space(macro_dataset_id); \
        if (macro_filespace < 0) { \
            fprintf(stderr, "Error encountered when trying to get space for dataset %s\n", dataset_name); \
            H5Dclose(macro_dataset_id); \
            return EXIT_FAILURE; \
        } \
        herr_t macro_status = H5Sselect_hyperslab(macro_filespace, H5S_SELECT_SET, offset, stride, count, block); \
        if (macro_status < 0) { \
            fprintf(stderr, "Failed to select hyperslab for dataset %s\n", dataset_name); \
            H5Sclose(macro_filespace); \
            H5Dclose(macro_dataset_id); \
            return EXIT_FAILURE; \
        } \
        hid_t macro_memspace = H5Screate_simple(1, count, NULL); \
        if (macro_memspace < 0) { \
            fprintf(stderr, "Failed to create the memory dataspace for dataset %s\n", dataset_name); \
            H5Sclose(macro_filespace); \
            H5Dclose(macro_dataset_id); \
            return EXIT_FAILURE; \
        } \
        macro_status = H5Dread(macro_dataset_id, h5_dtype, macro_memspace, macro_filespace, H5P_DEFAULT, buffer); \
        if (macro_status < 0) { \
            fprintf(stderr, "Failed to read array for dataset %s\n", dataset_name); \
            H5Sclose(macro_memspace); \
            H5Sclose(macro_filespace); \
            H5Dclose(macro_dataset_id); \
            return EXIT_FAILURE; \
        } \
        H5Sclose(macro_memspace); \
        H5Sclose(macro_filespace); \
        H5Dclose(macro_dataset_id); \
    } while (0)
/* Deterministic value for global array index i (used to verify reads back).
 * BUGFIX: argument is now parenthesized so expression arguments (e.g. a+b)
 * expand correctly. */
#define GENERATE_VALUE_IN_ARRAY(i) ((i)*(i))
/* Per-thread description of one contiguous slab of the global 1-D dataset. */
typedef struct halo_info {
    int32_t rank;          /* OpenMP thread id that owns this slab */
    int32_t size;          /* total number of OpenMP threads */
    int64_t start_offset;  /* first global index of this thread's slab */
    int64_t nelements;     /* number of elements this thread reads */
    int64_t *partial_data; /* thread-local read buffer (nelements entries); allocated and freed by the owning thread */
} halo_info;
/*
 * Read this thread's slab (info->start_offset, info->nelements elements) of
 * the 1-D "array" dataset in `file` into info->partial_data.
 *
 * If is_threadsafe != 1, the HDF5 calls are serialised through the global
 * OpenMP `lock`; otherwise threads read concurrently.
 *
 * Returns EXIT_SUCCESS on success.  NOTE(review): the READ_PARTIAL_1D_ARRAY
 * macro contains `return` statements, so an HDF5 error while the lock is
 * held returns without omp_unset_lock() -- the remaining threads would then
 * deadlock.  Confirm whether error paths should release the lock first.
 */
int read_partial_array(struct halo_info *info, hid_t file, const int is_threadsafe)
{
    hsize_t offset[1];
    offset[0] = info->start_offset;
    hsize_t count[1];
    count[0] = info->nelements;
    if(is_threadsafe != 1) {
        /* Serialize the HDF5 dataset read using the global OpenMP lock */
        omp_set_lock(&lock);
    }
    READ_PARTIAL_1D_ARRAY(file, "array", &offset[0], NULL, &count[0], NULL, H5T_NATIVE_INT64, info->partial_data);
    if(is_threadsafe != 1) {
        /* Release lock */
        omp_unset_lock(&lock);
    }
    return EXIT_SUCCESS;
}
/* Uniform-ish random integer in [range_min, range_max] (inclusive).
 * Assumes the caller has already seeded rand() via srand(). */
int32_t return_random_integer_within_range(const int32_t range_min, const int32_t range_max)
{
    const int32_t span = range_max - range_min + 1; /* count of representable values */
    const int32_t draw = rand() % span;
    return draw + range_min;
}
/* qsort comparator: ascending order of int32_t values. */
static int compare_int32_ascending(const void *a, const void *b)
{
    const int32_t x = *(const int32_t *) a;
    const int32_t y = *(const int32_t *) b;
    return (x > y) - (x < y);
}

/*
 * A reasonable solution to randomly dividing `totN` elements over `size`
 * partitions: pick (size-1) random cut-points in [1, totN], sort them, and
 * use consecutive differences as chunk sizes, so chunks always sum to
 * exactly totN (individual chunks may be 0 when cut-points repeat).
 *
 * nchunks must hold `size` int32_t entries and receives the chunk sizes.
 * Returns EXIT_SUCCESS, or EXIT_FAILURE on invalid arguments or if the
 * consistency check fails.
 *
 * BUGFIXES: size<=0 previously indexed nchunks[-1]; totN was silently
 * truncated through an int32_t parameter; totN==0 with size>1 caused a
 * division (modulo) by zero.  The SGLIB sort macro is replaced by stdlib
 * qsort (same ascending result).
 */
int split_array_over_tasks(const int64_t totN, const unsigned int seed, const int size, int32_t *nchunks)
{
    srand(seed);
    if(nchunks == NULL) {
        fprintf(stderr, "Error: `nchunks` can not be NULL. Needs to hold size=%d elements of int32_t each\n", size);
        return EXIT_FAILURE;
    }
    if(size <= 0) {
        fprintf(stderr, "Error: `size` = %d must be at least 1\n", size);
        return EXIT_FAILURE;
    }
    if(totN < 0 || totN > INT32_MAX) {
        fprintf(stderr, "Error: `totN` = %"PRId64" must be in the range [0, INT32_MAX]\n", totN);
        return EXIT_FAILURE;
    }
    if(size > 1 && totN < 1) {
        fprintf(stderr, "Error: `totN` = %"PRId64" must be >= 1 when size = %d > 1\n", totN, size);
        return EXIT_FAILURE;
    }
    /* (size-1) random cut-points in [1, totN] (helper inlined so the
       modulo+offset matches return_random_integer_within_range exactly) */
    const int32_t totN32 = (int32_t) totN;
    for(int i = 0; i < size-1; i++) {
        nchunks[i] = (rand() % totN32) + 1;
    }
    /* sort the cut-points in ascending order */
    qsort(nchunks, (size_t)(size - 1), sizeof(*nchunks), compare_int32_ascending);
    nchunks[size-1] = totN32;
    /* convert cut-points into chunk sizes via consecutive differences */
    int32_t prev_val = 0;
    for(int i=0;i<size;i++) {
        int32_t tmp_chunk = nchunks[i] - prev_val;
        prev_val = nchunks[i];
        nchunks[i] = tmp_chunk;
    }
    /* consistency check: the chunks must sum back to totN */
    int64_t sum = 0;
    for(int i=0;i<size;i++) {
        sum += nchunks[i];
    }
    if(sum != totN) {
        fprintf(stderr,"Error: The sum of chunks does not equal the requested range\n");
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}
int main (void) | |
{ | |
herr_t status; | |
/*Disable dynamic allocation of threads*/ | |
omp_set_dynamic(0); | |
hbool_t is_ts; | |
H5is_library_threadsafe(&is_ts); | |
fprintf(stderr, "HDF5 library threadsafe = %d\n", (int) is_ts); | |
if((int) is_ts != 1) { | |
fprintf(stderr,"Warning: The HDF5 library is not thread-safe -- all reads will be serialized\n"); | |
/*Initialize (global) lock*/ | |
omp_init_lock(&lock); | |
} | |
/* | |
* Create a new file using the default properties. | |
*/ | |
const char partial_filename[] = {"test_partial.h5"}; | |
hid_t file = H5Fcreate (partial_filename, H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT); | |
const int64_t N = 100000000; | |
int64_t *new_array = malloc(N*sizeof(*new_array)); | |
for(int64_t i=0;i<N;i++) new_array[i] = GENERATE_VALUE_IN_ARRAY(i); | |
// Redshift at each snapshot. | |
hsize_t dims[] = {N}; | |
CREATE_AND_WRITE_DATASET(file, "array", dims, H5T_NATIVE_INT64, new_array); | |
status = H5Fclose (file); | |
//Now open the file for reading | |
file = H5Fopen (partial_filename, H5F_ACC_RDONLY, H5P_DEFAULT); | |
const int nthreadmax = 32; | |
const unsigned int seed = 237046530; | |
status = EXIT_FAILURE; | |
for(int nthreads=1;nthreads<nthreadmax;nthreads++) { | |
double start_time = omp_get_wtime(); | |
omp_set_num_threads(nthreads); | |
int32_t *nchunks = calloc(nthreads, sizeof(*nchunks)); | |
fprintf(stderr,"Running with %d threads................", nthreads); | |
status = split_array_over_tasks(N, seed, nthreads, nchunks); | |
if(status != EXIT_SUCCESS) { | |
goto fail; | |
} | |
#pragma omp parallel | |
{ | |
const int tid = omp_get_thread_num(); | |
if(nthreads != omp_get_num_threads()) { | |
fprintf(stderr,"Error: The total number of threads as seen by OpenMP is not the same as intended...exiting on rank = %d\n", tid); | |
exit(EXIT_FAILURE); | |
} | |
struct halo_info info; | |
info.rank = tid; | |
info.size = nthreads; | |
info.nelements = nchunks[tid]; | |
int64_t sum = 0; | |
for(int i=0;i<nthreads;i++) { | |
if(i == tid) { | |
info.start_offset = sum; | |
} | |
sum += nchunks[i]; | |
} | |
info.partial_data = malloc(info.nelements*sizeof(info.partial_data[0])); | |
if(info.partial_data == NULL) { | |
fprintf(stderr,"Error: on tid = %d could not allocate memory for partial_data with nelements = %"PRId64"\n", tid, info.nelements); | |
exit(EXIT_FAILURE); | |
} | |
status = read_partial_array(&info, file, (int) is_ts);//last argument specifies whether the library is threadsafe. | |
if(status != EXIT_SUCCESS) { | |
exit(status); | |
} | |
/* Now test that the data was read in correctly */ | |
for(int i=0;i<nthreads;i++) { | |
if(tid == i) { | |
/* fprintf(stderr,"On tid = %d start = %"PRId64" nelements = %"PRId64"\n", tid, info.start_offset, info.nelements); */ | |
for(int64_t j=0;j<info.nelements;j++) { | |
const int64_t idx = info.start_offset + j; | |
if(idx >= N) { | |
fprintf(stderr,"Error: on tid = %d trying to access idx = %"PRId64" that is invalid since the max. is N = %"PRId64"\n", | |
tid, idx, N); | |
exit(EXIT_FAILURE); | |
} | |
/* fprintf(stderr,"On tid = %d, accessing idx = %"PRId64" and comparing to partial_data[%"PRId64"]\n", tid, idx, j); */ | |
if (info.partial_data[j] != new_array[idx]) { | |
fprintf(stderr,"Error: on tid = %d partial_data[%"PRId64"] = %"PRId64" does NOT equal new_array[%"PRId64"] = %"PRId64"\n", | |
tid, j, info.partial_data[j], idx, new_array[idx]); | |
exit(EXIT_FAILURE); | |
} | |
} | |
} | |
} | |
free(info.partial_data); | |
} | |
free(nchunks); | |
fprintf(stderr,"...done. Time taken = %0.3g seconds\n", omp_get_wtime() - start_time); | |
} | |
if((int) is_ts != 1) { | |
//Finished lock mechanism, destroy it | |
omp_destroy_lock(&lock); | |
} | |
free(new_array); | |
H5Fclose (file); | |
return EXIT_SUCCESS; | |
fail: | |
free(new_array); | |
H5Fclose(file); | |
return status; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Default output on local supercomputer:
On the same supercomputer, but without a threadsafe hdf5 build:
With a non-threadsafe, but MPI build of the hdf5 library: