Skip to content

Instantly share code, notes, and snippets.

@billhathaway
Last active September 16, 2016 14:04
Show Gist options
  • Save billhathaway/303c72387007191823f8f3a57bd03b07 to your computer and use it in GitHub Desktop.
Save billhathaway/303c72387007191823f8f3a57bd03b07 to your computer and use it in GitHub Desktop.
elvis: added a buffered stream example and fixed a bug that occurred when the find pattern was split across buffers
Thanks to Tyler Burnell, Bill Kennedy, and others who wrote the boilerplate I copied into main.go and main_test.go.
Note: the benchmark data is horribly unrealistic (10 MB of input containing either no matches or only matches), but I think measuring MB/sec throughput over large inputs is the right model for the problem statement. I welcome additions that make the test data more realistic. A very small data set of a few hundred bytes doesn't seem like a good test
of stream-processing performance.
go test -run none -bench . -benchtime 3s -benchmem
testing: warning: no tests to run
PASS
BenchmarkProcessByteUnmatched-8 30 143059391 ns/op 69.90 MB/s 1 B/op 1 allocs/op
BenchmarkProcessByteMatched-8 30 120619527 ns/op 82.91 MB/s 1 B/op 1 allocs/op
BenchmarkProcessBuffer128Unmatched-8 200 17943013 ns/op 557.32 MB/s 128 B/op 1 allocs/op
BenchmarkProcessBuffer128Matched-8 100 34317933 ns/op 291.39 MB/s 128 B/op 1 allocs/op
BenchmarkProcessBuffer8192Unmatched-8 300 16336677 ns/op 612.12 MB/s 8192 B/op 1 allocs/op
BenchmarkProcessBuffer8192Matched-8 100 32197112 ns/op 310.59 MB/s 8192 B/op 1 allocs/op
ok github.com/billhathaway/streamprocessing/algo1 27.133s
package main
import (
"bytes"
"errors"
"fmt"
"io"
)
var (
	// data is the table of correctness cases that main runs through both
	// processByte and processBuffer: for each entry, replacing every
	// occurrence of find in input with repl should yield output.
	data = []struct {
		input  []byte
		output []byte
	}{
		{[]byte("abc"), []byte("abc")},
		{[]byte("elvis"), []byte("Elvis")},
		// Already capitalized, so nothing should change.
		{[]byte("aElvis"), []byte("aElvis")},
		{[]byte("abcelvis"), []byte("abcElvis")},
		// Repeated or false starts of the pattern right before a real match.
		{[]byte("eelvis"), []byte("eElvis")},
		{[]byte("aelvis"), []byte("aElvis")},
		{[]byte("aabeeeelvis"), []byte("aabeeeElvis")},
		// The pattern's letters separated by other bytes must not match.
		{[]byte("e l v i s"), []byte("e l v i s")},
		{[]byte("aa bb e l v i saa"), []byte("aa bb e l v i saa")},
		{[]byte(" elvi s"), []byte(" elvi s")},
		// Partial matches that break off and then restart.
		{[]byte("elvielvis"), []byte("elviElvis")},
		{[]byte("elvielvielviselvi1"), []byte("elvielviElviselvi1")},
		{[]byte("elvielviselvis"), []byte("elviElvisElvis")},
		// Cases where "elvis" straddles the end of main's 32-byte buffer:
		// 30 through 34 leading digits shift the pattern across the boundary.
		{[]byte("012345678901234567890123456789elvis"), []byte("012345678901234567890123456789Elvis")},
		{[]byte("0123456789012345678901234567890elvis"), []byte("0123456789012345678901234567890Elvis")},
		{[]byte("01234567890123456789012345678901elvis"), []byte("01234567890123456789012345678901Elvis")},
		{[]byte("012345678901234567890123456789012elvis"), []byte("012345678901234567890123456789012Elvis")},
		{[]byte("0123456789012345678901234567890123elvis"), []byte("0123456789012345678901234567890123Elvis")},
	}
	// find is the pattern that main and the benchmarks search for.
	find = []byte("elvis")
	// repl is the bytes main and the benchmarks substitute for each match.
	repl = []byte("Elvis")
)
// processBuffer copies r to w, replacing every non-overlapping occurrence of
// find with replace. It reads through an internal buffer of size bytes, which
// greatly reduces the number of Read and Write calls at the expense of an
// allocation up front.
//
// A partial match at the end of one read is carried to the front of the
// buffer for the next read, so occurrences that straddle read boundaries are
// still replaced. A partial match still pending when the input ends is
// written out unchanged. Mismatches fall back via a KMP failure table, so
// self-overlapping patterns (e.g. "aab" in "aaab") are matched correctly.
//
// find must be non-empty and size must be bigger than len(find); otherwise an
// error is returned before anything is read. io.EOF ends processing normally
// and yields nil. Any other read error, and any write error, is returned.
func processBuffer(r io.Reader, w io.Writer, size int, find, replace []byte) error {
	if len(find) == 0 {
		return errors.New("find must not be empty")
	}
	if size <= len(find) {
		return errors.New("buffer size must be > find size")
	}

	// fail[i] is the length of the longest proper prefix of find[:i+1] that
	// is also its suffix, i.e. how much of a match survives a mismatch after
	// i+1 matched bytes.
	fail := make([]int, len(find))
	for i, k := 1, 0; i < len(find); i++ {
		for k > 0 && find[i] != find[k] {
			k = fail[k-1]
		}
		if find[i] == find[k] {
			k++
		}
		fail[i] = k
	}

	b := make([]byte, size)
	idx := 0 // how many bytes of find are currently matched
	for {
		// The pending partial match is exactly find[:idx]; put it at the
		// front of the buffer so a match can complete in this read.
		copy(b, find[:idx])
		n, rerr := r.Read(b[idx:])
		end := idx + n
		wrote := 0 // b[:wrote] has already been written (or replaced)

		// Scan only the freshly read bytes. The carried prefix is already
		// reflected in idx, and re-scanning it could produce false matches.
		for i := idx; i < end; i++ {
			for idx > 0 && b[i] != find[idx] {
				idx = fail[idx-1]
			}
			if b[i] == find[idx] {
				idx++
			}
			if idx == len(find) {
				// Write everything since the last write up to the start of
				// the match, then the replacement in place of the match.
				if _, err := w.Write(b[wrote : i+1-len(find)]); err != nil {
					return err
				}
				if _, err := w.Write(replace); err != nil {
					return err
				}
				wrote = i + 1
				idx = 0
			}
		}

		if rerr != nil {
			// No more input can complete the pending partial match, so it
			// is ordinary data: flush it along with everything else.
			if _, err := w.Write(b[wrote:end]); err != nil {
				return err
			}
			if rerr == io.EOF {
				return nil
			}
			return rerr
		}
		// Hold back the trailing partial match; it is re-created from find
		// at the top of the next iteration.
		if _, err := w.Write(b[wrote : end-idx]); err != nil {
			return err
		}
	}
}
// processByte copies r to w one byte per Read call, replacing every
// non-overlapping occurrence of find with replace. Bytes that may still begin
// a match are held back (they always equal a prefix of find). They are written
// once the match fails or the input ends. Mismatches fall back via a KMP
// failure table, so self-overlapping patterns (e.g. "aab" in "aaab") are
// matched correctly.
//
// Processing stops at the first read error (including io.EOF) or write error.
// Because the function has no error result, such errors are not reported. An
// empty find copies r to w unchanged.
func processByte(r io.Reader, w io.Writer, find, replace []byte) {
	if len(find) == 0 {
		// Nothing can ever match, so pass the input through untouched.
		// Errors are dropped because this function has no error result.
		_, _ = io.Copy(w, r)
		return
	}

	// fail[i] is the length of the longest proper prefix of find[:i+1] that
	// is also its suffix, i.e. how much of a match survives a mismatch.
	fail := make([]int, len(find))
	for i, k := 1, 0; i < len(find); i++ {
		for k > 0 && find[i] != find[k] {
			k = fail[k-1]
		}
		if find[i] == find[k] {
			k++
		}
		fail[i] = k
	}

	b := make([]byte, 1)
	idx := 0 // how many bytes of find are currently held back as matched
	for {
		n, err := r.Read(b)
		// Per the io.Reader contract, a byte returned together with an
		// error must still be processed.
		if n > 0 {
			c := b[0]
			for idx > 0 && c != find[idx] {
				k := fail[idx-1]
				// find[:idx-k] can no longer be part of a match; release it.
				if _, werr := w.Write(find[:idx-k]); werr != nil {
					return
				}
				idx = k
			}
			if c == find[idx] {
				idx++
				if idx == len(find) {
					if _, werr := w.Write(replace); werr != nil {
						return
					}
					idx = 0
				}
			} else if _, werr := w.Write(b); werr != nil {
				return
			}
		}
		if err != nil {
			break
		}
	}
	// The input ended mid-match: the held-back prefix is ordinary data.
	if idx > 0 {
		w.Write(find[:idx])
	}
}
func main() {
output := bytes.Buffer{}
fmt.Println("=======================================\nRunning processByte")
for _, d := range data {
output.Reset()
processByte(bytes.NewReader(d.input), &output, find, repl)
matched := bytes.Compare(d.output, output.Bytes())
fmt.Printf("Matched: %v Inp: [%s] Exp: [%s] Got: [%s]\n", matched == 0, d.input, d.output, output.Bytes())
}
fmt.Println("=======================================\nRunning processBuffer")
for _, d := range data {
output.Reset()
processBuffer(bytes.NewReader(d.input), &output, 32, find, repl)
matched := bytes.Compare(d.output, output.Bytes())
fmt.Printf("Matched: %v Inp: [%s] Exp: [%s] Got: [%s]\n", matched == 0, d.input, d.output, output.Bytes())
}
}
// All material is licensed under the Apache License Version 2.0, January 2004
// http://www.apache.org/licenses/LICENSE-2.0
// go test -run none -bench . -benchtime 3s -benchmem
// Benchmarks that compare how each algorithm performs.
package main
import (
"bytes"
"io/ioutil"
"testing"
)
// bigUnmatchedInput is 10,000,000 zero bytes, none of which can start a match
// of "elvis".
var bigUnmatchedInput = make([]byte, 10000000)
var unmatchedR = bytes.NewReader(bigUnmatchedInput)

// bigMatchedInput is "elvis" repeated 2,000,000 times (10,000,000 bytes). It is
// built once with bytes.Repeat instead of growing it with append and recreating
// the reader on every iteration.
var bigMatchedInput = bytes.Repeat([]byte("elvis"), 2000000)
var matchedR = bytes.NewReader(bigMatchedInput)
// assembleInputStream concatenates, in table order, the input of every case
// in data into a single byte slice, intended to give all benchmarks the same
// stream. It returns nil when data is empty.
func assembleInputStream() []byte {
	var stream []byte
	for i := range data {
		stream = append(stream, data[i].input...)
	}
	return stream
}
// BenchmarkProcessByteUnmatched measures processByte throughput on the
// all-zero input, which contains no matches.
func BenchmarkProcessByteUnmatched(b *testing.B) {
	// SetBytes configures MB/s reporting for the whole benchmark; set it once.
	b.SetBytes(int64(len(bigUnmatchedInput)))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Rewind the shared reader to the beginning (whence 0 = from start).
		if _, err := unmatchedR.Seek(0, 0); err != nil {
			b.Fatal(err)
		}
		processByte(unmatchedR, ioutil.Discard, find, repl)
	}
}
// BenchmarkProcessByteMatched measures processByte throughput on input made
// entirely of repeated "elvis", so every byte is part of a match.
func BenchmarkProcessByteMatched(b *testing.B) {
	// SetBytes configures MB/s reporting for the whole benchmark; set it once.
	b.SetBytes(int64(len(bigMatchedInput)))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Rewind the shared reader to the beginning (whence 0 = from start).
		if _, err := matchedR.Seek(0, 0); err != nil {
			b.Fatal(err)
		}
		processByte(matchedR, ioutil.Discard, find, repl)
	}
}
// BenchmarkProcessBuffer128Unmatched measures processBuffer with a 128-byte
// buffer on the all-zero input, which contains no matches.
func BenchmarkProcessBuffer128Unmatched(b *testing.B) {
	// SetBytes configures MB/s reporting for the whole benchmark; set it once.
	b.SetBytes(int64(len(bigUnmatchedInput)))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Rewind the shared reader to the beginning (whence 0 = from start).
		if _, err := unmatchedR.Seek(0, 0); err != nil {
			b.Fatal(err)
		}
		// A rejected configuration would otherwise report a bogus MB/s.
		if err := processBuffer(unmatchedR, ioutil.Discard, 128, find, repl); err != nil {
			b.Fatal(err)
		}
	}
}
// BenchmarkProcessBuffer128Matched measures processBuffer with a 128-byte
// buffer on input made entirely of repeated "elvis".
func BenchmarkProcessBuffer128Matched(b *testing.B) {
	// SetBytes configures MB/s reporting for the whole benchmark; set it once.
	b.SetBytes(int64(len(bigMatchedInput)))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Rewind the shared reader to the beginning (whence 0 = from start).
		if _, err := matchedR.Seek(0, 0); err != nil {
			b.Fatal(err)
		}
		// A rejected configuration would otherwise report a bogus MB/s.
		if err := processBuffer(matchedR, ioutil.Discard, 128, find, repl); err != nil {
			b.Fatal(err)
		}
	}
}
// BenchmarkProcessBuffer8192Unmatched measures processBuffer with an
// 8192-byte buffer on the all-zero input, which contains no matches.
func BenchmarkProcessBuffer8192Unmatched(b *testing.B) {
	// SetBytes configures MB/s reporting for the whole benchmark; set it once.
	b.SetBytes(int64(len(bigUnmatchedInput)))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Rewind the shared reader to the beginning (whence 0 = from start).
		if _, err := unmatchedR.Seek(0, 0); err != nil {
			b.Fatal(err)
		}
		// A rejected configuration would otherwise report a bogus MB/s.
		if err := processBuffer(unmatchedR, ioutil.Discard, 8192, find, repl); err != nil {
			b.Fatal(err)
		}
	}
}
// BenchmarkProcessBuffer8192Matched measures processBuffer with an 8192-byte
// buffer on input made entirely of repeated "elvis".
func BenchmarkProcessBuffer8192Matched(b *testing.B) {
	// SetBytes configures MB/s reporting for the whole benchmark; set it once.
	b.SetBytes(int64(len(bigMatchedInput)))
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Rewind the shared reader to the beginning (whence 0 = from start).
		if _, err := matchedR.Seek(0, 0); err != nil {
			b.Fatal(err)
		}
		// A rejected configuration would otherwise report a bogus MB/s.
		if err := processBuffer(matchedR, ioutil.Discard, 8192, find, repl); err != nil {
			b.Fatal(err)
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment