Skip to content

Instantly share code, notes, and snippets.

@ramya-rao-a
Last active September 18, 2019 18:57
Show Gist options
  • Save ramya-rao-a/f1d95a86b2c70598575b9eab77231c88 to your computer and use it in GitHub Desktop.
Save ramya-rao-a/f1d95a86b2c70598575b9eab77231c88 to your computer and use it in GitHub Desktop.
EP .Net API
/*
There are 3 players in this game.
- EventProcessor
- Provided by the sdk.
- Ideal Usage: Multiple instances on separate machines to balance partition load
- PartitionProcessor
- Abstract class provided by the sdk, meant to be extended by user where they provide code to process events
- Ideal Usage:
- Extended by user & passed to EventProcessor constructor via a factory in .Net & Java,
via the type/class name in JS & Python
- The only method they are absolutely required to implement is ProcessEvents
- Methods available for user to provide any set-up/tear-down for processing the particular partition.
- Method available for checkpointing. This is implemented by sdk and not overridable by user.
- Properties available for info on current partition, consumer group, event hub
- PartitionManager
- Abstract class provided by the sdk, meant to write/read checkpoints & other relevant info to aid load balancing
to a durable store
- Vendors to ship implementations of this using different storage solutions.
- User to choose one such package based on their storage solution of choice
- User to instantiate and pass to the EventProcessor constructor
*/
namespace Azure.Messaging.EventHubs.Processor {
// ==========================================================
// PartitionProcessor is an abstract class defined by the sdk and to be extended by the user
// The methods here are called by the EventProcessor at various stages of processing a single partition.
// The properties here are all set internally by the EventProcessor.
// All properties except LastEnqueuedEventInformation (not yet declared in the class below) are set only once.
// ==========================================================
public abstract class PartitionProcessor {
    // Based on individual language patterns the below are "set" by the Event Processor
    // or passed to the constructor of PartitionProcessor.
    protected int PartitionId { get; }
    protected string ConsumerGroup { get; }
    protected string EventHubName { get; }
    protected string FullyQualifiedNamespace { get; }

    // Set by user in the constructor or in InitializeAsync(), based on whether the partition info
    // was passed to the constructor or set directly by the Event Processor.
    // Read by EventProcessor if no checkpoint was found for this partition.
    // The setter is protected because a get-only auto-property can only be assigned from the
    // declaring class's constructor, which would make it impossible for a derived class to set
    // the position from InitializeAsync() as described above.
    public EventPosition InitialEventPosition { get; protected set; }

    // The callbacks below are public: abstract members cannot be private in C#, and they are
    // invoked by EventProcessor, which is a separate class.
    // NOTE(review): the design notes at the top of this file say ProcessEvents is the only method
    // a user is required to implement, yet all four callbacks are abstract. Confirm whether
    // InitializeAsync/CloseAsync/ProcessErrorAsync should be virtual with a default no-op instead.

    // Gets called by EventProcessor for every batch of events received.
    // Max batch size is controlled via options passed to EventProcessor constructor.
    // Max wait time for each batch is also controlled the same way.
    public abstract Task ProcessEventsAsync(IEnumerable<EventData> events);

    // Gets called by EventProcessor before starting the receiving of events from the partition.
    // For example, set the InitialEventPosition to use when no checkpoints are present.
    public abstract Task InitializeAsync();

    // Gets called by EventProcessor after it stops receiving events from the partition.
    // `reason` tells the implementation why processing of the partition stopped.
    public abstract Task CloseAsync(CloseReason reason);

    // Gets called by EventProcessor for any error that occurs when running user code or
    // any non-retryable error that occurs when receiving events.
    public abstract Task ProcessErrorAsync(Exception exception);

    // Implemented by the sdk and used to update checkpoints; deliberately neither abstract nor
    // virtual so that users cannot override it.
    // `event` is a reserved C# keyword, so the parameter is written as the verbatim identifier
    // `@event`; its name as seen by callers is still `event`.
    public Task UpdateCheckpointAsync(EventData @event);
}
// Why the EventProcessor stopped processing a partition; handed to PartitionProcessor.CloseAsync.
public enum CloseReason {
    Shutdown = 0,
    PartitionOwnershipLost = 1,
}
// ==========================================================
// Event Processor defined by sdk, instantiated & run by user
// ==========================================================
public class EventProcessor {
    // Builds a processor for one event hub / consumer group pair.
    //   partitionProcessorFactory - produces the user's PartitionProcessor instances
    //                               (Python & JS can pass just the class type)
    //   partitionManager          - the checkpoint/ownership store chosen by the user
    //   options                   - optional; callers may omit it, in which case it is null
    public EventProcessor(
        string fullyQualifiedNamespace,
        string eventHubName,
        string consumerGroup,
        TokenCredential credential,
        Func<PartitionProcessor> partitionProcessorFactory,
        PartitionManager partitionManager,
        EventProcessorOptions options = null
    );

    // Read-only identifier of this processor instance.
    // NOTE(review): presumably the value recorded as OwnerIdentifier in Checkpoint and
    // PartitionOwnership - confirm.
    public string Identifier { get; }

    // Start and stop event processing for this instance.
    public Task StartAsync();
    public Task StopAsync();
}
// Options accepted by the EventProcessor constructor.
public class EventProcessorOptions {
    public EventProcessorOptions();

    // Presumably the maximum size of each batch handed to PartitionProcessor.ProcessEventsAsync
    // (that method's comment says batch size is controlled via these options) - confirm.
    public int MaximumMessageCount { get; set; }

    // Presumably the maximum time to wait for each batch (same source as above) - confirm.
    // Nullable, so it can be left unset.
    public TimeSpan? MaximumReceiveWaitTime { get; set; }

    // The original sketch declared this property with a type but no name, which is not valid C#.
    // NOTE(review): `ClientOptions` is a placeholder name - confirm the intended name.
    public EventHubClientOptions ClientOptions { get; set; }
}
// ==========================================================
// PartitionManager is an abstract class defined by the sdk and to be implemented by the vendor
// Chosen by the user based on their storage solution of choice
// Instantiated by user and passed to the EventProcessor constructor
// ==========================================================
public abstract class PartitionManager {
    // Clears the checkpoint for the given partition.
    public abstract Task DeleteCheckpointAsync(
        string fullyQualifiedNamespace, string eventHubName, string consumerGroup, int partitionId);

    // Updates the checkpoint for a partition; all relevant info travels in the Checkpoint instance.
    //
    // Invoked by the EventProcessor when the user expresses the desire to checkpoint an event.
    // NOTE(review): the original comment said this happens through a `PartitionContext` instance
    // passed to user code; no such type appears in this file, where the user-facing call is
    // PartitionProcessor.UpdateCheckpointAsync - confirm which is intended.
    public abstract Task UpdateCheckpointAsync(Checkpoint checkpoint);

    // Lists which EventProcessor instance is currently processing which partitions,
    // along with the last checkpoint for each partition.
    //
    // Invoked by the EventProcessor when it needs to decide whether to pick up another partition
    // in order to balance the load among multiple instances of EventProcessor.
    public abstract Task<IEnumerable<PartitionOwnership>> ListOwnershipAsync(
        string fullyQualifiedNamespace, string eventHubName, string consumerGroup);

    // Updates the ownership of partitions by assigning partitions to different EventProcessor instances.
    //
    // Invoked by the EventProcessor after its load balancing algorithm determines that it needs
    // to pick up one or more extra partitions for processing.
    public abstract Task<IEnumerable<PartitionOwnership>> ClaimOwnershipAsync(
        IEnumerable<PartitionOwnership> partitionOwnership);
}
// Payload handed to PartitionManager.UpdateCheckpointAsync. Every property is read-only.
public class Checkpoint {
    // Same four coordinates that PartitionManager.DeleteCheckpointAsync takes to name a partition.
    public string FullyQualifiedNamespace { get; }
    public string EventHubName { get; }
    public string ConsumerGroup { get; }
    public int PartitionId { get; }

    // Position being recorded.
    // NOTE(review): presumably the offset and sequence number of the checkpointed event - confirm.
    public long Offset { get; }
    public long SequenceNumber { get; }

    // NOTE(review): presumably the EventProcessor.Identifier of the instance that wrote the
    // checkpoint - confirm.
    public string OwnerIdentifier { get; }
}
// Element type of the collections exchanged with PartitionManager.ListOwnershipAsync and
// PartitionManager.ClaimOwnershipAsync.
public class PartitionOwnership {
    // Same four coordinates that PartitionManager.DeleteCheckpointAsync takes to name a partition.
    public string FullyQualifiedNamespace { get; }
    public string EventHubName { get; }
    public string ConsumerGroup { get; }
    public int PartitionId { get; }

    // NOTE(review): presumably the EventProcessor.Identifier of the owning instance - confirm.
    public string OwnerIdentifier { get; }

    // The only property with a public setter.
    public string ETag { get; set; }

    // Both are nullable, so an ownership record may carry neither.
    public DateTimeOffset? LastModifiedTime { get; }
    public long? Offset { get; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment