Skip to content

Instantly share code, notes, and snippets.

@anguyen8
Forked from erikfrey/merge_ldb.cpp
Last active August 29, 2015 14:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anguyen8/b9884b2fa121e7afc01b to your computer and use it in GitHub Desktop.
Save anguyen8/b9884b2fa121e7afc01b to your computer and use it in GitHub Desktop.
/*
* Given a list of input LevelDB databases, merge them into a single output
* LevelDB database.
*
* Usage: merge_ldb -i ldb1 -i ldb2 -o ldb_out
*/
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>
#include <boost/program_options.hpp>
#include <boost/cstdint.hpp>
#include <leveldb/db.h>
#include <leveldb/write_batch.h>
using namespace std;
using namespace boost;
namespace po = boost::program_options;
struct leveldb_iterator // leveldb and iterator
{
leveldb_iterator(const string& path)
: db_(NULL), it_(NULL)
{
leveldb::Options options;
options.create_if_missing = false;
leveldb::Status s = leveldb::DB::Open(options, path, &db_);
if (!s.ok())
throw runtime_error(s.ToString());
it_ = db_->NewIterator(leveldb::ReadOptions());
it_->SeekToFirst();
}
bool next()
{
it_->Next();
if (!it_->Valid())
{
delete it_;
delete db_;
return false;
}
return true;
}
bool operator < (const leveldb_iterator & rhs) const
{
return it_->key().compare(rhs.it_->key()) < 0;
}
leveldb::DB * db_;
leveldb::Iterator* it_;
};
void run(vector<leveldb_iterator> & leveldb_iterators, leveldb::DB * out)
{
leveldb::WriteOptions w;
leveldb::WriteBatch batch;
for (int64_t batch_size = 1; !leveldb_iterators.empty(); ++batch_size) {
if (batch_size % 1000 == 0){
// write previous batch out
out->Write(w, &batch);
batch.Clear();
}
// write to batch
batch.Put(leveldb_iterators.front().it_->key(), leveldb_iterators.front().it_->value());
pop_heap(leveldb_iterators.begin(), leveldb_iterators.end());
if (leveldb_iterators.back().next())
push_heap(leveldb_iterators.begin(), leveldb_iterators.end());
else
leveldb_iterators.pop_back();
}
// write last batch
out->Write(w, &batch);
}
int main(int argc, char * argv[])
{
vector<string> in_paths;
string out_path;
po::options_description desc("options");
desc.add_options()
("help,h", "produce help message")
("input,i", po::value< vector<string> >(&in_paths)->composing(), "leveldb input")
("output,o", po::value<string>(&out_path)->required(), "leveldb output")
;
po::variables_map vm;
try
{
po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm);
}
catch(std::exception & e)
{
cerr << e.what() << "\n";
return 1;
}
if (vm.count("help"))
{
cerr << "Usage: merge_ldb -i ldb1 -i ldb2 -o ldb_out" << endl;
cerr << desc;
return 0;
}
vector<leveldb_iterator> leveldb_iterators;
for (vector<string>::const_iterator it = in_paths.begin(); it != in_paths.end(); ++it)
leveldb_iterators.push_back(leveldb_iterator(*it));
make_heap(leveldb_iterators.begin(), leveldb_iterators.end());
leveldb::DB * out;
leveldb::Options options;
options.create_if_missing = true;
leveldb::Status s = leveldb::DB::Open(options, out_path, &out);
if (!s.ok())
throw runtime_error(s.ToString());
run(leveldb_iterators, out);
delete out;
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment