Last active
August 29, 2015 14:21
-
-
Save binodluitel/2d3c12791f101b653c3a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"github.com/streadway/amqp" | |
"log" | |
"os" | |
"os/signal" | |
"runtime" | |
"syscall" | |
"time" | |
) | |
type ( | |
amqpConfig struct { | |
broker string | |
consumerID string | |
consumerExchange string | |
consumerExchangeType string | |
consumerExchangeDurable bool | |
consumerExchangeAutodelete bool | |
consumerQueue string | |
consumerQueueDurable bool | |
consumerQueueAutodelete bool | |
consumerQueueExclusive bool | |
consumerBindingKey string | |
} | |
) | |
var ( | |
config amqpConfig | |
quit chan bool | |
maxConcurrency int | |
sigQuit chan os.Signal | |
amqpConsumer struct { | |
connection *amqp.Connection | |
channel *amqp.Channel | |
queue amqp.Queue | |
messages <-chan amqp.Delivery | |
cError chan *amqp.Error | |
running bool | |
} | |
) | |
func init() { | |
quit = make(chan bool, 1) | |
sigQuit = make(chan os.Signal, 1) | |
signal.Notify(sigQuit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL) | |
go func() { | |
<-sigQuit // Block until we receive a notification on the chan from signal handler | |
log.Println("received termination signal") | |
quit <- true | |
}() | |
config = amqpConfig{ | |
broker: "amqp://guest:guest@rabbitmq.local:5672/", | |
consumerID: "blah", | |
consumerExchange: "amq.direct", | |
consumerExchangeType: "direct", | |
consumerExchangeDurable: true, | |
consumerExchangeAutodelete: false, | |
consumerQueue: "plop", | |
consumerQueueDurable: true, | |
consumerQueueAutodelete: false, | |
consumerQueueExclusive: false, | |
consumerBindingKey: "plop", | |
} | |
maxConcurrency = 10 | |
amqpConsumer.running = false | |
} | |
func main() { | |
var err error | |
if amqpConsumer.connection, err = amqp.Dial(config.broker); err != nil { | |
log.Fatalf("consumer: error: unable to connect to broker: %s", err) | |
} | |
if amqpConsumer.channel, err = amqpConsumer.connection.Channel(); err != nil { | |
log.Fatalf("consumer: error: unable to open channel on broker: %s", err) | |
} | |
log.Printf("consumer: connected to broker") | |
if err = amqpConsumer.channel.ExchangeDeclare( | |
config.consumerExchange, // exchange name | |
config.consumerExchangeType, // exchange type | |
config.consumerExchangeDurable, // `durable` flag | |
config.consumerExchangeAutodelete, // `auto delete` flag | |
false, // `internal` flag | |
false, // `nowait` flag | |
nil, // arguments | |
); err != nil { | |
log.Fatalf("consumer: error: unable to declare exchange \"%s\": %s", config.consumerExchange, err) | |
} | |
log.Printf("consumer: declared exchange %s (%s)", config.consumerExchange, config.consumerExchangeType) | |
if amqpConsumer.queue, err = amqpConsumer.channel.QueueDeclare( | |
config.consumerQueue, // queue name | |
config.consumerQueueDurable, // `durable` flag | |
config.consumerQueueAutodelete, // `auto delete` flag | |
config.consumerQueueExclusive, // `exclusive` flag | |
false, // `nowait` flag | |
nil, // arguments | |
); err != nil { | |
log.Fatalf("consumer: error: unable to declare queue \"%s\": %s", config.consumerQueue, err) | |
} | |
if err = amqpConsumer.channel.QueueBind( | |
config.consumerQueue, // queue name | |
config.consumerBindingKey, // binding (routing) key | |
config.consumerExchange, // exchange to bind | |
false, // `nowait` flag | |
nil, // arguments | |
); err != nil { | |
log.Fatalf("consumer: error: unable to bind queue %s to exchange %s: %s", | |
config.consumerQueue, | |
config.consumerExchange, | |
err) | |
} | |
log.Printf("consumer: bound queue %q matching key %q to exchange %q", | |
amqpConsumer.queue.Name, | |
config.consumerBindingKey, | |
config.consumerExchange) | |
// if err := amqpStartConsumer(); err != nil { | |
// log.Fatalf("consumer: error: unable to consume messages from queue \"%s\": %s", | |
// config.consumerQueue, err) | |
// } | |
go func() { | |
internalGoroutines := runtime.NumGoroutine() | |
for { | |
if runtime.NumGoroutine()-internalGoroutines > maxConcurrency { | |
// Stop running consumer if reaching max concurrency | |
if amqpConsumer.running { | |
log.Println("consumer: reached maximum concurrency limit, stopping consumer") | |
if err := amqpStopConsumer(); err != nil { | |
log.Fatalf("consumer: error: unable to stop consuming messages from queue \"%s\": %s", | |
config.consumerQueue, | |
err) | |
} | |
} | |
} else { | |
// (Re)start stopped consumer if below max concurrency | |
if !amqpConsumer.running { | |
log.Println("consumer: starting consumer") | |
if err := amqpStartConsumer(); err != nil { | |
log.Fatalf("consumer: error: unable to consume messages from queue \"%s\": %s", | |
config.consumerQueue, | |
err) | |
} | |
} | |
} | |
time.Sleep(1 * time.Second) | |
} | |
}() | |
go func() { | |
for { | |
for message := range amqpConsumer.messages { | |
go func() { | |
log.Printf("received message: %s", message.Body) | |
}() | |
} | |
log.Printf("consumer channel closed") | |
time.Sleep(1 * time.Second) | |
} | |
}() | |
<-quit | |
if err = amqpStopConsumer(); err != nil { | |
log.Fatalf("consumer: error: unable to stop consuming messages from queue \"%s\": %s", | |
config.consumerQueue, err) | |
} | |
log.Printf("consumer: disconnecting from broker") | |
amqpConsumer.channel.Close() | |
amqpConsumer.connection.Close() | |
log.Printf("consumer: exiting") | |
} | |
func amqpStartConsumer() error { | |
var err error | |
if !amqpConsumer.running { | |
if amqpConsumer.messages, err = amqpConsumer.channel.Consume( | |
config.consumerQueue, // queue name | |
config.consumerID, // consumer identifier | |
true, // `noack` flag | |
config.consumerQueueExclusive, // `exclusive` flag | |
false, // `nolocal` flag | |
false, // `nowait` flag | |
nil, // arguments | |
); err != nil { | |
return err | |
} | |
amqpConsumer.running = true | |
log.Printf("consumer: start consuming from queue \"%s\"", config.consumerQueue) | |
} | |
return nil | |
} | |
func amqpStopConsumer() error { | |
if amqpConsumer.running { | |
amqpConsumer.running = false | |
log.Printf("consumer: stopped consuming from queue \"%s\"", config.consumerQueue) | |
return amqpConsumer.channel.Cancel(config.consumerID, false) | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment