Skip to content

Instantly share code, notes, and snippets.

@shouichi
Created June 2, 2014 04:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shouichi/eb540705c7e8402cbd4d to your computer and use it in GitHub Desktop.
Save shouichi/eb540705c7e8402cbd4d to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
kafka "github.com/Shopify/sarama"
zmq "github.com/pebbe/zmq4"
"os"
"os/signal"
"time"
)
// main reads messages from the Kafka topic "my_topic" and publishes each
// message value to workers over a ZeroMQ PUB socket bound to tcp://*:3000.
// It runs until an interrupt signal is received or the consumer's event
// channel is closed, and panics on any setup or consumer error.
func main() {
	client, err := kafka.NewClient(
		"my_client",
		[]string{"localhost:9092"},
		nil,
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	consumer, err := kafka.NewConsumer(
		client,
		"my_topic",
		0, // NOTE(review): presumably the partition number — confirm against the sarama version in use.
		"my_consumer_group",
		kafka.NewConsumerConfig(),
	)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	socket, err := zmq.NewSocket(zmq.PUB)
	if err != nil {
		panic(err)
	}
	defer socket.Close()

	err = socket.Bind("tcp://*:3000")
	if err != nil {
		panic(err)
	}

	// Buffered so a signal delivered while we are busy sending is not lost.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	// NOTE(review): presumably gives subscribers time to connect before the
	// first message is published — confirm this delay is still wanted.
	time.Sleep(1 * time.Second)

consumerLoop:
	for {
		select {
		case event, ok := <-consumer.Events():
			if !ok {
				// The events channel was closed; a plain receive would
				// yield a zero value here and dereferencing it would crash.
				break consumerLoop
			}
			if event.Err != nil {
				panic(event.Err)
			}
			// A failed publish is reported but does not stop consumption.
			if _, err := socket.SendBytes(event.Value, 0); err != nil {
				fmt.Println(err)
			}
		case <-c:
			break consumerLoop
		}
	}
}
package main
import (
zmq "github.com/pebbe/zmq4"
"fmt"
)
// main connects a ZeroMQ SUB socket to tcp://localhost:3000, subscribes with
// an empty filter, and prints every received message to standard output.
// It loops forever and panics on any socket error.
func main() {
	subscriber, err := zmq.NewSocket(zmq.SUB)
	if err != nil {
		panic(err)
	}
	defer subscriber.Close()

	err = subscriber.Connect("tcp://localhost:3000")
	if err != nil {
		panic(err)
	}

	// Check the subscription result: if it fails silently, Recv below would
	// block forever without ever delivering a message.
	if err := subscriber.SetSubscribe(""); err != nil {
		panic(err)
	}

	for {
		msg, err := subscriber.Recv(0)
		if err != nil {
			panic(err)
		}
		fmt.Println(msg)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment