Skip to content

Instantly share code, notes, and snippets.

@ms2008
Created April 9, 2021 10:28
Show Gist options
  • Save ms2008/e291ffedbb094e59057689606b87f885 to your computer and use it in GitHub Desktop.
Save ms2008/e291ffedbb094e59057689606b87f885 to your computer and use it in GitHub Desktop.
rabbitmq producer
package rabbitmq
import (
"log"
"time"
"github.com/pborman/uuid"
"github.com/streadway/amqp"
)
type Producer struct {
conn *amqp.Connection
channel *amqp.Channel
connNotify chan *amqp.Error
channelNotify chan *amqp.Error
quit chan struct{}
addr string
exchange string
routingKey string
}
func NewProducer(addr, exchange string) *Producer {
p := &Producer{
addr: addr,
exchange: exchange,
routingKey: "",
quit: make(chan struct{}),
}
return p
}
func (p *Producer) Start() error {
if err := p.Run(); err != nil {
return err
}
go p.ReConnect()
return nil
}
func (p *Producer) Stop() {
close(p.quit)
if !p.conn.IsClosed() {
if err := p.conn.Close(); err != nil {
log.Println("rabbitmq producer - connection close failed: ", err)
}
}
}
func (p *Producer) Run() error {
var err error
if p.conn, err = amqp.Dial(p.addr); err != nil {
return err
}
if p.channel, err = p.conn.Channel(); err != nil {
p.conn.Close()
return err
}
p.connNotify = p.conn.NotifyClose(make(chan *amqp.Error))
p.channelNotify = p.channel.NotifyClose(make(chan *amqp.Error))
return err
}
func (p *Producer) ReConnect() {
for {
select {
case err := <-p.connNotify:
if err != nil {
log.Println("rabbitmq producer - connection NotifyClose: ", err)
}
case err := <-p.channelNotify:
if err != nil {
log.Println("rabbitmq producer - channel NotifyClose: ", err)
}
case <-p.quit:
return
}
// backstop
if !p.conn.IsClosed() {
if err := p.conn.Close(); err != nil {
log.Println("rabbitmq producer - connection close failed: ", err)
}
}
// IMPORTANT: 必须清空 Notify,否则死连接不会释放
for err := range p.channelNotify {
log.Println(err)
}
for err := range p.connNotify {
log.Println(err)
}
quit:
for {
select {
case <-p.quit:
return
default:
log.Println("rabbitmq producer - reconnect")
if err := p.Run(); err != nil {
log.Println("rabbitmq producer - failCheck: ", err)
// sleep 5s reconnect
time.Sleep(time.Second * 5)
continue
}
break quit
}
}
}
}
func (p *Producer) Publish(msg []byte) error {
return p.channel.Publish(
p.exchange, // exchange
p.routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
MessageId: uuid.New(),
Type: "",
Body: msg,
})
}
@weilan008
Copy link

Stop() may be not right
first close(p.quit)
when run conn.close() conn.NotifyClose may be block

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment