Skip to content

Instantly share code, notes, and snippets.

@ajrouvoet
Created April 16, 2014 12:52
Show Gist options
  • Save ajrouvoet/10869633 to your computer and use it in GitHub Desktop.
Save ajrouvoet/10869633 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
namespace System.Reactive
{
/// <summary>
/// Factory methods that build observable sequences from delegates or from a fixed set of values.
/// </summary>
public static class Observable
{
    /// <summary>
    /// Creates an observable sequence object from the specified subscription function.
    /// </summary>
    /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
    /// <param name="subscribe">
    /// Subscribe method implementation. The subscription it returns is handed to the
    /// observer's <c>Add</c> method.
    /// </param>
    /// <returns>An <see cref="AnonymousObservable{T}"/> wrapping <paramref name="subscribe"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
    public static IObservable<T> Create<T>(Func<ObserverBase<T>, ISubscription> subscribe)
    {
        // Fail at creation time; otherwise the null delegate would only surface as a
        // NullReferenceException once the wrapping lambda is eventually invoked.
        if (subscribe == null)
            throw new ArgumentNullException("subscribe");

        return new AnonymousObservable<T>((obs) => obs.Add(subscribe(obs)));
    }

    /// <summary>
    /// Creates an observable sequence object from the specified subscription function.
    /// </summary>
    /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
    /// <param name="subscribe">Subscribe method implementation.</param>
    /// <returns>An <see cref="AnonymousObservable{T}"/> wrapping <paramref name="subscribe"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
    public static IObservable<T> Create<T>(Action<ObserverBase<T>> subscribe)
    {
        if (subscribe == null)
            throw new ArgumentNullException("subscribe");

        // The delegate already has the required shape, so no wrapping lambda is needed.
        return new AnonymousObservable<T>(subscribe);
    }

    /// <summary>
    /// Creates an observable from its arguments.
    /// </summary>
    /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
    /// <param name="args">Values forwarded to the observer's <c>Of</c> method.</param>
    /// <returns>An observable created through <c>Create</c> that forwards <paramref name="args"/> to the observer.</returns>
    public static IObservable<T> Of<T>(params T[] args)
    {
        // The explicit lambda parameter type is kept: it participates in choosing the Create overload.
        return Observable.Create<T>((ObserverBase<T> obs) => obs.Of(args));
    }
}
}
using System;
namespace System.Reactive
{
/// <summary>
/// An observable whose subscribe behaviour is supplied as a delegate rather than by subclassing.
/// </summary>
/// <typeparam name="T">The type of the elements in the sequence.</typeparam>
public class AnonymousObservable<T> : ObservableBase<T>
{
    // Delegate run for an observer handed to UnsafeSubscribe; never null once constructed.
    private readonly Action<ObserverBase<T>> _subscribe;

    /// <summary>
    /// Wraps the given subscription function in an observable sequence object.
    /// </summary>
    /// <param name="subscribe">Subscribe method implementation.</param>
    /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
    public AnonymousObservable(Action<ObserverBase<T>> subscribe)
    {
        if (subscribe == null)
        {
            throw new ArgumentNullException("subscribe");
        }

        this._subscribe = subscribe;
    }

    /// <summary>
    /// Invokes the subscription function that was passed to the constructor.
    /// </summary>
    /// <param name="observer">Observer to send notifications to.</param>
    internal override void UnsafeSubscribe(ObserverBase<T> observer)
    {
        this._subscribe.Invoke(observer);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment