Skip to content

Instantly share code, notes, and snippets.

@siredmar
Created March 9, 2021 12:16
Show Gist options
  • Save siredmar/20cf862b3191b13b3b3d8272d4d6ea7b to your computer and use it in GitHub Desktop.
Save siredmar/20cf862b3191b13b3b3d8272d4d6ea7b to your computer and use it in GitHub Desktop.
eventhub_and_avro.go
package main
import (
"bytes"
"context"
"fmt"
"log"
"os"
"os/signal"
"time"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/linkedin/goavro"
)
func main() {
connStr := "Endpoint=sb://namespace1.servicebus.windows.net/;SharedAccessKeyName=ar1;SharedAccessKey=supersecret;EntityPath=eventhub1"
hub, err := eventhub.NewHubFromConnectionString(connStr)
if err != nil {
fmt.Println(err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
handler := func(c context.Context, event *eventhub.Event) error {
fmt.Println(event.Data)
ocfr, err := goavro.NewOCFReader(bytes.NewReader(event.Data))
if err != nil {
fmt.Println(err)
os.Exit(1)
}
r := ocfr.Codec().Schema()
readCodec, err := goavro.NewCodec(r)
if err != nil {
log.Fatalln(err)
}
m := make(map[string]interface{})
for ocfr.Scan() {
datum, _ := ocfr.Read()
m = datum.(map[string]interface{})
}
// Output as direct access
fmt.Println("Reading...")
fmt.Println("device: " + m["device"].(string))
fmt.Println("acqTime: " + fmt.Sprintf("%d", (m["acqTime"].(int32))))
fmt.Println("lat: " + fmt.Sprintf("%f", (m["lat"].(float64))))
fmt.Println("lon: " + fmt.Sprintf("%f", (m["lon"].(float64))))
// Output as json
jbytes, err := readCodec.TextualFromNative(nil, m)
if err != nil {
log.Fatalln(err)
}
fmt.Println(string(jbytes))
return nil
}
// listen to each partition of the Event Hub
runtimeInfo, err := hub.GetRuntimeInformation(ctx)
if err != nil {
fmt.Println(err)
return
}
for _, partitionID := range runtimeInfo.PartitionIDs {
listenerHandle, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset())
if err != nil {
fmt.Println(err)
return
}
listenerHandle.Done()
}
// Wait for a signal to quit:
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, os.Kill)
<-signalChan
err = hub.Close(context.Background())
if err != nil {
fmt.Println(err)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment