Created
September 23, 2010 08:51
-
-
Save yreynhout/593356 to your computer and use it in GitHub Desktop.
Rx testable chaining and streaming
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading; | |
namespace ConsoleApplication4 { | |
/// <summary>
/// The value that enters the producer chain.
/// </summary>
public class Input {
    /// <summary>
    /// Gets or sets the string payload. <c>OutputOfStep1Producer</c> filters on its
    /// length and converts part of it to a character array.
    /// </summary>
    public string DataYouGotFromSomeDataSource {
        get;
        set;
    }
}
/// <summary>
/// The value produced by the first step of the chain.
/// </summary>
public class OutputOfStep1 {
    /// <summary>
    /// Gets or sets the input this output was derived from.
    /// </summary>
    public Input OriginalInput {
        get;
        set;
    }

    /// <summary>
    /// Gets or sets the characters that survived the first step's filter and transformation.
    /// </summary>
    public char[] FilteredAndTransformedData {
        get;
        set;
    }
}
/// <summary>
/// The value produced by the second step of the chain.
/// </summary>
public class OutputOfStep2 {
    /// <summary>
    /// Gets or sets the input this output was derived from.
    /// </summary>
    public Input OriginalInput {
        get;
        set;
    }

    /// <summary>
    /// Gets or sets the bytes that survived the second step's filter and transformation.
    /// </summary>
    public byte[] FilteredAndTransformedData {
        get;
        set;
    }
}
/// <summary>
/// The final value produced by the chain.
/// </summary>
public class Output {
    /// <summary>
    /// Gets or sets the input this output was derived from.
    /// </summary>
    public Input OriginalInput {
        get;
        set;
    }

    /// <summary>
    /// Gets or sets the sum of the byte values received from the previous step.
    /// </summary>
    public long SumOfBytes {
        get;
        set;
    }
}
/// <summary>
/// First step of the chain: keeps inputs whose data is longer than two characters and
/// turns that data, minus its first character, into a character array.
/// </summary>
public class OutputOfStep1Producer : ProducerBase<Input, OutputOfStep1> {
    /// <summary>
    /// Initializes a new instance of the <see cref="OutputOfStep1Producer"/> class.
    /// </summary>
    /// <param name="observedProducer">The stream of inputs to observe.</param>
    /// <param name="cancellationToken">The cancellation token handed to the base class.</param>
    public OutputOfStep1Producer(IObservable<Input> observedProducer, CancellationToken cancellationToken)
        : base(observedProducer, cancellationToken) {
        //Ctor is a nice place to inject other services you need ...
    }

    /// <summary>
    /// Filters and transforms a single input value.
    /// </summary>
    /// <param name="input">The input value; a null input or null data is dropped.</param>
    protected override void ProcessInput(Input input) {
        // Input is a public, settable DTO that arrives from outside the chain. A missing
        // value cannot satisfy the length filter, so drop it instead of letting a
        // NullReferenceException escape through the upstream OnNext call.
        if (input == null) return;
        var data = input.DataYouGotFromSomeDataSource;
        if (data == null) return;

        if (data.Length > 2) { //Some filter
            ProduceOutput( //Some transformation
                new OutputOfStep1 {
                    OriginalInput = input,
                    // Skip the first character and keep everything after it.
                    FilteredAndTransformedData = data.ToCharArray(1, data.Length - 1)
                });
        }
    }
}
/// <summary>
/// Second step of the chain: keeps step-one outputs that carry more than two characters
/// and encodes those characters as UTF-8 bytes.
/// </summary>
public class OutputOfStep2Producer : ProducerBase<OutputOfStep1, OutputOfStep2> {
    /// <summary>
    /// Initializes a new instance of the <see cref="OutputOfStep2Producer"/> class.
    /// </summary>
    /// <param name="observedProducer">The stream of step-one outputs to observe.</param>
    /// <param name="cancellationToken">The cancellation token handed to the base class.</param>
    public OutputOfStep2Producer(IObservable<OutputOfStep1> observedProducer, CancellationToken cancellationToken)
        : base(observedProducer, cancellationToken) {
    }

    /// <summary>
    /// Filters and transforms a single step-one output.
    /// </summary>
    /// <param name="input">The step-one output to process.</param>
    protected override void ProcessInput(OutputOfStep1 input) {
        char[] characters = input.FilteredAndTransformedData;
        if (characters.Length <= 2) {
            return; //Some more filtering
        }

        //Some transformation
        var encoded = new OutputOfStep2();
        encoded.OriginalInput = input.OriginalInput;
        encoded.FilteredAndTransformedData = Encoding.UTF8.GetBytes(characters);
        ProduceOutput(encoded);
    }
}
/// <summary>
/// Final step of the chain: keeps step-two outputs that carry more than four bytes and
/// reduces those bytes to their sum.
/// </summary>
public class OutputProducer : ProducerBase<OutputOfStep2, Output> {
    /// <summary>
    /// Initializes a new instance of the <see cref="OutputProducer"/> class.
    /// </summary>
    /// <param name="observedProducer">The stream of step-two outputs to observe.</param>
    /// <param name="cancellationToken">The cancellation token handed to the base class.</param>
    public OutputProducer(IObservable<OutputOfStep2> observedProducer, CancellationToken cancellationToken)
        : base(observedProducer, cancellationToken) {
    }

    /// <summary>
    /// Filters and transforms a single step-two output.
    /// </summary>
    /// <param name="input">The step-two output to process.</param>
    protected override void ProcessInput(OutputOfStep2 input) {
        byte[] payload = input.FilteredAndTransformedData;
        if (payload.Length <= 4) {
            return; //Some extra filtering
        }

        //Some transformation
        long total = payload.Select(b => Convert.ToInt64(b)).Sum();
        var result = new Output();
        result.OriginalInput = input.OriginalInput;
        result.SumOfBytes = total;
        ProduceOutput(result);
    }
}
/// <summary>
/// Creates the observable that turns a stream of <see cref="Input"/> values into a
/// stream of <see cref="Output"/> values.
/// </summary>
public interface IOutputStrategyFactory {
    /// <summary>
    /// Builds the output stream for the given input stream.
    /// </summary>
    /// <param name="streamOfInput">The stream of inputs to transform.</param>
    /// <param name="cancellationToken">The cancellation token passed along to the created stream.</param>
    /// <returns>An observable sequence of outputs.</returns>
    IObservable<Output> Create(
        IObservable<Input> streamOfInput,
        CancellationToken cancellationToken);
}
/// <summary>
/// Factory that wires the three producers of this file into a single chain.
/// </summary>
public class CookedUpFactory : IOutputStrategyFactory {
    /// <summary>
    /// Chains step one, step two and the final producer on top of the input stream,
    /// handing the same cancellation token to each of them.
    /// </summary>
    /// <param name="streamOfInput">The stream of inputs that feeds the first step.</param>
    /// <param name="cancellationToken">The cancellation token shared by all three producers.</param>
    /// <returns>The final producer of the chain.</returns>
    public IObservable<Output> Create(IObservable<Input> streamOfInput, CancellationToken cancellationToken) {
        // Chaining it all up, innermost stage first.
        var firstStage = new OutputOfStep1Producer(streamOfInput, cancellationToken);
        var secondStage = new OutputOfStep2Producer(firstStage, cancellationToken);
        var finalStage = new OutputProducer(secondStage, cancellationToken);
        return finalStage;
    }
}
/// <summary>
/// Console demo that runs the producer chain twice: once until the input completes and
/// once until the consumer cancels it.
/// </summary>
class Program {
    /// <summary>
    /// Runs both examples and then waits for a line of console input before exiting.
    /// </summary>
    static void Main() {
        IOutputStrategyFactory factory = new CookedUpFactory();
        RunToCompletionExample(factory);
        Console.WriteLine("");
        RunCancellationExample(factory);
        Console.ReadLine();
    }

    /// <summary>
    /// Example 1: pushes ten inputs (strings of 'A' with lengths 0 through 9) through the
    /// chain, prints every output, then completes the input stream.
    /// </summary>
    /// <param name="factory">The factory that builds the output stream.</param>
    static void RunToCompletionExample(IOutputStrategyFactory factory) {
        Console.WriteLine("Example 1");
        var inputStream = new Subject<Input>();
        // Each example owns its token source, so the source is disposed deterministically
        // and a cancellation in one example can never leak into the other.
        using (var cancellationTokenSource = new CancellationTokenSource()) {
            var outputStream = factory.Create(inputStream, cancellationTokenSource.Token);
            using (outputStream.Subscribe(
                x => Console.WriteLine("Input: {0}, Output: {1}", x.OriginalInput.DataYouGotFromSomeDataSource, x.SumOfBytes))) {
                for (int i = 0; i < 10; i++) {
                    inputStream.OnNext(new Input { DataYouGotFromSomeDataSource = new string('A', i) });
                }
                inputStream.OnCompleted();
            }
        }
    }

    /// <summary>
    /// Example 2: same input as example 1, but the subscriber requests cancellation once
    /// two outputs have been printed, after which no further input is pushed.
    /// </summary>
    /// <param name="factory">The factory that builds the output stream.</param>
    static void RunCancellationExample(IOutputStrategyFactory factory) {
        Console.WriteLine("Example 2");
        var inputStream = new Subject<Input>();
        using (var cancellationTokenSource = new CancellationTokenSource()) {
            // Local alias so the subscriber lambda does not capture the using variable itself.
            var tokenSource = cancellationTokenSource;
            var outputStream = factory.Create(inputStream, tokenSource.Token);
            const int threshold = 2;
            var producedOutputCounter = 0;
            using (outputStream.Subscribe(
                x => {
                    producedOutputCounter++;
                    Console.WriteLine("Input: {0}, Output: {1}", x.OriginalInput.DataYouGotFromSomeDataSource, x.SumOfBytes);
                    //When 2 outputs have been produced we have enough output, just cancel the whole thing
                    //Note this is a condition outside of the control of the input stream.
                    if (producedOutputCounter == threshold) { tokenSource.Cancel(); }
                })) {
                for (int i = 0; i < 10; i++) {
                    if (!tokenSource.IsCancellationRequested) {
                        inputStream.OnNext(new Input { DataYouGotFromSomeDataSource = new string('A', i) });
                    }
                }
                inputStream.OnCompleted();
            }
        }
    }
}
/// <summary>
/// Provides the base class for all producers in the chain.
/// </summary>
/// <remarks>
/// A producer subscribes to an observed producer in its constructor, hands every input
/// to <see cref="ProcessInput"/> and republishes whatever the derived class passes to
/// <see cref="ProduceOutput"/> to its own subscribers. Inputs and outputs are dropped
/// once cancellation has been requested on the supplied token.
/// </remarks>
/// <typeparam name="TInput">The type of input.</typeparam>
/// <typeparam name="TOutput">The type of output.</typeparam>
public abstract class ProducerBase<TInput, TOutput> : IObservable<TOutput>, IDisposable {
    private readonly CancellationToken _cancellationToken;
    private readonly Subject<TOutput> _subject;
    private readonly IDisposable _subscription;

    /// <summary>
    /// Initializes a new instance of the <see cref="ProducerBase{TInput, TOutput}"/> class
    /// and immediately subscribes to the observed producer.
    /// </summary>
    /// <param name="observedProducer">The observed producer.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <exception cref="ArgumentNullException"><paramref name="observedProducer"/> is null.</exception>
    protected ProducerBase(IObservable<TInput> observedProducer, CancellationToken cancellationToken) {
        if (observedProducer == null) throw new ArgumentNullException("observedProducer");
        _cancellationToken = cancellationToken;
        _subject = new Subject<TOutput>();
        _subscription = observedProducer.Subscribe(
            OnObservedProducerInput,
            OnObservedProducerError,
            OnObservedProducerCompleted);
        // The observed producer may terminate synchronously inside Subscribe. In that case
        // Dispose has already run while _subscription was still unassigned, so release the
        // subscription here instead.
        if (IsDisposed && _subscription != null) {
            _subscription.Dispose();
        }
    }

    /// <summary>
    /// Called when the observed producer has completed its work. Completes the output
    /// stream and releases the subscription.
    /// </summary>
    private void OnObservedProducerCompleted() {
        _subject.OnCompleted();
        Dispose();
    }

    /// <summary>
    /// Called when the observed producer fails. Forwards the error to this producer's
    /// subscribers so the failure travels down the chain, then releases the subscription.
    /// </summary>
    /// <param name="error">The error reported by the observed producer.</param>
    private void OnObservedProducerError(Exception error) {
        _subject.OnError(error);
        Dispose();
    }

    /// <summary>
    /// Called when the observed producer produces an input value. The value is ignored
    /// when cancellation has been requested.
    /// </summary>
    /// <param name="input">The input.</param>
    private void OnObservedProducerInput(TInput input) {
        if (IsCancelled) return;
        ProcessInput(input);
    }

    /// <summary>
    /// Gets a value indicating whether this instance is cancelled.
    /// </summary>
    /// <value>
    /// <c>true</c> if this instance is cancelled; otherwise, <c>false</c>.
    /// </value>
    protected bool IsCancelled { get { return _cancellationToken.IsCancellationRequested; } }

    /// <summary>
    /// Called when an input value must be processed.
    /// </summary>
    /// <param name="input">The input value.</param>
    protected abstract void ProcessInput(TInput input);

    /// <summary>
    /// Produces the output. The value is dropped when cancellation has been requested.
    /// </summary>
    /// <param name="output">The output.</param>
    protected virtual void ProduceOutput(TOutput output) {
        if (IsCancelled) return;
        _subject.OnNext(output);
    }

    /// <summary>
    /// Subscribes an observer to the observable sequence.
    /// </summary>
    /// <param name="observer">The observer.</param>
    /// <returns>A subscription.</returns>
    public IDisposable Subscribe(IObserver<TOutput> observer) {
        return _subject.Subscribe(observer);
    }

    /// <summary>
    /// Gets or sets a value indicating whether this instance is disposed.
    /// </summary>
    /// <value>
    /// <c>true</c> if this instance is disposed; otherwise, <c>false</c>.
    /// </value>
    public bool IsDisposed { get; private set; }

    /// <summary>
    /// Releases the subscription to the observed producer.
    /// </summary>
    public void Dispose() {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases unmanaged and - optionally - managed resources
    /// </summary>
    /// <param name="disposing"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
    protected virtual void Dispose(bool disposing) {
        if (!disposing || IsDisposed) return;
        // Flag first so a re-entrant Dispose triggered by the subscription is a no-op.
        IsDisposed = true;
        // _subscription is still null when the observed producer terminated from inside
        // the constructor's Subscribe call; the constructor cleans up in that case.
        if (_subscription != null) {
            _subscription.Dispose();
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Things to note
General remarks