Skip to content

Instantly share code, notes, and snippets.

@siredmar
Created March 9, 2021 11:56
Show Gist options
  • Save siredmar/bf60cae06114b2392976dac04619b4b7 to your computer and use it in GitHub Desktop.
Save siredmar/bf60cae06114b2392976dac04619b4b7 to your computer and use it in GitHub Desktop.
eventhub_subscribe.go
package main
import (
"context"
"fmt"
"os"
"os/signal"
"time"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
)
// main connects to an Azure Event Hub, registers a receive handler on every
// partition reported by the hub, prints the payload of each delivered event,
// and then blocks until the process receives an interrupt signal.
func main() {
	// Placeholder connection string; replace namespace, rule, key and entity
	// path with real values before running.
	connStr := "Endpoint=sb://namespace1.servicebus.windows.net/;SharedAccessKeyName=authorizationRule1;SharedAccessKey=supersecret;EntityPath=eventhub1"
	hub, err := eventhub.NewHubFromConnectionString(connStr)
	if err != nil {
		fmt.Println(err)
		return
	}

	// ctx is handed to the runtime-information lookup and to each Receive call.
	// NOTE(review): confirm against the library docs whether this 20s deadline
	// also bounds the lifetime of the listeners or only their setup.
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	// Close the hub on every exit path, including the early returns below.
	// Deferred after cancel so that it runs before it (LIFO order).
	defer func() {
		if closeErr := hub.Close(context.Background()); closeErr != nil {
			fmt.Println(closeErr)
		}
	}()

	handler := func(_ context.Context, event *eventhub.Event) error {
		fmt.Println(event.Data)
		// TODO: do Avro decoding here.
		return nil
	}

	// Listen to each partition of the Event Hub.
	runtimeInfo, err := hub.GetRuntimeInformation(ctx)
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, partitionID := range runtimeInfo.PartitionIDs {
		// The returned listener handle is not needed here: the program only
		// stops on a signal and closes the hub as a whole.
		if _, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset()); err != nil {
			fmt.Println(err)
			return
		}
	}

	// Wait for a signal to quit. os.Kill is intentionally not registered:
	// it cannot be caught, so listing it has no effect.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt)
	<-signalChan
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment