Skip to content

Instantly share code, notes, and snippets.

@mrtank
Created April 8, 2016 15:05
Show Gist options
  • Save mrtank/4a08004199620d61fed46197bb30b3ef to your computer and use it in GitHub Desktop.
Save mrtank/4a08004199620d61fed46197bb30b3ef to your computer and use it in GitHub Desktop.
/// <summary>
/// Checks that the completion handler subscribed to a <see cref="ReplaySubject{T}"/> has run
/// (that is, <c>isOk</c> is <c>true</c>) after the shared <see cref="CancellationDisposable"/>
/// has been disposed and the test scheduler has been advanced by one tick.
/// </summary>
/// <remarks>
/// The test starts a real <see cref="TcpListener"/> on port 11122; no client is connected to it
/// anywhere in this method.
/// </remarks>
[TestMethod]
public void OnCompletedCalled()
{
// Scheduler shared by the subject and both pipelines; the only place it is advanced
// in this test is the AdvanceBy(1) call just before the assertion.
TestScheduler scheduler = new TestScheduler();
// Subject that receives accepted clients from the first pipeline and is observed by the second.
ReplaySubject<TcpClient> subject = new ReplaySubject<TcpClient>(scheduler);
// NOTE(review): fixed port on all interfaces - presumably Start() fails when 11122 is
// already in use on the machine running the test; confirm this is acceptable.
TcpListener listener = new TcpListener(IPAddress.Any, 11122);
listener.Start();
// Its Token is handed to both subscriptions and carries the callbacks registered further down.
CancellationDisposable cancel = new CancellationDisposable();
// Flipped to true only by the completion handler of the subscription on `subject`.
bool isOk = false;
// Wraps the listener's Begin/End accept pair into a factory that produces an observable of clients.
Func<IObservable<TcpClient>> asyncObs = Observable.FromAsyncPattern(listener.BeginAcceptTcpClient, listener.EndAcceptTcpClient);
// Pipeline 1: push connected clients into `subject`. Only OnNext is forwarded here;
// no completion or error handler is supplied, so `subject` is completed solely by the
// token callback registered below.
asyncObs().Where(x => x.Connected)
.SubscribeOn(scheduler)
.ObserveOn(scheduler)
.Subscribe(
subject.OnNext,
cancel.Token);
// Pipeline 2: values are ignored; the completion callback is what sets isOk.
subject
.SubscribeOn(scheduler)
.ObserveOn(scheduler)
.Subscribe(x => { }, () => { isOk = true; }, cancel.Token);
// Callbacks attached to the token: complete the subject and stop the listener.
cancel.Token.Register(subject.OnCompleted);
cancel.Token.Register(listener.Stop);
// Alternative left disabled by the author: set isOk directly from a token callback
// instead of going through the subject's OnCompleted.
//cancel.Token.Register(() => { isOk = true; });
// Disposing the CancellationDisposable is what is expected to cancel the token and
// run the registrations above (see the Rx.NET CancellationDisposable documentation).
cancel.Dispose();
// NOTE(review): the same token is also passed to both Subscribe calls, and both
// subscriptions go through SubscribeOn(scheduler) on a scheduler that has not been
// advanced before this point. Presumably the subscription to `subject` may already be
// cancelled (or not yet established) when OnCompleted is raised, in which case isOk
// would stay false - verify against Rx.NET semantics before relying on this assertion.
scheduler.AdvanceBy(1);
Assert.IsTrue(isOk);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment