Skip to content

Instantly share code, notes, and snippets.

@liammclennan
Created May 27, 2015 10:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save liammclennan/542cb1264305f525c70b to your computer and use it in GitHub Desktop.
Save liammclennan/542cb1264305f525c70b to your computer and use it in GitHub Desktop.
Partitioning events with Akka.NET (LINQPad script; reference the Akka.NET and Rx-Main packages)
/// <summary>
/// Script entry point: creates the actor system and a <see cref="DeviceSplitter"/>,
/// sends it 250000 rounds of measures for devices "a".."d" with random values,
/// then waits for a line on standard input.
/// </summary>
void Main()
{
    const int rounds = 250000;
    var deviceIds = new[] { "a", "b", "c", "d" };

    var actorSystem = ActorSystem.Create("MySystem");
    var splitter = actorSystem.ActorOf(Props.Create(() => new DeviceSplitter()));
    var rng = new System.Random();

    // One measure per device per round, in a, b, c, d order.
    for (var round = 0; round < rounds; round++)
    {
        foreach (var deviceId in deviceIds)
        {
            splitter.Tell(new Measure(deviceId, rng.Next()));
        }
    }

    Console.ReadLine();
}
/// <summary>
/// A single reading reported by a device.
/// </summary>
public class Measure
{
    /// <summary>Identifier of the device that produced this reading.</summary>
    public string DeviceId { get; private set; }

    /// <summary>The reading itself.</summary>
    public double Value { get; private set; }

    /// <summary>
    /// Creates a reading for the given device.
    /// </summary>
    /// <param name="deviceId">Identifier of the reporting device.</param>
    /// <param name="value">The reading value.</param>
    public Measure(string deviceId, double value)
    {
        this.DeviceId = deviceId;
        this.Value = value;
    }
}
/// <summary>
/// Empty type with no members; nothing in the visible script references it.
/// </summary>
public class Dump
{
}
/// <summary>
/// Forwards each <see cref="Measure"/> to a child <see cref="DeviceActor"/> chosen by
/// the measure's <c>DeviceId</c>, creating that child the first time an id is seen.
/// </summary>
public class DeviceSplitter : ReceiveActor
{
    public DeviceSplitter()
    {
        // Device id -> child actor for that device. Captured by the handler below.
        var knownDevices = new Dictionary<string, IActorRef>();

        Receive<Measure>(m =>
        {
            // Single lookup instead of ContainsKey + Add + indexer.
            IActorRef deviceActor;
            if (!knownDevices.TryGetValue(m.DeviceId, out deviceActor))
            {
                deviceActor = Context.ActorOf(Props.Create(() => new DeviceActor()));
                knownDevices.Add(m.DeviceId, deviceActor);
            }

            deviceActor.Tell(m);
        });
    }
}
/// <summary>
/// Receives <see cref="Measure"/> messages and, for every 5000 of them, writes the
/// batch size, the device id of the first measure in the batch, and the batch's
/// average value to the console.
/// </summary>
public class DeviceActor : ReceiveActor
{
    public DeviceActor()
    {
        // A plain Subject is used rather than ReplaySubject: the only subscription is
        // made right here in the constructor, and an unbounded ReplaySubject would
        // keep every measure it has ever seen in memory.
        var subject = new Subject<Measure>();

        // Buffer(5000) only emits full batches while the subject is live, so
        // batch[0] is always present when this callback runs.
        subject.Buffer(5000).Subscribe(batch =>
            Console.WriteLine(
                "Average over last {0} from {1} = {2}",
                batch.Count,
                batch[0].DeviceId,
                batch.Average(x => x.Value)));

        // Push every incoming measure into the stream.
        Receive<Measure>(m => subject.OnNext(m));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment