Skip to content

Instantly share code, notes, and snippets.

@miku
Last active July 1, 2016 11:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save miku/0687690c68e6104893a7c328d2900ca8 to your computer and use it in GitHub Desktop.
Save miku/0687690c68e6104893a7c328d2900ca8 to your computer and use it in GitHub Desktop.
ByteBatch examples: parallel grep -F and parallel wc -l
// Parallel grep -F.
//
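// Usage: go run grep.go [-b BATCHSIZE] [-w WORKERS] PATTERN < FILE
//
// Prints each input line that contains PATTERN; multiple arguments are
// joined with single spaces to form the pattern.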
//
// Performance data point:
//
// $ time cat 1G.txt | grep -F "Number" | wc -l
// 2558
//
// real 0m50.532s
// user 0m50.288s
// sys 0m0.821s
//
// $ time cat 1G.txt | LC_ALL=C grep -F "Number" | wc -l
// 2558
//
// real 0m28.364s
// user 0m28.172s
// sys 0m0.749s
//
// $ time cat 1G.txt | go run grep.go Number | wc -l
// 2558
//
// real 0m3.004s
// user 0m3.032s
// sys 0m2.073s
package main
import (
"bytes"
"flag"
"log"
"os"
"runtime"
"strings"
"github.com/miku/span/bytebatch"
)
// main implements a parallel fixed-string grep: every line read from stdin
// that contains the pattern is copied to stdout. The pattern is the
// positional arguments joined with single spaces.
//
// Flags:
//
//	-b  batch size handed to bytebatch (default 20000)
//	-w  number of parallel workers (default runtime.NumCPU())
//
// With no positional arguments the pattern is empty. Every line contains the
// empty string, so every line is printed.
func main() {
	size := flag.Int("b", 20000, "batch size")
	numWorkers := flag.Int("w", runtime.NumCPU(), "number of workers")
	flag.Parse()

	// Convert the pattern once, up front. Doing the string->[]byte conversion
	// inside the callback copied it again for every input line. The slice is
	// only ever read, so sharing it between workers is safe.
	pattern := []byte(strings.Join(flag.Args(), " "))

	p := bytebatch.NewLineProcessor(os.Stdin, os.Stdout, func(b []byte) ([]byte, error) {
		if bytes.Contains(b, pattern) {
			return b, nil
		}
		// Non-matching lines yield an empty (nil) result, so nothing is
		// emitted for them.
		return nil, nil
	})
	p.NumWorkers = *numWorkers
	p.BatchSize = *size
	if err := p.Run(); err != nil {
		log.Fatal(err)
	}
}
// Parallel line count.
//
// $ cat linecount.go | go run linecount.go
// 33
package main
import (
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"sync/atomic"

	"github.com/miku/span/bytebatch"
)
// main counts, in parallel, the lines read from stdin and prints the total.
// Each callback invocation is counted as one line.
//
// Flags (defaults equal the previously hard-coded values):
//
//	-b  batch size handed to bytebatch (default 20000)
//	-w  number of parallel workers (default 4)
func main() {
	size := flag.Int("b", 20000, "batch size")
	numWorkers := flag.Int("w", 4, "number of workers")
	flag.Parse()

	// The callback may run on several workers at once, so the counter is
	// only ever accessed through sync/atomic, including the final read.
	var lines uint64
	p := bytebatch.NewLineProcessor(os.Stdin, ioutil.Discard, func(b []byte) ([]byte, error) {
		atomic.AddUint64(&lines, 1)
		// Output goes to ioutil.Discard; only the count matters.
		return b, nil
	})
	p.NumWorkers = *numWorkers
	p.BatchSize = *size
	if err := p.Run(); err != nil {
		log.Fatal(err)
	}
	fmt.Println(atomic.LoadUint64(&lines))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment