@jmackie
Last active February 18, 2024 18:42
Pass a single io.Reader to multiple goroutines
/*
Package fan is a little concurrent io experiment.

Example Use Case
----------------

You have a function that takes a single io.Reader as an argument. You would like
to pass that reader to several processing functions. You could just make the
function accept an io.ReadSeeker, invoke each function serially in a for loop,
seeking after each call. But that's not cool.

So for example:

	func process(r io.ReadSeeker) error {
		for _, fn := range fns {
			if err := fn(r); err != nil {
				return err
			}
			// Rewind so the next function reads from the start.
			if _, err := r.Seek(0, io.SeekStart); err != nil {
				return err
			}
		}
		return nil
	}

Becomes:

	import "./fan"

	func process(r io.Reader) error {
		fr := fan.Reader{Reader: r}
		for _, fn := range fns {
			go fn(fr.View())
			// ... handle errors via a channel
		}
		return nil
	}

Limitations
-----------

Probably quite a lot.
*/
package fan

import (
	"bytes"
	"io"
	"sync"
)

// Reader wraps an io.Reader, allowing "clones" to be created via the View method.
type Reader struct {
	io.Reader
	s   []byte     // buffered io.Reader data
	mux sync.Mutex // could maybe be replaced by an RWMutex
}

// View returns a new io.Reader that behaves like a copy of the original io.Reader.
func (r *Reader) View() io.Reader {
	var i int64 // current reading index
	return readFunc(func(p []byte) (int, error) {
		r.mux.Lock()
		defer r.mux.Unlock()

		// Declare the returned error here. It is only assigned by calls to
		// r.Reader.Read (`if` block below). That way callers see the io.EOF
		// error only when they reach the limit of r.s.
		var err error

		// If the client has asked for more data than is available, we need to
		// grow the buffer.
		if i+int64(len(p)) > int64(len(r.s)) {
			cp := make([]byte, len(p))
			var n int // don't shadow err
			n, err = r.Reader.Read(cp)
			r.s = append(r.s, cp[:n]...)
		}

		n := copy(p, r.s[i:])
		i += int64(n)

		// The underlying reader may return data together with an error. Hold
		// the error back until this view has drained the buffer, so that no
		// buffered bytes are lost.
		if i < int64(len(r.s)) {
			err = nil
		}
		return n, err
	})
}

// Original returns the "original" io.Reader without any thread-safety working
// behind the scenes.
func (r *Reader) Original() io.Reader {
	return io.MultiReader(bytes.NewReader(r.s), r.Reader)
}

// readFunc follows the design of http.HandlerFunc, allowing us to create
// io.Reader functions that can exploit closed-over variables.
type readFunc func(p []byte) (n int, err error)

func (rf readFunc) Read(p []byte) (n int, err error) { return rf(p) }

package fan

import (
	"bytes"
	"sync"
	"testing"

	"github.com/drhodes/golorem"
)

// We will be reading some random lorem ipsum text.
var data = []byte(lorem.Paragraph(500, 600))

func TestReader(t *testing.T) {
	r := bytes.NewReader(data)
	// A keyed literal leaves s and mux at their zero values, which are ready
	// to use. External callers would write `fan.Reader{Reader: r}`.
	fr := Reader{Reader: r}

	wg := new(sync.WaitGroup)

	// Start 10x goroutines
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			buf := new(bytes.Buffer)
			n, err := buf.ReadFrom(fr.View())

			// Check the error
			if err != nil {
				t.Errorf("Goroutine %d err: %v", i, err)
				return
			}

			// Validate the result
			if int(n) != len(data) {
				t.Errorf("Goroutine %d err: expected %d bytes, got %d", i, len(data), n)
				return
			}
			t.Logf("Goroutine %d ok: got %d bytes", i, len(buf.Bytes()))
		}(i)
	}
	wg.Wait()
}
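
The package comment leaves error handling as `// ... handle errors via a channel`. One way that could look is sketched below. The names `processAll`, `newView`, and `demo` are hypothetical and not part of the gist, and to keep the sketch self-contained each goroutine receives a plain `strings.Reader` where real code would pass `fr.View()`.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// processAll (hypothetical helper) runs every fn in its own goroutine, hands
// each one a fresh reader from newView, and returns the first non-nil error
// it receives, or nil if every fn succeeded.
func processAll(newView func() io.Reader, fns []func(io.Reader) error) error {
	// Buffered to len(fns) so no goroutine ever blocks on its send.
	errc := make(chan error, len(fns))
	for _, fn := range fns {
		view := newView() // with the gist's type this would be fr.View()
		go func(fn func(io.Reader) error, view io.Reader) {
			errc <- fn(view)
		}(fn, view)
	}

	// Receive exactly one result per function so every goroutine is waited for.
	var first error
	for range fns {
		if err := <-errc; err != nil && first == nil {
			first = err
		}
	}
	return first
}

// demo runs processAll once with functions that all succeed and once with a
// function that always fails.
func demo() (okErr, failErr error) {
	newView := func() io.Reader { return strings.NewReader("some input") }
	drain := func(r io.Reader) error {
		_, err := io.Copy(io.Discard, r)
		return err
	}
	fail := func(r io.Reader) error { return errors.New("boom") }

	okErr = processAll(newView, []func(io.Reader) error{drain, drain})
	failErr = processAll(newView, []func(io.Reader) error{drain, fail})
	return okErr, failErr
}

func main() {
	okErr, failErr := demo()
	fmt.Println(okErr, failErr)
}
```

Sizing the channel to the number of functions is a design choice rather than a requirement: it lets every goroutine finish even if the caller decided to return early on the first error.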
@ttlins

ttlins commented Apr 15, 2019

Helped me a lot, thanks man. I was trying to make concurrent put requests and didn't realize the buffer wasn't thread safe.
Just a question, though (might be really stupid). How do you initialize Reader, considering s and mux are unexported?
I just wrote a simple constructor using the code from your test file. Is that intended? I imagine you don't use this as an external package.
Cheers

@robinbraemer

robinbraemer commented Jul 31, 2020

Answering titolins' question for anyone who might come across this:
The mutex and byte slice in the Reader struct are already initialized to their zero values when you create a new Reader{}.
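
As a rough illustration of that point, the sketch below sets only the embedded reader with a keyed literal and leaves the unexported fields at their zero values. `NewReader` and `readTwice` are hypothetical names that the gist does not define, and the type and its `View` method are re-declared here (following the gist's approach) only so the example runs on its own.

```go
package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

// Reader mirrors the gist's type so this sketch compiles on its own.
type Reader struct {
	io.Reader
	s   []byte     // zero value (nil) acts as an empty buffer
	mux sync.Mutex // zero value is an unlocked, ready-to-use mutex
}

// NewReader is a hypothetical constructor. Only the embedded reader is set;
// the unexported fields keep their zero values.
func NewReader(r io.Reader) *Reader {
	return &Reader{Reader: r}
}

type readFunc func(p []byte) (int, error)

func (rf readFunc) Read(p []byte) (int, error) { return rf(p) }

// View follows the gist's approach: a shared, growing buffer plus a
// per-view read offset.
func (r *Reader) View() io.Reader {
	var i int
	return readFunc(func(p []byte) (int, error) {
		r.mux.Lock()
		defer r.mux.Unlock()
		var err error
		if i+len(p) > len(r.s) {
			cp := make([]byte, len(p))
			var n int
			n, err = r.Reader.Read(cp)
			r.s = append(r.s, cp[:n]...)
		}
		n := copy(p, r.s[i:])
		i += n
		if i < len(r.s) {
			err = nil // hold errors back until the buffer is drained
		}
		return n, err
	})
}

// readTwice reads the same input through two independent views.
func readTwice(input string) (first, second string, err error) {
	fr := NewReader(strings.NewReader(input))
	a, err := io.ReadAll(fr.View())
	if err != nil {
		return "", "", err
	}
	b, err := io.ReadAll(fr.View())
	if err != nil {
		return "", "", err
	}
	return string(a), string(b), nil
}

func main() {
	first, second, err := readTwice("hello")
	fmt.Println(first, second, err) // prints "hello hello <nil>"
}
```

From outside the package, a keyed literal such as `fan.Reader{Reader: r}` should work the same way. An unkeyed literal would not compile there, because it has to list every field and the unexported ones cannot be named from another package.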

@neilotoole

@jmackie: I just came across this gist. I had need of something similar a while back: streamcache has the same goal as your Reader, but with a bunch of bells & whistles, such as discarding the cache when it's no longer needed.

It's nice to know somebody else was dealing with the same problem 🤓
