Skip to content

Instantly share code, notes, and snippets.

@omerkaya1
Created February 1, 2021 07:23
Show Gist options
  • Save omerkaya1/549ff0130554bd8f29e7f08b255635e5 to your computer and use it in GitHub Desktop.
Save omerkaya1/549ff0130554bd8f29e7f08b255635e5 to your computer and use it in GitHub Desktop.
An interface and an implementation of the message queue service
package mq
import (
"context"
"time"
"github.com/pkg/errors"
"github.com/streadway/amqp"
)
type (
// Service is the general interface that all MQ service implementations must satisfy.
// It exists so that the MQ server can be replaced by another one without touching the callers.
Service interface {
// Start brings the service up; errors that occur later on are reported through errChan.
Start(ctx context.Context, errChan chan error) error
// Publish hands a message over for delivery to the queue named queueName.
Publish(body []byte, appID, queueName, msgType string) error
// Consume returns the receive-only channel of incoming messages.
Consume() <-chan Message
}
// RabbitMQ is an object that is used to communicate with the RabbitMQ server.
// TODO(o.kaya): according to the documentation, we should separate connections for publishing and consuming
// messages in order to avoid TCP push-backs. Let's keep that in mind until the issue becomes noticeable.
RabbitMQ struct {
// conn is the connection to the server; it is (re)created by connect.
conn *amqp.Connection
// cfg supplies the connection URI and the names of the queues to declare and consume.
cfg config
// channel is the AMQP channel used to declare queues, publish and consume.
channel *amqp.Channel
// closure receives the notification that conn was closed; it is re-registered by configure.
closure chan *amqp.Error
// blocking receives the blocked/unblocked notifications of the server; it is re-registered by configure.
blocking chan amqp.Blocking
// inputMessages carries the raw deliveries merged from all consumers.
inputMessages chan amqp.Delivery
// outputMessages carries the converted messages handed out by Consume.
outputMessages chan Message
// queue holds the messages accepted by Publish until the work cycle sends them to the server.
queue chan Message
// isBlocked is true while the service re-connects or while the server reports a blocked connection.
// NOTE(review): it is written by the goroutine launched in Start and read by Publish without synchronisation.
isBlocked bool
}
// config is the interface used to configure the MQ service.
config interface {
// GetConnectionURI returns the URI passed to amqp.Dial.
GetConnectionURI() string
// GetServiceNames returns the names of the queues that are declared and consumed.
GetServiceNames() []string
}
// Message is the structure that gets sent and received by the MQ service to and from the MQ server.
// It abstracts the main info on the message in the way that is convenient for consumption by the MQ service.
Message struct {
// Body is the raw payload of the message.
Body []byte
// AppID identifies the application that produced the message.
AppID string
// Type is the message type.
Type string
QueueName string // May be redundant, if we end up using the AppID as a queue name
}
)
const (
maxBufferSize = 10 // Capacity of the buffered message channels created by NewRabbitMQ
maxRetry = 10 // The number of attempts the service makes to re-connect before giving up
// retryTimeout is the pause between two re-connection attempts.
// NOTE(review): it is derived from maxRetry and therefore equals 10 seconds, whereas the recommendation
// quoted below is 5 seconds; changing maxRetry also changes this interval — confirm which value is intended.
retryTimeout = time.Second * maxRetry // To avoid false positives, the recommended retry interval is 5 seconds
)
// ErrFatalConnFailure signals the complete loss of connection between the MQ service and the MQ server.
// It is sent on the error channel passed to Start once the re-connection attempts have been exhausted.
var ErrFatalConnFailure = errors.New("fatal MQ connection failure")
// NewRabbitMQ builds a RabbitMQ service around the supplied configuration.
// Only the internal Go channels are allocated here: the raw delivery channel is unbuffered, while the
// outgoing queue and the consumer-facing channel are buffered with maxBufferSize slots.
// No connection to the MQ server is made until Start is called.
func NewRabbitMQ(cfg config) *RabbitMQ {
	svc := new(RabbitMQ)
	svc.cfg = cfg
	svc.inputMessages = make(chan amqp.Delivery)
	svc.outputMessages = make(chan Message, maxBufferSize)
	svc.queue = make(chan Message, maxBufferSize)
	return svc
}
// Start connects to the MQ server and then launches the background work cycle that keeps the
// connection alive throughout the whole lifecycle of the programme.
//
// The initial connection is made synchronously. When it fails, the error is returned, any
// partially opened AMQP resources are released and no background routine is started.
// Once connected, a single goroutine re-establishes the connection whenever it is reported
// closed, publishes the messages enqueued by Publish and forwards deliveries to the channel
// returned by Consume. The goroutine exits when ctx is cancelled or when every re-connection
// attempt has failed, in which case ErrFatalConnFailure is sent to errChan.
//
// Asynchronous errors are written to errChan with blocking sends, so the caller is expected
// to keep draining it.
func (rmq *RabbitMQ) Start(ctx context.Context, errChan chan error) error {
	// Connect before the work cycle is launched: the cycle selects on the notification channels
	// that connect registers, and doing it in this order means the connection fields are never
	// written by two goroutines at the same time.
	if err := rmq.connect(); err != nil {
		// Nothing is running yet, so release whatever connect managed to open. The close errors
		// are deliberately dropped: the connection error is the root cause the caller needs.
		_ = rmq.closeAMQP()
		return err
	}
	go rmq.run(ctx, errChan)
	return nil
}

// run is the main work cycle of the service. It blocks in the select until there is something
// to do and releases all resources on the way out.
func (rmq *RabbitMQ) run(ctx context.Context, errChan chan error) {
	defer rmq.shutdown(errChan)
	for {
		// The channel expressions are re-evaluated on every iteration, so the notification
		// channels replaced by a re-connection are picked up automatically.
		select {
		// Occurs once, when the programme receives interrupt from the top level.
		// The closing is assumed to be intentional, so resources are released and the cycle exits.
		case <-ctx.Done():
			return
		// The closure channel is re-initialised every time the connection is re-established.
		case amqpErr := <-rmq.closure:
			if amqpErr != nil {
				errChan <- errors.Wrap(amqpErr, "mq service: closure signal received")
			}
			// Block the message queue while the connection is being restored.
			// NOTE: isBlocked is read by Publish without synchronisation.
			rmq.isBlocked = true
			if err := rmq.reconnect(ctx, errChan); err != nil {
				// reconnect returns the sentinel itself, so a plain comparison is sufficient.
				// A cancelled context is an intentional shutdown and is not reported.
				if err == ErrFatalConnFailure {
					errChan <- err
				}
				return
			}
			rmq.isBlocked = false
		// The server reports that the connection is blocked or unblocked.
		case block := <-rmq.blocking:
			if block.Active {
				errChan <- errors.Errorf("mq service: received blocked notification: %s", block.Reason)
			}
			rmq.isBlocked = block.Active
		// Send a message to the MQ server.
		case msg := <-rmq.queue:
			err := rmq.channel.Publish("", msg.QueueName, false, false, amqp.Publishing{
				ContentType: "application/json",
				Timestamp:   time.Now(),
				AppId:       msg.AppID,
				// Type is read back on the consuming side, so it has to travel with the message.
				Type: msg.Type,
				Body: msg.Body,
			})
			if err != nil {
				errChan <- errors.Wrap(err, "mq service: failed to deliver the message")
			}
		// Receive messages and push them to the output channel.
		case delivery := <-rmq.inputMessages:
			out := Message{
				Body:  delivery.Body,
				AppID: delivery.AppId,
				Type:  delivery.Type,
			}
			// Do not let a slow consumer prevent the shutdown.
			select {
			case rmq.outputMessages <- out:
			case <-ctx.Done():
				return
			}
		}
	}
}

// reconnect tries to re-establish the connection up to maxRetry times, waiting retryTimeout
// between two attempts. It returns nil on success, the context error when ctx is cancelled
// while waiting, and ErrFatalConnFailure when all attempts have failed.
func (rmq *RabbitMQ) reconnect(ctx context.Context, errChan chan error) error {
	for attempt := 1; attempt <= maxRetry; attempt++ {
		err := rmq.connect()
		if err == nil {
			// Stop at the first success; dialling again would leak the connection just opened.
			return nil
		}
		errChan <- errors.Wrapf(err, "mq service: %d connection attempt failed", attempt)
		if attempt == maxRetry {
			// There is nothing left to wait for after the last attempt.
			break
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(retryTimeout):
		}
	}
	return ErrFatalConnFailure
}

// shutdown releases the resources held by the service once the work cycle is over.
func (rmq *RabbitMQ) shutdown(errChan chan error) {
	// Make Publish refuse new messages: nobody is left to send them.
	rmq.isBlocked = true
	// Close the AMQP side first so that the server stops delivering messages.
	closeErrs := rmq.closeAMQP()
	// The work cycle is the only sender on outputMessages, so it is safe to close it here and
	// let the consumers observe the end of the stream. inputMessages and queue are written by
	// other goroutines (the consumer routines and the callers of Publish); closing them here
	// could make a concurrent send panic, so they are left to the garbage collector.
	close(rmq.outputMessages)
	for _, err := range closeErrs {
		errChan <- err
	}
}

// closeAMQP closes the AMQP channel and connection, if any, and returns the errors met on the way.
func (rmq *RabbitMQ) closeAMQP() []error {
	var errs []error
	if rmq.channel != nil {
		if err := rmq.channel.Close(); err != nil {
			errs = append(errs, errors.Wrap(err, "failed to close the MQ channel"))
		}
	}
	if rmq.conn != nil {
		if err := rmq.conn.Close(); err != nil {
			errs = append(errs, errors.Wrap(err, "failed to close the MQ connection"))
		}
	}
	return errs
}
// Publish enqueues the message for sending to the MQ server.
// body is the payload, appID identifies the producing application, qn is the name of the
// destination queue and t is the message type.
// It either performs a non-blocking send to the internal message queue or reports an error if either the
// connection is blocked or the queue is not ready to receive messages.
func (rmq RabbitMQ) Publish(body []byte, appID, qn, t string) error {
	if rmq.isBlocked {
		return errors.New("mq service: failed to push a message to the queue: connection is blocked")
	}
	// Keyed fields keep every argument in the field it is meant for; a positional literal
	// silently depends on the declaration order of Message and mixed the values up.
	msg := Message{
		Body:      body,
		AppID:     appID,
		Type:      t,
		QueueName: qn,
	}
	select {
	case rmq.queue <- msg:
		return nil
	default:
		return errors.New("mq service: failed to push a message to the queue: channel is not ready to receive messages")
	}
}
// Consume returns the receive-only channel on which the service hands incoming messages to its caller.
// Messages from all consumed queues are combined in this single channel.
// NOTE(review): the original comment names the recipient "SYR" — presumably the host application; confirm.
func (rmq RabbitMQ) Consume() <-chan Message {
return rmq.outputMessages
}
// connect establishes a connection to the RabbitMQ server, opens the AMQP channel on it and
// configures the notification channels, queues and consumers.
// On failure nothing is left open: the connection dialled by this call is closed again and the
// connection fields are reset to nil, so a retry starts from a clean state.
func (rmq *RabbitMQ) connect() error {
	conn, err := amqp.Dial(rmq.cfg.GetConnectionURI())
	if err != nil {
		rmq.conn, rmq.channel = nil, nil
		return errors.Wrap(err, "mq service: failed to establish the connection")
	}
	// Initialise a general RabbitMQ server channel to fetch messages
	ch, err := conn.Channel()
	if err != nil {
		// Best-effort close: the channel error is the one worth reporting.
		_ = conn.Close()
		rmq.conn, rmq.channel = nil, nil
		return errors.Wrap(err, "mq service: failed to initialise the message channel")
	}
	rmq.conn, rmq.channel = conn, ch
	if err = rmq.configure(); err != nil {
		// Detach the notification channels before closing, so that this clean-up is not
		// mistaken for a lost connection by whoever listens on them.
		rmq.closure, rmq.blocking = nil, nil
		// Closing the connection also closes the channel opened on it.
		_ = conn.Close()
		rmq.conn, rmq.channel = nil, nil
		return err
	}
	return nil
}
// configure should be called whenever the service is started/re-started.
// It registers the notification channels on the current connection, declares the message queues
// and starts the consumers. The first error met is returned.
func (rmq *RabbitMQ) configure() error {
	// Register the closure notification channel. The library delivers the close error with a
	// synchronous send, so the channel gets one slot of buffer: the notification can then be
	// delivered even when the receiver is busy at that moment.
	rmq.closure = rmq.conn.NotifyClose(make(chan *amqp.Error, 1))
	// Register the blocking channel that will report the blocking conditions on the RabbitMQ server.
	// It is buffered for the same reason as the closure channel.
	rmq.blocking = rmq.conn.NotifyBlocked(make(chan amqp.Blocking, maxBufferSize))
	// Define queues
	if err := rmq.configureQueues(); err != nil {
		return err
	}
	// Register consumers
	return rmq.configureConsumers()
}
// configureQueues declares, as durable queues, all the queues named by the configuration.
// These are the queues the service publishes to and consumes from.
// It stops at the first queue that cannot be declared and returns the wrapped error.
func (rmq *RabbitMQ) configureQueues() error {
	for _, name := range rmq.cfg.GetServiceNames() {
		_, err := rmq.channel.QueueDeclare(name, true, false, false, false, nil)
		if err != nil {
			return errors.Wrapf(err, "mq service: failed to register '%s' queue", name)
		}
	}
	return nil
}
// configureConsumers registers one auto-acknowledging consumer per configured queue and deploys
// a worker routine for each of them that funnels its deliveries into the shared inputMessages
// channel. A worker stops once its delivery channel is closed.
// It stops at the first consumer that cannot be registered and returns the wrapped error.
func (rmq *RabbitMQ) configureConsumers() error {
	for _, name := range rmq.cfg.GetServiceNames() {
		deliveries, err := rmq.channel.Consume(name, "", true, false, false, false, nil)
		if err != nil {
			return errors.Wrapf(err, "mq service: failed to register '%s' consumer", name)
		}
		// deliveries is declared inside the loop body, so every worker captures its own channel.
		go func() {
			for delivery := range deliveries {
				rmq.inputMessages <- delivery
			}
		}()
	}
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment