Skip to content

Instantly share code, notes, and snippets.

@loosechainsaw
Created February 9, 2015 13:30
Show Gist options
  • Save loosechainsaw/d61b9bb41e3ecdf4eb12 to your computer and use it in GitHub Desktop.
Save loosechainsaw/d61b9bb41e3ecdf4eb12 to your computer and use it in GitHub Desktop.
Example of Scatter Gather in Akka.NET
using System;
using Akka.Actor;
using Akka;
using Akka.Remote;
using Akka.Routing;
using System.Linq;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Actors
{
/// <summary>
/// Describes a single hotel by country, city and name.
/// All three values are fixed at construction and are read-only from outside the class.
/// </summary>
public class Hotel
{
    /// <summary>Country the hotel belongs to.</summary>
    public string Country { get; private set; }

    /// <summary>City the hotel belongs to.</summary>
    public string City { get; private set; }

    /// <summary>Name of the hotel.</summary>
    public string Name { get; private set; }

    /// <summary>Creates a hotel from its country, city and name.</summary>
    /// <param name="country">Value exposed through <see cref="Country"/>.</param>
    /// <param name="city">Value exposed through <see cref="City"/>.</param>
    /// <param name="name">Value exposed through <see cref="Name"/>.</param>
    public Hotel (string country, string city, string name)
    {
        this.Country = country;
        this.City = city;
        this.Name = name;
    }

    /// <summary>
    /// Formats the hotel as <c>[Hotel: Country=..., City=..., Name=...]</c>.
    /// </summary>
    public override string ToString ()
    {
        var fields = new object[] { this.Country, this.City, this.Name };
        return string.Format ("[Hotel: Country={0}, City={1}, Name={2}]", fields);
    }
}
/// <summary>
/// Message asking for the hotels of one city in one country.
/// </summary>
public class Query
{
    /// <summary>Country being asked about.</summary>
    public string Country { get; private set; }

    /// <summary>City being asked about.</summary>
    public string City { get; private set; }

    /// <summary>Creates a query for the given country and city.</summary>
    /// <param name="country">Value exposed through <see cref="Country"/>.</param>
    /// <param name="city">Value exposed through <see cref="City"/>.</param>
    public Query (string country, string city)
    {
        this.Country = country;
        this.City = city;
    }
}
/// <summary>
/// Reply message carrying a set of hotels together with the country and city
/// they are reported for.
/// </summary>
public class Results
{
    /// <summary>Country this reply is reported for.</summary>
    public string Country { get; private set; }

    /// <summary>City this reply is reported for.</summary>
    public string City { get; private set; }

    /// <summary>The hotel sequence exactly as passed to the constructor (not copied).</summary>
    public IEnumerable<Hotel> Hotels { get; private set; }

    /// <summary>Creates a reply for the given country, city and hotels.</summary>
    /// <param name="country">Value exposed through <see cref="Country"/>.</param>
    /// <param name="city">Value exposed through <see cref="City"/>.</param>
    /// <param name="hotels">Sequence exposed through <see cref="Hotels"/>.</param>
    public Results (string country, string city, IEnumerable<Hotel> hotels)
    {
        this.Country = country;
        this.City = city;
        this.Hotels = hotels;
    }
}
/// <summary>
/// Gather side of the scatter-gather: collects <see cref="Results"/> replies and,
/// once <c>waitFor</c> of them have arrived, sends one combined <see cref="Results"/>
/// to the original asker and then stops itself.
/// </summary>
public class AggregatorActor : ReceiveActor
{
    // Hotels accumulated from every reply received so far.
    private readonly List<Hotel> gathered;
    // Number of replies received so far.
    private int seen = 0;

    /// <summary>Creates an aggregator that replies after a fixed number of partial results.</summary>
    /// <param name="original">Actor that receives the combined result.</param>
    /// <param name="waitFor">Number of <see cref="Results"/> messages to wait for before replying.</param>
    public AggregatorActor (ActorRef original, int waitFor)
    {
        gathered = new List<Hotel> ();
        Receive<Results> (x => {
            // Keep every partial result; previously only the last message was forwarded
            // and all earlier replies were discarded.
            if (x.Hotels != null) {
                gathered.AddRange (x.Hotels);
            }
            if (++seen == waitFor) {
                // The aggregator never sees the query, so Country/City are taken from the
                // final reply, as before; only the hotel list is now the full combined set.
                original.Tell (new Results (x.Country, x.City, new List<Hotel> (gathered)));
                Self.Tell (PoisonPill.Instance);
            }
        });
    }
}
/// <summary>
/// Scatter side of the scatter-gather. A top-level instance creates one child
/// <see cref="CategoryActor"/> per distinct country; a non-top instance creates one
/// <see cref="TopicActor"/> per distinct city of its country. Each incoming
/// <see cref="Query"/> is forwarded to every child, with a fresh
/// <see cref="AggregatorActor"/> set as the sender so the children reply to it.
/// </summary>
public class CategoryActor : ReceiveActor
{
    /// <summary>Builds the child actors for the given hotels.</summary>
    /// <param name="country">Country handled by a non-top instance; unused when <paramref name="top"/> is true.</param>
    /// <param name="hotels">Hotels to partition among the children.</param>
    /// <param name="top">True to partition by country, false to partition this country's hotels by city.</param>
    public CategoryActor (string country, IEnumerable<Hotel> hotels, bool top = false)
    {
        var listOfActors = new List<ActorRef> ();
        // Materialize once: the sequence used to be re-enumerated for every country/city.
        var allHotels = hotels.ToList ();
        if (top) {
            // GroupBy yields groups in order of first appearance, matching Distinct().
            foreach (var byCountry in allHotels.GroupBy (h => h.Country)) {
                var hotelsInCountry = byCountry.ToList ();
                listOfActors.Add (Context.ActorOf (Props.Create<CategoryActor> (byCountry.Key, hotelsInCountry, false), byCountry.Key));
            }
        } else {
            foreach (var byCity in allHotels.Where (h => h.Country == country).GroupBy (h => h.City)) {
                var hotelsInCity = byCity.ToList ();
                listOfActors.Add (Context.ActorOf (Props.Create<TopicActor> (country, byCity.Key, hotelsInCity), byCity.Key));
            }
        }
        Receive<Query> (q => {
            var sender = Sender;
            if (listOfActors.Count == 0) {
                // With no children an aggregator would never reach its expected count,
                // so answer immediately with an empty result instead of leaving the asker waiting.
                sender.Tell (new Results (q.Country, q.City, new List<Hotel> ()));
                return;
            }
            // No explicit name: child names must be unique, and a fixed "Aggregator" name
            // collides when a second query arrives while an earlier aggregator still exists.
            var aggregator = Context.ActorOf (Props.Create<AggregatorActor> (sender, listOfActors.Count));
            foreach (var child in listOfActors) {
                child.Tell (q, aggregator);
            }
        });
    }
}
/// <summary>
/// Leaf actor holding the hotels of one city. It answers every <see cref="Query"/>
/// with a <see cref="Results"/> message containing the held hotels that match the
/// queried country and city, which is an empty list when the query is for another city.
/// </summary>
public class TopicActor : ReceiveActor
{
    private readonly string country;
    private readonly string city;
    private readonly IEnumerable<Hotel> hotels;

    /// <summary>Creates the actor for one country/city pair.</summary>
    /// <param name="country">Country reported in every reply.</param>
    /// <param name="city">City reported in every reply.</param>
    /// <param name="hotels">Hotels this actor answers from.</param>
    public TopicActor (string country, string city, IEnumerable<Hotel> hotels)
    {
        this.country = country;
        this.city = city;
        this.hotels = hotels;
        Receive<Query> (q => {
            var sender = Sender;
            // Filter on the query. The old predicate compared against the constructor
            // arguments, so the query was ignored entirely.
            var matches = this.hotels.Where (x => x.Country == q.Country && x.City == q.City).ToList ();
            // Always reply, even with no matches: the sender may be counting replies.
            sender.Tell (new Results (this.country, this.city, matches));
        });
    }
}
/// <summary>
/// Console entry point: builds the actor hierarchy from a fixed hotel list, asks it
/// for the hotels of one city and prints the reply.
/// </summary>
class MainClass
{
    // Pause between creating the top-level actor and sending the query.
    // NOTE(review): Akka queues messages sent to an actor that is still starting,
    // so this delay is probably unnecessary. Confirm before removing.
    private const int StartupDelayMilliseconds = 2300;

    /// <summary>Returns the fixed sample data set used by the example.</summary>
    public static List<Hotel> GetHotels ()
    {
        return new List<Hotel> {
            new Hotel ("America", "NYC", "Hilton"),
            new Hotel ("America", "NYC", "Plaza"),
            new Hotel ("America", "NYC", "Pierre"),
            new Hotel ("America", "Vegas", "Caesars"),
            new Hotel ("America", "Vegas", "Mandalay"),
            new Hotel ("America", "Vegas", "Belagio"),
            new Hotel ("Australia", "Perth", "Burswood"),
            new Hotel ("Australia", "Perth", "Hyatt"),
            new Hotel ("Australia", "Perth", "Hilton"),
            new Hotel ("Australia", "Perth", "Duxton"),
            new Hotel ("Australia", "Perth", "Richardson")
        };
    }

    /// <summary>
    /// Runs the example and waits for a key press before shutting the actor system down.
    /// </summary>
    public static void Main (string[] args)
    {
        using (var system = ActorSystem.Create ("System")) {
            var top = system.ActorOf (Props.Create<CategoryActor> (String.Empty, GetHotels (), true), "TopLevelCategory");
            Thread.Sleep (StartupDelayMilliseconds);
            var f = top.Ask<Results> (new Query ("America", "Vegas"));
            f.ContinueWith (x => {
                // Reading Result on a faulted or cancelled task throws inside the
                // continuation, where nobody observes it; report the failure instead.
                if (x.IsFaulted) {
                    Console.Error.WriteLine (x.Exception.Flatten ());
                    return;
                }
                if (x.IsCanceled) {
                    Console.Error.WriteLine ("Query was cancelled before a result arrived.");
                    return;
                }
                foreach (var hotel in x.Result.Hotels) {
                    Console.WriteLine (hotel);
                }
            });
            Console.ReadKey ();
            system.Shutdown ();
        }
    }
}
}
@rogeralsing
Copy link

In AggregatorActor, I take it that each incoming result should be appended to the Results list?
And once the count reaches waitFor, it should return all of the collected results, not just the last one?

@loosechainsaw
Copy link
Author

Thanks Roger :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment