Skip to content

Instantly share code, notes, and snippets.

@valm
Created November 14, 2013 14:04
Show Gist options
  • Save valm/7467295 to your computer and use it in GitHub Desktop.
Save valm/7467295 to your computer and use it in GitHub Desktop.
TPL DataFlow and Reactive Extensions Example
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml.Linq;
namespace TPLDataFlowTest
{
class Program
{
/// <summary>
/// Entry point: runs the currently selected demo and then waits for a key press
/// so the console window stays open.
/// </summary>
/// <param name="args">Command-line arguments (not used).</param>
static void Main(string[] args)
{
    // Selected demo. TPLDemo1() and RxMix4() are alternative demos that can be called here instead.
    RxMix5();

    // Keep the console open until the user presses a key.
    Console.ReadKey(false);
}
/// <summary>
/// Rx -> Dataflow demo: feeds two integer ranges into two ActionBlocks through
/// <c>AsObserver()</c>. The first ("warm-up") range is cut off by a deadline five seconds
/// after it is created, the second ("test") range by a deadline ten seconds after it is created.
/// Each block waits one second per item and then prints it.
/// </summary>
private static void RxMix5()
{
    // Both blocks share the same options: MaxDegreeOfParallelism = 1 and BoundedCapacity = 1.
    var singleSlotOptions = new ExecutionDataflowBlockOptions();
    singleSlotOptions.MaxDegreeOfParallelism = 1;
    singleSlotOptions.BoundedCapacity = 1;

    // Both blocks perform the same work, so they share one delegate.
    Func<int, Task> delayThenPrint = async value =>
    {
        await Task.Delay(1000);
        Console.WriteLine(value);
    };

    var warmupBlock = new ActionBlock<int>(delayThenPrint, singleSlotOptions);
    var postBlock = new ActionBlock<int>(delayThenPrint, singleSlotOptions);

    // Warm-up phase: the deadline is computed right before subscribing.
    DateTimeOffset warmUpDeadline = DateTimeOffset.UtcNow.AddSeconds(5);
    Observable.Range(1, 100)
        .TakeUntil(warmUpDeadline)
        .Subscribe(warmupBlock.AsObserver());

    // Test phase: its deadline is computed only after the warm-up subscription call has returned.
    DateTimeOffset testDeadline = DateTimeOffset.UtcNow.AddSeconds(10);
    Observable.Range(1000, 1000)
        .TakeUntil(testDeadline)
        .Subscribe(postBlock.AsObserver());
}
/// <summary>
/// Dataflow -> Rx demo: exposes the output of a TransformBlock (int to string) as an
/// observable, parses each string back to an int and prints it.
/// </summary>
private static void RxMix1()
{
    var stringifier = new TransformBlock<int, string>(number => number.ToString());

    // View the block's output side as an Rx sequence and convert the text back to numbers.
    IObservable<string> texts = stringifier.AsObservable();
    IObservable<int> numbers = texts.Select(text => Int32.Parse(text));
    numbers.Subscribe(number => Console.WriteLine(number));

    // Push a single value into the Dataflow side.
    stringifier.Post(138);
}
/// <summary>
/// Rx -> Dataflow demo: an observable of the strings "1".."10" is pushed into a
/// TransformBlock that parses them, which is linked to an ActionBlock that prints the ints.
/// </summary>
private static void RxMix2()
{
    var parser = new TransformBlock<string, int>(text => Int32.Parse(text));
    var printer = new ActionBlock<int>(number => Console.WriteLine(number));
    parser.LinkTo(printer);

    // The block's input side is wrapped as an observer and subscribed to the Rx sequence.
    Observable.Range(1, 10)
        .Select(number => number.ToString())
        .Subscribe(parser.AsObserver());
}
/// <summary>
/// Pure Dataflow demo: a five-stage pipeline
/// (buffer -> XML parse -> extract name/age -> format -> print) processes three
/// hard-coded person records and blocks until the last stage has completed.
/// </summary>
/// <remarks>
/// Links are created with <c>PropagateCompletion = true</c>, so both normal completion and
/// faults flow down the pipeline. If any stage throws (for example on malformed XML),
/// the final <c>Wait()</c> rethrows the failure as an <see cref="AggregateException"/>
/// instead of reporting a clean completion.
/// </remarks>
private static void RxMix4()
{
    var inputBlock = new BufferBlock<string>();
    var transformInputBlock = new TransformBlock<string, XDocument>(s => XDocument.Parse(s));
    var processBlock = new TransformBlock<XDocument, Tuple<string, int>>(
        x =>
        {
            var person = x.Element("person");
            return Tuple.Create((string)person.Element("name"), (int)person.Element("age"));
        });
    var transformOutputBlock =
        new TransformBlock<Tuple<string, int>, string>(
            t => string.Format(CultureInfo.CurrentCulture, "{0} is {1} years old", t.Item1, t.Item2));
    var outputBlock = new ActionBlock<string>(m => Console.Out.WriteLine(m));

    // Let the links forward completion AND faults; a bare ContinueWith(... Complete())
    // would turn an upstream fault into a normal completion and hide the error.
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

    using (inputBlock.LinkTo(transformInputBlock, linkOptions))
    using (transformInputBlock.LinkTo(processBlock, linkOptions))
    using (processBlock.LinkTo(transformOutputBlock, linkOptions))
    using (transformOutputBlock.LinkTo(outputBlock, linkOptions))
    {
        var records = new[]
        {
            "<person><name>Michael Collins</name><age>38</age></person>",
            "<person><name>George Washington</name><age>281</age></person>",
            "<person><name>Abraham Lincoln</name><age>204</age></person>"
        };

        foreach (var record in records)
        {
            inputBlock.Post(record);
        }

        // Signal end of input, then wait for the whole pipeline to drain
        // before the links are disposed.
        inputBlock.Complete();
        outputBlock.Completion.Wait();
    }
}
/// <summary>
/// Mixed Rx/Dataflow demo: the numbers 1..10 are batched in pairs by a BatchBlock, each pair
/// is summed on the Rx side (with a 50 ms timeout on the batched sequence), the sums are
/// turned into strings by a TransformBlock, and a JoinBlock pairs every sum string with a
/// value from a second subscription to the original numbers. Joined tuples are printed
/// as "sum;number".
/// </summary>
private static void RxMix3()
{
    IObservable<int> numbers = Observable.Range(1, 10);

    // Stage 1: batch the numbers two at a time.
    IPropagatorBlock<int, int[]> pairBatcher = new BatchBlock<int>(2);
    IObservable<int[]> pairs = pairBatcher.AsObservable();
    numbers.Subscribe(pairBatcher.AsObserver());

    // Stage 2: sum every batch on the Rx side.
    TimeSpan batchTimeout = TimeSpan.FromMilliseconds(50);
    IObservable<int> pairSums = pairs.Timeout(batchTimeout).Select(pair => pair.Sum());

    // Stage 3: back into Dataflow to format the sums as text.
    IPropagatorBlock<int, string> formatter = new TransformBlock<int, string>(sum => sum.ToString());
    pairSums.Subscribe(formatter.AsObserver());

    // Stage 4: join each formatted sum with a value from a second pass over the numbers.
    var joiner = new JoinBlock<string, int>();
    formatter.LinkTo(joiner.Target1);
    numbers.Subscribe(joiner.Target2.AsObserver());

    joiner.AsObservable()
        .Subscribe(joined => Console.WriteLine("{0};{1}", joined.Item1, joined.Item2));
}
/// <summary>
/// Dataflow download demo: a fixed list of URLs flows through
/// input buffer -> downloader -> output buffer -> saver. Every non-empty response body is
/// written to a randomly named <c>.html</c> file under <c>c:\temp\TPLtest</c>.
/// The method blocks until the pipeline has drained and then waits for a key press.
/// </summary>
/// <remarks>
/// If a stage faults (for example because a request throws), the fault is propagated down
/// the pipeline and rethrown from the final <c>Wait()</c> as an <see cref="AggregateException"/>.
/// </remarks>
private static void TPLDemo1()
{
    const string targetPath = "c:\\temp\\TPLtest";
    const string extension = ".html";

    // StreamWriter does not create missing directories, so make sure the target exists up front.
    Directory.CreateDirectory(targetPath);

    // The token source and the shared HttpClient live for the duration of the pipeline run.
    using (var cancellationSource = new CancellationTokenSource())
    using (var httpClient = new HttpClient())
    {
        var inputWorkBufferBlock = new BufferBlock<Uri>();

        // Input  - Uri    - address to download
        // Output - string - response body, or null when the server returned a non-success status
        var downloaderBlock = new TransformBlock<Uri, string>(async address =>
        {
            Console.WriteLine("Downloading '{0}'... Thread id {1}", address.OriginalString, Thread.CurrentThread.ManagedThreadId);

            using (HttpResponseMessage response = await httpClient.GetAsync(address))
            {
                if (!response.IsSuccessStatusCode)
                {
                    // Completed result, not an unstarted Task: the message must not hang the block.
                    return null;
                }

                return await response.Content.ReadAsStringAsync();
            }
        }, new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationSource.Token,
            MaxDegreeOfParallelism = 5
        });

        var outputBufferBlock = new BufferBlock<string>();

        var saverBlock = new ActionBlock<string>(content =>
        {
            if (content != null)
            {
                var fileName = Path.ChangeExtension(Path.Combine(targetPath, Path.GetRandomFileName()), extension);
                Console.WriteLine("Saving {0} ...Thread: {1}", fileName, Thread.CurrentThread.ManagedThreadId);
                using (var stream = new StreamWriter(fileName))
                {
                    stream.Write(content);
                }
            }
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1
        });

        // Blocks linking. PropagateCompletion forwards both completion and faults downstream.
        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        inputWorkBufferBlock.LinkTo(downloaderBlock, propagate);

        // Filtering: only non-empty responses continue to the saver.
        downloaderBlock.LinkTo(outputBufferBlock, propagate, s => !string.IsNullOrWhiteSpace(s));

        // Messages rejected by the filter must still be consumed, otherwise they stay at the
        // head of the downloader's output and stall the pipeline and its completion.
        downloaderBlock.LinkTo(DataflowBlock.NullTarget<string>());

        outputBufferBlock.LinkTo(saverBlock, propagate);

        // Message passing
        inputWorkBufferBlock.Post(new Uri("http://svnbook.red-bean.com/nightly/ru/svn-book.html"));
        inputWorkBufferBlock.Post(new Uri("http://bash.im"));
        inputWorkBufferBlock.Post(new Uri("http://habrahabr.ru"));
        inputWorkBufferBlock.Post(new Uri("http://lb.ua"));
        inputWorkBufferBlock.Post(new Uri("http://blogs.msdn.com/b/pfxteam/"));
        inputWorkBufferBlock.Post(new Uri("http://hgbook.red-bean.com/read/a-tour-of-mercurial-merging-work.html"));

        // No more input; wait for the last stage to finish before disposing the HttpClient.
        inputWorkBufferBlock.Complete();
        saverBlock.Completion.Wait();
    }

    Console.WriteLine("Job is DONE...");
    Console.WriteLine("Hit ANY KEY to exit...");
    Console.ReadKey();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment