Skip to content

Instantly share code, notes, and snippets.

@ptupitsyn
Created October 9, 2019 11:32
Show Gist options
  • Save ptupitsyn/61ec797fb3f44cafc25c22332c61c19c to your computer and use it in GitHub Desktop.
Save ptupitsyn/61ec797fb3f44cafc25c22332c61c19c to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Event;
using Apache.Ignite.Core.Cache.Query.Continuous;
namespace IgniteAsyncStreams
{
public static class IgniteAsyncStreamExtensions
{
public static IAsyncEnumerable<ICacheEntry<TK, TV>> QueryContinuousAsync<TK, TV>(
this ICache<TK, TV> cache)
{
return new AsyncContinuousQueryEnumerable<TK, TV>(cache);
}
public static async IAsyncEnumerable<ICacheEntry<TK, TV>> QueryContinuousAsync2<TK, TV>(
this ICache<TK, TV> cache)
{
var queryListener = new AsyncContinuousQueryListener<TK, TV>();
var continuousQuery = new ContinuousQuery<TK, TV>(queryListener);
using (cache.QueryContinuous(continuousQuery))
{
while (true)
{
while (queryListener.Events.TryDequeue(out var entryEvent))
{
yield return entryEvent;
}
await queryListener.HasData.WaitAsync();
}
}
}
private class AsyncContinuousQueryEnumerable<TK, TV> : IAsyncEnumerable<ICacheEntry<TK, TV>>
{
private readonly ICache<TK, TV> _cache;
public AsyncContinuousQueryEnumerable(ICache<TK, TV> cache)
{
_cache = cache;
}
public IAsyncEnumerator<ICacheEntry<TK, TV>> GetAsyncEnumerator(
CancellationToken cancellationToken = new CancellationToken())
{
var queryListener = new AsyncContinuousQueryListener<TK, TV>();
var continuousQuery = new ContinuousQuery<TK, TV>(queryListener);
var queryHandle = _cache.QueryContinuous(continuousQuery);
return new AsyncContinuousQueryEnumerator<TK, TV>(queryListener, queryHandle);
}
}
private class AsyncContinuousQueryEnumerator<TK, TV> : IAsyncEnumerator<ICacheEntryEvent<TK, TV>>
{
private readonly AsyncContinuousQueryListener<TK, TV> _queryListener;
private readonly IContinuousQueryHandle _queryHandle;
public AsyncContinuousQueryEnumerator(AsyncContinuousQueryListener<TK, TV> queryListener,
IContinuousQueryHandle queryHandle)
{
_queryListener = queryListener;
_queryHandle = queryHandle;
}
public ValueTask DisposeAsync()
{
_queryHandle.Dispose();
return default;
}
public async ValueTask<bool> MoveNextAsync()
{
ICacheEntryEvent<TK, TV>? entryEvent;
while (!_queryListener.Events.TryDequeue(out entryEvent))
{
await _queryListener.HasData.WaitAsync();
}
Current = entryEvent;
return true;
}
#nullable disable
public ICacheEntryEvent<TK, TV> Current { get; private set; }
#nullable restore
}
private class AsyncContinuousQueryListener<TK, TV> : ICacheEntryEventListener<TK, TV>
{
public readonly SemaphoreSlim HasData = new SemaphoreSlim(0, 1);
public readonly ConcurrentQueue<ICacheEntryEvent<TK, TV>> Events
= new ConcurrentQueue<ICacheEntryEvent<TK, TV>>();
public void OnEvent(IEnumerable<ICacheEntryEvent<TK, TV>> events)
{
foreach (var entryEvent in events)
{
Console.WriteLine("Received entry: " + entryEvent.Value);
Events.Enqueue(entryEvent);
}
HasData.Release();
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment