Skip to content

Instantly share code, notes, and snippets.

@krishnasrinivas
Created March 8, 2019 21:56
Show Gist options
  • Save krishnasrinivas/7b0de0d4d309d4c3475665c45bfd9ea2 to your computer and use it in GitHub Desktop.
Save krishnasrinivas/7b0de0d4d309d4c3475665c45bfd9ea2 to your computer and use it in GitHub Desktop.
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "nkv_api.h"
#include "nkv_result.h"
struct minio_nkv_handle {
uint64_t nkv_handle;
uint64_t container_hash;
uint64_t network_path_hash;
};
typedef struct minio_nkv_private_ {
void *pfn;
uint64_t channel;
nkv_io_context ctx;
nkv_key nkvkey;
nkv_value nkvvalue;
nkv_store_option store_option;
nkv_retrieve_option retrieve_option;
} minio_nkv_private;
static int minio_nkv_open(char *config, uint64_t *nkv_handle) {
uint64_t instance_uuid = 0;
nkv_result result;
result = nkv_open(config, "minio", "msl-ssg-sk01", 1023, &instance_uuid, nkv_handle);
return result;
}
static int minio_nkv_open_path(struct minio_nkv_handle *handle, char *ipaddr) {
uint32_t index = 0;
uint32_t cnt_count = NKV_MAX_ENTRIES_PER_CALL;
nkv_container_info *cntlist = malloc(sizeof(nkv_container_info)*NKV_MAX_ENTRIES_PER_CALL);
memset(cntlist, 0, sizeof(nkv_container_info) * NKV_MAX_ENTRIES_PER_CALL);
for (int i = 0; i < NKV_MAX_ENTRIES_PER_CALL; i++) {
cntlist[i].num_container_transport = NKV_MAX_CONT_TRANSPORT;
cntlist[i].transport_list = malloc(sizeof(nkv_container_transport)*NKV_MAX_CONT_TRANSPORT);
memset(cntlist[i].transport_list, 0, sizeof(nkv_container_transport)*NKV_MAX_CONT_TRANSPORT);
}
int result = nkv_physical_container_list (handle->nkv_handle, index, cntlist, &cnt_count);
if (result != 0) {
printf("NKV getting physical container list failed !!, error = %d\n", result);
exit(1);
}
nkv_io_context io_ctx[16];
memset(io_ctx, 0, sizeof(nkv_io_context) * 16);
uint32_t io_ctx_cnt = 0;
for (uint32_t i = 0; i < cnt_count; i++) {
io_ctx[io_ctx_cnt].container_hash = cntlist[i].container_hash;
for (int p = 0; p < cntlist[i].num_container_transport; p++) {
printf("Transport information :: hash = %lu, id = %d, address = %s, port = %d, family = %d, speed = %d, status = %d, numa_node = %d\n",
cntlist[i].transport_list[p].network_path_hash, cntlist[i].transport_list[p].network_path_id, cntlist[i].transport_list[p].ip_addr,
cntlist[i].transport_list[p].port, cntlist[i].transport_list[p].addr_family, cntlist[i].transport_list[p].speed,
cntlist[i].transport_list[p].status, cntlist[i].transport_list[p].numa_node);
io_ctx[io_ctx_cnt].is_pass_through = 1;
io_ctx[io_ctx_cnt].container_hash = cntlist[i].container_hash;
io_ctx[io_ctx_cnt].network_path_hash = cntlist[i].transport_list[p].network_path_hash;
if(!strcmp(cntlist[i].transport_list[p].ip_addr, ipaddr)) {
handle->container_hash = cntlist[i].container_hash;
handle->network_path_hash = cntlist[i].transport_list[p].network_path_hash;
return 0;
}
io_ctx_cnt++;
}
}
return 1;
}
static void nkv_aio_complete (nkv_aio_construct* op_data, int32_t num_op) {
printf("nkv_aio_complete called\n");
if (!op_data) {
printf("NKV Async IO returned NULL op_Data, ignoring..");
}
minio_nkv_private* pvt = (minio_nkv_private*) op_data->private_data;
printf("ACTUALLENGTH %ld\n", pvt->nkvvalue.actual_length);
free(pvt->pfn);
free(pvt);
}
static int minio_nkv_put_async(struct minio_nkv_handle *handle, uint64_t channel, void *key, int keyLen, void *value, int valueLen) {
nkv_postprocess_function* pfn = (nkv_postprocess_function*) malloc (sizeof(nkv_postprocess_function));
minio_nkv_private* pvt = (minio_nkv_private*) malloc(sizeof(minio_nkv_private));
pfn->nkv_aio_cb = nkv_aio_complete;
pfn->private_data = (void*)pvt;
pvt->pfn = pfn;
pvt->channel = channel;
pvt->ctx.is_pass_through = 1;
pvt->ctx.container_hash = handle->container_hash;
pvt->ctx.network_path_hash = handle->network_path_hash;
pvt->ctx.ks_id = 0;
const nkv_key nkvkey = {key, keyLen};
pvt->nkvkey = nkvkey;
nkv_value nkvvalue = {value, valueLen, 0};
pvt->nkvvalue = nkvvalue;
nkv_store_option option = {0};
pvt->store_option = option;
nkv_result result = nkv_store_kvp_async(handle->nkv_handle, &pvt->ctx, &pvt->nkvkey, &pvt->store_option, &pvt->nkvvalue, pfn);
return result;
}
static int minio_nkv_get_async(struct minio_nkv_handle *handle, uint64_t channel, void *key, int keyLen, void *value, int valueLen) {
nkv_postprocess_function* pfn = (nkv_postprocess_function*) malloc (sizeof(nkv_postprocess_function));
minio_nkv_private* pvt = (minio_nkv_private*) malloc(sizeof(minio_nkv_private));
pfn->nkv_aio_cb = nkv_aio_complete;
pfn->private_data = (void*)pvt;
pvt->pfn = pfn;
pvt->channel = channel;
pvt->ctx.is_pass_through = 1;
pvt->ctx.container_hash = handle->container_hash;
pvt->ctx.network_path_hash = handle->network_path_hash;
pvt->ctx.ks_id = 0;
const nkv_key nkvkey = {key, keyLen};
pvt->nkvkey = nkvkey;
nkv_value nkvvalue = {value, valueLen, 0};
pvt->nkvvalue = nkvvalue;
nkv_retrieve_option option = {0};
pvt->retrieve_option = option;
nkv_result result = nkv_retrieve_kvp_async(handle->nkv_handle, &pvt->ctx, &pvt->nkvkey, &pvt->retrieve_option, &pvt->nkvvalue, pfn);
return result;
}
int main(int argc, char **argv) {
struct minio_nkv_handle handle;
nkv_result result = minio_nkv_open("nkv_config.json", &handle.nkv_handle);
if (result != 0) {
printf("result %d", result);
exit(1);
}
result = minio_nkv_open_path(&handle, argv[1]);
if (result != 0) {
printf("result %d", result);
exit(1);
}
char *key="aaaaaaaaaa";
char *value="bbbbbbbbbb";
result = minio_nkv_put_async(&handle, 0, (void *)key, strlen(key), (void *)value, strlen(value));
if (result != 0) {
printf("result %d", result);
exit(1);
}
sleep(1);
result = minio_nkv_get_async(&handle, 0, (void *)key, strlen(key), (void *)value, strlen(value));
if (result != 0) {
printf("result %d", result);
exit(1);
}
sleep(1);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment