Skip to content

Instantly share code, notes, and snippets.

@nenodias
Created April 27, 2023 15:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nenodias/b101d72cbdc88cb8821449c8f02beaec to your computer and use it in GitHub Desktop.
Save nenodias/b101d72cbdc88cb8821449c8f02beaec to your computer and use it in GitHub Desktop.
Go RabbitMQ
package main
import (
"context"
"fmt"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// failOnError aborts the current flow when err is non-nil: it logs
// "<msg>: <err>" through the standard logger and then panics with that
// same text. A nil err is a no-op.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Panicf("%s: %s", msg, err)
}
// DeclareQueue declares the queue called name on ch with the requested
// durability and returns the resulting amqp.Queue. Every other declaration
// flag (auto-delete, exclusive, no-wait) is false and no extra arguments are
// passed. A declaration error panics through failOnError; on success the
// queue name is printed to standard output.
func DeclareQueue(ch *amqp.Channel, name string, durable bool) amqp.Queue {
	const (
		autoDelete = false // delete when unused
		exclusive  = false
		noWait     = false
	)

	queue, err := ch.QueueDeclare(name, durable, autoDelete, exclusive, noWait, nil)
	failOnError(err, "Failed to declare a queue")

	fmt.Printf("Queue: %s created.\n", queue.Name)
	return queue
}
// Send declares queue as a durable queue (via DeclareQueue) and publishes
// message to it with the given contentType. The publish uses an empty
// exchange name, the declared queue's name as routing key, and a context that
// times out after five seconds. A publish error panics through failOnError;
// on success the sent message is logged.
func Send(ch *amqp.Channel, queue string, message string, contentType string) {
	target := DeclareQueue(ch, queue, true)

	publishCtx, stop := context.WithTimeout(context.Background(), 5*time.Second)
	defer stop()

	payload := amqp.Publishing{
		ContentType: contentType,
		Body:        []byte(message),
	}

	const (
		exchange  = ""
		mandatory = false
		immediate = false
	)
	err := ch.PublishWithContext(publishCtx, exchange, target.Name, mandatory, immediate, payload)
	failOnError(err, "Failed to publish a message")

	log.Printf(" [x] Sent %s\n", message)
}
// Consume registers a consumer tagged "consumer-em-go" on the queue called
// name and logs the body and selected properties of every delivery it
// receives. Auto-ack is disabled, so each delivery is acknowledged explicitly
// after it has been logged; an acknowledgement failure is logged and the loop
// continues.
//
// Consume blocks in the calling goroutine for as long as deliveries arrive and
// returns once the delivery channel is closed (presumably when the AMQP
// channel or connection goes away — confirm against the amqp091-go docs), so
// the caller's deferred cleanup can run instead of hanging forever. A failure
// to register the consumer panics through failOnError.
func Consume(ch *amqp.Channel, name string) {
	msgs, err := ch.Consume(
		name,             // queue
		"consumer-em-go", // consumer
		false,            // auto-ack
		false,            // exclusive
		false,            // no-local
		false,            // no-wait
		nil,              // args
	)
	failOnError(err, "Failed to register a consumer")

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

	// Drain deliveries here rather than in a background goroutine: the loop
	// ends when msgs is closed, which gives Consume a well-defined lifetime.
	for d := range msgs {
		log.Printf("Received a message: %s", d.Body)
		log.Printf("\nContentType:%s\n", d.ContentType)
		log.Printf("\nConsumerTag:%s\n", d.ConsumerTag)
		log.Printf("\nTimestamp:%s\n", d.Timestamp)
		log.Printf("\nMessageId:%s\n", d.MessageId)
		// Priority is a uint8; %s would render it as "%!s(uint8=N)".
		log.Printf("\nPriority:%d\n", d.Priority)

		// Acknowledge only this delivery; every message is acked individually.
		if ackErr := d.Ack(false); ackErr != nil {
			log.Printf("Failed to acknowledge message: %s", ackErr)
		}
	}

	log.Printf(" [*] Delivery channel closed, consumer stopped")
}
// main dials the RabbitMQ broker, opens a channel on that connection and then
// consumes from the "hello" queue. The channel and the connection are closed
// by deferred calls when main returns. Any connection or channel error panics
// through failOnError.
func main() {
	connection, dialErr := amqp.Dial("amqps://user:pass@host/vhos")
	failOnError(dialErr, "Failed to connect to RabbitMQ")
	defer connection.Close()

	channel, openErr := connection.Channel()
	failOnError(openErr, "Failed to open a channel")
	defer channel.Close()

	// To publish a test message instead of (or before) consuming:
	// Send(channel, "hello", "Hello World", "text/plain")
	Consume(channel, "hello")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment