Skip to content

Instantly share code, notes, and snippets.

@kevinkreiser
Last active August 29, 2015 14:15
Show Gist options
  • Save kevinkreiser/26f17838d4dea3e1f494 to your computer and use it in GitHub Desktop.
Save kevinkreiser/26f17838d4dea3e1f494 to your computer and use it in GitHub Desktop.
Kyoto Cabinet file-based hash map tests for reading and writing
/*
sudo apt-get install libkyotocabinet-dev
g++ kyotocabinet_hash_map_test.cpp -o kyotocabinet_hash_map_test -std=c++11 -O2 -lkyotocabinet
./kyotocabinet_hash_map_test 500000
./kyotocabinet_hash_map_test 500000 threaded
*/
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cinttypes>
#include <condition_variable>
#include <fstream>
#include <functional>
#include <future>
#include <iostream>
#include <list>
#include <mutex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <utility>

#include <kchashdb.h>
// Fixed-size record whose raw bytes are stored as each hash-db value
// (presumably modelled on an OpenStreetMap node -- the code here never fills it in).
struct osm_node {
  // coordinate pair, (lat, lon) going by the name
  std::pair<float, float> lat_lon;
  // opaque 32-bit attribute word
  uint32_t attributes;
};
template <class T>
std::string write(T start, T end) {
//crack open a file for this thread
std::ostringstream s; s << std::hash<std::thread::id>()(std::this_thread::get_id());
std::string file_name = s.str() + ".kch";
kyotocabinet::HashDB db;
db.tune_buckets(static_cast<int64_t>((end - start) * 1.5f));
if(!db.open(file_name, kyotocabinet::HashDB::OWRITER | kyotocabinet::HashDB::OCREATE))
throw std::runtime_error("Could not write file based cache with file: " + file_name);
//write the data
const auto key_size = sizeof(T);
const auto value_size = sizeof(osm_node);
for(; start < end; ++start) {
//make the key
auto key_data = static_cast<const char*>(static_cast<void*>(&start));
//make the value
osm_node node;
auto value_data = static_cast<const char*>(static_cast<void*>(&node));
//store it
if(!db.add(key_data, key_size, value_data, value_size))
throw std::runtime_error("Tried to write the same value twice");
}
//close the file
if(!db.close())
throw std::runtime_error("Could not close file based cache with file: " + file_name);
return file_name;
}
// Splits the keys [0, count) across one std::async writer per hardware thread (at
// least one), each writing its slice with write<T>(), then reopens every file that
// was written successfully for reading.
// A writer that fails (or a file that cannot be reopened) is reported on stdout and
// skipped, so the returned list may hold fewer dbs than writers were started.
// Returns the reader handles, one per successfully written file.
template <class T>
std::list<kyotocabinet::HashDB> write_nodes_threaded(T count) {
  //start up some threads and have them write some nodes into file backed hash maps
  const T threads = static_cast<T>(std::max(static_cast<unsigned int>(1), std::thread::hardware_concurrency()));
  //plain integer division; routing this through floating point (as std::floor would)
  //loses precision for large counts and buys nothing
  const T chunk = count / threads;
  T start = 0;
  std::list<std::future<std::string> > results;
  for(T i = 0; i < threads; ++i){
    //the last writer also takes the remainder of the division
    const T end = (i == threads - 1) ? count : start + chunk;
    results.push_back(std::async(std::launch::async, write<T>, start, end));
    start += chunk;
  }
  //get back the names of all files that were written and open them up for reading
  std::list<kyotocabinet::HashDB> readers;
  for(auto& result : results){
    try {
      std::string file_name = result.get();
      std::cout << "Write Succeeded: " << file_name << std::endl << std::endl;
      readers.emplace_back();
      if(!readers.back().open(file_name, kyotocabinet::HashDB::OREADER)) {
        readers.pop_back();
        throw std::runtime_error("Couldn't open " + file_name + " for reading");
      }
    }
    catch(const std::exception& e) {
      //best effort: report and keep whatever shards did succeed
      std::cout << "Write failed: " << e.what() << std::endl << std::endl;
    }
  }
  return readers;
}
template <class T>
std::list<kyotocabinet::HashDB> write_nodes(T count) {
//crack open a file for this thread
std::ostringstream s; s << std::hash<std::thread::id>()(std::this_thread::get_id());
std::string file_name = s.str() + ".kch";
kyotocabinet::HashDB db;
db.tune_buckets(static_cast<int64_t>(count * 1.5f));
if(!db.open(file_name, kyotocabinet::HashDB::OWRITER | kyotocabinet::HashDB::OCREATE))
throw std::runtime_error("Could not write file based cache with file: " + file_name);
//write the data
const auto key_size = sizeof(T);
const auto value_size = sizeof(osm_node);
for(T start = 0; start < count; ++start) {
//make the key
auto key_data = static_cast<const char*>(static_cast<void*>(&start));
//make the value
osm_node node;
auto value_data = static_cast<const char*>(static_cast<void*>(&node));
//store it
if(!db.add(key_data, key_size, value_data, value_size))
throw std::runtime_error("Tried to write the same value twice");
}
//close the file
if(db.close())
std::cout << "Write Succeeded: " << file_name << std::endl << std::endl;
else
throw std::runtime_error("Could not close file based cache with file: " + file_name);
//hand back a reader
std::list<kyotocabinet::HashDB> readers;
readers.emplace_back();
if(!readers.back().open(file_name, kyotocabinet::HashDB::OREADER)) {
throw std::runtime_error("Couldn't open " + file_name + " for reading");
}
return readers;
}
// Mailbox shared between the main thread and one reader thread.
// The main thread sets `working` to request a lookup; the reader fills `result`,
// clears `working` and signals `condition`. Both sides touch these fields while
// holding `mutex`.
template <class T>
struct inter_thread_comm {
  // explicit initializer: a default-constructed std::atomic holds an indeterminate
  // value before C++20, so a default-initialized mailbox could start out "working"
  std::atomic<bool> working{false};
  std::mutex mutex;
  std::condition_variable condition;
  // value-initialized so it never holds indeterminate data before the first lookup
  T result{};
};
// Reader-thread body. Each time the main thread raises comm.working, this looks up
// the current value of `key` (its sizeof(T2) raw bytes) in `db`, stores
// (found, fetched value) into comm.result.first / comm.result.second, lowers
// comm.working and notifies comm.condition. Returns once `done` is observed.
// `key` is owned and advanced by the main thread; it is only read here while
// comm.mutex is held.
template<class T1, class T2>
void read(inter_thread_comm<T1>& comm, const std::atomic<bool>& done, kyotocabinet::HashDB& db, const T2& key) {
  osm_node buffer;
  //these addresses stay fixed; only the bytes behind them change between requests
  const char* const key_bytes = static_cast<const char*>(static_cast<const void*>(&key));
  char* const value_bytes = static_cast<char*>(static_cast<void*>(&buffer));
  while(!done) {
    std::unique_lock<std::mutex> guard(comm.mutex);
    //sleep until there is a request or we are asked to stop
    comm.condition.wait(guard, [&]{ return comm.working || done; });
    if(done)
      return;
    //get() yields -1 when the key is absent from this db
    const bool hit = db.get(key_bytes, sizeof(T2), value_bytes, sizeof(osm_node)) != -1;
    comm.result.first = hit;
    comm.result.second = buffer;
    //hand control back to the main thread
    comm.working = false;
    guard.unlock();
    comm.condition.notify_one();
  }
}
// Looks up every key in [0, count) using one reader thread per db (see read()).
// For each key, every reader is asked to search its own db; the key counts as
// found if any reader reports a hit.
// Throws std::runtime_error naming the first key no db contains. All reader threads
// are stopped and joined before returning or throwing.
template <class T>
void read_nodes_threaded(std::list<kyotocabinet::HashDB>& dbs, T count) {
  //some threads for reading and some state for querying the pool
  //(std::list because the mailboxes hold a mutex and must never move)
  std::list<std::thread> pool;
  std::list<inter_thread_comm<std::pair<bool, osm_node> > > communications;
  std::atomic<bool> done(false);
  //shared by reference with every reader; only changed while all readers are idle
  T key{};
  for(auto& db : dbs) {
    //make a thread communication object
    communications.emplace_back();
    //start the thread
    pool.emplace_back(::read<std::pair<bool, osm_node>, T>, std::ref(communications.back()), std::cref(done), std::ref(db), std::cref(key));
  }
  //read the data
  bool missing = false;
  for(key = 0; key < count; ++key){
    //notify each thread to look for the value
    for(auto& comm : communications){
      std::lock_guard<std::mutex> lock(comm.mutex);
      comm.working = true;
      comm.condition.notify_one();
    }
    //wait for EVERY reader, even after a hit: stopping early would let a reader still
    //be using `key` when it is incremented (a data race), and that reader's late
    //result for this key would then be mistaken for its answer to the next key
    bool found = false;
    for(auto& comm : communications) {
      std::unique_lock<std::mutex> lock(comm.mutex);
      comm.condition.wait(lock, [&comm]{ return !comm.working; });
      if(comm.result.first)
        found = true;
    }
    //remember the failure but shut the pool down before throwing: destroying a
    //still-joinable std::thread would call std::terminate
    if(!found) {
      missing = true;
      break;
    }
  }
  //we are done reading from the threads
  done = true;
  for(auto& comm : communications){
    std::lock_guard<std::mutex> lock(comm.mutex);
    comm.condition.notify_one();
  }
  for(auto& thread : pool) {
    thread.join();
  }
  //complain if we didnt find it (key still holds the missing key after the break)
  if(missing)
    throw std::runtime_error("Couldn't find: " + std::to_string(key));
}
// Single-threaded lookup of every key in [0, count): each key (its sizeof(T) raw
// bytes) is tried against the dbs in list order until one returns it.
// Throws std::runtime_error naming the first key that no db contains.
template <class T>
void read_nodes(std::list<kyotocabinet::HashDB>& dbs, T count) {
  osm_node value;
  char* const value_bytes = static_cast<char*>(static_cast<void*>(&value));
  for(T id = 0; id < count; ++id){
    const char* const key_bytes = static_cast<const char*>(static_cast<const void*>(&id));
    //any_of stops at the first db that has the key, just like a loop with break
    const bool hit = std::any_of(dbs.begin(), dbs.end(), [&](kyotocabinet::HashDB& db) {
      return db.get(key_bytes, sizeof(T), value_bytes, sizeof(osm_node)) != -1;
    });
    if(!hit)
      throw std::runtime_error("Failed to read node for " + std::to_string(id));
  }
}
// Usage: kyotocabinet_hash_map_test [count] [threaded]
// Writes `count` records (default 500000) to file-backed hash dbs, reads them all
// back, and prints how long each phase took. Returns 0 on success, or 1 after
// printing the error if any step throws.
int main(int argc, char** argv){
  try {
    uint64_t count = 500000;
    //stoull: the target is 64-bit everywhere, unsigned long is not
    if(argc > 1)
      count = static_cast<uint64_t>(std::stoull(argv[1]));
    //"threaded_read" is accepted as an alias of "threaded"
    bool threaded = false;
    if(argc > 2) {
      const std::string mode(argv[2]);
      threaded = mode == "threaded" || mode == "threaded_read";
    }
    std::cout << "Working with " << count << " osm_nodes" << std::endl;
    std::cout << "Writing nodes to file backed hash db" << std::endl;
    //steady_clock is monotonic, so clock adjustments cannot skew the timings
    auto start = std::chrono::steady_clock::now();
    auto dbs = threaded ? write_nodes_threaded<uint64_t>(count) : write_nodes<uint64_t>(count);
    auto elapsed = std::chrono::steady_clock::now() - start;
    //fractional seconds: whole-second truncation reports "0 sec" for short runs
    std::cout << "Took " << std::chrono::duration<double>(elapsed).count() << " sec" << std::endl;
    std::cout << "Reading nodes from file backed hash db" << std::endl;
    start = std::chrono::steady_clock::now();
    if(threaded)
      read_nodes_threaded<uint64_t>(dbs, count);
    else
      read_nodes<uint64_t>(dbs, count);
    elapsed = std::chrono::steady_clock::now() - start;
    std::cout << "Took " << std::chrono::duration<double>(elapsed).count() << " sec" << std::endl;
    return 0;
  }
  catch(const std::exception& e) {
    //report instead of letting the exception escape main and call std::terminate
    std::cerr << "Error: " << e.what() << std::endl;
    return 1;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment