Created
January 3, 2012 11:24
-
-
Save OmerMor/1554548 to your computer and use it in GitHub Desktop.
Rx & Async
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reactive.Concurrency; | |
using System.Reactive.Linq; | |
using System.Reactive.Threading.Tasks; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using NUnit.Framework; | |
namespace Demo | |
{ | |
/// <summary>
/// Demonstrates how <c>await</c> behaves inside an Rx subscription, with and without a
/// <see cref="SynchronizationContext"/> that routes continuations back to the scheduler.
/// </summary>
/// <remarks>
/// The fixture implements <see cref="IDisposable"/> so that the event loop thread created in
/// the constructor is shut down once the fixture is no longer needed.
/// </remarks>
public class RxAsyncTest : IDisposable
{
    // Upper bound for the asynchronous part of a test; AddAsync sleeps for one second.
    private static readonly TimeSpan s_completionTimeout = TimeSpan.FromSeconds(30);

    private readonly EventLoopScheduler m_eventLoopScheduler;
    private int m_eventLoopSchedulerThreadId;

    /// <summary>
    /// Creates the shared event loop scheduler. The thread factory records and prints the
    /// managed id of the loop thread before handing control to the scheduler's loop.
    /// </summary>
    public RxAsyncTest()
    {
        m_eventLoopScheduler = new EventLoopScheduler(start => new Thread(() =>
        {
            m_eventLoopSchedulerThreadId = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine("EventLoopScheduler: Thread Id = {0}",
                m_eventLoopSchedulerThreadId);
            start();
        }));
    }

    /// <summary>
    /// Stops the event loop scheduler created by the constructor.
    /// </summary>
    public void Dispose()
    {
        m_eventLoopScheduler.Dispose();
    }

    /// <summary>
    /// Awaits a task inside an observer that is notified on a plain event loop scheduler and
    /// prints the thread ids seen before and after the <c>await</c>.
    /// </summary>
    [Test]
    public void Await_in_subscription()
    {
        Console.WriteLine("Main: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);

        // The observer lambda is async void, so its outcome (including any exception) is
        // funnelled through this completion source instead of being lost.
        var completion = new TaskCompletionSource<int>();

        using (Observable
            .Return(Tuple.Create(10, 20), m_eventLoopScheduler)
            .Subscribe(async tuple =>
            {
                try
                {
                    Console.WriteLine("Observer before: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
                    var result = await AddAsync(tuple.Item1, tuple.Item2);
                    Console.WriteLine("Observer after: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
                    completion.TrySetResult(result);
                }
                catch (Exception ex)
                {
                    completion.TrySetException(ex);
                }
            }))
        {
            Assert.AreEqual(30, WaitForResult(completion.Task));
        }
    }

    /// <summary>
    /// Verifies that, when the observer runs on a <see cref="SyncContextEventLoopScheduler"/>,
    /// the code after the <c>await</c> resumes on the scheduler's own thread.
    /// </summary>
    [Test]
    public void Use_SchedulerSynchronizationContext()
    {
        var schedulerThreadId = -1;
        var scheduler = new SyncContextEventLoopScheduler(
            start => new Thread(() =>
            {
                schedulerThreadId = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("SyncContextEventLoopScheduler: Thread Id = {0}",
                    schedulerThreadId);
                start();
            }));

        try
        {
            Console.WriteLine("Test: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);

            // Item1 = computed sum, Item2 = id of the thread that ran the continuation.
            var completion = new TaskCompletionSource<Tuple<int, int>>();

            using (Observable
                .Return(Tuple.Create(10, 20), scheduler)
                .Subscribe(async tuple =>
                {
                    try
                    {
                        Console.WriteLine("Observer before: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
                        var result = await AddWithRxAsync(tuple.Item1, tuple.Item2, scheduler);
                        var continuationThreadId = Thread.CurrentThread.ManagedThreadId;
                        Console.WriteLine("Observer after: Thread Id = {0}", continuationThreadId);
                        completion.TrySetResult(Tuple.Create(result, continuationThreadId));
                    }
                    catch (Exception ex)
                    {
                        completion.TrySetException(ex);
                    }
                }))
            {
                var outcome = WaitForResult(completion.Task);
                Assert.AreEqual(30, outcome.Item1);
                Assert.AreEqual(schedulerThreadId, outcome.Item2);
            }
        }
        finally
        {
            // The loop thread comes from a plain `new Thread(...)`, which is a foreground
            // thread by default, so it must be stopped explicitly.
            scheduler.Dispose();
        }
    }

    /// <summary>
    /// Adds two numbers on a task started through <see cref="Task.Factory"/>, after printing
    /// the worker thread id and sleeping for one second.
    /// </summary>
    /// <param name="x">First operand.</param>
    /// <param name="y">Second operand.</param>
    /// <returns>A task whose result is <paramref name="x"/> + <paramref name="y"/>.</returns>
    public Task<int> AddAsync(int x, int y)
    {
        var task = Task.Factory.StartNew(() =>
        {
            Console.WriteLine("Add: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(TimeSpan.FromSeconds(1));
            return x + y;
        });
        return task;
    }

    /// <summary>
    /// Adds two numbers as a work item on the supplied Rx scheduler and exposes the outcome
    /// as a task.
    /// </summary>
    /// <param name="x">First operand.</param>
    /// <param name="y">Second operand.</param>
    /// <param name="scheduler">Scheduler on which the addition is executed.</param>
    /// <returns>A task whose result is <paramref name="x"/> + <paramref name="y"/>.</returns>
    public Task<int> AddWithRxAsync(int x, int y, IScheduler scheduler)
    {
        return Observable
            .Start(() =>
            {
                Console.WriteLine("AddWithRx: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
                return x + y;
            }, scheduler)
            .ToTask();
    }

    /// <summary>
    /// Blocks the calling (test) thread until <paramref name="task"/> finishes, failing the
    /// test if it does not finish within the timeout.
    /// </summary>
    /// <remarks>
    /// <see cref="Task.Wait(TimeSpan)"/> rethrows a faulted task's exception wrapped in an
    /// <see cref="AggregateException"/>, so an observer failure fails the test.
    /// </remarks>
    private static T WaitForResult<T>(Task<T> task)
    {
        Assert.IsTrue(task.Wait(s_completionTimeout),
            "The asynchronous observer did not finish within the timeout.");
        return task.Result;
    }
}
/// <summary>
/// A <see cref="SynchronizationContext"/> that forwards posted callbacks to an Rx
/// <see cref="IScheduler"/>.
/// </summary>
public class SchedulerSynchronizationContext : SynchronizationContext
{
    private readonly IScheduler m_scheduler;

    /// <summary>
    /// Creates a context that posts work to <paramref name="scheduler"/>.
    /// </summary>
    /// <param name="scheduler">Scheduler that will run posted callbacks.</param>
    /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
    public SchedulerSynchronizationContext(IScheduler scheduler)
    {
        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }

        m_scheduler = scheduler;
    }

    /// <summary>
    /// Synchronous dispatch is not implemented by this context.
    /// </summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public override void Send(SendOrPostCallback callback, object state)
    {
        throw new NotImplementedException("Too lazy to implement synchronous invocation now...");
    }

    /// <summary>
    /// Schedules <paramref name="callback"/> on the underlying scheduler and returns without
    /// waiting for it to run.
    /// </summary>
    /// <param name="callback">Delegate to invoke.</param>
    /// <param name="state">Argument passed to <paramref name="callback"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    public override void Post(SendOrPostCallback callback, object state)
    {
        // Fail at the call site rather than later inside the scheduled work item.
        if (callback == null)
        {
            throw new ArgumentNullException("callback");
        }

        m_scheduler.Schedule(() => callback(state));
    }

    /// <summary>
    /// Returns a new context bound to the same scheduler. Without this override the base
    /// class would hand back a plain <see cref="SynchronizationContext"/> that no longer
    /// targets the scheduler.
    /// </summary>
    public override SynchronizationContext CreateCopy()
    {
        return new SchedulerSynchronizationContext(m_scheduler);
    }
}
/// <summary>
/// An <see cref="IScheduler"/> that wraps an <see cref="EventLoopScheduler"/> and, as the
/// first scheduled work item, installs a <see cref="SchedulerSynchronizationContext"/>
/// targeting this scheduler as the current synchronization context.
/// </summary>
/// <remarks>
/// Declares <see cref="IDisposable"/> so that instances can be used in <c>using</c> blocks;
/// disposing stops the wrapped event loop scheduler.
/// </remarks>
public class SyncContextEventLoopScheduler : IScheduler, IDisposable
{
    private readonly EventLoopScheduler m_scheduler;

    /// <summary>
    /// Creates the scheduler with a custom factory for the event loop thread.
    /// </summary>
    /// <param name="factory">Creates the thread that will run the event loop.</param>
    public SyncContextEventLoopScheduler(Func<ThreadStart, Thread> factory)
    {
        m_scheduler = new EventLoopScheduler(factory);
        InstallSynchronizationContext();
    }

    /// <summary>
    /// Creates the scheduler with the default event loop thread.
    /// </summary>
    public SyncContextEventLoopScheduler()
    {
        m_scheduler = new EventLoopScheduler();
        InstallSynchronizationContext();
    }

    #region IScheduler Members

    /// <summary>Schedules <paramref name="action"/> on the wrapped event loop scheduler.</summary>
    public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
    {
        return m_scheduler.Schedule(state, action);
    }

    /// <summary>Schedules <paramref name="action"/> on the wrapped scheduler after <paramref name="dueTime"/>.</summary>
    public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
    {
        return m_scheduler.Schedule(state, dueTime, action);
    }

    /// <summary>Schedules <paramref name="action"/> on the wrapped scheduler at <paramref name="dueTime"/>.</summary>
    public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime,
        Func<IScheduler, TState, IDisposable> action)
    {
        return m_scheduler.Schedule(state, dueTime, action);
    }

    /// <summary>Gets the wrapped scheduler's notion of the current time.</summary>
    public DateTimeOffset Now
    {
        get { return m_scheduler.Now; }
    }

    #endregion

    /// <summary>
    /// Stops the wrapped event loop scheduler.
    /// </summary>
    public void Dispose()
    {
        m_scheduler.Dispose();
    }

    // Queues a work item that makes a SchedulerSynchronizationContext (bound to this wrapper)
    // the current synchronization context of whichever thread executes the item.
    private void InstallSynchronizationContext()
    {
        m_scheduler.Schedule(() =>
        {
            var syncContext = new SchedulerSynchronizationContext(this);
            SynchronizationContext.SetSynchronizationContext(syncContext);
        });
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment