Skip to content

Instantly share code, notes, and snippets.

@dhanush
Last active December 25, 2020 04:02
Show Gist options
  • Save dhanush/59749ad5d14a41777dd1ae0ae7009785 to your computer and use it in GitHub Desktop.
Save dhanush/59749ad5d14a41777dd1ae0ae7009785 to your computer and use it in GitHub Desktop.
Handles the consuming of messages from queues
package comms
//Consume consumes the messages from the queues and passes it as map of chan of amqp.Delivery
func (c *Connection) Consume() (map[string]<-chan amqp.Delivery, error) {
m := make(map[string]<-chan amqp.Delivery)
for _, q := range c.queues {
deliveries, err := c.channel.Consume(q, "", false, false, false, false, nil)
if err != nil {
return nil ,err
}
m[q] = deliveries
}
return m, nil
}
//HandleConsumedDeliveries handles the consumed deliveries from the queues. Should be called only for a consumer connection
func (c *Connection) HandleConsumedDeliveries(q string, delivery <-chan amqp.Delivery, fn func(Connection, string, <-chan amqp.Delivery)) {
for {
go fn(*c, q, delivery)
if err := <-c.err; err != nil {
c.Reconnect()
deliveries, err := c.Consume()
if err != nil {
panic(err) //raising panic if consume fails even after reconnecting
}
delivery = deliveries[q]
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment