Skip to content

Instantly share code, notes, and snippets.

@Zatte
Last active November 30, 2023 15:46
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Zatte/2144765c26ddab4a9c596557cf24a92c to your computer and use it in GitHub Desktop.
Save Zatte/2144765c26ddab4a9c596557cf24a92c to your computer and use it in GitHub Desktop.
A GoLang fanout reader / Multi reader to have multiple consumers process the same io.Reader without buffering
// ReaderFanOut allows multiple consumers of one io.Reader.
// all consumers are run concurrently (in go threads) but will process
// the reading sequentially; this means that if a consumer stops consuming
// the reader it will block other consumers.
// If a consumer returns the it's reader is automatically drained to avoid
// stalling other consumers. If any consumer returns a non nil error the
// context for all other consumers is cancelled and the first error is returned.
func ReaderFanOut(
ctx context.Context,
source io.Reader,
secondaryConsumers ...func(ctx context.Context, reader io.Reader) error,
) error {
g, ctx := errgroup.WithContext(ctx)
writers := make([]io.Writer, len(secondaryConsumers))
pipeWriters := make([]*io.PipeWriter, len(secondaryConsumers))
for idx, consumer := range secondaryConsumers {
r, w := io.Pipe()
writers[idx] = w
pipeWriters[idx] = w
scopedConsumer := consumer
g.Go(func() error {
// Ensure to cleanup and not block other readers
defer io.Copy(io.Discard, r) //nolint:errcheck
err := scopedConsumer(ctx, r)
if err != nil {
return fmt.Errorf("ReaderFanOut callback error: %w", err)
}
return nil
})
}
g.Go(func() error {
_, err := io.Copy(io.MultiWriter(writers...), source)
for _, writer := range pipeWriters {
// Maybe close with error?
if err2 := writer.Close(); err2 != nil && err == nil {
err = err2 // In leu of native multi-error pkg we only keep the first
}
}
if err != nil {
return fmt.Errorf("ReaderFanOut error writing to all consumers: %w", err)
}
return nil
})
if err := g.Wait(); err != nil {
return err
}
return nil
}
@paulheg
Copy link

paulheg commented Nov 30, 2023

Hi @Zatte
Im trying to replicate something like this myself, but cant seem to get it to work, so thank you for your inspiration :)

I think in line 33 you forgot to add the for loop back in to get the writer from pipeWriters.

Best regards
Paul

@Zatte
Copy link
Author

Zatte commented Nov 30, 2023

Yes, you're right, looking at the code actually in use it a bit updated; will update the example. Thanks!

@Zatte
Copy link
Author

Zatte commented Nov 30, 2023

@paulheg : in the updated example, what's your though on line 40; Close or CloseWithError()?

If the original io.Copy to multiwriters failed; we don't know if some succeeded or not. It might not make sense to call CloseWithError where everything was fine for a single conumer when we still capture the error of the whole operation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment