Created July 16, 2021 08:46
-
-
Save Gsantomaggio/cceafd49af8662f78e9be216c920de60 to your computer and use it in GitHub Desktop.
consumer.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main

import (
	"errors"
	"fmt"
	"os"
	"time"

	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)
// CheckErr terminates the program with exit status 1 when err is non-nil,
// after writing the error to standard error. A nil err is a no-op.
func CheckErr(err error) {
	if err == nil {
		return
	}
	// Diagnostics belong on stderr, newline-terminated, so they are not mixed
	// into the program's regular stdout output.
	fmt.Fprintln(os.Stderr, err)
	os.Exit(1)
}
func consumerClose(channelClose stream.ChannelClose) { | |
go func() { | |
event := <-channelClose | |
fmt.Printf("Consumer: %s closed on the stream: %s, reason: %s \n", event.Name, event.StreamName, event.Reason) | |
}() | |
} | |
func main() { | |
env, err := stream.NewEnvironment(nil) | |
CheckErr(err) | |
streamName := "consumer_test" | |
fmt.Printf("Define stream: %s\n", streamName) | |
err = env.DeclareStream(streamName, | |
&stream.StreamOptions{ | |
MaxLengthBytes: stream.ByteCapacity{}.GB(2), | |
}, | |
) | |
if err != stream.StreamAlreadyExists { | |
CheckErr(err) | |
} | |
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) { | |
} | |
fmt.Printf("Define consumer on stream: %s\n", streamName) | |
consumer, err := env.NewConsumer( | |
streamName, | |
handleMessages, | |
stream.NewConsumerOptions(). | |
SetConsumerName("my_consumer"). // set a consumer name | |
SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning | |
CheckErr(err) | |
channelClose := consumer.NotifyClose() | |
consumerClose(channelClose) | |
fmt.Printf("Closing consumer in 5 seconds\n") | |
time.Sleep(5 * time.Second) | |
err = consumer.Close() | |
CheckErr(err) | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.