Skip to content

Instantly share code, notes, and snippets.

@jpastoor
Created January 30, 2020 13:00
Show Gist options
  • Save jpastoor/43fad4214d535154f8bb08b02df530d5 to your computer and use it in GitHub Desktop.
Save jpastoor/43fad4214d535154f8bb08b02df530d5 to your computer and use it in GitHub Desktop.
Example AMQP-to-MQTT bridge
package main
import (
	"fmt"
	"net/url"
	"os"
	"time"

	"github.com/eclipse/paho.mqtt.golang"
	"github.com/streadway/amqp"
)
// main runs the AMQP-to-MQTT bridge and exits with a non-zero status when the
// bridge cannot be set up or stops unexpectedly.
func main() {
	if err := run(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

// run consumes messages from an AMQP queue bound to the "results" exchange and
// republishes every message body to a single MQTT topic.
//
// It returns an error as soon as any setup step fails, so later steps never
// operate on a nil connection or channel, and it returns an error when the
// AMQP delivery channel is closed. Deferred calls release the AMQP connection,
// the AMQP channel and the MQTT client on the way out.
func run() error {
	conn, err := amqp.Dial(fmt.Sprintf("%s://%s:%s@%s:%d/", "amqp", "username", "pwd", "amqphost", 5672))
	if err != nil {
		return fmt.Errorf("could not connect to AMQP: %w", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		return fmt.Errorf("could not create channel to AMQP: %w", err)
	}
	defer ch.Close()

	queue, err := ch.QueueDeclare("dshtest", false, true, true, false, nil)
	if err != nil {
		return fmt.Errorf("could not create queue: %w", err)
	}

	// Routing key "#" is used so the queue is bound to everything on "results".
	if err := ch.QueueBind(queue.Name, "#", "results", false, nil); err != nil {
		return fmt.Errorf("could not bind queue: %w", err)
	}

	consume, err := ch.Consume(queue.Name, "dshtest", true, true, false, false, nil)
	if err != nil {
		return fmt.Errorf("could not consume queue: %w", err)
	}

	mqttClient := mqtt.NewClient(&mqtt.ClientOptions{
		Servers:   []*url.URL{{Scheme: "wss", Host: "mqtthost:8443", Path: "/mqtt"}},
		Username:  "name",
		Password:  "mytoken",
		WillQos:   1,
		KeepAlive: 60,
		// PingTimeout is a time.Duration; a bare 3 would mean 3 nanoseconds.
		PingTimeout: 3 * time.Second,
	})

	// Wait only reports that the connect attempt finished; the outcome is in Error.
	connectToken := mqttClient.Connect()
	connectToken.Wait()
	if err := connectToken.Error(); err != nil {
		return fmt.Errorf("could not connect to MQTT: %w", err)
	}
	defer mqttClient.Disconnect(250)

	topic := "/mytopic"
	for msg := range consume {
		deliveryToken := mqttClient.Publish(topic, 1, false, msg.Body)
		// Wait for the broker acknowledgement in the background so a slow
		// publish does not stall consumption; report the publish's own error.
		go func(token mqtt.Token) {
			token.Wait()
			if err := token.Error(); err != nil {
				fmt.Fprintf(os.Stderr, "could not publish to %s: %v\n", topic, err)
			}
		}(deliveryToken)
	}

	// The range loop only ends when the AMQP delivery channel is closed.
	return fmt.Errorf("AMQP delivery channel closed")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment