Skip to content

Instantly share code, notes, and snippets.

@ajkr
Last active July 10, 2019 02:25
Show Gist options
  • Save ajkr/a0b7d2a8b7cf1ad24d65338f04731a30 to your computer and use it in GitHub Desktop.
Save ajkr/a0b7d2a8b7cf1ad24d65338f04731a30 to your computer and use it in GitHub Desktop.
commands to build/run:
$ capnp compile -oc++ ./kv.capnp
$ g++ -std=c++14 ./kv.capnp.c++ ./kv-server.c++ -o ./kv-server -lkj -lkj-async -lcapnp-rpc -lcapnp -lrocksdb -lpthread -lsnappy -lz -ldl
$ g++ -std=c++14 ./kv.capnp.c++ ./kv-client.c++ -o ./kv-client -lkj -lkj-async -lcapnp-rpc -lcapnp
$ strace -fe writev ./kv-server localhost:12345 ./tmp-db/
$ ./kv-client localhost:12345 a b
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "kv.capnp.h"
#include <capnp/ez-rpc.h>
#include <iostream>
int main(int argc, const char* argv[]) {
if (argc != 4) {
std::cerr << "usage: " << argv[0] << " HOST:PORT <start key> <end key>\n"
"Connects to the KV server at the given address and scans the provided "
"range." << std::endl;
return 1;
}
capnp::EzRpcClient client(argv[1]);
KvStore::Client kvStore = client.getMain<KvStore>();
// Keep an eye on `waitScope`. Whenever you see it used is a place where we
// stop and wait for the server to respond. If a line of code does not use
// `waitScope`, then it does not block!
auto& waitScope = client.getWaitScope();
std::cout << "Scanning [" << argv[2] << ", " << argv[3] << ")... " << std::endl;
std::cout.flush();
// Set up the request.
auto request = kvStore.scanRequest();
request.setStartKey(kj::arrayPtr((unsigned char*)argv[2], strlen(argv[2])));
request.setEndKey(kj::arrayPtr((unsigned char*)argv[3], strlen(argv[3])));
// Send it, which returns a promise for the result (without blocking).
auto scanPromise = request.send();
// Now that we've sent all the requests, wait for the response. Until this
// point, we haven't waited at all!
auto response = scanPromise.wait(waitScope);
std::cout << "Scan returned " << std::endl;
return 0;
}
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "kv.capnp.h"
#include <capnp/ez-rpc.h>
#include <iostream>
#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/table.h>
class KvStoreImpl final: public KvStore::Server {
public:
KvStoreImpl(const std::string& dbDir) {
std::shared_ptr<rocksdb::Cache> cache = rocksdb::NewLRUCache(1 << 30 /* 1 GB */);
rocksdb::BlockBasedTableOptions table_options;
table_options.block_cache = cache;
// `ReadOptions::pin_data` can only pin keys in memory when prefix encoding is disabled.
table_options.block_restart_interval = 1;
rocksdb::Options options;
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
auto status = rocksdb::DB::Open(options, dbDir, &db_);
if (!status.ok()) {
fprintf(stderr, "DB::Open: %s\n", status.ToString().c_str());
exit(1);
}
}
kj::Promise<void> scan(ScanContext context) override {
auto params = context.getParams();
rocksdb::Slice startKey((char*)params.getStartKey().begin(), params.getStartKey().size());
rocksdb::Slice endKey((char*)params.getEndKey().begin(), params.getEndKey().size());
// Scan [startKey, endKey) in a way that pins all data blocks that are traversed.
// That way we can hold pointers to the keys/values in block cache without any
// allocations or copying (besides for the pointers). Incorporating the pointers
// into the response using the orphan API, as we do below, should cause the pointers
// to be passed all the way down to the kernel via `writev()`, where buffers finally
// coalesce.
rocksdb::ReadOptions read_opts;
read_opts.pin_data = true;
read_opts.total_order_seek = true;
read_opts.iterate_upper_bound = &endKey;
rocksdb::Iterator* iter = db_->NewIterator(read_opts);
auto orphanarium = context.getResultsOrphanage();
std::vector<std::pair<rocksdb::Slice, rocksdb::Slice>> slicePtrs;
for (iter->Seek(startKey); iter->Valid(); iter->Next()) {
slicePtrs.emplace_back(rocksdb::Slice(iter->key().data(), iter->key().size()),
rocksdb::Slice(iter->value().data(), iter->value().size()));
}
auto kvs = context.getResults().initKvs(slicePtrs.size());
for (size_t i = 0; i < slicePtrs.size(); ++i) {
kvs[i].adoptKey(orphanarium.referenceExternalData(kj::arrayPtr(
(unsigned char*)slicePtrs[i].first.data(), slicePtrs[i].first.size())));
kvs[i].adoptValue(orphanarium.referenceExternalData(kj::arrayPtr(
(unsigned char*)slicePtrs[i].second.data(), slicePtrs[i].second.size())));
}
// TODO: delete the iterator.
return kj::READY_NOW;
}
private:
rocksdb::DB* db_;
};
int main(int argc, const char* argv[]) {
if (argc != 3) {
std::cerr << "usage: " << argv[0] << " ADDRESS[:PORT]\n"
"Runs the server bound to the given address/port.\n"
"ADDRESS may be '*' to bind to all local addresses.\n"
":PORT may be omitted to choose a port automatically." << std::endl;
return 1;
}
// Set up a server.
capnp::EzRpcServer server(kj::heap<KvStoreImpl>(argv[2]), argv[1]);
// Write the port number to stdout, in case it was chosen automatically.
auto& waitScope = server.getWaitScope();
uint port = server.getPort().wait(waitScope);
if (port == 0) {
// The address format "unix:/path/to/socket" opens a unix domain socket,
// in which case the port will be zero.
std::cout << "Listening on Unix socket..." << std::endl;
} else {
std::cout << "Listening on port " << port << "..." << std::endl;
}
// Run forever, accepting connections and handling requests.
kj::NEVER_DONE.wait(waitScope);
}
@0xb8bae68ada1b7a81;
interface KvStore {
struct Kv {
key @0 :Data;
value @1 :Data;
}
scan @0 (startKey :Data, endKey :Data) -> (kvs :List(Kv));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment