Skip to content

Instantly share code, notes, and snippets.

@cascad-s
Created December 15, 2016 10:49
Show Gist options
  • Save cascad-s/d6613665a714c6f6f9c904371d5505b5 to your computer and use it in GitHub Desktop.
Save cascad-s/d6613665a714c6f6f9c904371d5505b5 to your computer and use it in GitHub Desktop.
Some amqp
package main
import (
"fmt"
"log"
"time"
"github.com/streadway/amqp"
)
// Package-level settings shared by the functions in this file.
var (
	// amqpUri is the broker address that main hands to rabbitConnector.
	amqpUri = "amqp://guest:guest@192.168.56.101:5672/"

	// rabbitCloseError carries connection-close notifications; main creates
	// it and it is registered with the connection through NotifyClose.
	rabbitCloseError chan *amqp.Error
)
// connectToRabbitMQ dials uri until a connection is established and returns
// that connection. Each failed attempt is logged, followed by a 500ms pause
// before the next try; the function never gives up.
func connectToRabbitMQ(uri string) *amqp.Connection {
	for {
		connection, dialErr := amqp.Dial(uri)
		if dialErr != nil {
			log.Println(dialErr)
			log.Printf("Trying to reconnect to RabbitMQ at %s\n", uri)
			time.Sleep(500 * time.Millisecond)
			continue
		}
		return connection
	}
}
// rabbitConnector re-establishes the RabbitMQ session each time a close error
// is reported.
//
// It starts by listening on the package-level rabbitCloseError channel. For
// every non-nil error received it prints the error, calls connect2amqp(uri),
// stores the returned connection, channel, queue and delivery channel in
// storage, and subscribes to close notifications of the new connection.
//
// The loop ends when the notification channel is closed without an error
// having been delivered, which is how the amqp library signals a graceful
// close.
//
// NOTE(review): the fields of storage are written here without any
// synchronization; readers in other goroutines observe them racily.
func rabbitConnector(uri string, storage *Storage) {
	closeCh := rabbitCloseError
	for {
		rabbitErr, open := <-closeCh
		if !open {
			// A closed channel yields nil forever; looping on it would spin
			// at full CPU, so stop instead.
			log.Println("RabbitMQ close notification channel closed; reconnect loop stopped")
			return
		}
		if rabbitErr == nil {
			continue
		}

		fmt.Println(rabbitErr)
		// Report the address actually being dialed, not the package default.
		log.Printf("Connecting to %s\n..", uri)
		storage.conn, storage.ch, storage.q, storage.msgs = connect2amqp(uri)

		// The library closes a notification channel once its connection is
		// gone, so every new connection needs a fresh channel: re-registering
		// the old one would make the library send on a closed channel. The
		// one-slot buffer keeps the library's send from blocking.
		closeCh = storage.conn.NotifyClose(make(chan *amqp.Error, 1))
		fmt.Println("Connected.")
	}
}
// failOnError terminates the program through log.Fatalf, printing msg
// followed by the error text, when err is non-nil. A nil err is a no-op.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %s", msg, err)
}
// connect2amqp builds a complete consuming session against the broker at uri.
//
// It obtains a connection from connectToRabbitMQ, opens a channel on it,
// declares the queue named "hello" and starts an auto-acknowledging consumer
// on that queue. Any error from those three steps is passed to failOnError.
//
// It returns the connection, the channel, a pointer to the declared queue and
// a pointer to the delivery channel produced by Consume.
func connect2amqp(uri string) (*amqp.Connection, *amqp.Channel, *amqp.Queue, *<-chan amqp.Delivery) {
	connection := connectToRabbitMQ(uri)
	fmt.Println("done connect")

	channel, err := connection.Channel()
	failOnError(err, "Failed to open a channel")

	queue, err := channel.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	deliveries, err := channel.Consume(
		queue.Name, // queue
		"",         // consumer
		true,       // auto-ack
		false,      // exclusive
		false,      // no-local
		false,      // no-wait
		nil,        // args
	)
	failOnError(err, "Failed to channel consume")

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	return connection, channel, &queue, &deliveries
}
// Storage bundles the four handles returned by connect2amqp so that they can
// be passed around as one value; rabbitConnector overwrites all of them on
// every reconnect.
type Storage struct {
// conn is the broker connection.
conn *amqp.Connection
// ch is the channel opened on conn.
ch *amqp.Channel
// q is the queue declared on ch.
q *amqp.Queue
// msgs points at the delivery channel returned by Consume.
msgs *<-chan amqp.Delivery
}
// main connects to the broker, starts the reconnect goroutine and prints the
// body of every delivery it receives.
//
// The first connection is made synchronously: rabbitConnector only connects
// after a close error has been reported, so storage.msgs would otherwise still
// be nil when it is dereferenced below.
//
// main does not receive from rabbitCloseError itself. A notification is
// delivered to exactly one receiver, and it has to reach rabbitConnector for
// the reconnect to happen.
func main() {
	storage := new(Storage)
	rabbitCloseError = make(chan *amqp.Error)

	storage.conn, storage.ch, storage.q, storage.msgs = connect2amqp(amqpUri)
	storage.conn.NotifyClose(rabbitCloseError)
	go rabbitConnector(amqpUri, storage)

	current := storage.msgs
	for {
		msg, ok := <-*current
		if ok {
			fmt.Println("message", msg.Body)
			continue
		}

		// The delivery channel is closed once its connection is gone; reading
		// it again would yield empty messages in a tight loop. Wait until
		// rabbitConnector has stored the delivery channel of a new session.
		// NOTE(review): storage.msgs is read without synchronization because
		// Storage offers none; guard it with a mutex if Storage is reworked.
		for storage.msgs == current {
			time.Sleep(100 * time.Millisecond)
		}
		current = storage.msgs
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment