Last active
August 29, 2015 14:14
-
-
Save egonelbre/5600f3de19daec1ba055 to your computer and use it in GitHub Desktop.
bufio.Copy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package egonelbre | |
import ( | |
"io" | |
"sync/atomic" | |
) | |
type process struct { | |
quit chan struct{} | |
sleep chan struct{} | |
} | |
func newprocess() process { | |
return process{ | |
quit: make(chan struct{}), | |
sleep: make(chan struct{}, 1), | |
} | |
} | |
func (p process) exit() { close(p.quit) } | |
func (p process) wait() { <-p.quit } | |
func (p process) exited() bool { | |
select { | |
case <-p.quit: | |
return true | |
default: | |
return false | |
} | |
} | |
func (p process) waitchange(other process, expect int32, pv *int32) (exited bool) { | |
// say we are sleep | |
p.sleep <- struct{}{} | |
v := atomic.LoadInt32(pv) | |
// go to sleep | |
for expect == v { | |
select { | |
case <-other.quit: | |
return true | |
case p.sleep <- struct{}{}: | |
} | |
v = atomic.LoadInt32(pv) | |
} | |
p.unwait() | |
return false | |
} | |
func (p process) unwait() { | |
// clear sleeping | |
select { | |
case <-p.sleep: | |
default: | |
} | |
} | |
func chunk(a []byte) []byte { | |
const maxchunk = 8 << 10 | |
if len(a) > maxchunk { | |
return a[:maxchunk] | |
} | |
return a | |
} | |
func Copy(dst io.Writer, src io.Reader, buffer int) (written int64, err error) { | |
buf := make([]byte, buffer) | |
buflen := int32(len(buf)) | |
// data[ low : high ] is the written part of buf | |
// data[ high : low ] is the unwritten part of the buf | |
low, high := int32(0), int32(0) | |
var rerr, werr error | |
r := newprocess() | |
w := newprocess() | |
go func() { | |
defer r.exit() | |
h := atomic.LoadInt32(&high) | |
for rerr == nil && !w.exited() { | |
l := atomic.LoadInt32(&low) | |
// are we full | |
if (h+1)%buflen == l { | |
exited := r.waitchange(w, l, &low) | |
l = atomic.LoadInt32(&low) | |
if exited { | |
return | |
} | |
} | |
var next []byte | |
switch { | |
case l == 0: | |
next = buf[h : len(buf)-1] | |
case h < l: | |
next = buf[h : l-1] | |
case l <= h: | |
next = buf[h:] | |
} | |
var nr int | |
for len(next) > 0 && rerr == nil && !w.exited() { | |
nr, rerr = src.Read(chunk(next)) | |
h = (h + int32(nr)) % buflen | |
atomic.StoreInt32(&high, h) | |
w.unwait() | |
next = next[nr:] | |
} | |
} | |
}() | |
go func() { | |
defer w.exit() | |
l := atomic.LoadInt32(&low) | |
for werr == nil { | |
h := atomic.LoadInt32(&high) | |
// are we empty? | |
if l == h { | |
exited := w.waitchange(r, h, &high) | |
h = atomic.LoadInt32(&high) | |
if l == h && exited { | |
return | |
} | |
} | |
var next []byte | |
if l < h { | |
next = buf[l:h] | |
} else if h <= l { | |
next = buf[l:] | |
} | |
var nr int | |
for len(next) > 0 && werr == nil { | |
nr, werr = dst.Write(next) | |
atomic.AddInt64(&written, int64(nr)) | |
l = (l + int32(nr)) % buflen | |
atomic.StoreInt32(&low, l) | |
r.unwait() | |
next = next[nr:] | |
} | |
} | |
}() | |
// Wait until both finish and return | |
w.wait() | |
r.wait() | |
switch { | |
case rerr == nil || rerr == io.EOF: | |
return written, werr | |
// what if both errors happened? | |
default: | |
return written, rerr | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment