Skip to content

Instantly share code, notes, and snippets.

@system123
Last active September 20, 2017 14:14
Show Gist options
  • Save system123/b2a41b7667f8e9d7c1b72fc91febde17 to your computer and use it in GitHub Desktop.
Save system123/b2a41b7667f8e9d7c1b72fc91febde17 to your computer and use it in GitHub Desktop.
CDiscount Kaggle Competition - BSON to HDF5
/* Converts the CDiscount data from BSON format to HDF5 files
*
* Author: Lloyd Hughes <hughes.lloyd+kaggle@gmail.com>
*
* To compile: gcc -o bson_to_hdf5 bson_to_hdf5.c -I/usr/include/libbson-1.0 -L/usr/lib64 -lbson-1.0 -lhdf5_hl -lhdf5 -lrt -lpthread -lz -ldl -lm -Wl
*/
#include <limits.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <bson.h>
#include <hdf5.h>
#define STB_IMAGE_IMPLEMENTATION
#define STBI_ONLY_JPEG
#define STBI_NO_LINEAR
#include "stb_image.h"
/*
* Compile-time configuration.
* Set your preferences here before compiling and running
*/
// F_VERBOSE: when defined, main() prints the ID of every entity it processes
// and the periodic "Processed N items." progress line is compiled out.
// #define F_VERBOSE
// F_DECOMPESSJPEG: when defined, each image is decoded with
// stbi_load_from_memory() before it is written, and RANK becomes 3.
// (The macro name is spelled this way everywhere in the file.)
// #define F_DECOMPESSJPEG
// Path of the BSON input file handed to bson_reader_new_from_file().
#define BSON_FNAME "test.bson"
// Path of the HDF5 output file handed to H5Fcreate().
#define H5_FNAME "test.h5"
// Number of entities main() buffers before calling write_hdf5().
#define BUFFER_LEN 1000
// Number of writer threads write_hdf5() starts before joining them.
#define NUM_THREADS 4
// RANK is the dataspace rank passed to H5Screate_simple() for every image
// dataset: 3 when images are decoded, 1 when the raw bytes are stored.
#ifdef F_DECOMPESSJPEG
#define RANK 3
#else
#define RANK 1
#endif
/*
* One product record read from the BSON file together with its images.
* main() fills these in; write_hdf5()/write_single_image() consume them.
*/
typedef struct {
// Value of the document's "_id" field.
uint32_t id;
// Value of the document's "category_id" field.
uint32_t category_id;
// Per-image size slot. main() stores the byte length of the image's
// binary data here.
int x[4];
// Per-image slots that main() never assigns.
int y[4];
int ch[4];
// Pointer to each image's binary data, set by main().
uint8_t* imgs[4];
// Number of images main() stored in the arrays above.
uint8_t n;
// Pointer to the HDF5 file handle the images are written to, set by main().
hid_t* file;
} cdiscount_entity_t;
void *write_single_image(void* entity_ptr)
{
hid_t dataset, dataspace, attr_dataspace, attr_id, attr_cid, attr_n;
hsize_t dims[RANK] = {0};
hsize_t attr_dim = 1;
herr_t status;
// Max length of max_int_max_int\0
char name[22];
cdiscount_entity_t *entity = (cdiscount_entity_t*) entity_ptr;
hid_t* file = entity->file;
for (unsigned int i = 0; i < entity->n; i++) {
#ifdef F_DECOMPESSJPEG
// Decompress the JPEG to a byte array
int len = entity->x[i];
entity->imgs[i] = stbi_load_from_memory((stbi_uc*) entity->imgs[i], len, &entity->x[i], &entity->y[i], &entity->ch[i], 0 );
#endif
dims[0] = entity->x[i];
dims[1] = entity->y[i];
dims[2] = entity->ch[i];
sprintf(name, "%d_%d", entity->id, i);
dataspace = H5Screate_simple(RANK, dims, NULL);
attr_dataspace = H5Screate_simple(1, &attr_dim, NULL);
dataset = H5Dcreate2(*file, name, H5T_NATIVE_UINT8, dataspace, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
attr_id = H5Acreate2(dataset, "id", H5T_NATIVE_UINT, attr_dataspace, H5P_DEFAULT, H5P_DEFAULT);
attr_n = H5Acreate2(dataset, "img", H5T_NATIVE_UINT, attr_dataspace, H5P_DEFAULT, H5P_DEFAULT);
attr_cid = H5Acreate2(dataset, "category_id", H5T_NATIVE_UINT, attr_dataspace, H5P_DEFAULT, H5P_DEFAULT);
status = H5Dwrite(dataset, H5T_NATIVE_UINT8, H5S_ALL, H5S_ALL, H5P_DEFAULT, entity->imgs[i]);
status = H5Awrite(attr_id, H5T_NATIVE_UINT, &entity->id);
status = H5Awrite(attr_n, H5T_NATIVE_UINT, &i);
status = H5Awrite(attr_cid, H5T_NATIVE_UINT, &entity->category_id);
H5Aclose(attr_id);
H5Aclose(attr_n);
H5Aclose(attr_cid);
H5Dclose(dataset);
H5Sclose(dataspace);
#ifdef F_DECOMPESSJPEG
stbi_image_free(entity->imgs[i]);
#endif
}
pthread_exit(NULL);
}
void write_hdf5(const cdiscount_entity_t *entities)
{
pthread_t threads[NUM_THREADS];
int thread_cnt = 0;
// We could paralise this code to speed up writing out
for (int i = 0; i < BUFFER_LEN; i++) {
if (pthread_create(&threads[thread_cnt], NULL, write_single_image, (void*) &entities[i])) {
printf("Error creating thread. Exiting.\n" );
return;
}
thread_cnt++;
if (thread_cnt == NUM_THREADS) {
for (int t = 0; t < NUM_THREADS; t++) {
pthread_join(threads[t], NULL);
}
thread_cnt = 0;
}
}
}
int main(int argc, char* argv[])
{
bson_reader_t *reader;
const bson_t *doc;
const bson_value_t * val;
const bson_value_t * img;
const char* key;
bson_error_t error;
bson_iter_t iter, sub_iter, img_iter;
hid_t file;
bool eof;
cdiscount_entity_t buffer[BUFFER_LEN] = {0};
int cnt = 0;
int buf_ptr = 0;
uint8_t img_n = 0;
reader = bson_reader_new_from_file(BSON_FNAME, &error);
file = H5Fcreate(H5_FNAME, H5F_ACC_TRUNC, H5P_DEFAULT, H5P_DEFAULT);
if (!reader) {
printf("Failed to open the file.\n");
return 1;
}
// Iterate through the BSON documents
while (doc = bson_reader_read(reader, &eof)) {
cdiscount_entity_t* entity = &buffer[buf_ptr];
// Process each document
if (bson_iter_init(&iter, doc)) {
while (bson_iter_next(&iter)) {
// Get the key and value for each item in the document
val = bson_iter_value(&iter);
key = bson_iter_key(&iter);
if (strcmp(key, "_id") == 0) {
entity->id = val->value.v_int32;
#ifdef F_VERBOSE
printf ("Processing entity ID: = %d\n", entity->id);
#endif //F_VERBOSE
// Store the category ID if it exists
} else if (strcmp(key, "category_id") == 0) {
entity->category_id = val->value.v_int32;
// Process the images
} else if (strcmp(key, "imgs") == 0) {
// Ensure we have a BSON array and then iterate through each element in the array [0: <bson doc>, 1: <bson doc> ...]
if (BSON_ITER_HOLDS_ARRAY(&iter) && bson_iter_recurse(&iter, &sub_iter)) {
img_n = 0; // Reset the image counterjoin
while (bson_iter_next(&sub_iter)) {
// Check that the array element is a BSON document and get an iterater to the picture element
if (BSON_ITER_HOLDS_DOCUMENT(&sub_iter) && bson_iter_recurse(&sub_iter, &img_iter)) {
bson_iter_next(&img_iter);
img = bson_iter_value(&img_iter);
entity->imgs[img_n] = (uint8_t*) img->value.v_binary.data;
entity->x[img_n] = img->value.v_binary.data_len;
// printf ("Key: %s [%s] len: %d, size(%d, %d, %d)\n", bson_iter_key(&img_iter), bson_iter_key(&sub_iter), img->value.v_binary.data_len, entity.x[entity.n], entity.y[entity.n], entity.ch[entity.n]);
img_n++;
}
}
entity->n = img_n;
entity->file = &file;
}
}
}
}
buf_ptr++;
if (buf_ptr == BUFFER_LEN) {
write_hdf5(buffer);
buf_ptr = 0;
}
#ifndef F_VERBOSE
cnt++;
if (cnt % 100000 == 0) {
printf("Processed %d items.\n", cnt);
}
#endif
}
if (!eof) {
printf("Corrupted BSON document found at %u\n", (unsigned int) bson_reader_tell(reader));
}
// Write out any remaining data
if (buf_ptr > 0) {
write_hdf5(buffer);
}
bson_reader_destroy(reader);
H5Fclose(file);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment