Skip to content

Instantly share code, notes, and snippets.

@Gsantomaggio
Created July 16, 2021 08:46
Show Gist options
  • Save Gsantomaggio/cceafd49af8662f78e9be216c920de60 to your computer and use it in GitHub Desktop.
Save Gsantomaggio/cceafd49af8662f78e9be216c920de60 to your computer and use it in GitHub Desktop.
consumer.go
package main
import (
"fmt"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
"os"
"time"
)
func CheckErr(err error) {
if err != nil {
fmt.Printf("%s ", err)
os.Exit(1)
}
}
func consumerClose(channelClose stream.ChannelClose) {
go func() {
event := <-channelClose
fmt.Printf("Consumer: %s closed on the stream: %s, reason: %s \n", event.Name, event.StreamName, event.Reason)
}()
}
func main() {
env, err := stream.NewEnvironment(nil)
CheckErr(err)
streamName := "consumer_test"
fmt.Printf("Define stream: %s\n", streamName)
err = env.DeclareStream(streamName,
&stream.StreamOptions{
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
},
)
if err != stream.StreamAlreadyExists {
CheckErr(err)
}
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
}
fmt.Printf("Define consumer on stream: %s\n", streamName)
consumer, err := env.NewConsumer(
streamName,
handleMessages,
stream.NewConsumerOptions().
SetConsumerName("my_consumer"). // set a consumer name
SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning
CheckErr(err)
channelClose := consumer.NotifyClose()
consumerClose(channelClose)
fmt.Printf("Closing consumer in 5 seconds\n")
time.Sleep(5 * time.Second)
err = consumer.Close()
CheckErr(err)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment