Last active
December 23, 2020 17:57
-
-
Save sashaaro/24b0b0ffc8f85f90257f88cb278baf4b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
for i in {1..200}; do nice -n 19 bin/console rabbitmq:consumer upload_picture & done |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module rabbitmq-example | |
go 1.15 | |
require github.com/streadway/amqp v1.0.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"github.com/streadway/amqp" | |
"math/rand" | |
"strconv" | |
"sync" | |
"time" | |
) | |
func main() { | |
wg := sync.WaitGroup{} | |
connectionNumber, err := strconv.ParseInt(os.Args[1], 10, 32) | |
if err != nil { | |
panic(err) | |
} | |
var connections []*amqp.Connection | |
var channels []*amqp.Channel | |
for i := 0; i < int(connectionNumber); i++ { | |
connection := createConnection() | |
connections = append(connections, connection) | |
channel, err := connection.Channel() | |
//defer func() { | |
// err = channel.Close() | |
// if err != nil { | |
// panic(err) | |
// } | |
//}() | |
channels = append(channels, channel) | |
exchangeName := fmt.Sprintf("tmp_%s", strconv.Itoa(i)) | |
err = channel.ExchangeDeclare(exchangeName, "direct", true, true, false, false, nil) | |
if err != nil { | |
panic(err) | |
} | |
queue, err := channel.QueueDeclare(exchangeName, true, true, false, false, nil) | |
if err != nil { | |
panic(err) | |
} | |
err = channel.QueueBind(queue.Name, "", exchangeName, false, nil) | |
if err != nil { | |
panic(err) | |
} | |
fmt.Printf("Connection #%v declare queue %s binded to exchange %s\n", i, queue.Name, exchangeName) | |
} | |
for i := 0; i < len(connections); i++ { | |
wg.Add(1) | |
connection := connections[i] | |
channel := channels[i] | |
exchangeName := fmt.Sprintf("tmp_%s", strconv.Itoa(i)) | |
queueName := exchangeName | |
go func(queueName string, i int) { | |
defer wg.Done() | |
//err = subChannel.Close() | |
//if err != nil { | |
// panic(err) | |
//} | |
go func() { | |
fmt.Printf("Connection #%v start consume queue %s\n", i, queueName) | |
deliveryCh, err := channel.Consume(queueName, "", false, true, false, false, amqp.Table{}) | |
if err != nil { | |
panic(err) | |
} | |
for { | |
delivery := <- deliveryCh | |
fmt.Printf("Consumer #%v connection recieved msg: %s\n", i, string(delivery.Body)) | |
err = delivery.Ack(false) | |
if err != nil { | |
panic(err) | |
} | |
} | |
}() | |
pubChannel, err := connection.Channel() | |
if err != nil { | |
panic(err) | |
} | |
for { | |
fmt.Printf("Run publish look for #%v connection\n", i) | |
random := time.Duration(rand.Intn(5)) * time.Second | |
time.Sleep(random) | |
body := []byte(fmt.Sprintf("Order #%v", random.String())) | |
err := pubChannel.Publish(exchangeName, "", false, false, amqp.Publishing{Body: body, DeliveryMode: 2}) | |
if err != nil { | |
fmt.Printf("Closed: %v\n", connection.IsClosed()) | |
panic(err) | |
} | |
} | |
}(queueName, i) | |
} | |
wg.Wait() | |
} | |
func createConnection() *amqp.Connection { | |
amqpUrl := os.Getenv("AMQP_URL") | |
if ("" == amqpUrl) { | |
amqpUrl = "amqp://user:user@172.20.0.2:5672" | |
} | |
connection, err := amqp.Dial(amqpUrl) | |
if err != nil { | |
panic(err) | |
} | |
return connection | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
docker run --rm -d -p 15672:15672 -p 5672:5672 --name rabbitmq -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=user rabbitmq:3-management | |
docker run --rm -v ${PWD}:/app -w /app --net=host -e AMQP_URL=amqp://user:user@127.0.0.1:5672 golang go run . |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment