Skip to content

Instantly share code, notes, and snippets.

@disintegrator
Created August 4, 2023 11:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save disintegrator/fe8db110e978a70bb366077c9413b78b to your computer and use it in GitHub Desktop.
Save disintegrator/fe8db110e978a70bb366077c9413b78b to your computer and use it in GitHub Desktop.
package functional
import (
"context"
"errors"
"time"
"github.com/benthosdev/benthos/v4/public/service"
)
// collectInput reads all messages from an child input and pushes them out as a
// single batch. If the child input does not return service.ErrEndOfInput before
// timeout then this input will return an error.
type collectInput struct {
child *service.OwnedInput
timeout time.Duration
}
func (i *collectInput) ReadBatch(inCtx context.Context) (service.MessageBatch, service.AckFunc, error) {
ctx := inCtx
if i.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, i.timeout)
defer cancel()
}
var batch service.MessageBatch
var ackFuncs []service.AckFunc
loop:
for {
b, ack, err := i.child.ReadBatch(ctx)
switch {
case errors.Is(err, service.ErrEndOfInput):
break loop
case err != nil:
return nil, nil, err
default:
batch = append(batch, b...)
ackFuncs = append(ackFuncs, ack)
}
}
ack := func(ctx context.Context, err error) error {
errs := make([]error, len(ackFuncs))
for i, f := range ackFuncs {
errs[i] = f(ctx, err)
}
return errors.Join(errs...)
}
return batch, ack, nil
}
func (i *collectInput) Close(ctx context.Context) error {
return i.child.Close(ctx)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment