Skip to content

Instantly share code, notes, and snippets.

@thuru-zz
Created June 13, 2018 13:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thuru-zz/ad9f8cbf5735b2b2f35c532afb67d97a to your computer and use it in GitHub Desktop.
Save thuru-zz/ad9f8cbf5735b2b2f35c532afb67d97a to your computer and use it in GitHub Desktop.
cosmos change feed sql sdk
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace SQLSDK
{
public class ChangeFeedSQLSDKProvider
{
private readonly DocumentClient _documentClient;
private readonly Uri _collectionUri;
public ChangeFeedSQLSDKProvider()
{
}
public ChangeFeedSQLSDKProvider(string url, string key, string database, string collection)
{
_documentClient = new DocumentClient(new Uri(url), key,
new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp });
_collectionUri = UriFactory.CreateDocumentCollectionUri(database, collection);
}
public async Task<int> GetChangeFeedAsync(string partitionName)
{
//var partionKeyRangeReponse = await _documentClient.ReadPartitionKeyRangeFeedAsync(_collectionUri, new FeedOptions
//{
// RequestContinuation = await GetContinuationTokenForPartitionAsync(partitionName),
// PartitionKey = new PartitionKey(partitionName)
//});
//var partitionKeyRanges = new List<PartitionKeyRange>();
//partitionKeyRanges.AddRange(partionKeyRangeReponse);
var changeFeedQuery = _documentClient.CreateDocumentChangeFeedQuery(_collectionUri, new ChangeFeedOptions
{
StartFromBeginning = true,
PartitionKey = new PartitionKey(partitionName),
RequestContinuation = await GetContinuationTokenForPartitionAsync(partitionName),
});
var changeDocumentCount = 0;
while (changeFeedQuery.HasMoreResults)
{
var response = await changeFeedQuery.ExecuteNextAsync<DeveloperModel>();
foreach(var document in response)
{
// TODO :: process changes here
Console.WriteLine($"changed for id - {document.Id} with name {document.Name} and skill {document.Skill}");
}
SetContinuationTokenForPartitionAsync(partitionName, response.ResponseContinuation);
changeDocumentCount++;
}
return changeDocumentCount;
}
private async Task<string> GetContinuationTokenForPartitionAsync(string partitionName)
{
// TODO :: retrieve from a key value pair : persistence
return null;
}
private async Task SetContinuationTokenForPartitionAsync(string partitionName, string lsn)
{
// TODO :: get the continuation token from persistence store
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment