Skip to content

Instantly share code, notes, and snippets.

@MikeBild
Created February 15, 2012 23:04
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MikeBild/1839871 to your computer and use it in GitHub Desktop.
Save MikeBild/1839871 to your computer and use it in GitHub Desktop.
Minimod.MessageProcessor and Minimod.ZeroMqMessageStream as Document Saga (Message) Demo
using System;
using System.Reactive.Linq;
using Minimod.MessageProcessor;
using Minimod.ZeroMqMessageStream;
namespace ConsoleApplication1
{
/// <summary>
/// Console demo: wires three message processors to their own message streams and
/// sends two <see cref="ChaptersDocumentMessage"/> documents through them.
/// </summary>
class Program
{
    /// <summary>
    /// Creates the streams and processors, waits for Enter, sends two documents
    /// bookmarked "Chapter1", waits for Enter again and finally disposes every stream.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        // Declared before the try block so that the finally block can release whatever
        // was successfully created, even when a later constructor or Send call throws.
        ZeroMqMessageStream messageStream1 = null;
        ZeroMqMessageStream messageStream2 = null;
        ZeroMqMessageStream messageStream3 = null;
        ZeroMqMessageStream sendStream = null;

        try
        {
            // Each receiving stream gets its own endpoint first, followed by all three endpoints.
            // NOTE(review): presumably the first address is the one the stream listens on and the
            // rest are the peers it can forward to — confirm against ZeroMqMessageStream.
            messageStream1 = new ZeroMqMessageStream("tcp://127.0.0.1:8097", "tcp://127.0.0.1:8097", "tcp://127.0.0.1:8098", "tcp://127.0.0.1:8099");
            messageStream2 = new ZeroMqMessageStream("tcp://127.0.0.1:8098", "tcp://127.0.0.1:8097", "tcp://127.0.0.1:8098", "tcp://127.0.0.1:8099");
            messageStream3 = new ZeroMqMessageStream("tcp://127.0.0.1:8099", "tcp://127.0.0.1:8097", "tcp://127.0.0.1:8098", "tcp://127.0.0.1:8099");
            sendStream = new ZeroMqMessageStream(new[] { "tcp://127.0.0.1:8097", "tcp://127.0.0.1:8098", "tcp://127.0.0.1:8099" });

            // The processor instances are not kept in variables.
            // NOTE(review): presumably the MessageProcessor base constructor subscribes to the
            // stream, which keeps the processor reachable — confirm.
            new Chapter1Processor(messageStream1);
            new Chapter2Processor(messageStream2);
            new DoneProcessor(messageStream3);

            // Wait for the user before sending anything.
            Console.ReadLine();

            sendStream.Send(new ChaptersDocumentMessage
            {
                Bookmark = "Chapter1",
                Chaper1 = "Empty",
                Chaper2 = "Empty",
                ThankYouMessage = "Thank you"
            });
            sendStream.Send(new ChaptersDocumentMessage
            {
                Bookmark = "Chapter1",
                Chaper1 = "Empty",
                Chaper2 = "Empty",
                ThankYouMessage = "Thanx",
            });

            // Keep the process alive until the user presses Enter again.
            Console.ReadLine();
        }
        finally
        {
            // Same disposal order as before: the three receiving streams, then the sender.
            if (messageStream1 != null)
            {
                messageStream1.Dispose();
            }
            if (messageStream2 != null)
            {
                messageStream2.Dispose();
            }
            if (messageStream3 != null)
            {
                messageStream3.Dispose();
            }
            if (sendStream != null)
            {
                sendStream.Dispose();
            }
        }
    }
}
/// <summary>
/// Document message that the processors in this file pass along: each processor
/// filters on <see cref="Bookmark"/>, updates the document and sends it on.
/// </summary>
public class ChaptersDocumentMessage : DocumentMessage
{
/// <summary>
/// Name of the step the document is waiting for. The processors in this file
/// filter on the values "Chapter1", "Chapter2" and "Done".
/// </summary>
public string Bookmark { get; set; }
/// <summary>
/// Text that DoneProcessor prints in front of " for filling document.".
/// </summary>
public string ThankYouMessage { get; set; }
/// <summary>
/// Content of the first chapter; Main sends "Empty" and Chapter1Processor sets "Filled".
/// NOTE(review): the name looks like a misspelling of "Chapter1"; it is referenced by
/// other code in this file, so renaming it would be a breaking change.
/// </summary>
public string Chaper1 { get; set; }
/// <summary>
/// Content of the second chapter; Main sends "Empty" and Chapter2Processor sets "Filled".
/// NOTE(review): the name looks like a misspelling of "Chapter2"; see Chaper1.
/// </summary>
public string Chaper2 { get; set; }
}
/// <summary>
/// Handles documents bookmarked "Chapter1": fills the first chapter, moves the
/// bookmark on to "Chapter2" and sends the document back out.
/// </summary>
public class Chapter1Processor : MessageProcessor
{
    // Bookmark this processor reacts to, and the one it stamps on the document afterwards.
    private const string HandledBookmark = "Chapter1";
    private const string NextBookmark = "Chapter2";

    /// <summary>
    /// Registers the handler for <see cref="ChaptersDocumentMessage"/> on the given stream.
    /// </summary>
    /// <param name="messageStream">Stream of incoming messages, handed to the base class.</param>
    public Chapter1Processor(IObservable<object> messageStream)
        : base(messageStream)
    {
        OnReceive<ChaptersDocumentMessage>(documents =>
            (from document in documents
             where document.Bookmark == HandledBookmark
             select document)
            .Do(FillChapter));
    }

    /// <summary>
    /// Marks the first chapter as filled, advances the bookmark and sends the
    /// document through its own message stream context.
    /// </summary>
    /// <param name="message">The document to update and send on.</param>
    private void FillChapter(ChaptersDocumentMessage message)
    {
        message.Bookmark = NextBookmark;
        message.Chaper1 = "Filled";

        var outbound = message.MessageStreamContext;
        outbound.Send(message);
    }
}
/// <summary>
/// Handles documents bookmarked "Chapter2": fills the second chapter, moves the
/// bookmark on to "Done" and sends the document back out.
/// </summary>
public class Chapter2Processor : MessageProcessor
{
    // Bookmark this processor reacts to, and the one it stamps on the document afterwards.
    private const string HandledBookmark = "Chapter2";
    private const string NextBookmark = "Done";

    /// <summary>
    /// Registers the handler for <see cref="ChaptersDocumentMessage"/> on the given stream.
    /// </summary>
    /// <param name="messageStream">Stream of incoming messages, handed to the base class.</param>
    public Chapter2Processor(IObservable<object> messageStream)
        : base(messageStream)
    {
        OnReceive<ChaptersDocumentMessage>(documents =>
            (from document in documents
             where document.Bookmark == HandledBookmark
             select document)
            .Do(FillChapter));
    }

    /// <summary>
    /// Marks the second chapter as filled, advances the bookmark and sends the
    /// document through its own message stream context.
    /// </summary>
    /// <param name="message">The document to update and send on.</param>
    private void FillChapter(ChaptersDocumentMessage message)
    {
        message.Bookmark = NextBookmark;
        message.Chaper2 = "Filled";

        var outbound = message.MessageStreamContext;
        outbound.Send(message);
    }
}
/// <summary>
/// Handles documents bookmarked "Done" by printing their thank-you text to the console.
/// </summary>
public class DoneProcessor : MessageProcessor
{
    // Bookmark value that marks a document as completely processed.
    private const string HandledBookmark = "Done";

    /// <summary>
    /// Registers the handler for <see cref="ChaptersDocumentMessage"/> on the given stream.
    /// </summary>
    /// <param name="messageStream">Stream of incoming messages, handed to the base class.</param>
    public DoneProcessor(IObservable<object> messageStream)
        : base(messageStream)
    {
        OnReceive<ChaptersDocumentMessage>(documents =>
            (from document in documents
             where document.Bookmark == HandledBookmark
             select document)
            .Do(DocumentDone));
    }

    /// <summary>
    /// Writes the document's thank-you message followed by " for filling document.".
    /// </summary>
    /// <param name="message">The finished document.</param>
    private void DocumentDone(ChaptersDocumentMessage message)
    {
        Console.WriteLine("{0} for filling document.", message.ThankYouMessage);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment