Last active
July 10, 2019 02:25
-
-
Save ajkr/a0b7d2a8b7cf1ad24d65338f04731a30 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
commands to build/run: | |
$ capnp compile -oc++ ./kv.capnp | |
$ g++ -std=c++14 ./kv.capnp.c++ ./kv-server.c++ -o ./kv-server -lkj -lkj-async -lcapnp-rpc -lcapnp -lrocksdb -lpthread -lsnappy -lz -ldl | |
$ g++ -std=c++14 ./kv.capnp.c++ ./kv-client.c++ -o ./kv-client -lkj -lkj-async -lcapnp-rpc -lcapnp | |
$ strace -fe writev ./kv-server localhost:12345 ./tmp-db/ | |
$ ./kv-client localhost:12345 a b |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors | |
// Licensed under the MIT License: | |
// | |
// Permission is hereby granted, free of charge, to any person obtaining a copy | |
// of this software and associated documentation files (the "Software"), to deal | |
// in the Software without restriction, including without limitation the rights | |
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
// copies of the Software, and to permit persons to whom the Software is | |
// furnished to do so, subject to the following conditions: | |
// | |
// The above copyright notice and this permission notice shall be included in | |
// all copies or substantial portions of the Software. | |
// | |
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
// THE SOFTWARE. | |
#include "kv.capnp.h" | |
#include <capnp/ez-rpc.h> | |
#include <iostream> | |
int main(int argc, const char* argv[]) { | |
if (argc != 4) { | |
std::cerr << "usage: " << argv[0] << " HOST:PORT <start key> <end key>\n" | |
"Connects to the KV server at the given address and scans the provided " | |
"range." << std::endl; | |
return 1; | |
} | |
capnp::EzRpcClient client(argv[1]); | |
KvStore::Client kvStore = client.getMain<KvStore>(); | |
// Keep an eye on `waitScope`. Whenever you see it used is a place where we | |
// stop and wait for the server to respond. If a line of code does not use | |
// `waitScope`, then it does not block! | |
auto& waitScope = client.getWaitScope(); | |
std::cout << "Scanning [" << argv[2] << ", " << argv[3] << ")... " << std::endl; | |
std::cout.flush(); | |
// Set up the request. | |
auto request = kvStore.scanRequest(); | |
request.setStartKey(kj::arrayPtr((unsigned char*)argv[2], strlen(argv[2]))); | |
request.setEndKey(kj::arrayPtr((unsigned char*)argv[3], strlen(argv[3]))); | |
// Send it, which returns a promise for the result (without blocking). | |
auto scanPromise = request.send(); | |
// Now that we've sent all the requests, wait for the response. Until this | |
// point, we haven't waited at all! | |
auto response = scanPromise.wait(waitScope); | |
std::cout << "Scan returned " << std::endl; | |
return 0; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors | |
// Licensed under the MIT License: | |
// | |
// Permission is hereby granted, free of charge, to any person obtaining a copy | |
// of this software and associated documentation files (the "Software"), to deal | |
// in the Software without restriction, including without limitation the rights | |
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
// copies of the Software, and to permit persons to whom the Software is | |
// furnished to do so, subject to the following conditions: | |
// | |
// The above copyright notice and this permission notice shall be included in | |
// all copies or substantial portions of the Software. | |
// | |
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
// THE SOFTWARE. | |
#include "kv.capnp.h" | |
#include <capnp/ez-rpc.h> | |
#include <iostream> | |
#include <rocksdb/db.h> | |
#include <rocksdb/options.h> | |
#include <rocksdb/table.h> | |
class KvStoreImpl final: public KvStore::Server { | |
public: | |
KvStoreImpl(const std::string& dbDir) { | |
std::shared_ptr<rocksdb::Cache> cache = rocksdb::NewLRUCache(1 << 30 /* 1 GB */); | |
rocksdb::BlockBasedTableOptions table_options; | |
table_options.block_cache = cache; | |
// `ReadOptions::pin_data` can only pin keys in memory when prefix encoding is disabled. | |
table_options.block_restart_interval = 1; | |
rocksdb::Options options; | |
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options)); | |
auto status = rocksdb::DB::Open(options, dbDir, &db_); | |
if (!status.ok()) { | |
fprintf(stderr, "DB::Open: %s\n", status.ToString().c_str()); | |
exit(1); | |
} | |
} | |
kj::Promise<void> scan(ScanContext context) override { | |
auto params = context.getParams(); | |
rocksdb::Slice startKey((char*)params.getStartKey().begin(), params.getStartKey().size()); | |
rocksdb::Slice endKey((char*)params.getEndKey().begin(), params.getEndKey().size()); | |
// Scan [startKey, endKey) in a way that pins all data blocks that are traversed. | |
// That way we can hold pointers to the keys/values in block cache without any | |
// allocations or copying (besides for the pointers). Incorporating the pointers | |
// into the response using the orphan API, as we do below, should cause the pointers | |
// to be passed all the way down to the kernel via `writev()`, where buffers finally | |
// coalesce. | |
rocksdb::ReadOptions read_opts; | |
read_opts.pin_data = true; | |
read_opts.total_order_seek = true; | |
read_opts.iterate_upper_bound = &endKey; | |
rocksdb::Iterator* iter = db_->NewIterator(read_opts); | |
auto orphanarium = context.getResultsOrphanage(); | |
std::vector<std::pair<rocksdb::Slice, rocksdb::Slice>> slicePtrs; | |
for (iter->Seek(startKey); iter->Valid(); iter->Next()) { | |
slicePtrs.emplace_back(rocksdb::Slice(iter->key().data(), iter->key().size()), | |
rocksdb::Slice(iter->value().data(), iter->value().size())); | |
} | |
auto kvs = context.getResults().initKvs(slicePtrs.size()); | |
for (size_t i = 0; i < slicePtrs.size(); ++i) { | |
kvs[i].adoptKey(orphanarium.referenceExternalData(kj::arrayPtr( | |
(unsigned char*)slicePtrs[i].first.data(), slicePtrs[i].first.size()))); | |
kvs[i].adoptValue(orphanarium.referenceExternalData(kj::arrayPtr( | |
(unsigned char*)slicePtrs[i].second.data(), slicePtrs[i].second.size()))); | |
} | |
// TODO: delete the iterator. | |
return kj::READY_NOW; | |
} | |
private: | |
rocksdb::DB* db_; | |
}; | |
int main(int argc, const char* argv[]) { | |
if (argc != 3) { | |
std::cerr << "usage: " << argv[0] << " ADDRESS[:PORT]\n" | |
"Runs the server bound to the given address/port.\n" | |
"ADDRESS may be '*' to bind to all local addresses.\n" | |
":PORT may be omitted to choose a port automatically." << std::endl; | |
return 1; | |
} | |
// Set up a server. | |
capnp::EzRpcServer server(kj::heap<KvStoreImpl>(argv[2]), argv[1]); | |
// Write the port number to stdout, in case it was chosen automatically. | |
auto& waitScope = server.getWaitScope(); | |
uint port = server.getPort().wait(waitScope); | |
if (port == 0) { | |
// The address format "unix:/path/to/socket" opens a unix domain socket, | |
// in which case the port will be zero. | |
std::cout << "Listening on Unix socket..." << std::endl; | |
} else { | |
std::cout << "Listening on port " << port << "..." << std::endl; | |
} | |
// Run forever, accepting connections and handling requests. | |
kj::NEVER_DONE.wait(waitScope); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@0xb8bae68ada1b7a81; | |
interface KvStore { | |
struct Kv { | |
key @0 :Data; | |
value @1 :Data; | |
} | |
scan @0 (startKey :Data, endKey :Data) -> (kvs :List(Kv)); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment