Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
ServiceBusPlugin Tricks
/// <summary>
/// Bare starting point for a custom Service Bus plugin: derives from
/// <c>ServiceBusPlugin</c> and declares no members of its own yet.
/// </summary>
/// <remarks>
/// NOTE(review): the next snippet describes <c>Name</c> as an abstract property,
/// so this declaration presumably does not compile until <c>Name</c> is overridden.
/// </remarks>
public class SenderValidationPlugin : ServiceBusPlugin
{
}
/// <summary>
/// Minimal Service Bus plugin: supplies only the <c>Name</c> override and
/// adds no other behaviour.
/// </summary>
public class SenderValidationPlugin : ServiceBusPlugin
{
    /// <summary>
    /// Gets the plugin name, which is the full name of the plugin's runtime type.
    /// Overrides the abstract property declared on the base class.
    /// </summary>
    public override string Name
    {
        get { return GetType().FullName; }
    }
}
/// <summary>
/// Plugin scaffold listing every overridable member of <c>ServiceBusPlugin</c>.
/// Both message hooks are placeholders that are not implemented yet.
/// </summary>
public class SenderValidationPlugin : ServiceBusPlugin
{
    /// <summary>
    /// Gets the plugin name, which is the full name of the plugin's runtime type.
    /// </summary>
    public override string Name
    {
        get { return GetType().FullName; }
    }

    /// <summary>
    /// Overrides the virtual property. The getter-only auto-property is never
    /// assigned, so it yields the default value <c>false</c>.
    /// </summary>
    public override bool ShouldContinueOnException { get; }

    /// <summary>
    /// Overrides the virtual send hook. Placeholder that always fails with
    /// <see cref="NotImplementedException"/>.
    /// </summary>
    /// <param name="message">The outgoing message.</param>
    /// <returns>Never completes successfully in this scaffold.</returns>
    /// <exception cref="NotImplementedException">Always.</exception>
    public override async Task<Message> BeforeMessageSend(Message message)
    {
        // The async modifier is kept so the failure is reported through the returned task.
        throw new NotImplementedException();
    }

    /// <summary>
    /// Overrides the virtual receive hook. Placeholder that always fails with
    /// <see cref="NotImplementedException"/>.
    /// </summary>
    /// <param name="message">The received message.</param>
    /// <returns>Never completes successfully in this scaffold.</returns>
    /// <exception cref="NotImplementedException">Always.</exception>
    public override async Task<Message> AfterMessageReceive(Message message)
    {
        // The async modifier is kept so the failure is reported through the returned task.
        throw new NotImplementedException();
    }
}
/// <summary>
/// Plugin that routes both the send hook and the receive hook through one
/// shared private validation routine.
/// </summary>
public class SenderValidationPlugin : ServiceBusPlugin
{
    /// <summary>
    /// Gets the plugin name, which is the full name of the plugin's runtime type.
    /// </summary>
    public override string Name
    {
        get { return GetType().FullName; }
    }

    /// <summary>
    /// Getter-only auto-property that is never assigned, so it yields <c>false</c>.
    /// </summary>
    public override bool ShouldContinueOnException { get; }

    /// <summary>
    /// Validates an outgoing message by delegating to <c>ValidateAsync</c>.
    /// </summary>
    /// <param name="message">The message about to be sent.</param>
    /// <returns>Whatever <c>ValidateAsync</c> returns for the message.</returns>
    public override async Task<Message> BeforeMessageSend(Message message)
    {
        var validated = await ValidateAsync(message).ConfigureAwait(false);
        return validated;
    }

    /// <summary>
    /// Validates a received message by delegating to <c>ValidateAsync</c>.
    /// </summary>
    /// <param name="message">The message that was received.</param>
    /// <returns>Whatever <c>ValidateAsync</c> returns for the message.</returns>
    public override async Task<Message> AfterMessageReceive(Message message)
    {
        var validated = await ValidateAsync(message).ConfigureAwait(false);
        return validated;
    }

    /// <summary>
    /// Shared validation used by both BeforeMessageSend and AfterMessageReceive.
    /// Not implemented yet: always fails with <see cref="NotImplementedException"/>.
    /// </summary>
    /// <param name="message">The message to validate.</param>
    /// <exception cref="NotImplementedException">Always.</exception>
    private async Task<Message> ValidateAsync(Message message)
    {
        throw new NotImplementedException();
    }
}
// Receiving side: register the plugin on a subscription client and start a
// message pump that completes each message once the handler has run.
var plugin = new SenderValidationPlugin();
var subscription = new SubscriptionClient("/* CONNECTION STRING */", "my-topic", "my-subscription", ReceiveMode.PeekLock);
subscription.RegisterPlugin(plugin);

// The exception callback must return a Task, so the lambda ends with
// Task.CompletedTask after writing the error details to the console.
var handlerOptions = new MessageHandlerOptions(args =>
{
    Console.WriteLine(args.Exception.Message);
    Console.WriteLine(args.ExceptionReceivedContext.EntityPath);
    return Task.CompletedTask;
})
{
    // The handler below completes the message itself; turning auto-complete off
    // keeps the pump from trying to complete the same message a second time.
    AutoComplete = false
};

subscription.RegisterMessageHandler(async (message, token) =>
{
    await subscription.CompleteAsync(message.SystemProperties.LockToken)
                      .ConfigureAwait(false);
},
handlerOptions);
// Sending side: register the plugin on a topic client, then send a UTF-8
// encoded JSON payload tagged with a "sender" user property.
var plugin = new SenderValidationPlugin();
var topic = new TopicClient("/* CONNECTION STRING */", "my-topic");
topic.RegisterPlugin(plugin);

var payload = "{ \"hello\": \"world\" }";
// Encode the payload declared above (the original referenced an undefined 'serialised').
var body = Encoding.UTF8.GetBytes(payload);

var message = new Message(body);
message.UserProperties.Add("sender", "lorem");

await topic.SendAsync(message)
           .ConfigureAwait(false);
/// <summary>
/// Checks that a message has a non-empty body and a "sender" user property
/// naming one of the accepted senders ("app1", "app2", "app3").
/// </summary>
/// <param name="message">The message to validate; it is only read, never modified.</param>
/// <returns>The same <paramref name="message"/> instance when every check passes.</returns>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
/// <exception cref="InvalidOperationException">
/// The body is null or empty, the "sender" property is missing, not a string or
/// blank, or the sender is not in the accepted list.
/// </exception>
private async Task<Message> ValidateAsync(Message message)
{
    // Nothing is awaited here; the async modifier is kept so that validation
    // failures are still reported through the returned task.
    if (message == null)
    {
        throw new ArgumentNullException(nameof(message));
    }

    // The checks only read from the message, so no clone is needed.
    var body = message.Body;
    if (body == null || body.Length == 0)
    {
        throw new InvalidOperationException("Message body not exists");
    }

    // TryGetValue avoids the KeyNotFoundException the indexer throws when the
    // property is absent, so a missing sender reports "Sender not defined".
    object rawSender;
    message.UserProperties.TryGetValue("sender", out rawSender);
    var sender = rawSender as string;
    if (string.IsNullOrWhiteSpace(sender))
    {
        throw new InvalidOperationException("Sender not defined");
    }

    var senders = new List<string>() { "app1", "app2", "app3" };
    if (!senders.Contains(sender))
    {
        throw new InvalidOperationException("Invalid sender");
    }

    return message;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment