Skip to content

Instantly share code, notes, and snippets.

@binodluitel
Last active August 29, 2015 14:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save binodluitel/2d3c12791f101b653c3a to your computer and use it in GitHub Desktop.
Save binodluitel/2d3c12791f101b653c3a to your computer and use it in GitHub Desktop.
package main
import (
"github.com/streadway/amqp"
"log"
"os"
"os/signal"
"runtime"
"syscall"
"time"
)
// amqpConfig describes the broker endpoint together with the exchange, queue
// and binding that the consumer attaches to.
type amqpConfig struct {
	broker     string // AMQP URI handed to amqp.Dial
	consumerID string // consumer tag used for both Consume and Cancel

	// Exchange declaration parameters.
	consumerExchange           string
	consumerExchangeType       string
	consumerExchangeDurable    bool
	consumerExchangeAutodelete bool

	// Queue declaration parameters.
	consumerQueue           string
	consumerQueueDurable    bool
	consumerQueueAutodelete bool
	consumerQueueExclusive  bool // also passed as the `exclusive` flag to Consume

	consumerBindingKey string // routing key used when binding the queue to the exchange
}
// Settings assigned once by init.
var (
	// config holds the broker, exchange, queue and binding parameters.
	config amqpConfig
	// maxConcurrency is the goroutine-count threshold above which the
	// consumer is stopped until the count drops again.
	maxConcurrency int
)

// Shutdown signalling.
var (
	// sigQuit is the channel registered with signal.Notify.
	sigQuit chan os.Signal
	// quit receives a value once a termination signal has been seen; main
	// blocks on it.
	quit chan bool
)

// amqpConsumer is the process-wide consumer state: the broker connection and
// channel, the declared queue, and the current delivery stream.
var amqpConsumer struct {
	connection *amqp.Connection
	channel    *amqp.Channel
	queue      amqp.Queue
	// messages is the delivery channel returned by the most recent Consume call.
	messages <-chan amqp.Delivery
	// cError is not referenced anywhere in the visible code.
	cError chan *amqp.Error
	// running is set by amqpStartConsumer and cleared by amqpStopConsumer.
	running bool
}
// init installs the termination-signal handler and fills in the hard-coded
// broker, exchange, queue and concurrency settings before main runs.
func init() {
	quit = make(chan bool, 1)
	sigQuit = make(chan os.Signal, 1)

	// Only SIGINT and SIGTERM are registered. SIGKILL can never be caught by a
	// process, so listing it in signal.Notify has no effect and is reported by
	// go vet.
	signal.Notify(sigQuit, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		<-sigQuit // Block until the signal handler delivers a notification.
		log.Println("received termination signal")
		quit <- true
	}()

	config = amqpConfig{
		broker:                     "amqp://guest:guest@rabbitmq.local:5672/",
		consumerID:                 "blah",
		consumerExchange:           "amq.direct",
		consumerExchangeType:       "direct",
		consumerExchangeDurable:    true,
		consumerExchangeAutodelete: false,
		consumerQueue:              "plop",
		consumerQueueDurable:       true,
		consumerQueueAutodelete:    false,
		consumerQueueExclusive:     false,
		consumerBindingKey:         "plop",
	}
	maxConcurrency = 10

	// Already the zero value; kept explicit to document the starting state.
	amqpConsumer.running = false
}
// main connects to the broker, declares the exchange and queue, binds them,
// and then consumes messages until a value arrives on quit. On shutdown it
// stops the supervisor goroutine, cancels the consumer and closes the channel
// and connection.
func main() {
	var err error
	if amqpConsumer.connection, err = amqp.Dial(config.broker); err != nil {
		log.Fatalf("consumer: error: unable to connect to broker: %s", err)
	}
	if amqpConsumer.channel, err = amqpConsumer.connection.Channel(); err != nil {
		log.Fatalf("consumer: error: unable to open channel on broker: %s", err)
	}
	log.Printf("consumer: connected to broker")

	if err = amqpConsumer.channel.ExchangeDeclare(
		config.consumerExchange,           // exchange name
		config.consumerExchangeType,       // exchange type
		config.consumerExchangeDurable,    // `durable` flag
		config.consumerExchangeAutodelete, // `auto delete` flag
		false,                             // `internal` flag
		false,                             // `nowait` flag
		nil,                               // arguments
	); err != nil {
		log.Fatalf("consumer: error: unable to declare exchange \"%s\": %s", config.consumerExchange, err)
	}
	log.Printf("consumer: declared exchange %s (%s)", config.consumerExchange, config.consumerExchangeType)

	if amqpConsumer.queue, err = amqpConsumer.channel.QueueDeclare(
		config.consumerQueue,           // queue name
		config.consumerQueueDurable,    // `durable` flag
		config.consumerQueueAutodelete, // `auto delete` flag
		config.consumerQueueExclusive,  // `exclusive` flag
		false,                          // `nowait` flag
		nil,                            // arguments
	); err != nil {
		log.Fatalf("consumer: error: unable to declare queue \"%s\": %s", config.consumerQueue, err)
	}

	if err = amqpConsumer.channel.QueueBind(
		config.consumerQueue,      // queue name
		config.consumerBindingKey, // binding (routing) key
		config.consumerExchange,   // exchange to bind
		false,                     // `nowait` flag
		nil,                       // arguments
	); err != nil {
		log.Fatalf("consumer: error: unable to bind queue %s to exchange %s: %s",
			config.consumerQueue,
			config.consumerExchange,
			err)
	}
	log.Printf("consumer: bound queue %q matching key %q to exchange %q",
		amqpConsumer.queue.Name,
		config.consumerBindingKey,
		config.consumerExchange)

	// A single supervisor goroutine owns amqpConsumer.running and
	// amqpConsumer.messages while it is alive, so starting, stopping and
	// receiving never touch that state concurrently.
	done := make(chan struct{})
	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		superviseConsumer(done)
	}()

	<-quit

	// Stop the supervisor and wait for it to return before touching the
	// consumer state from this goroutine. Otherwise it could restart the
	// consumer while the channel is being torn down.
	close(done)
	<-stopped

	if err = amqpStopConsumer(); err != nil {
		log.Fatalf("consumer: error: unable to stop consuming messages from queue \"%s\": %s",
			config.consumerQueue, err)
	}
	log.Printf("consumer: disconnecting from broker")
	if err = amqpConsumer.channel.Close(); err != nil {
		log.Printf("consumer: error: unable to close channel: %s", err)
	}
	if err = amqpConsumer.connection.Close(); err != nil {
		log.Printf("consumer: error: unable to close connection: %s", err)
	}
	log.Printf("consumer: exiting")
}

// superviseConsumer starts the consumer, dispatches every delivery to its own
// goroutine, and once per second re-evaluates whether the consumer should be
// running. It returns when done is closed.
func superviseConsumer(done <-chan struct{}) {
	// Goroutines beyond this baseline are treated as in-flight work. This is
	// an approximation: it also counts goroutines the amqp library starts
	// after this point.
	baseline := runtime.NumGoroutine()

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	// deliveries is nil while there is nothing to read. Receiving from a nil
	// channel blocks forever, which simply disables that select case.
	deliveries := reconcileConsumer(baseline, nil)
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			deliveries = reconcileConsumer(baseline, deliveries)
		case message, ok := <-deliveries:
			if !ok {
				// The channel is closed once a cancelled consumer has been
				// drained; stop selecting on it until the next start.
				log.Printf("consumer channel closed")
				deliveries = nil
				continue
			}
			// Pass the delivery as an argument so each handler sees its own
			// message rather than a shared loop variable.
			go func(m amqp.Delivery) {
				log.Printf("received message: %s", m.Body)
			}(message)
		}
	}
}

// reconcileConsumer stops the consumer when the goroutine count exceeds
// baseline by more than maxConcurrency and (re)starts it otherwise. It returns
// the channel deliveries should be read from: a fresh one after a start, or
// the one passed in when nothing changed.
func reconcileConsumer(baseline int, deliveries <-chan amqp.Delivery) <-chan amqp.Delivery {
	overLimit := runtime.NumGoroutine()-baseline > maxConcurrency
	switch {
	case overLimit && amqpConsumer.running:
		log.Println("consumer: reached maximum concurrency limit, stopping consumer")
		if err := amqpStopConsumer(); err != nil {
			log.Fatalf("consumer: error: unable to stop consuming messages from queue \"%s\": %s",
				config.consumerQueue,
				err)
		}
	case !overLimit && !amqpConsumer.running && deliveries == nil:
		// Restart only after the previous delivery channel has been drained
		// and closed, so deliveries still buffered on it are not abandoned.
		log.Println("consumer: starting consumer")
		if err := amqpStartConsumer(); err != nil {
			log.Fatalf("consumer: error: unable to consume messages from queue \"%s\": %s",
				config.consumerQueue,
				err)
		}
		return amqpConsumer.messages
	}
	return deliveries
}
// amqpStartConsumer subscribes to the configured queue unless the consumer is
// already marked as running, in which case it does nothing. On success it
// marks the consumer as running; on failure it returns the Consume error.
func amqpStartConsumer() error {
	if amqpConsumer.running {
		return nil
	}

	deliveries, err := amqpConsumer.channel.Consume(
		config.consumerQueue,          // queue name
		config.consumerID,             // consumer identifier
		true,                          // `noack` flag
		config.consumerQueueExclusive, // `exclusive` flag
		false,                         // `nolocal` flag
		false,                         // `nowait` flag
		nil,                           // arguments
	)
	// The delivery channel is stored before the error check, so a failed
	// Consume also replaces whatever channel was stored previously.
	amqpConsumer.messages = deliveries
	if err != nil {
		return err
	}

	amqpConsumer.running = true
	log.Printf("consumer: start consuming from queue \"%s\"", config.consumerQueue)
	return nil
}
// amqpStopConsumer cancels the subscription identified by config.consumerID.
// It does nothing when the consumer is not marked as running. The running flag
// is cleared and the stop is logged before Cancel is issued, so the flag stays
// cleared even if Cancel returns an error.
func amqpStopConsumer() error {
	if !amqpConsumer.running {
		return nil
	}

	amqpConsumer.running = false
	log.Printf("consumer: stopped consuming from queue \"%s\"", config.consumerQueue)
	return amqpConsumer.channel.Cancel(config.consumerID, false)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment