Skip to content

Instantly share code, notes, and snippets.

@bobbychopra
Created April 8, 2012 23:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bobbychopra/2340392 to your computer and use it in GitHub Desktop.
Save bobbychopra/2340392 to your computer and use it in GitHub Desktop.
Take Initial Messages until Timeout Received (rx framework)
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive.Linq;
namespace ScratchApp
{
enum MessageType { Initial, Update }
class Message
{
public MessageType MessageType { get; set; }
public int SequenceNo { get; set; }
public object Data { get; set; }
public Message(MessageType messageType, int seqNo, object data)
{
MessageType = messageType;
SequenceNo = seqNo;
Data = data;
}
}
class Program
{
static void Main(string[] args)
{
var messages = Observable.Interval(TimeSpan.FromSeconds(1))
.Select(i => new Message(i < 5 ? MessageType.Initial : MessageType.Update, (int)i, null));
//all initial messages
messages.TakeWhile(m => m.MessageType == MessageType.Initial)
.Subscribe(m => Console.WriteLine(" InitialData: " + m.SequenceNo));
//all update messages
messages.SkipWhile(m => m.MessageType == MessageType.Initial)
.Subscribe(m => Console.WriteLine(" update: " + m.SequenceNo));
//create a timeout after 3 seconds
var timeout = Observable.Timer(TimeSpan.FromSeconds(3));
timeout.Subscribe(t => Console.WriteLine(" >>> TIMEOUT REACHED!"));
//all initial messages until timeout received
messages.TakeWhile(m => m.MessageType == MessageType.Initial)
.TakeUntil(timeout)
.Subscribe(m => Console.WriteLine(" initial data (no timeout): " + m.SequenceNo));
Console.ReadLine();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment