Skip to content

Instantly share code, notes, and snippets.

@makomweb
Last active December 17, 2020 10:09
Show Gist options
  • Save makomweb/8529192 to your computer and use it in GitHub Desktop.
Save makomweb/8529192 to your computer and use it in GitHub Desktop.
Rx playground. Create a new Rx operator which can be used for processing data. It is parametrizable with a scheduler.
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using FluentAssertions;
using NUnit.Framework;
namespace Tests
{
/// <summary>
/// NUnit theories that exercise the <c>Process</c> and <c>ProcessAsync</c> operators
/// once for every scheduler exposed as a datapoint below.
/// </summary>
public class RxPlayground
{
    // Scheduler datapoints fed into each [Theory]; all of them are assigned in the constructor.
    [Datapoint]
    public IScheduler Current;

    [Datapoint]
    public IScheduler Default;

    [Datapoint]
    public IScheduler Immediate;

    [Datapoint]
    public IScheduler TaskPool;

    [Datapoint]
    public IScheduler NewThread;

    /// <summary>
    /// Binds each datapoint field to the scheduler instance it is named after.
    /// </summary>
    public RxPlayground()
    {
        NewThread = NewThreadScheduler.Default;
        TaskPool = TaskPoolScheduler.Default;
        Immediate = Scheduler.Immediate;
        Default = Scheduler.Default;
        Current = Scheduler.CurrentThread;
    }

    /// <summary>
    /// The synchronous operator must emit the processed form of the replayed message.
    /// </summary>
    [Theory]
    public async Task Process_using_scheduler(IScheduler scheduler)
    {
        // Arrange
        ReplaySubject<string> subject = CreateSubjectWithMessage();

        // Act
        string processed = await subject.Process(scheduler).FirstAsync();

        // Assert
        processed.Should().Be("message + processed!");
    }

    /// <summary>
    /// The asynchronous operator must emit the processed form of the replayed message.
    /// </summary>
    [Theory]
    public async Task Process_async_using_scheduler(IScheduler scheduler)
    {
        // Arrange
        ReplaySubject<string> subject = CreateSubjectWithMessage();

        // Act
        string processed = await subject.ProcessAsync(scheduler).FirstAsync();

        // Assert
        processed.Should().Be("message + processed!");
    }

    /// <summary>
    /// Creates a replay subject that has already been handed the single value "message".
    /// </summary>
    private static ReplaySubject<string> CreateSubjectWithMessage()
    {
        var subject = new ReplaySubject<string>();
        subject.OnNext("message");
        return subject;
    }
}
/// <summary>
/// Rx operators that append the marker " + processed!" to every string of a sequence,
/// performing the work on a caller-supplied scheduler.
/// </summary>
public static class MyRxOperator
{
    /// <summary>
    /// Processes every element of <paramref name="source"/> using <see cref="Scheduler.Immediate"/>.
    /// </summary>
    /// <param name="source">The sequence of strings to process.</param>
    /// <returns>A sequence carrying the processed strings.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static IObservable<string> Process(this IObservable<string> source)
    {
        return Process(source, Scheduler.Immediate);
    }

    /// <summary>
    /// Processes every element of <paramref name="source"/> synchronously on <paramref name="scheduler"/>.
    /// </summary>
    /// <param name="source">The sequence of strings to process.</param>
    /// <param name="scheduler">The scheduler on which each element is processed and emitted.</param>
    /// <returns>
    /// A sequence carrying the processed strings; errors and completion of
    /// <paramref name="source"/> are forwarded to the subscriber.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    public static IObservable<string> Process(this IObservable<string> source, IScheduler scheduler)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }

        // ObserveOn hands every notification (values, error, completion) to the scheduler in
        // order, so a terminal notification cannot overtake a value that is still pending and
        // disposing the subscription also cancels outstanding scheduled work.
        // Select routes an exception thrown while processing an element to OnError.
        return source
            .ObserveOn(scheduler)
            .Select(x => Process(x));
    }

    /// <summary>
    /// Processes every element of <paramref name="source"/> asynchronously; the processing of each
    /// element is started on <paramref name="scheduler"/>.
    /// </summary>
    /// <param name="source">The sequence of strings to process.</param>
    /// <param name="scheduler">The scheduler on which the asynchronous processing is started.</param>
    /// <returns>
    /// A sequence carrying the processed strings. Elements are processed concurrently, so results
    /// are emitted as their tasks finish. The sequence completes only after
    /// <paramref name="source"/> has completed and all started tasks have finished; a faulted
    /// task or a source error terminates it with that error.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    public static IObservable<string> ProcessAsync(this IObservable<string> source, IScheduler scheduler)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }

        // The Task-returning SelectMany overload replaces the former async-void scheduled lambda:
        // it serializes the emitted results, surfaces task failures through OnError and holds
        // back completion until every in-flight task is done.
        return source
            .ObserveOn(scheduler)
            .SelectMany(x => ProcessAsync(x));
    }

    /// <summary>
    /// Appends the " + processed!" marker to <paramref name="data"/>.
    /// </summary>
    private static string Process(string data)
    {
        return data + " + processed!";
    }

    /// <summary>
    /// Runs <see cref="Process(string)"/> for <paramref name="data"/> on the thread pool.
    /// </summary>
    private static Task<string> ProcessAsync(string data)
    {
        // Task.Run always targets the default task scheduler, whereas Task.Factory.StartNew
        // would pick up whatever TaskScheduler.Current happens to be at the call site.
        return Task.Run(() => Process(data));
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment