Skip to content

Instantly share code, notes, and snippets.

@ptupitsyn
Created November 6, 2019 12:03
Show Gist options
  • Save ptupitsyn/9eb14faadedce3c312ef9fd6bad66e66 to your computer and use it in GitHub Desktop.
Save ptupitsyn/9eb14faadedce3c312ef9fd6bad66e66 to your computer and use it in GitHub Desktop.
IgniteAsyncStreamExtensions with System.Threading.Channels
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Event;
using Apache.Ignite.Core.Cache.Query.Continuous;
namespace IgniteAsyncStreams
{
public static class IgniteAsyncStreamExtensions
{
public static async IAsyncEnumerable<ICacheEntry<TK, TV>> QueryContinuousAsync<TK, TV>(
this ICache<TK, TV> cache)
{
var queryListener = new AsyncContinuousQueryListener<TK, TV>();
var continuousQuery = new ContinuousQuery<TK, TV>(queryListener);
var handle = cache.QueryContinuous(continuousQuery);
try
{
while (true)
{
while (queryListener.Events.TryDequeue(out var entryEvent))
{
yield return entryEvent;
}
await queryListener.HasData.WaitAsync();
}
}
finally
{
handle.Dispose();
Console.WriteLine("Infinite loop exited");
}
}
public static async IAsyncEnumerable<ICacheEntry<TK, TV>> QueryContinuousAsync2<TK, TV>(
this ICache<TK, TV> cache)
{
var queryListener = new AsyncContinuousQueryChannelListener<TK, TV>();
var continuousQuery = new ContinuousQuery<TK, TV>(queryListener);
using (cache.QueryContinuous(continuousQuery))
{
while (true)
{
yield return await queryListener.Reader.ReadAsync();
}
}
}
private class AsyncContinuousQueryListener<TK, TV> : ICacheEntryEventListener<TK, TV>
{
public readonly SemaphoreSlim HasData = new SemaphoreSlim(0, 1);
public readonly ConcurrentQueue<ICacheEntryEvent<TK, TV>> Events
= new ConcurrentQueue<ICacheEntryEvent<TK, TV>>();
public void OnEvent(IEnumerable<ICacheEntryEvent<TK, TV>> events)
{
foreach (var entryEvent in events)
{
Console.WriteLine("Received entry: " + entryEvent.Value);
Events.Enqueue(entryEvent);
}
HasData.Release();
}
}
}
public class AsyncContinuousQueryChannelListener<TK, TV> : ICacheEntryEventListener<TK, TV>
{
private readonly Channel<ICacheEntryEvent<TK, TV>> _channel =
Channel.CreateUnbounded<ICacheEntryEvent<TK, TV>>();
public ChannelReader<ICacheEntryEvent<TK, TV>> Reader => _channel.Reader;
public void OnEvent(IEnumerable<ICacheEntryEvent<TK, TV>> evts)
{
foreach (var evt in evts)
{
var res = _channel.Writer.TryWrite(evt);
if (!res)
{
throw new Exception("Failed to write to channel");
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment