Created
June 12, 2016 17:25
-
-
Save gillesdemey/0f8dffefc6817ffcc006bf46951b090e to your computer and use it in GitHub Desktop.
HTTP to Kafka bridge
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"log" | |
"net/http" | |
"github.com/Shopify/sarama" | |
"goji.io" | |
"goji.io/pat" | |
"golang.org/x/net/context" | |
) | |
type server struct { | |
DataCollector sarama.SyncProducer | |
Mux *goji.Mux | |
} | |
// Close our server, closing the Kafka data collector | |
func (s *server) Close() error { | |
log.Println("Shutting down gracefully...") | |
if err := s.DataCollector.Close(); err != nil { | |
log.Println("Failed to shut down data collector cleanly", err) | |
} | |
return nil | |
} | |
func (s *server) postMessageToResource(ctx context.Context, w http.ResponseWriter, r *http.Request) { | |
resourceID := pat.Param(ctx, "resourceID") | |
log.Println("Forwarding message to Kafka...") | |
// We are not setting a message key, which means that all messages will | |
// be distributed randomly over the different partitions. | |
partition, offset, err := s.DataCollector.SendMessage(&sarama.ProducerMessage{ | |
Topic: "messages", | |
Value: sarama.StringEncoder(resourceID), | |
}) | |
if err != nil { | |
w.WriteHeader(http.StatusInternalServerError) | |
fmt.Fprintf(w, "Failed to store your data:, %s", err) | |
log.Println("Failed to forward to Kafka") | |
} else { | |
fmt.Fprintf(w, "Your data is stored with unique identifier message/%d/%d", partition, offset) | |
log.Println("Successfully forwarded to Kafka") | |
} | |
} | |
func newDataCollector(brokerList []string) sarama.SyncProducer { | |
config := sarama.NewConfig() | |
// Wait for all in-sync replicas to ack the message | |
config.Producer.RequiredAcks = sarama.WaitForAll | |
// Retry up to 3 times to produce the message | |
config.Producer.Retry.Max = 10 | |
producer, err := sarama.NewSyncProducer(brokerList, config) | |
if err != nil { | |
log.Fatalln("Failed to create new SyncProducer", err) | |
} | |
return producer | |
} | |
// Create HTTP handler(s) | |
func newHTTPMuxer(s *server) *goji.Mux { | |
mux := goji.NewMux() // create router | |
mux.HandleFuncC(pat.Post("/resources/:resourceID"), s.postMessageToResource) | |
return mux | |
} | |
func main() { | |
brokers := []string{"localhost:9092"} | |
server := &server{ | |
DataCollector: newDataCollector(brokers), // create Kafka data collector | |
} | |
http.ListenAndServe("0.0.0.0:8000", newHTTPMuxer(server)) | |
log.Println("HTTP Server listening on http://0.0.0.0:8000") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment