// Kyotocabinet File Based Hash Map Tests for Reading and Writing
// Source: GitHub gist kevinkreiser/26f17838d4dea3e1f494 (last active August 29, 2015 14:15).
// Note: GitHub flagged this file as possibly containing bidirectional Unicode text, which may be
// interpreted or compiled differently than it appears; review it in an editor that reveals hidden
// Unicode characters.
/*
Build and run (pass "threaded" as the second argument to use the multi-threaded write and read paths):
sudo apt-get install libkyotocabinet-dev
g++ kyotocabinet_hash_map_test.cpp -o kyotocabinet_hash_map_test -std=c++11 -O2 -pthread -lkyotocabinet
./kyotocabinet_hash_map_test 500000
./kyotocabinet_hash_map_test 500000 threaded
*/
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cinttypes>
#include <condition_variable>
#include <fstream>
#include <functional>
#include <future>
#include <iostream>
#include <list>
#include <mutex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <utility>

#include <kchashdb.h>
// Payload stored per key. Its raw bytes (sizeof(osm_node)) are written as the hash map value,
// so every member is default-initialized to keep the stored bytes deterministic.
struct osm_node {
  std::pair<float, float> lat_lon{};
  uint32_t attributes{0};
};
template <class T> | |
std::string write(T start, T end) { | |
//crack open a file for this thread | |
std::ostringstream s; s << std::hash<std::thread::id>()(std::this_thread::get_id()); | |
std::string file_name = s.str() + ".kch"; | |
kyotocabinet::HashDB db; | |
db.tune_buckets(static_cast<int64_t>((end - start) * 1.5f)); | |
if(!db.open(file_name, kyotocabinet::HashDB::OWRITER | kyotocabinet::HashDB::OCREATE)) | |
throw std::runtime_error("Could not write file based cache with file: " + file_name); | |
//write the data | |
const auto key_size = sizeof(T); | |
const auto value_size = sizeof(osm_node); | |
for(; start < end; ++start) { | |
//make the key | |
auto key_data = static_cast<const char*>(static_cast<void*>(&start)); | |
//make the value | |
osm_node node; | |
auto value_data = static_cast<const char*>(static_cast<void*>(&node)); | |
//store it | |
if(!db.add(key_data, key_size, value_data, value_size)) | |
throw std::runtime_error("Tried to write the same value twice"); | |
} | |
//close the file | |
if(!db.close()) | |
throw std::runtime_error("Could not close file based cache with file: " + file_name); | |
return file_name; | |
} | |
template <class T> | |
std::list<kyotocabinet::HashDB> write_nodes_threaded(T count) { | |
//start up some threads and have them write some nodes into file backed hash maps | |
const T threads = static_cast<T>(std::max(static_cast<unsigned int>(1), std::thread::hardware_concurrency())); | |
T start = 0; | |
std::list<std::future<std::string> > results; | |
for(T i = 0; i < threads; ++i){ | |
//last thread | |
if(i == threads - 1) | |
results.push_back(std::async(std::launch::async, write<T>, start, count)); | |
else | |
results.push_back(std::async(std::launch::async, write<T>, start, start + std::floor<uint64_t>(count / threads))); | |
start += std::floor<uint64_t>(count / threads); | |
} | |
//get back the names of all files that were written and open them up for reading | |
std::list<kyotocabinet::HashDB> readers; | |
for(auto& result : results){ | |
try { | |
std::string file_name = result.get(); | |
std::cout << "Write Succeeded: " << file_name << std::endl << std::endl; | |
readers.emplace_back(); | |
if(!readers.back().open(file_name, kyotocabinet::HashDB::OREADER)) { | |
readers.pop_back(); | |
throw std::runtime_error("Couldn't open " + file_name + " for reading"); | |
} | |
} | |
catch(const std::exception& e) { | |
std::cout << "Write failed: " << e.what() << std::endl << std::endl; | |
} | |
} | |
return readers; | |
} | |
template <class T> | |
std::list<kyotocabinet::HashDB> write_nodes(T count) { | |
//crack open a file for this thread | |
std::ostringstream s; s << std::hash<std::thread::id>()(std::this_thread::get_id()); | |
std::string file_name = s.str() + ".kch"; | |
kyotocabinet::HashDB db; | |
db.tune_buckets(static_cast<int64_t>(count * 1.5f)); | |
if(!db.open(file_name, kyotocabinet::HashDB::OWRITER | kyotocabinet::HashDB::OCREATE)) | |
throw std::runtime_error("Could not write file based cache with file: " + file_name); | |
//write the data | |
const auto key_size = sizeof(T); | |
const auto value_size = sizeof(osm_node); | |
for(T start = 0; start < count; ++start) { | |
//make the key | |
auto key_data = static_cast<const char*>(static_cast<void*>(&start)); | |
//make the value | |
osm_node node; | |
auto value_data = static_cast<const char*>(static_cast<void*>(&node)); | |
//store it | |
if(!db.add(key_data, key_size, value_data, value_size)) | |
throw std::runtime_error("Tried to write the same value twice"); | |
} | |
//close the file | |
if(db.close()) | |
std::cout << "Write Succeeded: " << file_name << std::endl << std::endl; | |
else | |
throw std::runtime_error("Could not close file based cache with file: " + file_name); | |
//hand back a reader | |
std::list<kyotocabinet::HashDB> readers; | |
readers.emplace_back(); | |
if(!readers.back().open(file_name, kyotocabinet::HashDB::OREADER)) { | |
throw std::runtime_error("Couldn't open " + file_name + " for reading"); | |
} | |
return readers; | |
} | |
// Per-worker mailbox shared between the main thread and one reader thread.
// The main thread sets `working` (under `mutex`) to request a lookup. The worker fills `result`,
// clears `working`, and signals `condition` back.
// Members are initialized so a default-constructed mailbox starts idle with an empty result.
template <class T>
struct inter_thread_comm {
  std::atomic<bool> working{false};
  std::mutex mutex;
  std::condition_variable condition;
  T result{};
};
template<class T1, class T2> | |
void read(inter_thread_comm<T1>& comm, const std::atomic<bool>& done, kyotocabinet::HashDB& db, const T2& key) { | |
const auto key_size = sizeof(T2); | |
const auto value_size = sizeof(osm_node); | |
osm_node fetched; | |
//while we aren't done | |
while(!done) { | |
//wait for someone notify us | |
std::unique_lock<std::mutex> lock(comm.mutex); | |
comm.condition.wait(lock, [&comm, &done](){return comm.working || done;}); | |
//quit | |
if(done) | |
return; | |
//try to get the actual data | |
comm.result.first = | |
db.get(static_cast<const char*>(static_cast<const void*>(&key)), key_size, | |
static_cast<char*>(static_cast<void*>(&fetched)), value_size) != -1; | |
comm.result.second = fetched; | |
//tell the main thread we are done looking for this one | |
comm.working = false; | |
lock.unlock(); | |
comm.condition.notify_one(); | |
} | |
} | |
template <class T> | |
void read_nodes_threaded(std::list<kyotocabinet::HashDB>& dbs, T count) { | |
//some threads for reading and some state for querying the pool | |
std::list<std::thread> pool; | |
std::list<inter_thread_comm<std::pair<bool, osm_node> > > communications; | |
std::atomic<bool> done(false); | |
T key; | |
for(auto& db : dbs) { | |
//make a thread communication object | |
communications.emplace_back(); | |
//start the thread | |
pool.emplace_back(::read<std::pair<bool, osm_node>, T>, std::ref(communications.back()), std::cref(done), std::ref(db), std::cref(key)); | |
} | |
//read the data | |
for(key = 0; key < count; ++key){ | |
//notify each thread to look for the value | |
for(auto& comm : communications){ | |
std::lock_guard<std::mutex> lock(comm.mutex); | |
comm.working = true; | |
comm.condition.notify_one(); | |
} | |
//wait for each to notify us that its finished | |
bool found = false; | |
osm_node node; | |
for(auto& comm : communications) { | |
std::unique_lock<std::mutex> lock(comm.mutex); | |
comm.condition.wait(lock, [&comm]{return comm.working != true;}); | |
if(comm.result.first){ | |
found = true; | |
node = comm.result.second; | |
break; | |
} | |
} | |
//complain if we didnt find it | |
if(!found) | |
throw std::runtime_error("Couldn't find: " + std::to_string(key)); | |
} | |
//we are done reading from the threads | |
done = true; | |
for(auto& comm : communications){ | |
std::lock_guard<std::mutex> lock(comm.mutex); | |
comm.condition.notify_one(); | |
} | |
for(auto& thread : pool) { | |
thread.join(); | |
} | |
} | |
template <class T> | |
void read_nodes(std::list<kyotocabinet::HashDB>& dbs, T count) { | |
//read the data | |
const auto key_size = sizeof(T); | |
const auto value_size = sizeof(osm_node); | |
for(T i = 0; i < count; ++i){ | |
//try to get it | |
bool found = false; | |
osm_node node; | |
for(auto& db : dbs){ | |
if(db.get(static_cast<const char*>(static_cast<void*>(&i)), key_size, static_cast<char*>(static_cast<void*>(&node)), value_size) != -1) { | |
found = true; | |
break; | |
} | |
} | |
if(!found) | |
throw std::runtime_error("Failed to read node for " + std::to_string(i)); | |
} | |
} | |
int main(int argc, char** argv){ | |
uint64_t count = 500000; | |
if(argc > 1) | |
count = static_cast<uint64_t>(std::stoul(argv[1])); | |
bool threaded = false; | |
if(argc > 2) | |
threaded = std::string(argv[2]) == "threaded"; | |
std::cout << "Working with " << count << " osm_nodes" << std::endl; | |
std::cout << "Writing nodes to file backed hash db" << std::endl; | |
auto start = std::chrono::system_clock::now(); | |
auto dbs = threaded ? write_nodes_threaded<uint64_t>(count) : write_nodes<uint64_t>(count); | |
auto elapsed = std::chrono::system_clock::now() - start; | |
std::cout << "Took " << std::to_string(std::chrono::duration_cast<std::chrono::seconds>(elapsed).count()) << " sec" << std::endl; | |
std::cout << "Reading nodes from file backed hash db" << std::endl; | |
start = std::chrono::system_clock::now(); | |
if(threaded) | |
read_nodes_threaded<uint64_t>(dbs, count); | |
else | |
read_nodes<uint64_t>(dbs, count); | |
elapsed = std::chrono::system_clock::now() - start; | |
std::cout << "Took " << std::to_string(std::chrono::duration_cast<std::chrono::seconds>(elapsed).count()) << " sec" << std::endl; | |
return 0; | |
} |
// (GitHub gist page footer: sign up for free to join this conversation on GitHub; if you already have an account, sign in to comment.)