Skip to content

Instantly share code, notes, and snippets.

@perbu
Created November 29, 2021 15:16
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save perbu/4328d9533d6e0a4fc77c9689ca5ab74e to your computer and use it in GitHub Desktop.
Save perbu/4328d9533d6e0a4fc77c9689ca5ab74e to your computer and use it in GitHub Desktop.
package main
/*
This is an example of using the paho.golang (supporting MQTT5) library in a way where
the connect call will block and wait for the connection to go down.
I use something like this as I need to generate a TLS cert before each connect.
*/
import (
"context"
"encoding/json"
"fmt"
"github.com/eclipse/paho.golang/paho"
"log"
"net"
"sync"
"time"
)
type msgChan chan *paho.Publish
func blockingConnect(ctx context.Context, subMsgChan msgChan, pubMsgChan msgChan, topic, addr string) error {
log.Println("Connecting")
errChan := make(chan error)
conn, err := net.Dial("tcp", addr)
if err != nil {
return fmt.Errorf("failed to connect: %s", err)
}
client := paho.NewClient(
paho.ClientConfig{
OnClientError: func(e error) {
errChan <- e
},
Router: paho.NewSingleHandlerRouter(func(m *paho.Publish) {
subMsgChan <- m
}),
// Wandering if we should set the
// OnServerDisconnect callback.
Conn: conn,
})
cp := &paho.Connect{
KeepAlive: 30,
ClientID: "paho",
CleanStart: true,
}
ca, err := client.Connect(ctx, cp)
if err != nil {
return fmt.Errorf("mqtt connect error: %s", err)
}
if ca.ReasonCode != 0 {
return fmt.Errorf("mqtt connect failure: %d - %s", ca.ReasonCode, ca.Properties.ReasonString)
}
// goroutine that reads the pubMsgChan and publishes the messages
go func() {
for ctx.Err() == nil {
select {
case <-ctx.Done(): // abort waiting for the channel
log.Println("ctx cancelled in blockingConnect publisher")
case msg := <-pubMsgChan:
log.Println("publisher got a message to publish")
ctxTimeout, timeoutCancel := context.WithTimeout(ctx, time.Second)
_, err := client.Publish(ctxTimeout, msg)
if err != nil {
log.Printf("publish error: %s\n", err)
} else {
log.Println("publish successful.")
}
timeoutCancel()
}
}
log.Println("pub loop aborting: ", ctx.Err())
}()
// goroutine that monitors the context and shuts down if it is cancelled.
go func() {
<-ctx.Done()
log.Println("context cancelled, disconnecting")
if client != nil {
d := &paho.Disconnect{ReasonCode: 0}
client.Disconnect(d)
}
}()
sa, err := client.Subscribe(ctx, &paho.Subscribe{
Subscriptions: map[string]paho.SubscribeOptions{
topic: {QoS: byte(1)},
},
})
if err != nil {
return fmt.Errorf("mqtt subscribe error: %s", err)
}
if sa.Reasons[0] != byte(1) {
return fmt.Errorf("failed to subscribe to %s : %d", topic, sa.Reasons[0])
}
log.Printf("Subscribed to %s", topic)
return <-errChan
}
func main() {
const addr = "localhost:1883"
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
pubMsgChan := make(msgChan, 0)
subMsgChan := make(msgChan, 0)
wg := sync.WaitGroup{}
// Set up the goroutine which handles the connection.
wg.Add(1)
go func() {
for ctx.Err() == nil {
err := blockingConnect(ctx, subMsgChan, pubMsgChan, "test/+", addr)
log.Println("Err: ", err)
time.Sleep(time.Second)
}
wg.Done()
}()
// produces mqtt messages.
wg.Add(1)
go func() {
counter := 0
for ctx.Err() == nil {
payload, _ := json.Marshal(counter)
msg := paho.Publish{
PacketID: 0,
QoS: 1,
Retain: false,
Topic: "pub/pub",
Properties: nil,
Payload: payload,
}
log.Println("Message prepared")
counter++
select {
case pubMsgChan <- &msg:
log.Println("Message sent", string(payload))
case <-ctx.Done():
log.Println("context cancelled. Aborting writer.")
}
time.Sleep(time.Second)
}
wg.Done()
}()
// Set up the consumer.
wg.Add(1)
go func() {
for ctx.Err() == nil {
select {
case <-ctx.Done():
log.Println("context cancelled. Aborting reader.")
case m := <-subMsgChan:
log.Println("Received message:", string(m.Payload))
}
}
wg.Done()
}()
log.Println("Waiting for goroutines to exit")
wg.Wait()
cancel()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment