Skip to content

Instantly share code, notes, and snippets.

@mfcollins3
Created July 18, 2013 18:12
Show Gist options
  • Save mfcollins3/6031590 to your computer and use it in GitHub Desktop.
Save mfcollins3/6031590 to your computer and use it in GitHub Desktop.
Sample custom TPL Dataflow block for Neuron ESB parties. The NeuronEsbBlock class can be used as either a source block or a target block in a dataflow pipeline.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Neuron.Esb;
/// <summary>
/// Dataflow block that wraps a Neuron ESB <see cref="Party"/>.
/// Messages offered to the block are handed to <c>Party.SendMessage</c>; messages
/// raised by the party's <c>OnReceive</c> event are buffered and exposed as the
/// block's output, so the same type can sit at either end of a pipeline.
/// </summary>
public sealed class NeuronEsbBlock : IDisposable,
    IPropagatorBlock<ESBMessage, ESBMessage>,
    IReceivableSourceBlock<ESBMessage>
{
    // Output side: buffers messages received from the party.
    private readonly IReceivableSourceBlock<ESBMessage> innerSourceBlock;

    // Input side: forwards every accepted message to the party.
    private readonly ITargetBlock<ESBMessage> innerTargetBlock;

    private readonly Party party;
    private bool disposed;

    /// <summary>
    /// Creates the underlying party and wires it to the inner target and source blocks.
    /// The party is not connected until <see cref="Connect"/> is called.
    /// </summary>
    /// <param name="clientConfig">Configuration passed to the <see cref="Party"/> constructor.</param>
    /// <param name="options">Options passed to the <see cref="Party"/> constructor.</param>
    public NeuronEsbBlock(
        SubscriberConfiguration clientConfig, SubscriberOptions options)
    {
        this.party = new Party(clientConfig, options);

        var actionBlock = new ActionBlock<ESBMessage>(
            message => this.party.SendMessage(message));
        this.innerTargetBlock = actionBlock;

        var bufferBlock = new BufferBlock<ESBMessage>();
        this.innerSourceBlock = bufferBlock;

        // Propagate completion (or failure) of the input side to the output side.
        // The scheduler is given explicitly so the continuation never runs on
        // whatever TaskScheduler.Current happens to be when the block is constructed.
        actionBlock.Completion.ContinueWith(
            t =>
            {
                if (t.IsFaulted)
                {
                    ((IDataflowBlock)bufferBlock).Fault(t.Exception);
                }
                else
                {
                    bufferBlock.Complete();
                }
            },
            TaskScheduler.Default);

        this.party.OnReceive += (sender, e) =>
        {
            if (null != e.Exception)
            {
                // A receive error faults the input side; the continuation above
                // then faults the output buffer as well.
                this.Fault(e.Exception);
            }
            else
            {
                // Post's result is deliberately ignored: once the buffer has
                // completed, late messages are dropped.
                bufferBlock.Post(e.Message);
            }
        };
    }

    /// <summary>
    /// Gets the completion task of the output buffer, which finishes after the
    /// input side has completed or faulted.
    /// </summary>
    public Task Completion
    {
        get
        {
            return this.innerSourceBlock.Completion;
        }
    }

    /// <summary>
    /// Signals that no more messages will be offered to the block. The output
    /// buffer is completed once the input side has finished.
    /// </summary>
    public void Complete()
    {
        this.innerTargetBlock.Complete();
    }

    /// <summary>Connects the underlying party.</summary>
    /// <exception cref="ObjectDisposedException">The block has been disposed.</exception>
    public void Connect()
    {
        if (this.disposed)
        {
            throw new ObjectDisposedException(typeof(NeuronEsbBlock).Name);
        }

        this.party.Connect();
    }

    /// <summary>
    /// Disposes the underlying party. Calling this method more than once is safe.
    /// </summary>
    /// <remarks>
    /// The class holds only managed state, so it has no finalizer: a finalizer must
    /// not touch other managed objects such as the party, because their
    /// finalization order is undefined.
    /// </remarks>
    public void Dispose()
    {
        if (this.disposed)
        {
            return;
        }

        // Mark first so a throwing Party.Dispose cannot cause a second attempt.
        this.disposed = true;
        this.party.Dispose();
    }

    /// <summary>Faults the input side of the block with the given exception.</summary>
    /// <param name="exception">The exception that caused the fault.</param>
    public void Fault(Exception exception)
    {
        this.innerTargetBlock.Fault(exception);
    }

    /// <summary>Delegates to the output buffer's <c>TryReceive</c>.</summary>
    bool IReceivableSourceBlock<ESBMessage>.TryReceive(
        Predicate<ESBMessage> filter, out ESBMessage item)
    {
        return this.innerSourceBlock.TryReceive(filter, out item);
    }

    /// <summary>Delegates to the output buffer's <c>TryReceiveAll</c>.</summary>
    bool IReceivableSourceBlock<ESBMessage>.TryReceiveAll(out IList<ESBMessage> items)
    {
        return this.innerSourceBlock.TryReceiveAll(out items);
    }

    /// <summary>Delegates to the output buffer's <c>ConsumeMessage</c>.</summary>
    ESBMessage ISourceBlock<ESBMessage>.ConsumeMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<ESBMessage> target,
        out bool messageConsumed)
    {
        return this.innerSourceBlock.ConsumeMessage(
            messageHeader, target, out messageConsumed);
    }

    /// <summary>Links the output buffer to <paramref name="target"/>.</summary>
    IDisposable ISourceBlock<ESBMessage>.LinkTo(
        ITargetBlock<ESBMessage> target, DataflowLinkOptions linkOptions)
    {
        return this.innerSourceBlock.LinkTo(target, linkOptions);
    }

    /// <summary>Delegates to the output buffer's <c>ReleaseReservation</c>.</summary>
    void ISourceBlock<ESBMessage>.ReleaseReservation(
        DataflowMessageHeader messageHeader, ITargetBlock<ESBMessage> target)
    {
        this.innerSourceBlock.ReleaseReservation(messageHeader, target);
    }

    /// <summary>Delegates to the output buffer's <c>ReserveMessage</c>.</summary>
    bool ISourceBlock<ESBMessage>.ReserveMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<ESBMessage> target)
    {
        return this.innerSourceBlock.ReserveMessage(messageHeader, target);
    }

    /// <summary>Offers a message to the input side of the block.</summary>
    DataflowMessageStatus ITargetBlock<ESBMessage>.OfferMessage(
        DataflowMessageHeader messageHeader,
        ESBMessage messageValue,
        ISourceBlock<ESBMessage> source,
        bool consumeToAccept)
    {
        return this.innerTargetBlock.OfferMessage(
            messageHeader, messageValue, source, consumeToAccept);
    }
}
using System;
using System.Threading.Tasks.Dataflow;
using Neuron.Esb;
/// <summary>
/// Console sample: every non-empty line typed on standard input is posted through
/// one <c>NeuronEsbBlock</c>, and everything a second block receives is printed.
/// </summary>
internal class Program
{
    private static void Main()
    {
        const string Address = "net.tcp://localhost:50000";

        var senderConfig = new SubscriberConfiguration(
            "Publisher1", "Enterprise", Address, null);
        var receiverConfig = new SubscriberConfiguration(
            "Subscriber1", "Enterprise", Address, null);

        using (var sender = new NeuronEsbBlock(senderConfig, SubscriberOptions.None))
        {
            using (var receiver = new NeuronEsbBlock(receiverConfig, SubscriberOptions.None))
            {
                // Terminal block: prints the text of every message that arrives.
                var printer = new ActionBlock<ESBMessage>(
                    received => Console.Out.WriteLine("RECEIVED: {0}", received.Text));

                using (IDisposable link = receiver.LinkTo(printer))
                {
                    // Chain shutdown: sender -> receiver -> printer.
                    sender.Completion.ContinueWith(antecedent => receiver.Complete());
                    receiver.Completion.ContinueWith(antecedent => printer.Complete());

                    sender.Connect();
                    receiver.Connect();

                    Console.Error.WriteLine("Enter the messages to publish.");

                    // An empty line or end of input stops the loop.
                    string text;
                    while (!string.IsNullOrEmpty(text = Console.In.ReadLine()))
                    {
                        sender.Post(new ESBMessage("Topic1", text));
                    }

                    sender.Complete();

                    // Block until the completion chain has reached the printer.
                    printer.Completion.Wait();
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment