Skip to content

Instantly share code, notes, and snippets.

@skelterjohn
Created September 18, 2012 15:27
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save skelterjohn/3743756 to your computer and use it in GitHub Desktop.
Save skelterjohn/3743756 to your computer and use it in GitHub Desktop.
Code and example for LimitBuffer
// code for http://gowithconfidence.tumblr.com/post/31797884887/limit-buffers
package main
import (
"bytes"
"errors"
"fmt"
"io"
"sync"
)
// Sizing constants for the double-reader example.
const (
// BufferSize is the length, in bytes, of the scratch slice readAndWrite
// fills on each read from the source.
BufferSize = 4
// LagLimit is the limit MakeDoubleReader passes to NewLimitBuffer for each
// of the two buffers it creates.
LagLimit = 2
)
// MakeDoubleReader fans a single reader out into two.
//
// It creates two LimitBuffers, each with limit LagLimit, starts a goroutine
// running readAndWrite to copy what it reads from r into both, and returns
// the buffers as r1 and r2.
func MakeDoubleReader(r io.Reader) (r1, r2 io.Reader) {
	first, second := NewLimitBuffer(LagLimit), NewLimitBuffer(LagLimit)
	go readAndWrite(r, first, second)
	return first, second
}
// readAndWrite copies r into both w1 and w2 in chunks of at most BufferSize
// bytes, and closes both buffers once r reports io.EOF.
//
// Bytes returned by a Read are forwarded before the accompanying error is
// inspected: io.Reader permits the last chunk to be delivered together with
// io.EOF, and checking the error first would silently drop that chunk.
//
// A read error other than io.EOF panics, since there is no channel through
// which to report it to the consumers of w1 and w2.
func readAndWrite(r io.Reader, w1, w2 *LimitBuffer) {
	buf := make([]byte, BufferSize)
	for {
		n, err := r.Read(buf)
		if n > 0 {
			// Write results are intentionally not checked: a LimitBuffer only
			// rejects writes after it has been closed, and this function
			// closes the buffers only after leaving the loop.
			w1.Write(buf[:n])
			w2.Write(buf[:n])
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
	}
	w1.Close()
	w2.Close()
}
// LimitBuffer is an in-memory byte buffer with Read, Write and Close methods
// whose stored size is capped by limit.
//
// The methods do not touch buf or isclosed directly; they send requests over
// the writes and reads channels to the mux goroutine started by
// NewLimitBuffer, which performs the actual buffer operations.
type LimitBuffer struct {
// limit is the value handleWrite subtracts the current buffer length from
// to decide how many bytes of a write it may accept.
limit int
// buf holds bytes that have been written and not yet read.
buf bytes.Buffer
// writes carries Write and Close requests to mux.
writes chan writeRequest
// reads carries Read requests to mux.
reads chan readRequest
// isclosed is set by handleWrite when it receives a close request.
isclosed bool
}
// NewLimitBuffer allocates a LimitBuffer with the given limit, creates its
// unbuffered request channels, and starts the mux goroutine that services
// them before returning the buffer.
func NewLimitBuffer(limit int) (lb *LimitBuffer) {
	lb = new(LimitBuffer)
	lb.limit = limit
	lb.writes = make(chan writeRequest)
	lb.reads = make(chan readRequest)
	go lb.mux()
	return lb
}
// writeRequest is the message Write and Close send to mux over
// LimitBuffer.writes.
type writeRequest struct {
// buf is the data to append; Close leaves it unset.
buf []byte
// closeit marks the request as a close rather than a data write.
closeit bool
// response is where the handler sends the outcome. Senders create it with
// capacity 1. handleWrite sends nothing on it for a close request.
response chan writeResponse
}
// writeResponse is the reply to a writeRequest.
type writeResponse struct {
// n is the number of bytes the handler appended to the buffer.
n int
// err is the error from the handler, or nil.
err error
}
// Write hands buf to the mux goroutine piece by piece, resubmitting whatever
// part was not accepted until nothing is left or a reply carries an error.
//
// It returns the total number of bytes accepted and the first error received,
// if any.
func (lb *LimitBuffer) Write(buf []byte) (n int, err error) {
	remaining := buf
	for len(remaining) != 0 {
		reply := make(chan writeResponse, 1)
		lb.writes <- writeRequest{buf: remaining, response: reply}
		res := <-reply
		n += res.n
		if res.err != nil {
			return n, res.err
		}
		// Drop the accepted prefix and offer the rest again.
		remaining = remaining[res.n:]
	}
	return n, nil
}
// Close submits a close request to the mux goroutine. It returns as soon as
// the request has been received and does not wait for any reply.
func (lb *LimitBuffer) Close() {
	lb.writes <- writeRequest{
		closeit:  true,
		response: make(chan writeResponse, 1),
	}
}
// readRequest is the message Read sends to mux over LimitBuffer.reads.
type readRequest struct {
// buf is the caller's destination slice, filled by the handler.
buf []byte
// response is where the handler sends the outcome. Read creates it with
// capacity 1.
response chan readResponse
}
// readResponse is the reply to a readRequest.
type readResponse struct {
// n is the number of bytes placed in the request's buf.
n int
// err is the error from the handler, or nil; mux sends io.EOF once the
// buffer is closed and empty.
err error
}
// Read submits buf to the mux goroutine as a read request, waits for the
// reply, and returns the byte count and error it carries.
func (lb *LimitBuffer) Read(buf []byte) (n int, err error) {
	reply := make(chan readResponse, 1)
	lb.reads <- readRequest{buf: buf, response: reply}
	res := <-reply
	return res.n, res.err
}
// handleRead serves one read request: it reads from the internal buffer into
// the request's slice and sends the resulting count and error back on the
// request's response channel.
func (lb *LimitBuffer) handleRead(req readRequest) {
	var res readResponse
	res.n, res.err = lb.buf.Read(req.buf)
	req.response <- res
}
// handleWrite serves one write request.
//
// A close request only flips isclosed and gets no reply. A data request is
// truncated to the room left under limit, appended to the internal buffer,
// and answered with the count and error from that append.
func (lb *LimitBuffer) handleWrite(req writeRequest) {
	if req.closeit {
		lb.isclosed = true
		return
	}

	// Accept no more than the space remaining below the limit.
	room := lb.limit - lb.buf.Len()
	chunk := req.buf
	if room < len(chunk) {
		chunk = chunk[:room]
	}

	var res writeResponse
	res.n, res.err = lb.buf.Write(chunk)
	req.response <- res
}
// handleWriteClosed answers a write request that arrived after the buffer was
// closed: zero bytes accepted, together with an error saying so.
func (lb *LimitBuffer) handleWriteClosed(req writeRequest) {
	rejection := errors.New("Writing to closed stream")
	req.response <- writeResponse{err: rejection}
}
// mux is the request loop started by NewLimitBuffer. It is the only place
// that calls the handle* methods, so all access to lb.buf and lb.isclosed
// happens on this one goroutine. It never returns.
//
// On each iteration the buffer is in one of four states:
//
//   - closed: reads drain whatever is left and then get io.EOF; every write
//     request is rejected via handleWriteClosed.
//   - empty: only a write request is awaited, since there is nothing to read.
//   - full (Len >= limit): reads are served. A close request is honoured
//     immediately. A data write is held until one read has been served and
//     only then passed to handleWrite, so the writer blocks instead of being
//     told "0 bytes accepted" and retrying in a tight loop.
//   - otherwise: whichever of a read or a write arrives first is served.
//
// handleWrite never lets the buffer grow beyond limit, so "full" has to be
// detected with >=; a strict > comparison can never be true.
func (lb *LimitBuffer) mux() {
	for {
		switch {
		case lb.isclosed:
			select {
			case req := <-lb.reads:
				if lb.buf.Len() == 0 {
					req.response <- readResponse{n: 0, err: io.EOF}
				} else {
					lb.handleRead(req)
				}
			case req := <-lb.writes:
				lb.handleWriteClosed(req)
			}

		case lb.buf.Len() == 0:
			// Checked before the full case so that an empty buffer is never
			// handed to handleRead, even when limit is zero.
			lb.handleWrite(<-lb.writes)

		case lb.buf.Len() >= lb.limit:
			select {
			case req := <-lb.reads:
				lb.handleRead(req)
			case req := <-lb.writes:
				if !req.closeit {
					// No room yet: keep the writer waiting on its response
					// channel until a reader has had a chance to free space.
					lb.handleRead(<-lb.reads)
				}
				lb.handleWrite(req)
			}

		default:
			select {
			case req := <-lb.reads:
				lb.handleRead(req)
			case req := <-lb.writes:
				lb.handleWrite(req)
			}
		}
	}
}
// main demonstrates MakeDoubleReader: it splits a fixed string into two
// readers and drains each on its own goroutine, printing every chunk received
// and finally the error that ended the stream.
func main() {
	r := bytes.NewBuffer([]byte("Hello this is the double buffer"))
	r1, r2 := MakeDoubleReader(r)

	var wg sync.WaitGroup
	dump := func(r io.Reader, name string) {
		defer wg.Done()
		buf := make([]byte, 10)
		for {
			n, err := r.Read(buf)
			if n > 0 {
				// Print only the bytes this Read produced; the rest of buf
				// holds zeros or leftovers from earlier reads.
				fmt.Printf("%s %s\n", name, buf[:n])
			}
			if err != nil {
				fmt.Printf("%s %s\n", name, err)
				return
			}
		}
	}

	wg.Add(2)
	go dump(r1, "r1")
	go dump(r2, "r2")
	wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment