Code and example for LimitBuffer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// code for http://gowithconfidence.tumblr.com/post/31797884887/limit-buffers | |
package main | |
import ( | |
"bytes" | |
"errors" | |
"fmt" | |
"io" | |
"sync" | |
) | |
// BufferSize is the length in bytes of the scratch slice that readAndWrite
// fills on each read from the source reader.
const BufferSize = 4

// LagLimit is the limit handed to NewLimitBuffer for each of the two buffers
// created by MakeDoubleReader.
const LagLimit = 2
func MakeDoubleReader(r io.Reader) (r1, r2 io.Reader) { | |
b1 := NewLimitBuffer(LagLimit) | |
b2 := NewLimitBuffer(LagLimit) | |
r1 = b1 | |
r2 = b2 | |
go readAndWrite(r, b1, b2) | |
return | |
} | |
func readAndWrite(r io.Reader, w1, w2 *LimitBuffer) { | |
buf := make([]byte, BufferSize) | |
for { | |
n, err := r.Read(buf) | |
if err == io.EOF { | |
break | |
} | |
if err != nil { | |
panic(err) | |
} | |
w1.Write(buf[:n]) | |
w2.Write(buf[:n]) | |
} | |
w1.Close() | |
w2.Close() | |
} | |
type LimitBuffer struct { | |
limit int | |
buf bytes.Buffer | |
writes chan writeRequest | |
reads chan readRequest | |
isclosed bool | |
} | |
func NewLimitBuffer(limit int) (lb *LimitBuffer) { | |
lb = &LimitBuffer{ | |
limit: limit, | |
writes: make(chan writeRequest), | |
reads: make(chan readRequest), | |
} | |
go lb.mux() | |
return | |
} | |
type writeRequest struct { | |
buf []byte | |
closeit bool | |
response chan writeResponse | |
} | |
// writeResponse is the reply to a writeRequest. The field order matters:
// the type is also built with positional composite literals.
type writeResponse struct {
	// n is the number of bytes that were accepted.
	n int

	// err is the error encountered while handling the request, if any.
	err error
}
func (lb *LimitBuffer) Write(buf []byte) (n int, err error) { | |
for len(buf) > 0 { | |
req := writeRequest{ | |
buf: buf, | |
response: make(chan writeResponse, 1), | |
} | |
lb.writes <- req | |
response := <-req.response | |
m, werr := response.n, response.err | |
n += m | |
if werr != nil { | |
err = werr | |
return | |
} | |
buf = buf[m:] | |
} | |
return | |
} | |
func (lb *LimitBuffer) Close() { | |
req := writeRequest{ | |
closeit: true, | |
response: make(chan writeResponse, 1), | |
} | |
lb.writes <- req | |
} | |
type readRequest struct { | |
buf []byte | |
response chan readResponse | |
} | |
// readResponse is the reply to a readRequest. The field order matters:
// the type is also built with positional composite literals.
type readResponse struct {
	// n is the number of bytes placed in the request's buffer.
	n int

	// err is the error reported for the read, if any.
	err error
}
func (lb *LimitBuffer) Read(buf []byte) (n int, err error) { | |
req := readRequest{ | |
buf: buf, | |
response: make(chan readResponse, 1), | |
} | |
lb.reads <- req | |
response := <-req.response | |
n, err = response.n, response.err | |
return | |
} | |
func (lb *LimitBuffer) handleRead(req readRequest) { | |
n, err := lb.buf.Read(req.buf) | |
req.response <- readResponse{n, err} | |
} | |
func (lb *LimitBuffer) handleWrite(req writeRequest) { | |
if req.closeit { | |
lb.isclosed = true | |
} else { | |
m := lb.limit - lb.buf.Len() | |
if m > len(req.buf) { | |
m = len(req.buf) | |
} | |
n, err := lb.buf.Write(req.buf[:m]) | |
req.response <- writeResponse{n, err} | |
} | |
} | |
func (lb *LimitBuffer) handleWriteClosed(req writeRequest) { | |
req.response <- writeResponse{ | |
n: 0, | |
err: errors.New("Writing to closed stream"), | |
} | |
} | |
func (lb *LimitBuffer) mux() { | |
for { | |
if lb.isclosed { | |
if lb.buf.Len() == 0 { | |
select { | |
case req := <-lb.reads: | |
req.response <- readResponse{ | |
n: 0, | |
err: io.EOF, | |
} | |
case req := <-lb.writes: | |
lb.handleWriteClosed(req) | |
} | |
} else { | |
select { | |
case req := <-lb.reads: | |
lb.handleRead(req) | |
case req := <-lb.writes: | |
lb.handleWriteClosed(req) | |
} | |
} | |
continue | |
} | |
if lb.buf.Len() > lb.limit { | |
lb.handleRead(<-lb.reads) | |
continue | |
} | |
if lb.buf.Len() == 0 { | |
lb.handleWrite(<-lb.writes) | |
continue | |
} | |
select { | |
case req := <-lb.reads: | |
lb.handleRead(req) | |
case req := <-lb.writes: | |
lb.handleWrite(req) | |
} | |
} | |
} | |
func main() { | |
r := bytes.NewBuffer([]byte("Hello this is the double buffer")) | |
r1, r2 := MakeDoubleReader(r) | |
var wg sync.WaitGroup | |
dump := func(r io.Reader, name string) { | |
buf := make([]byte, 10) | |
for { | |
_, err := r.Read(buf) | |
if err != nil { | |
fmt.Printf("%s %s\n", name, err) | |
break | |
} | |
fmt.Printf("%s %s\n", name, buf) | |
} | |
wg.Done() | |
} | |
wg.Add(1) | |
go dump(r1, "r1") | |
wg.Add(1) | |
go dump(r2, "r2") | |
wg.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment