Created
May 31, 2016 12:22
-
-
Save aryszka/a6a0ef94d7424c27ada26756ae4f32eb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package serve | |
import ( | |
"errors" | |
"io" | |
) | |
type pipedMessage struct { | |
buffer []byte | |
count int | |
err error | |
response chan *pipedMessage | |
} | |
// A PipeBody can be used to stream data from filters. | |
// To get an initialized instance, use NewPipedBody(int). | |
type PipedBody struct { | |
read chan<- *pipedMessage | |
write chan<- *pipedMessage | |
closeRead chan struct{} | |
closeWrite chan<- *pipedMessage | |
} | |
var ( | |
// Error returned when pipe operation called after the | |
// read side of the pipe was closed. | |
ErrReadClosed = errors.New("PipedBody: read closed") | |
// Error returned when pipe operation called after the | |
// write side of the pipe was closed. | |
ErrWriteClosed = errors.New("PipedBody: write closed") | |
) | |
// NewPipedBody creates a body object, that can be used to | |
// stream content from filters. It is synchronized and uses | |
// an internal buffer maximum of the size of the provided | |
// maxBuf argument. When there is nothing to read, the read | |
// side blocks. When the internal buffer reaches the maximum | |
// size, the writer side blocks. After the write side was | |
// closed, the read side returns the passed in error | |
// argument. To release the pipe, the read side needs to be | |
// closed. | |
// | |
// Example, gzip response: | |
// | |
// func (f *myFilter) Response(ctx filters.FilterContext) { | |
// in := ctx.Response().Body | |
// out := serve.NewPipedBody() | |
// ctx.Response().Body = out | |
// | |
// ctx.Response().Header.Del("Content-Lenght") | |
// ctx.Response().Header.Set("Content-Encoding", "gzip") | |
// ctx.Response().Header.Add("Vary", "Accept-Encoding") | |
// | |
// go func() { | |
// defer in.Close() | |
// | |
// gz := gzip.NewWriter(out) | |
// defer gz.Close() | |
// | |
// _, err := io.Copy(gz, in) // timeout handled through the original body | |
// if err == nil { | |
// err = io.EOF | |
// } | |
// | |
// out.CloseWrite(err) | |
// }() | |
// } | |
// | |
func NewPipedBody(maxBuf int) *PipedBody { | |
readChan := make(chan *pipedMessage) | |
writeChan := make(chan *pipedMessage) | |
closeRead := make(chan struct{}) | |
closeWrite := make(chan *pipedMessage) | |
go func() { | |
var ( | |
read <-chan *pipedMessage | |
write <-chan *pipedMessage | |
buf []byte | |
err error | |
) | |
write = writeChan | |
readMessage := func(m *pipedMessage) { | |
// after the write was closed, and there is | |
// no more data, read returns the passed in | |
// error | |
if err != nil && len(buf) == 0 { | |
m.count = 0 | |
m.err = err | |
m.response <- m | |
return | |
} | |
// copy max | |
m.count = copy(m.buffer, buf) | |
buf = buf[m.count:] | |
// if there is no data buffered, but the | |
// write hasn't closed yet, block read by | |
// setting the channel to nil | |
if len(buf) == 0 && err == nil { | |
read = nil | |
} | |
// when there is available space to write, | |
// open write by setting the channel to | |
// the shared write channel | |
if len(buf) < maxBuf { | |
write = writeChan | |
} | |
m.response <- m | |
} | |
writeMessage := func(m *pipedMessage) { | |
// after write was closed, don't allow writing | |
if err != nil { | |
m.err = ErrWriteClosed | |
m.response <- m | |
return | |
} | |
// cut the buffer to the available space | |
b := m.buffer | |
if len(buf) + len(b) > maxBuf { | |
b = b[:maxBuf - len(buf)] | |
} | |
buf = append(buf, b...) | |
// if there is available data, open read | |
// by setting the channel to the shared read | |
// channel | |
if len(buf) > 0 { | |
read = readChan | |
} | |
// if there is no available space to write, | |
// block write by setting the channel to nil | |
if len(buf) == maxBuf { | |
write = nil | |
} | |
m.count = len(b) | |
m.response <- m | |
} | |
closeWriteMessage := func(m *pipedMessage) { | |
// if write was already closed, return error | |
if err != nil { | |
m.err = ErrWriteClosed | |
m.response <- m | |
return | |
} | |
err, m.err = m.err, nil | |
if err == nil { | |
err = io.EOF | |
} | |
// after write was closed, read must be open | |
// either to return the remaining data or to | |
// return the error (io.EOF) | |
read = readChan | |
m.response <- m | |
} | |
for { | |
select { | |
case m := <-read: | |
readMessage(m) | |
case m := <-write: | |
writeMessage(m) | |
case m := <-closeWrite: | |
closeWriteMessage(m) | |
case <-closeRead: | |
return | |
} | |
} | |
}() | |
return &PipedBody{ | |
read: readChan, | |
write: writeChan, | |
closeRead: closeRead, | |
closeWrite: closeWrite} | |
} | |
// Reads from the pipe when data is available or blocks. | |
// If the write side was closed, returns the error passed | |
// in when closing write. | |
// | |
// Read and CloseRead must be called from the same | |
// goroutine. | |
func (pb *PipedBody) Read(b []byte) (int, error) { | |
select { | |
case <-pb.closeRead: | |
return 0, ErrReadClosed | |
default: | |
m := &pipedMessage{ | |
buffer: b, | |
response: make(chan *pipedMessage)} | |
pb.read <- m | |
m = <-m.response | |
return m.count, m.err | |
} | |
} | |
// Writes to the pipe when it didn't reach its max buffer | |
// or blocks. Returns ErrWriteClosed after the write side | |
// was closed. | |
func (pb *PipedBody) Write(b []byte) (int, error) { | |
m := &pipedMessage{ | |
buffer: b, | |
response: make(chan *pipedMessage)} | |
count := 0 | |
for { | |
select { | |
case <-pb.closeRead: | |
return 0, ErrReadClosed | |
case pb.write <- m: | |
m = <-m.response | |
if m.count == len(m.buffer) || m.err != nil { | |
return count + m.count, m.err | |
} | |
m.buffer = m.buffer[m.count:] | |
count += m.count | |
} | |
} | |
} | |
// Closes the pipe. | |
// | |
// Read and CloseRead must be called from the same | |
// goroutine. | |
func (pb *PipedBody) CloseRead() error { | |
close(pb.closeRead) | |
return nil | |
} | |
// Closes the write side of the pipe. After all buffered | |
// data was read, the read side receives the passed in | |
// error argument or io.EOF. After the read side was | |
// closed, returns ErrReadClosed. | |
func (pb *PipedBody) CloseWrite(err error) error { | |
m := &pipedMessage{ | |
err: err, | |
response: make(chan *pipedMessage)} | |
select { | |
case <-pb.closeRead: | |
return ErrReadClosed | |
case pb.closeWrite <- m: | |
m = <-m.response | |
return m.err | |
} | |
} | |
// io.Closer implementation. Equivalent to CloseRead(). | |
func (pb *PipedBody) Close() error { | |
return pb.CloseRead() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment