Skip to content

Instantly share code, notes, and snippets.

@marcpiechura
Last active March 7, 2018 13:34
Show Gist options
  • Save marcpiechura/41e7b616c3c386f0a23d03d1f1fa31d6 to your computer and use it in GitHub Desktop.
Save marcpiechura/41e7b616c3c386f0a23d03d1f1fa31d6 to your computer and use it in GitHub Desktop.
/// <summary>
/// Demo entry point: materializes a ticking Akka.NET stream into a
/// <see cref="ReactiveEnumerable"/> and consumes it with a plain <c>foreach</c>,
/// awaiting each element and deliberately consuming slower than the ticks are produced.
/// </summary>
/// <returns>
/// A task that completes once the enumerable reports that the stream has finished
/// and the actor system has been terminated.
/// </returns>
public static async Task Run()
{
    var reactiveEnumerable = new ReactiveEnumerable();
    var system = ActorSystem.Create("AsyncEnumerable");
    try
    {
        int counter = 0;
        Source.Tick(TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(100), "")
            // Ticks are ignored if no demand was signalled
            .Select(_ =>
            {
                Console.WriteLine("Tick");
                return counter++;
            })
            .To(Sink.FromSubscriber(reactiveEnumerable))
            .Run(system.Materializer());

        // make sure subscription has happened
        // NOTE(review): a fixed delay is a heuristic, not a guarantee; the enumerable
        // exposes no signal that could be awaited instead.
        await Task.Delay(1000);

        foreach (var t in reactiveEnumerable)
        {
            var i = await t;
            // backpressure upstream by not calling move next and therefore not signaling demand
            await Task.Delay(TimeSpan.FromSeconds(1));
            Console.WriteLine(i);
        }
    }
    finally
    {
        // Shut the actor system down on every exit path (normal end of the loop or an
        // exception) so its threads and the running stream do not outlive this method.
        await system.Terminate();
    }
}
/// <summary>
/// Adapts a Reactive Streams subscription to a pull-based sequence of tasks.
/// Each <c>MoveNext</c> on the enumerator requests exactly one element from upstream and
/// <c>Current</c> is a task that completes when that element is delivered via
/// <see cref="OnNext"/>. Not calling <c>MoveNext</c> therefore signals no demand.
/// </summary>
/// <remarks>
/// The consumer is expected to await each task before calling <c>MoveNext</c> again.
/// <see cref="GetEnumerator"/> always returns the same enumerator instance, so the
/// sequence supports a single consumer only.
/// </remarks>
class ReactiveEnumerable : IEnumerable<Task<int>>, ISubscriber<int>
{
    /// <summary>Enumerator that turns each <c>MoveNext</c> into a demand of one element.</summary>
    class ReactiveEnumerator : IEnumerator<Task<int>>
    {
        private readonly ReactiveEnumerable _enumerable;

        // Task handed out for the element requested by the most recent MoveNext.
        private Task<int> _current;

        public ReactiveEnumerator(ReactiveEnumerable enumerable) => _enumerable = enumerable;

        /// <summary>No-op; the subscription is left untouched.</summary>
        public void Dispose()
        {
        }

        /// <summary>
        /// Requests one more element from upstream.
        /// </summary>
        /// <returns><c>false</c> once the stream has completed or failed; otherwise <c>true</c>.</returns>
        public bool MoveNext()
        {
            if (_enumerable._isFinished)
            {
                return false;
            }

            // Pin the pending task BEFORE signalling demand. OnNext swaps _source as soon as
            // the element arrives (possibly on another thread); reading it after Request
            // could hand out the task of the following element, which is never requested.
            _current = _enumerable._source.Task;

            // could be greater than 1 if we could handle the next n elements in a push based fashion,
            // next call to Request would only be needed after n elements have been pushed
            // but could also happen everytime in between. So we can adjust rate at runtime depending on
            // if the consumer is faster than the producer or not.
            _enumerable._subscription.Request(1);
            return !_enumerable._isFinished;
        }

        /// <summary>No-op; the underlying stream cannot be rewound.</summary>
        public void Reset()
        {
        }

        /// <summary>
        /// Task for the element requested by the last <c>MoveNext</c>. Before the first
        /// <c>MoveNext</c> it falls back to the currently pending task.
        /// </summary>
        public Task<int> Current => _current ?? _enumerable._source.Task;

        object IEnumerator.Current => Current;
    }

    // Written by the publisher's signals and read by the consuming code, which may run
    // on different threads; volatile keeps those reads from observing stale values.
    private volatile ISubscription _subscription;
    private volatile bool _isFinished;
    private volatile TaskCompletionSource<int> _source;
    private volatile ReactiveEnumerator _enumerator;

    /// <summary>Returns the single shared enumerator.</summary>
    /// <exception cref="InvalidOperationException">
    /// Thrown when called before <see cref="OnSubscribe"/> has been signalled.
    /// </exception>
    public IEnumerator<Task<int>> GetEnumerator() =>
        _enumerator ?? throw new InvalidOperationException(
            "The enumerable has not been subscribed to a publisher yet.");

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

    /// <summary>Stores the subscription and prepares the first pending task.</summary>
    /// <param name="subscription">Subscription used to signal demand upstream.</param>
    /// <exception cref="ArgumentNullException"><paramref name="subscription"/> is null.</exception>
    public void OnSubscribe(ISubscription subscription)
    {
        if (subscription == null)
        {
            throw new ArgumentNullException(nameof(subscription));
        }

        if (_subscription != null)
        {
            // Already subscribed: reject the extra subscription instead of replacing
            // state that the consumer may currently be awaiting.
            subscription.Cancel();
            return;
        }

        _source = NewSource();
        _subscription = subscription;
        // Published last so a consumer that sees the enumerator also sees the fields above.
        _enumerator = new ReactiveEnumerator(this);
    }

    /// <summary>Completes the pending task with <paramref name="element"/>.</summary>
    /// <param name="element">The element delivered by upstream.</param>
    public void OnNext(int element)
    {
        var delivered = _source;
        // Install the next pending task before completing the current one, so a consumer
        // resuming from its await finds the fresh task on its next MoveNext.
        _source = NewSource();
        delivered.SetResult(element);
    }

    /// <summary>
    /// Marks the sequence as finished and faults the pending task so that a consumer
    /// already awaiting it observes <paramref name="cause"/> instead of waiting forever.
    /// </summary>
    /// <param name="cause">The failure signalled by upstream.</param>
    public void OnError(Exception cause)
    {
        _isFinished = true;
        _source?.TrySetException(cause);
    }

    /// <summary>
    /// Marks the sequence as finished and cancels the pending task so that a consumer
    /// already awaiting it is released instead of waiting forever.
    /// </summary>
    public void OnComplete()
    {
        _isFinished = true;
        _source?.TrySetCanceled();
    }

    // Continuations run asynchronously so consumer code resumed by an element does not
    // execute inside OnNext on the publisher's thread.
    private static TaskCompletionSource<int> NewSource() =>
        new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment