Last active
May 8, 2019 08:10
-
-
Save JensRantil/e5c2b81fe7b690352ac3fdea5bb9f876 to your computer and use it in GitHub Desktop.
TopK implementation with filtering from dictionary.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module github.com/JensRantil/topk-challenge | |
require ( | |
github.com/funkygao/golib v0.0.0-20180314131852-90d4905c1961 // indirect | |
github.com/google/pprof v0.0.0-20190502144155-8358a9778bd1 // indirect | |
github.com/hashicorp/go-multierror v1.0.0 | |
github.com/pkg/errors v0.8.1 | |
golang.org/x/arch v0.0.0-20190312162104-788fe5ffcd8c // indirect | |
golang.org/x/sync v0.0.0-20190423024810-112230192c58 | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"bytes" | |
"container/heap" | |
"fmt" | |
"io" | |
"log" | |
"os" | |
"runtime" | |
"strconv" | |
"strings" | |
"sync" | |
"github.com/funkygao/golib/hashmap" | |
multierror "github.com/hashicorp/go-multierror" | |
"github.com/pkg/errors" | |
"golang.org/x/sync/errgroup" | |
) | |
func main() { | |
if len(os.Args) != 4 { | |
log.Fatalln("usage:", os.Args[0], "<dictionary> <input> <n>") | |
} | |
run(os.Args[1], os.Args[2], os.Args[3]) | |
} | |
func run(dictFile, inputFile, topKArg string) { | |
nTopK, err := strconv.Atoi(topKArg) | |
if err != nil { | |
log.Fatalln("could not parse <n>:", err) | |
} | |
log.Println("populating word finder...") | |
wf, err := newWordFinder(dictFile) | |
if err != nil { | |
log.Fatalln("could not instantiate word finder:", err) | |
} | |
log.Println("done populating word finder.") | |
log.Println("starting to build dictionary...") | |
dict, err := buildDict(wf, inputFile) | |
if err != nil { | |
log.Fatalln("could not build dictionary:", err) | |
} | |
log.Println("done building dictionary.") | |
log.Println("starting to find most common...") | |
words := findMostCommon(dict, nTopK) | |
log.Println("done finding most common. They are:") | |
revWords := make([]string, 0, words.Len()) | |
for words.Len() > 0 { | |
revWords = append(revWords, heap.Pop(words).(string)) | |
} | |
// Print reversed. | |
for i := len(revWords) - 1; i >= 0; i-- { | |
w := revWords[i] | |
fmt.Printf(" * %s (%d)\n", w, dict[w]) | |
} | |
} | |
// wordFinder answers membership queries against a fixed set of
// dictionary words loaded from disk, one word per line.
//
// NOTE(review): this replaces the original *hashmap.HashMap field, which
// was indexed like a built-in map (w.m[b], m[[]byte(w)]) and therefore
// did not compile. A plain map[string]struct{} set does the same job.
// The now-unused "github.com/funkygao/golib/hashmap" import should be
// removed from the import block.
type wordFinder struct {
	m map[string]struct{}
}

// Matches reports whether b is a word in the dictionary. The string(b)
// conversion inside a map lookup is recognized by the compiler and does
// not allocate.
func (w *wordFinder) Matches(b []byte) bool {
	_, ok := w.m[string(b)]
	return ok
}

// newWordFinder reads the file at path (one dictionary word per line)
// and returns a wordFinder over that word set.
func newWordFinder(path string) (*wordFinder, error) {
	words, err := readAllLines(path)
	if err != nil {
		return nil, err
	}
	m := make(map[string]struct{}, len(words))
	for _, w := range words {
		m[w] = struct{}{}
	}
	return &wordFinder{m}, nil
}

// readAllLines reads the file at path and returns its lines without
// trailing newlines. Lines longer than bufio.Scanner's default token
// limit (64KiB) cause an error.
func readAllLines(path string) (lines []string, err error) {
	var f *os.File
	f, err = os.Open(path)
	if err != nil {
		return
	}
	defer f.Close()
	// bufio.Scanner buffers internally; no extra bufio.Reader needed.
	s := bufio.NewScanner(f)
	for s.Scan() {
		lines = append(lines, s.Text())
	}
	err = s.Err()
	return
}
var bufPool sync.Pool = sync.Pool{ | |
New: func() interface{} { | |
b := bytes.NewBuffer(make([]byte, chunkSize+300)) | |
b.Reset() | |
return b | |
}, | |
} | |
// buildDict streams the input file (or stdin when path is "-") through a
// small fan-out pipeline and returns a map from lower-cased dictionary
// word to its number of occurrences in the input.
//
// Pipeline shape:
//  1. one producer goroutine slices the input into ~chunkSize chunks,
//     extending each chunk to the next space so no word is split;
//  2. NumCPU consumer goroutines (processChunk) turn chunks into
//     per-chunk count maps;
//  3. one goroutine folds those maps into globalmap.
func buildDict(wf *wordFinder, path string) (map[string]int, error) {
	var r io.Reader
	if path == "-" {
		r = os.Stdin
	} else {
		f, err := os.Open(path)
		if err != nil {
			return nil, err
		}
		defer f.Close()
		r = f
	}
	var eg errgroup.Group
	chunks := make(chan *bytes.Buffer)
	eg.Go(func() error {
		// TODO: Here lies a bug if there are UTF-8 characters. We might break the chunks.
		defer close(chunks)
		// remainder holds bytes read past the previous chunk's word
		// boundary; they are prepended to the next chunk.
		var remainder bytes.Buffer
		for {
			//ntoread := chunkSize + remainder.Len()
			b := bufPool.Get().(*bytes.Buffer)
			//b.Reset() // without this it will have content
			// NOTE(review): errors from ReadFrom/Write on these
			// in-memory buffers are silently dropped — worth surfacing.
			b.ReadFrom(&remainder)
			n, err := io.Copy(b, &io.LimitedReader{r, chunkSize})
			if n == chunkSize {
				// Chunk is "full": keep reading up to the next space so
				// it ends on a word boundary, then stash whatever the
				// bufio.Reader over-read into remainder for next round.
				// NOTE(review): a fresh bufio.Reader per iteration plus
				// the unchecked io.Copy err in this branch look fragile
				// — confirm no buffered bytes can be lost here.
				bf := bufio.NewReader(r)
				line, err2 := bf.ReadBytes(' ') // stop by a word delimiter
				b.Write(line)
				remaining, _ := bf.Peek(bf.Buffered())
				remainder.Write(remaining)
				chunks <- b
				switch err2 {
				case io.EOF:
					return nil
				case nil:
					// keep looping; more input remains
				default:
					return err2
				}
			} else {
				// Short copy: this was the final (possibly empty) chunk.
				chunks <- b
				switch err {
				case io.EOF:
					return nil
				default:
					// err may also be nil here; either way the producer ends.
					return err
				}
			}
		}
		return nil // unreachable; the loop always returns
	})
	maps := make(chan map[string]int)
	// Fan out: one counting worker per CPU, all draining chunks.
	for i := 0; i < runtime.NumCPU(); i++ {
		eg.Go(func() error {
			return errors.WithMessage(processChunk(wf, chunks, maps), "processChunk")
		})
	}
	// Single reducer goroutine so globalmap needs no locking.
	var eg2 errgroup.Group
	globalmap := make(map[string]int)
	eg2.Go(func() error {
		for m := range maps {
			mergeMap(globalmap, m)
		}
		return nil
	})
	err := eg.Wait()
	close(maps) // Not more maps will be emitted
	merr := multierror.Append(err, eg2.Wait())
	var finalerr error
	if merr.Len() > 0 {
		// multierror.Append(...) never returns nil for input nil.
		finalerr = merr
	}
	// Final pass: fold case so "Word" and "word" count together.
	resmap := make(map[string]int, len(globalmap))
	for k, v := range globalmap {
		resmap[strings.ToLower(k)] += v
	}
	return resmap, finalerr
}

// chunkSize is the target size of each producer chunk (100 MiB).
const chunkSize = 100 * 1024 * 1024
// mergeMap adds every count in source into to, accumulating the counts
// of keys present in both maps.
func mergeMap(to, source map[string]int) {
	for word, count := range source {
		to[word] += count
	}
}
func processChunk(wf *wordFinder, in <-chan *bytes.Buffer, out chan<- map[string]int) error { | |
var err error | |
for r := range in { | |
m, cerr := constructCountMap(wf, r) | |
r.Reset() | |
bufPool.Put(r) | |
if cerr != nil { | |
err = cerr | |
} else { | |
out <- m | |
} | |
} | |
return err | |
} | |
func constructCountMap(wf *wordFinder, r io.Reader) (map[string]int, error) { | |
s := bufio.NewScanner(bufio.NewReader(r)) | |
s.Split(bufio.ScanWords) | |
// The default buffer is not large enough. | |
s.Buffer(make([]byte, 0, 20*1024*1024), 0) | |
res := make(map[string]int) | |
for s.Scan() { | |
toMatch := s.Text() | |
if wf.Matches(toMatch) { | |
res[toMatch]++ | |
} | |
} | |
return res, s.Err() | |
} | |
func findMostCommon(m map[string]int, nTopK int) *ByCommonality { | |
h := &ByCommonality{counts: m} | |
for s, _ := range m { | |
heap.Push(h, s) | |
if h.Len() > nTopK { | |
heap.Pop(h) | |
} | |
} | |
return h | |
} | |
// ByCommonality is a heap.Interface over a set of words, ordered so the
// least common word sits at the root; ties are broken so that the
// lexicographically larger word is considered smaller.
type ByCommonality struct {
	counts map[string]int
	items  []string
}

// Len reports how many words are currently held.
func (bc *ByCommonality) Len() int {
	return len(bc.items)
}

// Less orders primarily by ascending count; on equal counts the
// lexicographically larger word compares as smaller.
func (bc *ByCommonality) Less(i, j int) bool {
	wi, wj := bc.items[i], bc.items[j]
	ni, nj := bc.counts[wi], bc.counts[wj]
	if ni != nj {
		return ni < nj
	}
	return strings.Compare(wi, wj) > 0
}

// Pop removes and returns the final element of the backing slice
// (heap.Pop swaps the root there before calling this).
func (bc *ByCommonality) Pop() interface{} {
	last := len(bc.items) - 1
	w := bc.items[last]
	bc.items = bc.items[:last]
	return w
}

// Push appends a word to the backing slice (called via heap.Push).
func (bc *ByCommonality) Push(x interface{}) {
	bc.items = append(bc.items, x.(string))
}

// Swap exchanges the words at positions i and j.
func (bc *ByCommonality) Swap(i, j int) {
	bc.items[j], bc.items[i] = bc.items[i], bc.items[j]
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import "testing" | |
// BenchmarkRun times one full end-to-end run of the pipeline per
// iteration.
// NOTE(review): it depends on two local fixture files being present
// ("words_alpha.txt" and "14m_hn_comments_sorted.txt") — confirm they
// exist wherever this benchmark is run; it will log.Fatal otherwise.
func BenchmarkRun(b *testing.B) {
	for n := 0; n < b.N; n++ {
		run("words_alpha.txt", "14m_hn_comments_sorted.txt", "1000")
	}
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh
# Stream the input corpus through pv (for a progress bar) into the top-K
# program: dictionary from words_alpha.txt, input from stdin ("-"), and
# print the 1000 most common words. `time` reports overall resource use.
pv 14m_hn_comments_sorted.txt | time go run main.go words_alpha.txt - 1000
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment