Skip to content

Instantly share code, notes, and snippets.

@SzymonPobiega
Last active March 28, 2018 11:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save SzymonPobiega/d461c8b1e4257f754e736081c81ad885 to your computer and use it in GitHub Desktop.
Save SzymonPobiega/d461c8b1e4257f754e736081c81ad885 to your computer and use it in GitHub Desktop.
using System;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Satellites;
using NServiceBus.Transports;
using NServiceBus.Transports.SQLServer;
using NServiceBus.Unicast;
using NServiceBus.Unicast.Transport;
namespace TaskStarvationTest
{
class Program
{
static void Main(string[] args)
{
var config = new BusConfiguration();
config.EndpointName("TaskStarvationTest");
var transport = config.UseTransport<SqlServerTransport>();
transport.ConnectionString(@"initial catalog=nservicebus; data source=.\sqlexpress; integrated security=true");
config.UsePersistence<InMemoryPersistence>();
var startableBus = Bus.Create(config);
var endpoint = startableBus.Start();
var unicastBus = (UnicastBus) endpoint;
var satellite = unicastBus.Builder.Build<Satellite>();
var notifications = unicastBus.Builder.Build<TransportNotifications>();
//Delays start of the transport very 3rd time to trigger calling stop before the start sequence completes
notifications.ReceiveTaskStarted.Subscribe(new Observer());
while (true)
{
Console.WriteLine("Processing a message");
endpoint.Send(Address.Parse("StartStopSatellite"), new MyMessage());
satellite.Run();
Thread.Sleep(100);
}
}
}
class Observer : IObserver<ReceiveTaskStarted>
{
int counter;
public void OnNext(ReceiveTaskStarted value)
{
var v = Interlocked.Increment(ref counter);
if (v % 3 == 0)
{
Thread.Sleep(1000);
}
}
public void OnError(Exception error)
{
}
public void OnCompleted()
{
}
}
class MyMessage : IMessage
{
}
class Satellite : IAdvancedSatellite
{
DequeuerWrapper wrapper;
ManualResetEventSlim manualResetEvent = new ManualResetEventSlim(false);
public void Run()
{
try
{
manualResetEvent.Reset();
wrapper.StartInternal();
}
finally
{
manualResetEvent.Wait(CancellationToken.None);
}
}
public bool Handle(TransportMessage message)
{
Console.WriteLine("Got message");
Task.Run(() => StopInternal());
return true;
}
void StopInternal()
{
wrapper.StopInternal();
manualResetEvent.Set();
}
public void Start()
{
}
public void Stop()
{
}
public Address InputAddress => Address.Parse("StartStopSatellite");
public bool Disabled => false;
public Action<TransportReceiver> GetReceiverCustomization()
{
return r =>
{
wrapper = new DequeuerWrapper(r.Receiver);
r.Receiver = wrapper;
};
}
class DequeuerWrapper : IDequeueMessages
{
readonly IDequeueMessages realDequeuer;
int maximumConcurrencyLevel;
int disposeSignaled;
public DequeuerWrapper(IDequeueMessages realDequeuer)
{
this.realDequeuer = realDequeuer;
}
public void Init(Address address, TransactionSettings transactionSettings, Func<TransportMessage, bool> tryProcessMessage, Action<TransportMessage, Exception> endProcessMessage)
{
realDequeuer.Init(address, transactionSettings, tryProcessMessage, endProcessMessage);
}
public void StartInternal()
{
Interlocked.Exchange(ref disposeSignaled, 0);
realDequeuer.Start(maximumConcurrencyLevel);
}
public void Start(int maximumConcurrencyLevel)
{
this.maximumConcurrencyLevel = maximumConcurrencyLevel;
}
public void Stop()
{
}
public void StopInternal()
{
if (Interlocked.Exchange(ref disposeSignaled, 1) != 0)
{
return;
}
try
{
realDequeuer.Stop();
}
catch (Exception)
{
// Making build go green.
var r = 1 + 1;
Interlocked.Increment(ref r);
// We are shutting down, race condition can result in an exception in the real dequeuer.
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment