Skip to content

Instantly share code, notes, and snippets.

@utaal
Last active August 29, 2015 14:08
Show Gist options
  • Save utaal/c10e450e1227f7ef5462 to your computer and use it in GitHub Desktop.
Save utaal/c10e450e1227f7ef5462 to your computer and use it in GitHub Desktop.
Minimal code to reproduce a serialization issue in Naiad
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Research.Naiad;
using Microsoft.Research.Naiad.Input;
using Microsoft.Research.Naiad.Dataflow;
using Microsoft.Research.Naiad.Dataflow.StandardVertices;
namespace Microsoft.Research.Naiad.Examples.Serialization
{
/// <summary>
/// Two-integer record used as the intermediate payload type between the
/// test vertices in this file.
/// </summary>
public struct SerializationTest
{
    /// <summary>First component of the record.</summary>
    public int x;

    /// <summary>Second component of the record.</summary>
    public int y;

    /// <summary>
    /// Builds a record from its two components.
    /// </summary>
    /// <param name="x">Value stored in <see cref="x"/>.</param>
    /// <param name="y">Value stored in <see cref="y"/>.</param>
    public SerializationTest(int x, int y)
    {
        this.x = x;
        this.y = y;
    }
}
/// <summary>
/// Vertex types and stream extension methods for the two-stage test pipeline:
/// the first stage wraps each int in a <see cref="SerializationTest"/>, the
/// second stage emits the <c>x</c> field of each record as an int again.
/// </summary>
public static class Vertices
{
    /// <summary>
    /// Receives ints and emits one <see cref="SerializationTest"/> per record,
    /// constructed as <c>(value, value + 1)</c>.
    /// </summary>
    public class TestVertex1 : UnaryVertex<int, SerializationTest, Epoch>
    {
        /// <summary>Forwards the index and stage to the base vertex constructor.</summary>
        public TestVertex1(int index, Stage<Epoch> vertex)
            : base(index, vertex)
        {
        }

        /// <summary>
        /// Writes "*" to standard error, sends one output message per received
        /// record, then calls <c>NotifyAt</c> with the message's time.
        /// </summary>
        /// <param name="message">Incoming batch of ints.</param>
        public override void OnReceive(Message<int, Epoch> message)
        {
            Console.Error.Write("*");

            var time = message.time;
            var values = message.payload;

            // Only the first message.length entries are read, as in the indexed loop bound.
            for (int position = 0; position < message.length; position++)
            {
                var value = values[position];
                var record = new SerializationTest(value, value + 1);
                Output.Send(new Message<SerializationTest, Epoch>(record, time));
            }

            NotifyAt(time);
        }
    }

    /// <summary>
    /// Appends a stage named "Test1" built from <see cref="TestVertex1"/>
    /// to <paramref name="in1"/>, using <c>x % 8</c> as the partition expression.
    /// </summary>
    /// <param name="in1">Stream of ints feeding the stage.</param>
    /// <returns>The stage's output stream of <see cref="SerializationTest"/> records.</returns>
    public static Stream<SerializationTest, Epoch> TestStep1(this Stream<int, Epoch> in1)
    {
        return Foundry.NewUnaryStage(
            in1,
            (index, stage) => new TestVertex1(index, stage),
            value => value % 8,
            null,
            "Test1");
    }

    /// <summary>
    /// Receives <see cref="SerializationTest"/> records and emits the
    /// <c>x</c> field of each one.
    /// </summary>
    public class TestVertex2 : UnaryVertex<SerializationTest, int, Epoch>
    {
        /// <summary>Forwards the index and stage to the base vertex constructor.</summary>
        public TestVertex2(int index, Stage<Epoch> vertex)
            : base(index, vertex)
        {
        }

        /// <summary>
        /// Writes "#" to standard error, sends one output message per received
        /// record, then calls <c>NotifyAt</c> with the message's time.
        /// </summary>
        /// <param name="message">Incoming batch of records.</param>
        public override void OnReceive(Message<SerializationTest, Epoch> message)
        {
            Console.Error.Write("#");

            var time = message.time;
            var records = message.payload;

            // Only the first message.length entries are read, as in the indexed loop bound.
            for (int position = 0; position < message.length; position++)
            {
                var first = records[position].x;
                Output.Send(new Message<int, Epoch>(first, time));
            }

            NotifyAt(time);
        }
    }

    /// <summary>
    /// Appends a stage named "Test2" built from <see cref="TestVertex2"/>
    /// to <paramref name="inStream"/>, using the record's <c>x</c> field as
    /// the partition expression.
    /// </summary>
    /// <param name="inStream">Stream of records feeding the stage.</param>
    /// <returns>The stage's output stream of ints.</returns>
    public static Stream<int, Epoch> TestStep2(this Stream<SerializationTest, Epoch> inStream)
    {
        return Foundry.NewUnaryStage(
            inStream,
            (index, stage) => new TestVertex2(index, stage),
            record => record.x,
            null,
            "Test2");
    }
}
/// <summary>
/// Example that pushes batches of ints through the TestStep1 and TestStep2
/// stages and prints every value that comes out.
/// </summary>
public class Serialization : Example
{
    /// <summary>
    /// Builds the computation from <paramref name="args"/>, wires the
    /// input through both test stages to a printing subscriber, feeds data
    /// from process 0 only, then completes the input and joins.
    /// </summary>
    /// <param name="args">Command-line arguments, passed by reference to <c>NewComputation.FromArgs</c>.</param>
    public void Execute(string[] args)
    {
        using (var computation = NewComputation.FromArgs(ref args))
        {
            var source = new BatchedDataSource<int>();

            var wrapped = computation.NewInput(source).TestStep1();
            var unwrapped = wrapped.TestStep2();
            unwrapped.Subscribe(results =>
            {
                foreach (var result in results)
                {
                    var threadId = Thread.CurrentThread.ManagedThreadId.ToString();
                    Console.WriteLine(threadId + "OUT " + result.ToString());
                }
            });

            computation.Activate();

            Console.Error.WriteLine("Pushing data");
            if (computation.Configuration.ProcessID == 0)
            {
                // The counter advances by 3 and is also multiplied by 3, so
                // consecutive batches are not contiguous; this is kept as written.
                for (int start = 0; start < 10000000; start += 3)
                {
                    int first = start * 3;
                    source.OnNext(new List<int> { first, first + 1, first + 2 });
                }
            }

            Console.Error.WriteLine("OnCompleted");
            source.OnCompleted();

            Console.Error.WriteLine("Join");
            computation.Join();

            Console.Error.WriteLine("Naiad Process " + computation.Configuration.ProcessID + " finished");
        }
    }

    /// <summary>Usage text for this example; always the empty string.</summary>
    public string Usage
    {
        get
        {
            return "";
        }
    }

    /// <summary>Help text for this example; always the empty string.</summary>
    public string Help
    {
        get
        {
            return "";
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment