-
-
Save KirillShlenskiy/7f3e2c4b28b9f940c3da to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Diagnostics; | |
using System.Messaging; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Xunit; | |
using Xunit.Abstractions; | |
namespace StackOverflow.Q34437298 | |
{ | |
/// <summary>
/// Integration tests for <c>MessagePump</c>, executed against the local private MSMQ
/// queue <c>.\Private$\concurrenttest</c>.
/// </summary>
public class MessagePumpTests
{
    /// <summary>Path of the private queue used by every test in this class.</summary>
    private const string QueuePath = @".\Private$\concurrenttest";

    /// <summary>Number of messages each test sends to the queue.</summary>
    private const int MessageCount = 100;

    /// <summary>Zero-based message index at which a test cancels, stops or faults the pump.</summary>
    private const int TriggerIndex = 50;

    /// <summary>Degree of parallelism passed to the pump by the command-based tests.</summary>
    private const int MaxDegreeOfParallelism = 15;

    /// <summary>Pause between two consecutive sends.</summary>
    private static readonly TimeSpan SendInterval = TimeSpan.FromMilliseconds(25);

    private readonly ITestOutputHelper _output;

    /// <summary>
    /// Stores the test output sink, creates the queue if it does not exist yet,
    /// and purges it so the test starts from an empty queue.
    /// </summary>
    /// <param name="debug">xUnit output helper that receives the diagnostic messages.</param>
    public MessagePumpTests(ITestOutputHelper debug)
    {
        _output = debug;

        if (!MessageQueue.Exists(QueuePath))
        {
            // Create returns a MessageQueue instance; only the side effect is needed here,
            // so release the returned object immediately instead of leaking it.
            MessageQueue.Create(QueuePath).Dispose();
        }

        using (MessageQueue messageQueue = GetQueue())
        {
            messageQueue.Purge();
        }
    }

    /// <summary>
    /// Opens the test queue with an XML formatter that can deserialize <see cref="Command1"/> bodies.
    /// </summary>
    /// <returns>A new <see cref="MessageQueue"/>; the caller is responsible for disposing it.</returns>
    private MessageQueue GetQueue()
    {
        return new MessageQueue(QueuePath)
        {
            Formatter = new XmlMessageFormatter(new Type[] { typeof(Command1) })
        };
    }

    /// <summary>
    /// Builds a diagnostic printer that prefixes every message with the number of seconds
    /// elapsed since the printer was created and writes it to the test output.
    /// </summary>
    private Action<string> CreatePrinter()
    {
        Stopwatch stopwatch = Stopwatch.StartNew();

        return msg => _output.WriteLine(
            ((double)stopwatch.ElapsedMilliseconds / 1000).ToString("0.###") + "s: " + msg);
    }

    /// <summary>
    /// Sends <see cref="MessageCount"/> commands to <paramref name="target"/>, pausing
    /// <see cref="SendInterval"/> after each send without blocking the calling thread.
    /// </summary>
    /// <param name="target">Queue that receives the commands.</param>
    /// <param name="print">Diagnostic printer invoked before each send.</param>
    /// <param name="afterSend">
    /// Optional callback invoked with the zero-based index of the command that was just sent,
    /// before the pause. Tests use it to cancel or stop the pump mid-stream.
    /// </param>
    /// <returns>The number of commands that were sent.</returns>
    private static async Task<int> SendCommandsAsync(
        MessageQueue target,
        Action<string> print,
        Action<int> afterSend = null)
    {
        int sent = 0;

        for (int i = 0; i < MessageCount; i++)
        {
            print($"Sending command {i}.");
            target.Send(new Command1 { id = i, name = $"Command {i}" });
            sent++;

            afterSend?.Invoke(i);

            await Task.Delay(SendInterval);
        }

        return sent;
    }

    /// <summary>
    /// Minimal usage example: runs a pump with a degree of parallelism of 4, sends
    /// empty messages, then stops the pump and awaits its completion.
    /// </summary>
    [Fact]
    public async Task Usage()
    {
        using (MessageQueue msMq = GetQueue())
        {
            MessagePump pump = MessagePump.Run(
                msMq,
                async message =>
                {
                    await Task.Delay(50);

                    // Written to the xUnit output helper rather than the console so the
                    // line ends up in the test result like the other tests' diagnostics.
                    _output.WriteLine($"Finished processing message {message.Id}");
                },
                maxDegreeOfParallelism: 4
            );

            for (int i = 0; i < MessageCount; i++)
            {
                msMq.Send(new Message());
                await Task.Delay(SendInterval);
            }

            pump.Stop();
            await pump.Completion;
        }
    }

    /// <summary>
    /// Sends all commands, stops the pump and awaits completion; asserts that every
    /// command was processed and that the queue is empty afterwards.
    /// </summary>
    [Fact]
    public async Task Completion()
    {
        using (MessageQueue msMq = GetQueue())
        using (CancellationTokenSource cts = new CancellationTokenSource())
        {
            Action<string> print = CreatePrinter();
            int processedMessages = 0;

            MessagePump pump = MessagePump.Run(msMq, async message =>
            {
                Command1 command = (Command1)message.Body;
                print($"Processing command {command.id}.");
                await Task.Delay(500).ConfigureAwait(false);
                print($"Finished processing command {command.id}.");
                Interlocked.Increment(ref processedMessages);
            },
            MaxDegreeOfParallelism,
            cts.Token);

            int queuedMessages = await SendCommandsAsync(msMq, print);

            pump.Stop();
            await pump.Completion;

            Assert.Equal(MessageCount, queuedMessages);
            Assert.Equal(MessageCount, processedMessages);
            Assert.Empty(msMq.GetAllMessages());
        }
    }

    /// <summary>
    /// Cancels the token handed to the pump right after the command with index
    /// <see cref="TriggerIndex"/> is sent; asserts that awaiting the pump's completion throws an
    /// <see cref="OperationCanceledException"/> (or a derived type), that fewer commands
    /// were processed than queued, and that messages remain in the queue.
    /// </summary>
    [Fact]
    public async Task Cancellation()
    {
        using (MessageQueue msMq = GetQueue())
        using (CancellationTokenSource cts = new CancellationTokenSource())
        {
            Action<string> print = CreatePrinter();
            int processedMessages = 0;

            MessagePump pump = MessagePump.Run(msMq, async message =>
            {
                Command1 command = (Command1)message.Body;
                print($"Processing command {command.id}.");
                await Task.Delay(500).ConfigureAwait(false);
                print($"Finished processing command {command.id}.");
                Interlocked.Increment(ref processedMessages);
            },
            MaxDegreeOfParallelism,
            cts.Token);

            int queuedMessages = await SendCommandsAsync(msMq, print, index =>
            {
                if (index == TriggerIndex)
                {
                    cts.Cancel();
                }
            });

            await Assert.ThrowsAnyAsync<OperationCanceledException>(() => pump.Completion);

            Assert.Equal(MessageCount, queuedMessages);
            Assert.True(processedMessages < queuedMessages);
            Assert.NotEmpty(msMq.GetAllMessages());
        }
    }

    /// <summary>
    /// Makes the handler throw for the command whose id equals <see cref="TriggerIndex"/>;
    /// asserts that awaiting the pump's completion throws <see cref="InvalidOperationException"/>,
    /// that fewer commands were processed than queued, and that messages remain in the queue.
    /// </summary>
    [Fact]
    public async Task Exception()
    {
        using (MessageQueue msMq = GetQueue())
        using (CancellationTokenSource cts = new CancellationTokenSource())
        {
            Action<string> print = CreatePrinter();
            int processedMessages = 0;

            MessagePump pump = MessagePump.Run(msMq, async message =>
            {
                Command1 command = (Command1)message.Body;
                print($"Processing command {command.id}.");

                if (command.id == TriggerIndex)
                {
                    throw new InvalidOperationException("TEST");
                }

                await Task.Delay(500).ConfigureAwait(false);
                print($"Finished processing command {command.id}.");
                Interlocked.Increment(ref processedMessages);
            },
            MaxDegreeOfParallelism,
            cts.Token);

            int queuedMessages = await SendCommandsAsync(msMq, print);

            await Assert.ThrowsAsync<InvalidOperationException>(() => pump.Completion);

            Assert.Equal(MessageCount, queuedMessages);
            Assert.True(processedMessages < queuedMessages);
            Assert.NotEmpty(msMq.GetAllMessages());
        }
    }

    /// <summary>
    /// Stops the pump right after the command with index <see cref="TriggerIndex"/> is sent
    /// while sending continues; asserts that completion is reached without an exception,
    /// that at least one command was processed, and that every unprocessed command is
    /// still in the queue.
    /// </summary>
    [Fact]
    public async Task Stop()
    {
        using (MessageQueue msMq = GetQueue())
        using (CancellationTokenSource cts = new CancellationTokenSource())
        {
            Action<string> print = CreatePrinter();
            int processedMessages = 0;

            MessagePump pump = MessagePump.Run(msMq, async message =>
            {
                Command1 command = (Command1)message.Body;
                print($"Processing command {command.id}.");
                await Task.Delay(500).ConfigureAwait(false);
                print($"Finished processing command {command.id}.");
                Interlocked.Increment(ref processedMessages);
            },
            MaxDegreeOfParallelism,
            cts.Token);

            int queuedMessages = await SendCommandsAsync(msMq, print, index =>
            {
                if (index == TriggerIndex)
                {
                    pump.Stop();
                }
            });

            await pump.Completion;

            Assert.Equal(MessageCount, queuedMessages);
            Assert.NotEqual(0, processedMessages);
            Assert.Equal(MessageCount - processedMessages, msMq.GetAllMessages().Length);
        }
    }

    /// <summary>
    /// Message body used by the tests. The lower-case property names are kept as they are
    /// because they are part of the public shape serialized by the XML formatter.
    /// </summary>
    public sealed class Command1
    {
        public int id { get; set; }
        public string name { get; set; }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment