Skip to content

Instantly share code, notes, and snippets.

@keithbloom
Created September 10, 2012 15:17
Show Gist options
  • Save keithbloom/3691448 to your computer and use it in GitHub Desktop.
Save keithbloom/3691448 to your computer and use it in GitHub Desktop.
0MQ and threads
// Collect every file path under the example directory (all sub-directories included).
var fileList = EnumerateDirectory(@"C:\example\directory", "*.*", SearchOption.AllDirectories);
var fileCount = fileList.Length;

// Hand the paths to the ventilator, then wait on the sink for one result per path.
ventilator.Run(fileList);
var result = sink.Run(fileCount);

// Report the number of files, the elapsed time and the value returned by the sink.
Console.WriteLine(
    "Found the length of {0} files in {1} milliseconds.\nDirectory size is {2}",
    fileCount,
    stopWatch.ElapsedMilliseconds,
    result);
// One context is shared by the ventilator, the sink and every worker so that
// the "inproc://" endpoints they use can reach each other.
var context = ZmqContext.Create();

// Start() binds the ventilator and sink endpoints before any worker is
// launched; the workers connect to those endpoints from their own threads.
var ventilator = new Ventilator(context);
var sink = new Sink(context);
ventilator.Start();
sink.Start();

// Launch a fixed pool of workers, each running its own TaskWorker on a dedicated thread.
const int workersCount = 4;
var workers = new Thread[workersCount];
for (int i = 0; i < workersCount; i++)
{
    var worker = new Thread(() => new TaskWorker(context).Run());
    workers[i] = worker;
    worker.Start();
}
/// <summary>
/// Creates the PULL socket that results are read from and binds it to the
/// in-process "sink" endpoint.
/// </summary>
public void Start()
{
    var pullSocket = _context.CreateSocket(SocketType.PULL);
    _receiver = pullSocket;
    pullSocket.Bind("inproc://sink");
}
/// <summary>
/// Receives <paramref name="length"/> messages from the receiver socket and
/// adds up those that parse as a 64-bit integer.
/// </summary>
/// <param name="length">Number of messages to receive before returning.</param>
/// <returns>The sum of all successfully parsed values.</returns>
public Int64 Run(int length)
{
    Int64 total = 0;
    for (var received = 0; received < length; ++received)
    {
        var message = _receiver.Receive(Encoding.Unicode);

        // Messages that are not valid integers still count as received
        // but contribute nothing to the total.
        Int64 parsed;
        if (!Int64.TryParse(message, out parsed))
        {
            continue;
        }

        total += parsed;
    }

    return total;
}
/// <summary>
/// Creates the PUSH socket that work items are sent on and binds it to the
/// in-process "ventilator" endpoint.
/// </summary>
public void Start()
{
    var pushSocket = _context.CreateSocket(SocketType.PUSH);
    _ventilator = pushSocket;
    pushSocket.Bind("inproc://ventilator");
}
/// <summary>
/// Sends every entry of <paramref name="fileList"/>, in order, as a
/// Unicode-encoded message on the ventilator socket.
/// </summary>
/// <param name="fileList">The file names to send, one message per entry.</param>
public void Run(string[] fileList)
{
    for (var index = 0; index < fileList.Length; index++)
    {
        var path = fileList[index];
        _ventilator.Send(path, Encoding.Unicode);
    }
}
/// <summary>
/// Worker loop: connects a PULL socket to the "ventilator" endpoint and a PUSH
/// socket to the "sink" endpoint, then polls forever, handling each incoming
/// message with <c>RecieverPollInHandler</c>.
/// </summary>
public void Run()
{
    using (ZmqSocket ventilator = _context.CreateSocket(SocketType.PULL),
                     sink = _context.CreateSocket(SocketType.PUSH))
    {
        ventilator.Connect("inproc://ventilator");
        sink.Connect("inproc://sink");

        // Each time the poller reports the ventilator socket as readable,
        // process one job and push its result to the sink.
        ventilator.ReceiveReady += (socket, events) =>
        {
            RecieverPollInHandler(ventilator, sink);
        };

        // Only the ventilator socket is polled. The previous code also listed
        // a `controller` socket that is not declared anywhere in this method.
        var poller = new Poller(new[] { ventilator });

        // This loop has no exit condition; the method never returns normally.
        while (true)
        {
            poller.Poll();
        }
    }
}
/// <summary>
/// Handles one job: reads a file name from <paramref name="reciever"/>,
/// measures that file's length and sends the length (as text) on
/// <paramref name="sender"/>.
/// </summary>
/// <param name="reciever">Socket the file name is pulled from.</param>
/// <param name="sender">Socket the measured length is pushed to.</param>
private void RecieverPollInHandler(ZmqSocket reciever, ZmqSocket sender)
{
    // NOTE(review): fixed 100 ms pause per job; presumably simulates work for
    // the demo - confirm before removing.
    Thread.Sleep(100);

    // Pull the job from the Ventilator
    var fileToMeasure = reciever.Receive(Encoding.Unicode);

    Int64 fileLength = 0;
    try
    {
        using (var fs = File.OpenRead(fileToMeasure))
        {
            fileLength = fs.Length;
        }
    }
    catch (IOException)
    {
        // Best effort: a file that cannot be opened is reported as length 0.
    }
    catch (UnauthorizedAccessException)
    {
        // Same treatment for files we lack permission to read, so that a
        // result is still sent for this job instead of the worker dying.
    }

    Console.Write(".");

    // Push the result to the Sink
    sender.Send(fileLength.ToString(), Encoding.Unicode);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment