Created
June 20, 2023 10:55
-
-
Save Zerpet/b894e894cd2b7c314c61f8ad13497ae6 to your computer and use it in GitHub Desktop.
Benchmark: buffered deliveries channel batching vs un-buffered deliveries channel batching
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//+go:build integration | |
package amqp091_test | |
import ( | |
"context" | |
"fmt" | |
amqp "github.com/rabbitmq/amqp091-go" | |
"os" | |
"strconv" | |
"testing" | |
"time" | |
) | |
// Dials RabbitMQ, creates a channel, puts the channel in confirm mode, creates a | |
// non-durable auto-delete queue, and returns the connection and channel. | |
// | |
// It is the caller's responsibility to close the connection/channel and delete | |
// the queue. | |
func setupBenchmarkQueue(b *testing.B) (*amqp.Connection, *amqp.Channel) { | |
b.Helper() | |
const envAMQPURLName = "AMQP_URL" | |
var amqpURL = "amqp://guest:guest@127.0.0.1:5672/" | |
u := os.Getenv(envAMQPURLName) | |
if u != "" { | |
amqpURL = u | |
} | |
amqpConn, err := amqp.Dial(amqpURL) | |
if err != nil { | |
b.Fatalf("failed to connect to RabbitMQ: %v", err) | |
} | |
amqpChannel, err := amqpConn.Channel() | |
if err != nil { | |
b.Fatalf("Failed to open a channel: %v", err) | |
} | |
// This will make sure all messages are received by the server before beginning consuming | |
err = amqpChannel.Confirm(false) | |
if err != nil { | |
b.Fatalf("Failed to set noWait for deliveries: %v", err) | |
} | |
_, err = amqpChannel.QueueDeclare(b.Name(), false, true, false, false, amqp.Table{"x-queue-version": 2}) | |
if err != nil { | |
b.Fatalf("Failed to declare a queue: %v", err) | |
} | |
return amqpConn, amqpChannel | |
} | |
func setupQueueMessages(b *testing.B, ch *amqp.Channel, numMessages int) { | |
b.Helper() | |
for i := 0; i < numMessages; i++ { | |
err := ch.PublishWithContext(context.TODO(), | |
amqp.DefaultExchange, // exchange | |
b.Name(), // routing key | |
false, // mandatory | |
false, // immediate | |
amqp.Publishing{ | |
ContentType: "text/plain", | |
Body: []byte(fmt.Sprintf("Message %d", i)), | |
DeliveryMode: 0, // transient | |
}) | |
if err != nil { | |
b.Fatalf("failed to publish to AMQP server: %v", err) | |
} | |
} | |
ackNotice, nackNotice := ch.NotifyConfirm(make(chan uint64), make(chan uint64)) | |
for { | |
select { | |
case nack := <-nackNotice: | |
b.Fatalf("nack: %d", nack) | |
case ack := <-ackNotice: | |
if int(ack) >= numMessages { | |
// confirmations may arrive more than once? | |
return | |
} | |
case <-time.After(60 * time.Second): | |
b.Fatal("timeout waiting for all confirmations") | |
} | |
} | |
} | |
func BenchmarkQoSAndConsumeBatching(b *testing.B) { | |
bench := func(b *testing.B) { | |
prefetchCount, err := strconv.Atoi(os.Getenv("QoS")) | |
if err != nil { | |
b.Fatalf("cloud not parse QoS: %v", err) | |
} | |
totalMessages := prefetchCount * 10 | |
amqpConn, amqpChannel := setupBenchmarkQueue(b) | |
b.Cleanup(func() { | |
_ = amqpChannel.Close() | |
_ = amqpConn.Close() | |
}) | |
setupQueueMessages(b, amqpChannel, totalMessages) | |
if err := amqpChannel.Qos(prefetchCount, 0, false); err != nil { | |
b.Fatalf("failed to set QoS: %v", err) | |
} | |
deliveryChannel, err := amqpChannel.Consume( | |
b.Name(), // queue | |
b.Name(), // consumer | |
false, // auto-ack | |
false, // exclusive | |
false, // noLocal (not supported by rabbitMQ) | |
false, // no-wait | |
nil, // args | |
) | |
if err != nil { | |
b.Fatalf("failed to subscribe: %v", err) | |
} | |
// Receiving loop, loop until we reach the number of delivery we just published | |
totalDeliveries := 0 | |
var delivery amqp.Delivery | |
nbBatch := 0 | |
b.ResetTimer() | |
readAll: | |
for { | |
select { | |
case delivery = <-deliveryChannel: | |
nbBatch++ | |
totalDeliveries++ | |
// loop to force emptying deliveryChannel because default case would trigger randomly otherwise | |
batchingLoop: | |
for { | |
select { | |
case delivery = <-deliveryChannel: | |
totalDeliveries++ | |
if totalDeliveries == prefetchCount { | |
break batchingLoop | |
} | |
// ... some code here would be copying deliveries to another data structure ... | |
case <-time.After(100 * time.Millisecond): | |
break batchingLoop | |
} | |
} | |
// cannot sleep in benchmark so using a cpu-busy loop to simulate processing of delivery | |
for i := 0; i < 100_000; i++ { | |
// very much work on the batch | |
// this could be dispatched to a goroutine or some worker process | |
} | |
// Ack multiple | |
if err := delivery.Ack(true); err != nil { | |
b.Fatalf("failed to ack multiple deliveries: %v", err) | |
} | |
default: | |
if amqpChannel.IsClosed() { | |
b.Fatalf("deliveries was closed") | |
} | |
if totalDeliveries >= totalMessages { | |
// end benchmark | |
break readAll | |
} | |
} | |
} | |
b.ReportMetric(float64(totalDeliveries)/float64(nbBatch), "msg/batch") | |
b.ReportMetric(float64(totalDeliveries)/b.Elapsed().Seconds(), "msg/s") | |
} | |
b.Setenv("QoS", "10") | |
b.Run("QoS=10", bench) | |
b.Setenv("QoS", "100") | |
b.Run("QoS=100", bench) | |
b.Setenv("QoS", "1000") | |
b.Run("QoS=1000", bench) | |
} | |
func BenchmarkQosAndBufferedDeliveries(b *testing.B) { | |
bench := func(b *testing.B) { | |
prefetchCount, err := strconv.Atoi(os.Getenv("QoS")) | |
if err != nil { | |
b.Fatalf("cloud not parse QoS: %v", err) | |
} | |
totalMessages := prefetchCount * 10 | |
amqpConn, amqpChannel := setupBenchmarkQueue(b) | |
b.Cleanup(func() { | |
_ = amqpChannel.Close() | |
_ = amqpConn.Close() | |
}) | |
setupQueueMessages(b, amqpChannel, totalMessages) | |
if err := amqpChannel.Qos(prefetchCount, 0, false); err != nil { | |
b.Fatalf("failed to set QoS: %v", err) | |
} | |
deliveryChannel, err := amqpChannel.ConsumeBuffered( | |
b.Name(), // queue | |
b.Name(), // consumer | |
false, // auto-ack | |
false, // exclusive | |
false, // noLocal (not supported by rabbitMQ) | |
false, // no-wait | |
nil, // args | |
prefetchCount, | |
) | |
if err != nil { | |
b.Fatalf("failed to subscribe: %v", err) | |
} | |
// Receiving loop, loop until we reach the number of delivery we just published | |
totalDeliveries := 0 | |
var delivery amqp.Delivery | |
nbBatch := 0 | |
b.ResetTimer() | |
readAll: | |
for { | |
select { | |
case delivery = <-deliveryChannel: | |
nbBatch++ | |
totalDeliveries++ | |
// loop to force emptying deliveryChannel because default case would trigger randomly otherwise | |
batchingLoop: | |
for { | |
select { | |
case delivery = <-deliveryChannel: | |
totalDeliveries++ | |
if totalDeliveries == prefetchCount { | |
break batchingLoop | |
} | |
// ... some code here would be copying deliveries to another data structure ... | |
case <-time.After(100 * time.Millisecond): | |
break batchingLoop | |
} | |
} | |
// cannot sleep in benchmark so using a cpu-busy loop to simulate processing of delivery | |
for i := 0; i < 100_000; i++ { | |
// very much work on the batch | |
// this could be dispatched to a goroutine or some worker process | |
} | |
// Ack multiple | |
if err := delivery.Ack(true); err != nil { | |
b.Fatalf("failed to ack multiple deliveries: %v", err) | |
} | |
default: | |
if amqpChannel.IsClosed() { | |
b.Fatalf("deliveries was closed") | |
} | |
if totalDeliveries >= totalMessages { | |
// end benchmark | |
break readAll | |
} | |
} | |
} | |
b.ReportMetric(float64(totalDeliveries)/float64(nbBatch), "msg/batch") | |
b.ReportMetric(float64(totalDeliveries)/b.Elapsed().Seconds(), "msg/s") | |
} | |
b.Setenv("QoS", "10") | |
b.Run("QoS=10", bench) | |
b.Setenv("QoS", "100") | |
b.Run("QoS=100", bench) | |
b.Setenv("QoS", "1000") | |
b.Run("QoS=1000", bench) | |
} | |
func BenchmarkBufferedChannels(b *testing.B) { | |
// We use a direct exchange here so the name of the queue is the routing key | |
exchange := "" | |
amqpUrl := "amqp://127.0.0.1:5672" | |
queue := "queue" | |
prefetchCount := b.N | |
b.Logf("Running for b.N=%d", b.N) | |
// Init AMQP connection and channel | |
amqpConn, err := amqp.Dial(amqpUrl) | |
if err != nil { | |
b.Fatalf("Failed to connect to RabbitMQ: %v", err) | |
} | |
amqpChannel, err := amqpConn.Channel() | |
if err != nil { | |
b.Fatalf("Failed to open a channel: %v", err) | |
} | |
_, err = amqpChannel.QueueDeclare(queue, false, true, false, false, amqp.Table{"x-queue-version": 2}) | |
if err != nil { | |
b.Fatalf("Failed to declare a queue: %v", err) | |
} | |
// This will make sure all messages are received by the server before beginning consuming | |
err = amqpChannel.Confirm(false) | |
if err != nil { | |
b.Fatalf("Failed to set noWait for deliveries: %v", err) | |
} | |
// Publish a number of messages defined in the -bench arg | |
for i := 0; i < b.N*10; i++ { | |
func() { | |
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) | |
defer cancel() | |
err := amqpChannel.PublishWithContext(ctx, | |
exchange, // exchange | |
queue, // routing key | |
false, // mandatory | |
false, // immediate | |
amqp.Publishing{ | |
ContentType: "text/plain", | |
Body: []byte{}, | |
DeliveryMode: 2, // persistent | |
}) | |
if err != nil { | |
b.Fatalf("Failed to publish to AMQP server: %v", err) | |
} | |
}() | |
} | |
// Start the consumer | |
err = amqpChannel.Qos(prefetchCount, 0, false) | |
if err != nil { | |
b.Fatalf("Failed to set prefetchCount: %v", err) | |
} | |
deliveryChannel, err := amqpChannel.ConsumeBuffered( | |
queue, // queue | |
"", // consumer | |
false, // auto-ack | |
false, // exclusive | |
false, // noLocal (not supported by rabbitMQ) | |
false, // no-wait | |
nil, // args | |
prefetchCount, | |
) | |
if err != nil { | |
b.Fatalf("Failed to create consumer: %v", err) | |
} | |
// Receiving loop, loop until we reach the number of delivery we just published | |
totalDeliveries := 0 | |
var delivery amqp.Delivery | |
nbBatch := 0 | |
b.ResetTimer() | |
BatchingLoop: | |
for { | |
select { | |
case delivery = <-deliveryChannel: | |
nbBatch++ | |
totalDeliveries++ | |
currentBatch := 1 | |
// cannot sleep in benchmark so using a cpu-busy loop to simulate processing of delivery | |
for i := 0; i < 100000; i++ { | |
} | |
// loop to force emptying deliveryChannel because default case would trigger randomly otherwise | |
readAll: | |
for { | |
select { | |
case delivery = <-deliveryChannel: | |
totalDeliveries++ | |
currentBatch++ | |
// cannot sleep in benchmark so using a cpu-busy loop to simulate processing of delivery | |
for i := 0; i < 100000; i++ { | |
} | |
default: | |
break readAll | |
} | |
} | |
// Ack multiple | |
err := delivery.Ack(true) | |
if err != nil { | |
b.Fatalf("Failed to ack multiple deliveries: %v", err) | |
} | |
default: | |
if amqpChannel.IsClosed() { | |
b.Fatalf("Channel was closed") | |
} | |
if totalDeliveries >= b.N { | |
// end benchmark | |
amqpChannel.Close() | |
amqpConn.Close() | |
b.Logf("Average batch size: %f", float64(totalDeliveries/nbBatch)) | |
b.Logf("Messages per second (N=%d): %f", b.N, float64(totalDeliveries)/b.Elapsed().Seconds()) | |
break BatchingLoop | |
} | |
} | |
} | |
b.ReportMetric(float64(totalDeliveries)/b.Elapsed().Seconds(), "msg/s") | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
go test -tags integration -bench '.' -run '^$' -v -count=1 -benchtime '10x' | |
environment variable envAMQPURLName undefined or empty, using default: "amqp://guest:guest@127.0.0.1:5672/" | |
goos: darwin | |
goarch: arm64 | |
pkg: github.com/rabbitmq/amqp091-go | |
BenchmarkSequentialBufferedConfirms | |
BenchmarkSequentialBufferedConfirms-10 10 4950 ns/op | |
BenchmarkQoSAndConsumeBatching | |
BenchmarkQoSAndConsumeBatching/QoS=10 | |
BenchmarkQoSAndConsumeBatching/QoS=10-10 10 94708196 ns/op 10.00 msg/batch 105.6 msg/s | |
BenchmarkQoSAndConsumeBatching/QoS=100 | |
BenchmarkQoSAndConsumeBatching/QoS=100-10 10 95461850 ns/op 100.0 msg/batch 1048 msg/s | |
BenchmarkQoSAndConsumeBatching/QoS=1000 | |
BenchmarkQoSAndConsumeBatching/QoS=1000-10 10 105675596 ns/op 1000 msg/batch 9463 msg/s | |
BenchmarkQosAndBufferedDeliveries | |
BenchmarkQosAndBufferedDeliveries/QoS=10 | |
BenchmarkQosAndBufferedDeliveries/QoS=10-10 10 96095033 ns/op 10.00 msg/batch 104.1 msg/s | |
BenchmarkQosAndBufferedDeliveries/QoS=100 | |
BenchmarkQosAndBufferedDeliveries/QoS=100-10 10 98800742 ns/op 100.0 msg/batch 1012 msg/s | |
BenchmarkQosAndBufferedDeliveries/QoS=1000 | |
BenchmarkQosAndBufferedDeliveries/QoS=1000-10 10 106693950 ns/op 1000 msg/batch 9373 msg/s | |
go test -tags integration -bench 'BenchmarkBufferedChannels' -run '^$' -v -count=1 | |
environment variable envAMQPURLName undefined or empty, using default: "amqp://guest:guest@127.0.0.1:5672/" | |
goos: darwin | |
goarch: arm64 | |
pkg: github.com/rabbitmq/amqp091-go | |
BenchmarkBufferedChannels | |
bench_qos_test.go:288: Running for b.N=1 | |
bench_qos_test.go:394: Average batch size: 1.000000 | |
bench_qos_test.go:395: Messages per second (N=1): 74.960648 | |
bench_qos_test.go:288: Running for b.N=88 | |
bench_qos_test.go:394: Average batch size: 88.000000 | |
bench_qos_test.go:395: Messages per second (N=88): 7665.533271 | |
bench_qos_test.go:288: Running for b.N=8800 | |
bench_qos_test.go:394: Average batch size: 8800.000000 | |
bench_qos_test.go:395: Messages per second (N=8800): 19000.045237 | |
bench_qos_test.go:288: Running for b.N=22798 | |
bench_qos_test.go:394: Average batch size: 22798.000000 | |
bench_qos_test.go:395: Messages per second (N=22798): 19309.582351 | |
BenchmarkBufferedChannels-10 22798 51788 ns/op 19309 msg/s | |
PASS | |
ok github.com/rabbitmq/amqp091-go 8.315s |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment