Skip to content

Instantly share code, notes, and snippets.

@qbig
Last active August 18, 2018 18:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save qbig/fac7b880bc4097a80dc3605d1c2f9426 to your computer and use it in GitHub Desktop.
Save qbig/fac7b880bc4097a80dc3605d1c2f9426 to your computer and use it in GitHub Desktop.
Using Kafka with Sarama
        package main

        import (
            "log"

            sarama "gopkg.in/Shopify/sarama.v1"
        )

        func main() {
            consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
            if err != nil {
                panic(err)
            }
            defer consumer.Close()

            partitionConsumer, err :=  consumer.ConsumePartition("example", 0, sarama.OffsetNewest)
            if err != nil {
                panic(err)
            }
            defer partitionConsumer.Close()

            for {
                msg := <-partitionConsumer.Messages()
                log.Printf("Consumed message: \"%s\" at offset: %d\n", 
                msg.Value, msg.Offset)
            }
        }
        package main

        import (

           "fmt"
           "log"

            sarama "gopkg.in/Shopify/sarama.v1"
        )

        func sendMessage(producer sarama.SyncProducer, value string) {
            msg := &sarama.ProducerMessage{Topic: "example", Value: 
                                            sarama.StringEncoder(value)}

            partition, offset, err := producer.SendMessage(msg)
            if err != nil {
               log.Printf("FAILED to send message: %s\n", err)
                return
            }
            log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
        }

        func main() {
            producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
            if err != nil {
                panic(err)
            }
            defer producer.Close()

            for i := 0; i < 10; i++ {
                sendMessage(producer, fmt.Sprintf("Message %d", i))
            }
        }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment