Created
March 1, 2013 23:13
-
-
Save joshgo/5068700 to your computer and use it in GitHub Desktop.
Sample code for possibly modeling ETLs as "nodes". - Each node can pass the data to its child or children (2)
- Some nodes can transform
- Some nodes can split data
- Some nodes can write/read from queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| namespace NodeBus | |
| { | |
/// <summary>
/// Demo entry point: wires a small node pipeline together and runs it.
/// </summary>
class Program
{
    /// <summary>
    /// Builds the node graph, pumps "file1.txt" through it, then waits for a key press.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    public static void Main(string[] args)
    {
        // Each transform parses the incoming line once and reuses the parsed value.
        INode square = new TransformNode(new OutfileNode(null, "file2.txt"), (x) =>
        {
            int value = Int32.Parse(x);
            return value * value;
        });
        INode add = new TransformNode(new OutfileNode(null, "file3.txt"), (x) =>
        {
            int value = Int32.Parse(x);
            return value + value;
        });
        INode split = new SplitterNode(square, add);
        INode queue = new QueueNode(split);
        INodeStarter file1 = new InfileNode(queue, "file1.txt");

        // file1 ==> queue ==> splitter ==> square ==> file2
        //                              ==> add    ==> file3
        file1.Start();

        // Start() only enqueues the lines; the queue node forwards them on its own task,
        // so waiting for a key press here also gives that task time to finish writing.
        Console.Write("Press any key to continue . . . ");
        Console.ReadKey(true);
    }
}
/// <summary>
/// A pipeline stage that accepts items pushed to it by an upstream node.
/// </summary>
public interface INode
{
    /// <summary>
    /// Hands one item to this node.
    /// </summary>
    /// <param name="obj">The item to process; its runtime type is decided by the caller.</param>
    void Write(dynamic obj);
}
/// <summary>
/// A node that sits at the head of a pipeline and is started explicitly
/// rather than being written to.
/// </summary>
public interface INodeStarter
{
    /// <summary>
    /// Begins producing items for the rest of the pipeline.
    /// </summary>
    void Start();
}
/// <summary>
/// Pipeline source that reads a text file and passes every line to its child node.
/// </summary>
public class InfileNode : INodeStarter
{
    private readonly INode _child;
    private readonly string _file;

    /// <summary>
    /// Creates a source over the given file.
    /// </summary>
    /// <param name="child">Node that receives each line read from the file.</param>
    /// <param name="file">Path of the text file to read.</param>
    public InfileNode(INode child, string file)
    {
        _file = file;
        _child = child;
    }

    /// <summary>
    /// Opens the file and writes its lines, in order, to the child node.
    /// </summary>
    public void Start()
    {
        using (var reader = new System.IO.StreamReader(_file))
        {
            string line;

            // ReadLine returns null once the end of the stream has been reached.
            while ((line = reader.ReadLine()) != null)
            {
                _child.Write(line);
            }
        }
    }
}
/// <summary>
/// Node that appends the text form of every item to a file and then
/// forwards the unchanged item to an optional child.
/// </summary>
public class OutfileNode : INode
{
    private readonly INode _child;
    private readonly string _file;

    /// <summary>
    /// Creates a file-writing node.
    /// </summary>
    /// <param name="child">Next node in the chain, or null when this node is a leaf.</param>
    /// <param name="file">Path of the file to append to.</param>
    public OutfileNode(INode child, string file)
    {
        _file = file;
        _child = child;
    }

    /// <summary>
    /// Appends <c>obj.ToString()</c> as one line, then passes <paramref name="obj"/> on.
    /// </summary>
    /// <param name="obj">The item to record and forward.</param>
    public void Write(dynamic obj)
    {
        // The file is opened in append mode and closed again for every item.
        using (var writer = new System.IO.StreamWriter(_file, true))
        {
            writer.WriteLine(obj.ToString());
        }

        if (_child == null)
        {
            return;
        }

        _child.Write(obj);
    }
}
/// <summary>
/// Placeholder for a database sink. No database write is implemented yet;
/// the node currently only forwards items to its optional child.
/// </summary>
public class DbNode : INode
{
    private readonly INode _child;

    /// <summary>
    /// Creates the node.
    /// </summary>
    /// <param name="child">Next node in the chain, or null when this node is a leaf.</param>
    public DbNode(INode child)
    {
        _child = child;
    }

    /// <summary>
    /// Forwards <paramref name="obj"/> to the child node, if there is one.
    /// </summary>
    /// <param name="obj">The item to forward.</param>
    public void Write(dynamic obj)
    {
        // write to db

        if (_child == null)
        {
            return;
        }

        _child.Write(obj);
    }
}
/// <summary>
/// Decouples the producer from downstream processing: <see cref="Write"/> only
/// enqueues the item, and a dedicated consumer task forwards queued items to the child.
/// </summary>
public class QueueNode : INode
{
    private readonly System.Collections.Concurrent.BlockingCollection<dynamic> _queue =
        new System.Collections.Concurrent.BlockingCollection<dynamic>();
    private readonly INode _child;

    /// <summary>
    /// Creates the node and immediately starts its consumer task.
    /// </summary>
    /// <param name="child">
    /// Node that receives dequeued items. May be null, in which case items are
    /// dequeued and dropped (matching how the other nodes treat a missing child).
    /// </param>
    public QueueNode(INode child)
    {
        _child = child;

        // The consumer blocks for the lifetime of the node, so ask the scheduler
        // for a dedicated thread instead of tying up a thread-pool worker.
        System.Threading.Tasks.Task.Factory.StartNew(
            () => Drain(),
            System.Threading.Tasks.TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// Enqueues <paramref name="obj"/> for asynchronous delivery to the child node.
    /// </summary>
    /// <param name="obj">The item to enqueue.</param>
    public void Write(dynamic obj)
    {
        _queue.Add(obj);
    }

    /// <summary>
    /// Consumer loop: blocks until an item is available and forwards it to the child.
    /// </summary>
    private void Drain()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            if (_child == null)
            {
                continue;
            }

            try
            {
                _child.Write(item);
            }
            catch (Exception ex)
            {
                // A failure on one item must not fault the task: nobody observes the task,
                // so the pipeline would silently stop while the queue keeps filling up.
                Console.Error.WriteLine(
                    "QueueNode: child failed on item '{0}': {1}", (object)item, ex.Message);
            }
        }
    }
}
/// <summary>
/// Fan-out node: passes every item to both of its branches, left first, then right.
/// </summary>
public class SplitterNode : INode
{
    private readonly INode _left;
    private readonly INode _right;

    /// <summary>
    /// Creates a splitter over two branches.
    /// </summary>
    /// <param name="left">First branch to receive each item; may be null.</param>
    /// <param name="right">Second branch to receive each item; may be null.</param>
    public SplitterNode(INode left, INode right)
    {
        _left = left;
        _right = right;
    }

    /// <summary>
    /// Writes <paramref name="obj"/> to every branch that is present.
    /// </summary>
    /// <param name="obj">The item to duplicate across the branches.</param>
    public void Write(dynamic obj)
    {
        foreach (INode branch in new[] { _left, _right })
        {
            if (branch != null)
            {
                branch.Write(obj);
            }
        }
    }
}
/// <summary>
/// Routing node: sends each item to the left branch when the predicate holds
/// and to the right branch otherwise.
/// </summary>
public class LogicNode : INode
{
    private readonly INode _left;
    private readonly INode _right;
    private readonly Func<dynamic, bool> _pred;

    /// <summary>
    /// Creates a routing node.
    /// </summary>
    /// <param name="left">Branch for items that satisfy the predicate; may be null.</param>
    /// <param name="right">Branch for items that do not satisfy the predicate; may be null.</param>
    /// <param name="pred">Predicate evaluated once for every item.</param>
    public LogicNode(INode left, INode right, Func<dynamic, bool> pred)
    {
        _left = left;
        _right = right;
        _pred = pred;
    }

    /// <summary>
    /// Evaluates the predicate for <paramref name="obj"/> and forwards it to the matching branch.
    /// </summary>
    /// <param name="obj">The item to route.</param>
    public void Write(dynamic obj)
    {
        // Pick the branch from the predicate alone. An item that matches must never
        // fall through to the right branch just because the left branch is missing.
        bool matched = _pred(obj);
        INode target = matched ? _left : _right;

        if (target != null)
        {
            target.Write(obj);
        }
    }
}
/// <summary>
/// Node that applies a transformation to each item before passing it to its child.
/// </summary>
public class TransformNode : INode
{
    private readonly INode _child;
    private readonly Func<dynamic, dynamic> _trans;

    /// <summary>
    /// Creates a transforming node.
    /// </summary>
    /// <param name="child">Node that receives the (possibly transformed) item; may be null.</param>
    /// <param name="trans">Transformation to apply; when null the item is passed through unchanged.</param>
    public TransformNode(INode child, Func<dynamic, dynamic> trans)
    {
        _trans = trans;
        _child = child;
    }

    /// <summary>
    /// Transforms <paramref name="obj"/> (if a transformation was supplied) and forwards the result.
    /// </summary>
    /// <param name="obj">The item to transform and forward.</param>
    public void Write(dynamic obj)
    {
        // Without a child there is nowhere to send the result, so the transformation is not run.
        if (_child == null)
        {
            return;
        }

        dynamic payload;
        if (_trans == null)
        {
            payload = obj;
        }
        else
        {
            payload = _trans(obj);
        }

        _child.Write(payload);
    }
}
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment