Created
June 18, 2018 15:40
-
-
Save thuru-zz/34c967a0f83246f17d23bc8f88cae2d8 to your computer and use it in GitHub Desktop.
cosmos change feed processor library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class ChangeFeedProcessorSDK | |
{ | |
private readonly DocumentCollectionInfo _monitoredCollection; | |
private readonly DocumentCollectionInfo _leaseCollection; | |
public ChangeFeedProcessorSDK(DocumentCollectionInfo monitorCollection, DocumentCollectionInfo leaseCollection) | |
{ | |
_monitoredCollection = monitorCollection; | |
_leaseCollection = leaseCollection; | |
} | |
public async Task<int> GetChangesAsync() | |
{ | |
var hostName = $"Host - {Guid.NewGuid().ToString()}"; | |
var builder = new ChangeFeedProcessorBuilder(); | |
builder | |
.WithHostName(hostName) | |
.WithFeedCollection(_monitoredCollection) | |
.WithLeaseCollection(_leaseCollection) | |
.WithObserverFactory(new CustomObserverFactory()); | |
var processor = await builder.BuildAsync(); | |
await processor.StartAsync(); | |
Console.WriteLine($"Started host - {hostName}"); | |
Console.WriteLine("Press any key to stop"); | |
Console.ReadKey(); | |
await processor.StopAsync(); | |
return 0; | |
} | |
} | |
public class CustomObserverFactory : Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing.IChangeFeedObserverFactory | |
{ | |
public Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing.IChangeFeedObserver CreateObserver() | |
{ | |
return new CustomObserver(); | |
} | |
} | |
public class CustomObserver : Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing.IChangeFeedObserver | |
{ | |
public Task CloseAsync(IChangeFeedObserverContext context, Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing.ChangeFeedObserverCloseReason reason) | |
{ | |
Console.WriteLine($"Closing the listener to the partition key range {context.PartitionKeyRangeId} because {reason}"); | |
return Task.CompletedTask; | |
} | |
public Task OpenAsync(IChangeFeedObserverContext context) | |
{ | |
Console.WriteLine($"Openning the listener to the partition key range {context.PartitionKeyRangeId}"); | |
return Task.CompletedTask; | |
} | |
public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> docs, CancellationToken cancellationToken) | |
{ | |
foreach(var document in docs) | |
{ | |
// TODO :: processing logic | |
Console.WriteLine($"Changed document Id - {document.Id}"); | |
} | |
return Task.CompletedTask; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment