Skip to content

Instantly share code, notes, and snippets.

@ptupitsyn
Last active July 23, 2020 13:19
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ptupitsyn/cb2fa9670aa2fcd0e20672376cd520a1 to your computer and use it in GitHub Desktop.
Save ptupitsyn/cb2fa9670aa2fcd0e20672376cd520a1 to your computer and use it in GitHub Desktop.
IgniteAsyncStreams
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Event;
using Apache.Ignite.Core.Cache.Query.Continuous;
namespace IgniteAsyncStreams
{
/// <summary>
/// Extension methods that expose Ignite continuous queries as C# async streams.
/// </summary>
public static class IgniteAsyncStreamExtensions
{
    /// <summary>
    /// Starts a continuous query on <paramref name="cache"/> and yields every cache entry event
    /// delivered to the query listener as an asynchronous stream.
    /// </summary>
    /// <remarks>
    /// The stream never completes on its own: it ends when the consumer stops enumerating
    /// (e.g. <c>break</c> inside <c>await foreach</c>, or an operator such as <c>Take</c>), or when
    /// <paramref name="cancellationToken"/> is cancelled. In both cases the continuous query handle
    /// is disposed by the <c>using</c> block below.
    /// </remarks>
    /// <typeparam name="TK">Cache key type.</typeparam>
    /// <typeparam name="TV">Cache value type.</typeparam>
    /// <param name="cache">Cache to listen to.</param>
    /// <param name="cancellationToken">
    /// Optional token that aborts the wait for new events. It can also be supplied through
    /// <c>WithCancellation</c> on the returned stream.
    /// </param>
    /// <returns>An endless asynchronous sequence of cache entry events.</returns>
    /// <exception cref="OperationCanceledException">
    /// Thrown from enumeration when <paramref name="cancellationToken"/> is cancelled while waiting.
    /// </exception>
    public static async IAsyncEnumerable<ICacheEntry<TK, TV>> QueryContinuousAsync<TK, TV>(
        this ICache<TK, TV> cache,
        [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var queryListener = new AsyncContinuousQueryListener<TK, TV>();
        var continuousQuery = new ContinuousQuery<TK, TV>(queryListener);

        // The handle is disposed when the enumerator is disposed or the wait below is cancelled.
        using (cache.QueryContinuous(continuousQuery))
        {
            while (true)
            {
                // Drain everything that has been buffered so far before parking on the semaphore.
                while (queryListener.Events.TryDequeue(out var entryEvent))
                {
                    yield return entryEvent;
                }

                // Library code: no need to resume on the caller's synchronization context.
                await queryListener.HasData.WaitAsync(cancellationToken).ConfigureAwait(false);
            }
        }
    }

    /// <summary>
    /// Continuous query listener that buffers incoming events in a thread-safe queue and
    /// signals the consuming async iterator that data is available.
    /// </summary>
    private sealed class AsyncContinuousQueryListener<TK, TV> : ICacheEntryEventListener<TK, TV>
    {
        /// <summary>
        /// Signalled once per delivered batch. Deliberately created WITHOUT a maximum count:
        /// with a cap of 1, a second batch arriving before the consumer reached WaitAsync made
        /// Release() throw SemaphoreFullException inside the listener callback. Extra permits only
        /// cause harmless wake-ups that find the queue already drained.
        /// </summary>
        public SemaphoreSlim HasData { get; } = new SemaphoreSlim(0);

        /// <summary>Events received from the continuous query and not yet consumed.</summary>
        public ConcurrentQueue<ICacheEntryEvent<TK, TV>> Events { get; }
            = new ConcurrentQueue<ICacheEntryEvent<TK, TV>>();

        /// <summary>
        /// Buffers the delivered batch and wakes up the consumer.
        /// </summary>
        /// <param name="events">Batch of cache entry events passed in by the continuous query.</param>
        public void OnEvent(IEnumerable<ICacheEntryEvent<TK, TV>> events)
        {
            foreach (var entryEvent in events)
            {
                Console.WriteLine("Received entry: " + entryEvent.Value);
                Events.Enqueue(entryEvent);
            }

            // Enqueue happens before Release, so a woken consumer always observes the new items.
            HasData.Release();
        }
    }
}
}
<Project Sdk="Microsoft.NET.Sdk">
<!-- Project file for the IgniteAsyncStreams sample: a console executable built with the .NET SDK. -->
<PropertyGroup>
<!-- Produce a runnable executable (the sample defines Program.Main). -->
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.0</TargetFramework>
<!-- C# 8.0: the sample code uses async streams (IAsyncEnumerable<T>, await foreach). -->
<LangVersion>8.0</LangVersion>
<!-- Nullable reference type analysis is switched on for the whole project. -->
<Nullable>enable</Nullable>
<!-- Every compiler warning fails the build. -->
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<!-- Left empty: no individual warning codes are listed here. -->
<WarningsAsErrors />
</PropertyGroup>
<ItemGroup>
<!-- Supplies the Apache.Ignite.Core namespaces imported by the sample sources. -->
<PackageReference Include="Apache.Ignite" Version="2.8.0" />
<!-- Presumably the source of the Where/Skip/Take/Select/ToArrayAsync operators that Program.Main applies to the async stream. -->
<PackageReference Include="System.Linq.Async" Version="4.0.0" />
<!-- NOTE(review): no System.Reactive type is referenced by the visible sample code; confirm this reference is still needed. -->
<PackageReference Include="System.Reactive" Version="4.1.6" />
</ItemGroup>
</Project>
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache;
namespace IgniteAsyncStreams
{
/// <summary>
/// Sample entry point: starts an Ignite node, fills a cache from a background task and consumes
/// the resulting continuous query events first with <c>await foreach</c>, then with async LINQ.
/// </summary>
internal static class Program
{
    /// <summary>
    /// Runs the sample. The Ignite node and the background producer are both shut down
    /// before the method returns.
    /// </summary>
    private static async Task Main()
    {
        Console.WriteLine("Hello World!");

        // IIgnite is disposable; dispose it so the node is stopped when the sample ends.
        using var ignite = Ignition.Start();
        var cache = ignite.GetOrCreateCache<int, int>("c");

        // Populate cache in background until cancellation is requested.
        using var cancellation = new CancellationTokenSource();
        var producer = Task.Run(() => ProduceDataAsync(cache, cancellation.Token));

        try
        {
            // Async iteration.
            await foreach (var entry in cache.QueryContinuousAsync())
            {
                if (entry.Key > 10)
                {
                    break;
                }

                Console.WriteLine("Iterated entry: " + entry.Value);
            }

            // Async LINQ.
            var results = await cache.QueryContinuousAsync()
                .Where(e => e.Key > 0)
                .Skip(5)
                .Take(10)
                .Select(e => e.Value)
                .ToArrayAsync();

            Console.WriteLine(results.Length);
        }
        finally
        {
            // Stop the producer BEFORE the node is disposed, otherwise it would keep writing
            // to a cache whose node is being stopped.
            cancellation.Cancel();
            await producer;
        }
    }

    /// <summary>
    /// Writes an ever-increasing counter into <paramref name="cache"/> (key == value),
    /// one entry roughly every 100 ms, until <paramref name="token"/> is cancelled.
    /// </summary>
    /// <param name="cache">Cache to populate.</param>
    /// <param name="token">Token that stops the producer loop.</param>
    private static async Task ProduceDataAsync(ICache<int, int> cache, CancellationToken token)
    {
        var i = 0;

        while (!token.IsCancellationRequested)
        {
            i++;
            Console.WriteLine("Produced entry: " + i);
            cache[i] = i;

            try
            {
                // Task.Delay instead of Thread.Sleep: does not block a thread-pool thread.
                await Task.Delay(100, token);
            }
            catch (OperationCanceledException)
            {
                // Cancellation during the pause is the normal way for this loop to end.
                break;
            }
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment