Skip to content

Instantly share code, notes, and snippets.

@nenodias
Last active March 29, 2023 00:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nenodias/d0482ed7c032c70c4e30e67500d555c3 to your computer and use it in GitHub Desktop.
Save nenodias/d0482ed7c032c70c4e30e67500d555c3 to your computer and use it in GitHub Desktop.
Go Kafka Avro Producer
package main
import (
"encoding/binary"
"encoding/json"
"fmt"
"log"
"net/http"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/linkedin/goavro/v2"
)
// Schema is the decoded form of a Schema Registry subject-version response.
// Each field is populated from the JSON key named in its struct tag.
type Schema struct {
	// Subject is the value of the "subject" key.
	Subject string `json:"subject"`
	// Version is the value of the "version" key.
	Version uint32 `json:"version"`
	// ID is the value of the "id" key; main embeds it in the message header.
	ID uint32 `json:"id"`
	// Schema is the schema definition carried as a JSON-encoded string.
	Schema string `json:"schema"`
}
// GetSchema fetches the latest registered version of subject from the Schema
// Registry at schemaRegistryURL and decodes the JSON response into a Schema.
//
// It returns a nil Schema and a non-nil error when the request cannot be
// built or sent, when the registry replies with any status other than
// 200 OK, or when the response body cannot be decoded.
func GetSchema(schemaRegistryURL string, subject string) (*Schema, error) {
	endpoint := fmt.Sprintf("%s/subjects/%s/versions/latest", schemaRegistryURL, subject)
	req, err := http.NewRequest(http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("building schema request for subject %q: %w", subject, err)
	}
	// A GET carries no body, so the media type is requested via Accept
	// rather than declared via Content-Type.
	req.Header.Set("Accept", "application/vnd.schemaregistry.v1+json")

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("fetching schema for subject %q: %w", subject, err)
	}
	defer res.Body.Close()

	// Error replies are JSON as well; without this check they would decode
	// into a zero-value Schema and be reported as success.
	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("fetching schema for subject %q: unexpected status %s", subject, res.Status)
	}

	var schema Schema
	if err := json.NewDecoder(res.Body).Decode(&schema); err != nil {
		return nil, fmt.Errorf("decoding schema for subject %q: %w", subject, err)
	}
	return &schema, nil
}
// main runs the example producer and exits with a non-zero status when any
// step fails.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run fetches the latest schema registered for the topic's "-value" subject,
// encodes a sample JSON record as Avro binary with it, prefixes the magic byte
// and schema ID, and produces the result to Kafka.
//
// The work lives here rather than in main so that the deferred producer
// shutdown still runs on error paths; log.Fatal would skip it.
func run() error {
	brokers := "localhost:9092"
	schemaRegistryURL := "http://localhost:8081"
	topic := "create.user"
	key := "teste"
	value := `{
"name":"Gopher",
"age":2513
}`

	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": brokers,
	})
	if err != nil {
		return fmt.Errorf("creating producer: %w", err)
	}

	// Delivery report handler for produced messages.
	reportsDone := make(chan struct{})
	go func() {
		defer close(reportsDone)
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()
	// Close the producer, then wait for the handler so a delivery report it
	// has already received is printed before the process exits.
	// NOTE(review): this relies on Close closing the Events channel, which
	// ends the range loop above; confirm against the client version in use.
	defer func() {
		p.Close()
		<-reportsDone
	}()

	schemaValue, err := GetSchema(schemaRegistryURL, topic+"-value")
	if err != nil {
		return err
	}

	// Create an Avro codec with the schema.
	codec, err := goavro.NewCodec(schemaValue.Schema)
	if err != nil {
		return fmt.Errorf("building Avro codec: %w", err)
	}

	// Encode the message in Avro. The second result is the unconsumed
	// remainder of the input, which is not needed for a single record.
	valueData, _, err := codec.NativeFromTextual([]byte(value))
	if err != nil {
		return fmt.Errorf("parsing JSON value: %w", err)
	}
	fmt.Printf("Sending:[%s][%s][%v]\n", topic, key, valueData)

	binaryData, err := codec.BinaryFromNative(nil, valueData)
	if err != nil {
		return fmt.Errorf("encoding Avro value: %w", err)
	}

	messageBytes := confluentFrame(schemaValue.ID, binaryData)
	// Hex keeps the NUL magic byte and other binary data printable.
	fmt.Printf("Sending:[%x]\n", messageBytes)

	// Produce the message.
	err = p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          messageBytes,
		Key:            []byte(key),
	}, nil)
	if err != nil {
		return fmt.Errorf("producing message: %w", err)
	}

	// Wait up to 30 seconds for messages to be delivered.
	unflushed := p.Flush(30 * 1000)
	fmt.Printf("\nUnflushed %d", unflushed)
	if unflushed > 0 {
		return fmt.Errorf("%d message(s) still undelivered after flush timeout", unflushed)
	}
	return nil
}

// confluentFrame returns payload prefixed with a 5-byte header: the magic byte
// 0x0 followed by schemaID as a big-endian uint32.
func confluentFrame(schemaID uint32, payload []byte) []byte {
	const headerLen = 5
	frame := make([]byte, headerLen, headerLen+len(payload))
	frame[0] = 0x0
	binary.BigEndian.PutUint32(frame[1:headerLen], schemaID)
	return append(frame, payload...)
}
{
"info": {
"_postman_id": "69fd76f6-9327-4ba9-9359-2d8e0f5ba477",
"name": "SchemaRegistry",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
},
"item": [
{
"name": "Register Schema",
"event": [
{
"listen": "prerequest",
"script": {
"exec": [
"pm.globals.set(\"subject\",\"create.user\");"
],
"type": "text/javascript"
}
}
],
"request": {
"method": "POST",
"header": [
{
"key": "Accept",
"value": "application/vnd.schemaregistry.v1+json",
"type": "default"
}
],
"body": {
"mode": "raw",
"raw": "{\r\n \"schema\": \"{\\\"type\\\": \\\"record\\\",\\\"name\\\": \\\"user\\\",\\\"fields\\\":[{\\\"type\\\": \\\"string\\\",\\\"name\\\": \\\"name\\\"},{\\\"type\\\": \\\"int\\\",\\\"name\\\": \\\"age\\\"}]}\"\r\n}",
"options": {
"raw": {
"language": "json"
}
}
},
"url": {
"raw": "http://localhost:8081/subjects/{{subject}}-value/versions",
"protocol": "http",
"host": [
"localhost"
],
"port": "8081",
"path": [
"subjects",
"{{subject}}-value",
"versions"
]
}
},
"response": []
},
{
"name": "List Schema Versions",
"event": [
{
"listen": "prerequest",
"script": {
"exec": [
"pm.globals.set(\"subject\",\"create.user\");"
],
"type": "text/javascript"
}
}
],
"request": {
"method": "GET",
"header": [
{
"key": "Accept",
"value": "application/vnd.schemaregistry.v1+json",
"type": "default"
}
],
"url": {
"raw": "http://localhost:8081/subjects/{{subject}}-value/versions",
"protocol": "http",
"host": [
"localhost"
],
"port": "8081",
"path": [
"subjects",
"{{subject}}-value",
"versions"
]
}
},
"response": []
},
{
"name": "Get Latest Schema",
"event": [
{
"listen": "prerequest",
"script": {
"exec": [
"pm.globals.set(\"subject\",\"create.user\");"
],
"type": "text/javascript"
}
}
],
"request": {
"method": "GET",
"header": [
{
"key": "Accept",
"value": "application/vnd.schemaregistry.v1+json",
"type": "default"
}
],
"url": {
"raw": "http://localhost:8081/subjects/{{subject}}-value/versions/latest",
"protocol": "http",
"host": [
"localhost"
],
"port": "8081",
"path": [
"subjects",
"{{subject}}-value",
"versions",
"latest"
]
}
},
"response": []
},
{
"name": "Get Schema By Version",
"event": [
{
"listen": "prerequest",
"script": {
"exec": [
"pm.globals.set(\"subject\",\"create.user\");\r",
"pm.globals.set(\"version\",\"1\");"
],
"type": "text/javascript"
}
}
],
"request": {
"method": "GET",
"header": [
{
"key": "Accept",
"value": "application/vnd.schemaregistry.v1+json",
"type": "default"
}
],
"url": {
"raw": "http://localhost:8081/subjects/{{subject}}-value/versions/{{version}}",
"protocol": "http",
"host": [
"localhost"
],
"port": "8081",
"path": [
"subjects",
"{{subject}}-value",
"versions",
"{{version}}"
]
}
},
"response": []
}
]
}
{
"type": "record",
"name": "user",
"fields": [
{
"type": "string",
"name": "name"
},
{
"type": "int",
"name": "age"
}
]
}
@nenodias
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment