Skip to content

Instantly share code, notes, and snippets.

@nitingupta910
Created April 4, 2015 18:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nitingupta910/6d24b1454aac848595e6 to your computer and use it in GitHub Desktop.
Save nitingupta910/6d24b1454aac848595e6 to your computer and use it in GitHub Desktop.
Using rocksdb merge operator in C
/*
Makefile:
(make sure Makefile is indented using a tab and not spaces)
all:
cc -Wall -g -O0 rdb_mergeop.c -o rdb_mergeop -lstdc++ -lrocksdb -lsnappy -lbz2 -llz4 -lz
clean:
rm -rf rdb_mergeop
rm -rf testdb
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rocksdb/c.h>
struct merge_operator_state {
int val;
};
char* merge_operator_full_merge_fn (
void *state,
const char *key, size_t key_length,
const char *existing_value, size_t existing_value_length,
const char *const *operands_list, const size_t *operands_list_length,
int num_operands,
unsigned char *success, size_t *new_value_length)
{
printf("full_merge_fn called\n");
// dumping params
fprintf(stderr, "\nDumping full_merge_fn params:\n");
fprintf(stderr, "key: %.*s\n", (int)key_length, key);
fprintf(stderr, "existing value: %.*s\n", (int)existing_value_length, existing_value);
fprintf(stderr, "operands (%d):\n", num_operands);
for (int i = 0; i < num_operands; i++) {
fprintf(stderr, "\t%.*s\n", (int)operands_list_length[i], operands_list[i]);
}
fprintf(stderr, "END PARAMS\n");
char *str = "MERGED FOO";
size_t len = strlen(str);
// result must be malloc'ed here (freed by rocksdb)
char *result = malloc(len);
memcpy(result, str, len);
if (!result) {
*success = 0;
*new_value_length = 0;
return NULL;
}
*success = 1;
*new_value_length = len;
return result;
}
const char* merge_operator_name_fn(void *state)
{
return "tesbdb_merge_op";
}
// destructor MUST be provided for a merge operator
void merge_operator_destructor_fn(void *state)
{
printf("merge_operator_destructor_fn called\n");
}
int main()
{
const char *db_name = "testdb";
// db options
rocksdb_options_t *opts = rocksdb_options_create();
rocksdb_options_set_create_if_missing(opts, 1);
rocksdb_options_set_error_if_exists(opts, 1);
rocksdb_options_set_compression(opts, rocksdb_snappy_compression);
// merge operator
struct merge_operator_state *state = malloc(sizeof(*state));
if (!state) {
return -1;
}
rocksdb_mergeoperator_t *merge_op = rocksdb_mergeoperator_create(
(void *)state,
merge_operator_destructor_fn,
merge_operator_full_merge_fn,
NULL, // partial merge function
NULL, // delete value function
merge_operator_name_fn);
rocksdb_options_set_merge_operator(opts, merge_op);
char *err;
rocksdb_t *db = rocksdb_open(opts, db_name, &err);
if (err != NULL) {
fprintf(stderr, "database open %s\n", err);
rocksdb_mergeoperator_destroy(merge_op);
return -1;
}
free(err);
err = NULL;
rocksdb_writeoptions_t *wo = rocksdb_writeoptions_create();
char *key = "name";
char *value = "foo";
rocksdb_put(db, wo, key, strlen(key), value, strlen(value), &err);
if (err != NULL) {
fprintf(stderr, "put key %s\n", err);
rocksdb_mergeoperator_destroy(merge_op);
rocksdb_close(db);
return -1;
}
free(err);
err = NULL;
// insert key, value: name => foo
rocksdb_readoptions_t *ro = rocksdb_readoptions_create();
size_t rlen;
value = rocksdb_get(db, ro, key, strlen(key), &rlen, &err);
if (err != NULL) {
fprintf(stderr, "get key %s\n", err);
rocksdb_mergeoperator_destroy(merge_op);
rocksdb_close(db);
return -1;
}
free(err);
err = NULL;
printf("get key len: %lu, value: %s\n", rlen, value);
// Merge values into an existing key "name"
// name => bar
value = "bar";
rocksdb_merge(db, wo, key, strlen(key), value, strlen(value), &err);
if (err != NULL) {
fprintf(stderr, "merge key %s\n", err);
rocksdb_mergeoperator_destroy(merge_op);
rocksdb_close(db);
return -1;
}
free(err);
err = NULL;
// name => baz
value = "baz";
rocksdb_merge(db, wo, key, strlen(key), value, strlen(value), &err);
if (err != NULL) {
fprintf(stderr, "merge key %s\n", err);
rocksdb_mergeoperator_destroy(merge_op);
rocksdb_close(db);
return -1;
}
free(err);
err = NULL;
// get key "name"
value = rocksdb_get(db, ro, key, strlen(key), &rlen, &err);
if (err != NULL) {
fprintf(stderr, "get key %s\n", err);
rocksdb_mergeoperator_destroy(merge_op);
rocksdb_close(db);
return -1;
}
free(err);
err = NULL;
printf("get key (merged) len: %lu, value: %.*s\n", rlen, (int)rlen, value);
rocksdb_mergeoperator_destroy(merge_op);
rocksdb_close(db);
return 0;
}
@nitingupta910
Copy link
Author

Output:

get key len: 3, value: foo
full_merge_fn called

Dumping full_merge_fn params:
key: name
existing value: foo
operands (2):
    bar
    baz
END PARAMS
get key (merged) len: 10, value: MERGED FOO

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment