Skip to content

Instantly share code, notes, and snippets.

@wunki
Created August 26, 2014 14:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wunki/671073ec8eadba77f055 to your computer and use it in GitHub Desktop.
Save wunki/671073ec8eadba77f055 to your computer and use it in GitHub Desktop.
package main
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/Shopify/sarama"
_ "github.com/lib/pq"
"github.com/kelseyhightower/envconfig"
)
// KikazaruConfig holds the connection settings for the service. newConfig
// fills it in through envconfig using the "kikazaru" prefix.
//
// The field descriptions used to be written as raw struct tags; they were not
// in the conventional key:"value" form, so they are kept as doc comments
// instead.
type KikazaruConfig struct {
	// KafkaUri is the URI to connect to Kafka.
	KafkaUri string
	// PgUri is the URI to connect to PostgreSQL.
	PgUri string
}
// Event is the JSON payload carried by a consumed Kafka message.
type Event struct {
	// ID is decoded from the "id" key. The field has to be exported:
	// encoding/json silently skips unexported struct fields, which would
	// leave every decoded Event empty.
	ID string `json:"id"`
}
// newConfig builds the service configuration: it starts from the built-in
// PostgreSQL default and then hands the struct to envconfig.Process with the
// "kikazaru" prefix. The configuration pointer is returned in every case,
// together with whatever error envconfig reported (nil on success).
func newConfig() (*KikazaruConfig, error) {
	cfg := new(KikazaruConfig)
	cfg.PgUri = "postgres://wunki@localhost/gibbon-test?sslmode=disable"

	err := envconfig.Process("kikazaru", cfg)
	return cfg, err
}
// main runs the consumer and exits with a logged error when startup fails.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run loads the configuration, connects to Kafka, opens the PostgreSQL
// handle, starts Handler on the consumer's event stream and then blocks until
// an interrupt or SIGTERM is received.
//
// Failures are returned instead of calling panic or log.Fatalf in place, so
// the deferred Close calls registered up to that point still run before the
// process exits.
func run() error {
	config, err := newConfig()
	if err != nil {
		return fmt.Errorf("Couldn't setup configuration: %s", err)
	}

	// Connect to Kafka.
	// NOTE(review): the broker address is hard-coded here; config.KafkaUri is
	// not consulted anywhere in this file.
	client, err := sarama.NewClient("kikazaru", []string{"localhost:9092"}, nil)
	if err != nil {
		return fmt.Errorf("Couldn't connect to Kafka: %s", err)
	}
	defer client.Close()
	fmt.Println("Connected to Kafka...")

	// Consume
	consumerConfig := sarama.NewConsumerConfig()
	// Manually set the offset?
	// consumerConfig.OffsetValue = 702
	consumer, err := sarama.NewConsumer(client, "django.drop.deleted", 0, "kikazaru", consumerConfig)
	if err != nil {
		return fmt.Errorf("Couldn't create Kafka consumer: %s", err)
	}
	defer consumer.Close()
	fmt.Println("Kafka consumer ready...")

	// Register the deferred Close only once the handle is known to be valid.
	// Note that sql.Open may only validate its arguments; it does not
	// necessarily establish a connection.
	db, err := sql.Open("postgres", config.PgUri)
	if err != nil {
		return fmt.Errorf("Couldn't connect to PostgreSQL: %s", err)
	}
	defer db.Close()

	go Handler(consumer.Events(), db)

	// Catch interrupts. The channel is never closed: the signal package is
	// the sender, and a signal delivered after a close would panic with a
	// send on a closed channel. signal.Stop unregisters it instead.
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(interrupt)

	<-interrupt
	log.Println("Caught interrupt... Exiting!")
	return nil
}
// Handler receives consumer events from reads until the channel is closed,
// decodes each message value as a JSON Event and logs the result.
//
// A message that fails to decode is logged together with the decode error and
// skipped; it no longer terminates the loop, which previously stopped all
// further consumption after the first malformed message while the process
// kept running.
//
// db is accepted for use by event processing but is not used yet.
func Handler(reads <-chan *sarama.ConsumerEvent, db *sql.DB) {
	for d := range reads {
		var msg Event
		if err := json.Unmarshal(d.Value, &msg); err != nil {
			log.Printf("Couldn't decode received message: %q: %s", d.Value, err)
			continue
		}
		log.Printf("Received event: %q with %d", d.Value, d.Offset)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment