Skip to content

Instantly share code, notes, and snippets.

@gillesdemey
Created June 12, 2016 17:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gillesdemey/0f8dffefc6817ffcc006bf46951b090e to your computer and use it in GitHub Desktop.
Save gillesdemey/0f8dffefc6817ffcc006bf46951b090e to your computer and use it in GitHub Desktop.
HTTP to Kafka bridge
package main
import (
"fmt"
"log"
"net/http"
"github.com/Shopify/sarama"
"goji.io"
"goji.io/pat"
"golang.org/x/net/context"
)
type server struct {
DataCollector sarama.SyncProducer
Mux *goji.Mux
}
// Close our server, closing the Kafka data collector
func (s *server) Close() error {
log.Println("Shutting down gracefully...")
if err := s.DataCollector.Close(); err != nil {
log.Println("Failed to shut down data collector cleanly", err)
}
return nil
}
func (s *server) postMessageToResource(ctx context.Context, w http.ResponseWriter, r *http.Request) {
resourceID := pat.Param(ctx, "resourceID")
log.Println("Forwarding message to Kafka...")
// We are not setting a message key, which means that all messages will
// be distributed randomly over the different partitions.
partition, offset, err := s.DataCollector.SendMessage(&sarama.ProducerMessage{
Topic: "messages",
Value: sarama.StringEncoder(resourceID),
})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "Failed to store your data:, %s", err)
log.Println("Failed to forward to Kafka")
} else {
fmt.Fprintf(w, "Your data is stored with unique identifier message/%d/%d", partition, offset)
log.Println("Successfully forwarded to Kafka")
}
}
func newDataCollector(brokerList []string) sarama.SyncProducer {
config := sarama.NewConfig()
// Wait for all in-sync replicas to ack the message
config.Producer.RequiredAcks = sarama.WaitForAll
// Retry up to 3 times to produce the message
config.Producer.Retry.Max = 10
producer, err := sarama.NewSyncProducer(brokerList, config)
if err != nil {
log.Fatalln("Failed to create new SyncProducer", err)
}
return producer
}
// Create HTTP handler(s)
func newHTTPMuxer(s *server) *goji.Mux {
mux := goji.NewMux() // create router
mux.HandleFuncC(pat.Post("/resources/:resourceID"), s.postMessageToResource)
return mux
}
func main() {
brokers := []string{"localhost:9092"}
server := &server{
DataCollector: newDataCollector(brokers), // create Kafka data collector
}
http.ListenAndServe("0.0.0.0:8000", newHTTPMuxer(server))
log.Println("HTTP Server listening on http://0.0.0.0:8000")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment