Skip to content

Instantly share code, notes, and snippets.

@aryszka
Created May 31, 2016 12:22
Show Gist options
  • Save aryszka/a6a0ef94d7424c27ada26756ae4f32eb to your computer and use it in GitHub Desktop.
Save aryszka/a6a0ef94d7424c27ada26756ae4f32eb to your computer and use it in GitHub Desktop.
package serve
import (
"errors"
"io"
)
// pipedMessage is the request/response envelope exchanged between
// the callers of a PipedBody and the goroutine that owns the pipe
// state. The caller fills in the request fields and sends the
// message; the goroutine fills in the result fields and sends the
// very same message back on the response channel.
type pipedMessage struct {
	// buffer is the destination slice of a read request or the
	// data of a write request.
	buffer []byte

	// count is the number of bytes copied while serving the
	// request.
	count int

	// err carries the closing error in a close-write request, and
	// the outcome of the operation in the response.
	err error

	// response is the channel on which the pipe goroutine returns
	// the message once the request was processed.
	response chan *pipedMessage
}
// PipedBody is a synchronized, in-memory pipe that can be used to
// stream data from filters. The zero value is not usable, create
// instances with NewPipedBody.
type PipedBody struct {
	// read carries read requests to the goroutine owning the pipe
	// state.
	read chan<- *pipedMessage

	// write carries write requests to the goroutine owning the
	// pipe state.
	write chan<- *pipedMessage

	// closeRead is closed by CloseRead; it stops the pipe
	// goroutine and releases the operations waiting on it.
	closeRead chan struct{}

	// closeWrite carries the request closing the write side.
	closeWrite chan<- *pipedMessage
}
var (
	// ErrReadClosed is returned by the pipe operations that are
	// called after the read side of the pipe was closed.
	ErrReadClosed = errors.New("PipedBody: read closed")

	// ErrWriteClosed is returned by the pipe operations that are
	// called after the write side of the pipe was closed.
	ErrWriteClosed = errors.New("PipedBody: write closed")
)
// NewPipedBody creates a body object that can be used to stream
// content from filters. It is synchronized and buffers at most
// maxBuf bytes internally. A maxBuf smaller than 1 is treated as 1,
// because a pipe without capacity could never accept any data.
//
// When there is nothing to read, the read side blocks. When the
// internal buffer is full, the write side blocks. After the write
// side was closed and the buffered data was consumed, the read side
// returns the error passed to CloseWrite, or io.EOF when that was
// nil. Writing after the write side was closed fails with
// ErrWriteClosed. To release the pipe and the goroutine serving it,
// the read side needs to be closed.
//
// Example, gzip response:
//
//	func (f *myFilter) Response(ctx filters.FilterContext) {
//		in := ctx.Response().Body
//		out := serve.NewPipedBody(1 << 15)
//		ctx.Response().Body = out
//
//		ctx.Response().Header.Del("Content-Length")
//		ctx.Response().Header.Set("Content-Encoding", "gzip")
//		ctx.Response().Header.Add("Vary", "Accept-Encoding")
//
//		go func() {
//			defer in.Close()
//
//			gz := gzip.NewWriter(out)
//			_, err := io.Copy(gz, in) // timeout handled through the original body
//
//			// the gzip writer has to be closed before the write
//			// side of the pipe, otherwise the gzip trailer cannot
//			// be written anymore
//			if cerr := gz.Close(); err == nil {
//				err = cerr
//			}
//
//			if err == nil {
//				err = io.EOF
//			}
//
//			out.CloseWrite(err)
//		}()
//	}
func NewPipedBody(maxBuf int) *PipedBody {
	// with a zero capacity every write would be truncated to nothing
	// and block forever, a negative one would panic when slicing
	if maxBuf < 1 {
		maxBuf = 1
	}

	readChan := make(chan *pipedMessage)
	writeChan := make(chan *pipedMessage)
	closeRead := make(chan struct{})
	closeWrite := make(chan *pipedMessage)

	// A single goroutine owns the pipe state. Reading and writing
	// are enabled and disabled by switching the local channel
	// variables between the shared channels and nil: receiving
	// from a nil channel blocks forever, so the select below
	// ignores those cases.
	go func() {
		var (
			read  <-chan *pipedMessage
			write <-chan *pipedMessage
			buf   []byte
			err   error
		)

		// initially there is nothing to read, only writing is open
		write = writeChan

		readMessage := func(m *pipedMessage) {
			// after the write was closed, and there is
			// no more data, read returns the passed in
			// error
			if err != nil && len(buf) == 0 {
				m.count = 0
				m.err = err
				m.response <- m
				return
			}

			// copy as much as fits into the reader's buffer
			m.count = copy(m.buffer, buf)
			buf = buf[m.count:]

			// if there is no data buffered, but the
			// write hasn't closed yet, block read by
			// setting the channel to nil
			if len(buf) == 0 && err == nil {
				read = nil
			}

			// when there is available space to write,
			// open write by setting the channel to
			// the shared write channel
			if len(buf) < maxBuf {
				write = writeChan
			}

			m.response <- m
		}

		writeMessage := func(m *pipedMessage) {
			// after write was closed, don't allow writing; the
			// count is reset, because the caller may reuse the
			// message of a previous, partially accepted round
			if err != nil {
				m.count = 0
				m.err = ErrWriteClosed
				m.response <- m
				return
			}

			// cut the data to the available space; write is
			// open only while len(buf) < maxBuf, so the upper
			// bound is always positive here
			b := m.buffer
			if len(buf)+len(b) > maxBuf {
				b = b[:maxBuf-len(buf)]
			}

			buf = append(buf, b...)

			// if there is available data, open read
			// by setting the channel to the shared read
			// channel
			if len(buf) > 0 {
				read = readChan
			}

			// if there is no available space to write,
			// block write by setting the channel to nil
			if len(buf) == maxBuf {
				write = nil
			}

			m.count = len(b)
			m.response <- m
		}

		closeWriteMessage := func(m *pipedMessage) {
			// if write was already closed, return error
			if err != nil {
				m.err = ErrWriteClosed
				m.response <- m
				return
			}

			// store the closing error for the read side, and
			// report success to the caller
			err, m.err = m.err, nil
			if err == nil {
				err = io.EOF
			}

			// after write was closed, read must be open
			// either to return the remaining data or to
			// return the error (io.EOF)
			read = readChan

			// write must be open, too, even if the buffer is
			// full, so that late writers receive ErrWriteClosed
			// instead of waiting for buffer space
			write = writeChan

			m.response <- m
		}

		for {
			select {
			case m := <-read:
				readMessage(m)
			case m := <-write:
				writeMessage(m)
			case m := <-closeWrite:
				closeWriteMessage(m)
			case <-closeRead:
				// closing the read side releases the pipe
				return
			}
		}
	}()

	return &PipedBody{
		read:       readChan,
		write:      writeChan,
		closeRead:  closeRead,
		closeWrite: closeWrite,
	}
}
// Read reads from the pipe into b when data is available, or
// blocks until there is. It returns the number of bytes copied
// into b. After the write side was closed and all the buffered
// data was consumed, it returns the error that was passed in when
// closing the write side.
//
// After the read side was closed, Read returns ErrReadClosed. This
// includes a Read that is waiting for data at the time when the
// read side gets closed.
func (pb *PipedBody) Read(b []byte) (int, error) {
	// check first without blocking, so that a read after a
	// completed CloseRead fails deterministically
	select {
	case <-pb.closeRead:
		return 0, ErrReadClosed
	default:
	}

	m := &pipedMessage{
		buffer:   b,
		response: make(chan *pipedMessage)}

	// the request must not be sent unconditionally: once the read
	// side is closed, nobody receives on the read channel anymore
	select {
	case <-pb.closeRead:
		return 0, ErrReadClosed
	case pb.read <- m:
		// an accepted request is always answered
		m = <-m.response
		return m.count, m.err
	}
}
// Write writes b to the pipe. When the data does not fit into the
// free space of the internal buffer, it is passed over in multiple
// rounds, blocking until the read side consumes enough of the
// buffered data.
//
// It returns the number of bytes accepted by the pipe, which is
// less than len(b) only together with a non-nil error:
// ErrWriteClosed after the write side was closed, ErrReadClosed
// after the read side was closed.
func (pb *PipedBody) Write(b []byte) (int, error) {
	m := &pipedMessage{
		buffer:   b,
		response: make(chan *pipedMessage)}

	count := 0
	for {
		// the message is reused between the rounds, clear the
		// result of the previous one so that it cannot be counted
		// twice when the next request is rejected
		m.count, m.err = 0, nil

		select {
		case <-pb.closeRead:
			// report the bytes accepted in the earlier rounds
			return count, ErrReadClosed
		case pb.write <- m:
			m = <-m.response
			count += m.count
			if m.err != nil || m.count == len(m.buffer) {
				return count, m.err
			}

			// only a part was accepted, continue with the rest
			m.buffer = m.buffer[m.count:]
		}
	}
}
// CloseRead closes the read side of the pipe and by that releases
// the pipe: the operations called afterwards return ErrReadClosed.
// Calling CloseRead again returns ErrReadClosed, too.
//
// Read and CloseRead must be called from the same goroutine. The
// check for an already closed pipe is not atomic, CloseRead must
// not be called concurrently with itself.
func (pb *PipedBody) CloseRead() error {
	select {
	case <-pb.closeRead:
		// closing an already closed channel would panic
		return ErrReadClosed
	default:
		close(pb.closeRead)
		return nil
	}
}
// CloseWrite closes the write side of the pipe. Once all the
// buffered data was read, the read side receives the passed in
// err, or io.EOF when err is nil. Closing the write side again
// returns ErrWriteClosed. After the read side was closed, it
// returns ErrReadClosed.
func (pb *PipedBody) CloseWrite(err error) error {
	reply := make(chan *pipedMessage)
	request := &pipedMessage{err: err, response: reply}

	select {
	case pb.closeWrite <- request:
		// the request was accepted, its outcome comes back in
		// the err field of the returned message
		result := <-reply
		return result.err
	case <-pb.closeRead:
		return ErrReadClosed
	}
}
// Close implements io.Closer by closing the read side of the
// pipe. It is equivalent to calling CloseRead.
func (pb *PipedBody) Close() error {
	err := pb.CloseRead()
	return err
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment