Skip to content

Instantly share code, notes, and snippets.

public class ManualCheckpointObserver : IChangeFeedObserver
{
private readonly ManualCheckpointer _checkpointer;
private CancellationTokenSource _ctsClose;
private Task _checkpointTask;
/// <summary>
/// Creates an observer that keeps a reference to the supplied checkpointer.
/// </summary>
/// <param name="checkpointer">The checkpointer stored by this observer; must not be null.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="checkpointer"/> is null.</exception>
public ManualCheckpointObserver(ManualCheckpointer checkpointer)
{
if (checkpointer == null)
{
throw new ArgumentNullException(nameof(checkpointer));
}

_checkpointer = checkpointer;
}
@kadukf
kadukf / use_estimator.cs
Created September 13, 2018 21:44
How to use the estimator for monitoring
IRemainingWorkEstimator estimator = await CreateEstimatorAsync(uri, key, collection);
StringBuilder builder = new StringBuilder();
while (!ctsToken.IsCancellationRequested)
{
builder.Clear();
IReadOnlyList<RemainingPartitionWork> remainingWork = await estimator.GetEstimatedRemainingWorkPerPartitionAsync();
for (int i = 0; i < remainingWork.Count; i++)
@kadukf
kadukf / estimator_per_partition.cs
Created September 13, 2018 21:42
How to create an estimator per partition
private static async Task<IRemainingWorkEstimator> CreateEstimatorAsync(string uri, string key, string collection)
{
IChangeFeedDocumentClient dbClient = new ChangeFeedDocumentClient(new DocumentClient(new Uri(uri), key));
dbClient = new QoSMeteringChangeFeedDocumentClient(dbClient, new QoSMeteringReporter());
var builder = new ChangeFeedProcessorBuilder()
.WithObserver<ConsoleObserver>()
.WithHostName("console_app_host")
.WithFeedCollection(new DocumentCollectionInfo()
{
@kadukf
kadukf / ParalellizedTasks.Partitioner.cs
Created August 31, 2018 10:29
Tasks parallelization using Partitioner
static async Task<(IReadOnlyCollection<int>, TimeSpan)> RunComputationsAsync(int maxDegreeOfParallelism, int messageCount)
{
var input = System.Linq.Enumerable.Range(0, messageCount);
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
var tasks = System.Collections.Concurrent.Partitioner.Create(input)
.GetPartitions(maxDegreeOfParallelism)
.Select(partition => Task.Run(async () =>
@kadukf
kadukf / ParalellizedTasks.Sempaphore.cs
Created August 31, 2018 10:29
Tasks parallelization using Semaphore
static async Task<(IReadOnlyCollection<int>, TimeSpan)> RunComputationsAsync(int maxDegreeOfParallelism, int messageCount)
{
SemaphoreSlim semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
List<Task<int>> tasks = new List<Task<int>>();
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
for (int i = 0; i < messageCount; i++)
{
@kadukf
kadukf / ParalellizedTasks.Enumerable.cs
Created August 31, 2018 10:27
Tasks parallelization using Enumerable/ConcurrentQueue approach
static async Task<(IReadOnlyCollection<int>, TimeSpan)> RunComputationsAsync(int maxDegreeOfParallelism, int messageCount)
{
ConcurrentQueue<int> input = new ConcurrentQueue<int>(System.Linq.Enumerable.Range(0, messageCount));
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
var tasks = System.Linq.Enumerable.Range(1, maxDegreeOfParallelism).Select(async _ =>
{
List<int> results = new List<int>();
@kadukf
kadukf / ParalellizedTasks.Dataflow.cs
Created August 31, 2018 10:26
Tasks execution parallelization using Dataflow library
static async Task<(IReadOnlyCollection<int>, TimeSpan)> RunComputationsAsync(int maxDegreeOfParallelism, int messageCount)
{
ConcurrentBag<int> results = new ConcurrentBag<int>();
var workerBlock = new ActionBlock<int>(
async item =>
{
await Task.Delay(item == 3 ? 10000 : 1000);
results.Add(item * 2);
},
// Specify a maximum degree of parallelism.
@kadukf
kadukf / BuildChangeFeedProcessorAsync.cs
Created August 17, 2018 12:51
BuildChangeFeedProcessorAsync
IChangeFeedDocumentClient dbClient = new ChangeFeedDocumentClient(new DocumentClient(new Uri(uri), key));
dbClient = new QoSMeteringChangeFeedDocumentClient(dbClient, new QoSMeteringReporter());
var builder = new ChangeFeedProcessorBuilder()
.WithObserver<ConsoleObserver>()
.WithHostName("console_app_host")
.WithFeedCollection(new DocumentCollectionInfo()
{
Uri = new Uri(uri),
MasterKey = key,
@kadukf
kadukf / QoSMeteringChangeFeedDocumentQuery.cs
Created August 17, 2018 12:50
QoSMeteringChangeFeedDocumentQuery
public class QoSMeteringChangeFeedDocumentQuery : IChangeFeedDocumentQuery<Document>
{
private readonly IChangeFeedDocumentQuery<Document> _inner;
private readonly string _partitionRangeId;
private readonly IQoSMeteringReporter _meter;
public QoSMeteringChangeFeedDocumentQuery(IChangeFeedDocumentQuery<Document> inner, string partitionRangeId, IQoSMeteringReporter meter)
{
_inner = inner ?? throw new ArgumentNullException(nameof(inner));
_partitionRangeId = partitionRangeId ?? throw new ArgumentNullException(nameof(partitionRangeId));
@kadukf
kadukf / QoSMeteringChangeFeedDocumentClient.cs
Created August 17, 2018 12:48
QoSMeteringChangeFeedDocumentClient
public class QoSMeteringChangeFeedDocumentClient : IChangeFeedDocumentClient
{
private readonly IChangeFeedDocumentClient _inner;
private readonly IQoSMeteringReporter _meter;
/// <summary>
/// Creates a client wrapper that holds the given inner client together with a metering reporter.
/// </summary>
/// <param name="changeFeedDocumentClientImplementation">The inner client stored by this wrapper; must not be null.</param>
/// <param name="meter">The metering reporter stored by this wrapper; must not be null.</param>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="changeFeedDocumentClientImplementation"/> or <paramref name="meter"/> is null.
/// </exception>
public QoSMeteringChangeFeedDocumentClient(IChangeFeedDocumentClient changeFeedDocumentClientImplementation, IQoSMeteringReporter meter)
{
// Arguments are validated in declaration order, so a null inner client is reported first.
if (changeFeedDocumentClientImplementation == null)
{
throw new ArgumentNullException(nameof(changeFeedDocumentClientImplementation));
}

_inner = changeFeedDocumentClientImplementation;

if (meter == null)
{
throw new ArgumentNullException(nameof(meter));
}

_meter = meter;
}