Last active
December 25, 2020 04:01
-
-
Save dhanush/251b6a990bb9e5dd13a30864fb145d9c to your computer and use it in GitHub Desktop.
Has the code for initialising an amqp connection with auto reconnect feature
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package comms | |
import ( | |
"errors" | |
"fmt" | |
"github.com/streadway/amqp" | |
) | |
//MessageBody is the struct for the body passed in the AMQP message. The type will be set on the Request header | |
type MessageBody struct { | |
Data []byte | |
Type string | |
} | |
//Message is the amqp request to publish | |
type Message struct { | |
Queue string | |
ReplyTo string | |
ContentType string | |
CorrelationID string | |
Priority uint8 | |
Body MessageBody | |
} | |
//Connection is the connection created | |
type Connection struct { | |
name string | |
conn *amqp.Connection | |
channel *amqp.Channel | |
exchange string | |
queues []string | |
err chan error | |
} | |
var ( | |
connectionPool = make(map[string]*Connection) | |
) | |
//NewConnection returns the new connection object | |
func NewConnection(name, exchange string, queues []string) *Connection { | |
if c, ok := connectionPool[name]; ok { | |
return c | |
} | |
c := &Connection{ | |
exchange: exchange, | |
queues: queues, | |
err: make(chan error), | |
} | |
connectionPool[name] = c | |
return c | |
} | |
//GetConnection returns the connection which was instantiated | |
func GetConnection(name string) *Connection { | |
return connectionPool[name] | |
} | |
func (c *Connection) Connect() error { | |
var err error | |
c.conn, err = amqp.Dial("amqp://guest:guest@localhost:5672/") | |
if err != nil { | |
return fmt.Errorf("Error in creating rabbitmq connection with %s : %s", amqpURI, err.Error()) | |
} | |
go func() { | |
<-c.conn.NotifyClose(make(chan *amqp.Error)) //Listen to NotifyClose | |
c.err <- errors.New("Connection Closed") | |
}() | |
c.channel, err = c.conn.Channel() | |
if err != nil { | |
return fmt.Errorf("Channel: %s", err) | |
} | |
if err := c.channel.ExchangeDeclare( | |
c.exchange, // name | |
"direct", // type | |
true, // durable | |
false, // auto-deleted | |
false, // internal | |
false, // noWait | |
nil, // arguments | |
); err != nil { | |
return fmt.Errorf("Error in Exchange Declare: %s", err) | |
} | |
return nil | |
} | |
func (c *Connection) BindQueue() error { | |
for _, q := range c.queues { | |
if _, err := c.channel.QueueDeclare(q, true, false, false, false, nil); err != nil { | |
return fmt.Errorf("error in declaring the queue %s", err) | |
} | |
if err := c.channel.QueueBind(q, "my_routing_key", c.exchange, false, nil); err != nil { | |
return fmt.Errorf("Queue Bind error: %s", err) | |
} | |
} | |
return nil | |
} | |
//Reconnect reconnects the connection | |
func (c *Connection) Reconnect() error { | |
if err := c.Connect(); err != nil { | |
return err | |
} | |
if err := c.BindQueue(); err != nil { | |
return err | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment