@karalabe
Created January 28, 2015 18:37
Buffered concurrent copy

package bufioext

import (
	"io"
	"sync/atomic"
)

// Copy copies from src to dst until either EOF is reached on src or an error
// occurs. It returns the number of bytes copied and the first error encountered
// while copying, if any.
//
// A successful Copy returns err == nil, not err == EOF. Because Copy is defined
// to read from src until EOF, it does not treat an EOF from Read as an error to
// be reported.
//
// Internally, one goroutine reads from src into an internal ring buffer, while
// another drains the buffer into dst. This permits both endpoints to run
// simultaneously, without one blocking the other. A usage sketch follows the
// implementation below.
func Copy(dst io.Writer, src io.Reader, buffer int) (written int64, err error) {
	buf := make([]byte, buffer)

	// Ring buffer state: bs is the total size, ba the space still available,
	// wp the fill position (reader side) and rp the drain position (writer side).
	bs, ba, rp, wp := int32(buffer), int32(buffer), int32(0), int32(0)

	rs := make(chan struct{}, 1) // signaler for the reader, if it's asleep
	ws := make(chan struct{}, 1) // signaler for the writer, if it's asleep
	rq := make(chan struct{})    // quit channel when the reader terminates
	wq := make(chan struct{})    // quit channel when the writer terminates

	// Start a reader goroutine that pushes data into the buffer
	go func() {
		defer close(rq)

		chunk := make([]byte, 32*1024)
		for {
			nr, er := src.Read(chunk)
			if nr > 0 {
				rpc := atomic.LoadInt32(&rp)

				// Repeat until the chunk is pushed into the buffer
				left := chunk
				for {
					bac := atomic.LoadInt32(&ba)

					// If the buffer is full, wait
					if bac == 0 {
						select {
						case <-rs: // wake signal from writer, retry
							continue
						case <-wq: // writer dead, return
							return
						}
					}
					nw := 0
					switch {
					case int(bac) >= nr && wp <= bs-int32(nr): // enough space, no wrapping
						copy(buf[wp:], left[:nr])
						nw = nr
					case int(bac) >= nr && wp > bs-int32(nr): // enough space, wrapping
						copy(buf[wp:], left[:bs-wp])
						copy(buf, left[bs-wp:nr])
						nw = nr
					case int(bac) < nr && wp <= rpc: // not enough space, no wrapping
						copy(buf[wp:], left[:bac])
						nw = int(bac)
					case int(bac) < nr && wp > rpc: // not enough space, wrapping
						copy(buf[wp:], left[:bs-wp])
						copy(buf, left[bs-wp:bac])
						nw = int(bac)
					}
					// Advance the writer pointer
					wpn := wp + int32(nw)
					if wpn >= bs {
						wpn -= bs
					}
					atomic.StoreInt32(&wp, wpn)
					atomic.AddInt32(&ba, -int32(nw))

					// Signal the writer if it's asleep
					select {
					case ws <- struct{}{}:
					default:
					}
					// If everything was buffered, get the next chunk
					if nw == nr {
						break
					}
					left, nr = left[nw:], nr-nw
				}
			}
			if er == io.EOF {
				break
			}
			if er != nil {
				err = er
				return
			}
		}
	}()

	// Start a writer goroutine that retrieves data from the buffer
	go func() {
		defer close(wq)

		for {
			wpc := atomic.LoadInt32(&wp)
			bac := atomic.LoadInt32(&ba)

			// If there's no data available, sleep
			if bac == bs {
				select {
				case <-ws: // wake signal from reader
					continue
				case <-rq: // reader done, return
					// Recheck: the reader may have buffered data between the
					// availability check above and quitting
					bac = atomic.LoadInt32(&ba)
					if bac != bs {
						continue
					}
					return
				}
			}
			// Write a batch of data
			nw, nc := 0, int32(0)
			var we error
			switch {
			case rp < wpc: // data available, no wrapping
				nc = wpc - rp
				nw, we = dst.Write(buf[rp:wpc])
			case rp >= wpc: // data available, wrapping
				nc = bs - rp
				nw, we = dst.Write(buf[rp:])
			}
			// Update the counters and check for errors
			if nw > 0 {
				written += int64(nw)
			}
			if we != nil {
				err = we
				return
			}
			if nw != int(nc) {
				err = io.ErrShortWrite
				return
			}
			// Advance the reader pointer
			rpn := rp + int32(nw)
			if rpn >= bs {
				rpn -= bs
			}
			atomic.StoreInt32(&rp, rpn)
			atomic.AddInt32(&ba, int32(nw))

			// Signal the reader if it's asleep
			select {
			case rs <- struct{}{}:
			default:
			}
		}
	}()

	// Wait until both finish and return
	<-wq
	<-rq
	return
}
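
For reference, here is a minimal usage sketch; it is not part of the original gist and assumes a hypothetical example_test.go file sitting next to the package. The 4 KiB buffer size and the string payload are arbitrary choices for illustration.

package bufioext

import (
	"bytes"
	"fmt"
	"strings"
)

// ExampleCopy streams a short string through Copy with an illustrative
// 4 KiB ring buffer (the buffer size is arbitrary).
func ExampleCopy() {
	src := strings.NewReader("hello, buffered concurrent copy")
	dst := new(bytes.Buffer)

	n, err := Copy(dst, src, 4096)
	if err != nil {
		panic(err)
	}
	fmt.Println(n, dst.String())
	// Output: 31 hello, buffered concurrent copy
}

The accompanying test and benchmark file follows.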

package bufioext

import (
	"bytes"
	"io/ioutil"
	"math/rand"
	"testing"
)

// random generates a pseudo-random binary blob of the given length.
func random(length int) []byte {
	src := rand.NewSource(0)
	data := make([]byte, length)
	for i := 0; i < length; i++ {
		data[i] = byte(src.Int63() & 0xff)
	}
	return data
}

// Tests that a simple copy works.
func TestCopy(t *testing.T) {
	data := random(128 * 1024 * 1024)
	rb := bytes.NewBuffer(data)
	wb := new(bytes.Buffer)

	if n, err := Copy(wb, rb, 333333); err != nil { // weird buffer size to catch index bugs
		t.Fatalf("failed to copy data: %v.", err)
	} else if int(n) != len(data) {
		t.Fatalf("data length mismatch: have %d, want %d.", n, len(data))
	}
	if !bytes.Equal(data, wb.Bytes()) {
		t.Errorf("copy did not work properly.")
	}
}

// Various combinations of benchmarks to measure the copy.
func BenchmarkCopy1KbData1KbBuffer(b *testing.B) {
	benchmarkCopy(1024, 1024, b)
}

func BenchmarkCopy1KbData128KbBuffer(b *testing.B) {
	benchmarkCopy(1024, 128*1024, b)
}

func BenchmarkCopy1KbData1MbBuffer(b *testing.B) {
	benchmarkCopy(1024, 1024*1024, b)
}

func BenchmarkCopy1MbData1KbBuffer(b *testing.B) {
	benchmarkCopy(1024*1024, 1024, b)
}

func BenchmarkCopy1MbData128KbBuffer(b *testing.B) {
	benchmarkCopy(1024*1024, 128*1024, b)
}

func BenchmarkCopy1MbData1MbBuffer(b *testing.B) {
	benchmarkCopy(1024*1024, 1024*1024, b)
}

func BenchmarkCopy128MbData1KbBuffer(b *testing.B) {
	benchmarkCopy(128*1024*1024, 1024, b)
}

func BenchmarkCopy128MbData128KbBuffer(b *testing.B) {
	benchmarkCopy(128*1024*1024, 128*1024, b)
}

func BenchmarkCopy128MbData1MbBuffer(b *testing.B) {
	benchmarkCopy(128*1024*1024, 1024*1024, b)
}

// benchmarkCopy measures the performance of the buffered copy for a given
// data and buffer size.
func benchmarkCopy(data int, buffer int, b *testing.B) {
	blob := random(data)
	b.SetBytes(int64(data))
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		Copy(ioutil.Discard, bytes.NewBuffer(blob), buffer)
	}
}
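
As a point of comparison (an addition, not part of the gist), a plain io.Copy baseline over the same workload helps judge whether the concurrent buffering pays off for a given data size. The function names benchmarkStdCopy and BenchmarkStdCopy128MbData are hypothetical; the sketch assumes an extra file in the same package so it can reuse the random helper above. Both benchmark sets run under the usual go test -bench . invocation.

package bufioext

import (
	"bytes"
	"io"
	"io/ioutil"
	"testing"
)

// benchmarkStdCopy is a hypothetical baseline: the same blob pushed through
// the standard library's io.Copy, for comparison against benchmarkCopy above.
// It reuses the random helper defined in the test file.
func benchmarkStdCopy(data int, b *testing.B) {
	blob := random(data)
	b.SetBytes(int64(data))
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		io.Copy(ioutil.Discard, bytes.NewBuffer(blob))
	}
}

func BenchmarkStdCopy128MbData(b *testing.B) {
	benchmarkStdCopy(128*1024*1024, b)
}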