Skip to content

Instantly share code, notes, and snippets.

@OmerMor
Created January 3, 2012 11:24
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save OmerMor/1554548 to your computer and use it in GitHub Desktop.
Save OmerMor/1554548 to your computer and use it in GitHub Desktop.
Rx & Async
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
namespace Demo
{
public class RxAsyncTest
{
private readonly EventLoopScheduler m_eventLoopScheduler;
private int m_eventLoopSchedulerThreadId;
public RxAsyncTest()
{
m_eventLoopScheduler = new EventLoopScheduler(start => new Thread(() =>
{
m_eventLoopSchedulerThreadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("EventLoopScheduler: Thread Id = {0}",
m_eventLoopSchedulerThreadId);
start();
}));
}
[Test]
public void Await_in_subscription()
{
Console.WriteLine("Main: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
var finishedEvent = new AutoResetEvent(false);
Observable
.Return(Tuple.Create(10, 20), m_eventLoopScheduler)
.Subscribe(async tuple =>
{
Console.WriteLine("Observer before: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
var result = await AddAsync(tuple.Item1, tuple.Item2);
Console.WriteLine("Observer after: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
finishedEvent.Set();
});
finishedEvent.WaitOne();
}
[Test]
public void Use_SchedulerSynchronizationContext()
{
var schedulerThreadId = -1;
var scheduler = new SyncContextEventLoopScheduler(
start => new Thread(() =>
{
schedulerThreadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("SyncContextEventLoopScheduler: Thread Id = {0}",
schedulerThreadId);
start();
}));
Console.WriteLine("Test: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
var finishedEvent = new AutoResetEvent(false);
var threadIds = new List<int>();
Observable
.Return(Tuple.Create(10, 20), scheduler)
.Subscribe(async tuple =>
{
Console.WriteLine("Observer before: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
var result = await AddWithRxAsync(tuple.Item1, tuple.Item2, scheduler);
threadIds.Add(Thread.CurrentThread.ManagedThreadId);
Console.WriteLine("Observer after: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
finishedEvent.Set();
});
finishedEvent.WaitOne();
Assert.AreEqual(schedulerThreadId, threadIds.Distinct().Single());
}
public Task<int> AddAsync(int x, int y)
{
var task = Task.Factory.StartNew(() =>
{
Console.WriteLine("Add: Thread Id = {0}", Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(TimeSpan.FromSeconds(1));
return x + y;
});
return task;
}
}
public class SchedulerSynchronizationContext : SynchronizationContext
{
private readonly IScheduler m_scheduler;
public SchedulerSynchronizationContext(IScheduler scheduler)
{
m_scheduler = scheduler;
}
public override void Send(SendOrPostCallback callback, object state)
{
throw new NotImplementedException("Too lazy to implemenet synchronous invocation now...");
}
public override void Post(SendOrPostCallback callback, object state)
{
m_scheduler.Schedule(() => callback.Invoke(state));
}
}
public class SyncContextEventLoopScheduler : IScheduler
{
private readonly EventLoopScheduler m_scheduler;
public SyncContextEventLoopScheduler(Func<ThreadStart, Thread> factory)
{
m_scheduler = new EventLoopScheduler(factory);
setSyncContext();
}
public SyncContextEventLoopScheduler()
{
m_scheduler = new EventLoopScheduler();
setSyncContext();
}
#region IScheduler Members
public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
return m_scheduler.Schedule(state, action);
}
public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
return m_scheduler.Schedule(state, dueTime, action);
}
public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime,
Func<IScheduler, TState, IDisposable> action)
{
return m_scheduler.Schedule(state, dueTime, action);
}
public DateTimeOffset Now
{
get { return m_scheduler.Now; }
}
#endregion
private void setSyncContext()
{
m_scheduler.Schedule(() =>
{
var syncContext = new SchedulerSynchronizationContext(this);
SynchronizationContext.SetSynchronizationContext(syncContext);
});
}
public void Dispose()
{
m_scheduler.Dispose();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment