Skip to content

Instantly share code, notes, and snippets.

@KirillShlenskiy
Last active January 12, 2016 01:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save KirillShlenskiy/7f3e2c4b28b9f940c3da to your computer and use it in GitHub Desktop.
Save KirillShlenskiy/7f3e2c4b28b9f940c3da to your computer and use it in GitHub Desktop.
using System;
using System.Diagnostics;
using System.Messaging;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
namespace StackOverflow.Q34437298
{
public class MessagePumpTests
{
// Path of the private MSMQ queue that every test in this class opens through GetQueue().
static string queue = @".\Private$\concurrenttest";
// xUnit output helper that receives the tests' diagnostic lines; assigned in the constructor.
private ITestOutputHelper Debug;
/// <summary>
/// Prepares the test queue when the test class is constructed: stores the output helper,
/// creates the private queue if it does not exist yet, and purges any messages left in it.
/// </summary>
/// <param name="debug">xUnit output helper used by the tests for diagnostic output.</param>
public MessagePumpTests(ITestOutputHelper debug)
{
    Debug = debug;

    if (!MessageQueue.Exists(queue))
    {
        // Create returns a MessageQueue (IDisposable) that is not needed here;
        // dispose it immediately instead of leaking the underlying handle.
        MessageQueue.Create(queue).Dispose();
    }

    using (MessageQueue messageQueue = GetQueue())
    {
        // Start from an empty queue so the message-count assertions are not skewed by leftovers.
        messageQueue.Purge();
    }
}
/// <summary>
/// Opens the test queue and configures it with an XML formatter that can
/// deserialize <see cref="Command1"/> message bodies.
/// </summary>
/// <returns>A new <see cref="MessageQueue"/>; the caller is expected to dispose it.</returns>
private MessageQueue GetQueue()
{
    Type[] bodyTypes = { typeof(Command1) };

    MessageQueue result = new MessageQueue(queue);
    result.Formatter = new XmlMessageFormatter(bodyTypes);

    return result;
}
/// <summary>
/// Minimal usage example: runs a pump with a parallelism of 4 over the test queue,
/// sends 100 empty messages at 25 ms intervals, then stops the pump and awaits its completion.
/// </summary>
[Fact]
public async Task Usage()
{
    using (MessageQueue msMq = GetQueue())
    {
        MessagePump pump = MessagePump.Run(
            msMq,
            async message =>
            {
                await Task.Delay(50);

                // Write through the xUnit output helper: xUnit v2 does not capture Console output.
                Debug.WriteLine($"Finished processing message {message.Id}");
            },
            maxDegreeOfParallelism: 4
        );

        for (int i = 0; i < 100; i++)
        {
            msMq.Send(new Message());

            // Pace the producer without blocking the test thread.
            await Task.Delay(25);
        }

        pump.Stop();
        await pump.Completion;
    }
}
/// <summary>
/// Sends 100 commands at 25 ms intervals, stops the pump, awaits its completion and then
/// asserts that all 100 commands were processed and that the queue is empty.
/// </summary>
[Fact]
public async Task Completion()
{
    using (MessageQueue msMq = GetQueue())
    using (CancellationTokenSource cts = new CancellationTokenSource())
    {
        // Diagnostics: prefix every output line with the seconds elapsed since the test started.
        Stopwatch sw = Stopwatch.StartNew();
        Action<string> print = msg => Debug.WriteLine(((double)sw.ElapsedMilliseconds / 1000).ToString("0.###") + "s: " + msg);

        int queuedMessages = 0;
        int processedMessages = 0;

        MessagePump pump = MessagePump.Run(
            msMq,
            async message =>
            {
                Command1 command = (Command1)message.Body;
                print($"Processing command {command.id}.");

                await Task.Delay(500).ConfigureAwait(false);

                print($"Finished processing command {command.id}.");

                // Handlers run concurrently, so the counter is updated atomically.
                Interlocked.Increment(ref processedMessages);
            },
            15, // presumably maxDegreeOfParallelism (the Usage test passes this position by that name)
            cts.Token);

        for (int i = 0; i < 100; i++)
        {
            print($"Sending command {i}.");
            msMq.Send(new Command1 { id = i, name = $"Command {i}" });
            queuedMessages++;

            // Pace the producer without blocking the test thread.
            await Task.Delay(25);
        }

        pump.Stop();
        await pump.Completion;

        Assert.Equal(100, queuedMessages);
        Assert.Equal(100, processedMessages);
        Assert.Empty(msMq.GetAllMessages());
    }
}
/// <summary>
/// Sends 100 commands at 25 ms intervals and cancels the pump's token after command 50 is sent.
/// Asserts that the completion task throws an <see cref="OperationCanceledException"/>
/// (or a derived type), that fewer commands were processed than queued, and that
/// messages remain in the queue.
/// </summary>
[Fact]
public async Task Cancellation()
{
    using (MessageQueue msMq = GetQueue())
    using (CancellationTokenSource cts = new CancellationTokenSource())
    {
        // Diagnostics: prefix every output line with the seconds elapsed since the test started.
        Stopwatch sw = Stopwatch.StartNew();
        Action<string> print = msg => Debug.WriteLine(((double)sw.ElapsedMilliseconds / 1000).ToString("0.###") + "s: " + msg);

        int queuedMessages = 0;
        int processedMessages = 0;

        MessagePump pump = MessagePump.Run(
            msMq,
            async message =>
            {
                Command1 command = (Command1)message.Body;
                print($"Processing command {command.id}.");

                await Task.Delay(500).ConfigureAwait(false);

                print($"Finished processing command {command.id}.");

                // Handlers run concurrently, so the counter is updated atomically.
                Interlocked.Increment(ref processedMessages);
            },
            15, // presumably maxDegreeOfParallelism (the Usage test passes this position by that name)
            cts.Token);

        for (int i = 0; i < 100; i++)
        {
            print($"Sending command {i}.");
            msMq.Send(new Command1 { id = i, name = $"Command {i}" });
            queuedMessages++;

            if (i == 50)
            {
                // Cancel mid-stream; the loop keeps sending so messages are left behind in the queue.
                cts.Cancel();
            }

            // Pace the producer without blocking the test thread.
            await Task.Delay(25);
        }

        await Assert.ThrowsAnyAsync<OperationCanceledException>(() => pump.Completion);

        Assert.Equal(100, queuedMessages);
        Assert.True(processedMessages < queuedMessages);
        Assert.NotEmpty(msMq.GetAllMessages());
    }
}
/// <summary>
/// Sends 100 commands at 25 ms intervals while the handler throws an
/// <see cref="InvalidOperationException"/> for command 50. Asserts that the completion task
/// throws that exception type, that fewer commands were processed than queued, and that
/// messages remain in the queue.
/// </summary>
[Fact]
public async Task Exception()
{
    using (MessageQueue msMq = GetQueue())
    using (CancellationTokenSource cts = new CancellationTokenSource())
    {
        // Diagnostics: prefix every output line with the seconds elapsed since the test started.
        Stopwatch sw = Stopwatch.StartNew();
        Action<string> print = msg => Debug.WriteLine(((double)sw.ElapsedMilliseconds / 1000).ToString("0.###") + "s: " + msg);

        int queuedMessages = 0;
        int processedMessages = 0;

        MessagePump pump = MessagePump.Run(
            msMq,
            async message =>
            {
                Command1 command = (Command1)message.Body;
                print($"Processing command {command.id}.");

                if (command.id == 50)
                {
                    // Deliberate handler failure that the test expects to surface via pump.Completion.
                    throw new InvalidOperationException("TEST");
                }

                await Task.Delay(500).ConfigureAwait(false);

                print($"Finished processing command {command.id}.");

                // Handlers run concurrently, so the counter is updated atomically.
                Interlocked.Increment(ref processedMessages);
            },
            15, // presumably maxDegreeOfParallelism (the Usage test passes this position by that name)
            cts.Token);

        for (int i = 0; i < 100; i++)
        {
            print($"Sending command {i}.");
            msMq.Send(new Command1 { id = i, name = $"Command {i}" });
            queuedMessages++;

            // Pace the producer without blocking the test thread.
            await Task.Delay(25);
        }

        await Assert.ThrowsAsync<InvalidOperationException>(() => pump.Completion);

        Assert.Equal(100, queuedMessages);
        Assert.True(processedMessages < queuedMessages);
        Assert.NotEmpty(msMq.GetAllMessages());
    }
}
/// <summary>
/// Sends 100 commands at 25 ms intervals and calls <c>Stop</c> on the pump after command 50
/// is sent. After awaiting completion, asserts that some commands were processed and that
/// every command that was not processed is still in the queue.
/// </summary>
[Fact]
public async Task Stop()
{
    using (MessageQueue msMq = GetQueue())
    using (CancellationTokenSource cts = new CancellationTokenSource())
    {
        // Diagnostics: prefix every output line with the seconds elapsed since the test started.
        Stopwatch sw = Stopwatch.StartNew();
        Action<string> print = msg => Debug.WriteLine(((double)sw.ElapsedMilliseconds / 1000).ToString("0.###") + "s: " + msg);

        int queuedMessages = 0;
        int processedMessages = 0;

        MessagePump pump = MessagePump.Run(
            msMq,
            async message =>
            {
                Command1 command = (Command1)message.Body;
                print($"Processing command {command.id}.");

                await Task.Delay(500).ConfigureAwait(false);

                print($"Finished processing command {command.id}.");

                // Handlers run concurrently, so the counter is updated atomically.
                Interlocked.Increment(ref processedMessages);
            },
            15, // presumably maxDegreeOfParallelism (the Usage test passes this position by that name)
            cts.Token);

        for (int i = 0; i < 100; i++)
        {
            print($"Sending command {i}.");
            msMq.Send(new Command1 { id = i, name = $"Command {i}" });
            queuedMessages++;

            if (i == 50)
            {
                // Stop mid-stream; the loop keeps sending so the remainder stays in the queue.
                pump.Stop();
            }

            // Pace the producer without blocking the test thread.
            await Task.Delay(25);
        }

        await pump.Completion;

        Assert.Equal(100, queuedMessages);
        Assert.NotEqual(0, processedMessages);

        // Processed + still-queued must account for every message that was sent.
        Assert.Equal(100 - processedMessages, msMq.GetAllMessages().Length);
    }
}
/// <summary>
/// Sample command sent as the message body by these tests. It is the target type
/// registered with the XmlMessageFormatter in GetQueue(), so the property names are
/// part of the serialized form.
/// </summary>
public sealed class Command1
{
/// <summary>Command number; the tests set it to the send-loop index.</summary>
public int id { get; set; }
/// <summary>Display name; the tests set it to "Command {id}".</summary>
public string name { get; set; }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment