-
-
Save melardev/1b9c7e1b1a4ac37cb31e57dc6cde99c7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
amqp "github.com/rabbitmq/amqp091-go" | |
"sync" | |
"time" | |
) | |
// IAbstractChannel is the hook a concrete channel type hands to
// AbstractChannel. initChannel calls it after a new *amqp.Channel has been
// opened and its close notification has been registered.
type IAbstractChannel interface {
	// initChannelInternal performs the implementation-specific setup of the
	// freshly opened channel and reports any failure.
	initChannelInternal() error
}
type AbstractChannel struct { | |
running bool | |
Name string | |
channel *amqp.Channel | |
connection *amqp.Connection | |
errorChan chan *amqp.Error | |
channelLock sync.Mutex | |
impl IAbstractChannel | |
parent *ConnectionRabbitMq | |
listeningErrors bool | |
} | |
// ReadChannelConfig bundles the options handed to NewReadableChannel.
//
// NOTE(review): none of these fields is read by the code visible in this
// file; the descriptions below are inferred from the names only - confirm
// against the consumers before relying on them.
type ReadChannelConfig struct {
	// AckEvents / AckMultiple presumably control acknowledgement of deliveries.
	AckEvents, AckMultiple bool
	// RawEvents presumably selects delivery of undecoded messages.
	RawEvents bool
	// AckErrorMessages presumably acknowledges messages that failed handling.
	AckErrorMessages bool
}
type InChannelSetupHandler interface { | |
InitInChannel(name string, channel *amqp.Channel) (eventQueue <-chan amqp.Delivery, queueName string, err error) | |
} | |
func NewBaseChannel(name string, connection *ConnectionRabbitMq, parent IAbstractChannel) *AbstractChannel { | |
r := &AbstractChannel{ | |
Name: name, | |
running: false, | |
connection: connection.GetConnection(), | |
parent: connection, | |
impl: parent, | |
} | |
return r | |
} | |
func (r *AbstractChannel) run() { | |
if r.running { | |
return | |
} | |
r.running = true | |
if r.connection != nil && !r.connection.IsClosed() { | |
r.OnNewConnection(r.connection) | |
} | |
r.listenErrors() | |
} | |
func (r *AbstractChannel) OnNewConnection(connection *amqp.Connection) { | |
fmt.Print(fmt.Sprintf("[Channel %s] OnNewConnection Called\n", r.Name)) | |
if !r.running { | |
fmt.Print(fmt.Sprintf("[%s] Not OnNewConnection::Running\n", r.Name)) | |
return | |
} | |
r.connection = connection | |
fmt.Print(fmt.Sprintf("[Channel %s] OnNewConnection Calling initChannel\n", r.Name)) | |
_ = r.initChannel() | |
fmt.Print(fmt.Sprintf("[Channel %s] OnNewConnection Exited\n", r.Name)) | |
} | |
func (r *AbstractChannel) initChannel() error { | |
fmt.Print(fmt.Sprintf("[%s] initChannel Called\n", r.Name)) | |
if !r.running { | |
fmt.Print(fmt.Sprintf("[%s] initChannel not running\n", r.Name)) | |
return fmt.Errorf("Channel is not running\n") | |
} | |
if !r.running { | |
return fmt.Errorf("Channel is not running\n") | |
} | |
if r.connection == nil { | |
return fmt.Errorf("Connection is null\n") | |
} else { | |
if r.connection.IsClosed() { | |
return fmt.Errorf("Connection is closed\n") | |
} | |
} | |
if r.channel != nil { | |
oldChannel := r.channel | |
if oldChannel != nil { | |
fmt.Print(fmt.Sprintf("%s - Trying to close channel %p\n", r.Name, oldChannel)) | |
err := oldChannel.Close() | |
fmt.Print(fmt.Sprintf("%s - Closed channel %p, %v\n", r.Name, oldChannel, err)) | |
} | |
} | |
var err error | |
fmt.Print(fmt.Sprintf("[%s] initChannel retrieving channel\n", r.Name)) | |
r.channel, err = r.connection.Channel() | |
if err != nil { | |
fmt.Print(fmt.Sprintf("[%s] initChannel channel() returned err - %v\n", r.Name, err)) | |
return err | |
} | |
fmt.Print(fmt.Sprintf("[%s] initChannel retrieved channel\n", r.Name)) | |
r.errorChan = make(chan *amqp.Error) | |
r.channel.NotifyClose(r.errorChan) | |
fmt.Print(fmt.Sprintf("Chan %s - Calling initChannelInternal\n", r.Name)) | |
err = r.impl.initChannelInternal() | |
fmt.Print(fmt.Sprintf("Chan %s - Called initChannelInternal\n", r.Name)) | |
fmt.Print(fmt.Sprintf("[%s] initChannel Exited\n", r.Name)) | |
return err | |
} | |
func (r *AbstractChannel) listenErrors() { | |
if r.listeningErrors { | |
return | |
} | |
r.listeningErrors = true | |
go func() { | |
for r.running && r.parent.running { | |
amqpErr := <-r.errorChan | |
if amqpErr != nil { | |
fmt.Print(fmt.Sprintf("Channel %s - An error occurred with errorChan - %v\n", r.Name, amqpErr)) | |
} | |
for r.running && r.parent.running { | |
if r.connection == nil || r.connection.IsClosed() { | |
var message string | |
if r.connection == nil { | |
message = fmt.Sprintf("Channel %s can not listen channel errors as connection is nil\n", r.Name) | |
} else { | |
message = fmt.Sprintf("Channel %s can not listen channel errors as connection is closed\n", r.Name) | |
} | |
fmt.Print(message) | |
time.Sleep(time.Second) | |
continue | |
} else { | |
if !r.running || !r.parent.running { | |
break | |
} | |
err := r.initChannel() | |
time.Sleep(time.Second) | |
if err != nil { | |
continue | |
} else { | |
break | |
} | |
} | |
} | |
} | |
r.listeningErrors = false | |
fmt.Print(fmt.Sprintf("Base Channel %s exited, Running: %t, ParentRunning: %t\n", | |
r.Name, r.running, r.parent.running)) | |
}() | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
amqp "github.com/rabbitmq/amqp091-go" | |
"sync" | |
"time" | |
) | |
type ConnectionListener interface { | |
OnNewConnection(name string, connection *amqp.Connection) | |
} | |
type ConnectionRabbitMq struct { | |
running bool | |
conListenerLock sync.Mutex | |
connectionListeners []ConnectionListener | |
connection *amqp.Connection | |
inConnErrCh chan *amqp.Error | |
connErrChan chan *amqp.Error | |
name string | |
connBlockChan chan amqp.Blocking | |
} | |
func NewConnectionRabbitMq(name string) (*ConnectionRabbitMq, error) { | |
if name == "" { | |
return nil, fmt.Errorf("Name can not be empty\n") | |
} | |
r := &ConnectionRabbitMq{ | |
name: name, | |
} | |
return r, nil | |
} | |
func (r *ConnectionRabbitMq) Start() error { | |
if r.running { | |
return nil | |
} | |
r.running = true | |
err := r.initConnection() | |
if err != nil { | |
r.running = false | |
return err | |
} | |
r.listenConnErrors() | |
return nil | |
} | |
func (r *ConnectionRabbitMq) initConnection() error { | |
fmt.Print(fmt.Sprintf("Connection %s - initConnection called\n", r.name)) | |
var err error | |
start := time.Now() | |
if !r.running { | |
return fmt.Errorf("Connection %s is not running\n", r.name) | |
} | |
if r.connection != nil && !r.connection.IsClosed() { | |
oldConnection := r.connection | |
fmt.Print(fmt.Sprintf("Closing connection %p\n", oldConnection)) | |
err = oldConnection.Close() | |
fmt.Print(fmt.Sprintf("Closed connection %p, err: %v\n", oldConnection, err)) | |
time.Sleep(time.Millisecond * 100) | |
} | |
fmt.Print(fmt.Sprintf("Connection %s - Calling getConnection\n", r.name)) | |
r.connection, err = getConnection() | |
fmt.Print(fmt.Sprintf("Connection %s - getConnection took %s\n", r.name, time.Now().Sub(start))) | |
if err != nil { | |
fmt.Print(fmt.Sprintf("Connection %s - initConnection Exit because error %v\n", r.name, err)) | |
return err | |
} | |
r.connErrChan = make(chan *amqp.Error) | |
r.connection.NotifyClose(r.connErrChan) | |
r.connBlockChan = make(chan amqp.Blocking) | |
r.connection.NotifyBlocked(r.connBlockChan) | |
fmt.Print(fmt.Sprintf("Connection %s - Calling OnNewConnection for %d listeners\n", r.name, len(r.connectionListeners))) | |
r.conListenerLock.Lock() | |
for _, l := range r.connectionListeners { | |
l.OnNewConnection(r.name, r.connection) | |
} | |
r.conListenerLock.Unlock() | |
fmt.Print(fmt.Sprintf("Connection %s - Called OnNewConnection for %d listeners\n", r.name, len(r.connectionListeners))) | |
return nil | |
} | |
func (r *ConnectionRabbitMq) listenConnErrors() { | |
fmt.Print(fmt.Sprintf("Connection %s - listenConnErrors called\n", r.name)) | |
go func() { | |
for r.running { | |
amqpErr := <-r.connErrChan | |
if amqpErr != nil { | |
message := fmt.Sprintf("Connection - %s connection got an error %s - %v\n", r.name, amqpErr.Error(), amqpErr) | |
fmt.Print(message) | |
} else { | |
fmt.Print(fmt.Sprintf("Connection %s - broke out of connErrChan but no error was returned\n", r.name)) | |
} | |
if !r.running { | |
break | |
} | |
if r.connection.IsClosed() { | |
message := fmt.Sprintf("Connection %s - IsClosed() == true\n", r.name) | |
fmt.Print(message) | |
} | |
for r.running { | |
err := r.initConnection() | |
if err != nil { | |
time.Sleep(time.Second * 3) | |
} else { | |
message := fmt.Sprintf("Connection %s - re-established\n", r.name) | |
fmt.Print(message) | |
break | |
} | |
} | |
} | |
fmt.Print(fmt.Sprintf("Connection %s - Exited SingleConnection::ListenErrorEvents- Running: %t\n", | |
r.name, r.running)) | |
}() | |
go func() { | |
waitedClosed := 0 | |
for r.running { | |
block := <-r.connBlockChan | |
fmt.Print(fmt.Sprintf("Block Reason: %s, Active: %t\n", block.Reason, block.Active)) | |
if !r.running { | |
break | |
} | |
for r.running { | |
if r.connection != nil && !r.connection.IsClosed() { | |
break | |
} else { | |
message := fmt.Sprintf("Connection %s - Blocked Ptr: %p IsClosed() == true, WaitedClose %d\n", | |
r.name, r.connection, waitedClosed) | |
fmt.Print(message) | |
time.Sleep(time.Second * 6) | |
continue | |
} | |
} | |
continue | |
} | |
fmt.Print(fmt.Sprintf("Connection %s - Exited SingleConnection::ListenBlock - Running: %t\n", | |
r.name, r.running)) | |
}() | |
} | |
func (r *ConnectionRabbitMq) AddConnectionListener(l ConnectionListener) { | |
fmt.Print(fmt.Sprintf("Connection %s - AddConnectionListener called\n", r.name)) | |
r.conListenerLock.Lock() | |
found := false | |
for _, a := range r.connectionListeners { | |
if a == l { | |
found = true | |
break | |
} | |
} | |
if !found { | |
fmt.Print(fmt.Sprintf("Adding a new connection %s listener\n", r.name)) | |
r.connectionListeners = append(r.connectionListeners, l) | |
} | |
r.conListenerLock.Unlock() | |
if r.connection != nil && !r.connection.IsClosed() { | |
l.OnNewConnection(r.name, r.connection) | |
} | |
fmt.Print(fmt.Sprintf("Connection %s - exiting AddConnectionListener\n", r.name)) | |
} | |
func getConnection() (*amqp.Connection, error) { | |
connectionString := fmt.Sprintf("amqp://guest:guest@localhost:5672/") | |
conn, err := amqp.Dial(connectionString) | |
if err != nil { | |
return nil, err | |
} | |
return conn, err | |
} | |
func (r *ConnectionRabbitMq) GetConnection() *amqp.Connection { | |
return r.connection | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
amqp "github.com/rabbitmq/amqp091-go" | |
"math/rand" | |
"time" | |
) | |
// MockReadableChannel is a stateless InChannelSetupHandler used by main to
// set up the consuming side of the demo.
type MockReadableChannel struct{}
// MockWriteableChannel is a stateless OutChannelSetupHandler used by main to
// set up the publishing side of the demo.
type MockWriteableChannel struct{}
func (m MockWriteableChannel) InitOutChannel(name string, channel *amqp.Channel) error { | |
err := channel.ExchangeDeclare("issue-ex", "fanout", true, true, false, false, nil) | |
if err != nil { | |
return err | |
} | |
_, err = channel.QueueDeclare("issue-q", false, false, false, false, nil) | |
if err != nil { | |
return err | |
} | |
return err | |
} | |
func (r *MockReadableChannel) InitInChannel(name string, channel *amqp.Channel) (eventQueue <-chan amqp.Delivery, queueName string, err error) { | |
err = channel.ExchangeDeclare("issue-ex", "fanout", true, true, false, false, nil) | |
if err != nil { | |
return nil, "", err | |
} | |
_, err = channel.QueueDeclare("issue-q", false, false, false, false, nil) | |
if err != nil { | |
return nil, "", err | |
} | |
err = channel.QueueBind("issue-q", "", "issue-ex", false, nil) | |
if err != nil { | |
return nil, "", err | |
} | |
eQ, err := channel.Consume("issue-q", "", true, false, false, false, nil) | |
if err != nil { | |
return nil, "", err | |
} | |
return eQ, "issue-q", nil | |
} | |
func main() { | |
singleConnection, err := NewSingleRabbitMQEventReceiver() | |
if err != nil { | |
panic(err) | |
} | |
r1 := &MockReadableChannel{} | |
inCh, err := singleConnection.AddReadableChannel("channel-input-1", r1, nil) | |
if err != nil { | |
panic(err) | |
} | |
w1 := &MockWriteableChannel{} | |
outCh, err := singleConnection.AddWriteableChannel("channel-output-1", w1) | |
singleConnection.Run() | |
go func() { | |
for { | |
timeToSleep := rand.Intn(5) | |
time.Sleep(time.Duration(timeToSleep) * time.Second) | |
go func() { | |
if rand.Intn(2) == 1 { | |
err = singleConnection.connection.connection.Close() | |
fmt.Printf("Closed Connection Err: %v\n", err) | |
} else { | |
err = inCh.channel.Close() | |
fmt.Printf("Closed Channel Err: %v\n", err) | |
} | |
}() | |
} | |
}() | |
for i := 0; i < 50; i++ { | |
go func() { | |
for { | |
err = outCh.SendEvent("issue-ex", amqp.Publishing{ | |
ContentType: "application/json", | |
Body: []byte("{\"Message\":\"Hello World\"}"), | |
}) | |
if err != nil { | |
fmt.Printf("An error occurred sending event - %v\n", err) | |
} | |
timeToSleep := rand.Intn(2) | |
time.Sleep(time.Duration(timeToSleep) * time.Second) | |
} | |
}() | |
} | |
time.Sleep(time.Hour) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
amqp "github.com/rabbitmq/amqp091-go" | |
"sync" | |
"time" | |
) | |
type ReadableChannel struct { | |
*AbstractChannel | |
EventQueue <-chan amqp.Delivery | |
channelSetupHandler InChannelSetupHandler | |
eventListenersLock sync.Mutex | |
listeningEvents bool | |
config *ReadChannelConfig | |
} | |
func NewReadableChannel(name string, connection *ConnectionRabbitMq, handler InChannelSetupHandler, config *ReadChannelConfig) *ReadableChannel { | |
r := &ReadableChannel{ | |
channelSetupHandler: handler, | |
config: config, | |
} | |
r.AbstractChannel = NewBaseChannel(name, connection, r) | |
r.AbstractChannel.impl = r | |
return r | |
} | |
func (r *ReadableChannel) Run() { | |
if r.running { | |
return | |
} | |
r.run() | |
r.listenEvents() | |
} | |
func (r *ReadableChannel) initChannelInternal() error { | |
var err error | |
r.EventQueue, _, err = r.channelSetupHandler.InitInChannel(r.Name, r.channel) | |
return err | |
} | |
func (r *ReadableChannel) listenEvents() { | |
if r.listeningEvents { | |
return | |
} | |
r.listeningEvents = true | |
go func() { | |
for r.running && r.parent.running { | |
for e := range r.EventQueue { | |
if e.Body == nil { | |
fmt.Print(fmt.Sprintf("%s - MessageBody is empty - %s %s %d\n", | |
r.Name, e.ContentType, e.Type, e.DeliveryTag)) | |
if !r.running || !r.parent.running { | |
break | |
} | |
continue | |
} | |
if !r.running || !r.parent.running { | |
break | |
} | |
} | |
if !r.running || !r.parent.running { | |
break | |
} | |
message := fmt.Sprintf("[%s] - Broke out of EventQueue\n", | |
r.Name) | |
fmt.Print(message) | |
for r.running && r.parent.running { | |
if r.connection == nil || r.connection.IsClosed() { | |
if r.connection == nil { | |
message = fmt.Sprintf("Channel %s can not listen channel errors as connection is nil\n", r.Name) | |
} else { | |
message = fmt.Sprintf("Channel %s can not listen channel errors as connection is closed\n", r.Name) | |
} | |
fmt.Print(message) | |
time.Sleep(time.Second) | |
continue | |
} else { | |
if !r.running || !r.parent.running { | |
break | |
} | |
err := r.initChannel() | |
if err != nil { | |
continue | |
} else { | |
break | |
} | |
} | |
} | |
} | |
fmt.Print(fmt.Sprintf("Readable %s - Exiting, Running: %t, Parent: %t\n", r.Name, | |
r.running, r.parent.running)) | |
r.listeningEvents = false | |
}() | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
amqp "github.com/rabbitmq/amqp091-go" | |
"sync" | |
) | |
type SingleConnectionRabbitMQ struct { | |
connection *ConnectionRabbitMq | |
running bool | |
connectionListenersLock sync.Mutex | |
connectionListeners []SingleConnectionChangeListener | |
readableChannelsLock sync.Mutex | |
readableChannels map[string]SingleConnectionChangeListener | |
writeableChannelsLock sync.Mutex | |
writeableChannels map[string]*WriteableChannel | |
} | |
type SingleConnectionChangeListener interface { | |
OnNewConnection(connection *amqp.Connection) | |
Run() | |
} | |
func NewSingleRabbitMQEventReceiver() (*SingleConnectionRabbitMQ, error) { | |
connection, err := NewConnectionRabbitMq("input-output") | |
if err != nil { | |
return nil, err | |
} | |
r := &SingleConnectionRabbitMQ{ | |
readableChannels: map[string]SingleConnectionChangeListener{}, | |
writeableChannels: map[string]*WriteableChannel{}, | |
connection: connection, | |
} | |
r.connection.AddConnectionListener(r) | |
err = r.connection.Start() | |
if err != nil { | |
return nil, err | |
} | |
return r, nil | |
} | |
func (r *SingleConnectionRabbitMQ) Run() { | |
if r.running { | |
return | |
} | |
r.running = true | |
r.readableChannelsLock.Lock() | |
for _, readableChannel := range r.readableChannels { | |
readableChannel.Run() | |
} | |
r.readableChannelsLock.Unlock() | |
r.writeableChannelsLock.Lock() | |
for _, a := range r.writeableChannels { | |
a.Run() | |
} | |
r.writeableChannelsLock.Unlock() | |
} | |
func (r *SingleConnectionRabbitMQ) OnNewConnection(name string, connection *amqp.Connection) { | |
fmt.Print(fmt.Sprintf("SingleConnection - OnNewConnection Called\n")) | |
if connection == nil { | |
fmt.Print("SingleCon conn is nil, returning\n") | |
return | |
} | |
r.connectionListenersLock.Lock() | |
for index, l := range r.connectionListeners { | |
fmt.Print(fmt.Sprintf("SingleCon - Calling OnNewConnection for %d\n", index)) | |
l.OnNewConnection(connection) | |
fmt.Print(fmt.Sprintf("SingleCon - Finished calling OnNewConnection for %d\n", index)) | |
} | |
r.connectionListenersLock.Unlock() | |
r.readableChannelsLock.Lock() | |
for index, l := range r.readableChannels { | |
fmt.Print(fmt.Sprintf("SingleCon - Calling OnNewConnection for readableChannel %s\n", index)) | |
l.OnNewConnection(connection) | |
fmt.Print(fmt.Sprintf("SingleCon - Finished calling OnNewConnection for readableChannel %s\n", index)) | |
} | |
r.readableChannelsLock.Unlock() | |
r.writeableChannelsLock.Lock() | |
for index, l := range r.writeableChannels { | |
fmt.Print(fmt.Sprintf("SingleCon - Calling OnNewConnection for writeableChannel %s\n", index)) | |
l.OnNewConnection(connection) | |
fmt.Print(fmt.Sprintf("SingleCon - Finished OnNewConnection for writeableChannel %s\n", index)) | |
} | |
r.writeableChannelsLock.Unlock() | |
fmt.Print(fmt.Sprintf("SingleConnection - OnNewConnection Exited\n")) | |
} | |
func (r *SingleConnectionRabbitMQ) AddReadableChannel(name string, channelSetupHandler InChannelSetupHandler, config *ReadChannelConfig) (*ReadableChannel, error) { | |
fmt.Print(fmt.Sprintf("SingleConnection - AddReadableChannel Called\n")) | |
r.readableChannelsLock.Lock() | |
var readableChannel *ReadableChannel | |
if _, found := r.readableChannels[name]; !found { | |
readableChannel = NewReadableChannel(name, r.connection, channelSetupHandler, config) | |
r.readableChannels[name] = readableChannel | |
if r.running { | |
readableChannel.Run() | |
} | |
} else { | |
r.readableChannelsLock.Unlock() | |
return nil, fmt.Errorf("Channel was already present\n") | |
} | |
r.readableChannelsLock.Unlock() | |
fmt.Print(fmt.Sprintf("SingleConnection - AddReadableChannel Exited\n")) | |
return readableChannel, nil | |
} | |
func (r *SingleConnectionRabbitMQ) AddWriteableChannel(name string, channelSetupHandler OutChannelSetupHandler) (*WriteableChannel, error) { | |
fmt.Print(fmt.Sprintf("SingleConnection - AddWriteableChannel called\n")) | |
r.writeableChannelsLock.Lock() | |
var writeableChannel *WriteableChannel | |
if _, found := r.writeableChannels[name]; !found { | |
writeableChannel = NewWriteableChannel(name, r.connection, channelSetupHandler) | |
r.writeableChannels[name] = writeableChannel | |
if r.running { | |
writeableChannel.Run() | |
} | |
} else { | |
r.writeableChannelsLock.Unlock() | |
return nil, fmt.Errorf("Channel was already present\n") | |
} | |
r.writeableChannelsLock.Unlock() | |
fmt.Print(fmt.Sprintf("SingleConnection - AddWriteableChannel Exited\n")) | |
return writeableChannel, nil | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
amqp "github.com/rabbitmq/amqp091-go" | |
) | |
type OutChannelSetupHandler interface { | |
InitOutChannel(name string, channel *amqp.Channel) error | |
} | |
type WriteableChannel struct { | |
*AbstractChannel | |
channelSetupHandler OutChannelSetupHandler | |
} | |
func NewWriteableChannel(name string, connection *ConnectionRabbitMq, handler OutChannelSetupHandler) *WriteableChannel { | |
r := &WriteableChannel{ | |
channelSetupHandler: handler, | |
} | |
r.AbstractChannel = NewBaseChannel(name, connection, r) | |
r.AbstractChannel.impl = r | |
return r | |
} | |
func (r *WriteableChannel) Run() { | |
if r.running { | |
return | |
} | |
r.run() | |
} | |
func (r *WriteableChannel) initChannelInternal() error { | |
err := r.channelSetupHandler.InitOutChannel(r.Name, r.channel) | |
return err | |
} | |
func (r *WriteableChannel) SendEvent(exchangeName string, message amqp.Publishing) error { | |
if r.channel == nil { | |
return fmt.Errorf("Channel %s is nil\n", r.Name) | |
} | |
return r.channel.Publish(exchangeName, "", false, false, message) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment