Skip to content

Instantly share code, notes, and snippets.

@ramya-rao-a
Last active June 3, 2019 05:25
Show Gist options
  • Save ramya-rao-a/8401c9209d15fc63aee21361c93cc414 to your computer and use it in GitHub Desktop.
Save ramya-rao-a/8401c9209d15fc63aee21361c93cc414 to your computer and use it in GitHub Desktop.
/**
 * Main EventHubClient class.
 */
export class EventHubClient {
  /** NOTE(review): presumably the name of the event hub this client targets — confirm. */
  readonly eventHubName: string;

  /**
   * Expects a connection string with EntityPath set to the event hub name.
   */
  constructor(connectionString: string, options?: EventHubClientOptions);
  /**
   * Expects a user-implemented token provider. The token is used for auth over the CBS link.
   */
  constructor(host: string, entityPath: string, tokenProvider: TokenProvider, options?: EventHubClientOptions);
  /**
   * Expects the user to use the @azure/ms-rest-nodeauth library to create the credentials.
   */
  constructor(
    host: string,
    entityPath: string,
    credentials:
      | ApplicationTokenCredentials
      | UserTokenCredentials
      | DeviceTokenCredentials
      | MSITokenCredentials,
    options?: EventHubClientOptions
  );

  /**
   * Helper method for connection strings with and without EntityPath.
   */
  static createFromConnectionString(
    connectionString: string,
    entityPath?: string,
    options?: EventHubClientOptions
  ): EventHubClient;

  // Factories for the per-link sender and receiver objects.
  createSender(options?: SenderOptions): Sender;
  createReceiver(partitionId: string, options?: ReceiverOptions): Receiver;

  // Metadata queries; each accepts an optional cancellation token.
  getProperties(cancellationToken?: Aborter): Promise<EventHubProperties>;
  getPartitionIds(cancellationToken?: Aborter): Promise<string[]>;
  getPartitionProperties(partitionId: string, cancellationToken?: Aborter): Promise<PartitionProperties>;

  close(): Promise<void>;
}
/**
 * Options to pass when creating an EventHubClient.
 */
export interface EventHubClientOptions {
  /**
   * User-provided encode/decode methods. Useful because event data is not
   * restricted to just bytes.
   */
  dataTransformer?: DataTransformer;
  /**
   * Retry options for operations on the client.
   */
  retryOptions?: RetryOptions;
  /**
   * User agent string to append to the default one.
   */
  userAgent?: string;
  /**
   * WebSocket implementation to use, which one might want in proxy environments
   * or when port 5671 is blocked. The library does not ship with a WebSocket
   * implementation, so the user is expected to provide one.
   */
  webSocket?: WebSocketImpl;
  /**
   * NOTE(review): presumably forwarded to the `webSocket` constructor — confirm.
   */
  webSocketConstructorOptions?: any;
}
/**
 * Retry options passed to client, sender and receiver.
 *
 * Will gain `maxRetryInterval` and `isExponential` once exponential retries
 * are supported.
 */
export interface RetryOptions {
  /** Interval in milliseconds. */
  retryInterval?: number;
  retryCount?: number;
}
/**
 * Return type for `getProperties()`.
 */
export interface EventHubProperties {
  path: string;
  createdAt: Date;
  partitionIds: string[];
}
/**
 * Return type for `getPartitionProperties()`.
 */
export interface PartitionProperties {
  // Identity of the partition.
  id: string;
  eventHubPath: string;

  // Sequence/offset bookkeeping.
  beginningSequenceNumber: number;
  lastEnqueuedSequenceNumber: number;
  lastEnqueuedOffset: string;
  lastEnqueuedTimeUtc: Date;
}
// ======================================== Sending related API starts ======================================
/**
 * Options passed to `createSender()`.
 */
export interface SenderOptions {
  retryOptions?: RetryOptions;
  partitionId?: string;
}
/**
 * Each sender holds a dedicated AMQP sender link.
 */
export class Sender {
  readonly isClosed: boolean;

  send(data: EventData[], options?: BatchingOptions): Promise<void>;
  close(): Promise<void>;
}
/**
 * Event data to be sent.
 */
export interface EventData {
  body: any;
  /** Optional string-keyed bag of arbitrary values attached to the event. */
  properties?: Record<string, any>;
}
/**
 * Options to control the send operation. May include more once smart batching
 * is supported.
 */
export interface BatchingOptions {
  cancellationToken?: Aborter;
  batchLabel?: string | null;
}
// ======================================== Sending related API ends ======================================
// ======================================== Receiving related API starts ======================================
/**
 * Options passed to `createReceiver()`.
 */
export interface ReceiverOptions {
  // Where and how to attach.
  consumerGroup?: string;
  eventPosition?: EventPosition;
  exclusiveReceiverPriority?: number;

  // Failure handling.
  retryOptions?: RetryOptions;
}
/**
 * Options used to create the async iterator for events.
 */
export interface EventIteratorOptions {
  prefetchCount?: number;
  cancellationToken?: Aborter;
}
/**
 * Each receiver holds an AMQP receiver link dedicated to 1 partition.
 */
export class Receiver {
  close(): Promise<void>;
  /**
   * Creates an async iterator over received events.
   */
  getAsyncIterator(options?: EventIteratorOptions): AsyncIterableIterator<ReceivedEventData>;
  /**
   * Streaming receive: events go to `onMessage`, errors go to `onError`.
   * The returned handler is used for stopping the streaming receiver.
   */
  receive(onMessage: OnMessage, onError: OnError, cancellationToken?: Aborter): ReceiveHandler;
  /**
   * Batch receive.
   *
   * The original declaration ended at the colon with no return type, which is
   * a syntax error. NOTE(review): restored as a promise of received events,
   * matching the element type used by `getAsyncIterator` — confirm against the
   * implementation.
   */
  receiveBatch(
    maxMessageCount: number,
    maxWaitTimeInSeconds?: number,
    cancellationToken?: Aborter
  ): Promise<ReceivedEventData[]>;
  /**
   * The various receive operations may not be used in parallel, so this is a
   * helper for the user to check whether another receive operation is in progress.
   */
  isReceivingMessages(): boolean;
  readonly consumerGroup: string | undefined;
  readonly exclusiveReceiverPriority: number | undefined;
  readonly isClosed: boolean;
  readonly partitionId: string;
}
/**
 * Position in the stream, used to determine where to start a receiver from.
 */
export class EventPosition {
  // Predefined positions and offsets.
  static readonly firstAvailableEvent: EventPosition;
  static readonly newEventsOnly: EventPosition;
  static readonly startOfStreamOffset: string;
  static readonly endOfStreamOffset: string;

  // Factories, one per way of identifying a position.
  static fromOffset(offset: string, isInclusive?: boolean): EventPosition;
  static fromSequenceNumber(sequenceNumber: number, isInclusive?: boolean): EventPosition;
  static fromEnqueuedTime(enqueuedTime: Date | number): EventPosition;

  constructor(options?: EventPositionOptions);

  // Instance state.
  offset?: string;
  sequenceNumber?: number;
  enqueuedTime?: Date | number;
  isInclusive: boolean;
}
/**
 * Event received from the service.
 */
export interface ReceivedEventData {
  body: any;
  /** Optional string-keyed bag of arbitrary values attached to the event. */
  properties?: Record<string, any>;

  // Optional metadata fields.
  enqueuedTimeUtc?: Date;
  offset?: string;
  partitionKey?: string | null;
  sequenceNumber?: number;
}
/**
 * Signature for the error callback in the streaming receiver.
 * Receives the error (a `MessagingError` or a plain `Error`); its return value is ignored.
 */
export type OnError = (error: MessagingError | Error) => void;
/**
 * Signature for the event data callback in the streaming receiver.
 * Receives the event as `ReceivedEventData`; its return value is ignored.
 */
export type OnMessage = (eventData: ReceivedEventData) => void;
/**
 * Handler returned by the streaming receiver, used for stopping it.
 */
export class ReceiveHandler {
  stop(): Promise<void>;

  readonly partitionId: string | number | undefined;
  readonly consumerGroup: string | undefined;
  readonly isReceiverOpen: boolean;
}
// ======================================== Receiving related API ends ======================================
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment