This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class ManualCheckpointObserver : IChangeFeedObserver | |
{ | |
private readonly ManualCheckpointer _checkpointer; | |
private CancellationTokenSource _ctsClose; | |
private Task _checkpointTask; | |
/// <summary>
/// Initializes a new <see cref="ManualCheckpointObserver"/> that keeps a reference
/// to the supplied checkpointer.
/// </summary>
/// <param name="checkpointer">The checkpointer to store; must not be <c>null</c>.</param>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="checkpointer"/> is <c>null</c>.
/// </exception>
public ManualCheckpointObserver(ManualCheckpointer checkpointer)
{
    // Fail fast at construction so a missing dependency is reported here
    // rather than on first use of the field.
    _checkpointer = checkpointer ?? throw new ArgumentNullException(nameof(checkpointer));
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
IRemainingWorkEstimator estimator = await CreateEstimatorAsync(uri, key, collection); | |
StringBuilder builder = new StringBuilder(); | |
while (!ctsToken.IsCancellationRequested) | |
{ | |
builder.Clear(); | |
IReadOnlyList<RemainingPartitionWork> remainingWork = await estimator.GetEstimatedRemainingWorkPerPartitionAsync(); | |
for (int i = 0; i < remainingWork.Count; i++) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private static async Task<IRemainingWorkEstimator> CreateEstimatorAsync(string uri, string key, string collection) | |
{ | |
IChangeFeedDocumentClient dbClient = new ChangeFeedDocumentClient(new DocumentClient(new Uri(uri), key)); | |
dbClient = new QoSMeteringChangeFeedDocumentClient(dbClient, new QoSMeteringReporter()); | |
var builder = new ChangeFeedProcessorBuilder() | |
.WithObserver<ConsoleObserver>() | |
.WithHostName("console_app_host") | |
.WithFeedCollection(new DocumentCollectionInfo() | |
{ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
static async Task<(IReadOnlyCollection<int>, TimeSpan)> RunComputationsAsync(int maxDegreeOfParallelism, int messageCount) | |
{ | |
var input = System.Linq.Enumerable.Range(0, messageCount); | |
Stopwatch stopwatch = new Stopwatch(); | |
stopwatch.Start(); | |
var tasks = System.Collections.Concurrent.Partitioner.Create(input) | |
.GetPartitions(maxDegreeOfParallelism) | |
.Select(partition => Task.Run(async () => |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
static async Task<(IReadOnlyCollection<int>, TimeSpan)> RunComputationsAsync(int maxDegreeOfParallelism, int messageCount) | |
{ | |
SemaphoreSlim semaphore = new SemaphoreSlim(maxDegreeOfParallelism); | |
List<Task<int>> tasks = new List<Task<int>>(); | |
Stopwatch stopwatch = new Stopwatch(); | |
stopwatch.Start(); | |
for (int i = 0; i < messageCount; i++) | |
{ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
static async Task<(IReadOnlyCollection<int>, TimeSpan)> RunComputationsAsync(int maxDegreeOfParallelism, int messageCount) | |
{ | |
ConcurrentQueue<int> input = new ConcurrentQueue<int>(System.Linq.Enumerable.Range(0, messageCount)); | |
Stopwatch stopwatch = new Stopwatch(); | |
stopwatch.Start(); | |
var tasks = System.Linq.Enumerable.Range(1, maxDegreeOfParallelism).Select(async _ => | |
{ | |
List<int> results = new List<int>(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
static async Task<(IReadOnlyCollection<int>, TimeSpan)> RunComputationsAsync(int maxDegreeOfParallelism, int messageCount) | |
{ | |
ConcurrentBag<int> results = new ConcurrentBag<int>(); | |
var workerBlock = new ActionBlock<int>( | |
async item => | |
{ | |
await Task.Delay(item == 3 ? 10000 : 1000); | |
results.Add(item * 2); | |
}, | |
// Specify a maximum degree of parallelism. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
IChangeFeedDocumentClient dbClient = new ChangeFeedDocumentClient(new DocumentClient(new Uri(uri), key)); | |
dbClient = new QoSMeteringChangeFeedDocumentClient(dbClient, new QoSMeteringReporter()); | |
var builder = new ChangeFeedProcessorBuilder() | |
.WithObserver<ConsoleObserver>() | |
.WithHostName("console_app_host") | |
.WithFeedCollection(new DocumentCollectionInfo() | |
{ | |
Uri = new Uri(uri), | |
MasterKey = key, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class QoSMeteringChangeFeedDocumentQuery : IChangeFeedDocumentQuery<Document> | |
{ | |
private readonly IChangeFeedDocumentQuery<Document> _inner; | |
private readonly string _partitionRangeId; | |
private readonly IQoSMeteringReporter _meter; | |
public QoSMeteringChangeFeedDocumentQuery(IChangeFeedDocumentQuery<Document> inner, string partitionRangeId, IQoSMeteringReporter meter) | |
{ | |
_inner = inner ?? throw new ArgumentNullException(nameof(inner)); | |
_partitionRangeId = partitionRangeId ?? throw new ArgumentNullException(nameof(partitionRangeId)); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class QoSMeteringChangeFeedDocumentClient : IChangeFeedDocumentClient | |
{ | |
private readonly IChangeFeedDocumentClient _inner; | |
private readonly IQoSMeteringReporter _meter; | |
/// <summary>
/// Initializes a new <see cref="QoSMeteringChangeFeedDocumentClient"/> that holds
/// the supplied client as its inner client together with a metering reporter.
/// </summary>
/// <param name="changeFeedDocumentClientImplementation">
/// The client stored as the inner client; must not be <c>null</c>.
/// </param>
/// <param name="meter">The metering reporter to store; must not be <c>null</c>.</param>
/// <exception cref="ArgumentNullException">
/// Thrown when either argument is <c>null</c>.
/// </exception>
public QoSMeteringChangeFeedDocumentClient(IChangeFeedDocumentClient changeFeedDocumentClientImplementation, IQoSMeteringReporter meter)
{
    // Arguments are validated in declaration order, so a null client is
    // reported before a null meter when both are missing.
    _inner = changeFeedDocumentClientImplementation ?? throw new ArgumentNullException(nameof(changeFeedDocumentClientImplementation));
    _meter = meter ?? throw new ArgumentNullException(nameof(meter));
}