
@anuchandy
Last active April 19, 2023 05:24

The message batching

When sending a batch of messages, the Service Bus and Event Hubs libraries use the following logic:

  1. Allocate a byte array for the Uber batch message.
  2. Iterate through the list of messages, encode each message, and append it to the above byte array.

The encode and append operations are handled by the QPid library's 'encode' API (referred to here as API1):

int Message::encode(byte[] byteArray, int offset, int length)

Because the client doesn't know the final size of the encoded Uber message in advance, it currently allocates the byte array upfront with the maximum size the link supports, i.e., the MaxMessageSize configured for the entity: new byte[maxMessageSize]. For a Service Bus queue, MaxMessageSize can range from 1024 KB (1 MB) to 102400 KB (100 MB).
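A minimal sketch of the current pattern, for illustration only. EncodableMessage, FixedSizeBatchEncoder, and Result are hypothetical names; EncodableMessage stands in for a QPid message and only mimics the shape of API1 (write into a caller-supplied array at an offset, return the number of bytes written). The point is that the array is always MaxMessageSize long, however few bytes the batch actually uses.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a QPid message; mimics API1's shape (encode into a caller-supplied array). */
interface EncodableMessage {
    /** Writes the encoded message into dst starting at offset, using at most length bytes; returns bytes written. */
    int encode(byte[] dst, int offset, int length);
}

final class FixedSizeBatchEncoder {
    /** Encoded result: the backing array plus how many leading bytes are meaningful. */
    static final class Result {
        final byte[] array;
        final int length;

        Result(byte[] array, int length) {
            this.array = array;
            this.length = length;
        }
    }

    /** Current pattern: reserve maxMessageSize bytes up front, then append each encoded message. */
    static Result encodeBatch(List<EncodableMessage> messages, int maxMessageSize) {
        byte[] bytes = new byte[maxMessageSize]; // full MaxMessageSize, however small the batch is
        int offset = 0;
        for (EncodableMessage message : messages) {
            offset += message.encode(bytes, offset, maxMessageSize - offset);
        }
        return new Result(bytes, offset);
    }

    public static void main(String[] args) {
        // A tiny 2-byte "message" (ignores the length bound for brevity).
        EncodableMessage tiny = (dst, offset, length) -> {
            dst[offset] = 1;
            dst[offset + 1] = 2;
            return 2;
        };
        Result result = encodeBatch(Arrays.asList(tiny, tiny), 1024 * 1024);
        System.out.println("allocated=" + result.array.length + " used=" + result.length);
    }
}
```

Running main prints allocated=1048576 used=4: two 2-byte messages against a 1 MB limit leave almost the entire array unused.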

Customers have recently started reporting the overhead of this upfront allocation. For example, even if the total encoded size of a batch is only 2 KB, the library allocates 50 MB when the queue's configured MaxMessageSize is 50 MB. The overhead becomes prominent with concurrent sends, because each in-flight send allocates its own MaxMessageSize-sized array.
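The numbers in that example work out as follows, assuming 1 MB = 1024 × 1024 bytes (consistent with 1024 KB = 1 MB above). AllocationOverhead is a hypothetical helper, and the figure of ten concurrent sends is an illustrative load, not a measurement.

```java
final class AllocationOverhead {
    /** Bytes allocated but never written when each concurrent send reserves maxMessageSize for its batch. */
    static long unusedBytes(long maxMessageSize, long encodedBatchSize, int concurrentSends) {
        return (maxMessageSize - encodedBatchSize) * concurrentSends;
    }

    public static void main(String[] args) {
        final long kb = 1024L;
        final long mb = 1024L * kb;
        // A 2 KB batch against a 50 MB MaxMessageSize.
        System.out.println(unusedBytes(50 * mb, 2 * kb, 1));  // 52426752
        // Illustrative load: ten such sends in flight at once.
        System.out.println(unusedBytes(50 * mb, 2 * kb, 10)); // 524267520
    }
}
```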

The upfront allocation pattern is the same in both the Track1 and Track2 SDKs.

Alternate API for encoding and appending

While looking for a way to improve this, we came across another overload of the QPid 'encode' API (referred to here as API2):

int encode(WritableBuffer buffer);

Under the hood, the currently used API1 delegates to API2: it wraps the fixed-size input byte array in a java.nio.ByteBuffer, and an internal 'WritableBuffer' implementation in QPid abstracts that ByteBuffer.
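The fixed-size limitation can be seen with the JDK alone. This sketch (FixedWrapDemo is a hypothetical name, and it does not call QPid) wraps part of an array with ByteBuffer.wrap(array, offset, length), the way API1 is described as handing the caller's array to API2. Once the wrapped window is full, ByteBuffer.put throws BufferOverflowException rather than reallocating.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

final class FixedWrapDemo {
    /**
     * Wraps [offset, offset + length) of array and tries to write payload into it.
     * The wrapped view cannot grow: writing more than `length` bytes throws
     * BufferOverflowException (caught here and reported as false).
     */
    static boolean tryWrite(byte[] array, int offset, int length, byte[] payload) {
        ByteBuffer view = ByteBuffer.wrap(array, offset, length); // position=offset, limit=offset+length
        try {
            view.put(payload);
            return true;
        } catch (BufferOverflowException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] array = new byte[8];
        System.out.println(tryWrite(array, 4, 4, new byte[4])); // true: fits exactly
        System.out.println(tryWrite(array, 4, 4, new byte[5])); // false: no room to grow
    }
}
```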

Because API2 accepts any 'WritableBuffer', the messaging libraries can provide their own implementation with the following properties:

  1. Starts with an internal buffer (byte array) of an initial size.
  2. Grows the internal buffer as needed, with MaxMessageSize as the upper limit.
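One way to express property 2 is a capacity function that doubles the buffer, jumps straight to the needed size when doubling isn't enough, and clamps at the maximum. This is a hypothetical helper (nextCapacity is neither a QPid nor a JDK API) that assumes ByteArrayOutputStream-style doubling; it returns -1 to signal that the write can never fit.

```java
final class CappedGrowth {
    /**
     * Capacity to grow to so that `required` more bytes fit after `position`, given the current
     * `capacity` and an upper bound `max`. Returns the current capacity if no growth is needed,
     * and -1 if the bytes can never fit within `max`.
     */
    static int nextCapacity(int capacity, int position, int required, int max) {
        long needed = (long) position + required; // long arithmetic: cannot overflow int
        if (needed <= capacity) {
            return capacity;                      // already fits, no growth
        }
        if (needed > max) {
            return -1;                            // would exceed MaxMessageSize
        }
        long doubled = 2L * capacity;
        return (int) Math.min(Math.max(doubled, needed), max);
    }
}
```

For example, with a 600-byte buffer, a 1000-byte cap, and 10 more bytes needed at position 600, doubling would overshoot to 1200, so the result is clamped to 1000.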

Such a buffer is very much like the JDK's ByteArrayOutputStream, which we use together with the Jackson serialization (encoding) API in azure-core.
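To make the analogy concrete: a ByteArrayOutputStream already grows on demand, so the only missing piece is the MaxMessageSize cap. The subclass below is a hypothetical sketch (BoundedByteArrayOutputStream is not an azure-core type). It relies on the protected count field that ByteArrayOutputStream exposes to subclasses and rejects a write that would exceed the cap before any bytes are copied.

```java
import java.io.ByteArrayOutputStream;
import java.nio.BufferOverflowException;

/** A ByteArrayOutputStream that starts small, grows on demand, and never holds more than maxSize bytes. */
final class BoundedByteArrayOutputStream extends ByteArrayOutputStream {
    private final int maxSize;

    BoundedByteArrayOutputStream(int initialSize, int maxSize) {
        super(initialSize);
        this.maxSize = maxSize;
    }

    @Override
    public synchronized void write(int b) {
        checkRoom(1);
        super.write(b);
    }

    @Override
    public synchronized void write(byte[] b, int off, int len) {
        checkRoom(len);
        super.write(b, off, len);
    }

    private void checkRoom(int len) {
        // `count` is ByteArrayOutputStream's protected number of bytes written so far.
        if ((long) count + len > maxSize) {
            throw new BufferOverflowException();
        }
    }
}
```

A stream like this could be passed to any serializer that writes to an OutputStream (for example Jackson's ObjectMapper.writeValue(OutputStream, Object)); the cap is enforced on every write.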

The second file in this gist shows a reference implementation of 'WritableBuffer', named 'ByteArrayWritableBuffer', with the above properties.

Notes

Further exploration shows that the Azure Messaging libraries are not the only ones that run into this allocation overhead. The Red Hat Vert.x project, which uses QPid, had to roll out its own 'WritableBuffer' implementation. It uses Netty's heap-based "Unpooled" ByteBuf as the underlying buffer.

It may not be wise to add a Netty dependency to azure-core-amqp just to use ByteBuf:

  • First, it's an extra dependency we'd be bringing in.
  • Second, the internal code of the Netty ByteBuf API subset that Vert.x uses for its 'WritableBuffer' is the same as 'ByteArrayWritableBuffer'.
  • Third, the Netty ByteBuf type carries additional complexity for other APIs it has to support, none of which is needed to back a 'WritableBuffer'.

Finally, the buffer reallocation algorithm in 'ByteArrayWritableBuffer' follows the same strategy as the JDK's ByteArrayOutputStream: grow to double the current capacity, or to the required size if that is larger.

We could also use MethodHandle or reflection to detect Netty on the classpath at runtime and access its APIs, but that isn't worth it, at least for now. It would become worthwhile if we ever want to use Netty's "Pooled" ByteBuf, which comes with the great responsibility for the libraries to manage reference-counted buffers. That is an entirely different territory to explore.
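If runtime detection were ever pursued, it could look roughly like this sketch. NettyProbe and its methods are hypothetical; the only Netty API referenced is Unpooled.buffer(int initialCapacity, int maxCapacity), looked up by name so nothing fails to link when Netty is absent. A null handle means the caller would fall back to a byte[]-backed buffer such as ByteArrayWritableBuffer.

```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

final class NettyProbe {
    /** True if the named class can be loaded (without initializing it) by this class's loader. */
    static boolean isClassPresent(String className) {
        try {
            Class.forName(className, false, NettyProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /**
     * Looks up io.netty.buffer.Unpooled.buffer(int initialCapacity, int maxCapacity) reflectively.
     * Returns null when Netty is not on the classpath (or the lookup fails).
     */
    static MethodHandle findUnpooledBuffer() {
        try {
            ClassLoader loader = NettyProbe.class.getClassLoader();
            Class<?> unpooled = Class.forName("io.netty.buffer.Unpooled", false, loader);
            Class<?> byteBuf = Class.forName("io.netty.buffer.ByteBuf", false, loader);
            return MethodHandles.publicLookup().findStatic(
                unpooled, "buffer", MethodType.methodType(byteBuf, int.class, int.class));
        } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | LinkageError e) {
            return null;
        }
    }
}
```

Even with a handle, the returned ByteBuf is only an Object to code compiled without Netty, so every further call would need handles too; that overhead is part of why this isn't worth it for the unpooled case.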

package com.azure.messaging.servicebus.sendalloc;

import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;

import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * A byte[]-backed WritableBuffer that starts at initialBufferSize bytes and grows on demand,
 * never beyond maxBufferSize (e.g. the entity's MaxMessageSize).
 */
public final class ByteArrayWritableBuffer implements WritableBuffer {
    private final int maxBufferSize;
    private int position;
    private byte[] buffer;

    public ByteArrayWritableBuffer(int initialBufferSize, int maxBufferSize) {
        if (initialBufferSize < 0 || initialBufferSize > maxBufferSize) {
            throw new IllegalArgumentException("initialBufferSize should be in the range [0:maxBufferSize].");
        }
        this.maxBufferSize = maxBufferSize;
        this.position = 0;
        this.buffer = new byte[initialBufferSize];
    }

    /** Returns the backing array; only the bytes in [0, position()) hold encoded data. */
    public byte[] array() {
        return buffer;
    }

    @Override
    public void put(byte b) {
        ensureCapacity(1);
        buffer[position] = b;
        position += 1;
    }

    // Multi-byte values are written big-endian (network byte order), as AMQP requires.
    @Override
    public void putShort(short s) {
        ensureCapacity(2);
        buffer[position] = (byte) (s >>> 8);
        buffer[position + 1] = (byte) s;
        position += 2;
    }

    @Override
    public void putInt(int i) {
        ensureCapacity(4);
        buffer[position] = (byte) (i >>> 24);
        buffer[position + 1] = (byte) (i >>> 16);
        buffer[position + 2] = (byte) (i >>> 8);
        buffer[position + 3] = (byte) i;
        position += 4;
    }

    @Override
    public void putFloat(float f) {
        putInt(Float.floatToIntBits(f));
    }

    @Override
    public void putLong(long l) {
        ensureCapacity(8);
        buffer[position] = (byte) (l >>> 56);
        buffer[position + 1] = (byte) (l >>> 48);
        buffer[position + 2] = (byte) (l >>> 40);
        buffer[position + 3] = (byte) (l >>> 32);
        buffer[position + 4] = (byte) (l >>> 24);
        buffer[position + 5] = (byte) (l >>> 16);
        buffer[position + 6] = (byte) (l >>> 8);
        buffer[position + 7] = (byte) l;
        position += 8;
    }

    @Override
    public void putDouble(double d) {
        putLong(Double.doubleToLongBits(d));
    }

    @Override
    public void put(byte[] src, int offset, int length) {
        if (offset < 0 || length < 0 || length > src.length - offset) {
            throw new IndexOutOfBoundsException("offset=" + offset + ",length=" + length + ",src.length=" + src.length);
        }
        ensureCapacity(length);
        System.arraycopy(src, offset, buffer, position, length);
        position += length;
    }

    @Override
    public void put(ByteBuffer payload) {
        final int n = payload.remaining();
        ensureCapacity(n);
        payload.get(buffer, position, n);
        position += n;
    }

    @Override
    public void put(ReadableBuffer payload) {
        // The ReadableBuffer copies its remaining bytes into this buffer through the put(...) methods above.
        payload.get(this);
    }

    /** Writes value as UTF-8. */
    @Override
    public void put(final String value) {
        final int length = value.length();
        // Every char encodes to at least one byte, so reserve that much up front; each branch
        // below re-checks capacity for its exact byte count before writing.
        ensureCapacity(length);
        for (int i = 0; i < length; i++) {
            int c = value.charAt(i);
            if ((c & 0xFF80) == 0) {
                // U+0000..U+007F: 1 byte
                ensureCapacity(1);
                buffer[position] = (byte) c;
                position += 1;
            } else if ((c & 0xF800) == 0) {
                // U+0080..U+07FF: 2 bytes
                ensureCapacity(2);
                buffer[position] = (byte) (0xC0 | ((c >> 6) & 0x1F));
                buffer[position + 1] = (byte) (0x80 | (c & 0x3F));
                position += 2;
            } else if ((c & 0xD800) != 0xD800 || (c > 0xDBFF)) {
                // U+0800..U+FFFF, excluding high surrogates: 3 bytes
                ensureCapacity(3);
                buffer[position] = (byte) (0xE0 | ((c >> 12) & 0x0F));
                buffer[position + 1] = (byte) (0x80 | ((c >> 6) & 0x3F));
                buffer[position + 2] = (byte) (0x80 | (c & 0x3F));
                position += 3;
            } else {
                // High surrogate: must be followed by a low surrogate; the pair encodes
                // a code point in U+10000..U+10FFFF as 4 bytes.
                if (i + 1 == length || !Character.isLowSurrogate(value.charAt(i + 1))) {
                    throw new IllegalArgumentException("String contains invalid Unicode code points.");
                }
                final int low = value.charAt(++i);
                ensureCapacity(4);
                c = 0x010000 + ((c & 0x03FF) << 10) + (low & 0x03FF);
                buffer[position] = (byte) (0xF0 | ((c >> 18) & 0x07));
                buffer[position + 1] = (byte) (0x80 | ((c >> 12) & 0x3F));
                buffer[position + 2] = (byte) (0x80 | ((c >> 6) & 0x3F));
                buffer[position + 3] = (byte) (0x80 | (c & 0x3F));
                position += 4;
            }
        }
    }

    @Override
    public boolean hasRemaining() {
        return position < maxBufferSize;
    }

    // remaining() reports room up to maxBufferSize (the array can still grow that far),
    // while limit() reports the current backing-array size.
    @Override
    public int remaining() {
        return maxBufferSize - position;
    }

    @Override
    public int position() {
        return this.position;
    }

    @Override
    public void position(int position) {
        // position == buffer.length is allowed (e.g. restoring the position after the array was
        // filled exactly); the next put grows the array.
        if (position < 0 || position > this.buffer.length) {
            throw new IndexOutOfBoundsException("position=" + position + ",length=" + this.buffer.length);
        }
        this.position = position;
    }

    @Override
    public int limit() {
        return this.buffer.length;
    }

    /**
     * Ensures that {@code required} bytes can be written at the current position, growing the
     * backing array if needed: double it (or grow to exactly what is needed, if that is more),
     * but never past maxBufferSize.
     */
    private void ensureCapacity(int required) {
        final int currentBufferSize = buffer.length;
        if (currentBufferSize - position >= required) {
            return;
        }
        // Use long arithmetic so position + required and the doubled size cannot overflow int.
        final long minBufferSize = (long) position + required;
        if (minBufferSize > maxBufferSize) {
            throw new BufferOverflowException();
        }
        final long newBufferSize = Math.min(Math.max(2L * currentBufferSize, minBufferSize), maxBufferSize);
        buffer = Arrays.copyOf(buffer, (int) newBufferSize);
    }
}
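put(String) hand-rolls its UTF-8 encoding, so it is worth double-checking against the JDK's own encoder. The sketch below is a hypothetical test helper (Utf8Check is not part of the gist) that repeats the same branch structure over a plain array, using a strict low-surrogate check via Character.isLowSurrogate, and compares the result with String.getBytes(StandardCharsets.UTF_8). Lone low surrogates are left out of the sample inputs: this encoder writes them as 3 bytes, while String.getBytes substitutes '?', so the two would legitimately differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class Utf8Check {
    /** Same branch structure as put(String), writing into a plain array instead of the growable buffer. */
    static byte[] encode(String value) {
        // Worst case is 3 bytes per char (a surrogate pair is 2 chars and encodes to 4 bytes).
        byte[] out = new byte[value.length() * 3];
        int p = 0;
        for (int i = 0; i < value.length(); i++) {
            int c = value.charAt(i);
            if ((c & 0xFF80) == 0) {
                out[p++] = (byte) c;
            } else if ((c & 0xF800) == 0) {
                out[p++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
                out[p++] = (byte) (0x80 | (c & 0x3F));
            } else if ((c & 0xD800) != 0xD800 || c > 0xDBFF) {
                out[p++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
                out[p++] = (byte) (0x80 | ((c >> 6) & 0x3F));
                out[p++] = (byte) (0x80 | (c & 0x3F));
            } else {
                if (i + 1 == value.length() || !Character.isLowSurrogate(value.charAt(i + 1))) {
                    throw new IllegalArgumentException("String contains invalid Unicode code points.");
                }
                int low = value.charAt(++i);
                int cp = 0x010000 + ((c & 0x03FF) << 10) + (low & 0x03FF);
                out[p++] = (byte) (0xF0 | ((cp >> 18) & 0x07));
                out[p++] = (byte) (0x80 | ((cp >> 12) & 0x3F));
                out[p++] = (byte) (0x80 | ((cp >> 6) & 0x3F));
                out[p++] = (byte) (0x80 | (cp & 0x3F));
            }
        }
        return Arrays.copyOf(out, p);
    }

    /** True when the hand-rolled encoder agrees with the JDK's UTF-8 encoder for this input. */
    static boolean matchesJdk(String value) {
        return Arrays.equals(encode(value), value.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        for (String s : new String[] {"hello", "caf\u00e9", "\u20ac", "\ud83d\ude00", "a\u00e9\u20ac\ud83d\ude00z"}) {
            System.out.println(matchesJdk(s)); // true for each sample
        }
    }
}
```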