Created
October 29, 2014 21:02
-
-
Save utaal/96c4328fbc48a07d69b4 to your computer and use it in GitHub Desktop.
Scheduling issues Naiad
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.IO; | |
using System.Linq; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Microsoft.Research.Naiad; | |
using Microsoft.Research.Naiad.Input; | |
using Microsoft.Research.Naiad.Dataflow; | |
using Microsoft.Research.Naiad.Dataflow.StandardVertices; | |
namespace Microsoft.Research.Naiad.Examples.Serialization | |
{ | |
/// <summary>
/// Two-integer record exchanged between the test vertices.
/// Both members are public, mutable fields.
/// </summary>
public struct SerializationTest
{
    /// <summary>First component of the record.</summary>
    public int x;

    /// <summary>Second component of the record.</summary>
    public int y;

    /// <summary>
    /// Builds a record from its two components.
    /// </summary>
    /// <param name="x">Value stored in <see cref="x"/>.</param>
    /// <param name="y">Value stored in <see cref="y"/>.</param>
    public SerializationTest(int x, int y)
    {
        this.x = x;
        this.y = y;
    }
}
/// <summary>
/// Test vertices plus the stream extension methods that create a stage for each of them.
/// </summary>
public static class Vertices
{
    /// <summary>
    /// Vertex that maps every received integer <c>v</c> to a
    /// <see cref="SerializationTest"/> built from <c>(v, v + 1)</c>.
    /// </summary>
    public class TestVertex1 : UnaryVertex<int, SerializationTest, Epoch>
    {
        /// <summary>Forwards both arguments unchanged to the base vertex constructor.</summary>
        public TestVertex1(int index, Stage<Epoch> vertex)
            : base(index, vertex)
        {
        }

        /// <summary>
        /// Sends one <see cref="SerializationTest"/> per valid payload entry on the
        /// output buffer for the message's time, then calls <c>NotifyAt</c> for that time.
        /// </summary>
        /// <param name="message">Incoming batch; only the first <c>message.length</c> payload slots are read.</param>
        public override void OnReceive(Message<int, Epoch> message)
        {
            // Debug trace, disabled:
            // Console.Error.WriteLine("*: {0}\t{1}", message.time, message.length);
            var buffer = this.Output.GetBufferForTime(message.time);

            for (int slot = 0; slot < message.length; slot++)
            {
                int value = message.payload[slot];
                buffer.Send(new SerializationTest(value, value + 1));
            }

            // Called once per received message; this class defines no OnNotify override.
            this.NotifyAt(message.time);
        }
    }

    /// <summary>
    /// Appends a stage named "Test1" made of <see cref="TestVertex1"/> instances to <paramref name="in1"/>.
    /// </summary>
    /// <param name="in1">Stream of integers to transform.</param>
    /// <returns>The stream produced by the new stage.</returns>
    public static Stream<SerializationTest, Epoch> TestStep1(this Stream<int, Epoch> in1)
    {
        // Third argument is the input-side key expression (value modulo 8); the fourth is left null.
        return Foundry.NewUnaryStage(
            in1,
            (vertexIndex, stage) => new TestVertex1(vertexIndex, stage),
            x => x % 8,
            null,
            "Test1");
    }

    /// <summary>
    /// Vertex that forwards the <c>x</c> field of every received <see cref="SerializationTest"/>.
    /// </summary>
    public class TestVertex2 : UnaryVertex<SerializationTest, int, Epoch>
    {
        /// <summary>Forwards both arguments unchanged to the base vertex constructor.</summary>
        public TestVertex2(int index, Stage<Epoch> vertex)
            : base(index, vertex)
        {
        }

        /// <summary>
        /// Sends the <c>x</c> field of each valid payload entry on the output buffer
        /// for the message's time, then calls <c>NotifyAt</c> for that time.
        /// </summary>
        /// <param name="message">Incoming batch; only the first <c>message.length</c> payload slots are read.</param>
        public override void OnReceive(Message<SerializationTest, Epoch> message)
        {
            // Debug trace, disabled:
            // Console.Error.WriteLine("#: {0}\t{1}", message.time, message.length);
            var buffer = this.Output.GetBufferForTime(message.time);

            for (int slot = 0; slot < message.length; slot++)
            {
                SerializationTest record = message.payload[slot];
                buffer.Send(record.x);
            }

            // Called once per received message; this class defines no OnNotify override.
            this.NotifyAt(message.time);
        }
    }

    /// <summary>
    /// Appends a stage named "Test2" made of <see cref="TestVertex2"/> instances to <paramref name="inStream"/>.
    /// </summary>
    /// <param name="inStream">Stream of records to project back to integers.</param>
    /// <returns>The stream produced by the new stage.</returns>
    public static Stream<int, Epoch> TestStep2(this Stream<SerializationTest, Epoch> inStream)
    {
        // Third argument is the input-side key expression (the record's x field); the fourth is left null.
        return Foundry.NewUnaryStage(
            inStream,
            (vertexIndex, stage) => new TestVertex2(vertexIndex, stage),
            x => x.x,
            null,
            "Test2");
    }
}
/// <summary>
/// Example that pushes integers through <c>TestStep1</c> and <c>TestStep2</c> and prints
/// whatever arrives at the subscription, logging progress to standard error.
/// </summary>
public class Serialization : Example
{
    /// <summary>
    /// Builds the dataflow, feeds it 500 batches (one per loop iteration) and waits for completion.
    /// </summary>
    /// <param name="args">Command-line arguments, passed by reference to <c>NewComputation.FromArgs</c>.</param>
    public void Execute(string[] args)
    {
        using (var computation = NewComputation.FromArgs(ref args))
        {
            var source = new BatchedDataSource<int>();

            var roundTripped = computation.NewInput(source)
                                          .TestStep1()
                                          .TestStep2();

            roundTripped.Subscribe(results =>
            {
                foreach (var result in results)
                {
                    string threadTag = Thread.CurrentThread.ManagedThreadId.ToString();
                    string line = threadTag + "OUT " + result.ToString();
                    Console.WriteLine(line);
                }
            });

            computation.Activate();

            Console.Error.WriteLine("Pushing data");
            for (int epoch = 0; epoch < 500; epoch++)
            {
                // Only process 0 contributes records; every other process calls OnNext with no data.
                if (computation.Configuration.ProcessID != 0)
                {
                    source.OnNext();
                }
                else
                {
                    int first = 3 * epoch;
                    source.OnNext(new List<int> { first, first + 1, first + 2 });
                }

                Console.Error.WriteLine("Syncing {0}", epoch);
                computation.Sync(epoch);
                Console.Error.WriteLine("Synced {0}", epoch);
            }

            Console.Error.WriteLine("OnCompleted");
            source.OnCompleted();

            Console.Error.WriteLine("Join");
            computation.Join();

            Console.Error.WriteLine("Naiad Process " + computation.Configuration.ProcessID + " finished");
        }
    }

    /// <summary>Usage text for this example; currently an empty string.</summary>
    public string Usage
    {
        get
        {
            return "";
        }
    }

    /// <summary>Help text for this example; currently an empty string.</summary>
    public string Help
    {
        get
        {
            return "";
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment