Skip to content

Instantly share code, notes, and snippets.

@yreynhout
Created September 23, 2010 08:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yreynhout/593356 to your computer and use it in GitHub Desktop.
Save yreynhout/593356 to your computer and use it in GitHub Desktop.
Rx testable chaining and streaming
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace ConsoleApplication4 {
public class Input {
public string DataYouGotFromSomeDataSource { get; set; }
}
public class OutputOfStep1 {
public Input OriginalInput { get; set; }
public char[] FilteredAndTransformedData { get; set; }
}
public class OutputOfStep2 {
public Input OriginalInput { get; set; }
public byte[] FilteredAndTransformedData { get; set; }
}
public class Output {
public Input OriginalInput { get; set; }
public long SumOfBytes { get; set; }
}
public class OutputOfStep1Producer : ProducerBase<Input, OutputOfStep1> {
public OutputOfStep1Producer(IObservable<Input> observedProducer, CancellationToken cancellationToken)
: base(observedProducer, cancellationToken) {
//Ctor is a nice place to inject other services you need ...
}
protected override void ProcessInput(Input input) {
if(input.DataYouGotFromSomeDataSource.Length > 2) { //Some filter
ProduceOutput( //Some transformation
new OutputOfStep1 {
OriginalInput = input,
FilteredAndTransformedData = input.DataYouGotFromSomeDataSource.ToCharArray(1, input.DataYouGotFromSomeDataSource.Length - 1)
});
}
}
}
public class OutputOfStep2Producer : ProducerBase<OutputOfStep1, OutputOfStep2> {
public OutputOfStep2Producer(IObservable<OutputOfStep1> observedProducer, CancellationToken cancellationToken)
: base(observedProducer, cancellationToken) {}
protected override void ProcessInput(OutputOfStep1 input) {
if(input.FilteredAndTransformedData.Length > 2) { //Some more filtering
ProduceOutput(new OutputOfStep2 { //Some transformation
OriginalInput = input.OriginalInput,
FilteredAndTransformedData = Encoding.UTF8.GetBytes(input.FilteredAndTransformedData)
});
}
}
}
public class OutputProducer : ProducerBase<OutputOfStep2, Output> {
public OutputProducer(IObservable<OutputOfStep2> observedProducer, CancellationToken cancellationToken)
: base(observedProducer, cancellationToken) {}
protected override void ProcessInput(OutputOfStep2 input) {
if(input.FilteredAndTransformedData.Length > 4) { //Some extra filtering
ProduceOutput( //Some transformation
new Output {
OriginalInput = input.OriginalInput,
SumOfBytes = input.FilteredAndTransformedData.Sum(x => Convert.ToInt64(x))
});
}
}
}
public interface IOutputStrategyFactory {
IObservable<Output> Create(IObservable<Input> streamOfInput, CancellationToken cancellationToken);
}
public class CookedUpFactory : IOutputStrategyFactory {
public IObservable<Output> Create(IObservable<Input> streamOfInput, CancellationToken cancellationToken) {
// Chaining it all up
return new OutputProducer(
new OutputOfStep2Producer(
new OutputOfStep1Producer(streamOfInput,
cancellationToken),
cancellationToken),
cancellationToken);
}
}
class Program {
static void Main() {
IOutputStrategyFactory factory = new CookedUpFactory();
Console.WriteLine("Example 1");
var inputStream1 = new Subject<Input>();
var cancellationTokenSource = new CancellationTokenSource();
var outputStream1 = factory.Create(inputStream1, cancellationTokenSource.Token);
using(outputStream1.Subscribe(
x => Console.WriteLine("Input: {0}, Output: {1}", x.OriginalInput.DataYouGotFromSomeDataSource, x.SumOfBytes))) {
for (int i = 0; i < 10; i++) {
inputStream1.OnNext(new Input { DataYouGotFromSomeDataSource = new string('A', i)});
}
inputStream1.OnCompleted();
}
Console.WriteLine("");
Console.WriteLine("Example 2");
var inputStream2 = new Subject<Input>();
var outputStream2 = factory.Create(inputStream2, cancellationTokenSource.Token);
const int threshold = 2;
var producedOutputCounter = 0;
using(outputStream2.Subscribe(
x => {
producedOutputCounter++;
Console.WriteLine("Input: {0}, Output: {1}", x.OriginalInput.DataYouGotFromSomeDataSource, x.SumOfBytes);
//When 2 outputs have been produced we have enough output, just cancel the whole thing
//Note this is a condition outside of the control of the input stream.
if (producedOutputCounter == threshold) { cancellationTokenSource.Cancel(); }
})) {
for (int i = 0; i < 10; i++) {
if (!cancellationTokenSource.IsCancellationRequested) {
inputStream2.OnNext(new Input {DataYouGotFromSomeDataSource = new string('A', i)});
}
}
inputStream2.OnCompleted();
}
Console.ReadLine();
}
}
/// <summary>
/// Provides the base class for all producers in the chain.
/// </summary>
/// <typeparam name="TInput">The type of input.</typeparam>
/// <typeparam name="TOutput">The type of output.</typeparam>
public abstract class ProducerBase<TInput, TOutput> : IObservable<TOutput>, IDisposable {
private readonly CancellationToken _cancellationToken;
private readonly Subject<TOutput> _subject;
private readonly IDisposable _subscription;
/// <summary>
/// Initializes a new instance of the <see cref="ProducerBase&lt;TInput, TOutput&gt;"/> class.
/// </summary>
/// <param name="observedProducer">The observed producer.</param>
/// <param name="cancellationToken">The cancellation token.</param>
protected ProducerBase(IObservable<TInput> observedProducer, CancellationToken cancellationToken) {
if (observedProducer == null) throw new ArgumentNullException("observedProducer");
_cancellationToken = cancellationToken;
_subject = new Subject<TOutput>();
_subscription = observedProducer.Subscribe(OnObservedProducerInput, OnObservedProducerCompleted);
}
/// <summary>
/// Called when the observed producer has completed its work.
/// </summary>
void OnObservedProducerCompleted() {
_subject.OnCompleted();
Dispose();
}
/// <summary>
/// Called when the observed producer produces an input value.
/// </summary>
/// <param name="input">The input.</param>
void OnObservedProducerInput(TInput input) {
if (IsCancelled) return;
ProcessInput(input);
}
/// <summary>
/// Gets a value indicating whether this instance is cancelled.
/// </summary>
/// <value>
/// <c>true</c> if this instance is cancelled; otherwise, <c>false</c>.
/// </value>
protected bool IsCancelled { get { return _cancellationToken.IsCancellationRequested; } }
/// <summary>
/// Called when an input value must be processed.
/// </summary>
/// <param name="input">The input value.</param>
protected abstract void ProcessInput(TInput input);
/// <summary>
/// Produces the output.
/// </summary>
/// <param name="output">The output.</param>
protected virtual void ProduceOutput(TOutput output) {
if (IsCancelled) return;
_subject.OnNext(output);
}
/// <summary>
/// Subscribes an observer to the observable sequence.
/// </summary>
/// <param name="observer">The observer.</param>
/// <returns>A subscription.</returns>
public IDisposable Subscribe(IObserver<TOutput> observer) {
return _subject.Subscribe(observer);
}
/// <summary>
/// Gets or sets a value indicating whether this instance is disposed.
/// </summary>
/// <value>
/// <c>true</c> if this instance is disposed; otherwise, <c>false</c>.
/// </value>
public bool IsDisposed { get; private set; }
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public void Dispose() {
Dispose(true);
}
/// <summary>
/// Releases unmanaged and - optionally - managed resources
/// </summary>
/// <param name="disposing"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
protected virtual void Dispose(bool disposing) {
if (disposing && !IsDisposed) {
_subscription.Dispose();
IsDisposed = true;
}
}
}
}
@yreynhout
Copy link
Author

Things to note

  • The producers are observable. This is not strictly necessary. You could inject a subject instead of creating it (as is done in the producer base class).
  • This example is very class oriented (read verbose) while the Rx examples usually are more Linq and Delegate oriented. It does however allow for testing each piece of the chain more easily.

General remarks

  • Terminology is off.
  • I'm not really sure if I'm abusing Rx for something it wasn't meant for, but so far I like it.
  • This was my first encounter with Rx and I'm sure there are simpler ways to do this example. However do consider that testability and readability were one of my primary concerns at the time. As well as the ability to create different chains from various producers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment