Skip to content

Instantly share code, notes, and snippets.

@CAFxX
Last active April 10, 2018 22:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save CAFxX/170827126275eac9ea8a7cb0d139465f to your computer and use it in GitHub Desktop.
Save CAFxX/170827126275eac9ea8a7cb0d139465f to your computer and use it in GitHub Desktop.
Batching(Read|Writ)er
import (
	"io"
	"sync"
)
// WriteBatcher is an io.Writer that decouples its callers from a slow
// underlying writer. Write only appends to an in-memory buffer and returns;
// a background goroutine drains that buffer into the underlying writer, so
// everything written while one flush is in progress is delivered by the next
// flush in a single call.
//
// The buffer is unbounded, and Write reports success before the data has
// reached the underlying writer. A failure of the underlying writer is
// sticky: every later Write returns it, and data still buffered at that
// point is dropped.
//
// A WriteBatcher is safe for concurrent use and must not be copied after
// first use. Create one with NewWriteBatcher.
type WriteBatcher struct {
	w io.Writer // destination; called only from the flush goroutine

	l       sync.Mutex // guards the fields below
	pending bool       // true while a flush goroutine is running
	buf     []byte     // data accepted by Write but not yet taken by flush
	err     error      // first failure of w; once set, Write always returns it
}

var _ io.Writer = (*WriteBatcher)(nil)

// NewWriteBatcher returns a WriteBatcher that flushes to w.
func NewWriteBatcher(w io.Writer) *WriteBatcher {
	return &WriteBatcher{w: w}
}

// Write buffers a copy of p and makes sure a flush goroutine is running.
// It never calls the underlying writer itself. It returns len(p) and a nil
// error unless an earlier flush failed, in which case it returns 0 and that
// failure.
func (w *WriteBatcher) Write(p []byte) (int, error) {
	w.l.Lock()
	defer w.l.Unlock()

	if w.err != nil {
		return 0, w.err
	}
	if len(p) == 0 {
		// Nothing to buffer, so there is no reason to start a goroutine.
		return 0, nil
	}

	w.buf = append(w.buf, p...)
	if !w.pending {
		w.pending = true
		go w.flush()
	}
	return len(p), nil
}

// flush runs in its own goroutine and writes buffered data to the underlying
// writer until the buffer is empty or a write fails. Write starts it only
// while pending is false, so at most one flush runs at a time.
func (w *WriteBatcher) flush() {
	w.l.Lock()
	for len(w.buf) > 0 {
		batch := w.buf
		w.buf = nil

		// Drop the lock for the (possibly slow) write so that concurrent
		// Write calls only ever wait for an append.
		w.l.Unlock()
		n, err := w.w.Write(batch)
		if err == nil && n < len(batch) {
			// A writer that accepts fewer bytes without reporting why has
			// still lost data; surface it instead of carrying on silently.
			err = io.ErrShortWrite
		}
		w.l.Lock()

		if err != nil {
			if w.err == nil {
				w.err = err
			}
			// Write rejects everything from now on, so data buffered while
			// the failing write was in flight can never be delivered.
			w.buf = nil
			break
		}
	}
	// Cleared under the same lock that observed the empty buffer (or recorded
	// the error), so a concurrent Write either sees pending still set and
	// relies on this loop, or sees it cleared and starts a new flush.
	w.pending = false
	w.l.Unlock()
}
// ReadBatcher is an io.Reader that reads ahead from an underlying reader.
// Data is pulled from the source in chunks of the current read-ahead size
// and buffered; after Read hands out data it starts a background read
// whenever the buffer holds less than the read-ahead size. Each time Read
// has to wait for a read that is already in flight, the read-ahead size is
// doubled (up to a fixed limit).
//
// An error from the source (io.EOF included) is sticky: it is returned once
// the buffered data has been consumed, and the source is not read again.
//
// A ReadBatcher is safe for concurrent use and must not be copied after
// first use. Create one with NewReadBatcher.
type ReadBatcher struct {
	r io.Reader // source; read by one goroutine at a time, never with l held

	l       sync.Mutex // guards the fields below
	buf     []byte     // data read from r and not yet returned by Read
	err     error      // first error from r; sticky
	reading bool       // true while a read from r is in flight
	ra      int        // read-ahead size: bytes requested from r per read

	// l2 is held for the whole of a read from r; bgreadwait locks it to
	// block until that read has finished.
	l2 sync.Mutex
}

var _ io.Reader = (*ReadBatcher)(nil)

// NewReadBatcher returns a ReadBatcher that reads from r.
func NewReadBatcher(r io.Reader) *ReadBatcher {
	return &ReadBatcher{r: r}
}

// Read copies buffered data into p, reading from the source first if the
// buffer is empty. It returns as soon as at least one byte is available.
// The sticky source error is returned only when no buffered data is left.
// A zero-length p never triggers a read from the source.
func (r *ReadBatcher) Read(p []byte) (int, error) {
	// Upper bound for the automatic doubling of the read-ahead size. Without
	// it a consumer that keeps outrunning its source doubles the per-read
	// allocation on every wait. A larger len(p) still raises ra beyond this.
	const maxReadAhead = 1 << 20

	r.l.Lock()
	defer r.l.Unlock()
	for {
		if n := copy(p, r.buf); n > 0 {
			r.buf = r.buf[n:]
			if !r.reading && r.err == nil && len(r.buf) < r.ra {
				go func() {
					r.l.Lock()
					defer r.l.Unlock()
					// The state may have changed between scheduling this
					// goroutine and acquiring the lock; bgread itself
					// declines if a read is in flight or has failed.
					if len(r.buf) < r.ra {
						r.bgread()
					}
				}()
			}
			return n, nil
		}
		if r.err != nil {
			return 0, r.err
		}
		if len(p) == 0 {
			return 0, nil
		}
		if r.reading {
			// The consumer is faster than the read-ahead: wait for the
			// in-flight read and ask for more next time.
			r.bgreadwait()
			if r.ra < maxReadAhead {
				r.ra *= 2
				if r.ra > maxReadAhead {
					r.ra = maxReadAhead
				}
			}
			continue
		}
		if lp := len(p); lp > r.ra {
			r.ra = lp
		}
		r.bgread()
	}
}

// bgread performs one read of up to r.ra bytes from the source and appends
// the result to r.buf. It must be called with r.l held; r.l is released for
// the duration of the source read and held again on return. It does nothing
// if a read is already in flight or the source has already failed.
func (r *ReadBatcher) bgread() {
	// Checked before touching l2: blocking on l2 while holding l would
	// deadlock with the in-flight reader, which holds l2 and needs l to
	// publish its result.
	if r.reading || r.err != nil {
		return
	}
	r.l2.Lock()
	defer r.l2.Unlock()

	chunk := make([]byte, r.ra)
	var (
		n   int
		err error
	)
	r.reading = true
	func() {
		r.l.Unlock()
		// Deferred so that r.l is held again and reading is reset even if
		// the source panics; the caller's deferred Unlock then stays valid.
		defer func() {
			r.l.Lock()
			r.reading = false
		}()
		n, err = r.r.Read(chunk)
	}()

	if len(r.buf) == 0 {
		// Nothing buffered: adopt the fresh chunk instead of copying it.
		r.buf = chunk[:n]
	} else {
		r.buf = append(r.buf, chunk[:n]...)
	}
	r.err = err
}

// bgreadwait blocks until the in-flight source read, if any, has finished.
// It must be called with r.l held; r.l is released while waiting, so that
// the reader can store its result, and is held again on return.
func (r *ReadBatcher) bgreadwait() {
	r.l.Unlock()
	r.l2.Lock()
	r.l2.Unlock()
	r.l.Lock()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment