Skip to content

Instantly share code, notes, and snippets.

@qloog
Forked from ms2008/producer.go
Created April 9, 2021 11:00
Show Gist options
  • Save qloog/a5ad99a38be575ffeaa9a6a239419fd3 to your computer and use it in GitHub Desktop.
Save qloog/a5ad99a38be575ffeaa9a6a239419fd3 to your computer and use it in GitHub Desktop.
rabbitmq producer
package rabbitmq
import (
"log"
"time"
"github.com/pborman/uuid"
"github.com/streadway/amqp"
)
// Producer publishes messages to a RabbitMQ exchange over a single
// connection/channel pair, which the ReConnect loop re-establishes after a
// close notification.
type Producer struct {
	// addr is the broker URL handed to amqp.Dial; exchange and routingKey
	// are the publish target used by Publish.
	addr, exchange, routingKey string

	// conn and channel are the handles assigned by Run.
	conn    *amqp.Connection
	channel *amqp.Channel

	// connNotify and channelNotify are the NotifyClose channels registered
	// on conn and channel respectively.
	connNotify, channelNotify chan *amqp.Error

	// quit is closed by Stop to make the ReConnect loop return.
	quit chan struct{}
}
// NewProducer returns a Producer configured for the broker at addr that
// publishes to exchange with an empty routing key. It only allocates the
// quit channel; no connection is opened until Start (or Run) is called.
func NewProducer(addr, exchange string) *Producer {
	producer := new(Producer)
	producer.addr = addr
	producer.exchange = exchange
	producer.routingKey = ""
	producer.quit = make(chan struct{})
	return producer
}
// Start performs the initial connect through Run and, only if that succeeds,
// launches ReConnect in its own goroutine. The error from Run is returned
// unchanged; on failure no goroutine is started.
func (p *Producer) Start() error {
	err := p.Run()
	if err == nil {
		go p.ReConnect()
	}
	return err
}
// Stop signals the ReConnect loop to exit by closing p.quit and then closes
// the current connection if it is still open. A failure to close the
// connection is logged, not returned.
//
// Calling Stop a second time, or before any connection was established, is a
// no-op instead of a panic. The already-stopped check is not synchronized, so
// Stop must not be called from several goroutines at once.
func (p *Producer) Stop() {
	select {
	case <-p.quit:
		// Already stopped: closing a closed channel would panic.
		return
	default:
		close(p.quit)
	}

	// p.conn may be nil, e.g. when Stop is called before a successful Run.
	if p.conn == nil || p.conn.IsClosed() {
		return
	}
	if err := p.conn.Close(); err != nil {
		log.Println("rabbitmq producer - connection close failed: ", err)
	}
}
// Run dials the broker at p.addr, opens a channel on the new connection and
// registers close notifications for both.
//
// The new connection and channel are stored on the Producer only after both
// were established. A failed attempt therefore leaves the previous handles
// untouched instead of overwriting them with nil, which would make later
// method calls on p.conn or p.channel panic.
//
// It returns the error from amqp.Dial or from opening the channel, and nil on
// success.
func (p *Producer) Run() error {
	conn, err := amqp.Dial(p.addr)
	if err != nil {
		return err
	}

	channel, err := conn.Channel()
	if err != nil {
		// Best effort: release the freshly dialed connection. The channel
		// error is the one the caller needs, so the Close error is dropped.
		_ = conn.Close()
		return err
	}

	p.conn = conn
	p.channel = channel
	p.connNotify = conn.NotifyClose(make(chan *amqp.Error))
	p.channelNotify = channel.NotifyClose(make(chan *amqp.Error))
	return nil
}
// ReConnect blocks until the connection or the channel reports a close, then
// tears the connection down and calls Run repeatedly until a new connection
// is established, after which it goes back to waiting. It returns once p.quit
// is closed.
//
// Failed reconnect attempts are retried every 5 seconds. The wait between
// attempts also watches p.quit, so a stop request is honoured immediately
// instead of after the full backoff.
func (p *Producer) ReConnect() {
	for {
		select {
		case err := <-p.connNotify:
			if err != nil {
				log.Println("rabbitmq producer - connection NotifyClose: ", err)
			}
		case err := <-p.channelNotify:
			if err != nil {
				log.Println("rabbitmq producer - channel NotifyClose: ", err)
			}
		case <-p.quit:
			return
		}

		// Backstop: a channel-level close may leave the connection open, so
		// close it explicitly before reconnecting.
		if !p.conn.IsClosed() {
			if err := p.conn.Close(); err != nil {
				log.Println("rabbitmq producer - connection close failed: ", err)
			}
		}

		// IMPORTANT: the Notify channels must be drained, otherwise the dead
		// connection is never released.
		for err := range p.channelNotify {
			log.Println(err)
		}
		for err := range p.connNotify {
			log.Println(err)
		}

	retry:
		for {
			select {
			case <-p.quit:
				return
			default:
				log.Println("rabbitmq producer - reconnect")
				if err := p.Run(); err != nil {
					log.Println("rabbitmq producer - failCheck: ", err)
					// Wait 5s before the next attempt, but give up at once
					// when a stop is requested.
					select {
					case <-p.quit:
						return
					case <-time.After(5 * time.Second):
					}
					continue
				}
				break retry
			}
		}
	}
}
// Publish sends msg to p.exchange with p.routingKey as a non-mandatory,
// non-immediate "text/plain" message whose MessageId is a freshly generated
// UUID.
//
// It returns amqp.ErrClosed when no channel is available (for example when
// Publish is called before a successful Start), and otherwise the error
// reported by the channel's Publish call.
func (p *Producer) Publish(msg []byte) error {
	// Guard against a nil channel, which would panic inside the amqp client.
	if p.channel == nil {
		return amqp.ErrClosed
	}

	return p.channel.Publish(
		p.exchange,   // exchange
		p.routingKey, // routing key
		false,        // mandatory
		false,        // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			MessageId:   uuid.New(),
			Type:        "",
			Body:        msg,
		})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment