@ramya-rao-a
Last active September 24, 2019 18:27
Proposal for Event Processor in JS
// User class extending the base class provided by the library
class SamplePartitionProcessor extends PartitionProcessor {
  async processEvents(events) { console.log(events); }
  async processError(error) { console.log(error); }
  async initialize() { console.log(`Started processing partition: ${this.partitionId}`); }
  async close(reason) { console.log(`Stopped processing partition: ${this.partitionId} for reason: ${reason}`); }
}
// 3 constructor overloads similar to EventHubClient; the extra arguments are consumerGroupName and the PartitionProcessor subclass
// Options are not shown, but are similar to what EventHubClientOptions supports, plus max batch size and max wait time per batch
myProcessor = new EventProcessor(connectionstring, consumerGroupName, SamplePartitionProcessor);
myProcessor = new EventProcessor(connectionstring, eventhubName, consumerGroupName, SamplePartitionProcessor);
myProcessor = new EventProcessor(fullyQualifiedNamespace, eventhubName, credentials, consumerGroupName, SamplePartitionProcessor);
// And/or reuse an existing EventHubClient, since we have evidence that adding more connections doesn't help much in JS
myProcessor = new EventProcessor(eventHubclient, consumerGroupName, SamplePartitionProcessor);
// Start without a partition manager; any attempt by the user to checkpoint will fail
myProcessor.start();
// Start without a partition manager, but with a top-level error handler; any attempt by the user to checkpoint will fail
myProcessor.start(errorHandler);
// Start by passing a partition manager that uses blobs for checkpointing and load balancing
containerClient = new ContainerClient("storage-connection-string", "container-name");
myPartitionManager = new BlobPartitionManager(containerClient);
myProcessor.start(myPartitionManager);
// Start by passing a partition manager that uses blobs for checkpointing and load balancing, plus an error handler
myProcessor.start(myPartitionManager, errorHandler);
// Stop the event processor
myProcessor.stop();
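Since JavaScript has no true overloading, a single start() would have to tell the four call shapes above apart at runtime. The following is a minimal sketch, assuming an error handler is always a function and a partition manager is always a non-function object. `resolveStartArgs`, `checkpoint`, and the `updateCheckpoint` method are hypothetical names, not part of any library. The sketch also shows why checkpointing fails when no partition manager was supplied.

```javascript
// Hypothetical helper: normalizes the proposed call shapes
//   start(), start(errorHandler), start(partitionManager),
//   start(partitionManager, errorHandler)
// Assumes an error handler is a function and a partition manager is an object.
function resolveStartArgs(first, second) {
  if (first === undefined) {
    return { partitionManager: undefined, errorHandler: undefined };
  }
  if (typeof first === "function") {
    if (second !== undefined) {
      throw new TypeError("start(errorHandler) takes no further arguments");
    }
    return { partitionManager: undefined, errorHandler: first };
  }
  if (first !== null && typeof first === "object") {
    if (second !== undefined && typeof second !== "function") {
      throw new TypeError("errorHandler must be a function");
    }
    return { partitionManager: first, errorHandler: second };
  }
  throw new TypeError("Expected a partition manager or an error handler");
}

// Without a partition manager there is nowhere to persist checkpoints, so the
// attempt fails. `updateCheckpoint` is a hypothetical method name.
async function checkpoint(partitionManager, partitionId, sequenceNumber) {
  if (!partitionManager) {
    throw new Error("Checkpointing requires a partition manager");
  }
  await partitionManager.updateCheckpoint(partitionId, sequenceNumber);
}
```

Keeping the documented argument order (manager first, handler second) means the first argument alone decides which overload was meant.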
@ramya-rao-a
Author

Alternative where the PartitionProcessor is passed to start() instead of the constructor:

// 3 constructor overloads similar to EventHubClient; the extra argument is consumerGroupName
// Options are not shown, but are similar to what EventHubClientOptions supports, plus max batch size and max wait time per batch
myProcessor = new EventProcessor(connectionstring, consumerGroupName);
myProcessor = new EventProcessor(connectionstring, eventhubName, consumerGroupName);
myProcessor = new EventProcessor(fullyQualifiedNamespace, eventhubName, credentials, consumerGroupName);

// And/or reuse an existing EventHubClient, since we have evidence that adding more connections doesn't help much in JS
myProcessor = new EventProcessor(eventHubclient, consumerGroupName);

// Start without a partition manager; any attempt by the user to checkpoint will fail
myProcessor.start(SamplePartitionProcessor);

// Start without a partition manager, but with a top-level error handler; any attempt by the user to checkpoint will fail
myProcessor.start(SamplePartitionProcessor, errorHandler);

// Start by passing a partition manager that uses blobs for checkpointing and load balancing
containerClient = new ContainerClient("storage-connection-string", "container-name");
myPartitionManager = new BlobPartitionManager(containerClient);
myProcessor.start(SamplePartitionProcessor, myPartitionManager);

// Start by passing a partition manager that uses blobs for checkpointing and load balancing, plus an error handler
myProcessor.start(SamplePartitionProcessor, myPartitionManager, errorHandler);

// Stop the event processor
myProcessor.stop();
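In this alternative, start() receives the PartitionProcessor subclass itself rather than an instance. That suggests the processor would create one instance per partition it owns, so state kept on `this` is never shared across partitions. Below is a minimal sketch of that idea. The `PartitionProcessor` class is a stand-in for the proposed base class, and `createPartitionProcessors` and `CountingProcessor` are hypothetical names for illustration only.

```javascript
// Stand-in for the proposed base class; the processor assigns partitionId
// before any lifecycle method runs.
class PartitionProcessor {
  async initialize() {}
  async processEvents(events) {}
  async processError(error) {}
  async close(reason) {}
}

// Hypothetical helper: one fresh instance per owned partition.
function createPartitionProcessors(ProcessorClass, partitionIds) {
  if (!(ProcessorClass.prototype instanceof PartitionProcessor)) {
    throw new TypeError("Expected a subclass of PartitionProcessor");
  }
  const processors = new Map();
  for (const id of partitionIds) {
    const processor = new ProcessorClass();
    processor.partitionId = id;
    processors.set(id, processor);
  }
  return processors;
}

// Example user subclass that keeps per-partition state on `this`.
class CountingProcessor extends PartitionProcessor {
  constructor() {
    super();
    this.count = 0;
  }
  async processEvents(events) {
    this.count += events.length;
  }
}
```

Passing the class rather than an instance also lets the processor create a replacement instance when load balancing hands a partition to this process.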

@ramya-rao-a
Author

ramya-rao-a commented Sep 24, 2019

Yet another alternative, based on the following motivations from the service team:

  • The service team wants to encourage sending without setting a partitionId
  • The service team wants to discourage use of the low-level consumer APIs that target a partitionId directly

Key changes:

  • createConsumer and createProducer are renamed to createPartitionConsumer and createPartitionProducer
  • send and receive are available directly on the client and don't require the user to provide a partitionId
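To make the split concrete, here is a minimal in-memory sketch with no networking. In it, a client-level send() leaves partition selection to the "service", while createPartitionProducer(partitionId) pins every send to one partition. `InMemoryEventHub` is a hypothetical stand-in. The round-robin placement and returning the chosen partitionId from send() are assumptions made only for this illustration; the real service decides placement on its own.

```javascript
// Hypothetical in-memory stand-in for an event hub with a fixed set of partitions.
class InMemoryEventHub {
  constructor(partitionCount) {
    this.partitions = Array.from({ length: partitionCount }, () => []);
    this.next = 0;
  }

  // send() without a partitionId: the "service" picks the partition.
  // Round-robin is an assumption made only for this sketch.
  async send(batch) {
    const index = this.next;
    this.next = (this.next + 1) % this.partitions.length;
    this.partitions[index].push(...batch);
    return String(index);
  }

  // createPartitionProducer(): every send targets the given partition.
  createPartitionProducer(partitionId) {
    const target = this.partitions[Number(partitionId)];
    if (!target) throw new RangeError(`Unknown partition: ${partitionId}`);
    return {
      send: async (batch) => {
        target.push(...batch);
        return partitionId;
      },
    };
  }
}
```

Callers who never pass a partitionId never need to know how many partitions exist, which is the behavior the service team wants to encourage.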
// 3 constructor overloads for EventHubClient (options not shown)
myClient = new EventHubClient(connectionstring);
myClient = new EventHubClient(connectionstring, eventhubName);
myClient = new EventHubClient(fullyQualifiedNamespace, eventhubName, credentials);

// Send without setting partitionId
await myClient.send(myBatch);

// Send by setting partitionId
myPartitionProducer = myClient.createPartitionProducer(partitionId);
await myPartitionProducer.send(myBatch);

// Receive without caring about partitionId.
// This spins up the EPH (Event Processor Host) and starts receiving, without checkpointing or load balancing support
myReceiver = myClient.receive(consumerGroupName, SamplePartitionProcessor);
await myReceiver.stop();

// Receive without caring about partitionId.
// This spins up the EPH (Event Processor Host) and starts receiving, with checkpointing and load balancing support
containerClient = new ContainerClient("storage-connection-string", "container-name");
myPartitionManager = new BlobPartitionManager(containerClient);
myReceiver = myClient.receive(consumerGroupName, SamplePartitionProcessor, myPartitionManager);
await myReceiver.stop();

// Receive from a particular partitionId.
myPartitionConsumer = myClient.createPartitionConsumer(consumerGroupName, partitionId, position, options);
myEvents = await myPartitionConsumer.receiveBatch(10);
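In this proposal, calling receive() is itself what starts delivery, and the returned object only needs a way to stop it. Here is a minimal sketch of that shape. `receive` is a hypothetical standalone function, not the proposed client method. It assumes `source` is any async iterable of event batches standing in for the service, and `processor` uses the PartitionProcessor method names. The close reasons ("source ended", "stopped", "error") are illustrative values.

```javascript
// Hypothetical sketch: receiving starts as soon as receive() is called.
function receive(source, processor) {
  let stopped = false;

  // The async IIFE begins executing immediately; there is no separate start().
  const pump = (async () => {
    let reason = "source ended";
    try {
      await processor.initialize?.();
      for await (const events of source) {
        if (stopped) {
          reason = "stopped";
          break;
        }
        await processor.processEvents(events);
      }
    } catch (error) {
      reason = "error";
      // Without a processError handler, let the failure surface from stop().
      if (typeof processor.processError !== "function") throw error;
      await processor.processError(error);
    } finally {
      await processor.close?.(reason);
    }
  })();

  // The returned handle only needs a way to stop receiving.
  return {
    async stop() {
      stopped = true;
      await pump;
    },
  };
}
```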

Open Questions:

  • createBatch() will now have to live on the client as well
  • What should the return type of receive look like?
  • What about the top-level error handler?

@KrzysztofCwalina

Here are my quick thoughts:

  1. I like the send method that does not require users to deal/understand partition IDs.
  2. I don't like making the names to get producer/consumer longer and more complicated. I think we should keep them as GetProducer/Consumer, even if they take partitionId parameter.
  3. I don't like that method receive returns an object that needs to be started, and that it takes SamplePartitionProcessor. Receive is a verb and so it should actually do something. Also, it breaks the symmetry between reading and writing.
  4. I don't like that consumer.receive takes multiple callbacks. If we use callbacks, it would be better to have them be properties.
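Point 4 contrasts passing several callbacks positionally with exposing them as properties. Here is a minimal sketch of the property-based style. `PropertyStyleConsumer`, `onEvents`, `onError`, `deliverEvents`, and `deliverError` are hypothetical names, not an existing API. Each handler is assigned by name, so call sites read unambiguously and either handler can be replaced on its own.

```javascript
// Hypothetical consumer where handlers are assigned as properties instead of
// being passed positionally, e.g. consumer.receive(onEvents, onError).
class PropertyStyleConsumer {
  constructor() {
    this.onEvents = undefined; // (events) => void
    this.onError = undefined; // (error) => void
  }

  // Stand-in for the library delivering a batch; ignored until onEvents is set.
  deliverEvents(events) {
    if (typeof this.onEvents === "function") this.onEvents(events);
  }

  // Stand-in for the library reporting an error; rethrown if no handler is set
  // so that errors are never silently dropped.
  deliverError(error) {
    if (typeof this.onError === "function") this.onError(error);
    else throw error;
  }
}
```

Usage would look like `consumer.onEvents = (events) => { ... };` followed by `consumer.onError = (error) => { ... };`.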

@ramya-rao-a
Author

ramya-rao-a commented Sep 24, 2019

@KrzysztofCwalina

I don't like that method receive returns an object that needs to be started, ... . Receive is a verb and so it should actually do something.

The proposed API does not require the user to "start" anything. The very act of calling receive on the client starts receiving events.

I don't like that consumer.receive takes multiple callbacks. If we use callbacks, it would be better to have them be properties.

That is a JS-specific API that is under separate review. I have updated the proposal above to replace it with a receiveBatch() call that returns a promise that resolves to an array of events.
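Here is a minimal sketch of that promise-returning shape. It uses an in-memory buffer in place of a real link and a hypothetical `maxWaitTimeInMs` parameter. The proposal mentions a max wait time per batch but does not name the option. The promise resolves as soon as `maxCount` events are available, or with whatever has arrived once the wait time elapses. `InMemoryPartitionConsumer` and `push` are hypothetical names for illustration.

```javascript
// Hypothetical in-memory partition consumer illustrating receiveBatch().
class InMemoryPartitionConsumer {
  constructor() {
    this.buffer = [];
    this.waiter = undefined; // pending receiveBatch() call, if any
  }

  // Stand-in for events arriving from the service.
  push(...events) {
    this.buffer.push(...events);
    if (this.waiter && this.buffer.length >= this.waiter.maxCount) {
      this.waiter.finish();
    }
  }

  // Resolves to an array of at most maxCount events (possibly empty).
  receiveBatch(maxCount, maxWaitTimeInMs = 60000) {
    if (this.waiter) {
      return Promise.reject(new Error("receiveBatch() is already in progress"));
    }
    return new Promise((resolve) => {
      // Resolve right away if enough events are already buffered.
      if (this.buffer.length >= maxCount) {
        resolve(this.buffer.splice(0, maxCount));
        return;
      }
      const finish = () => {
        clearTimeout(timer);
        this.waiter = undefined;
        resolve(this.buffer.splice(0, maxCount));
      };
      // Otherwise resolve on a full batch or when the wait time elapses.
      const timer = setTimeout(finish, maxWaitTimeInMs);
      this.waiter = { maxCount, finish };
    });
  }
}
```

With `const events = await consumer.receiveBatch(10, 5000);` a caller gets a plain array, so no callbacks are involved.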
