Skip to content

Instantly share code, notes, and snippets.

View scattered-code's full-sized avatar
🏠
Working from home / Living at work

Alexandru Puiu scattered-code

🏠
Working from home / Living at work
View GitHub Profile
# Append a hosts entry. The append has to run as root, so pipe into `sudo tee -a`; a plain `>>` redirect is performed by the unprivileged calling shell.
echo '10.10.10.10 vmpuppet1.domain.local puppet' | sudo tee -a /etc/hosts > /dev/null
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Extensions
{
public static class Extensions
{
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
public static IEnumerable<Entry> GetDocumentsFromDatabase(IDocumentSession session)
{
var skip = 0;
do
{
var entries = session.Query<Entry>().Where(x => !x.Deleted).OrderByDescending(x => x.DateModified).Skip(skip).Take(1024).ToList();
foreach (var entry in entries)
yield return entry;
skip += 1024;
if (entries.Count < 1024)
// Usage example: feeds every entry yielded by GetDocumentsFromDatabase(session)
// to the callback through the ForEachAsync extension, passing dop: 20
// (presumably the degree of parallelism — confirm against the ForEachAsync body).
// NOTE(review): the lambda is marked async but contains no await.
await GetDocumentsFromDatabase(session).ForEachAsync(dop: 20, body: async entry =>
{
// Log the Id of the entry being handled.
_logger.Info($"Processing entry '{entry.Id}'");
});
public static Task ParallelForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
// Walks a single partition enumerator from start to finish, awaiting `body`
// for each element before advancing to the next one. The enumerator is
// disposed when the walk ends, whether it completes or throws.
async Task AwaitPartition(IEnumerator<T> partition)
{
    try
    {
        while (partition.MoveNext())
        {
            T item = partition.Current;
            await body(item);
        }
    }
    finally
    {
        // Same null-tolerant disposal that a `using (partition)` statement performs.
        partition?.Dispose();
    }
}
static async IAsyncEnumerable<Order> GetDocumentsFromDatabase2(IAsyncDocumentSession session)
{
var skip = 0;
do
{
var entries = await session.Query<Order>().OrderByDescending(x => x.Id).Skip(skip).Take(100).ToListAsync();
foreach (var entry in entries)
yield return entry;
skip += 100;
if (entries.Count < 100)
// Usage example: opens an async session from documentStore (disposed when the
// using block exits) and consumes the async stream returned by
// GetDocumentsFromDatabase2 one entry at a time.
using (var session = documentStore.OpenAsyncSession())
{
// await foreach requests the next entry only after the loop body has finished.
await foreach (var entry in GetDocumentsFromDatabase2(session))
{
// Write the Id of the current entry to the console.
Console.WriteLine($"Processing entry '{entry.Id}'");
}
}
public static Task AsyncParallelForEach<T>(this IEnumerable<T> source, Func<T, Task> body, int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded, TaskScheduler scheduler = null)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism
};
if (scheduler != null)
options.TaskScheduler = scheduler;
var block = new ActionBlock<T>(body, options);
using (var session = documentStore.OpenSession())
{
session.Advanced.MaxNumberOfRequestsPerSession = int.MaxValue;
SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
await GetDocumentsFromDatabase(session).AsyncParallelForEach(async entry => {
Console.WriteLine($"Processing entry '{entry.Id}'");
}, 20, TaskScheduler.FromCurrentSynchronizationContext()
);
public static async Task AsyncParallelForEach<T>(this IAsyncEnumerable<T> source, Func<T, Task> body, int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded, TaskScheduler scheduler = null)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism
};
if (scheduler != null)
options.TaskScheduler = scheduler;
var block = new ActionBlock<T>(body, options);