Skip to content

Instantly share code, notes, and snippets.

@AlexisLeon
Forked from spksoft/main.go
Created June 11, 2020 17:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AlexisLeon/1a2e9ece26fca36d70a9d203cf604bec to your computer and use it in GitHub Desktop.
Save AlexisLeon/1a2e9ece26fca36d70a9d203cf604bec to your computer and use it in GitHub Desktop.
Example Golang worker with rabbitMQ graceful shutdown
package main
import (
"flag"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/streadway/amqp"
)
// newConsumer is a function create a rabbitMQ consumer
func newConsumer(connectionString, queueName, consumerName string, fetchCount int) (connection *amqp.Connection, channel *amqp.Channel, msgs <-chan amqp.Delivery) {
connection, _ = amqp.Dial(connectionString)
channel, _ = connection.Channel()
channel.Qos(fetchCount, 0, false)
msgs, _ = channel.Consume(
queueName, // queue
consumerName, // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
return
}
func worker(msgs <-chan amqp.Delivery, done chan bool) {
for m := range msgs {
body := string(m.Body)
log.Printf("Processing data %+v\n", body)
time.Sleep(5 * time.Second)
log.Printf("Processing data %+v done\n", body)
m.Ack(false)
log.Printf("Data %+v acked\n", body)
}
done <- true
}
func main() {
defer log.Println("Program stopped successful")
url := flag.String("url", "", "eg. amqp://root:toor@localhost:5672/")
queue := flag.String("queue", "", "eg. test-queue")
name := flag.String("name", "", "eg. consumer-1")
fetchSize := flag.Int("size", 1, "eg. 20")
flag.Parse()
log.Printf("Connecting to %s queue %s fetch-size %d\n", *url, *queue, *fetchSize)
connection, channel, msgs := newConsumer(*url, *queue, *name, *fetchSize)
log.Printf("Consumer %s is subscribing queue %s\n", *name, *queue)
defer connection.Close()
defer channel.Close()
defer log.Println("Closing qeueu channel and connection")
done := make(chan bool)
go worker(msgs, done)
exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
// Wait for OS exit signal
<-exit
log.Println("Got exit signal")
// Stop recieving message from queue
channel.Cancel(*name, false)
log.Println("Stopped receiving message from queue")
// Wait for worker procrss recieved message
log.Println("Wait for worker procrss recieved message")
<-done
log.Println("Woker done")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment