Skip to content

Instantly share code, notes, and snippets.

@musukvl
Created February 15, 2019 18:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save musukvl/d703d8c4070ae2b0821cd5eb9a100587 to your computer and use it in GitHub Desktop.
Save musukvl/d703d8c4070ae2b0821cd5eb9a100587 to your computer and use it in GitHub Desktop.
Mass transit concurrent sample
using System;
using System.Threading;
using GreenPipes;
using MassTransit;
namespace TestMassTransit
{
public class VideoFileMessage
{
public int Num { get; set; }
public int TimeToConvert { get; set; }
}
class Program
{
static void Main(string[] args)
{
int firstServerFilesCount = 0;
var firstServer = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
var host = rabbit.Host(new Uri("rabbitmq://localhost"), h =>
{
h.Username("guest");
h.Password("guest");
});
rabbit.UseConcurrencyLimit(3);
rabbit.ReceiveEndpoint(host, "filesToConvert", endPoint =>
{
endPoint.PrefetchCount = 3;
endPoint.Handler<VideoFileMessage>(context =>
{
var msg = context.Message;
firstServerFilesCount++;
Thread.Sleep(msg.TimeToConvert);
var task = Console.Out.WriteLineAsync($"#Server 1. {Environment.NewLine}File {msg.Num} processed {msg.TimeToConvert} ms. Threads: {firstServerFilesCount} of 5. ThreadId - {Thread.CurrentThread.ManagedThreadId}");
firstServerFilesCount--;
return task;
});
});
});
int secondServerFilesCount = 0;
var secondServer = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
var host = rabbit.Host(new Uri("rabbitmq://localhost"), h =>
{
h.Username("guest");
h.Password("guest");
});
rabbit.UseConcurrencyLimit(5);
rabbit.ReceiveEndpoint(host, "filesToConvert", endpoint =>
{
endpoint.PrefetchCount = 5;
endpoint.Handler<VideoFileMessage>(context =>
{
var msg = context.Message;
secondServerFilesCount++;
Thread.Sleep(msg.TimeToConvert);
var task = Console.Out.WriteLineAsync($"#Server 2. {Environment.NewLine}File {msg.Num} processed {msg.TimeToConvert} ms. Threads: {secondServerFilesCount} of 5. ThreadId - {Thread.CurrentThread.ManagedThreadId}");
secondServerFilesCount--;
return task;
});
});
});
var publishServer = Bus.Factory.CreateUsingRabbitMq(rabbit =>
{
var host = rabbit.Host(new Uri("rabbitmq://localhost"), h =>
{
h.Username("guest");
h.Password("guest");
});
});
firstServer.Start();
secondServer.Start();
publishServer.Start();
Random rnd = new Random();
for (int i = 1; i <= 100; i++)
{
publishServer.Publish(new VideoFileMessage() { Num = i, TimeToConvert = rnd.Next(100, 5000) });
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment