Skip to content

Instantly share code, notes, and snippets.

@utaal
Created October 29, 2014 21:02
Show Gist options
  • Save utaal/96c4328fbc48a07d69b4 to your computer and use it in GitHub Desktop.
Save utaal/96c4328fbc48a07d69b4 to your computer and use it in GitHub Desktop.
Scheduling issues Naiad
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Research.Naiad;
using Microsoft.Research.Naiad.Input;
using Microsoft.Research.Naiad.Dataflow;
using Microsoft.Research.Naiad.Dataflow.StandardVertices;
namespace Microsoft.Research.Naiad.Examples.Serialization
{
public struct SerializationTest {
public SerializationTest(int x, int y) {
this.x = x;
this.y = y;
}
public int x;
public int y;
}
public static class Vertices {
public class TestVertex1 : UnaryVertex<int, SerializationTest, Epoch>
{
public TestVertex1(int index, Stage<Epoch> vertex) : base(index, vertex) {
}
public override void OnReceive(Message<int, Epoch> message)
{
// Console.Error.WriteLine("*: {0}\t{1}", message.time, message.length);
var output = this.Output.GetBufferForTime(message.time);
for (int i = 0; i < message.length; i++)
output.Send(new SerializationTest (message.payload [i], message.payload [i] + 1));
this.NotifyAt (message.time);
}
}
public static Stream<SerializationTest, Epoch> TestStep1(this Stream<int, Epoch> in1)
{
return Foundry.NewUnaryStage(in1, (i, s) => new TestVertex1(i, s), x => x % 8, null, "Test1");
}
public class TestVertex2 : UnaryVertex<SerializationTest, int, Epoch> {
public TestVertex2(int index, Stage<Epoch> vertex) : base(index, vertex) {
}
public override void OnReceive(Message<SerializationTest, Epoch> message)
{
// Console.Error.WriteLine("#: {0}\t{1}", message.time, message.length);
var output = this.Output.GetBufferForTime(message.time);
for (int i = 0; i < message.length; i++)
output.Send(message.payload[i].x);
this.NotifyAt (message.time);
}
}
public static Stream<int, Epoch> TestStep2(this Stream<SerializationTest, Epoch> inStream)
{
return Foundry.NewUnaryStage (inStream, (i, s) => new TestVertex2 (i, s), x => x.x, null, "Test2");
}
}
public class Serialization : Example
{
public void Execute (string[] args)
{
using (var computation = NewComputation.FromArgs(ref args))
{
var in1 = new BatchedDataSource<int>();
computation.NewInput(in1)
.TestStep1()
.TestStep2()
.Subscribe(msgs =>
{
foreach (var m in msgs)
Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString () + "OUT " + m.ToString());
});
computation.Activate();
Console.Error.WriteLine ("Pushing data");
for (int i = 0; i < 500; i++)
{
if (computation.Configuration.ProcessID == 0)
in1.OnNext(new List<int> { 3 * i, 3 * i + 1, 3 * i + 2 });
else
in1.OnNext();
Console.Error.WriteLine ("Syncing {0}", i);
computation.Sync(i);
Console.Error.WriteLine ("Synced {0}", i);
}
Console.Error.WriteLine ("OnCompleted");
in1.OnCompleted();
Console.Error.WriteLine ("Join");
computation.Join ();
Console.Error.WriteLine ("Naiad Process " + computation.Configuration.ProcessID + " finished");
}
}
public string Usage {
get { return ""; }
}
public string Help {
get { return ""; }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment