Skip to content

Instantly share code, notes, and snippets.

@kwhitaker
Last active February 28, 2023 14:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kwhitaker/a8fab8707be4aa1f8119fc994d43b984 to your computer and use it in GitHub Desktop.
Save kwhitaker/a8fab8707be4aa1f8119fc994d43b984 to your computer and use it in GitHub Desktop.
Message Broker
package messaging
import (
stdContext "context"
"github.com/pkg/errors"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
type ASBBroker struct {
MessageBrokerImpl
client *azservicebus.Client
}
func NewASBBroker(ctx stdContext.Context, connectStr string, qName string, incMsgChan chan ParsedMessage, errChan chan error) (ASBBroker, error) {
client, err := azservicebus.NewClientFromConnectionString(connectStr, nil)
var broker ASBBroker
if err != nil {
return broker, err
}
broker.context = ctx
broker.queueName = qName
broker.incMsgChan = incMsgChan
broker.errChan = errChan
broker.client = client
return broker, nil
}
func (a *ASBBroker) Subscribe(cName string, qName string) error {
// handle subscriptions
}
func (a *ASBBroker) Publish(msg string) error {
// handle publishing
}
func (a *ASBBroker) Shutdown() {
// handle shutdown
}
func main() {
outChan := make(chan messaging.OutGoingMessage)
broker := messaging.NewASBBroker(...)
go func() {
for {
select {
case msg := <-outChan:
// Throws 'cannot use pubBroker (variable of type messaging.ASBBroker) as messaging.MessageBrokerImpl value in argument to processOutgoingMessage'
processOutgoingMessage(broker, msg)
}
}
}
}
func processOutgoingMessage(broker messaging.MessageBrokerImpl, msg messaging.OutgoingMessage) {
// ... do stuff with message broker
}
package messaging
import (
stdContext "context"
)
type MessageBrokerImpl struct {
context stdContext.Context
queueName string
incMsgChan chan ParsedMessage
errChan chan error
}
type MessageBroker interface {
Publish(msg string) error
Subscribe(listenTo string) error
Shutdown()
}
type IncomingMessage struct {
ChannelName string
Message interface{}
}
type OutgoingMessage struct {
Type string
Payload interface{}
}
type ParsedMessage struct {
ChannelName string
MesssageID string
MessageBody string
}
type MessagePayload struct {
Payload interface{} `json:"payload"`
Type string `json:"type"`
}
func (m *MessageBrokerImpl) Publish(msg string) error {
return nil
}
func (m *MessageBrokerImpl) Subscribe(lisenTo string) error {
return nil
}
func (m *MessageBrokerImpl) Shutdown() {
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment