Skip to content

Instantly share code, notes, and snippets.

@colindooley11
Created December 14, 2016 00:39
Show Gist options
  • Save colindooley11/edd927c2168b57587376b5cae3bff9f4 to your computer and use it in GitHub Desktop.
Save colindooley11/edd927c2168b57587376b5cae3bff9f4 to your computer and use it in GitHub Desktop.
namespace Asos.Finance.Payments.PayPal.EndToEndTests.Framework.MessageTesting
{
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
public class Observer
{
private static readonly ReplaySubject<object> MessageObservable = new ReplaySubject<object>();
public static void OnNotify(object message)
{
MessageObservable.OnNext(message);
}
public static Task<T> SubscribeToMessageTest<T>(Func<T, bool> test, string failureMessage, TimeSpan timeout) where T : class
{
var tcs = new TaskCompletionSource<T>();
MessageObservable
.Timeout(timeout)
.FirstAsync(message => RunTest(message, test))
.Subscribe(
message => OnSuccess(tcs, message),
exception => OnFailure(exception, tcs, failureMessage));
return tcs.Task;
}
private static bool RunTest<T>(object message, Func<T, bool> messageTest) where T : class
{
var convertedMessage = ConvertMessage<T>(message);
if (convertedMessage != null)
{
return messageTest(convertedMessage);
}
return false;
}
private static T ConvertMessage<T>(object message) where T : class
{
var messageAsType = message as T;
return messageAsType;
}
private static void OnSuccess<T>(TaskCompletionSource<T> tcs, object message) where T : class
{
tcs.SetResult(ConvertMessage<T>(message));
}
private static void OnFailure<T>(Exception ex, TaskCompletionSource<T> tcs, string failureMessage)
{
Console.WriteLine(failureMessage);
var messsageException = new PublishTestException(failureMessage, ex);
tcs.SetException(messsageException);
}
}
}
@colindooley11
Copy link
Author

Part of a testing framework created to "pull" messages as they arrive on Service Bus

@colindooley11
Copy link
Author

ReplaySubject really nice to buffer up messages since channel is observed. New subscribers get all messags already received and so subscriptions not in place before messages start arriving will still see messages

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment