Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@JensRantil
Last active May 8, 2019 08:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JensRantil/e5c2b81fe7b690352ac3fdea5bb9f876 to your computer and use it in GitHub Desktop.
TopK implementation with filtering from dictionary.
module github.com/JensRantil/topk-challenge
require (
github.com/funkygao/golib v0.0.0-20180314131852-90d4905c1961 // indirect
github.com/google/pprof v0.0.0-20190502144155-8358a9778bd1 // indirect
github.com/hashicorp/go-multierror v1.0.0
github.com/pkg/errors v0.8.1
golang.org/x/arch v0.0.0-20190312162104-788fe5ffcd8c // indirect
golang.org/x/sync v0.0.0-20190423024810-112230192c58
)
package main
import (
"bufio"
"bytes"
"container/heap"
"fmt"
"io"
"log"
"os"
"runtime"
"strconv"
"strings"
"sync"
"github.com/funkygao/golib/hashmap"
multierror "github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
// main validates the command line and hands off to run.
// Usage: <binary> <dictionary> <input> <n>
func main() {
	args := os.Args
	if len(args) != 4 {
		log.Fatalln("usage:", args[0], "<dictionary> <input> <n>")
	}
	run(args[1], args[2], args[3])
}
// run wires the pipeline together: load the dictionary word set, count
// matching words in the input file ("-" for stdin), then print the
// nTopK most common words with their counts, most common first.
func run(dictFile, inputFile, topKArg string) {
	nTopK, err := strconv.Atoi(topKArg)
	if err != nil {
		log.Fatalln("could not parse <n>:", err)
	}

	log.Println("populating word finder...")
	wf, err := newWordFinder(dictFile)
	if err != nil {
		log.Fatalln("could not instantiate word finder:", err)
	}
	log.Println("done populating word finder.")

	log.Println("starting to build dictionary...")
	dict, err := buildDict(wf, inputFile)
	if err != nil {
		log.Fatalln("could not build dictionary:", err)
	}
	log.Println("done building dictionary.")

	log.Println("starting to find most common...")
	words := findMostCommon(dict, nTopK)
	log.Println("done finding most common. They are:")

	// Heap pops come out least-common-first; collect them, then walk the
	// slice backwards so the output is most-common-first.
	ordered := make([]string, 0, words.Len())
	for words.Len() > 0 {
		ordered = append(ordered, heap.Pop(words).(string))
	}
	for i := len(ordered) - 1; i >= 0; i-- {
		fmt.Printf(" * %s (%d)\n", ordered[i], dict[ordered[i]])
	}
}
// wordFinder answers membership queries against a fixed set of dictionary
// words.
//
// NOTE(review): the original stored words in a *hashmap.HashMap and wrote
// `w.m[b]` / `m[[]byte(w)] = ...` — neither compiles: a struct pointer
// cannot be indexed like a map, and []byte is not a comparable map-key
// type. A plain map[string]struct{} set is the idiomatic replacement, and
// it also matches the call site, which passes a string.
type wordFinder struct {
	m map[string]struct{}
}

// Keep the legacy hashmap import referenced until the import block and
// go.mod drop the dependency; lookups no longer use it.
var _ = hashmap.New

// Matches reports whether word is in the dictionary set.
func (w *wordFinder) Matches(word string) bool {
	_, ok := w.m[word]
	return ok
}

// newWordFinder loads the newline-separated word list at path into a set.
func newWordFinder(path string) (*wordFinder, error) {
	words, err := readAllLines(path)
	if err != nil {
		return nil, err
	}
	m := make(map[string]struct{}, len(words))
	for _, w := range words {
		m[w] = struct{}{}
	}
	return &wordFinder{m}, nil
}
func readAllLines(path string) (lines []string, err error) {
var f *os.File
f, err = os.Open(path)
if err != nil {
return
}
defer f.Close()
s := bufio.NewScanner(bufio.NewReader(f))
for s.Scan() {
lines = append(lines, s.Text())
}
err = s.Err()
return
}
// bufPool recycles the large per-chunk read buffers so that each chunk does
// not allocate a fresh ~100 MB buffer. The extra 300 bytes leave headroom
// for the word fragment appended past the chunk boundary.
var bufPool = sync.Pool{
	New: func() interface{} {
		// Allocate with zero length and full capacity. The original used
		// make([]byte, chunkSize+300), which fills the buffer with that many
		// zero bytes of *content* and then needs a Reset to empty it; the
		// len-0/cap-N form preallocates without that detour.
		return bytes.NewBuffer(make([]byte, 0, chunkSize+300))
	},
}
// buildDict streams the input at path (or stdin when path is "-") in
// ~chunkSize pieces, counts dictionary words concurrently, and returns a
// word→count map with keys folded to lower case.
//
// Pipeline: one goroutine slices the stream into chunks that end on a word
// boundary; runtime.NumCPU() workers turn chunks into per-chunk count maps;
// a single collector goroutine merges them into globalmap.
func buildDict(wf *wordFinder, path string) (map[string]int, error) {
	var r io.Reader
	if path == "-" {
		r = os.Stdin
	} else {
		f, err := os.Open(path)
		if err != nil {
			return nil, err
		}
		defer f.Close()
		r = f
	}
	var eg errgroup.Group
	chunks := make(chan *bytes.Buffer)
	eg.Go(func() error {
		// TODO: Here lies a bug if there are UTF-8 characters. We might break the chunks.
		defer close(chunks)
		// remainder carries the bytes read past the word delimiter of the
		// previous chunk; they become the prefix of the next chunk.
		var remainder bytes.Buffer
		for {
			//ntoread := chunkSize + remainder.Len()
			// Buffers come back from processChunk already Reset, so they
			// arrive empty here.
			b := bufPool.Get().(*bytes.Buffer)
			//b.Reset() // without this it will have content
			b.ReadFrom(&remainder)
			// NOTE(review): unkeyed struct literal (go vet flags it), and the
			// ReadFrom error above is silently dropped.
			n, err := io.Copy(b, &io.LimitedReader{r, chunkSize})
			if n == chunkSize {
				// The chunk likely ends mid-word: keep reading up to the next
				// space so the chunk closes on a word boundary.
				bf := bufio.NewReader(r)
				line, err2 := bf.ReadBytes(' ') // stop by a word delimiter
				b.Write(line)
				// Whatever bf buffered beyond the delimiter must not be lost:
				// stash it for the next iteration.
				remaining, _ := bf.Peek(bf.Buffered())
				remainder.Write(remaining)
				chunks <- b
				switch err2 {
				case io.EOF:
					return nil
				case nil:
				default:
					return err2
				}
			} else {
				// Short read: this is the final (possibly empty) chunk.
				chunks <- b
				switch err {
				case io.EOF:
					// NOTE(review): io.Copy reports end-of-stream as err == nil,
					// never io.EOF, so this arm looks dead; clean termination
					// returns nil via the default arm below.
					return nil
				default:
					return err
				}
			}
		}
		return nil // unreachable: the loop above only exits via return
	})
	maps := make(chan map[string]int)
	for i := 0; i < runtime.NumCPU(); i++ {
		eg.Go(func() error {
			return errors.WithMessage(processChunk(wf, chunks, maps), "processChunk")
		})
	}
	// The collector runs in its own group so we can close(maps) after all
	// producers (reader + workers) in eg have finished.
	var eg2 errgroup.Group
	globalmap := make(map[string]int)
	eg2.Go(func() error {
		for m := range maps {
			mergeMap(globalmap, m)
		}
		return nil
	})
	err := eg.Wait()
	close(maps) // Not more maps will be emitted
	merr := multierror.Append(err, eg2.Wait())
	var finalerr error
	if merr.Len() > 0 {
		// multierror.Append(...) never returns nil for input nil.
		finalerr = merr
	}
	// Fold counts case-insensitively into the result map.
	resmap := make(map[string]int, len(globalmap))
	for k, v := range globalmap {
		resmap[strings.ToLower(k)] += v
	}
	return resmap, finalerr
}
const chunkSize = 100 * 1024 * 1024
// mergeMap accumulates every count from source into to, keyed by word.
func mergeMap(to, source map[string]int) {
	for word, count := range source {
		to[word] += count
	}
}
// processChunk drains buffers from in, tallies dictionary words in each, and
// sends the per-chunk count maps on out. Every buffer is reset and returned
// to bufPool. The last counting error seen (if any) is returned after in is
// closed.
func processChunk(wf *wordFinder, in <-chan *bytes.Buffer, out chan<- map[string]int) error {
	var lastErr error
	for buf := range in {
		counts, cerr := constructCountMap(wf, buf)
		buf.Reset()
		bufPool.Put(buf)
		if cerr == nil {
			out <- counts
		} else {
			lastErr = cerr
		}
	}
	return lastErr
}
// constructCountMap scans r word by word (whitespace-delimited) and tallies
// how often each dictionary word occurs.
func constructCountMap(wf *wordFinder, r io.Reader) (map[string]int, error) {
	sc := bufio.NewScanner(bufio.NewReader(r))
	sc.Split(bufio.ScanWords)
	// The default buffer is not large enough for our chunk-sized inputs.
	sc.Buffer(make([]byte, 0, 20*1024*1024), 0)

	counts := make(map[string]int)
	for sc.Scan() {
		word := sc.Text()
		if wf.Matches(word) {
			counts[word]++
		}
	}
	return counts, sc.Err()
}
// findMostCommon returns a min-heap holding the nTopK most frequent words in
// m; popping it yields them least-common-first.
func findMostCommon(m map[string]int, nTopK int) *ByCommonality {
	h := &ByCommonality{counts: m}
	// Classic bounded top-K: push each word, then evict the current minimum
	// whenever the heap grows past K.
	// (Was `for s, _ := range m` — the blank value is un-idiomatic and
	// non-gofmt; key-only range is the canonical form.)
	for word := range m {
		heap.Push(h, word)
		if h.Len() > nTopK {
			heap.Pop(h)
		}
	}
	return h
}
type ByCommonality struct {
counts map[string]int
items []string
}
func (c *ByCommonality) Len() int {
return len(c.items)
}
func (c *ByCommonality) Less(i, j int) bool {
ii := c.items[i]
ij := c.items[j]
ci := c.counts[ii]
cj := c.counts[ij]
if ci < cj {
return true
} else if ci == cj {
return strings.Compare(ii, ij) > 0
} else {
return false
}
}
func (c *ByCommonality) Pop() interface{} {
old := c.items
n := len(old)
x := old[n-1]
c.items = old[0 : n-1]
return x
}
func (c *ByCommonality) Push(x interface{}) {
c.items = append(c.items, x.(string))
}
func (c *ByCommonality) Swap(i, j int) {
c.items[i], c.items[j] = c.items[j], c.items[i]
}
package main
import "testing"
// BenchmarkRun times a full end-to-end run over the sample corpus with a
// top-1000 query.
func BenchmarkRun(b *testing.B) {
	for i := 0; i < b.N; i++ {
		run("words_alpha.txt", "14m_hn_comments_sorted.txt", "1000")
	}
}
#!/bin/sh
# Stream the sample corpus through the program (pv shows a progress bar),
# reading the dictionary from words_alpha.txt, input from stdin ("-"), and
# printing the top 1000 words; `time` reports total wall/CPU time.
pv 14m_hn_comments_sorted.txt | time go run main.go words_alpha.txt - 1000
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment