@ramya-rao-a
Last active October 11, 2019 03:07
Event Hubs library proposal with single client
/*
This proposal builds on top of what was shipped in Preview 4 of the Event Hubs library
and attempts to discard the separate notion of EPH (Event Processor Host) by rolling
that feature into the existing concept of a "consumer".
This is mainly geared towards JS and can be tweaked for Python.
The method names on the consumer need serious reconsideration :)
*/
const client = new EventHubClient(connectionString, eventHubName);
// Create a producer that sends without a partitionId
const producer = client.createProducer();
// Create a producer that sends to a specific partition
const producer = client.createProducer(partitionId);
// Create consumer to receive from all partitions
const consumer = client.createConsumer(consumerGroupName);
// Create consumer to receive from all partitions with load balancing
const consumer = client.createConsumer(consumerGroupName, partitionManager);
// Create consumer to receive from specific partition
const consumer = client.createConsumer(consumerGroupName, partitionId, eventPosition);
// Receive using the push model, with message & error handlers
consumer.subscribe(onMessage, onError);
await consumer.close();
// Callbacks for setup/teardown work for each partition
consumer.onInitializePartition = (partitionContext) => { /* user code */ };
consumer.onClosePartition = (partitionContext, closeReason) => { /* user code */ };
// Track last enqueued event info
const info = consumer.lastEnqueuedEventInfo[partitionId];
console.log(info.sequenceNumber, info.offset, info.enqueuedTime, info.retrievalTime);
// Checkpointing. Throws an error when no partition manager was passed.
// This might lead us into a discussion of whether the load balancing & checkpointing features should be split,
// so that users dealing with a single partition, or those not planning to run multiple instances of their app,
// can still use checkpointing.
await consumer.updateCheckpoint(event);
await consumer.updateCheckpoint(offset, partitionId);
// Custom checkpointing requires people to provide a customized partition manager & call updateCheckpoint on it
await customizedPartitionManager.updateCheckpoint(/* whatever this needs to do checkpointing */);
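
To make the checkpointing behaviour above concrete, here is a minimal sketch of how a consumer could delegate `updateCheckpoint` to a partition manager and throw when none was supplied. Everything in it is an assumption made for illustration: `InMemoryPartitionManager` and `SketchConsumer` are hypothetical names, the manager's `updateCheckpoint(partitionId, offset)` signature is invented, and the event is assumed to carry `offset` and `partitionId` fields. None of this is the library's actual API.

```javascript
// Hypothetical in-memory partition manager; not part of any shipped library.
class InMemoryPartitionManager {
  constructor() {
    this.checkpoints = new Map(); // partitionId -> last checkpointed offset
  }

  // Assumed signature, chosen only for this sketch.
  async updateCheckpoint(partitionId, offset) {
    this.checkpoints.set(partitionId, offset);
  }
}

// Stand-in for the proposed consumer, reduced to the checkpointing path.
class SketchConsumer {
  constructor(consumerGroupName, partitionManager) {
    this.consumerGroupName = consumerGroupName;
    this.partitionManager = partitionManager; // may be undefined
  }

  // Accepts either (event) or (offset, partitionId), mirroring the two call shapes above.
  async updateCheckpoint(eventOrOffset, partitionId) {
    if (!this.partitionManager) {
      throw new Error("updateCheckpoint requires a partition manager");
    }
    const isEvent = typeof eventOrOffset === "object" && eventOrOffset !== null;
    const offset = isEvent ? eventOrOffset.offset : eventOrOffset;
    const id = isEvent ? eventOrOffset.partitionId : partitionId;
    await this.partitionManager.updateCheckpoint(id, offset);
  }
}
```

In this sketch the consumer owns no checkpoint state of its own, so a consumer created without a partition manager can only fail. If load balancing and checkpointing were split as discussed above, the check would presumably look for a separate checkpoint store instead.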