Skip to content

Instantly share code, notes, and snippets.

@jennings
Last active April 8, 2021 00:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jennings/1f44ddf7de9610221672ec6d57c1ea70 to your computer and use it in GitHub Desktop.
Save jennings/1f44ddf7de9610221672ec6d57c1ea70 to your computer and use it in GitHub Desktop.
4/7/2021 17:01:54 [MAIN] Publishing two messages
4/7/2021 17:01:55 [CONSUMER] Received message, waiting 30 seconds
4/7/2021 17:01:55 [CONSUMER] Received message, waiting 30 seconds
4/7/2021 17:01:57 [MAIN] Stopping the bus
4/7/2021 17:02:07 [MAIN] Publishing a third message
4/7/2021 17:02:25 [CONSUMER] Wait complete, consuming the message
4/7/2021 17:02:25 [CONSUMER] Wait complete, consuming the message
4/7/2021 17:02:25 [MAIN] Bus stopped
4/7/2021 17:02:25 [MAIN] Stopping the bus again, just for kicks
4/7/2021 17:02:25 [MAIN] Bus stopped
using System;
using System.Threading.Tasks;
using MassTransit;
namespace mttest
{
/// <summary>
/// Console repro that publishes messages to a RabbitMQ-backed bus, stops the
/// receiving bus while consumers are still busy, and publishes one more
/// message while that stop is in progress.
/// </summary>
class Program
{
    /// <summary>
    /// Runs the scenario: start the receiving bus, publish two messages, begin
    /// stopping the receiving bus, publish a third message ten seconds later,
    /// stop the receiving bus a second time, then stop the publishing bus.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    /// <returns>A task that completes when both buses have been stopped.</returns>
    static async Task Main(string[] args)
    {
        // Receiving bus: a single endpoint hosting MyConsumer, configured with
        // a concurrent message limit of 10.
        var bus = Bus.Factory.CreateUsingRabbitMq(config =>
        {
            config.ReceiveEndpoint(endpoint =>
            {
                endpoint.Consumer<MyConsumer>(c =>
                {
                    c.UseConcurrentMessageLimit(10);
                });
            });
        });

        // Separate bus instance used only for publishing, so messages can still
        // be published after StopAsync has been called on the receiving bus.
        // NOTE(review): this bus is never started before Publish is called —
        // confirm that is supported by the MassTransit version in use.
        var publishBus = Bus.Factory.CreateUsingRabbitMq();

        await bus.StartAsync();

        Console.WriteLine("{0} [MAIN] Publishing two messages", DateTime.Now);
        await publishBus.Publish<MyMessage>(new { });
        await publishBus.Publish<MyMessage>(new { });

        await Task.Delay(2000);

        // Publish another message while the main bus is stopping.
        // The task is kept (rather than discarded) so that it can be awaited
        // below: otherwise a failure in the late publish would go unobserved
        // and the publishing bus could be stopped while the publish is pending.
        Task latePublish = Task.Run(async () =>
        {
            await Task.Delay(10000);
            Console.WriteLine("{0} [MAIN] Publishing a third message", DateTime.Now);
            await publishBus.Publish<MyMessage>(new { });
        });

        Console.WriteLine("{0} [MAIN] Stopping the bus", DateTime.Now);
        await bus.StopAsync();
        Console.WriteLine("{0} [MAIN] Bus stopped", DateTime.Now);

        Console.WriteLine("{0} [MAIN] Stopping the bus again, just for kicks", DateTime.Now);
        await bus.StopAsync();
        Console.WriteLine("{0} [MAIN] Bus stopped", DateTime.Now);

        try
        {
            // Make sure the third publish has finished (and surface its
            // exception, if any) before tearing down the bus it uses.
            await latePublish;
        }
        finally
        {
            // Stop the publishing bus even if the late publish faulted.
            await publishBus.StopAsync();
        }
    }
}
/// <summary>
/// Marker message contract with no members; it is the message type published
/// by <c>Program.Main</c> and handled by <c>MyConsumer</c>.
/// </summary>
public interface MyMessage { }
/// <summary>
/// Consumer that simulates slow message handling by waiting a fixed number of
/// seconds, logging when the wait starts, completes, or is interrupted.
/// </summary>
public class MyConsumer : IConsumer<MyMessage>
{
    // Length of the simulated processing delay, in seconds.
    private const int WaitSeconds = 30;

    /// <summary>
    /// Logs receipt of the message, then waits <c>WaitSeconds</c> seconds while
    /// observing the context's cancellation token. Any exception raised inside
    /// the wait/log step is caught and only its type is written to the console;
    /// it is not rethrown.
    /// </summary>
    /// <param name="context">Consume context whose cancellation token is passed to the delay.</param>
    /// <returns>A task that completes once the wait has finished or failed.</returns>
    public async Task Consume(ConsumeContext<MyMessage> context)
    {
        Console.WriteLine("{0} [CONSUMER] Received message, waiting {1} seconds", DateTime.Now, WaitSeconds);

        var waitTime = TimeSpan.FromSeconds(WaitSeconds);
        var cancellation = context.CancellationToken;

        try
        {
            await Task.Delay(waitTime, cancellation);
            Console.WriteLine("{0} [CONSUMER] Wait complete, consuming the message", DateTime.Now);
        }
        catch (Exception failure)
        {
            // Report which exception type interrupted the wait.
            Console.WriteLine("{0} [CONSUMER] Received exception {1}", DateTime.Now, failure.GetType());
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment