Last active
August 29, 2015 14:08
-
-
Save utaal/c10e450e1227f7ef5462 to your computer and use it in GitHub Desktop.
Minimal code to reproduce serialization issue in Naiad
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.IO; | |
using System.Linq; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Microsoft.Research.Naiad; | |
using Microsoft.Research.Naiad.Input; | |
using Microsoft.Research.Naiad.Dataflow; | |
using Microsoft.Research.Naiad.Dataflow.StandardVertices; | |
namespace Microsoft.Research.Naiad.Examples.Serialization | |
{ | |
public struct SerializationTest { | |
public SerializationTest(int x, int y) { | |
this.x = x; | |
this.y = y; | |
} | |
public int x; | |
public int y; | |
} | |
public static class Vertices { | |
public class TestVertex1 : UnaryVertex<int, SerializationTest, Epoch> | |
{ | |
public TestVertex1(int index, Stage<Epoch> vertex) : base(index, vertex) { | |
} | |
public override void OnReceive(Message<int, Epoch> message) | |
{ | |
Console.Error.Write("*"); | |
for (int i = 0; i < message.length; i++) { | |
this.Output.Send(new Message<SerializationTest, Epoch>( | |
new SerializationTest (message.payload [i], message.payload [i] + 1), | |
message.time)); | |
} | |
this.NotifyAt (message.time); | |
} | |
} | |
public static Stream<SerializationTest, Epoch> TestStep1(this Stream<int, Epoch> in1) | |
{ | |
return Foundry.NewUnaryStage(in1, (i, s) => new TestVertex1(i, s), x => x % 8, null, "Test1"); | |
} | |
public class TestVertex2 : UnaryVertex<SerializationTest, int, Epoch> { | |
public TestVertex2(int index, Stage<Epoch> vertex) : base(index, vertex) { | |
} | |
public override void OnReceive(Message<SerializationTest, Epoch> message) | |
{ | |
Console.Error.Write("#"); | |
for (int i = 0; i < message.length; i++) { | |
this.Output.Send (new Message<int, Epoch> (message.payload[i].x, message.time)); | |
} | |
this.NotifyAt (message.time); | |
} | |
} | |
public static Stream<int, Epoch> TestStep2(this Stream<SerializationTest, Epoch> inStream) | |
{ | |
return Foundry.NewUnaryStage (inStream, (i, s) => new TestVertex2 (i, s), x => x.x, null, "Test2"); | |
} | |
} | |
public class Serialization : Example | |
{ | |
public void Execute (string[] args) | |
{ | |
using (var computation = NewComputation.FromArgs(ref args)) | |
{ | |
var in1 = new BatchedDataSource<int>(); | |
computation.NewInput(in1).TestStep1().TestStep2().Subscribe (msgs => { | |
foreach (var m in msgs) Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString () + "OUT " + m.ToString()); | |
}); | |
computation.Activate(); | |
Console.Error.WriteLine ("Pushing data"); | |
if (computation.Configuration.ProcessID == 0) | |
{ | |
for (int i = 0; i < 10000000; i+=3) { | |
in1.OnNext(new List<int> { i * 3, i * 3 + 1, i * 3 + 2 }); | |
} | |
} | |
Console.Error.WriteLine ("OnCompleted"); | |
in1.OnCompleted(); | |
Console.Error.WriteLine ("Join"); | |
computation.Join (); | |
Console.Error.WriteLine ("Naiad Process " + computation.Configuration.ProcessID + " finished"); | |
} | |
} | |
public string Usage { | |
get { return ""; } | |
} | |
public string Help { | |
get { return ""; } | |
} | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment