Skip to content

Instantly share code, notes, and snippets.

@Zerpet
Created June 20, 2023 10:55
Show Gist options
  • Save Zerpet/b894e894cd2b7c314c61f8ad13497ae6 to your computer and use it in GitHub Desktop.
Save Zerpet/b894e894cd2b7c314c61f8ad13497ae6 to your computer and use it in GitHub Desktop.
Benchmark: buffered deliveries channel batching vs un-buffered deliveries channel batching
//go:build integration

package amqp091_test
import (
"context"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"os"
"strconv"
"testing"
"time"
)
// setupBenchmarkQueue dials RabbitMQ, opens a channel, puts the channel in
// confirm mode and declares a non-durable, auto-delete queue named after the
// running benchmark (b.Name()). It returns the connection and the channel.
//
// The broker URL is taken from the AMQP_URL environment variable and falls
// back to a local broker with the guest account when that variable is empty.
//
// On success it is the caller's responsibility to close the connection and
// channel. On failure the benchmark is aborted with b.Fatalf and the
// connection opened here is closed first, so nothing is leaked.
func setupBenchmarkQueue(b *testing.B) (*amqp.Connection, *amqp.Channel) {
	b.Helper()

	const envAMQPURLName = "AMQP_URL"
	amqpURL := "amqp://guest:guest@127.0.0.1:5672/"
	if u := os.Getenv(envAMQPURLName); u != "" {
		amqpURL = u
	}

	amqpConn, err := amqp.Dial(amqpURL)
	if err != nil {
		b.Fatalf("failed to connect to RabbitMQ: %v", err)
	}

	amqpChannel, err := amqpConn.Channel()
	if err != nil {
		// Callers only register their cleanup after a successful return, so
		// the connection has to be released here.
		_ = amqpConn.Close()
		b.Fatalf("Failed to open a channel: %v", err)
	}

	// Confirm mode lets publishers wait until the server has received every
	// message before consumption starts.
	if err := amqpChannel.Confirm(false); err != nil {
		_ = amqpConn.Close()
		b.Fatalf("Failed to put channel in confirm mode: %v", err)
	}

	_, err = amqpChannel.QueueDeclare(
		b.Name(), // queue name
		false,    // durable
		true,     // auto-delete
		false,    // exclusive
		false,    // no-wait
		amqp.Table{"x-queue-version": 2},
	)
	if err != nil {
		_ = amqpConn.Close()
		b.Fatalf("Failed to declare a queue: %v", err)
	}

	return amqpConn, amqpChannel
}
// setupQueueMessages publishes numMessages messages through the default
// exchange with routing key b.Name() and blocks until the broker has
// acknowledged every one of them.
//
// ch is expected to already be in confirm mode (setupBenchmarkQueue does
// that). The benchmark is aborted if a publish fails, if the broker nacks a
// message, if the notification channels are closed, or if no confirmation
// arrives for 60 seconds. A non-positive numMessages is a no-op.
func setupQueueMessages(b *testing.B, ch *amqp.Channel, numMessages int) {
	b.Helper()

	if numMessages <= 0 {
		// Nothing to publish, hence nothing to wait for.
		return
	}

	// Register the listeners BEFORE publishing: confirmations that arrive
	// while no listener is registered are not replayed later. The channels
	// are buffered for every outstanding publish so that delivering a
	// confirmation never has to wait for this goroutine.
	ackNotice, nackNotice := ch.NotifyConfirm(
		make(chan uint64, numMessages),
		make(chan uint64, numMessages),
	)

	for i := 0; i < numMessages; i++ {
		err := ch.PublishWithContext(context.TODO(),
			amqp.DefaultExchange, // exchange
			b.Name(),             // routing key
			false,                // mandatory
			false,                // immediate
			amqp.Publishing{
				ContentType:  "text/plain",
				Body:         []byte(fmt.Sprintf("Message %d", i)),
				DeliveryMode: 0, // transient
			})
		if err != nil {
			b.Fatalf("failed to publish to AMQP server: %v", err)
		}
	}

	// Count acknowledgements instead of inspecting the delivery tag: only
	// numMessages acks prove that no message was nacked along the way.
	acked := 0
	for {
		select {
		case nack, ok := <-nackNotice:
			if !ok {
				b.Fatal("channel closed while waiting for confirmations")
			}
			b.Fatalf("nack: %d", nack)
		case _, ok := <-ackNotice:
			if !ok {
				b.Fatal("channel closed while waiting for confirmations")
			}
			acked++
			if acked >= numMessages {
				return
			}
		case <-time.After(60 * time.Second):
			b.Fatal("timeout waiting for all confirmations")
		}
	}
}
// BenchmarkQoSAndConsumeBatching consumes messages in batches from the
// deliveries channel returned by Channel.Consume and acknowledges every batch
// with a single multiple-ack. It runs once for each prefetch count: 10, 100
// and 1000.
//
// Every sub-benchmark publishes prefetchCount*10 messages and consumes them
// all; the amount of work does not depend on b.N, so the custom "msg/batch"
// and "msg/s" metrics are the meaningful results, not ns/op.
func BenchmarkQoSAndConsumeBatching(b *testing.B) {
	// bench reads its prefetch count from the QoS environment variable, which
	// is set right before each b.Run below.
	bench := func(b *testing.B) {
		prefetchCount, err := strconv.Atoi(os.Getenv("QoS"))
		if err != nil {
			b.Fatalf("could not parse QoS: %v", err)
		}
		totalMessages := prefetchCount * 10

		amqpConn, amqpChannel := setupBenchmarkQueue(b)
		b.Cleanup(func() {
			// Best-effort teardown; errors while closing are not interesting.
			_ = amqpChannel.Close()
			_ = amqpConn.Close()
		})

		setupQueueMessages(b, amqpChannel, totalMessages)

		if err := amqpChannel.Qos(prefetchCount, 0, false); err != nil {
			b.Fatalf("failed to set QoS: %v", err)
		}

		deliveryChannel, err := amqpChannel.Consume(
			b.Name(), // queue
			b.Name(), // consumer
			false,    // auto-ack
			false,    // exclusive
			false,    // noLocal (not supported by rabbitMQ)
			false,    // no-wait
			nil,      // args
		)
		if err != nil {
			b.Fatalf("failed to subscribe: %v", err)
		}

		totalDeliveries := 0
		nbBatch := 0

		b.ResetTimer()

		// Receiving loop: run until every published message was delivered.
		for totalDeliveries < totalMessages {
			// Block for the first delivery of the batch instead of spinning.
			delivery, ok := <-deliveryChannel
			if !ok {
				b.Fatalf("deliveries was closed")
			}
			nbBatch++
			totalDeliveries++
			// Size of the CURRENT batch. Comparing the running total with
			// prefetchCount would only ever match during the first batch and
			// make every later batch wait for the timeout below.
			batchSize := 1

		batchingLoop:
			for batchSize < prefetchCount && totalDeliveries < totalMessages {
				select {
				case d, ok := <-deliveryChannel:
					if !ok {
						b.Fatalf("deliveries was closed")
					}
					delivery = d
					totalDeliveries++
					batchSize++
					// ... some code here would be copying deliveries to another data structure ...
				case <-time.After(100 * time.Millisecond):
					// No further delivery in time: process the partial batch.
					break batchingLoop
				}
			}

			// cannot sleep in benchmark so using a cpu-busy loop to simulate processing of delivery
			for i := 0; i < 100_000; i++ {
				// very much work on the batch
				// this could be dispatched to a goroutine or some worker process
			}

			// Ack multiple: acknowledges everything up to the last delivery
			// of the batch.
			if err := delivery.Ack(true); err != nil {
				b.Fatalf("failed to ack multiple deliveries: %v", err)
			}
		}

		b.ReportMetric(float64(totalDeliveries)/float64(nbBatch), "msg/batch")
		b.ReportMetric(float64(totalDeliveries)/b.Elapsed().Seconds(), "msg/s")
	}

	b.Setenv("QoS", "10")
	b.Run("QoS=10", bench)
	b.Setenv("QoS", "100")
	b.Run("QoS=100", bench)
	b.Setenv("QoS", "1000")
	b.Run("QoS=1000", bench)
}
// BenchmarkQosAndBufferedDeliveries consumes messages in batches from the
// deliveries channel returned by Channel.ConsumeBuffered (called with
// prefetchCount as its last argument) and acknowledges every batch with a
// single multiple-ack. It runs once for each prefetch count: 10, 100 and 1000.
//
// Every sub-benchmark publishes prefetchCount*10 messages and consumes them
// all; the amount of work does not depend on b.N, so the custom "msg/batch"
// and "msg/s" metrics are the meaningful results, not ns/op.
func BenchmarkQosAndBufferedDeliveries(b *testing.B) {
	// bench reads its prefetch count from the QoS environment variable, which
	// is set right before each b.Run below.
	bench := func(b *testing.B) {
		prefetchCount, err := strconv.Atoi(os.Getenv("QoS"))
		if err != nil {
			b.Fatalf("could not parse QoS: %v", err)
		}
		totalMessages := prefetchCount * 10

		amqpConn, amqpChannel := setupBenchmarkQueue(b)
		b.Cleanup(func() {
			// Best-effort teardown; errors while closing are not interesting.
			_ = amqpChannel.Close()
			_ = amqpConn.Close()
		})

		setupQueueMessages(b, amqpChannel, totalMessages)

		if err := amqpChannel.Qos(prefetchCount, 0, false); err != nil {
			b.Fatalf("failed to set QoS: %v", err)
		}

		deliveryChannel, err := amqpChannel.ConsumeBuffered(
			b.Name(), // queue
			b.Name(), // consumer
			false,    // auto-ack
			false,    // exclusive
			false,    // noLocal (not supported by rabbitMQ)
			false,    // no-wait
			nil,      // args
			prefetchCount,
		)
		if err != nil {
			b.Fatalf("failed to subscribe: %v", err)
		}

		totalDeliveries := 0
		nbBatch := 0

		b.ResetTimer()

		// Receiving loop: run until every published message was delivered.
		for totalDeliveries < totalMessages {
			// Block for the first delivery of the batch instead of spinning.
			delivery, ok := <-deliveryChannel
			if !ok {
				b.Fatalf("deliveries was closed")
			}
			nbBatch++
			totalDeliveries++
			// Size of the CURRENT batch. Comparing the running total with
			// prefetchCount would only ever match during the first batch and
			// make every later batch wait for the timeout below.
			batchSize := 1

		batchingLoop:
			for batchSize < prefetchCount && totalDeliveries < totalMessages {
				select {
				case d, ok := <-deliveryChannel:
					if !ok {
						b.Fatalf("deliveries was closed")
					}
					delivery = d
					totalDeliveries++
					batchSize++
					// ... some code here would be copying deliveries to another data structure ...
				case <-time.After(100 * time.Millisecond):
					// No further delivery in time: process the partial batch.
					break batchingLoop
				}
			}

			// cannot sleep in benchmark so using a cpu-busy loop to simulate processing of delivery
			for i := 0; i < 100_000; i++ {
				// very much work on the batch
				// this could be dispatched to a goroutine or some worker process
			}

			// Ack multiple: acknowledges everything up to the last delivery
			// of the batch.
			if err := delivery.Ack(true); err != nil {
				b.Fatalf("failed to ack multiple deliveries: %v", err)
			}
		}

		b.ReportMetric(float64(totalDeliveries)/float64(nbBatch), "msg/batch")
		b.ReportMetric(float64(totalDeliveries)/b.Elapsed().Seconds(), "msg/s")
	}

	b.Setenv("QoS", "10")
	b.Run("QoS=10", bench)
	b.Setenv("QoS", "100")
	b.Run("QoS=100", bench)
	b.Setenv("QoS", "1000")
	b.Run("QoS=1000", bench)
}
// BenchmarkBufferedChannels publishes b.N*10 persistent, empty messages to
// the queue "queue", then consumes with a prefetch count of b.N through
// Channel.ConsumeBuffered until at least b.N messages have been delivered.
// Deliveries are drained opportunistically into batches and every batch is
// acknowledged with a single multiple-ack.
//
// The broker URL is taken from the AMQP_URL environment variable and falls
// back to amqp://127.0.0.1:5672 when that variable is empty. The average
// batch size is logged and the throughput is reported as "msg/s".
func BenchmarkBufferedChannels(b *testing.B) {
	const (
		// We use the default exchange here so the name of the queue is the routing key
		exchange = ""
		queue    = "queue"
	)
	amqpURL := "amqp://127.0.0.1:5672"
	if u := os.Getenv("AMQP_URL"); u != "" {
		amqpURL = u
	}
	prefetchCount := b.N
	totalMessages := b.N * 10
	b.Logf("Running for b.N=%d", b.N)

	// Init AMQP connection and channel. Cleanups run in reverse order of
	// registration (channel first, then connection), also when the benchmark
	// aborts through b.Fatalf, and outside of the timed region.
	amqpConn, err := amqp.Dial(amqpURL)
	if err != nil {
		b.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	b.Cleanup(func() { _ = amqpConn.Close() })

	amqpChannel, err := amqpConn.Channel()
	if err != nil {
		b.Fatalf("Failed to open a channel: %v", err)
	}
	b.Cleanup(func() { _ = amqpChannel.Close() })

	_, err = amqpChannel.QueueDeclare(
		queue, // queue name
		false, // durable
		true,  // auto-delete
		false, // exclusive
		false, // no-wait
		amqp.Table{"x-queue-version": 2},
	)
	if err != nil {
		b.Fatalf("Failed to declare a queue: %v", err)
	}

	// Confirm mode plus the wait below make sure all messages are received by
	// the server before beginning consuming.
	if err := amqpChannel.Confirm(false); err != nil {
		b.Fatalf("Failed to put channel in confirm mode: %v", err)
	}
	// Registered before publishing and buffered for every outstanding publish
	// so that no confirmation is missed and delivering one never blocks.
	confirmations := amqpChannel.NotifyPublish(make(chan amqp.Confirmation, totalMessages))

	// Publish a number of messages defined in the -bench arg
	for i := 0; i < totalMessages; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
		err := amqpChannel.PublishWithContext(ctx,
			exchange, // exchange
			queue,    // routing key
			false,    // mandatory
			false,    // immediate
			amqp.Publishing{
				ContentType:  "text/plain",
				Body:         []byte{},
				DeliveryMode: 2, // persistent
			})
		cancel()
		if err != nil {
			b.Fatalf("Failed to publish to AMQP server: %v", err)
		}
	}

	// Wait for one confirmation per published message.
	confirmTimeout := time.After(60 * time.Second)
	for confirmed := 0; confirmed < totalMessages; confirmed++ {
		select {
		case c, ok := <-confirmations:
			if !ok {
				b.Fatalf("Channel was closed")
			}
			if !c.Ack {
				b.Fatalf("Message %d was nacked by the server", c.DeliveryTag)
			}
		case <-confirmTimeout:
			b.Fatalf("Timeout waiting for publisher confirmations")
		}
	}

	// Start the consumer
	if err := amqpChannel.Qos(prefetchCount, 0, false); err != nil {
		b.Fatalf("Failed to set prefetchCount: %v", err)
	}
	deliveryChannel, err := amqpChannel.ConsumeBuffered(
		queue, // queue
		"",    // consumer
		false, // auto-ack
		false, // exclusive
		false, // noLocal (not supported by rabbitMQ)
		false, // no-wait
		nil,   // args
		prefetchCount,
	)
	if err != nil {
		b.Fatalf("Failed to create consumer: %v", err)
	}

	totalDeliveries := 0
	nbBatch := 0

	b.ResetTimer()

	// Receiving loop, loop until at least b.N deliveries were received
	for totalDeliveries < b.N {
		// Block for the first delivery of the batch instead of spinning.
		delivery, ok := <-deliveryChannel
		if !ok {
			b.Fatalf("Channel was closed")
		}
		nbBatch++
		totalDeliveries++
		// cannot sleep in benchmark so using a cpu-busy loop to simulate processing of delivery
		for i := 0; i < 100000; i++ {
		}

		// Drain whatever is already waiting in deliveryChannel into this
		// batch without blocking.
	drain:
		for {
			select {
			case d, ok := <-deliveryChannel:
				if !ok {
					// A closed channel is always ready to receive; without
					// this check the drain loop would never end.
					b.Fatalf("Channel was closed")
				}
				delivery = d
				totalDeliveries++
				// cannot sleep in benchmark so using a cpu-busy loop to simulate processing of delivery
				for i := 0; i < 100000; i++ {
				}
			default:
				break drain
			}
		}

		// Ack multiple
		if err := delivery.Ack(true); err != nil {
			b.Fatalf("Failed to ack multiple deliveries: %v", err)
		}
	}

	// end benchmark: freeze the timer so both figures use the same duration.
	b.StopTimer()
	elapsed := b.Elapsed()
	// Convert before dividing; integer division would truncate the average.
	b.Logf("Average batch size: %f", float64(totalDeliveries)/float64(nbBatch))
	b.Logf("Messages per second (N=%d): %f", b.N, float64(totalDeliveries)/elapsed.Seconds())
	b.ReportMetric(float64(totalDeliveries)/elapsed.Seconds(), "msg/s")
}
go test -tags integration -bench '.' -run '^$' -v -count=1 -benchtime '10x'
environment variable envAMQPURLName undefined or empty, using default: "amqp://guest:guest@127.0.0.1:5672/"
goos: darwin
goarch: arm64
pkg: github.com/rabbitmq/amqp091-go
BenchmarkSequentialBufferedConfirms
BenchmarkSequentialBufferedConfirms-10 10 4950 ns/op
BenchmarkQoSAndConsumeBatching
BenchmarkQoSAndConsumeBatching/QoS=10
BenchmarkQoSAndConsumeBatching/QoS=10-10 10 94708196 ns/op 10.00 msg/batch 105.6 msg/s
BenchmarkQoSAndConsumeBatching/QoS=100
BenchmarkQoSAndConsumeBatching/QoS=100-10 10 95461850 ns/op 100.0 msg/batch 1048 msg/s
BenchmarkQoSAndConsumeBatching/QoS=1000
BenchmarkQoSAndConsumeBatching/QoS=1000-10 10 105675596 ns/op 1000 msg/batch 9463 msg/s
BenchmarkQosAndBufferedDeliveries
BenchmarkQosAndBufferedDeliveries/QoS=10
BenchmarkQosAndBufferedDeliveries/QoS=10-10 10 96095033 ns/op 10.00 msg/batch 104.1 msg/s
BenchmarkQosAndBufferedDeliveries/QoS=100
BenchmarkQosAndBufferedDeliveries/QoS=100-10 10 98800742 ns/op 100.0 msg/batch 1012 msg/s
BenchmarkQosAndBufferedDeliveries/QoS=1000
BenchmarkQosAndBufferedDeliveries/QoS=1000-10 10 106693950 ns/op 1000 msg/batch 9373 msg/s
go test -tags integration -bench 'BenchmarkBufferedChannels' -run '^$' -v -count=1
environment variable envAMQPURLName undefined or empty, using default: "amqp://guest:guest@127.0.0.1:5672/"
goos: darwin
goarch: arm64
pkg: github.com/rabbitmq/amqp091-go
BenchmarkBufferedChannels
bench_qos_test.go:288: Running for b.N=1
bench_qos_test.go:394: Average batch size: 1.000000
bench_qos_test.go:395: Messages per second (N=1): 74.960648
bench_qos_test.go:288: Running for b.N=88
bench_qos_test.go:394: Average batch size: 88.000000
bench_qos_test.go:395: Messages per second (N=88): 7665.533271
bench_qos_test.go:288: Running for b.N=8800
bench_qos_test.go:394: Average batch size: 8800.000000
bench_qos_test.go:395: Messages per second (N=8800): 19000.045237
bench_qos_test.go:288: Running for b.N=22798
bench_qos_test.go:394: Average batch size: 22798.000000
bench_qos_test.go:395: Messages per second (N=22798): 19309.582351
BenchmarkBufferedChannels-10 22798 51788 ns/op 19309 msg/s
PASS
ok github.com/rabbitmq/amqp091-go 8.315s
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment