Last active
April 10, 2018 22:50
-
-
Save CAFxX/170827126275eac9ea8a7cb0d139465f to your computer and use it in GitHub Desktop.
Batching(Read|Writ)er
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type WriteBatcher struct { | |
l sync.Mutex | |
pending bool | |
buf []byte | |
err error | |
} | |
func (w *WriteBatcher) Write(p []byte) (int, error) { | |
w.l.Lock() | |
defer w.l.Unlock() | |
if w.err != nil { | |
return 0, w.err | |
} | |
w.buf = append(w.buf, p...) | |
if !w.pending { | |
w.pending = true | |
go w.flush() | |
} | |
return len(p), nil | |
} | |
func (w *WriteBatcher) flush() { | |
for { | |
w.l.Lock() | |
if len(w.buf) == 0 { | |
goto stop // w.l must be Lock()ed! | |
} | |
buf := w.buf | |
w.buf = nil | |
w.l.Unlock() | |
_, err := w.w.Write(buf) | |
if err != nil { | |
w.l.Lock() | |
if w.err == nil { | |
w.err = err | |
} | |
goto stop // w.l must be Lock()ed! | |
} | |
} | |
stop: | |
w.pending = false | |
w.l.Unlock() | |
} | |
type ReadBatcher struct { | |
l sync.Mutex | |
buf []byte | |
err error | |
} | |
func (r *ReadBatcher) Read(p []byte) (int, error) { | |
r.l.Lock() | |
defer r.l.Unlock() | |
for { | |
if n := copy(p, r.buf); n > 0 { | |
r.buf = r.buf[n:] | |
if !r.reading && len(r.buf) < r.ra { | |
go func() { | |
r.l.Lock() | |
defer r.l.Unlock() | |
r.bgread() | |
}() | |
} | |
return n, nil | |
} | |
if r.err { | |
return 0, r.err | |
} | |
if r.reading { | |
r.bgreadwait() | |
r.ra = r.ra * 2 | |
continue | |
} | |
if lp := len(p); lp > r.ra { | |
r.ra = lp | |
} | |
r.bgread() | |
} | |
} | |
func (r *ReadBatcher) bgread() { | |
r.l2.Lock() | |
defer r.l2.Unlock() | |
if r.err != nil { | |
return | |
} | |
buf := make([]byte, r.ra) | |
r.reading = true | |
func() { | |
r.l.Unlock() | |
defer r.l.Lock() | |
n, err := r.r.Read(buf) | |
}() | |
r.reading = false | |
r.buf = append(r.buf, buf[:n]...) | |
r.err = err | |
} | |
func (r *ReadBatcher) bgreadwait() { | |
r.l.Unlock() | |
r.l2.Lock() | |
r.l2.Unlock() | |
r.l.Lock() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment