Skip to content

Instantly share code, notes, and snippets.

@ramya-rao-a
Last active July 26, 2019 01:38
Show Gist options
  • Save ramya-rao-a/6ce030240112063e12a23477697b01f2 to your computer and use it in GitHub Desktop.
Save ramya-rao-a/6ce030240112063e12a23477697b01f2 to your computer and use it in GitHub Desktop.

Proposed API design for the Event Processor Host (EPH) in TypeScript

// Name to be changed
/**
 * Entry point of the proposed API. It reads events for one consumer group of an Event Hub.
 * For each partition it is about to process, it asks `partitionProcessorFactory` for a
 * `PartitionProcessor`.
 *
 * This is a signature-only sketch, so the class is declared ambient (`declare`). A regular
 * class would need bodies for the constructor, `start` and `stop`. `async` is an
 * implementation detail and is not allowed in ambient declarations; the `Promise<void>`
 * return types carry the same contract.
 */
export declare class EventProcessor {
  /**
   * @param consumerGroupName - Consumer group the events are read through.
   * @param eventHubClient - Client used to communicate with the Event Hub.
   * @param partitionProcessorFactory - Creates the processor for each partition to be processed.
   * @param partitionManager - Plugin for checkpoint and partition-ownership management.
   * @param options - Optional settings; every member of `EventProcessorOptions` is optional.
   */
  constructor(
    consumerGroupName: string,
    eventHubClient: EventHubClient,
    partitionProcessorFactory: PartitionProcessorFactory,
    partitionManager: PartitionManager,
    options?: EventProcessorOptions
  );

  /** Starts processing events. */
  public start(): Promise<void>;
  /** Stops processing events. */
  public stop(): Promise<void>;
}

/**
 * Settings accepted when constructing an `EventProcessor`. Every member may be omitted.
 */
export interface EventProcessorOptions {
  /**
   * Event position used when a partition is first read. NOTE(review): presumably only used
   * when no checkpoint exists; confirm.
   * Open question: should this become an `EventPositionProvider`? For the upcoming preview it
   * remains an `EventPosition`.
   */
  initialEventPosition?: EventPosition;
  /** Maximum batch size. NOTE(review): presumably the most events per `processEvents` call; confirm. */
  maxBatchSize?: number;
  /** Maximum wait time. NOTE(review): units are not specified anywhere (seconds vs. ms); confirm. */
  maxWaitTime?: number;
}

/**
 * Creates the `PartitionProcessor` for one partition. The EventProcessor (EPH) calls it
 * whenever a new partition is about to be processed.
 */
export interface PartitionProcessorFactory {
  /**
   * @param context - Identifies the partition, Event Hub and consumer group to be processed.
   * @param checkpointManager - Lets the returned processor record checkpoints for this partition.
   * @returns The processor that will handle this partition's events.
   */
  createPartitionProcessor(context: PartitionContext, checkpointManager: CheckpointManager): PartitionProcessor;
}

/**
 * Identifies the partition a `PartitionProcessor` is processing events from.
 *
 * It is passed to the factory call that creates the PartitionProcessor, so, unlike Track 1,
 * it does not need to be passed to every method of the processor.
 *
 * Checkpointing logic moved out of the context for better discoverability.
 *
 * The readonly members are constructor parameter properties. Without a constructor they could
 * never be assigned, and they fail `strictPropertyInitialization`.
 */
export class PartitionContext {
  /**
   * @param partitionId - Id of the partition being processed.
   * @param eventHubName - Name of the Event Hub that contains the partition.
   * @param consumerGroupName - Consumer group the events are read through.
   */
  constructor(
    public readonly partitionId: string,
    public readonly eventHubName: string,
    public readonly consumerGroupName: string
  ) {}
}

/**
 * Created by the library and passed to user code so that it can create checkpoints.
 * Track 1 did not need this because `checkpoint()` was available on the PartitionContext.
 * Track 2 may not need it either if passing a lambda function works.
 */
export class CheckpointManager {
  /**
   * @param partitionContext - Partition the checkpoints belong to (internal use by `createCheckpoint`).
   * @param partitionManager - Storage plugin that persists checkpoints (internal use by `createCheckpoint`).
   */
  constructor(
    private readonly partitionContext: PartitionContext,
    private readonly partitionManager: PartitionManager
  ) {}

  /** Creates a checkpoint from a received event. */
  public createCheckpoint(eventData: EventData): Promise<void>;
  /** Creates a checkpoint from an explicit offset / sequence-number pair. */
  public createCheckpoint(offset: number, sequenceNumber: number): Promise<void>;
  // TypeScript allows only one implementation per overload set; it must accept both call shapes.
  public async createCheckpoint(
    eventDataOrOffset: EventData | number,
    sequenceNumber?: number
  ): Promise<void> {
    // TODO: build a `Checkpoint` from the arguments and `partitionContext`, then forward it to
    // `partitionManager.createCheckpoint`. The design does not yet say where `instanceId` comes from.
  }
}

/**
 * User-supplied handler for the events of one partition, created by a `PartitionProcessorFactory`.
 *
 * Members are plain function types: `async` is an implementation modifier and is not valid inside
 * a type. Implementations may still be written as `async` functions.
 */
export interface PartitionProcessor {
  /**
   * Optional. Called when the EventProcessor (EPH) begins processing a partition.
   * Note: Python's `__init__()` means something else altogether.
   */
  initialize?: () => Promise<void>;
  /**
   * Optional. Called when the EventProcessor stops processing a partition. This may happen
   * when control of the partition switches to another EventProcessor, or when the user stops it.
   */
  close?: (reason: CloseReason) => Promise<void>;
  /** Called when a batch of events has been received. */
  processEvents: (events: EventData[]) => Promise<void>;
  /** Called when the underlying client experiences an error while receiving. */
  processError: (error: Error) => Promise<void>;
}


/**
 * Plugin passed when creating the EventProcessor to manage partition ownership and
 * checkpoint creation. Implementations deal mainly with reads/writes to the chosen storage service.
 *
 * Interface members cannot carry `async`; implementations may still be `async` functions.
 */
export interface PartitionManager {
  /** Lists the ownership records stored for the given Event Hub and consumer group. */
  listOwnerships(eventHubName: string, consumerGroupName: string): Promise<PartitionOwnership[]>;
  /**
   * Attempts to claim the given partitions.
   * NOTE(review): confirm whether only the successfully claimed ownerships are returned.
   */
  claimOwnerships(partitionOwnerships: PartitionOwnership[]): Promise<PartitionOwnership[]>;
  /** Persists a checkpoint. */
  createCheckpoint(checkpoint: Checkpoint): Promise<void>;
}


/**
 * One partition-ownership record. It is passed to `PartitionManager.claimOwnerships` and
 * returned by `PartitionManager.listOwnerships`.
 *
 * Exported because the exported `PartitionManager` interface exposes it. A non-exported type
 * in an exported signature breaks declaration emit.
 */
export interface PartitionOwnership {
  eventHubName: string;
  consumerGroupName: string;
  /** NOTE(review): presumably the id of the EventProcessor instance owning the partition; confirm. */
  instanceId: string;
  partitionId: string;
  ownerLevel: number;
  offset?: number;
  sequenceNumber?: number;
  /** Last modification time in milliseconds. NOTE(review): presumably epoch-based; confirm. */
  lastModifiedTime?: number;
  /** Storage ETag. NOTE(review): presumably used for optimistic concurrency when claiming; confirm. */
  ETag?: string;
}

/**
 * Checkpoint record passed to `PartitionManager.createCheckpoint`.
 *
 * Exported because the exported `PartitionManager` interface exposes it.
 */
export interface Checkpoint {
  eventHubName: string;
  consumerGroupName: string;
  instanceId: string;
  partitionId: string;
  sequenceNumber: number;
  offset: number;
}

@YijunXieMS
Copy link

YijunXieMS commented Jul 25, 2019

The Python API defines fewer classes.

class EventProcessor(object):
    """Python counterpart of the TypeScript ``EventProcessor`` sketch.

    Reads events for ``consumer_group_name`` through ``eventhub_client``. It creates partition
    processors with ``partition_processor_callable`` and manages ownership/checkpoints through
    ``partition_manager``. Only the public surface is sketched; the bodies are placeholders
    (the original listed bare ``def`` lines, which is a SyntaxError).
    """

    def __init__(self, consumer_group_name, eventhub_client, partition_processor_callable, partition_manager, **kwargs):
        """
        :param consumer_group_name: Consumer group to read events through.
        :param eventhub_client: Client used to communicate with the Event Hub.
        :param partition_processor_callable: Callable that creates a PartitionProcessor
            (the example passes a PartitionProcessor subclass).
        :param partition_manager: Storage plugin for ownership and checkpoints.
        :param kwargs: Additional options. NOTE(review): not yet specified; confirm.
        """
        pass

    async def start(self):
        """Start processing events. Placeholder."""
        pass

    async def stop(self):
        """Stop processing events. Placeholder."""
        pass

class CheckpointManager(object):
    """Lets users update the checkpoint of one partition.

    Stores the identity of the partition and delegates the write to the PartitionManager.
    """

    # The annotation is a string (forward reference). A bare name would be evaluated when this
    # class is defined, but PartitionManager is declared further down in this module (NameError).
    def __init__(self, partition_id, eventhub_name, consumer_group_name, partition_manager: "PartitionManager"):
        self.partition_id = partition_id
        self.eventhub_name = eventhub_name
        self.consumer_group_name = consumer_group_name
        self.partition_manager = partition_manager

    async def update_checkpoint(self, **kwargs):
        """Forward ``kwargs`` unchanged to ``partition_manager.update_checkpoint``.

        NOTE(review): the stored partition_id / eventhub_name / consumer_group_name are not
        passed along, so callers must supply them in ``kwargs``. Confirm this is intended.
        """
        await self.partition_manager.update_checkpoint(**kwargs)

class PartitionProcessor(ABC):
    """Base class users subclass to handle the events of a single partition.

    Only ``process_events`` is abstract; the other hooks default to no-ops.
    NOTE(review): the identifying attributes and ``checkpoint_manager`` start as None and are
    presumably filled in by the EventProcessor before the hooks run; confirm.
    """

    def __init__(self):
        # Identity of the partition being processed.
        self.partition_id = None
        self.eventhub_name = None
        self.consumer_group_name = None
        # Must be set before update_checkpoint() is called; it is dereferenced there.
        self.checkpoint_manager = None

    async def update_checkpoint(self, **kwargs):
        """Forward ``kwargs`` to ``self.checkpoint_manager.update_checkpoint``.

        Other languages don't have this method because PartitionProcessor is an interface there.
        In Python it is an abstract class, so it can offer this convenience.
        """
        await self.checkpoint_manager.update_checkpoint(**kwargs)

    async def initialize(self):
        """Called after the partition consumer is ready to receive events. No-op by default."""
        pass

    async def close(self, reason):
        """Called when the EventProcessor stops processing this PartitionProcessor. No-op by default."""
        pass

    # The annotation is quoted so it is not evaluated when the class is defined. This keeps the
    # class importable even where List/EventData are not in scope.
    @abstractmethod
    async def process_events(self, events: "List[EventData]"):
        """Called when a batch of events has been received. Subclasses must implement this."""
        pass

    async def process_error(self, error):
        """Called when the underlying Event Hub partition consumer experiences a non-retriable error during receiving.

        No-op by default.
        """
        pass

class PartitionManager(ABC):
    """Subclass this class to implement read/write access to a storage service.

    Users may write their own subclass to choose where checkpoints and partition
    ownership are stored. All three methods are abstract.
    """

    @abstractmethod
    async def list_ownership(self, eventhub_name: str, consumer_group_name: str) -> Iterable[Dict[str, Any]]:
        """List the partition-ownership records stored for an Event Hub and consumer group.

        :param eventhub_name: Name of the Event Hub whose ownership records are listed.
        :param consumer_group_name: Consumer group whose ownership records are listed.
        :return: Iterable of dictionaries containing the following partition ownership information:
                eventhub_name
                consumer_group_name
                instance_id
                partition_id
                owner_level
                offset
                sequence_number
                last_modified_time
                etag
        """
        pass

    @abstractmethod
    async def claim_ownership(self, partitions: Iterable[Dict]) -> Iterable[Dict[str, Any]]:
        """Try to claim ownership of ``partitions``.

        :param partitions: Ownership dictionaries. NOTE(review): presumably with the same keys
            as those returned by :meth:`list_ownership`; confirm.
        :return: Iterable of ownership dictionaries. NOTE(review): confirm whether only the
            successfully claimed entries are returned.
        """
        pass

    @abstractmethod
    async def update_checkpoint(self, **kwargs) -> None:
        """Persist a checkpoint described by ``kwargs``.

        NOTE(review): the expected keys are not defined here. They presumably mirror the
        TypeScript ``Checkpoint`` fields (eventhub_name, consumer_group_name, instance_id,
        partition_id, sequence_number, offset); confirm.
        """
        pass

Python example:

class MyPartitionProcessor(PartitionProcessor):
    """Sample processor that prints one line for each lifecycle callback."""

    def _describe(self):
        # Identifying prefix shared by every message this processor prints.
        return "PartitionProcessor for eventhub:{}, consumer group:{}, partition id:{}".format(
            self.eventhub_name, self.consumer_group_name, self.partition_id)

    async def initialize(self):
        print(self._describe() + " created")

    async def close(self, reason):
        print(self._describe() + " closed", "Reason:", reason)

    async def process_events(self, events):
        event_count = len(events)
        print(self._describe() + ", number of events processed:" + str(event_count))

    async def process_error(self, error):
        print(self._describe() + ", an error has happened:", error)


import asyncio

# Placeholder: replace with a real Event Hubs connection string before running.
CONNECTION_STR = "<a connection string>"


async def stop_after_awhile(event_processor, duration):
    """Sleep for ``duration`` seconds, then stop ``event_processor``."""
    await asyncio.sleep(duration)
    await event_processor.stop()


async def main():
    """Run the example processor with an in-memory partition manager, requesting a stop after 1 second."""
    client = EventHubClient.from_connection_string(CONNECTION_STR)
    partition_manager = InMemoryPartitionManager()
    event_processor = EventProcessor("$default", client, MyPartitionProcessor, partition_manager)
    # start() and the delayed stop() run concurrently on the same loop.
    await asyncio.gather(event_processor.start(), stop_after_awhile(event_processor, 1))


if __name__ == '__main__':
    # asyncio.run() creates, runs and closes a fresh event loop. Calling
    # asyncio.get_event_loop() with no running loop is deprecated.
    asyncio.run(main())

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment