Created
August 26, 2014 14:04
-
-
Save wunki/671073ec8eadba77f055 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"database/sql" | |
"encoding/json" | |
"fmt" | |
"log" | |
"os" | |
"os/signal" | |
"syscall" | |
"github.com/Shopify/sarama" | |
_ "github.com/lib/pq" | |
"github.com/kelseyhightower/envconfig" | |
) | |
// KikazaruConfig holds the connection settings for the service. newConfig
// populates it through envconfig.Process using the "kikazaru" prefix.
//
// The field descriptions are ordinary comments rather than struct tags: a
// struct tag must follow the `key:"value"` convention, and a bare sentence in
// tag position is rejected by go vet and invisible to reflect.StructTag.Get.
type KikazaruConfig struct {
	// KafkaUri is the URI used to connect to Kafka.
	KafkaUri string
	// PgUri is the URI used to connect to PostgreSQL.
	PgUri string
}
// Event is the payload decoded from the JSON value of a consumed message.
type Event struct {
	id string
}

// UnmarshalJSON implements json.Unmarshaler for Event.
//
// encoding/json only fills exported struct fields, so decoding straight into
// Event would silently leave id empty. The payload is decoded into an
// exported shadow struct and copied over, which keeps the field unexported.
// It returns the decoding error, if any, and leaves e untouched in that case.
func (e *Event) UnmarshalJSON(data []byte) error {
	var payload struct {
		ID string `json:"id"`
	}
	if err := json.Unmarshal(data, &payload); err != nil {
		return err
	}
	e.id = payload.ID
	return nil
}
func newConfig() (*KikazaruConfig, error) { | |
config := &KikazaruConfig{ | |
PgUri: "postgres://wunki@localhost/gibbon-test?sslmode=disable", | |
} | |
if err := envconfig.Process("kikazaru", config); err != nil { | |
return config, err | |
} | |
return config, nil | |
} | |
func main() { | |
config, err := newConfig() | |
if err != nil { | |
log.Fatalf("Couldn't setup configuration: %s", err) | |
} | |
// Connect to Kafka | |
client, err := sarama.NewClient("kikazaru", []string{"localhost:9092"}, nil) | |
if err != nil { | |
panic(err) | |
} else { | |
fmt.Println("Connected to Kafka...") | |
} | |
defer client.Close() | |
// Consume | |
consumerConfig := sarama.NewConsumerConfig() | |
// Manually set the offset? | |
// consumerConfig.OffsetValue = 702 | |
consumer, err := sarama.NewConsumer(client, "django.drop.deleted", 0, "kikazaru", consumerConfig) | |
if err != nil { | |
panic(err) | |
} else { | |
fmt.Println("Kafka consumer ready...") | |
} | |
defer consumer.Close() | |
db, err := sql.Open("postgres", config.PgUri) | |
defer db.Close() | |
if err != nil { | |
log.Fatalf("Couldn't connect to PostgreSQL: %s", err) | |
} | |
go Handler(consumer.Events(), db) | |
// Catch interrupts | |
interrupt := make(chan os.Signal, 1) | |
defer close(interrupt) | |
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM) | |
// Close | |
if nil != <-interrupt { | |
log.Println("Caught interrupt... Exiting!") | |
} | |
} | |
func Handler(reads <-chan *sarama.ConsumerEvent, db *sql.DB) { | |
for d := range reads { | |
msg := &Event{} | |
if err := json.Unmarshal(d.Value, &msg); err != nil { | |
log.Printf("Couldn't decode received message: %q", d.Value) | |
break | |
} else { | |
log.Printf("Received event: %q with %d", d.Value, d.Offset) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment