Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Observable.Create Example
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Example
{
/// <summary>
/// Console demo: subscribes twice to a single <c>Observable.Create</c> sequence and then
/// reports, once per second, how many observers have been recorded in <see cref="Observers"/>.
/// </summary>
internal class Program
{
    // Registry of every observer seen so far; the demo uses it to count them.
    // It is appended to by TicketFactory's generator, which runs on a task, while Main reads it.
    // NOTE(review): List<T> is not thread-safe and the reads in Main are unsynchronized - confirm this is acceptable for the demo.
    public static readonly List<IObserver<Ticket>> Observers = new List<IObserver<Ticket>>();

    /// <summary>
    /// Builds the observable, attaches two printing subscribers and polls the registry forever.
    /// </summary>
    private static void Main()
    {
        // The subscribe function takes 'object', so the delegate type is spelled out explicitly
        // to let Observable.Create infer the element type.
        Func<IObserver<Ticket>, IDisposable> subscribe = TicketFactory.TicketSubscribe;
        var ticketObservable = Observable.Create(subscribe);

        Action<Ticket> printTicket = ticket => Console.WriteLine(ticket.ToString());
        var firstSubscription = ticketObservable.Subscribe(printTicket);
        var secondSubscription = ticketObservable.Subscribe(printTicket);

        for (;;)
        {
            Console.WriteLine($"Observer Count: {Observers.Count}");

            if (Observers.Count >= 2)
            {
                var sameInstance = ReferenceEquals(Observers[0], Observers[1]);
                Console.WriteLine($"Are the first two observers the same object? {sameInstance}");
            }

            Thread.Sleep(TimeSpan.FromSeconds(1));
        }
    }
}
/// <summary>
/// An identifier paired with the local time at which the ticket object was constructed.
/// </summary>
internal class Ticket
{
    private readonly string _id;
    private readonly DateTime _createdAt;

    /// <summary>Creates a ticket and stamps it with <see cref="DateTime.Now"/>.</summary>
    /// <param name="tid">Identifier that <see cref="ToString"/> echoes back.</param>
    public Ticket(string tid)
    {
        _id = tid;
        _createdAt = DateTime.Now;
    }

    /// <summary>
    /// Returns a two-line, newline-terminated description: the ID first, then the timestamp.
    /// </summary>
    public override string ToString()
    {
        return $"Ticket ID : {_id}\nTimestamp : {_createdAt}\n";
    }
}
/// <summary>
/// Pushes a newly created <see cref="Ticket"/> to one observer every three seconds from a
/// background task, until <see cref="Dispose"/> is called.
/// </summary>
public class TicketFactory : IDisposable
{
    // Pause between two consecutive tickets.
    private static readonly TimeSpan TicketInterval = TimeSpan.FromSeconds(3);

    // Written by Dispose on the caller's thread and polled by the generator task;
    // volatile so the loop is guaranteed to observe the write.
    private volatile bool _generate = true;

    /// <summary>
    /// Validates the observer and starts the ticket generator on its own task.
    /// </summary>
    /// <param name="ticketObserver">
    /// The receiver of the tickets. Typed as <see cref="object"/> because it is handed to the
    /// task as untyped state, but it must implement <see cref="IObserver{T}"/> of <see cref="Ticket"/>.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="ticketObserver"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="ticketObserver"/> is not a ticket observer.</exception>
    internal TicketFactory(object ticketObserver)
    {
        // Fail on the subscribing thread instead of inside the background task,
        // where the exception would go unobserved.
        if (ticketObserver == null)
        {
            throw new ArgumentNullException(nameof(ticketObserver));
        }

        if (!(ticketObserver is IObserver<Ticket>))
        {
            throw new ArgumentException("Observer must implement IObserver<Ticket>.", nameof(ticketObserver));
        }

        // The generator loops and sleeps until disposed, so request a long-running task rather
        // than tying up a thread-pool worker, and pin the scheduler instead of using the ambient one.
        Task.Factory.StartNew(
            TicketGenerator,
            ticketObserver,
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Asks the generator to stop. The loop checks the flag once per interval, so the
    /// background task may take up to three seconds to finish. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        _generate = false;
    }

    /// <summary>
    /// Task body: records the observer in <c>Program.Observers</c>, then emits one ticket
    /// per interval until generation is switched off.
    /// </summary>
    /// <param name="observer">The observer validated by the constructor.</param>
    private void TicketGenerator(object observer)
    {
        var ticketObserver = (IObserver<Ticket>)observer;

        // Every factory appends from its own task and List<T> is not thread-safe,
        // so appends are serialized on the list instance.
        lock (Program.Observers)
        {
            Program.Observers.Add(ticketObserver);
        }

        while (_generate)
        {
            ticketObserver.OnNext(new Ticket(Guid.NewGuid().ToString()));
            Thread.Sleep(TicketInterval);
        }
    }

    /// <summary>
    /// Subscribe function for <c>Observable.Create</c>: starts a factory for the observer and
    /// returns it as the handle that stops generation when disposed.
    /// </summary>
    /// <param name="ticketObserver">The observer to feed; must implement <see cref="IObserver{T}"/> of <see cref="Ticket"/>.</param>
    /// <returns>The factory, whose <see cref="Dispose"/> stops ticket generation.</returns>
    public static IDisposable TicketSubscribe(object ticketObserver) => new TicketFactory(ticketObserver);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.