Skip to content

Instantly share code, notes, and snippets.

@srnagar
Last active January 7, 2020 20:37
Show Gist options
  • Save srnagar/ef39ebcd803dde311e9b7913fcb638d5 to your computer and use it in GitHub Desktop.
Save srnagar/ef39ebcd803dde311e9b7913fcb638d5 to your computer and use it in GitHub Desktop.
Test cases for Event Processor client with blob checkpoint store

Checkpoint Store

  • Start from scratch - no blobs for either ownership or checkpoint
  • Ownership and checkpoint blobs for some partitions but not all
  • Ownership blobs for all partitions but no checkpoint blobs
  • Ownership blobs without ownerid in metadata and checkpoint blobs without sequence number and offset in metadata
  • Checkpoint blobs with only sequence number (no offset)
  • Checkpoint blobs with only offset (no sequence number)

Producer client

package com.azure.messaging.eventhubs.checkpointstore.blob;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import java.util.concurrent.TimeUnit;

/**
 * Manual test driver that keeps publishing single-event batches to an Event Hub.
 *
 * <p>Set {@link #CONNECTION_STRING} before running. The program sends batches for up to one day
 * and then closes the producer.
 */
public class EventProducer {

    private static final String CONNECTION_STRING = "";

    /**
     * Builds an asynchronous producer, starts an endlessly repeating create-batch/send pipeline and
     * keeps the main thread alive while that pipeline runs in the background.
     *
     * @param args command-line arguments; not used
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        EventHubProducerAsyncClient producer = new EventHubClientBuilder()
            .connectionString(CONNECTION_STRING)
            .buildAsyncProducerClient();
        try {
            System.out.println("Creating batch and sending events");
            producer.createBatch()
                .flatMap(batch -> {
                    // tryAdd reports failure through its return value; do not send an empty batch silently.
                    if (!batch.tryAdd(new EventData("Test event"))) {
                        throw new IllegalStateException("Test event could not be added to the batch");
                    }
                    // send() completes without emitting a value, so success has to be reported here
                    // rather than in the subscriber's onNext callback.
                    return producer.send(batch)
                        .doOnSuccess(unused -> System.out.println("Message sent"));
                })
                .repeat(() -> true)
                .subscribe(unused -> {
                        // Never invoked: the pipeline carries no values, only completion signals.
                    },
                    error -> System.out.println("Error sending event " + error.getMessage()),
                    () -> System.out.println("Complete sending"));

            // Keep the JVM alive while the asynchronous pipeline is sending.
            TimeUnit.DAYS.sleep(1);
        } finally {
            // Release the producer even when the sleep above is interrupted.
            producer.close();
            System.out.println("Closing application");
        }
    }

}

Event Processor with basic error handling and event processing

package com.azure.messaging.eventhubs.checkpointstore.blob;

import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import java.util.concurrent.TimeUnit;

/**
 * Manual test driver: Event Processor client backed by a blob checkpoint store, with basic event
 * processing and error handling.
 *
 * <p>Fill in the connection constants below before running.
 */
public class EventProcessor {
    private static final String STORAGE_CONNECTION_STRING = "";
    private static final String SAS_TOKEN = "";
    private static final String CONTAINER_NAME = "";

    private static final String EH_CONNECTION_STRING = "";
    private static final String CONSUMER_GROUP = "";

    /**
     * Starts the processor, lets it run for up to one day and then stops it.
     *
     * @param args command-line arguments; not used
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {

        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .connectionString(STORAGE_CONNECTION_STRING)
            .containerName(CONTAINER_NAME)
            .sasToken(SAS_TOKEN)
            .buildAsyncClient();

        BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
        EventProcessorClient processor = new EventProcessorClientBuilder()
            .checkpointStore(blobCheckpointStore)
            .connectionString(EH_CONNECTION_STRING)
            .consumerGroup(CONSUMER_GROUP)
            .processEvent(eventContext -> {
                // Checkpoint only when the sequence number is a multiple of 100.
                if (eventContext.getEventData().getSequenceNumber() % 100 == 0) {
                    System.out.println(
                        "Updating checkpoint for partition " + eventContext.getPartitionContext().getPartitionId()
                            + " with seq num " + eventContext.getEventData().getSequenceNumber());
                    eventContext.updateCheckpoint();
                }
            })
            .processError(errorContext -> {
                System.out.println(
                    "Error " + errorContext.getPartitionContext().getPartitionId() + " " + errorContext.getThrowable()
                        .getMessage());
            })
            .buildEventProcessorClient();
        processor.start();
        try {
            // Keep the main thread alive while the processor works in the background.
            TimeUnit.DAYS.sleep(1);
        } finally {
            // Stop the processor even when the sleep above is interrupted.
            processor.stop();
        }
    }
}

Event Processor with initialization handler error

package com.azure.messaging.eventhubs.checkpointstore.blob;

import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import java.util.concurrent.TimeUnit;

/**
 * Manual test driver: Event Processor client backed by a blob checkpoint store whose partition
 * initialization handler always throws.
 *
 * <p>Fill in the connection constants below before running.
 */
public class EventProcessor {
    private static final String STORAGE_CONNECTION_STRING = "";
    private static final String SAS_TOKEN = "";
    private static final String CONTAINER_NAME = "";

    private static final String EH_CONNECTION_STRING = "";
    private static final String CONSUMER_GROUP = "";

    /**
     * Starts the processor, lets it run for up to one day and then stops it.
     *
     * @param args command-line arguments; not used
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {

        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .connectionString(STORAGE_CONNECTION_STRING)
            .containerName(CONTAINER_NAME)
            .sasToken(SAS_TOKEN)
            .buildAsyncClient();

        BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
        EventProcessorClient processor = new EventProcessorClientBuilder()
            .checkpointStore(blobCheckpointStore)
            .connectionString(EH_CONNECTION_STRING)
            .consumerGroup(CONSUMER_GROUP)
            // Deliberate failure: this scenario exercises an initialization handler that throws.
            .processPartitionInitialization(initContext -> {
                throw new IllegalStateException("init error");
            })
            .processEvent(eventContext -> {
                // Checkpoint only when the sequence number is a multiple of 100.
                if (eventContext.getEventData().getSequenceNumber() % 100 == 0) {
                    System.out.println(
                        "Updating checkpoint for partition " + eventContext.getPartitionContext().getPartitionId()
                            + " with seq num " + eventContext.getEventData().getSequenceNumber());
                    eventContext.updateCheckpoint();
                }
            })
            .processError(errorContext -> {
                System.out.println(
                    "Error " + errorContext.getPartitionContext().getPartitionId() + " " + errorContext.getThrowable()
                        .getMessage());
            })
            .buildEventProcessorClient();
        processor.start();
        try {
            // Keep the main thread alive while the processor works in the background.
            TimeUnit.DAYS.sleep(1);
        } finally {
            // Stop the processor even when the sleep above is interrupted.
            processor.stop();
        }
    }
}

Event Processor with process event handler error

package com.azure.messaging.eventhubs.checkpointstore.blob;

import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import java.util.concurrent.TimeUnit;

/**
 * Manual test driver: Event Processor client backed by a blob checkpoint store whose event handler
 * always throws.
 *
 * <p>Fill in the connection constants below before running.
 */
public class EventProcessor {
    private static final String STORAGE_CONNECTION_STRING = "";
    private static final String SAS_TOKEN = "";
    private static final String CONTAINER_NAME = "";

    private static final String EH_CONNECTION_STRING = "";
    private static final String CONSUMER_GROUP = "";

    /**
     * Starts the processor, lets it run for up to one day and then stops it.
     *
     * @param args command-line arguments; not used
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {

        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .connectionString(STORAGE_CONNECTION_STRING)
            .containerName(CONTAINER_NAME)
            .sasToken(SAS_TOKEN)
            .buildAsyncClient();

        BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
        EventProcessorClient processor = new EventProcessorClientBuilder()
            .checkpointStore(blobCheckpointStore)
            .connectionString(EH_CONNECTION_STRING)
            .consumerGroup(CONSUMER_GROUP)
            // Deliberate failure: this scenario exercises an event handler that throws.
            .processEvent(eventContext -> {
                throw new IllegalStateException("process error");
            })
            .processError(errorContext -> {
                System.out.println(
                    "Error " + errorContext.getPartitionContext().getPartitionId() + " " + errorContext.getThrowable()
                        .getMessage());
            })
            .buildEventProcessorClient();
        processor.start();
        try {
            // Keep the main thread alive while the processor works in the background.
            TimeUnit.DAYS.sleep(1);
        } finally {
            // Stop the processor even when the sleep above is interrupted.
            processor.stop();
        }
    }
}

Event Processor with close handler error

package com.azure.messaging.eventhubs.checkpointstore.blob;

import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import java.util.concurrent.TimeUnit;

/**
 * Manual test driver: Event Processor client backed by a blob checkpoint store whose partition
 * close handler always throws.
 *
 * <p>Fill in the connection constants below before running.
 */
public class EventProcessor {
    private static final String STORAGE_CONNECTION_STRING = "";
    private static final String SAS_TOKEN = "";
    private static final String CONTAINER_NAME = "";

    private static final String EH_CONNECTION_STRING = "";
    private static final String CONSUMER_GROUP = "";

    /**
     * Starts the processor, lets it run for up to one day and then stops it.
     *
     * @param args command-line arguments; not used
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {

        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .connectionString(STORAGE_CONNECTION_STRING)
            .containerName(CONTAINER_NAME)
            .sasToken(SAS_TOKEN)
            .buildAsyncClient();

        BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);
        EventProcessorClient processor = new EventProcessorClientBuilder()
            .checkpointStore(blobCheckpointStore)
            .connectionString(EH_CONNECTION_STRING)
            .consumerGroup(CONSUMER_GROUP)
            // Deliberate failure: this scenario exercises a close handler that throws.
            .processPartitionClose(closeContext -> {
                throw new IllegalStateException("close error");
            })
            .processEvent(eventContext -> {
                // Checkpoint only when the sequence number is a multiple of 100.
                if (eventContext.getEventData().getSequenceNumber() % 100 == 0) {
                    System.out.println(
                        "Updating checkpoint for partition " + eventContext.getPartitionContext().getPartitionId()
                            + " with seq num " + eventContext.getEventData().getSequenceNumber());
                    eventContext.updateCheckpoint();
                }
            })
            .processError(errorContext -> {
                System.out.println(
                    "Error " + errorContext.getPartitionContext().getPartitionId() + " " + errorContext.getThrowable()
                        .getMessage());
            })
            .buildEventProcessorClient();
        processor.start();
        try {
            // Keep the main thread alive while the processor works in the background.
            TimeUnit.DAYS.sleep(1);
        } finally {
            // Stop the processor even when the sleep above is interrupted.
            processor.stop();
        }
    }
}

Event Processor with initial event position

package com.azure.messaging.eventhubs.checkpointstore.blob;

import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Manual test driver: Event Processor client backed by a blob checkpoint store, configured with
 * explicit initial event positions for partitions "0" and "1".
 *
 * <p>Fill in the connection constants below before running.
 */
public class EventProcessor {
    private static final String STORAGE_CONNECTION_STRING = "";
    private static final String SAS_TOKEN = "";
    private static final String CONTAINER_NAME = "";

    private static final String EH_CONNECTION_STRING = "";
    private static final String CONSUMER_GROUP = "";

    /**
     * Starts the processor, lets it run for up to one day and then stops it.
     *
     * @param args command-line arguments; not used
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {

        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .connectionString(STORAGE_CONNECTION_STRING)
            .containerName(CONTAINER_NAME)
            .sasToken(SAS_TOKEN)
            .buildAsyncClient();

        BlobCheckpointStore blobCheckpointStore = new BlobCheckpointStore(blobContainerAsyncClient);

        // Plain HashMap instead of double-brace initialization: no anonymous subclass is created and
        // the code also compiles on Java 8, where diamond with anonymous classes is not allowed.
        Map<String, EventPosition> initPositionMap = new HashMap<>();
        initPositionMap.put("0", EventPosition.earliest());
        initPositionMap.put("1", EventPosition.fromSequenceNumber(10));

        EventProcessorClient processor = new EventProcessorClientBuilder()
            .checkpointStore(blobCheckpointStore)
            .connectionString(EH_CONNECTION_STRING)
            .consumerGroup(CONSUMER_GROUP)
            .initialPartitionEventPosition(initPositionMap)
            .processEvent(eventContext -> {
                // Checkpoint only when the sequence number is a multiple of 100.
                if (eventContext.getEventData().getSequenceNumber() % 100 == 0) {
                    System.out.println(
                        "Updating checkpoint for partition " + eventContext.getPartitionContext().getPartitionId()
                            + " with seq num " + eventContext.getEventData().getSequenceNumber());
                    eventContext.updateCheckpoint();
                }
            })
            .processError(errorContext -> {
                System.out.println(
                    "Error " + errorContext.getPartitionContext().getPartitionId() + " " + errorContext.getThrowable()
                        .getMessage());
            })
            .buildEventProcessorClient();
        processor.start();
        try {
            // Keep the main thread alive while the processor works in the background.
            TimeUnit.DAYS.sleep(1);
        } finally {
            // Stop the processor even when the sleep above is interrupted.
            processor.stop();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment