Skip to content

Instantly share code, notes, and snippets.

@cosmo0920
Last active October 5, 2023 09:21
Show Gist options
  • Save cosmo0920/14a673eb25401576c8eda743366311e5 to your computer and use it in GitHub Desktop.
Save cosmo0920/14a673eb25401576c8eda743366311e5 to your computer and use it in GitHub Desktop.
Reproduce Go plugin for channel issue
module repro-plugin-go
go 1.17
require github.com/calyptia/plugin v1.0.4-0.20231004081153-1a497f3a46b9
require (
github.com/calyptia/cmetrics-go v0.1.7 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
)
github.com/calyptia/cmetrics-go v0.1.7 h1:A4kEFuFqVuWzytIbbey9KivHi0GQVjOkE2JJkdRbQ2U=
github.com/calyptia/cmetrics-go v0.1.7/go.mod h1:K1IEPgICDtD4mJW7RVhfG4BkCywnjCdYZwbKs0jSw/U=
github.com/calyptia/plugin v1.0.4-0.20231004081153-1a497f3a46b9 h1:c/NoALioCrs9rLOqT8PkThs/lWqo12jU5m7g6H2NN6A=
github.com/calyptia/plugin v1.0.4-0.20231004081153-1a497f3a46b9/go.mod h1:V5LqfR3UJ7G+NMf80Hm3VUgSocDJQJ7OuXlgorCw++M=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
package main
import (
"context"
"errors"
"time"
"github.com/calyptia/plugin"
"github.com/calyptia/plugin/metric"
)
func init() {
plugin.RegisterInput("repro", "reproduce issue", &greproPlugin{})
}
type greproPlugin struct {
counterSuccess metric.Counter
counterFailure metric.Counter
log plugin.Logger
}
func (plug *greproPlugin) Init(ctx context.Context, fbit *plugin.Fluentbit) error {
plug.counterSuccess = fbit.Metrics.NewCounter("operation_succeeded_total", "Total number of succeeded operations", "grepro")
plug.counterFailure = fbit.Metrics.NewCounter("operation_failed_total", "Total number of failed operations", "grepro")
plug.log = fbit.Logger
return nil
}
func (plug greproPlugin) Collect(ctx context.Context, ch chan<- plugin.Message) error {
tick := time.NewTicker(time.Second)
for {
select {
case <-ctx.Done():
err := ctx.Err()
if err != nil && !errors.Is(err, context.Canceled) {
plug.counterFailure.Add(1)
plug.log.Error("[grepro] operation failed")
return err
}
return nil
case <-tick.C:
plug.counterSuccess.Add(1)
plug.log.Debug("[grepro] operation succeeded")
ch <- plugin.Message{
Time: time.Now(),
Record: map[string]string{
"message": "dummy",
},
}
}
}
}
func main() {
}
all:
go build -buildmode=c-shared -buildvcs=false -o in_repro.so .
fast:
go build in_repro.go
clean:
rm -rf *.so *.h *~
panic: send on closed channel
goroutine 6 [running]:
main.greproPlugin.Collect({{0x7fbe4c65cea0, 0x1c00006fd70}, {0x7fbe4c65cea0, 0x1c00006fd90}, {0x7fbe4c65dea8, 0x1c0000144d8}}, {0x7fbe4c65dd90, 0x1c00007c190}, 0x0?)
/media/Data3/Gitrepo/go-plugin-repro/in_repro.go:49 +0xec
github.com/calyptia/plugin.prepareInputCollector.func1({0x7fbe4c65dd90, 0x1c00007c190})
/home/cosmo/go/1.20.4/pkg/mod/github.com/calyptia/plugin@v1.0.4-0.20231004081153-1a497f3a46b9/cshared.go:155 +0x75
created by github.com/calyptia/plugin.prepareInputCollector
/home/cosmo/go/1.20.4/pkg/mod/github.com/calyptia/plugin@v1.0.4-0.20231004081153-1a497f3a46b9/cshared.go:148 +0x16a
zsh: abort (core dumped) bin/fluent-bit -c fluent-bit.conf -Y -vv -H -P 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment