-
-
Save marcpiechura/41e7b616c3c386f0a23d03d1f1fa31d6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Demo entry point: materializes a ticking Akka.Streams source into a
/// <c>ReactiveEnumerable</c> and consumes it with a plain <c>foreach</c>,
/// awaiting each element. Demand is only signalled from the enumerator's
/// <c>MoveNext</c>, so a slow loop body backpressures the upstream.
/// </summary>
/// <returns>
/// A task that completes when the enumeration ends (or faults if it throws);
/// the actor system is terminated in either case.
/// </returns>
public static async Task Run()
{
    var reactiveEnumerable = new ReactiveEnumerable();
    var system = ActorSystem.Create("AsyncEnumerable");

    try
    {
        int counter = 0;

        Source.Tick(TimeSpan.FromMilliseconds(100), TimeSpan.FromMilliseconds(100), "")
            // Ticks are ignored if no demand was signalled
            .Select(_ =>
            {
                Console.WriteLine("Tick");
                return counter++;
            })
            .To(Sink.FromSubscriber(reactiveEnumerable))
            .Run(system.Materializer());

        // Make sure the subscription has happened before enumerating.
        // NOTE(review): a fixed delay is only a best-effort wait; the enumerable
        // offers no way to observe that OnSubscribe has actually been called.
        await Task.Delay(1000);

        foreach (var t in reactiveEnumerable)
        {
            var i = await t;

            // backpressure upstream by not calling move next and therefore not signaling demand
            await Task.Delay(TimeSpan.FromSeconds(1));
            Console.WriteLine(i);
        }
    }
    finally
    {
        // Shut the actor system down so its threads and scheduler are released
        // even when the loop ends or an element task throws.
        await system.Terminate();
    }
}
/// <summary>
/// Adapts a Reactive Streams subscription of <c>int</c> elements to a pull-based
/// <c>IEnumerable&lt;Task&lt;int&gt;&gt;</c>. Each <c>MoveNext</c> call on the enumerator
/// requests exactly one element from upstream and exposes a task that completes
/// when that element arrives, so the consumer's pace drives the demand.
/// </summary>
/// <remarks>
/// Intended for a single consumer: every call to <c>GetEnumerator</c> returns the
/// same enumerator instance, and the instance accepts a single subscription.
/// When the stream fails or completes, the task for an element that was requested
/// but not yet delivered is faulted (with the upstream error) or cancelled, so an
/// awaiting consumer is released instead of waiting forever. An error that arrives
/// while no element is being awaited only ends the enumeration.
/// </remarks>
class ReactiveEnumerable : IEnumerable<Task<int>>, ISubscriber<int>
{
    /// <summary>Enumerator that signals demand of one element per <c>MoveNext</c>.</summary>
    class ReactiveEnumerator : IEnumerator<Task<int>>
    {
        private readonly ReactiveEnumerable _enumerable;

        // Task handed out for the most recently requested element.
        private Task<int> _current;

        public ReactiveEnumerator(ReactiveEnumerable enumerable) => _enumerable = enumerable;

        /// <summary>No-op; the subscription is left active so enumeration can be resumed.</summary>
        public void Dispose()
        {
        }

        /// <summary>
        /// Requests the next element from upstream.
        /// </summary>
        /// <returns><c>false</c> once the stream has completed or failed; otherwise <c>true</c>.</returns>
        /// <exception cref="InvalidOperationException">No subscription has been received yet.</exception>
        public bool MoveNext()
        {
            var subscription = _enumerable._subscription;
            if (subscription == null)
            {
                throw new InvalidOperationException(
                    "Cannot enumerate before OnSubscribe has been called.");
            }

            if (_enumerable._isFinished)
            {
                return false;
            }

            // Capture the pending task BEFORE signalling demand. OnNext swaps the
            // pending source, so reading it after Request could hand the consumer a
            // task belonging to an element that was never requested.
            _current = _enumerable._source.Task;

            // could be greater than 1 if we could handle the next n elements in a push based fashion,
            // next call to Request would only be needed after n elements have been pushed
            // but could also happen everytime in between. So we can adjust rate at runtime depending on
            // if the consumer is faster than the producer or not.
            subscription.Request(1);

            return !_enumerable._isFinished;
        }

        /// <summary>No-op; a live stream cannot be rewound.</summary>
        public void Reset()
        {
        }

        /// <summary>
        /// Task for the element requested by the last <c>MoveNext</c>; before the first
        /// <c>MoveNext</c> it falls back to the task of the next element to arrive.
        /// </summary>
        public Task<int> Current => _current ?? _enumerable._source.Task;

        object IEnumerator.Current => Current;
    }

    // These fields are written by the publisher's thread and read by the consumer's.
    private volatile ISubscription _subscription;
    private volatile bool _isFinished;
    private volatile TaskCompletionSource<int> _source;
    private readonly ReactiveEnumerator _enumerator;

    public ReactiveEnumerable()
    {
        // Created eagerly so GetEnumerator never returns null, even before OnSubscribe.
        _source = NewSource();
        _enumerator = new ReactiveEnumerator(this);
    }

    /// <summary>Returns the single shared enumerator of this instance.</summary>
    public IEnumerator<Task<int>> GetEnumerator() => _enumerator;

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

    /// <summary>Stores the subscription used to signal demand; extra subscriptions are cancelled.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="subscription"/> is null.</exception>
    public void OnSubscribe(ISubscription subscription)
    {
        if (subscription == null)
        {
            throw new ArgumentNullException(nameof(subscription));
        }

        if (_subscription != null)
        {
            // Only one active subscription is supported; reject any further one.
            subscription.Cancel();
            return;
        }

        _subscription = subscription;
    }

    /// <summary>Completes the pending task with <paramref name="element"/> and prepares the next one.</summary>
    public void OnNext(int element)
    {
        var delivered = _source;

        // Install the next pending source before completing the current one, so a
        // consumer that immediately calls MoveNext again captures a fresh task.
        _source = NewSource();
        delivered.SetResult(element);
    }

    /// <summary>Ends the enumeration and faults the pending task with <paramref name="cause"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="cause"/> is null.</exception>
    public void OnError(Exception cause)
    {
        if (cause == null)
        {
            throw new ArgumentNullException(nameof(cause));
        }

        _isFinished = true;
        _source.TrySetException(cause);
    }

    /// <summary>Ends the enumeration and cancels the pending task, as no element will arrive for it.</summary>
    public void OnComplete()
    {
        _isFinished = true;
        _source.TrySetCanceled();
    }

    // Continuations run asynchronously so consumer code never executes inline on
    // the thread that calls OnNext/OnError/OnComplete.
    private static TaskCompletionSource<int> NewSource() =>
        new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment