Skip to content

Instantly share code, notes, and snippets.

@MelbourneDeveloper
Last active October 25, 2020 00:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MelbourneDeveloper/dc5f04782e54e20d5ef268b47d6adc08 to your computer and use it in GitHub Desktop.
Save MelbourneDeveloper/dc5f04782e54e20d5ef268b47d6adc08 to your computer and use it in GitHub Desktop.
Use a hot observable to share
/*
Output
Name: One Message: Hi
Name: Two Message: Hi
Name: One Message: Hi
Name: Two Message: Hi
Name: One Message: Hi
Name: Two Message: Hi
Name: One Message: Hi
Name: Two Message: Hi
*/
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Observables
{
class Subscriber
{
public string Name;
//Listen for OnNext and write to the debug window when it happens
public Subscriber(IObservable<string> observable, string name)
{
Name = name;
observable.Subscribe(s => Debug.WriteLine($"Name: {Name} Message: {s}"));
}
}
[TestClass]
public class UnitTest1
{
static string GetData() => "Hi";
[TestMethod]
public async Task Messaging()
{
var cancellationSource = new CancellationTokenSource();
var cancellationToken = cancellationSource.Token;
var coldObservable = Observable.Create<string>(observer =>
{
_ = Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
var data = GetData();
observer.OnNext(data);
await Task.Delay(1000);
}
}, cancellationToken);
return Disposable.Empty;
});
var publisher = coldObservable.Publish();
var connection = publisher.Connect();
new Subscriber(publisher, "One");
new Subscriber(publisher, "Two");
for (var i = 0; i < 5; i++)
{
if (i == 4)
{
cancellationSource.Cancel();
}
await Task.Delay(1000);
}
connection.Dispose();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment