Skip to content

Instantly share code, notes, and snippets.

@phatboyg
Created February 10, 2019 21:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save phatboyg/a062e5dda9fc39c7a91c7522dce4cec6 to your computer and use it in GitHub Desktop.
Save phatboyg/a062e5dda9fc39c7a91c7522dce4cec6 to your computer and use it in GitHub Desktop.
Sample showing Azure Service Bus with Lock Timeout
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.2</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MassTransit" Version="5.3.1" />
<PackageReference Include="MassTransit.Azure.ServiceBus.Core" Version="5.3.1" />
<PackageReference Include="MassTransit.Log4Net" Version="5.3.1" />
</ItemGroup>
</Project>
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using GreenPipes;
using log4net;
using log4net.Config;
using MassTransit;
using MassTransit.Azure.ServiceBus.Core;
using MassTransit.Internals.Extensions;
using MassTransit.Log4NetIntegration;
using MassTransit.Log4NetIntegration.Logging;
namespace ConsoleApp2
{
class Program
{
internal static Guid RunId = NewId.NextGuid();
static void Main(string[] args)
{
var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
string connectionString =
"Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=";
var host = cfg.Host(connectionString, h => { h.OperationTimeout = TimeSpan.FromSeconds(60); });
cfg.UseLog4Net();
cfg.ReceiveEndpoint(host, "SlowConsumerService", e =>
{
e.LockDuration = TimeSpan.FromSeconds(60);
e.MaxAutoRenewDuration = TimeSpan.FromMinutes(30);
e.PrefetchCount = 100;
e.Consumer<SlowConsumer>();
});
});
bus.Start();
for (int i = 0; i < 25; i++)
{
bus.Publish<SlowMessage>(new {Delay = TimeSpan.FromMinutes(i), RunId});
}
Console.ReadLine();
bus.Stop();
}
static void ConfigureLogger()
{
const string logConfig = @"<?xml version=""1.0"" encoding=""utf-8"" ?>
<log4net>
<root>
<level value=""INFO"" />
<appender-ref ref=""console"" />
</root>
<appender name=""console"" type=""log4net.Appender.ColoredConsoleAppender"">
<layout type=""log4net.Layout.PatternLayout"">
<conversionPattern value=""%m%n"" />
</layout>
</appender>
</log4net>";
using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(logConfig)))
{
var logRepository = LogManager.GetRepository(System.Reflection.Assembly.GetEntryAssembly());
XmlConfigurator.Configure(logRepository, stream);
}
Log4NetLogger.Use();
}
}
public class SlowConsumer :
IConsumer<SlowMessage>
{
public async Task Consume(ConsumeContext<SlowMessage> context)
{
DateTime started = DateTime.UtcNow;
if (Program.RunId != context.Message.RunId)
return;
BrokeredMessageContext messageContext = context.GetPayload<BrokeredMessageContext>();
int limit = (int) (context.Message.Delay.TotalSeconds / 60);
for (int i = 0; i < limit; i++)
{
await Console.Out.WriteLineAsync(
$"Message: (delay: {context.Message.Delay.ToFriendlyString(),8}, locked-until: {messageContext.LockedUntil,24}, queued: {started,24})");
await Task.Delay(TimeSpan.FromMinutes(1));
}
await Console.Out.WriteLineAsync(
$"M--DONE: (delay: {context.Message.Delay.ToFriendlyString(),8}, locked-until: {messageContext.LockedUntil,24}, queued: {started,24}, duration:{DateTime.UtcNow - started})");
}
}
public interface SlowMessage
{
Guid RunId { get; }
TimeSpan Delay { get; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment