Skip to content

Instantly share code, notes, and snippets.

@micdenny
Created May 12, 2015 13:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save micdenny/292ac2ca63b8531ce87c to your computer and use it in GitHub Desktop.
Save micdenny/292ac2ca63b8531ce87c to your computer and use it in GitHub Desktop.
EasyNetQ RPC Deadlock
using System;
using System.Threading;
using System.Threading.Tasks;
using EasyNetQ;
namespace EasynetQ.SubRpc.Deadlock
{
internal class Program
{
private static void Main(string[] args)
{
var stop = false;
var waitHandle = new ManualResetEvent(false);
var pub = new Thread(() =>
{
var bus = RabbitHutch.CreateBus("host=localhost");
while (!stop)
{
bus.Publish(new MyMessage());
Thread.Sleep(1000);
}
bus.Dispose();
});
var sub = new Thread(() =>
{
var bus = RabbitHutch.CreateBus("host=localhost");
bus.Subscribe<MyMessage>("test", message =>
{
// this will deadlock on the EasyNetQ's dispatcher
bus.Request<MyRequest, MyResponse>(new MyRequest());
Console.WriteLine("Message consumed!");
});
// also this will deadlock even if is an async method
//bus.SubscribeAsync<MyMessage>("test2", message => SubHandleAsync(bus));
waitHandle.WaitOne();
bus.Dispose();
});
var rpc = new Thread(() =>
{
var bus = RabbitHutch.CreateBus("host=localhost");
bus.Respond<MyRequest, MyResponse>(request => new MyResponse());
waitHandle.WaitOne();
bus.Dispose();
});
rpc.Start();
sub.Start();
pub.Start();
Console.WriteLine("Press enter to exit the application...");
Console.ReadLine();
stop = true;
waitHandle.Set();
sub.Join();
pub.Join();
rpc.Join();
}
private async static Task SubHandleAsync(IBus bus)
{
// this will deadlock on the EasyNetQ's dispatcher, even if the method is async, just because you await later!
bus.Request<MyRequest, MyResponse>(new MyRequest());
// if you change into the follow the deadlock disappear:
// await bus.RequestAsync<MyRequest, MyResponse>(new MyRequest());
await StorageAccess();
// .... some other stuff...
await StorageAccess();
await StorageAccess();
Console.WriteLine("Message consumed!");
}
private static Task StorageAccess()
{
return Task.Delay(1000);
}
}
public class MyMessage
{
}
public class MyRequest
{
}
public class MyResponse
{
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment