@kasobol-msft
Last active May 20, 2022 21:34
Handling bad queue messages.

Scenario

Customers using the Queue SDK can opt in to base64-encoding and decoding of queue messages in order to handle payloads that are not valid UTF-8 strings. This is currently supported in the .NET and Python SDKs.

This works well as long as the messages in the queue are uniform, i.e. they are all base64-encoded. However, if a non-base64-encoded message appears in the queue (due to a bug or a migration), we lack a mechanism to handle the situation gracefully, i.e. to discard the message from the queue and log or persist it somewhere else for further processing or audit (e.g. transfer it to a poison queue).

Current behavior (.NET, Python)

Let's assume that a customer has a queue with 20 messages and one of them is "bad", i.e. 19 base64-encoded messages and 1 plain-text message (i.e. not a valid base64 string).

  1. The customer calls "receive messages" with "max count" 20.
  2. The SDK fetches all 20 messages.
  3. The SDK attempts to transform all 20 messages in a loop.
  4. While decoding the "bad" message, an exception/error is thrown.
  5. The whole "receive messages" call fails.
  6. All messages are invisible for the "visibility timeout" duration, even though 19 of them were valid and could have been processed, i.e. one bad message disrupts the processing of the others.
  7. The "bad" message isn't discarded from the queue, i.e. after the "visibility timeout" passes it becomes visible again and poisons another batch of messages.
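
The failure mode in steps 3 to 5 can be reproduced without the SDK. The following is only a minimal sketch using Python's standard library; `decode_batch` is a hypothetical stand-in for the SDK's internal decoding loop, not its actual implementation.

```python
import base64


def decode_batch(raw_messages):
    """Decode every payload; the first invalid payload aborts the whole batch."""
    decoded = []
    for raw in raw_messages:
        # validate=True rejects characters outside the base64 alphabet.
        decoded.append(base64.b64decode(raw, validate=True).decode("utf-8"))
    return decoded


# 19 valid base64 payloads plus one plain-text payload in the middle.
batch = [base64.b64encode(f"message {i}".encode()).decode() for i in range(19)]
batch.insert(7, "plain text, not base64!")

try:
    decode_batch(batch)
    outcome = "decoded"
except ValueError as error:
    # The caller gets no messages at all, including the 19 valid ones.
    outcome = f"failed: {type(error).__name__}"
```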

Proposed solution

The proposal is to let the customer provide a callback/hook when constructing a QueueClient to handle messages whose payload is invalid for the selected encoding. The SDK will call that callback for each message it fails to decode instead of failing the entire operation. This behavior is contingent upon a callback/hook being passed to the client. In its absence, the existing behavior is preserved, i.e. the operation fails.
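
To make the intended control flow concrete, here is a language-neutral sketch in plain Python. The names `decode_batch` and `invalid_message_hook` are illustrative assumptions and do not describe the SDK's internals; the point is only that the hook is optional and that omitting it keeps today's fail-fast behavior.

```python
import base64


def decode_batch(raw_messages, invalid_message_hook=None):
    """Decode payloads, diverting undecodable ones to the hook if one is set."""
    decoded = []
    for raw in raw_messages:
        try:
            decoded.append(base64.b64decode(raw, validate=True).decode("utf-8"))
        except ValueError:
            # binascii.Error and UnicodeDecodeError both derive from ValueError.
            if invalid_message_hook is None:
                raise  # no hook: preserve the existing behavior
            invalid_message_hook(raw)
    return decoded


def b64(text):
    return base64.b64encode(text.encode()).decode()


rejected = []
accepted = decode_batch([b64("a"), "not base64!", b64("b")], rejected.append)
```

With the hook in place, the two valid messages are returned and the bad payload is handed to the hook untouched.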

The callback/hook will apply to both "receive messages" and "peek messages".

Since the customer may choose to persist a "bad" message, we should offer both sync and async styles, matching the API type they're calling, i.e. async "receive messages" should call the async callback.
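
The async pairing might look roughly like the sketch below, where the async receive path awaits an async hook. This is an assumption-laden illustration: `decode_batch_async` and `persist_bad_message` are hypothetical names, and `asyncio.sleep(0)` merely stands in for an asynchronous write.

```python
import asyncio
import base64


async def decode_batch_async(raw_messages, invalid_message_hook):
    """Async counterpart of the decoding loop; the hook is awaited per bad message."""
    decoded = []
    for raw in raw_messages:
        try:
            decoded.append(base64.b64decode(raw, validate=True).decode("utf-8"))
        except ValueError:
            # Awaiting lets the hook perform I/O without blocking the event loop.
            await invalid_message_hook(raw)
    return decoded


persisted = []


async def persist_bad_message(raw):
    await asyncio.sleep(0)  # stand-in for an asynchronous write
    persisted.append(raw)


valid = base64.b64encode(b"hello").decode()
result = asyncio.run(decode_batch_async([valid, "not base64!"], persist_bad_message))
```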

When calling the callback/hook, we'll provide the same message model that the receive/peek APIs normally return, including the id, pop receipt and other attributes, so that the customer can act on that message (e.g. delete it from the queue). However, the payload will be passed verbatim, without any decoding transformation applied (the underlying REST API defines it as a UTF-8 string).
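
As an illustration of why the id and pop receipt matter, the sketch below moves a bad message to a poison store and then deletes it from the source queue. `ReceivedMessage` and `InMemoryQueue` are hypothetical stand-ins written only to make the example runnable; they are not SDK types.

```python
from dataclasses import dataclass


@dataclass
class ReceivedMessage:
    # Hypothetical stand-in for the SDK's received-message model.
    id: str
    pop_receipt: str
    content: str  # raw payload, exactly as the service returned it


class InMemoryQueue:
    # Hypothetical in-memory queue keyed by message id.
    def __init__(self):
        self.messages = {}

    def delete_message(self, message_id, pop_receipt):
        del self.messages[message_id]


def move_to_poison_queue(queue, poison, message):
    """Keep the undecoded payload for audit, then remove it from the source queue."""
    poison.append(message.content)
    queue.delete_message(message.id, message.pop_receipt)


queue = InMemoryQueue()
queue.messages["1"] = "not base64!"
poison = []
move_to_poison_queue(queue, poison, ReceivedMessage("1", "receipt-1", "not base64!"))
```

A peeked message carries no pop receipt, so a handler for peeked messages can only log or copy the payload, not delete it.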

.NET

Option 1: AzureCallback with both sync and async interface. Factory method for simple cases.

Callback interface.

namespace Azure.Core
{
    public abstract class AzureCallback<TSender, TEventArgs>
    {
        public abstract void Handle(TSender sender, TEventArgs eventArgs, CancellationToken cancellationToken);
        public abstract Task HandleAsync(TSender sender, TEventArgs eventArgs, CancellationToken cancellationToken);

        public static AzureCallback<TSender, TEventArgs> Create(Action<TSender, TEventArgs, CancellationToken> action) =>
            new SyncActionCallback<TSender, TEventArgs>(action);
    }

    internal class SyncActionCallback<TSender, TEventArgs> : AzureCallback<TSender, TEventArgs>
    {
        private Action<TSender, TEventArgs, CancellationToken> _action;
        public SyncActionCallback(Action<TSender, TEventArgs, CancellationToken> action) => _action = action;
        public override void Handle(TSender sender, TEventArgs eventArgs, CancellationToken cancellationToken) =>
            _action(sender, eventArgs, cancellationToken);
        public override Task HandleAsync(TSender sender, TEventArgs eventArgs, CancellationToken cancellationToken)
        {
            _action(sender, eventArgs, cancellationToken);
            return Task.CompletedTask;
        }
    }
}

namespace Azure.Storage.Queues
{
    public abstract class InvalidQueueMessageHandler : AzureCallback<QueueClient, object>
    {
    }
}

Simple usage.

string connectionString = "";
string queueName = "";
QueueClientOptions queueClientOptions = new QueueClientOptions()
{
    MessageEncoding = QueueMessageEncoding.Base64
};

queueClientOptions.InvalidQueueMessageHandler = InvalidQueueMessageHandler.Create((queueClient, message, cancellationToken) =>
{
    if (message is QueueMessage queueMessage)
    {
        Console.WriteLine($"Received message could not be decoded {queueMessage.MessageId}, {queueMessage.Body}");
        queueClient.DeleteMessage(queueMessage.MessageId, queueMessage.PopReceipt, cancellationToken);
    }
    else if (message is PeekedMessage peekedMessage)
    {
        Console.WriteLine($"Peeked message could not be decoded {peekedMessage.MessageId}, {peekedMessage.Body}");
    }
});

QueueClient queueClient = new QueueClient(connectionString, queueName, queueClientOptions);

QueueMessage[] queueMessages = await queueClient.ReceiveMessagesAsync(20).ConfigureAwait(false);

Advanced usage.

public class MyInvalidQueueMessageHandler : InvalidQueueMessageHandler
{
    // Called from the synchronous receive/peek APIs.
    public override void Handle(QueueClient queueClient, object message, CancellationToken cancellationToken)
    {
        if (message is QueueMessage queueMessage)
        {
            Console.WriteLine($"Received message could not be decoded {queueMessage.MessageId}, {queueMessage.Body}");
            queueClient.DeleteMessage(queueMessage.MessageId, queueMessage.PopReceipt, cancellationToken);
        }
        else if (message is PeekedMessage peekedMessage)
        {
            Console.WriteLine($"Peeked message could not be decoded {peekedMessage.MessageId}, {peekedMessage.Body}");
        }
    }

    // Called from the asynchronous receive/peek APIs.
    public override async Task HandleAsync(QueueClient queueClient, object message, CancellationToken cancellationToken)
    {
        if (message is QueueMessage queueMessage)
        {
            Console.WriteLine($"Received message could not be decoded {queueMessage.MessageId}, {queueMessage.Body}");
            await queueClient.DeleteMessageAsync(queueMessage.MessageId, queueMessage.PopReceipt, cancellationToken).ConfigureAwait(false);
        }
        else if (message is PeekedMessage peekedMessage)
        {
            Console.WriteLine($"Peeked message could not be decoded {peekedMessage.MessageId}, {peekedMessage.Body}");
        }
    }
}

string connectionString = "";
string queueName = "";
QueueClient queueClient = new QueueClient(connectionString, queueName, new QueueClientOptions()
{
    MessageEncoding = QueueMessageEncoding.Base64,
    InvalidQueueMessageHandler = new MyInvalidQueueMessageHandler()
});

QueueMessage[] queueMessages = await queueClient.ReceiveMessagesAsync(20);

Option 2: AzureCallback that takes a bool RunSynchronously flag.

Callback interface.

namespace Azure.Core
{
    public abstract class AzureCallback<TSender, TEventArgs>
    {
        public abstract Task HandleAsync(TSender sender, TEventArgs eventArgs, bool RunSynchronously, CancellationToken cancellationToken);
    }
}

namespace Azure.Storage.Queues
{
    public class MyInvalidQueueMessageHandler : AzureCallback<QueueClient, object>
    {
        /// <inheritdoc/>
        public override async Task HandleAsync(QueueClient queueClient, object message, bool RunSynchronously, CancellationToken cancellationToken)
        {
            if (message is QueueMessage queueMessage)
            {
                Console.WriteLine($"Received message could not be decoded {queueMessage.MessageId}, {queueMessage.Body}");
                if (RunSynchronously)
                {
                    queueClient.DeleteMessage(queueMessage.MessageId, queueMessage.PopReceipt, cancellationToken);
                }
                else
                {
                    await queueClient.DeleteMessageAsync(queueMessage.MessageId, queueMessage.PopReceipt, cancellationToken).ConfigureAwait(false);
                }
            }
            else if (message is PeekedMessage peekedMessage)
            {
                Console.WriteLine($"Peeked message could not be decoded {peekedMessage.MessageId}, {peekedMessage.Body}");
            }
        }
    }
}

Usage.

string connectionString = "";
string queueName = "";
QueueClientOptions queueClientOptions = new QueueClientOptions()
{
    MessageEncoding = QueueMessageEncoding.Base64
};

queueClientOptions.InvalidQueueMessageHandler = new MyInvalidQueueMessageHandler();

QueueClient queueClient = new QueueClient(connectionString, queueName, queueClientOptions);

QueueMessage[] queueMessages = await queueClient.ReceiveMessagesAsync(20).ConfigureAwait(false);

Option 3: Take an event (async delegate) and run it in a fire-and-forget manner.

QueueClientOptions declaration (only delta):

namespace Azure.Storage.Queues
{
    public class QueueClientOptions : ClientOptions
    {
        public event Func<QueueClient, object, CancellationToken, Task> OnInvalidMessageAsync;
    }
}

Usage:

string connectionString = "";
string queueName = "";
QueueClientOptions queueClientOptions = new QueueClientOptions()
{
    MessageEncoding = QueueMessageEncoding.Base64
};

queueClientOptions.OnInvalidMessageAsync += async (QueueClient queueClient, object message, CancellationToken cancellationToken) =>
{
    if (message is QueueMessage queueMessage)
    {
        Console.WriteLine($"Received message could not be decoded {queueMessage.MessageId}, {queueMessage.Body}");
        await queueClient.DeleteMessageAsync(queueMessage.MessageId, queueMessage.PopReceipt, cancellationToken).ConfigureAwait(false);
    }
    else if (message is PeekedMessage peekedMessage)
    {
        Console.WriteLine($"Peeked message could not be decoded {peekedMessage.MessageId}, {peekedMessage.Body}");
    }
};

QueueClient queueClient = new QueueClient(connectionString, queueName, queueClientOptions);

QueueMessage[] queueMessages = await queueClient.ReceiveMessagesAsync(20).ConfigureAwait(false);

Note that the event handler will be scheduled on the thread pool without waiting for the result, i.e. _ = Task.Run(...).

Java

Message encoding.

public enum QueueMessageEncoding {
    NONE,
    BASE64
}

Invalid message callback. Async only, as the sync client is really a wrapper around the async client.

public interface InvalidQueueMessageHandler {
    Mono<Void> onInvalidMessage(QueueAsyncClient queueAsyncClient, Object message);
}

Sample callback.

public static class MyInvalidMessageHandler implements InvalidQueueMessageHandler {
    @Override
    public Mono<Void> onInvalidMessage(QueueAsyncClient queueAsyncClient, Object message) {
        if (message instanceof QueueMessageItem) {
            QueueMessageItem queueMessageItem = (QueueMessageItem) message;
            System.out.printf("Unable to decode message %s %s", queueMessageItem.getMessageId(), queueMessageItem.getMessageText());
            return queueAsyncClient.deleteMessage(queueMessageItem.getMessageId(), queueMessageItem.getPopReceipt());
        } else if (message instanceof PeekedMessageItem) {
            PeekedMessageItem peekedMessageItem = (PeekedMessageItem) message;
            System.out.printf("Unable to decode message %s %s", peekedMessageItem.getMessageId(), peekedMessageItem.getMessageText());
        }
        return Mono.empty();
    }
}

Usage.

String queueUrl = "";
QueueClient queueClient = new QueueClientBuilder()
    .endpoint(queueUrl)
    .messageEncoding(QueueMessageEncoding.BASE64)
    .invalidQueueMessageHandler(new MyInvalidMessageHandler())
    .buildClient();
Iterable<QueueMessageItem> messages = queueClient.receiveMessages(20);

Python

Sync.

def on_invalid_message(self, queue_client, message):
    print("Unable to decode message " + message.id + " " + message.content)
    if hasattr(message, 'pop_receipt'):
        queue_client.delete_message(message)


def test_message_text_base64(self, resource_group, location, storage_account, storage_account_key):
    qsc = QueueServiceClient(self.account_url(storage_account, "queue"), storage_account_key)
    queue = QueueClient(
        account_url=self.account_url(storage_account, "queue"),
        queue_name=self.get_resource_name(TEST_QUEUE_PREFIX),
        credential=storage_account_key,
        message_encode_policy=TextBase64EncodePolicy(),
        message_decode_policy=TextBase64DecodePolicy(),
        invalid_message_hook=self.on_invalid_message)

    messages = queue.receive_messages(20)

Async.

async def on_invalid_message(self, queue_client, message):
    print("Unable to decode message " + message.id + " " + message.content)
    if hasattr(message, 'pop_receipt'):
        await queue_client.delete_message(message)
            
async def test_message_text_base64(self, resource_group, location, storage_account, storage_account_key):
    qsc = QueueServiceClient(self.account_url(storage_account, "queue"), storage_account_key, transport=AiohttpTestTransport())
    queue = QueueClient(
        account_url=self.account_url(storage_account, "queue"),
        queue_name=self.get_resource_name(TEST_QUEUE_PREFIX),
        credential=storage_account_key,
        message_encode_policy=TextBase64EncodePolicy(),
        message_decode_policy=TextBase64DecodePolicy(),
        invalid_message_hook=self.on_invalid_message,
        transport=AiohttpTestTransport())

    messages = await queue.receive_messages(20)