Skip to content

Instantly share code, notes, and snippets.

View Abdulsametileri's full-sized avatar
🎈
I did not find coding, coding found me

A.Samet İleri Abdulsametileri

🎈
I did not find coding, coding found me
View GitHub Profile
@Abdulsametileri
Abdulsametileri / batch-consuming.go
Created July 5, 2023 20:16
kafka-konsumer/batch-consuming.go
func main() {
consumerCfg := &kafka.ConsumerConfig{
Reader: kafka.ReaderConfig{
Brokers: []string{"localhost:29092"},
Topic: "standart-topic",
GroupID: "standart-cg",
},
RetryEnabled: true,
RetryConfiguration: kafka.RetryConfiguration{
Brokers: []string{"localhost:29092"},
@Abdulsametileri
Abdulsametileri / with-retry-manager.go
Created July 5, 2023 20:15
kafka-konsumer/with-retry-manager.go
func main() {
consumerCfg := &kafka.ConsumerConfig{
Reader: kafka.ReaderConfig{
Brokers: []string{"localhost:29092"},
Topic: "standart-topic",
GroupID: "standart-cg",
},
RetryEnabled: true,
RetryConfiguration: kafka.RetryConfiguration{
Topic: "retry-topic",
@Abdulsametileri
Abdulsametileri / without-retry-manager.go
Created July 5, 2023 20:10
kafka-konsumer/without-retry-manager.go
func main() {
consumerCfg := &kafka.ConsumerConfig{
Reader: kafka.ReaderConfig{
Brokers: []string{"localhost:29092"},
Topic: "standart-topic",
GroupID: "standart-cg",
},
ConsumeFn: consumeFn,
RetryEnabled: false,
}
func (s *IntegrationTestSuite) Test_Should_Consume_Successfully() {
// Given
cfg := &kafka.Config{
Brokers: s.wrapper.GetBrokerAddresses(),
Consumer: kafka.ConsumerConfig{
GroupID: "consumer-group-1",
Topic: "test-consume",
},
}
producer := kafka.NewProducer(cfg)
// retryFunc builds a closure that checks Kafka health on hostPort.
//
// Each call of the returned function runs kafka.CheckHealth(hostPort). When
// the check reports an error, a "not ready" notice is printed to stdout. The
// error from the check (nil or not) is always returned unchanged, so the
// caller decides whether to try again.
func retryFunc(hostPort int) func() error {
	probe := func() error {
		healthErr := kafka.CheckHealth(hostPort)
		if healthErr == nil {
			// Healthy: hand back the same (nil) value the check produced.
			return healthErr
		}
		fmt.Printf("kafka connection not ready: %v \n", healthErr)
		return healthErr
	}
	return probe
}
// CleanUp purges the wrapped container through the dockertest pool.
//
// A purge failure is only logged; nothing is returned to the caller.
func (l *DockerTestWrapper) CleanUp() {
	purgeErr := l.pool.Purge(l.container)
	if purgeErr == nil {
		return
	}
	log.Printf("Could not purge container: %s\n", purgeErr)
}
// GetBrokerAddresses returns a one-element slice holding the broker address
// "localhost:<hostPort>", built from the wrapper's hostPort field.
func (l *DockerTestWrapper) GetBrokerAddresses() []string {
	brokerAddr := fmt.Sprintf("localhost:%d", l.hostPort)
	return []string{brokerAddr}
}
// DockerTestWrapper bundles the dockertest handles and the host port used by
// its CleanUp, GetBrokerAddresses and RunContainer methods.
type DockerTestWrapper struct {
// container is the dockertest resource that CleanUp purges.
container *dockertest.Resource
// pool is the dockertest pool through which the container is purged.
pool *dockertest.Pool
// hostPort is the port formatted into the "localhost:<port>" broker address.
hostPort int
}
func (l *DockerTestWrapper) RunContainer() error {
hostPort, err := getFreePort()
if err != nil {
return fmt.Errorf("could not get free hostPort: %w", err)
// CleanUp terminates the wrapped container, allowing at most five seconds
// for the Terminate call via a timeout context.
//
// A termination failure is only logged; nothing is returned to the caller.
func (t *TestContainerWrapper) CleanUp() {
	deadlineCtx, release := context.WithTimeout(context.Background(), 5*time.Second)
	defer release()

	terminateErr := t.container.Terminate(deadlineCtx)
	if terminateErr == nil {
		return
	}
	log.Printf("could not terminate container: %s\n", terminateErr)
}
func (t *TestContainerWrapper) GetBrokerAddresses() []string {
func (t *TestContainerWrapper) RunContainer() error {
req := testcontainers.ContainerRequest{
Image: fmt.Sprintf("%s:%s", RedpandaImage, RedpandaVersion),
ExposedPorts: []string{
"9092:9092/tcp",
},
Cmd: []string{"redpanda", "start"},
WaitingFor: wait.ForLog("Successfully started Redpanda!"),
AutoRemove: true,
}
// TestContainerWrapper holds the testcontainers handle that its CleanUp
// method terminates, plus a host port.
type TestContainerWrapper struct {
// container is the testcontainers container terminated by CleanUp.
container testcontainers.Container
// hostPort is presumably the host-side broker port — its use is not visible
// in this excerpt; TODO confirm against RunContainer/GetBrokerAddresses.
hostPort int
}
// IntegrationTestSuite is the test suite type for the integration tests. It
// embeds suite.Suite and carries the container wrapper the tests query for
// broker addresses.
type IntegrationTestSuite struct {
// Suite is embedded, so its methods are promoted onto IntegrationTestSuite.
suite.Suite
// wrapper is stored by value; tests call s.wrapper.GetBrokerAddresses().
wrapper TestContainerWrapper
}