|
package main |
|
|
|
import (
	"bufio"
	"bytes"
	"container/heap"
	"fmt"
	"io"
	"log"
	"os"
	"runtime"
	"strconv"
	"strings"
	"sync"

	// Blank import: the map-based word finder references no identifier from
	// this package. TODO(review): drop it together with its go.mod requirement.
	_ "github.com/funkygao/golib/hashmap"
	multierror "github.com/hashicorp/go-multierror"
	"github.com/pkg/errors"
	"golang.org/x/sync/errgroup"
)
|
|
|
func main() { |
|
if len(os.Args) != 4 { |
|
log.Fatalln("usage:", os.Args[0], "<dictionary> <input> <n>") |
|
} |
|
run(os.Args[1], os.Args[2], os.Args[3]) |
|
} |
|
|
|
func run(dictFile, inputFile, topKArg string) { |
|
nTopK, err := strconv.Atoi(topKArg) |
|
if err != nil { |
|
log.Fatalln("could not parse <n>:", err) |
|
} |
|
|
|
log.Println("populating word finder...") |
|
wf, err := newWordFinder(dictFile) |
|
if err != nil { |
|
log.Fatalln("could not instantiate word finder:", err) |
|
} |
|
log.Println("done populating word finder.") |
|
|
|
log.Println("starting to build dictionary...") |
|
dict, err := buildDict(wf, inputFile) |
|
if err != nil { |
|
log.Fatalln("could not build dictionary:", err) |
|
} |
|
log.Println("done building dictionary.") |
|
|
|
log.Println("starting to find most common...") |
|
words := findMostCommon(dict, nTopK) |
|
log.Println("done finding most common. They are:") |
|
|
|
revWords := make([]string, 0, words.Len()) |
|
for words.Len() > 0 { |
|
revWords = append(revWords, heap.Pop(words).(string)) |
|
} |
|
// Print reversed. |
|
for i := len(revWords) - 1; i >= 0; i-- { |
|
w := revWords[i] |
|
fmt.Printf(" * %s (%d)\n", w, dict[w]) |
|
} |
|
} |
|
|
|
type wordFinder struct { |
|
m *hashmap.HashMap |
|
} |
|
|
|
func (w *wordFinder) Matches(b []byte) bool { |
|
_, ok := w.m[b] |
|
return ok |
|
} |
|
|
|
func newWordFinder(path string) (*wordFinder, error) { |
|
words, err := readAllLines(path) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
m := hashmap.New() |
|
for _, w := range words { |
|
m[[]byte(w)] = struct{}{} |
|
} |
|
|
|
return &wordFinder{m}, err |
|
} |
|
|
|
// readAllLines returns the lines of the file at path, split the way
// bufio.Scanner's default line splitter splits them (newline and a
// preceding carriage return are removed). If opening the file fails it
// returns nil and the error. If scanning fails, it returns the lines read
// before the failure along with the scanner's error.
func readAllLines(path string) ([]string, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	var result []string
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		result = append(result, scanner.Text())
	}
	return result, scanner.Err()
}
|
|
|
var bufPool sync.Pool = sync.Pool{ |
|
New: func() interface{} { |
|
b := bytes.NewBuffer(make([]byte, chunkSize+300)) |
|
b.Reset() |
|
return b |
|
}, |
|
} |
|
|
|
func buildDict(wf *wordFinder, path string) (map[string]int, error) { |
|
var r io.Reader |
|
if path == "-" { |
|
r = os.Stdin |
|
} else { |
|
f, err := os.Open(path) |
|
if err != nil { |
|
return nil, err |
|
} |
|
defer f.Close() |
|
r = f |
|
} |
|
|
|
var eg errgroup.Group |
|
|
|
chunks := make(chan *bytes.Buffer) |
|
eg.Go(func() error { |
|
// TODO: Here lies a bug if there are UTF-8 characters. We might break the chunks. |
|
defer close(chunks) |
|
var remainder bytes.Buffer |
|
for { |
|
//ntoread := chunkSize + remainder.Len() |
|
b := bufPool.Get().(*bytes.Buffer) |
|
//b.Reset() // without this it will have content |
|
b.ReadFrom(&remainder) |
|
n, err := io.Copy(b, &io.LimitedReader{r, chunkSize}) |
|
if n == chunkSize { |
|
bf := bufio.NewReader(r) |
|
line, err2 := bf.ReadBytes(' ') // stop by a word delimiter |
|
b.Write(line) |
|
|
|
remaining, _ := bf.Peek(bf.Buffered()) |
|
remainder.Write(remaining) |
|
|
|
chunks <- b |
|
|
|
switch err2 { |
|
case io.EOF: |
|
return nil |
|
case nil: |
|
default: |
|
return err2 |
|
} |
|
} else { |
|
chunks <- b |
|
switch err { |
|
case io.EOF: |
|
return nil |
|
default: |
|
return err |
|
} |
|
} |
|
} |
|
return nil |
|
}) |
|
|
|
maps := make(chan map[string]int) |
|
for i := 0; i < runtime.NumCPU(); i++ { |
|
eg.Go(func() error { |
|
return errors.WithMessage(processChunk(wf, chunks, maps), "processChunk") |
|
}) |
|
} |
|
|
|
var eg2 errgroup.Group |
|
globalmap := make(map[string]int) |
|
eg2.Go(func() error { |
|
for m := range maps { |
|
mergeMap(globalmap, m) |
|
} |
|
return nil |
|
}) |
|
|
|
err := eg.Wait() |
|
close(maps) // Not more maps will be emitted |
|
|
|
merr := multierror.Append(err, eg2.Wait()) |
|
var finalerr error |
|
if merr.Len() > 0 { |
|
// multierror.Append(...) never returns nil for input nil. |
|
finalerr = merr |
|
} |
|
|
|
resmap := make(map[string]int, len(globalmap)) |
|
for k, v := range globalmap { |
|
resmap[strings.ToLower(k)] += v |
|
} |
|
|
|
return resmap, finalerr |
|
} |
|
|
|
// chunkSize is the number of input bytes (100 MiB) read into one chunk
// before it is cut at a word boundary and handed to a counting worker.
const chunkSize = 100 << 20
|
|
|
// mergeMap adds each count in source onto the same key in to, creating keys
// that are missing. source is left unchanged; to must be non-nil whenever
// source is non-empty.
func mergeMap(to, source map[string]int) {
	for word, n := range source {
		to[word] = to[word] + n
	}
}
|
|
|
func processChunk(wf *wordFinder, in <-chan *bytes.Buffer, out chan<- map[string]int) error { |
|
var err error |
|
for r := range in { |
|
m, cerr := constructCountMap(wf, r) |
|
|
|
r.Reset() |
|
bufPool.Put(r) |
|
|
|
if cerr != nil { |
|
err = cerr |
|
} else { |
|
out <- m |
|
} |
|
} |
|
return err |
|
} |
|
|
|
func constructCountMap(wf *wordFinder, r io.Reader) (map[string]int, error) { |
|
s := bufio.NewScanner(bufio.NewReader(r)) |
|
s.Split(bufio.ScanWords) |
|
|
|
// The default buffer is not large enough. |
|
s.Buffer(make([]byte, 0, 20*1024*1024), 0) |
|
|
|
res := make(map[string]int) |
|
for s.Scan() { |
|
toMatch := s.Text() |
|
if wf.Matches(toMatch) { |
|
res[toMatch]++ |
|
} |
|
} |
|
|
|
return res, s.Err() |
|
} |
|
|
|
func findMostCommon(m map[string]int, nTopK int) *ByCommonality { |
|
h := &ByCommonality{counts: m} |
|
for s, _ := range m { |
|
heap.Push(h, s) |
|
if h.Len() > nTopK { |
|
heap.Pop(h) |
|
} |
|
} |
|
|
|
return h |
|
} |
|
|
|
// ByCommonality implements container/heap.Interface over the words in items,
// ranked by their value in counts. The lowest-ranked word sits at the root:
// a smaller count ranks lower, and among equal counts the lexicographically
// greater word ranks lower.
type ByCommonality struct {
	counts map[string]int // word -> occurrence count used for ranking
	items  []string       // heap-ordered words
}

// Len returns the number of words currently in the heap.
func (h *ByCommonality) Len() int {
	return len(h.items)
}

// Less reports whether the word at i ranks below the word at j.
func (h *ByCommonality) Less(i, j int) bool {
	a, b := h.items[i], h.items[j]
	countA, countB := h.counts[a], h.counts[b]
	if countA != countB {
		return countA < countB
	}
	return a > b
}

// Pop removes and returns the last element of items; container/heap has
// already moved the element being popped there.
func (h *ByCommonality) Pop() interface{} {
	last := len(h.items) - 1
	word := h.items[last]
	h.items = h.items[:last]
	return word
}

// Push appends x, which must be a string, to items.
func (h *ByCommonality) Push(x interface{}) {
	h.items = append(h.items, x.(string))
}

// Swap exchanges the words at i and j.
func (h *ByCommonality) Swap(i, j int) {
	h.items[j], h.items[i] = h.items[i], h.items[j]
}