Skip to content

Instantly share code, notes, and snippets.

@danielmarbach
Created February 13, 2017 22:27
Show Gist options
  • Save danielmarbach/80b2c60cced407e1d3500db8c85de543 to your computer and use it in GitHub Desktop.
Save danielmarbach/80b2c60cced407e1d3500db8c85de543 to your computer and use it in GitHub Desktop.
RabbitMQ tryouts
/// <summary>
/// Small experiment that publishes a batch of messages to a RabbitMQ queue and then
/// times how long a synchronous and an asynchronous consumer need to drain it under
/// three workloads (see <see cref="UsageMode"/>).
/// </summary>
class Program
{
    private const string BrokerHostName = "localhost";
    private const string QueueName = "hello";
    private const string SyncFilePath = "./sync.txt";
    private const string AsyncFilePath = "./async.txt";
    private const int FileBufferSize = 4096;
    private const int SimulatedWorkMilliseconds = 15;

    // Number of messages published before each consumer run and awaited by each consumer.
    private static int numberOfMessages;

    /// <summary>
    /// Removes output files from previous runs, then runs every workload first with the
    /// synchronous consumer and afterwards with the asynchronous consumer.
    /// </summary>
    /// <param name="args">Command line arguments (unused).</param>
    static void Main(string[] args)
    {
        File.Delete(SyncFilePath);
        File.Delete(AsyncFilePath);

        numberOfMessages = 1000;

        // Same order as the measurements were always taken in.
        var modes = new[] { UsageMode.Sleep, UsageMode.FileStreamInside, UsageMode.FileStreamOutside };

        Console.WriteLine(" Sync Consume.");
        foreach (var mode in modes)
        {
            SendMessages();
            SyncConsumer(mode);
        }

        Console.WriteLine("Async consume.");
        foreach (var mode in modes)
        {
            SendMessages();
            AsyncConsumer(mode);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    /// <summary>Workload performed by a consumer for every received message.</summary>
    enum UsageMode
    {
        /// <summary>Wait for a fixed delay instead of doing real work.</summary>
        Sleep,

        /// <summary>Append the message body to one file stream opened once before consuming.</summary>
        FileStreamOutside,

        /// <summary>Open a file stream per message, append the message body and dispose the stream.</summary>
        FileStreamInside,
    }

    /// <summary>
    /// Consumes <see cref="numberOfMessages"/> messages with an
    /// <c>AsyncEventingBasicConsumer</c> and prints the elapsed time.
    /// </summary>
    /// <param name="mode">Workload to perform for every message.</param>
    private static void AsyncConsumer(UsageMode mode)
    {
        var factory = new ConnectionFactory() { HostName = BrokerHostName, DispatchConsumersAsync = true };

        // countDown is declared first so it is disposed last, after channel and connection are gone.
        using (var countDown = new CountdownEvent(numberOfMessages))
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: QueueName,
                durable: false,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            FileStream sharedFile = null;
            try
            {
                if (mode == UsageMode.FileStreamOutside)
                {
                    sharedFile = OpenAppendStream(AsyncFilePath, useAsync: true);
                }

                var consumer = new AsyncEventingBasicConsumer(channel);

                // "+=" matches the subscription style of SyncConsumer and works whether
                // Received is an event or a delegate-typed property.
                consumer.Received += async (model, args) =>
                {
                    // Ignore anything delivered after the expected batch has been counted.
                    if (countDown.IsSet)
                    {
                        return;
                    }

                    if (mode == UsageMode.Sleep)
                    {
                        await Task.Delay(SimulatedWorkMilliseconds).ConfigureAwait(false);
                    }
                    else if (mode == UsageMode.FileStreamInside)
                    {
                        // Handler-local stream: disposed even when the write throws.
                        using (var perMessageFile = OpenAppendStream(AsyncFilePath, useAsync: true))
                        {
                            await perMessageFile.WriteAsync(args.Body, 0, args.Body.Length).ConfigureAwait(false);
                        }
                    }
                    else if (mode == UsageMode.FileStreamOutside)
                    {
                        await sharedFile.WriteAsync(args.Body, 0, args.Body.Length).ConfigureAwait(false);
                    }

                    // Signal() throws once the count is already zero, hence the re-check.
                    if (!countDown.IsSet)
                    {
                        countDown.Signal();
                    }
                };

                var stopWatch = Stopwatch.StartNew();
                channel.BasicConsume(QueueName, true, consumer);
                countDown.Wait();

                // Close before stopping the clock so flushing the shared stream is part of the measurement.
                sharedFile?.Close();
                stopWatch.Stop();
                Console.WriteLine("Done {0}: {1}", mode, stopWatch.Elapsed);
            }
            finally
            {
                // No-op after a successful Close(); releases the handle on failure paths.
                sharedFile?.Dispose();
            }
        }
    }

    /// <summary>
    /// Consumes <see cref="numberOfMessages"/> messages with an
    /// <c>EventingBasicConsumer</c> and prints the elapsed time.
    /// </summary>
    /// <param name="mode">Workload to perform for every message.</param>
    private static void SyncConsumer(UsageMode mode)
    {
        var factory = new ConnectionFactory() { HostName = BrokerHostName };

        // countDown is declared first so it is disposed last, after channel and connection are gone.
        using (var countDown = new CountdownEvent(numberOfMessages))
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: QueueName,
                durable: false,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            FileStream sharedFile = null;
            try
            {
                if (mode == UsageMode.FileStreamOutside)
                {
                    sharedFile = OpenAppendStream(SyncFilePath, useAsync: false);
                }

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    // Ignore anything delivered after the expected batch has been counted.
                    if (countDown.IsSet)
                    {
                        return;
                    }

                    if (mode == UsageMode.Sleep)
                    {
                        // Deliberately blocking: this is the synchronous side of the comparison.
                        Thread.Sleep(SimulatedWorkMilliseconds);
                    }
                    else if (mode == UsageMode.FileStreamInside)
                    {
                        // Handler-local stream: disposed even when the write throws.
                        using (var perMessageFile = OpenAppendStream(SyncFilePath, useAsync: false))
                        {
                            perMessageFile.Write(ea.Body, 0, ea.Body.Length);
                        }
                    }
                    else if (mode == UsageMode.FileStreamOutside)
                    {
                        sharedFile.Write(ea.Body, 0, ea.Body.Length);
                    }

                    // Signal() throws once the count is already zero, hence the re-check.
                    if (!countDown.IsSet)
                    {
                        countDown.Signal();
                    }
                };

                var stopWatch = Stopwatch.StartNew();
                channel.BasicConsume(QueueName, true, consumer);
                countDown.Wait();

                // Close before stopping the clock so flushing the shared stream is part of the measurement.
                sharedFile?.Close();
                stopWatch.Stop();
                Console.WriteLine("Done {0}: {1}", mode, stopWatch.Elapsed);
            }
            finally
            {
                // No-op after a successful Close(); releases the handle on failure paths.
                sharedFile?.Dispose();
            }
        }
    }

    /// <summary>
    /// Publishes <see cref="numberOfMessages"/> messages to the queue; each body is the
    /// UTF-8 encoded message index.
    /// </summary>
    private static void SendMessages()
    {
        var factory = new ConnectionFactory() { HostName = BrokerHostName };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: QueueName,
                durable: false,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            for (int i = 0; i < numberOfMessages; i++)
            {
                var body = Encoding.UTF8.GetBytes(i.ToString());
                channel.BasicPublish(exchange: "",
                    routingKey: QueueName,
                    basicProperties: null,
                    body: body);
            }
        }
    }

    /// <summary>Opens <paramref name="path"/> for appending with the settings shared by all workloads.</summary>
    /// <param name="path">File to append to; created when it does not exist.</param>
    /// <param name="useAsync">Whether the stream is opened for asynchronous I/O.</param>
    /// <returns>A writable stream positioned at the end of the file. The caller disposes it.</returns>
    private static FileStream OpenAppendStream(string path, bool useAsync)
    {
        return new FileStream(path, FileMode.Append, FileAccess.Write, FileShare.Write, bufferSize: FileBufferSize, useAsync: useAsync);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment