Last active
October 7, 2024 19:17
-
-
Save Zatte/2144765c26ddab4a9c596557cf24a92c to your computer and use it in GitHub Desktop.
A GoLang fanout reader / Multi reader to have multiple consumers process the same io.Reader without buffering
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// ReaderFanOut allows multiple consumers of one io.Reader. | |
// all consumers are run concurrently (in go threads) but will process | |
// the reading sequentially; this means that if a consumer stops consuming | |
// the reader it will block other consumers. | |
// If a consumer returns the it's reader is automatically drained to avoid | |
// stalling other consumers. If any consumer returns a non nil error the | |
// context for all other consumers is cancelled and the first error is returned. | |
func ReaderFanOut( | |
ctx context.Context, | |
source io.Reader, | |
secondaryConsumers ...func(ctx context.Context, reader io.Reader) error, | |
) error { | |
g, ctx := errgroup.WithContext(ctx) | |
writers := make([]io.Writer, len(secondaryConsumers)) | |
pipeWriters := make([]*io.PipeWriter, len(secondaryConsumers)) | |
for idx, consumer := range secondaryConsumers { | |
r, w := io.Pipe() | |
writers[idx] = w | |
pipeWriters[idx] = w | |
scopedConsumer := consumer | |
g.Go(func() error { | |
// Ensure to cleanup and not block other readers | |
defer io.Copy(io.Discard, r) //nolint:errcheck | |
err := scopedConsumer(ctx, r) | |
if err != nil { | |
return fmt.Errorf("ReaderFanOut callback error: %w", err) | |
} | |
return nil | |
}) | |
} | |
g.Go(func() error { | |
_, err := io.Copy(io.MultiWriter(writers...), source) | |
for _, writer := range pipeWriters { | |
// Maybe close with error? | |
if err2 := writer.Close(); err2 != nil && err == nil { | |
err = err2 // In leu of native multi-error pkg we only keep the first | |
} | |
} | |
if err != nil { | |
return fmt.Errorf("ReaderFanOut error writing to all consumers: %w", err) | |
} | |
return nil | |
}) | |
if err := g.Wait(); err != nil { | |
return err | |
} | |
return nil | |
} |
Yes, you're right, looking at the code actually in use it a bit updated; will update the example. Thanks!
@paulheg : in the updated example, what's your though on line 40; Close or CloseWithError()?
If the original io.Copy to multiwriters failed; we don't know if some succeeded or not. It might not make sense to call CloseWithError where everything was fine for a single conumer when we still capture the error of the whole operation.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi @Zatte
Im trying to replicate something like this myself, but cant seem to get it to work, so thank you for your inspiration :)
I think in line 33 you forgot to add the for loop back in to get the
writer
frompipeWriters
.Best regards
Paul