Created
December 14, 2016 00:39
-
-
Save colindooley11/edd927c2168b57587376b5cae3bff9f4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Asos.Finance.Payments.PayPal.EndToEndTests.Framework.MessageTesting | |
{ | |
using System; | |
using System.Reactive.Linq; | |
using System.Reactive.Subjects; | |
using System.Threading.Tasks; | |
/// <summary>
/// Process-wide hub used by the end-to-end tests: every message handed to
/// <see cref="OnNotify"/> is pushed into a shared replay subject, and tests
/// wait for a message of interest via <see cref="SubscribeToMessageTest{T}"/>.
/// </summary>
public class Observer
{
    // A parameterless ReplaySubject keeps every message it has seen and replays
    // them to each new subscriber, so a test that subscribes after the message
    // was published still observes it. The subject is static, i.e. shared by
    // every caller in the process.
    private static readonly ReplaySubject<object> MessageObservable = new ReplaySubject<object>();

    /// <summary>
    /// Publishes <paramref name="message"/> to all current and future subscribers.
    /// </summary>
    /// <param name="message">The received message; any type is accepted.</param>
    public static void OnNotify(object message)
    {
        MessageObservable.OnNext(message);
    }

    /// <summary>
    /// Waits for the first message of type <typeparamref name="T"/> that satisfies
    /// <paramref name="test"/>, including messages published before this call.
    /// </summary>
    /// <typeparam name="T">Message type to look for; other types are skipped.</typeparam>
    /// <param name="test">Predicate the message must satisfy.</param>
    /// <param name="failureMessage">Text written to the console and used for the failure exception.</param>
    /// <param name="timeout">Maximum total time to wait for a matching message.</param>
    /// <returns>
    /// A task that completes with the matching message, or faults with a
    /// <c>PublishTestException</c> wrapping the underlying error (for example the
    /// <see cref="TimeoutException"/> raised when no match arrives in time).
    /// </returns>
    public static Task<T> SubscribeToMessageTest<T>(Func<T, bool> test, string failureMessage, TimeSpan timeout) where T : class
    {
        var tcs = new TaskCompletionSource<T>();

        // Timeout must come AFTER FirstAsync. Rx's Timeout is a per-element policy:
        // applied to the raw subject, every unrelated message restarts the timer, so
        // a busy channel could keep the test waiting forever. Applied here it bounds
        // the overall wait for the first matching message.
        MessageObservable
            .FirstAsync(message => RunTest(message, test))
            .Timeout(timeout)
            .Subscribe(
                message => OnSuccess(tcs, message),
                exception => OnFailure(exception, tcs, failureMessage));

        return tcs.Task;
    }

    /// <summary>
    /// Returns true when <paramref name="message"/> is a <typeparamref name="T"/>
    /// and passes <paramref name="messageTest"/>; messages of other types yield false.
    /// </summary>
    private static bool RunTest<T>(object message, Func<T, bool> messageTest) where T : class
    {
        var convertedMessage = ConvertMessage<T>(message);
        return convertedMessage != null && messageTest(convertedMessage);
    }

    /// <summary>
    /// Casts <paramref name="message"/> to <typeparamref name="T"/>, returning null
    /// when it is not of that type.
    /// </summary>
    private static T ConvertMessage<T>(object message) where T : class
    {
        return message as T;
    }

    /// <summary>
    /// Completes the pending task with the matched message.
    /// </summary>
    private static void OnSuccess<T>(TaskCompletionSource<T> tcs, object message) where T : class
    {
        tcs.SetResult(ConvertMessage<T>(message));
    }

    /// <summary>
    /// Logs <paramref name="failureMessage"/> to the console and faults the pending
    /// task with a <c>PublishTestException</c> that carries <paramref name="ex"/> as
    /// its inner exception.
    /// </summary>
    private static void OnFailure<T>(Exception ex, TaskCompletionSource<T> tcs, string failureMessage)
    {
        Console.WriteLine(failureMessage);
        var messageException = new PublishTestException(failureMessage, ex);
        tcs.SetException(messageException);
    }
}
} |
ReplaySubject is really nice for buffering up messages from the moment the channel is observed. New subscribers get all messages already received, so subscriptions that are not in place before messages start arriving will still see those messages.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Part of a testing framework created to "pull" messages as they arrive on Service Bus