@MSAdministrator
Created July 6, 2023 16:11
Go ETL Code Review

The goal of this exercise is to have the candidate call out issues with a simulated pull request creating a new ETL application.

A good code review reference: https://www.morling.dev/blog/the-code-review-pyramid/

Functional Requirements

  • Continuously receive messages from an at-least-once delivery message queue
  • Apply a message transformation to each message
  • Append the transformed message to a database
  • Log any errors to stdout
  • Continue to run and retry on network connectivity or other errors

Code Quality

The goal of this exercise is to find issues that linters would not easily identify. Linters, including go vet and the default linters for golangci-lint, report no warnings or errors for this code.
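As an illustration of the kind of defect that compiles cleanly and still fails at run time, consider a constructor that logs its error and returns a nil interface value. This is a self-contained sketch with hypothetical names (`Client`, `newClientOrNil`, `newClient`, `panics`); it is one example of the category, not a list of the issues the exercise expects.

```go
package main

import (
	"errors"
	"fmt"
)

// Client is a hypothetical stand-in for an interface-typed client.
type Client interface {
	Receive() error
}

type realClient struct{}

func (realClient) Receive() error { return nil }

// newClientOrNil logs the failure and returns nil, hiding the error from the caller.
func newClientOrNil(fail bool) Client {
	if fail {
		fmt.Println("failed to create client")
		return nil
	}
	return realClient{}
}

// newClient returns the error so the caller has to decide what to do with it.
func newClient(fail bool) (Client, error) {
	if fail {
		return nil, errors.New("failed to create client")
	}
	return realClient{}, nil
}

// panics reports whether fn panicked.
func panics(fn func()) (p bool) {
	defer func() {
		if recover() != nil {
			p = true
		}
	}()
	fn()
	return false
}

func main() {
	// Calling a method on a nil interface value panics at run time.
	c := newClientOrNil(true)
	fmt.Println("nil-returning constructor panics on use:", panics(func() { _ = c.Receive() }))

	// Returning the error makes the failure visible at the call site.
	if _, err := newClient(true); err != nil {
		fmt.Println("error-returning constructor:", err)
	}
}
```

Both constructors type-check; only the second forces the caller to handle the failure before using the client.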

File and Package Layout

├── go.mod
└── main.go

// This is an application that pulls messages from a message queue, applies a transformation, and
// then stores the resulting data in a datastore.
package main

import (
	"context"
	"fmt"
	"hash/fnv"
	"strconv"
	"time"

	"go-etl-review/backend"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
	defer cancel()

	for {
		mqClient := GetNewMQClient("my-queue")
		dbClient := GetNewDBClient("my-db")

		err := mqClient.Receive(
			ctx, func(ctx context.Context, msg *backend.Message) {
				fmt.Printf("Received message: %q\n", msg.Data)

				// Apply transformation to the message data
				transformedMsg, err := messageFormatter(msg)
				if err != nil {
					fmt.Printf("Error formatting message content: %v\n", err.Error())
					msg.Reject()
					return
				}

				// Create a row to insert into the database
				row := &backend.Row{Content: string(transformedMsg.Data)}

				// Use the database client to insert the row into the table
				if err = dbClient.Insert(ctx, "my-table", row); err != nil {
					println(fmt.Sprintf("Failed to insert data to database: %v", err))
				}

				// Acknowledge the message
				msg.Acknowledge()
			},
		)
		if err != nil {
			panic("Failed to receive messages")
		}
	}
}

func GetNewMQClient(queue string) backend.MQClient {
	ctx := context.Background()

	// Initialize a new MQ client
	mqClient, err := backend.NewMQClient(ctx, queue)
	if err != nil {
		fmt.Printf("Failed to create message queue client: %v", err)
		return nil
	}

	return mqClient
}

func GetNewDBClient(table string) backend.DBClient {
	ctx := context.Background()

	// Initialize a new database client
	dbClient, err := backend.NewDBClient(ctx, table)
	if err != nil {
		fmt.Printf("Failed to create database client: %v", err)
		return nil
	}

	return dbClient
}

func messageFormatter(msg *backend.Message) (*backend.Message, error) {
	h := fnv.New32a()
	_, _ = h.Write(msg.Data)
	hash := h.Sum32()

	newData := backend.Message{
		ID: strconv.Itoa(int(hash)),
	}
	newData.Data = []byte(string(msg.Data) + " - This is the transformed data")

	return &newData, nil
}