Skip to content

Instantly share code, notes, and snippets.

@sashaaro
Last active December 23, 2020 17:57
Show Gist options
  • Save sashaaro/24b0b0ffc8f85f90257f88cb278baf4b to your computer and use it in GitHub Desktop.
Save sashaaro/24b0b0ffc8f85f90257f88cb278baf4b to your computer and use it in GitHub Desktop.
# Start 200 `bin/console rabbitmq:consumer upload_picture` processes in the
# background (trailing `&`), each at the lowest scheduling priority (nice 19).
for i in {1..200}; do nice -n 19 bin/console rabbitmq:consumer upload_picture & done
module rabbitmq-example
go 1.15
require github.com/streadway/amqp v1.0.0
package main
import (
	"fmt"
	"math/rand"
	"os"
	"strconv"
	"sync"
	"time"

	"github.com/streadway/amqp"
)
// main opens as many AMQP connections as requested by the first command-line
// argument. For each connection it declares a "tmp_<index>" direct exchange
// and a queue of the same name bound to it, and only once every connection is
// set up does it start one consumer and one publisher per connection.
//
// The workers loop forever, so main blocks in wg.Wait until the process is
// killed or an AMQP operation fails; every AMQP error is treated as fatal and
// panics.
func main() {
	// Guard the argument access: indexing os.Args[1] unconditionally would
	// crash with an index-out-of-range panic when no argument is given.
	if len(os.Args) < 2 {
		fmt.Fprintln(os.Stderr, "usage: rabbitmq-example <connection-count>")
		os.Exit(2)
	}
	connectionNumber, err := strconv.Atoi(os.Args[1])
	if err != nil {
		panic(fmt.Errorf("parsing connection count %q: %w", os.Args[1], err))
	}

	var connections []*amqp.Connection
	var channels []*amqp.Channel

	// Phase 1: establish every connection and declare its topology before any
	// traffic is generated.
	for i := 0; i < connectionNumber; i++ {
		connection := createConnection()
		channel, err := connection.Channel()
		if err != nil {
			panic(err)
		}

		name := tmpName(i)
		if err := channel.ExchangeDeclare(name, "direct", true, true, false, false, nil); err != nil {
			panic(err)
		}
		queue, err := channel.QueueDeclare(name, true, true, false, false, nil)
		if err != nil {
			panic(err)
		}
		if err := channel.QueueBind(queue.Name, "", name, false, nil); err != nil {
			panic(err)
		}
		fmt.Printf("Connection #%v declare queue %s binded to exchange %s\n", i, queue.Name, name)

		connections = append(connections, connection)
		channels = append(channels, channel)
	}

	// Phase 2: one worker goroutine per connection. Everything the goroutine
	// needs is passed as an argument so nothing depends on loop-variable
	// capture semantics.
	var wg sync.WaitGroup
	for i := range connections {
		wg.Add(1)
		go func(i int, connection *amqp.Connection, channel *amqp.Channel) {
			defer wg.Done()
			name := tmpName(i)
			go consume(i, channel, name)
			publish(i, connection, name)
		}(i, connections[i], channels[i])
	}
	wg.Wait()
}

// tmpName returns the name shared by the exchange and the queue that belong
// to connection i.
func tmpName(i int) string {
	return fmt.Sprintf("tmp_%d", i)
}

// consume registers a consumer on queueName (manual ack, exclusive) and
// acknowledges every delivery it receives. It panics if consuming or
// acknowledging fails, and also when the delivery channel is closed, so a
// dead consumer is reported explicitly instead of spinning on zero-value
// deliveries.
func consume(i int, channel *amqp.Channel, queueName string) {
	fmt.Printf("Connection #%v start consume queue %s\n", i, queueName)
	deliveryCh, err := channel.Consume(queueName, "", false, true, false, false, amqp.Table{})
	if err != nil {
		panic(err)
	}
	for delivery := range deliveryCh {
		fmt.Printf("Consumer #%v connection recieved msg: %s\n", i, string(delivery.Body))
		if err := delivery.Ack(false); err != nil {
			panic(err)
		}
	}
	panic(fmt.Sprintf("consumer #%v: delivery channel for queue %s was closed", i, queueName))
}

// publish opens a dedicated channel on connection and publishes a persistent
// message to exchange with an empty routing key, sleeping a random 0-4
// seconds before each publish. It never returns; a publish error panics after
// reporting whether the connection is closed.
func publish(i int, connection *amqp.Connection, exchange string) {
	pubChannel, err := connection.Channel()
	if err != nil {
		panic(err)
	}
	for {
		fmt.Printf("Run publish look for #%v connection\n", i)
		pause := time.Duration(rand.Intn(5)) * time.Second
		time.Sleep(pause)

		// The message body carries the pause that preceded it.
		body := []byte(fmt.Sprintf("Order #%v", pause.String()))
		msg := amqp.Publishing{Body: body, DeliveryMode: amqp.Persistent}
		if err := pubChannel.Publish(exchange, "", false, false, msg); err != nil {
			fmt.Printf("Closed: %v\n", connection.IsClosed())
			panic(err)
		}
	}
}
// createConnection dials the AMQP broker and returns the open connection.
//
// The broker URL is read from the AMQP_URL environment variable; when that is
// unset or empty the built-in default address is used. A dial failure panics.
func createConnection() *amqp.Connection {
	url := "amqp://user:user@172.20.0.2:5672"
	if fromEnv := os.Getenv("AMQP_URL"); fromEnv != "" {
		url = fromEnv
	}

	conn, dialErr := amqp.Dial(url)
	if dialErr != nil {
		panic(dialErr)
	}
	return conn
}
# Start a detached, auto-removed RabbitMQ container (rabbitmq:3-management image)
# named "rabbitmq", publishing ports 15672 and 5672 and setting the default
# user/password to user/user.
docker run --rm -d -p 15672:15672 -p 5672:5672 --name rabbitmq -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=user rabbitmq:3-management
# Run the Go program from the current directory inside a golang container on
# the host network, pointing AMQP_URL at the broker on 127.0.0.1:5672.
# NOTE(review): no connection-count argument is passed after `go run .`, while
# main reads os.Args[1] — confirm the intended invocation (e.g. `go run . 10`).
docker run --rm -v ${PWD}:/app -w /app --net=host -e AMQP_URL=amqp://user:user@127.0.0.1:5672 golang go run .
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment