Skip to content

Instantly share code, notes, and snippets.

@noseratio
Created June 9, 2022 23:41
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save noseratio/8ec4801aed9a8592900ddd6f88f0dd4a to your computer and use it in GitHub Desktop.
Save noseratio/8ec4801aed9a8592900ddd6f88f0dd4a to your computer and use it in GitHub Desktop.
Lazy vs no Lazy for IObservable pipeline construction
// by Theodor Zoulias
// https://stackoverflow.com/questions/72558093/in-rx-net-how-do-i-make-a-subject-to-resemble-taskcompletionsource-behavior/72561100?noredirect=1#comment128188846_72561100
//
// comment out "#define USING_LAZY"
#define USING_LAZY
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;
class Program
{
static void Main()
{
const int count = 10_000;
var mem0 = GC.GetTotalAllocatedBytes(true);
for (int i = 0; i < count; i++)
{
var subject = new PromiseSubject<int>();
var subscription = subject.Subscribe();
subscription.Dispose();
}
var mem1 = GC.GetTotalAllocatedBytes(true);
Console.WriteLine($"Allocated {(mem1 - mem0) / count:#,0} bytes per instance");
}
public class PromiseSubject<T> : ISubject<T>
{
private readonly TaskCompletionSource<(bool HasValue, T Value)> _tcs = new();
#if USING_LAZY
private readonly Lazy<IObservable<T>> _lazyObservable;
#else
private readonly IObservable<T> _observable;
#endif
public PromiseSubject()
{
#if USING_LAZY
_lazyObservable = new(() =>
_tcs.Task.ToObservable()
.Where(r => r.HasValue)
.Select(r => r.Value!));
#else
_observable = _tcs.Task
.ToObservable()
.Where(r => r.HasValue)
.Select(r => r.Value!);
#endif
}
public void OnCompleted() =>
_tcs.TrySetResult((false, default!));
public void OnError(Exception error) =>
_tcs.TrySetException(error);
public void OnNext(T value) =>
_tcs.TrySetResult((true, value));
public IDisposable Subscribe(IObserver<T> observer) =>
#if USING_LAZY
_lazyObservable.Value.Subscribe(observer);
#else
_observable.Subscribe(observer);
#endif
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment