Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thuru-zz/34c967a0f83246f17d23bc8f88cae2d8 to your computer and use it in GitHub Desktop.
Save thuru-zz/34c967a0f83246f17d23bc8f88cae2d8 to your computer and use it in GitHub Desktop.
Azure Cosmos DB change feed processor library
/// <summary>
/// Console host for a change feed processor: builds a processor over a monitored
/// collection and a lease collection, starts it, and stops it when the user presses a key.
/// </summary>
public class ChangeFeedProcessorSDK
{
    private readonly DocumentCollectionInfo _monitoredCollection;
    private readonly DocumentCollectionInfo _leaseCollection;

    /// <summary>
    /// Creates the host for the given collections.
    /// </summary>
    /// <param name="monitorCollection">Collection passed to the builder as the feed collection.</param>
    /// <param name="leaseCollection">Collection passed to the builder as the lease collection.</param>
    /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
    public ChangeFeedProcessorSDK(DocumentCollectionInfo monitorCollection, DocumentCollectionInfo leaseCollection)
    {
        // Fail fast here instead of surfacing a less obvious error later when the processor is built.
        if (monitorCollection == null)
        {
            throw new ArgumentNullException(nameof(monitorCollection));
        }

        if (leaseCollection == null)
        {
            throw new ArgumentNullException(nameof(leaseCollection));
        }

        _monitoredCollection = monitorCollection;
        _leaseCollection = leaseCollection;
    }

    /// <summary>
    /// Builds and starts a processor under a freshly generated host name, blocks on
    /// <see cref="Console.ReadKey()"/> until a key is pressed, then stops the processor.
    /// </summary>
    /// <returns>Always <c>0</c> once the processor has been stopped.</returns>
    public async Task<int> GetChangesAsync()
    {
        // A new GUID per call gives every run its own host name.
        var hostName = $"Host - {Guid.NewGuid()}";

        var builder = new ChangeFeedProcessorBuilder();
        builder
            .WithHostName(hostName)
            .WithFeedCollection(_monitoredCollection)
            .WithLeaseCollection(_leaseCollection)
            .WithObserverFactory(new CustomObserverFactory());

        var processor = await builder.BuildAsync();
        await processor.StartAsync();

        try
        {
            Console.WriteLine($"Started host - {hostName}");
            Console.WriteLine("Press any key to stop");

            // Blocks the calling thread; throws InvalidOperationException when stdin is redirected.
            Console.ReadKey();
        }
        finally
        {
            // Stop the processor even when waiting for the key press fails,
            // so a started processor is never left running.
            await processor.StopAsync();
        }

        return 0;
    }
}
/// <summary>
/// Observer factory that hands out a new <see cref="CustomObserver"/> on every request.
/// </summary>
public class CustomObserverFactory : Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing.IChangeFeedObserverFactory
{
    /// <summary>
    /// Creates an observer instance.
    /// </summary>
    /// <returns>A newly constructed <see cref="CustomObserver"/>.</returns>
    public Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing.IChangeFeedObserver CreateObserver()
        => new CustomObserver();
}
/// <summary>
/// Change feed observer that reports open, close and document-change events on the console.
/// </summary>
public class CustomObserver : Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing.IChangeFeedObserver
{
    /// <summary>
    /// Writes a console line naming the partition key range being opened.
    /// </summary>
    /// <param name="context">Supplies the partition key range id that is printed.</param>
    /// <returns>An already completed task.</returns>
    public Task OpenAsync(IChangeFeedObserverContext context)
    {
        var rangeId = context.PartitionKeyRangeId;

        // NOTE(review): "Openning" is misspelled in the emitted text; kept as-is to preserve output.
        Console.WriteLine($"Openning the listener to the partition key range {rangeId}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Writes a console line naming the partition key range being closed and the close reason.
    /// </summary>
    /// <param name="context">Supplies the partition key range id that is printed.</param>
    /// <param name="reason">Close reason included in the message.</param>
    /// <returns>An already completed task.</returns>
    public Task CloseAsync(IChangeFeedObserverContext context, Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing.ChangeFeedObserverCloseReason reason)
    {
        var rangeId = context.PartitionKeyRangeId;
        Console.WriteLine($"Closing the listener to the partition key range {rangeId} because {reason}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Writes the id of every document in the batch to the console, in list order.
    /// </summary>
    /// <param name="context">Observer context; not read by this implementation.</param>
    /// <param name="docs">Batch of changed documents.</param>
    /// <param name="cancellationToken">Cancellation token; not observed by this implementation.</param>
    /// <returns>An already completed task.</returns>
    public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> docs, CancellationToken cancellationToken)
    {
        for (var index = 0; index < docs.Count; index++)
        {
            // TODO :: processing logic
            var changedId = docs[index].Id;
            Console.WriteLine($"Changed document Id - {changedId}");
        }

        return Task.CompletedTask;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment