Skip to content

Instantly share code, notes, and snippets.

@rogeralsing
Last active March 4, 2018 11:44
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save rogeralsing/d05e4b7cc64a3cfff3b8 to your computer and use it in GitHub Desktop.
Save rogeralsing/d05e4b7cc64a3cfff3b8 to your computer and use it in GitHub Desktop.
Durable message bus integration with Akka.NET and Azure Service Bus
using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Routing;
using Microsoft.ServiceBus.Messaging;
namespace ConsoleApplication13
{
/// <summary>
/// Worker actor that handles <see cref="string"/> business messages and
/// confirms each one back to the sender.
/// </summary>
public class MyBusinessActor : ReceiveActor
{
    /// <summary>
    /// Registers the handler for incoming <see cref="string"/> messages.
    /// </summary>
    public MyBusinessActor()
    {
        // This is the place to receive business messages,
        // apply domain logic, persist to a database and so on.
        Receive<string>(text => OnBusinessMessage(text));
    }

    /// <summary>
    /// Logs the processed message and replies to the sender with a
    /// <c>Status.Success</c> that carries the original message.
    /// </summary>
    /// <param name="text">The business message that was received.</param>
    private void OnBusinessMessage(string text)
    {
        var workerPath = Self.Path;
        Console.WriteLine("{0} Processed {1}", workerPath, text);

        // Tell the sender that everything went well by echoing the message
        // inside the built-in `Success` message. A Status.Failure could be
        // sent back when something goes wrong; this example instead lets the
        // caller time out when no reply is sent.
        var confirmation = new Status.Success(text);
        Sender.Tell(confirmation);
    }
}
/// <summary>
/// Console host: pre-fills the topic with sample messages, starts a pool of
/// <see cref="MyBusinessActor"/> workers and pumps Service Bus messages into
/// them, acknowledging a message only after a worker has confirmed it.
/// </summary>
internal class Program
{
    /// <summary>
    /// Entry point. Seeds the topic, starts the actor system and the message
    /// pump, then waits for the user to press Enter.
    /// </summary>
    /// <param name="args">Command line arguments (unused).</param>
    private static void Main(string[] args)
    {
        CreateMessages();
        using (var system = ActorSystem.Create("MySystem"))
        {
            //spin up our workers
            //this should be done via config, but here we use a hardcoded setup for simplicity
            //Do note that the workers can be spread across multiple servers using Akka.Remote or Akka.Cluster
            var businessActor =
                system.ActorOf(Props.Create<MyBusinessActor>().WithRouter(new ConsistentHashingPool(10)));

            //start the message processor; it returns a Task (not async void) so that
            //a failure of the pump is observed and reported here instead of being
            //thrown on an arbitrary thread
            ProcessMessages(businessActor).ContinueWith(
                t => Console.Error.WriteLine("Message processing stopped: {0}", t.Exception),
                TaskContinuationOptions.OnlyOnFaulted);

            //wait for user to end the application
            Console.ReadLine();
        }
    }

    /// <summary>
    /// Endless receive loop: fetches batches from the subscription and hands
    /// every message to <see cref="ProcessMessage"/>, waiting for the whole
    /// batch before fetching the next one.
    /// </summary>
    /// <param name="myBusinessActor">Router in front of the business workers.</param>
    /// <returns>A task that only completes (faulted) if receiving fails.</returns>
    private static async Task ProcessMessages(IActorRef myBusinessActor)
    {
        //set up a azure SB subscription client
        //(or use a Queue client, or whatever client your specific MQ supports)
        var subscriptionClient = SubscriptionClient.Create("service1", "service1");
        while (true)
        {
            //fetch a batch of messages
            var batch = await subscriptionClient.ReceiveBatchAsync(100, TimeSpan.FromSeconds(1));
            if (batch == null)
            {
                //defensive: nothing was delivered within the wait time
                //NOTE(review): confirm whether the SDK ever returns null here
                continue;
            }

            //start processing every message of the batch concurrently;
            //each task handles its own failures and never faults
            var tasks = batch.Select(res => ProcessMessage(myBusinessActor, res)).ToList();

            //wait for all messages to either succeed or fail/timeout
            await Task.WhenAll(tasks);
            Console.WriteLine("All messages acked");
            //continue with the next batch
        }
    }

    /// <summary>
    /// Sends one message to the workers and acknowledges it on the bus only
    /// when the worker replied successfully. Any failure (timeout, faulted
    /// ask, failed completion) leaves the message un-acked so the bus can
    /// redeliver it.
    /// </summary>
    /// <param name="myBusinessActor">Router in front of the business workers.</param>
    /// <param name="res">The brokered message to process; disposed when done.</param>
    /// <returns>A task that completes when the message has been handled.</returns>
    private static async Task ProcessMessage(IActorRef myBusinessActor, BrokeredMessage res)
    {
        string importantMessage = null;
        try
        {
            importantMessage = res.GetBody<string>();

            //awaiting the ask surfaces cancellation AND faults as exceptions;
            //the previous IsCanceled-only check treated a faulted ask as a
            //success and completed (i.e. lost) the message
            await myBusinessActor.Ask<Status.Success>(
                new ConsistentHashableEnvelope(importantMessage, importantMessage.GetHashCode()),
                TimeSpan.FromSeconds(1));

            //ack without blocking a thread pool thread
            await res.CompleteAsync();
            Console.WriteLine("Completed {0}", importantMessage);
        }
        catch (Exception ex)
        {
            //deliberately broad: whatever went wrong, the message must stay
            //un-acked and one bad message must not stop the whole pump
            Console.WriteLine("Failed to ack {0}", importantMessage ?? res.MessageId);
            Console.Error.WriteLine(ex);
        }
        finally
        {
            res.Dispose();
        }
    }

    /// <summary>
    /// Dummy method only used to prefill the message queue with data for this
    /// example. Blocks until all sends have finished so that send failures
    /// surface here instead of being silently dropped.
    /// </summary>
    private static void CreateMessages()
    {
        var client = TopicClient.Create("service1");
        try
        {
            var sends = Enumerable.Range(0, 100)
                .Select(i => client.SendAsync(new BrokeredMessage("hello" + i)
                {
                    MessageId = Guid.NewGuid().ToString()
                }))
                .ToArray();

            //blocking is acceptable here: synchronous console start-up code
            Task.WaitAll(sends);
        }
        finally
        {
            client.Close();
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment