Skip to content

Instantly share code, notes, and snippets.

@bored-engineer
Last active March 31, 2023 15:07
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save bored-engineer/75a1148309e845ceaf0cb2724a0e0fe5 to your computer and use it in GitHub Desktop.
Save bored-engineer/75a1148309e845ceaf0cb2724a0e0fe5 to your computer and use it in GitHub Desktop.
[Golang] PrefixReader (an io.ReadWriteCloser) wraps an io.Reader, allowing data to be written ("prefixed") to the Reader. The written data is returned by subsequent Read calls until Close is called, at which point all reads pass through to the underlying reader.
// THIS IS THE CORRECT SOLUTION, NOT WHAT WAS IN THIS GIST
package main
import (
"strings"
"io"
"io/ioutil"
"fmt"
)
// This is better/more stdlib than what I wrote:
//
// main demonstrates prefixing a reader using only the standard library:
// an io.Pipe carries the prefixed data and io.MultiReader chains it in
// front of the underlying reader. The combined stream is printed to stdout.
func main() {
	r := strings.NewReader("Underlying Reader")
	pr, pw := io.Pipe()
	go func() {
		for _, chunk := range []string{"Some Prefixed Data", "More Prefixed Data"} {
			if _, err := pw.Write([]byte(chunk)); err != nil {
				// Surface the failure to the reading side instead of
				// silently dropping it.
				pw.CloseWithError(err)
				return
			}
		}
		// Closing the write half makes the pipe report io.EOF, which lets
		// io.MultiReader move on to the underlying reader.
		pw.Close()
	}()
	b, err := ioutil.ReadAll(io.MultiReader(pr, r))
	if err != nil {
		panic(err)
	}
	// Some Prefixed DataMore Prefixed DataUnderlying Reader
	fmt.Printf("%s", b)
}
package main
import (
"strings"
"io/ioutil"
"fmt"
)
// main demonstrates PrefixReader fed through a caller-owned channel: two
// chunks are sent on the channel, the channel is closed, and the combined
// stream (prefix chunks followed by the underlying reader) is printed.
func main() {
	prefixes := make(chan []byte)
	reader := NewPrefixReader(strings.NewReader("Underlying Reader"), prefixes)

	go func() {
		// Closing the channel marks the end of the prefixed data.
		defer close(prefixes)
		for _, line := range []string{"Some Prefixed Data\n", "More Prefixed Data\n"} {
			prefixes <- []byte(line)
		}
	}()

	out, err := ioutil.ReadAll(reader)
	if err != nil {
		panic(err)
	}
	// Some Prefixed Data\nMore Prefixed Data\nUnderlying Reader
	fmt.Printf("%s", out)
}
package main
import (
"io"
"sync"
"errors"
)
var (
	// ErrWriteAfterClose is returned by Write once Close has been called.
	// The message matches the one the previous implementation produced.
	ErrWriteAfterClose = errors.New("attempt to Write to closed channel")
	// ErrAlreadyClosed is returned by Close on every call after the first.
	ErrAlreadyClosed = errors.New("attempt to Close already closed PrefixReader")
)

// Compile-time check that PrefixReader satisfies io.ReadWriteCloser.
var _ io.ReadWriteCloser = (*PrefixReader)(nil)

// PrefixReader implements io.ReadWriteCloser. Data passed to Write (or sent
// directly on the channel given to NewPrefixReader) is returned by Read
// first. Once Close is called, or the channel is closed by its owner, Read
// passes through to the underlying io.Reader.
//
// A PrefixReader must be created with NewPrefixReader and must not be copied.
type PrefixReader struct {
	// mu serializes Read calls and guards buf and passthrough.
	mu sync.Mutex
	// r is the underlying io.Reader used once the prefix has ended; may be nil.
	r io.Reader
	// buf is the unread tail of the last chunk when the caller's Read
	// buffer was too small to hold all of it.
	buf []byte
	// passthrough is set once the prefix has ended so that later Reads never
	// consult the channel again.
	passthrough bool
	// c is the channel of incoming chunks to "read".
	c chan []byte
	// done is closed by Close to signal the end of the prefix. Close does
	// not close c itself: closing a channel that a concurrent Write is
	// sending on would panic.
	done chan struct{}
	// closeOnce ensures done is closed exactly once.
	closeOnce sync.Once
}

// Close implements io.Closer. It ends the prefix: blocked and future Write
// calls fail, and Read switches to the underlying reader after delivering
// any chunks that are already pending. The first call returns nil; later
// calls return ErrAlreadyClosed instead of panicking.
func (p *PrefixReader) Close() error {
	err := ErrAlreadyClosed
	p.closeOnce.Do(func() {
		close(p.done)
		err = nil
	})
	return err
}

// Write implements io.Writer. It blocks until the chunk has been handed to
// the channel (for an unbuffered channel, until a Read receives it) or until
// Close is called, in which case it returns 0 and ErrWriteAfterClose.
//
// Write must not be used after the channel passed to NewPrefixReader has been
// closed directly by the caller; sending on a closed channel panics.
func (p *PrefixReader) Write(data []byte) (int, error) {
	// Check for Close first so that a write after Close fails
	// deterministically even if a buffered channel still has room.
	select {
	case <-p.done:
		return 0, ErrWriteAfterClose
	default:
	}
	if len(data) == 0 {
		return 0, nil
	}
	// Per the io.Writer docs we cannot retain data, so a copy is needed.
	tmp := make([]byte, len(data))
	copy(tmp, data)
	select {
	case p.c <- tmp:
		return len(tmp), nil
	case <-p.done:
		return 0, ErrWriteAfterClose
	}
}

// Read implements io.Reader. It returns prefixed data while any is
// available, blocking for the next chunk if necessary. After the prefix has
// ended it delegates to the underlying reader, or returns io.EOF if that
// reader is nil.
func (p *PrefixReader) Read(data []byte) (int, error) {
	// Serialize reads; buf and passthrough are only touched under mu.
	p.mu.Lock()
	defer p.mu.Unlock()

	if len(data) == 0 {
		return 0, nil
	}
	// Leftover bytes from a previous chunk are returned before anything else.
	if len(p.buf) > 0 {
		return p.drainBuf(data), nil
	}
	for !p.passthrough {
		chunk, ok := p.next()
		if !ok {
			p.passthrough = true
			break
		}
		if len(chunk) == 0 {
			// Skip empty chunks rather than returning (0, nil).
			continue
		}
		p.buf = chunk
		return p.drainBuf(data), nil
	}
	if p.r == nil {
		return 0, io.EOF
	}
	return p.r.Read(data)
}

// drainBuf copies as much of buf as fits into data, keeps the remainder for
// the next Read, and returns the number of bytes copied. Callers hold mu.
func (p *PrefixReader) drainBuf(data []byte) int {
	n := copy(data, p.buf)
	p.buf = p.buf[n:]
	if len(p.buf) == 0 {
		// Drop the reference so the chunk's backing array can be collected.
		p.buf = nil
	}
	return n
}

// next waits for the next chunk. It reports false when the prefix has ended,
// either because c was closed by its owner or because Close was called and
// no chunk is pending any more.
func (p *PrefixReader) next() ([]byte, bool) {
	select {
	case chunk, ok := <-p.c:
		return chunk, ok
	case <-p.done:
		// select picks randomly among ready cases, so a chunk may still be
		// pending on c; deliver it before ending the prefix.
		select {
		case chunk, ok := <-p.c:
			return chunk, ok
		default:
			return nil, false
		}
	}
}

// NewPrefixReader creates a reader that "reads" from c until the prefix ends
// (Close is called or c is closed); after that it passes all Read calls
// through to r. r may be nil, in which case Read returns io.EOF once the
// prefix has ended. If c is nil an unbuffered channel is allocated, so the
// result can be fed purely through Write and Close.
func NewPrefixReader(r io.Reader, c chan []byte) *PrefixReader {
	if c == nil {
		c = make(chan []byte)
	}
	return &PrefixReader{r: r, c: c, done: make(chan struct{})}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment