Skip to content

Instantly share code, notes, and snippets.

@acmercyj
Forked from chenshuo/query_freq.cc
Created January 14, 2013 06:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save acmercyj/4528073 to your computer and use it in GitHub Desktop.
Save acmercyj/4528073 to your computer and use it in GitHub Desktop.
// answer to http://weibo.com/1915548291/z2UtyzuvQ
// see also http://www.cnblogs.com/baiyanhuang/archive/2012/11/11/2764914.html
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <fstream>
#include <iostream>
#include <unordered_map>
#ifdef STD_STRING
#warning "STD STRING"
#include <string>
using std::string;
#else
#include <ext/vstring.h>
typedef __gnu_cxx::__sso_string string;
#endif
typedef boost::function<void(const string& query, int64_t count)> Callback;
const size_t kMaxSize = 10 * 1000 * 1000;
void processOneInput(const char* file, const Callback& cb)
{
std::unordered_map<string, int64_t> queries;
std::ifstream in(file);
string query;
while (in && !in.eof())
{
queries.clear();
while (getline(in, query))
{
queries[query] += 1;
if (queries.size() > kMaxSize)
{
std::cout << " split\n";
break;
}
}
for (auto kv : queries)
{
cb(kv.first, kv.second);
}
}
}
class LocalSink : boost::noncopyable
{
public:
explicit LocalSink(int idx, int nbuckets)
{
char buf[256];
snprintf(buf, sizeof buf, "shard-%05d-of-%05d", idx, nbuckets);
out.open(buf);
}
void output(const string& query, int64_t count)
{
out << query << '\t' << count << '\n';
}
private:
std::ofstream out;
};
// class RemoteSink;
// template<typename SINK>
class Shuffler : boost::noncopyable
{
public:
explicit Shuffler(int nbuckets)
: buckets_(nbuckets)
{
for (int i = 0; i < nbuckets; ++i)
{
buckets_.push_back(new LocalSink(i, nbuckets));
}
assert(buckets_.size() == static_cast<size_t>(nbuckets));
}
void output(const string& query, int64_t count)
{
int idx = std::hash<string>()(query) % buckets_.size();
buckets_[idx].output(query, count);
}
protected:
boost::ptr_vector<LocalSink> buckets_;
};
void shuffle(int nbuckets, int argc, char* argv[])
{
Shuffler shuffler(nbuckets);
for (int i = 1; i < argc; ++i)
{
std::cout << " processing " << argv[i] << std::endl;
processOneInput(argv[i],
boost::bind(&Shuffler::output, &shuffler, _1, _2));
}
std::cout << "shuffling done" << std::endl;
}
// ======= reduce =======
std::unordered_map<string, int64_t> read_shard(int idx, int nbuckets)
{
std::unordered_map<string, int64_t> queries;
char buf[256];
snprintf(buf, sizeof buf, "shard-%05d-of-%05d", idx, nbuckets);
std::cout << " reading " << buf << std::endl;
{
std::ifstream in(buf);
string line;
while (getline(in, line))
{
size_t tab = line.find('\t');
if (tab != string::npos)
{
int64_t count = strtol(line.c_str() + tab, NULL, 10);
if (count > 0)
{
queries[line.substr(0, tab)] += count;
}
}
}
}
::unlink(buf);
return queries;
}
void reduce(const int nbuckets)
{
for (int i = 0; i < nbuckets; ++i)
{
std::unordered_map<string, int64_t> queries(read_shard(i, nbuckets));
// std::cout << " sorting " << std::endl;
std::vector<std::pair<int64_t, string>> counts;
for (auto entry : queries)
{
counts.push_back(make_pair(entry.second, entry.first));
}
std::sort(counts.begin(), counts.end());
char buf[256];
snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, nbuckets);
std::ofstream out(buf);
std::cout << " writing " << buf << std::endl;
for (auto it = counts.rbegin(); it != counts.rend(); ++it)
{
out << it->first << '\t' << it->second << '\n';
}
}
std::cout << "reducing done" << std::endl;
}
// ======= merge =======
class Source
{
public:
explicit Source(std::ifstream* in)
: in_(in),
count_(0),
query_()
{
}
bool next()
{
string line;
if (getline(*in_, line))
{
size_t tab = line.find('\t');
if (tab != string::npos)
{
count_ = strtol(line.c_str(), NULL, 10);
if (count_ > 0)
{
query_ = line.substr(tab+1);
return true;
}
}
}
return false;
}
bool operator<(const Source& rhs) const
{
return count_ < rhs.count_;
}
void output(std::ostream& out)
{
out << count_ << '\t' << query_ << '\n';
}
private:
std::ifstream* in_;
int64_t count_;
string query_;
};
void merge(const int nbuckets)
{
boost::ptr_vector<std::ifstream> inputs;
std::vector<Source> keys;
for (int i = 0; i < nbuckets; ++i)
{
char buf[256];
snprintf(buf, sizeof buf, "count-%05d-of-%05d", i, nbuckets);
inputs.push_back(new std::ifstream(buf));
Source rec(&inputs.back());
if (rec.next())
{
keys.push_back(rec);
}
::unlink(buf);
}
std::ofstream out("output");
std::make_heap(keys.begin(), keys.end());
while (!keys.empty())
{
std::pop_heap(keys.begin(), keys.end());
keys.back().output(out);
// out.writeRecord(keys.back().data);
if (keys.back().next())
{
std::push_heap(keys.begin(), keys.end());
}
else
{
keys.pop_back();
}
}
std::cout << "merging done\n";
}
int main(int argc, char* argv[])
{
int nbuckets = 10;
shuffle(nbuckets, argc, argv);
reduce(nbuckets);
merge(nbuckets);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment