Skip to content

Instantly share code, notes, and snippets.

@melardev
Created January 3, 2022 12:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save melardev/1b9c7e1b1a4ac37cb31e57dc6cde99c7 to your computer and use it in GitHub Desktop.
Save melardev/1b9c7e1b1a4ac37cb31e57dc6cde99c7 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"sync"
"time"
)
type IAbstractChannel interface {
initChannelInternal() error
}
type AbstractChannel struct {
running bool
Name string
channel *amqp.Channel
connection *amqp.Connection
errorChan chan *amqp.Error
channelLock sync.Mutex
impl IAbstractChannel
parent *ConnectionRabbitMq
listeningErrors bool
}
type ReadChannelConfig struct {
AckEvents bool
AckMultiple bool
RawEvents bool
AckErrorMessages bool
}
type InChannelSetupHandler interface {
InitInChannel(name string, channel *amqp.Channel) (eventQueue <-chan amqp.Delivery, queueName string, err error)
}
func NewBaseChannel(name string, connection *ConnectionRabbitMq, parent IAbstractChannel) *AbstractChannel {
r := &AbstractChannel{
Name: name,
running: false,
connection: connection.GetConnection(),
parent: connection,
impl: parent,
}
return r
}
func (r *AbstractChannel) run() {
if r.running {
return
}
r.running = true
if r.connection != nil && !r.connection.IsClosed() {
r.OnNewConnection(r.connection)
}
r.listenErrors()
}
func (r *AbstractChannel) OnNewConnection(connection *amqp.Connection) {
fmt.Print(fmt.Sprintf("[Channel %s] OnNewConnection Called\n", r.Name))
if !r.running {
fmt.Print(fmt.Sprintf("[%s] Not OnNewConnection::Running\n", r.Name))
return
}
r.connection = connection
fmt.Print(fmt.Sprintf("[Channel %s] OnNewConnection Calling initChannel\n", r.Name))
_ = r.initChannel()
fmt.Print(fmt.Sprintf("[Channel %s] OnNewConnection Exited\n", r.Name))
}
func (r *AbstractChannel) initChannel() error {
fmt.Print(fmt.Sprintf("[%s] initChannel Called\n", r.Name))
if !r.running {
fmt.Print(fmt.Sprintf("[%s] initChannel not running\n", r.Name))
return fmt.Errorf("Channel is not running\n")
}
if !r.running {
return fmt.Errorf("Channel is not running\n")
}
if r.connection == nil {
return fmt.Errorf("Connection is null\n")
} else {
if r.connection.IsClosed() {
return fmt.Errorf("Connection is closed\n")
}
}
if r.channel != nil {
oldChannel := r.channel
if oldChannel != nil {
fmt.Print(fmt.Sprintf("%s - Trying to close channel %p\n", r.Name, oldChannel))
err := oldChannel.Close()
fmt.Print(fmt.Sprintf("%s - Closed channel %p, %v\n", r.Name, oldChannel, err))
}
}
var err error
fmt.Print(fmt.Sprintf("[%s] initChannel retrieving channel\n", r.Name))
r.channel, err = r.connection.Channel()
if err != nil {
fmt.Print(fmt.Sprintf("[%s] initChannel channel() returned err - %v\n", r.Name, err))
return err
}
fmt.Print(fmt.Sprintf("[%s] initChannel retrieved channel\n", r.Name))
r.errorChan = make(chan *amqp.Error)
r.channel.NotifyClose(r.errorChan)
fmt.Print(fmt.Sprintf("Chan %s - Calling initChannelInternal\n", r.Name))
err = r.impl.initChannelInternal()
fmt.Print(fmt.Sprintf("Chan %s - Called initChannelInternal\n", r.Name))
fmt.Print(fmt.Sprintf("[%s] initChannel Exited\n", r.Name))
return err
}
func (r *AbstractChannel) listenErrors() {
if r.listeningErrors {
return
}
r.listeningErrors = true
go func() {
for r.running && r.parent.running {
amqpErr := <-r.errorChan
if amqpErr != nil {
fmt.Print(fmt.Sprintf("Channel %s - An error occurred with errorChan - %v\n", r.Name, amqpErr))
}
for r.running && r.parent.running {
if r.connection == nil || r.connection.IsClosed() {
var message string
if r.connection == nil {
message = fmt.Sprintf("Channel %s can not listen channel errors as connection is nil\n", r.Name)
} else {
message = fmt.Sprintf("Channel %s can not listen channel errors as connection is closed\n", r.Name)
}
fmt.Print(message)
time.Sleep(time.Second)
continue
} else {
if !r.running || !r.parent.running {
break
}
err := r.initChannel()
time.Sleep(time.Second)
if err != nil {
continue
} else {
break
}
}
}
}
r.listeningErrors = false
fmt.Print(fmt.Sprintf("Base Channel %s exited, Running: %t, ParentRunning: %t\n",
r.Name, r.running, r.parent.running))
}()
}
package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"sync"
"time"
)
type ConnectionListener interface {
OnNewConnection(name string, connection *amqp.Connection)
}
type ConnectionRabbitMq struct {
running bool
conListenerLock sync.Mutex
connectionListeners []ConnectionListener
connection *amqp.Connection
inConnErrCh chan *amqp.Error
connErrChan chan *amqp.Error
name string
connBlockChan chan amqp.Blocking
}
func NewConnectionRabbitMq(name string) (*ConnectionRabbitMq, error) {
if name == "" {
return nil, fmt.Errorf("Name can not be empty\n")
}
r := &ConnectionRabbitMq{
name: name,
}
return r, nil
}
func (r *ConnectionRabbitMq) Start() error {
if r.running {
return nil
}
r.running = true
err := r.initConnection()
if err != nil {
r.running = false
return err
}
r.listenConnErrors()
return nil
}
func (r *ConnectionRabbitMq) initConnection() error {
fmt.Print(fmt.Sprintf("Connection %s - initConnection called\n", r.name))
var err error
start := time.Now()
if !r.running {
return fmt.Errorf("Connection %s is not running\n", r.name)
}
if r.connection != nil && !r.connection.IsClosed() {
oldConnection := r.connection
fmt.Print(fmt.Sprintf("Closing connection %p\n", oldConnection))
err = oldConnection.Close()
fmt.Print(fmt.Sprintf("Closed connection %p, err: %v\n", oldConnection, err))
time.Sleep(time.Millisecond * 100)
}
fmt.Print(fmt.Sprintf("Connection %s - Calling getConnection\n", r.name))
r.connection, err = getConnection()
fmt.Print(fmt.Sprintf("Connection %s - getConnection took %s\n", r.name, time.Now().Sub(start)))
if err != nil {
fmt.Print(fmt.Sprintf("Connection %s - initConnection Exit because error %v\n", r.name, err))
return err
}
r.connErrChan = make(chan *amqp.Error)
r.connection.NotifyClose(r.connErrChan)
r.connBlockChan = make(chan amqp.Blocking)
r.connection.NotifyBlocked(r.connBlockChan)
fmt.Print(fmt.Sprintf("Connection %s - Calling OnNewConnection for %d listeners\n", r.name, len(r.connectionListeners)))
r.conListenerLock.Lock()
for _, l := range r.connectionListeners {
l.OnNewConnection(r.name, r.connection)
}
r.conListenerLock.Unlock()
fmt.Print(fmt.Sprintf("Connection %s - Called OnNewConnection for %d listeners\n", r.name, len(r.connectionListeners)))
return nil
}
func (r *ConnectionRabbitMq) listenConnErrors() {
fmt.Print(fmt.Sprintf("Connection %s - listenConnErrors called\n", r.name))
go func() {
for r.running {
amqpErr := <-r.connErrChan
if amqpErr != nil {
message := fmt.Sprintf("Connection - %s connection got an error %s - %v\n", r.name, amqpErr.Error(), amqpErr)
fmt.Print(message)
} else {
fmt.Print(fmt.Sprintf("Connection %s - broke out of connErrChan but no error was returned\n", r.name))
}
if !r.running {
break
}
if r.connection.IsClosed() {
message := fmt.Sprintf("Connection %s - IsClosed() == true\n", r.name)
fmt.Print(message)
}
for r.running {
err := r.initConnection()
if err != nil {
time.Sleep(time.Second * 3)
} else {
message := fmt.Sprintf("Connection %s - re-established\n", r.name)
fmt.Print(message)
break
}
}
}
fmt.Print(fmt.Sprintf("Connection %s - Exited SingleConnection::ListenErrorEvents- Running: %t\n",
r.name, r.running))
}()
go func() {
waitedClosed := 0
for r.running {
block := <-r.connBlockChan
fmt.Print(fmt.Sprintf("Block Reason: %s, Active: %t\n", block.Reason, block.Active))
if !r.running {
break
}
for r.running {
if r.connection != nil && !r.connection.IsClosed() {
break
} else {
message := fmt.Sprintf("Connection %s - Blocked Ptr: %p IsClosed() == true, WaitedClose %d\n",
r.name, r.connection, waitedClosed)
fmt.Print(message)
time.Sleep(time.Second * 6)
continue
}
}
continue
}
fmt.Print(fmt.Sprintf("Connection %s - Exited SingleConnection::ListenBlock - Running: %t\n",
r.name, r.running))
}()
}
func (r *ConnectionRabbitMq) AddConnectionListener(l ConnectionListener) {
fmt.Print(fmt.Sprintf("Connection %s - AddConnectionListener called\n", r.name))
r.conListenerLock.Lock()
found := false
for _, a := range r.connectionListeners {
if a == l {
found = true
break
}
}
if !found {
fmt.Print(fmt.Sprintf("Adding a new connection %s listener\n", r.name))
r.connectionListeners = append(r.connectionListeners, l)
}
r.conListenerLock.Unlock()
if r.connection != nil && !r.connection.IsClosed() {
l.OnNewConnection(r.name, r.connection)
}
fmt.Print(fmt.Sprintf("Connection %s - exiting AddConnectionListener\n", r.name))
}
func getConnection() (*amqp.Connection, error) {
connectionString := fmt.Sprintf("amqp://guest:guest@localhost:5672/")
conn, err := amqp.Dial(connectionString)
if err != nil {
return nil, err
}
return conn, err
}
func (r *ConnectionRabbitMq) GetConnection() *amqp.Connection {
return r.connection
}
package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"math/rand"
"time"
)
type MockReadableChannel struct {
}
type MockWriteableChannel struct{}
func (m MockWriteableChannel) InitOutChannel(name string, channel *amqp.Channel) error {
err := channel.ExchangeDeclare("issue-ex", "fanout", true, true, false, false, nil)
if err != nil {
return err
}
_, err = channel.QueueDeclare("issue-q", false, false, false, false, nil)
if err != nil {
return err
}
return err
}
func (r *MockReadableChannel) InitInChannel(name string, channel *amqp.Channel) (eventQueue <-chan amqp.Delivery, queueName string, err error) {
err = channel.ExchangeDeclare("issue-ex", "fanout", true, true, false, false, nil)
if err != nil {
return nil, "", err
}
_, err = channel.QueueDeclare("issue-q", false, false, false, false, nil)
if err != nil {
return nil, "", err
}
err = channel.QueueBind("issue-q", "", "issue-ex", false, nil)
if err != nil {
return nil, "", err
}
eQ, err := channel.Consume("issue-q", "", true, false, false, false, nil)
if err != nil {
return nil, "", err
}
return eQ, "issue-q", nil
}
func main() {
singleConnection, err := NewSingleRabbitMQEventReceiver()
if err != nil {
panic(err)
}
r1 := &MockReadableChannel{}
inCh, err := singleConnection.AddReadableChannel("channel-input-1", r1, nil)
if err != nil {
panic(err)
}
w1 := &MockWriteableChannel{}
outCh, err := singleConnection.AddWriteableChannel("channel-output-1", w1)
singleConnection.Run()
go func() {
for {
timeToSleep := rand.Intn(5)
time.Sleep(time.Duration(timeToSleep) * time.Second)
go func() {
if rand.Intn(2) == 1 {
err = singleConnection.connection.connection.Close()
fmt.Printf("Closed Connection Err: %v\n", err)
} else {
err = inCh.channel.Close()
fmt.Printf("Closed Channel Err: %v\n", err)
}
}()
}
}()
for i := 0; i < 50; i++ {
go func() {
for {
err = outCh.SendEvent("issue-ex", amqp.Publishing{
ContentType: "application/json",
Body: []byte("{\"Message\":\"Hello World\"}"),
})
if err != nil {
fmt.Printf("An error occurred sending event - %v\n", err)
}
timeToSleep := rand.Intn(2)
time.Sleep(time.Duration(timeToSleep) * time.Second)
}
}()
}
time.Sleep(time.Hour)
}
package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"sync"
"time"
)
type ReadableChannel struct {
*AbstractChannel
EventQueue <-chan amqp.Delivery
channelSetupHandler InChannelSetupHandler
eventListenersLock sync.Mutex
listeningEvents bool
config *ReadChannelConfig
}
func NewReadableChannel(name string, connection *ConnectionRabbitMq, handler InChannelSetupHandler, config *ReadChannelConfig) *ReadableChannel {
r := &ReadableChannel{
channelSetupHandler: handler,
config: config,
}
r.AbstractChannel = NewBaseChannel(name, connection, r)
r.AbstractChannel.impl = r
return r
}
func (r *ReadableChannel) Run() {
if r.running {
return
}
r.run()
r.listenEvents()
}
func (r *ReadableChannel) initChannelInternal() error {
var err error
r.EventQueue, _, err = r.channelSetupHandler.InitInChannel(r.Name, r.channel)
return err
}
func (r *ReadableChannel) listenEvents() {
if r.listeningEvents {
return
}
r.listeningEvents = true
go func() {
for r.running && r.parent.running {
for e := range r.EventQueue {
if e.Body == nil {
fmt.Print(fmt.Sprintf("%s - MessageBody is empty - %s %s %d\n",
r.Name, e.ContentType, e.Type, e.DeliveryTag))
if !r.running || !r.parent.running {
break
}
continue
}
if !r.running || !r.parent.running {
break
}
}
if !r.running || !r.parent.running {
break
}
message := fmt.Sprintf("[%s] - Broke out of EventQueue\n",
r.Name)
fmt.Print(message)
for r.running && r.parent.running {
if r.connection == nil || r.connection.IsClosed() {
if r.connection == nil {
message = fmt.Sprintf("Channel %s can not listen channel errors as connection is nil\n", r.Name)
} else {
message = fmt.Sprintf("Channel %s can not listen channel errors as connection is closed\n", r.Name)
}
fmt.Print(message)
time.Sleep(time.Second)
continue
} else {
if !r.running || !r.parent.running {
break
}
err := r.initChannel()
if err != nil {
continue
} else {
break
}
}
}
}
fmt.Print(fmt.Sprintf("Readable %s - Exiting, Running: %t, Parent: %t\n", r.Name,
r.running, r.parent.running))
r.listeningEvents = false
}()
}
package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"sync"
)
type SingleConnectionRabbitMQ struct {
connection *ConnectionRabbitMq
running bool
connectionListenersLock sync.Mutex
connectionListeners []SingleConnectionChangeListener
readableChannelsLock sync.Mutex
readableChannels map[string]SingleConnectionChangeListener
writeableChannelsLock sync.Mutex
writeableChannels map[string]*WriteableChannel
}
type SingleConnectionChangeListener interface {
OnNewConnection(connection *amqp.Connection)
Run()
}
func NewSingleRabbitMQEventReceiver() (*SingleConnectionRabbitMQ, error) {
connection, err := NewConnectionRabbitMq("input-output")
if err != nil {
return nil, err
}
r := &SingleConnectionRabbitMQ{
readableChannels: map[string]SingleConnectionChangeListener{},
writeableChannels: map[string]*WriteableChannel{},
connection: connection,
}
r.connection.AddConnectionListener(r)
err = r.connection.Start()
if err != nil {
return nil, err
}
return r, nil
}
func (r *SingleConnectionRabbitMQ) Run() {
if r.running {
return
}
r.running = true
r.readableChannelsLock.Lock()
for _, readableChannel := range r.readableChannels {
readableChannel.Run()
}
r.readableChannelsLock.Unlock()
r.writeableChannelsLock.Lock()
for _, a := range r.writeableChannels {
a.Run()
}
r.writeableChannelsLock.Unlock()
}
func (r *SingleConnectionRabbitMQ) OnNewConnection(name string, connection *amqp.Connection) {
fmt.Print(fmt.Sprintf("SingleConnection - OnNewConnection Called\n"))
if connection == nil {
fmt.Print("SingleCon conn is nil, returning\n")
return
}
r.connectionListenersLock.Lock()
for index, l := range r.connectionListeners {
fmt.Print(fmt.Sprintf("SingleCon - Calling OnNewConnection for %d\n", index))
l.OnNewConnection(connection)
fmt.Print(fmt.Sprintf("SingleCon - Finished calling OnNewConnection for %d\n", index))
}
r.connectionListenersLock.Unlock()
r.readableChannelsLock.Lock()
for index, l := range r.readableChannels {
fmt.Print(fmt.Sprintf("SingleCon - Calling OnNewConnection for readableChannel %s\n", index))
l.OnNewConnection(connection)
fmt.Print(fmt.Sprintf("SingleCon - Finished calling OnNewConnection for readableChannel %s\n", index))
}
r.readableChannelsLock.Unlock()
r.writeableChannelsLock.Lock()
for index, l := range r.writeableChannels {
fmt.Print(fmt.Sprintf("SingleCon - Calling OnNewConnection for writeableChannel %s\n", index))
l.OnNewConnection(connection)
fmt.Print(fmt.Sprintf("SingleCon - Finished OnNewConnection for writeableChannel %s\n", index))
}
r.writeableChannelsLock.Unlock()
fmt.Print(fmt.Sprintf("SingleConnection - OnNewConnection Exited\n"))
}
func (r *SingleConnectionRabbitMQ) AddReadableChannel(name string, channelSetupHandler InChannelSetupHandler, config *ReadChannelConfig) (*ReadableChannel, error) {
fmt.Print(fmt.Sprintf("SingleConnection - AddReadableChannel Called\n"))
r.readableChannelsLock.Lock()
var readableChannel *ReadableChannel
if _, found := r.readableChannels[name]; !found {
readableChannel = NewReadableChannel(name, r.connection, channelSetupHandler, config)
r.readableChannels[name] = readableChannel
if r.running {
readableChannel.Run()
}
} else {
r.readableChannelsLock.Unlock()
return nil, fmt.Errorf("Channel was already present\n")
}
r.readableChannelsLock.Unlock()
fmt.Print(fmt.Sprintf("SingleConnection - AddReadableChannel Exited\n"))
return readableChannel, nil
}
func (r *SingleConnectionRabbitMQ) AddWriteableChannel(name string, channelSetupHandler OutChannelSetupHandler) (*WriteableChannel, error) {
fmt.Print(fmt.Sprintf("SingleConnection - AddWriteableChannel called\n"))
r.writeableChannelsLock.Lock()
var writeableChannel *WriteableChannel
if _, found := r.writeableChannels[name]; !found {
writeableChannel = NewWriteableChannel(name, r.connection, channelSetupHandler)
r.writeableChannels[name] = writeableChannel
if r.running {
writeableChannel.Run()
}
} else {
r.writeableChannelsLock.Unlock()
return nil, fmt.Errorf("Channel was already present\n")
}
r.writeableChannelsLock.Unlock()
fmt.Print(fmt.Sprintf("SingleConnection - AddWriteableChannel Exited\n"))
return writeableChannel, nil
}
package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
)
type OutChannelSetupHandler interface {
InitOutChannel(name string, channel *amqp.Channel) error
}
type WriteableChannel struct {
*AbstractChannel
channelSetupHandler OutChannelSetupHandler
}
func NewWriteableChannel(name string, connection *ConnectionRabbitMq, handler OutChannelSetupHandler) *WriteableChannel {
r := &WriteableChannel{
channelSetupHandler: handler,
}
r.AbstractChannel = NewBaseChannel(name, connection, r)
r.AbstractChannel.impl = r
return r
}
func (r *WriteableChannel) Run() {
if r.running {
return
}
r.run()
}
func (r *WriteableChannel) initChannelInternal() error {
err := r.channelSetupHandler.InitOutChannel(r.Name, r.channel)
return err
}
func (r *WriteableChannel) SendEvent(exchangeName string, message amqp.Publishing) error {
if r.channel == nil {
return fmt.Errorf("Channel %s is nil\n", r.Name)
}
return r.channel.Publish(exchangeName, "", false, false, message)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment