Skip to content

Instantly share code, notes, and snippets.

@joshgo
Created March 1, 2013 23:13
Show Gist options
  • Save joshgo/5068700 to your computer and use it in GitHub Desktop.
Save joshgo/5068700 to your computer and use it in GitHub Desktop.
Sample code for possibly modelling ETLs as "nodes". - Each node can pass the data to its child or children (2) - Some nodes can transform - Some nodes can split data - Some nodes can write to/read from a queue
using System;
namespace NodeBus
{
/// <summary>
/// Demo entry point: wires a small node pipeline and runs it over <c>file1.txt</c>.
/// </summary>
class Program
{
    /// <summary>
    /// Builds the pipeline sketched below, starts it, then waits for a key press.
    /// </summary>
    /// <remarks>
    /// <code>
    /// file1.txt ==> queue ==> splitter ==> square ==> file2.txt
    ///                                  ==> add    ==> file3.txt
    /// </code>
    /// The queue node forwards items on a separate task, so the output files may
    /// still be being written when the prompt appears.
    /// </remarks>
    /// <param name="args">Command-line arguments; not used.</param>
    public static void Main(string[] args)
    {
        INode square = new TransformNode(new OutfileNode(null, "file2.txt"), (x) =>
        {
            // Parse once and reuse the value instead of parsing the same text twice.
            int value = Int32.Parse(x);
            return value * value;
        });
        INode add = new TransformNode(new OutfileNode(null, "file3.txt"), (x) =>
        {
            // Parse once and reuse the value instead of parsing the same text twice.
            int value = Int32.Parse(x);
            return value + value;
        });
        INode split = new SplitterNode(square, add);
        INode queue = new QueueNode(split);
        INodeStarter file1 = new InfileNode(queue, "file1.txt");

        file1.Start();
        Console.Write("Press any key to continue . . . ");
        Console.ReadKey(true);
    }
}
/// <summary>
/// Contract implemented by the pipeline nodes in this file: a node is handed
/// one item at a time through <see cref="Write"/>.
/// </summary>
public interface INode
{
/// <summary>Hands a single item to this node.</summary>
/// <param name="obj">The item to process; typed <c>dynamic</c>, so member access on it is bound at run time.</param>
void Write(dynamic obj);
}
/// <summary>
/// Contract for a node that kicks off a run rather than receiving items;
/// in this file it is implemented by <c>InfileNode</c>.
/// </summary>
public interface INodeStarter
{
/// <summary>Begins the run.</summary>
void Start();
}
/// <summary>
/// Source node: reads a text file line by line and hands each line to its child node.
/// </summary>
public class InfileNode : INodeStarter
{
    private readonly INode _child;
    private readonly string _file;

    /// <summary>Creates a source node.</summary>
    /// <param name="child">Node that receives each line; may be <c>null</c>, in which case lines are read and discarded.</param>
    /// <param name="file">Path of the text file to read.</param>
    public InfileNode(INode child, string file)
    {
        _child = child;
        _file = file;
    }

    /// <summary>
    /// Opens the file, pushes every line to the child in order, and closes the file.
    /// </summary>
    public void Start()
    {
        using (var reader = new System.IO.StreamReader(_file))
        {
            string line;
            // ReadLine returns null once the end of the stream is reached.
            while ((line = reader.ReadLine()) != null)
            {
                // The other nodes in this file tolerate a missing child; do the same
                // here instead of failing with a NullReferenceException.
                if (_child != null)
                {
                    _child.Write(line);
                }
            }
        }
    }
}
/// <summary>
/// Sink node: appends the text form of every item to a file as one line,
/// then passes the unchanged item on to an optional child.
/// </summary>
public class OutfileNode : INode
{
    private readonly INode _child;
    private readonly string _file;

    /// <summary>Creates a file-writing node.</summary>
    /// <param name="child">Optional node that receives each item after it is written; may be <c>null</c>.</param>
    /// <param name="file">Path of the file to append to.</param>
    public OutfileNode(INode child, string file)
    {
        _file = file;
        _child = child;
    }

    /// <summary>
    /// Appends <c>obj.ToString()</c> as a line to the file and forwards <paramref name="obj"/> to the child, if any.
    /// </summary>
    /// <param name="obj">The item to record.</param>
    public void Write(dynamic obj)
    {
        // The writer is opened in append mode and closed again for every item.
        using (var writer = new System.IO.StreamWriter(_file, true))
        {
            writer.WriteLine(obj.ToString());
        }

        if (_child == null)
        {
            return;
        }

        _child.Write(obj);
    }
}
/// <summary>
/// Placeholder for a database sink: the database write itself is not implemented,
/// so the node currently only forwards each item to its optional child.
/// </summary>
public class DbNode : INode
{
    private readonly INode _child;

    /// <summary>Creates a database node.</summary>
    /// <param name="child">Optional node that receives each item; may be <c>null</c>.</param>
    public DbNode(INode child)
    {
        _child = child;
    }

    /// <summary>Forwards <paramref name="obj"/> to the child, if one is attached.</summary>
    /// <param name="obj">The item to pass along.</param>
    public void Write(dynamic obj)
    {
        // write to db
        if (_child == null)
        {
            return;
        }

        _child.Write(obj);
    }
}
/// <summary>
/// Buffering node: <see cref="Write"/> enqueues items, and a background task
/// started by the constructor dequeues them and hands them to the child node.
/// </summary>
public class QueueNode : INode
{
    private readonly System.Collections.Concurrent.BlockingCollection<dynamic> _queue =
        new System.Collections.Concurrent.BlockingCollection<dynamic>();
    private readonly INode _child;

    /// <summary>Creates the node and starts its consumer task.</summary>
    /// <param name="child">Node that receives dequeued items; may be <c>null</c>, in which case items are drained and discarded.</param>
    public QueueNode(INode child)
    {
        _child = child;

        // The consumer blocks on the queue for the lifetime of the node, so flag it
        // as long-running rather than tying up a regular thread-pool worker.
        System.Threading.Tasks.Task.Factory.StartNew(
            () => Drain(),
            System.Threading.Tasks.TaskCreationOptions.LongRunning);
    }

    /// <summary>Enqueues <paramref name="obj"/> for the consumer task.</summary>
    /// <param name="obj">The item to enqueue.</param>
    public void Write(dynamic obj)
    {
        _queue.Add(obj);
    }

    /// <summary>Consumer loop: takes items from the queue and forwards them to the child.</summary>
    private void Drain()
    {
        // Blocks while the queue is empty; would end only if adding were ever completed.
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            // Tolerate a missing child like the other nodes do; dereferencing null here
            // would fault the task unobserved and leave the queue growing forever.
            if (_child != null)
            {
                _child.Write(item);
            }
        }
    }
}
/// <summary>
/// Fan-out node: hands every item to the left child and then to the right child,
/// skipping whichever of the two is <c>null</c>.
/// </summary>
public class SplitterNode : INode
{
    // Kept in delivery order: left first, then right.
    private readonly INode[] _targets;

    /// <summary>Creates a splitter.</summary>
    /// <param name="left">First receiver; may be <c>null</c>.</param>
    /// <param name="right">Second receiver; may be <c>null</c>.</param>
    public SplitterNode(INode left, INode right)
    {
        _targets = new INode[] { left, right };
    }

    /// <summary>Delivers <paramref name="obj"/> to each attached child in turn.</summary>
    /// <param name="obj">The item to duplicate.</param>
    public void Write(dynamic obj)
    {
        foreach (INode target in _targets)
        {
            if (target != null)
            {
                target.Write(obj);
            }
        }
    }
}
/// <summary>
/// Routing node: evaluates a predicate for each item and sends the item to the
/// left child when it holds, otherwise to the right child.
/// </summary>
public class LogicNode : INode
{
    private readonly INode _left;
    private readonly INode _right;
    private readonly Func<dynamic, bool> _pred;

    /// <summary>Creates a routing node.</summary>
    /// <param name="left">Receiver for items the predicate accepts; may be <c>null</c>.</param>
    /// <param name="right">Receiver for items the predicate rejects; may be <c>null</c>.</param>
    /// <param name="pred">Predicate that selects the branch.</param>
    public LogicNode(INode left, INode right, Func<dynamic, bool> pred)
    {
        _left = left;
        _right = right;
        _pred = pred;
    }

    /// <summary>Routes <paramref name="obj"/> to the branch chosen by the predicate.</summary>
    /// <param name="obj">The item to route.</param>
    public void Write(dynamic obj)
    {
        // Choose the branch from the predicate alone. Folding the null check into
        // the same condition would send accepted items to the right child whenever
        // the left child is missing.
        bool accepted = _pred(obj);
        INode target = accepted ? _left : _right;

        if (target != null)
        {
            target.Write(obj);
        }
    }
}
/// <summary>
/// Mapping node: applies an optional transform to each item and passes the
/// result to its child.
/// </summary>
public class TransformNode : INode
{
    private readonly INode _child;
    private readonly Func<dynamic, dynamic> _trans;

    /// <summary>Creates a transform node.</summary>
    /// <param name="child">Receiver of the transformed items; may be <c>null</c>.</param>
    /// <param name="trans">Transform to apply; when <c>null</c>, items are forwarded unchanged.</param>
    public TransformNode(INode child, Func<dynamic, dynamic> trans)
    {
        _trans = trans;
        _child = child;
    }

    /// <summary>Transforms <paramref name="obj"/> (if a transform is set) and forwards it.</summary>
    /// <param name="obj">The item to transform.</param>
    public void Write(dynamic obj)
    {
        // With no child there is nowhere to send the item, so the transform is not run either.
        if (_child == null)
        {
            return;
        }

        if (_trans == null)
        {
            _child.Write(obj);
            return;
        }

        _child.Write(_trans(obj));
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment