Skip to content

Instantly share code, notes, and snippets.

@abdullin

abdullin/sf.cs Secret

Created December 1, 2011 17:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abdullin/819f7ec8749d63568fce to your computer and use it in GitHub Desktop.
Save abdullin/819f7ec8749d63568fce to your computer and use it in GitHub Desktop.
Store and Forward
public sealed class SF
{
[Test]
public void Test()
{
var outTmd = "out.tmd";
if (File.Exists(outTmd))
{
File.Delete(outTmd);
}
using (var ctx = new Context())
using (var source = new CancellationTokenSource())
{
var token = source.Token;
int count = 0;
int persisted = 0;
int sent = 0;
var recorder = Task.Factory.StartNew(() =>
{
using (var server = ctx.Socket(SocketType.REP))
using (var publisher = ctx.Socket(SocketType.PUB))
{
server.Bind("tcp://127.0.0.1:4567");
publisher.Bind("tcp://127.0.0.1:4568");
using (var fs = File.OpenWrite(outTmd))
{
fs.SetLength(1024 * 1024 * 50);
fs.Seek(0, SeekOrigin.Begin);
while (!token.IsCancellationRequested)
{
var bytes = server.Recv();
if (bytes.Length > 0)
{
fs.Write(bytes, 0, bytes.Length);
server.Send(new byte[] {0});
publisher.Send(bytes);
persisted += 1;
//fs.Flush(true);
}
}
}
}
});
var sender = Task.Factory.StartNew(() =>
{
using (var s = ctx.Socket(SocketType.REQ))
{
s.Connect("tcp://127.0.0.1:4567");
while (!token.IsCancellationRequested)
{
s.Send("Da test message", Encoding.Unicode);
if (s.Recv()[0] != 0)
throw new InvalidOperationException();
sent += 1;
}
}
});
var downstream = Task.Factory.StartNew(() =>
{
using (var s = ctx.Socket(SocketType.SUB))
{
s.Subscribe(new byte[0]);
s.Connect("tcp://127.0.0.1:4568");
while (!token.IsCancellationRequested)
{
s.Recv();
count += 1;
}
}
});
for (int i = 0; i < 1000; i++)
{
Console.WriteLine("{0}-{1}-{2}", sent,persisted, count);
Thread.Sleep(1000);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment