Skip to content

Instantly share code, notes, and snippets.

@voronaam
Last active December 3, 2019 00:31
Show Gist options
  • Save voronaam/1e984a2fef66b49d8dcc75872427455c to your computer and use it in GitHub Desktop.
Save voronaam/1e984a2fef66b49d8dcc75872427455c to your computer and use it in GitHub Desktop.
Benthos background processor
package processor
import (
"fmt"
"time"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/processor"
"github.com/Jeffail/benthos/v3/lib/types"
)
func init() {
processor.RegisterPlugin(
"background",
func() interface{} {
b := BackgroundConfig{}
return &b
},
func(
iconf interface{},
mgr types.Manager,
logger log.Modular,
stats metrics.Type,
) (types.Processor, error) {
mConf, ok := iconf.(*BackgroundConfig)
if !ok {
panic(fmt.Errorf("failed to cast config: %T", iconf))
}
return NewBackground(mConf, mgr, logger, stats)
},
)
processor.DocumentPlugin(
"background",
`Executes sub processors in background ignoring their results`,
nil,
)
}
//------------------------------------------------------------------------------
// BackgroundConfig is a config struct containing fields for the Background
// processor.
type BackgroundConfig struct {
Processors []processor.Config `json:"processors" yaml:"processors"`
}
//------------------------------------------------------------------------------
//Background allows children to run in background
type Background struct {
children []types.Processor
log log.Modular
mCount metrics.StatCounter
mErr metrics.StatCounter
mSent metrics.StatCounter
mBatchSent metrics.StatCounter
}
// NewBackground returns a Background processor.
func NewBackground(
conf *BackgroundConfig, mgr types.Manager, log log.Modular, stats metrics.Type,
) (processor.Type, error) {
var children []types.Processor
for i, pconf := range conf.Processors {
prefix := fmt.Sprintf("%v", i)
proc, err := processor.New(pconf, mgr, log.NewModule("."+prefix), metrics.Namespaced(stats, prefix))
if err != nil {
return nil, err
}
children = append(children, proc)
}
return &Background{
children: children,
log: log,
mCount: stats.GetCounter("count"),
mErr: stats.GetCounter("error"),
mSent: stats.GetCounter("sent"),
mBatchSent: stats.GetCounter("batch.sent"),
}, nil
}
//------------------------------------------------------------------------------
// ProcessMessage applies the processor to a message
func (p *Background) ProcessMessage(msg types.Message) ([]types.Message, types.Response) {
msgs := [1]types.Message{msg}
go func() {
for _, msg := range msgs {
_, res := processor.ExecuteAll(p.children, msg)
if res != nil && res.Error() != nil {
// TODO: Log something
// TODO: Metrics
panic(res.Error())
}
}
}()
return msgs[:], nil
}
// CloseAsync shuts down the processor and stops processing requests.
func (p *Background) CloseAsync() {
for _, c := range p.children {
c.CloseAsync()
}
}
// WaitForClose blocks until the processor has closed down.
func (p *Background) WaitForClose(timeout time.Duration) error {
stopBy := time.Now().Add(timeout)
for _, c := range p.children {
if err := c.WaitForClose(time.Until(stopBy)); err != nil {
return err
}
}
return nil
}
//------------------------------------------------------------------------------
---
# Sample YAML, which would timeout on a 30s sleep without background processor
input:
http_server:
timeout: 2s
pipeline:
processors:
- type: background
plugin:
processors:
- log:
message: "Sleeping"
- sleep:
duration: 30s
- log:
message: "Waking"
- log:
message: "Inside!!! ${!content}"
- log:
message: "Outside ${!content}"
output:
sync_response: {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment