Created
May 12, 2015 13:46
-
-
Save micdenny/292ac2ca63b8531ce87c to your computer and use it in GitHub Desktop.
EasyNetQ RPC Deadlock
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using EasyNetQ; | |
namespace EasynetQ.SubRpc.Deadlock | |
{ | |
internal class Program | |
{ | |
private static void Main(string[] args) | |
{ | |
var stop = false; | |
var waitHandle = new ManualResetEvent(false); | |
var pub = new Thread(() => | |
{ | |
var bus = RabbitHutch.CreateBus("host=localhost"); | |
while (!stop) | |
{ | |
bus.Publish(new MyMessage()); | |
Thread.Sleep(1000); | |
} | |
bus.Dispose(); | |
}); | |
var sub = new Thread(() => | |
{ | |
var bus = RabbitHutch.CreateBus("host=localhost"); | |
bus.Subscribe<MyMessage>("test", message => | |
{ | |
// this will deadlock on the EasyNetQ's dispatcher | |
bus.Request<MyRequest, MyResponse>(new MyRequest()); | |
Console.WriteLine("Message consumed!"); | |
}); | |
// also this will deadlock even if is an async method | |
//bus.SubscribeAsync<MyMessage>("test2", message => SubHandleAsync(bus)); | |
waitHandle.WaitOne(); | |
bus.Dispose(); | |
}); | |
var rpc = new Thread(() => | |
{ | |
var bus = RabbitHutch.CreateBus("host=localhost"); | |
bus.Respond<MyRequest, MyResponse>(request => new MyResponse()); | |
waitHandle.WaitOne(); | |
bus.Dispose(); | |
}); | |
rpc.Start(); | |
sub.Start(); | |
pub.Start(); | |
Console.WriteLine("Press enter to exit the application..."); | |
Console.ReadLine(); | |
stop = true; | |
waitHandle.Set(); | |
sub.Join(); | |
pub.Join(); | |
rpc.Join(); | |
} | |
private async static Task SubHandleAsync(IBus bus) | |
{ | |
// this will deadlock on the EasyNetQ's dispatcher, even if the method is async, just because you await later! | |
bus.Request<MyRequest, MyResponse>(new MyRequest()); | |
// if you change into the follow the deadlock disappear: | |
// await bus.RequestAsync<MyRequest, MyResponse>(new MyRequest()); | |
await StorageAccess(); | |
// .... some other stuff... | |
await StorageAccess(); | |
await StorageAccess(); | |
Console.WriteLine("Message consumed!"); | |
} | |
private static Task StorageAccess() | |
{ | |
return Task.Delay(1000); | |
} | |
} | |
public class MyMessage | |
{ | |
} | |
public class MyRequest | |
{ | |
} | |
public class MyResponse | |
{ | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment