Skip to content

Instantly share code, notes, and snippets.

@ORBAT
Last active September 22, 2015 20:32
Show Gist options
  • Save ORBAT/fd5786828f1d6b2ae1e0 to your computer and use it in GitHub Desktop.
Save ORBAT/fd5786828f1d6b2ae1e0 to your computer and use it in GitHub Desktop.
HTTP -> Kafka server
package main
import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"strconv"
"time"
"gopkg.in/Shopify/sarama.v1"
"gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/bahlo/goat.v1"
)
var (
okMsg = []byte("{\"ok\": true}")
logger *log.Logger
)
var (
listenAddr = kingpin.Flag("listen", "Address to listen on").Short('l').Default(":8080").String()
brokerAddrs = kingpin.Flag("brokers", "Kafka broker hostname list").Short('b').Default("localhost:9092").OverrideDefaultFromEnvar("KAFKA_PEERS").Strings()
topicList = kingpin.Flag("topic", "Topic to create route for. Can be given multiple times").Short('t').Required().Strings()
verbose = kingpin.Flag("verbose", "Output debug information to stderr").Short('v').Bool()
)
func writeError(w http.ResponseWriter, code int, err error) error {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(code)
return goat.WriteJSON(w, map[string]interface{}{"ok": false, "error": err.Error()})
}
func newSyncProducer(topic string, reqAcks sarama.RequiredAcks) goat.Handle {
config := sarama.NewConfig()
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.RequiredAcks = reqAcks
sp, err := sarama.NewSyncProducer(*brokerAddrs, config)
if err != nil {
panic(err)
}
return func(w http.ResponseWriter, r *http.Request, p goat.Params) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
writeError(w, 400, errors.New("Error reading request body"))
return
}
msg := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(strconv.Itoa(int(time.Now().Unix()))),
Value: sarama.ByteEncoder(body),
}
log.Printf("Sync send of %p with RequiredAcks %d", msg, reqAcks)
if p, o, err := sp.SendMessage(msg); err != nil {
writeError(w, 500, err)
} else {
log.Printf("Sent %p", msg)
goat.WriteJSON(w, map[string]interface{}{"ok": true, "partition": p, "offset": o})
}
}
}
func newAsyncProducer(topic string) goat.Handle {
config := sarama.NewConfig()
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.RequiredAcks = sarama.NoResponse
config.Producer.Return.Errors = false
ap, err := sarama.NewAsyncProducer(*brokerAddrs, config)
if err != nil {
panic(err)
}
return func(w http.ResponseWriter, r *http.Request, p goat.Params) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
writeError(w, 400, errors.New("Error reading request body"))
return
}
log.Printf("Async write of %d bytes to topic %s", len(body), topic)
ap.Input() <- &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(strconv.Itoa(int(time.Now().Unix()))),
Value: sarama.ByteEncoder(body),
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.Write(okMsg)
}
}
func main() {
kingpin.Parse()
var logOut io.Writer
if *verbose {
logOut = os.Stderr
} else {
logOut = ioutil.Discard
}
logger = log.New(logOut, "[http-kafka] ", log.LstdFlags)
r := goat.New()
v1 := r.Subrouter("/v1/")
replicaVals := []sarama.RequiredAcks{sarama.WaitForLocal, sarama.WaitForAll, sarama.NoResponse}
for _, topic := range *topicList {
tr := v1.Subrouter(topic)
log.Printf("Creating routes for topic %s", topic)
for _, raq := range replicaVals {
tr.Post(fmt.Sprintf("/send/sync/%d", raq), "send_sync", newSyncProducer(topic, raq))
}
tr.Post("/send/async", "send_async", newAsyncProducer(topic))
}
if err := r.Run(*listenAddr); err != nil {
panic(err)
}
}
@ORBAT
Copy link
Author

ORBAT commented Sep 22, 2015

  • Start Zookeeper & Kafka with default settings
  • go run http-kafka.go --topic topic1 --topic topic2 -v &
  • curl -X POST http://localhost:8080/v1/topic1/send/sync/1 -d "DOHOI"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment