Skip to content

Instantly share code, notes, and snippets.

@shanemhansen
Created December 9, 2016 22:46
Show Gist options
  • Save shanemhansen/879b6c87c3b8edd5dc8258039fd58e14 to your computer and use it in GitHub Desktop.
Save shanemhansen/879b6c87c3b8edd5dc8258039fd58e14 to your computer and use it in GitHub Desktop.
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <exception>
#include <fstream>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include "glob.h"
#include <boost/iostreams/copy.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/filtering_stream.hpp>
#include "concurrentqueue/blockingconcurrentqueue.h"
using namespace std;
void runQuery1(const string&);
// Program entry point: runs benchmark query 1 against the "out" directory.
int main() {
    const std::string data_dir("out");
    runQuery1(data_dir);
    return 0;
}
// Expands the shell-style pattern `pat` with POSIX glob(3), using the
// GLOB_TILDE flag, and returns the matched paths in the order glob(3)
// reported them.
inline std::vector<std::string> glob(const std::string& pat) {
    glob_t matches;
    ::glob(pat.c_str(), GLOB_TILDE, nullptr, &matches);

    std::vector<std::string> paths;
    for (unsigned int idx = 0; idx < matches.gl_pathc; ++idx) {
        paths.emplace_back(matches.gl_pathv[idx]);
    }

    // The C strings belong to glob_t; release them once they are copied out.
    globfree(&matches);
    return paths;
}
// One output record of query 1; both fields are printed as text,
// separated by a comma.
struct ResultRow {
    std::string URL;      // URL column of the row
    std::string Ranking;  // ranking column, kept as the text read from input
};
void runQuery1(const string& data_dir) {
auto path = data_dir + "/rankings/*deflate";
auto fnames = glob(path);
std::atomic<int> workersWorking(fnames.size());
moodycamel::BlockingConcurrentQueue<ResultRow> outbuf(1024);
auto t = std::thread([&] {
std::ios_base::sync_with_stdio(false);
ResultRow item;
while(workersWorking.load()!=0) {
outbuf.wait_dequeue(item);
cout << item.URL << ',' << item.Ranking << '\n';
}
});
vector<thread> wg(0);
for(auto fname: fnames) {
wg.push_back(std::thread([&](string fname) {
ifstream f(fname, ios_base::in | ios_base::binary);
boost::iostreams::filtering_istream zrdr;
zrdr.push(boost::iostreams::zlib_decompressor());
zrdr.push(f);
string line;
int commandIdx = 0;
int rankingIdx = 0;
ResultRow item;
while(getline(zrdr, line)) {
if((commandIdx = line.find(',')) == -1) {
continue;
}
item.URL = line.substr(0, commandIdx-1);
if((rankingIdx = line.find(commandIdx, ',')) == -1) {
continue;
}
item.Ranking= line.substr(commandIdx+1, rankingIdx-commandIdx);
int i=0;
for(auto d: item.Ranking) {
i*=10;
i+=(d-'0');
}
if (i<70) {
continue;
}
outbuf.enqueue(item);
}
}, fname));
}
for(auto& t: wg) {
cout << "join\n";
t.join();
}
t.join();
cout.flush();
}
package main
import (
"bufio"
"bytes"
"flag"
"github.com/klauspost/compress/zlib"
"log"
"os"
"path/filepath"
"sync"
)
// Command-line flags.
var (
	// datadir is the directory that holds the benchmark dataset.
	datadir = flag.String("data-dir", "./out", "directory containing dataset")
	// query selects which benchmark query to run.
	query = flag.Int("query", 1, "which benchmark to run")
)
// main parses the command-line flags and runs the selected big data
// benchmark query. It exits with a non-zero status when the data directory
// is empty, the query number is unknown, or the query returns an error.
func main() {
	flag.Parse()
	if *datadir == "" {
		log.Fatal("data dir required")
	}
	var err error
	switch *query {
	case 1:
		err = runQuery1()
	default:
		// Previously an unsupported query number silently did nothing.
		log.Fatalf("unknown query %d", *query)
	}
	if err != nil {
		log.Fatal(err)
	}
}
// runQuery1 scans every zlib-compressed file matching
// <data-dir>/rankings/*deflate and writes "url,ranking" to stdout for each
// row whose ranking is at least 70.
//
// One goroutine per file parses rows and sends them over a channel to a
// single writer goroutine. runQuery1 returns only after the writer has
// drained the channel and flushed stdout. It returns an error when the glob
// pattern is malformed or writing to stdout failed; per-file problems are
// logged and that file is abandoned.
func runQuery1() error {
	fnames, err := filepath.Glob(filepath.Join(*datadir, "rankings", "*deflate"))
	if err != nil {
		return err
	}
	outbuf := make(chan [2][]byte, 1024)

	// writerDone carries the flush result and doubles as the signal that
	// all output has reached stdout.
	writerDone := make(chan error, 1)
	go func() {
		out := bufio.NewWriter(os.Stdout)
		for buf := range outbuf {
			out.Write(buf[0])
			out.WriteByte(',')
			out.Write(buf[1])
			out.WriteByte('\n')
		}
		// bufio.Writer remembers the first write error and reports it from
		// Flush, so checking here covers the writes above as well.
		writerDone <- out.Flush()
	}()

	var wg sync.WaitGroup
	for _, fname := range fnames {
		log.Println(fname)
		wg.Add(1)
		go func(fname string) {
			defer wg.Done()
			f, err := os.Open(fname)
			if err != nil {
				log.Println(err)
				return
			}
			defer f.Close()
			zrdr, err := zlib.NewReader(f)
			if err != nil {
				log.Println(err)
				return
			}
			defer zrdr.Close()
			scanner := bufio.NewScanner(zrdr)
			for scanner.Scan() {
				line := scanner.Bytes()
				urlEnd := bytes.IndexByte(line, ',')
				if urlEnd == -1 {
					log.Println("bad data")
					break
				}
				rest := line[urlEnd+1:]
				rankEnd := bytes.IndexByte(rest, ',')
				if rankEnd == -1 {
					log.Println("bad data")
					break
				}
				// A slice's upper bound is exclusive, so [:idx] already
				// stops just before the comma.
				rank := rest[:rankEnd]
				i := 0
				for _, d := range rank {
					i *= 10
					i += int(d - '0')
				}
				if i < 70 {
					continue
				}
				// scanner.Bytes() is overwritten by the next Scan, so copy,
				// but only for rows that are actually emitted.
				outbuf <- [2][]byte{makecopy(line[:urlEnd]), makecopy(rank)}
			}
			if err := scanner.Err(); err != nil {
				log.Println(err)
			}
		}(fname)
	}
	wg.Wait()
	close(outbuf)
	// Wait for the writer; returning earlier could lose buffered output.
	return <-writerDone
}
// makecopy returns a newly allocated slice holding the same bytes as buf,
// so later changes to buf's backing array do not affect the result.
func makecopy(buf []byte) []byte {
	dup := make([]byte, 0, len(buf))
	return append(dup, buf...)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment