Instantly share code, notes, and snippets.

Embed
What would you like to do?
ZeroMQ 3.1 Router/Dealer example
public sealed class Host
{
private readonly string _zmqEndpoint;
private readonly Action<Action> _dispatcher;
private static ZmqSocket _server;
private static readonly ConcurrentQueue<Action> WaitToSend = new ConcurrentQueue<Action>();
public Host(string zmqEndpoint, Action<Action> dispatcher)
{
_zmqEndpoint = zmqEndpoint;
_dispatcher = dispatcher;
}
public void Run(Action<Action> idle)
{
var ctx = ZmqContext.Create();
_server = ctx.CreateSocket(SocketType.ROUTER);
_server.ReceiveReady += server_ReceiveReady;
_server.Linger = TimeSpan.FromMilliseconds(250);
_server.Bind(_zmqEndpoint);
var poller = new Poller();
poller.AddSocket(_server);
bool exit = false;
while (!exit)
{
poller.Poll(TimeSpan.FromMilliseconds(10));
if (WaitToSend.Any())
{
Action action;
if (WaitToSend.TryDequeue(out action)) action();
}
else idle(() => exit = true);
}
poller.ClearSockets();
poller.Dispose();
_server.Close();
_server.Dispose();
ctx.Terminate();
}
private void server_ReceiveReady(object s, SocketEventArgs e)
{
var datagram = _server.ReceiveMessage().Select(_ => _.Buffer).ToArray();
var peer = datagram[0];
var id = new Guid(datagram[1]);
var recipient = Encoding.UTF8.GetString(datagram[2]);
var sender = Encoding.UTF8.GetString(datagram[3]);
var type = Encoding.UTF8.GetString(datagram[4]);
var info = Encoding.UTF8.GetString(datagram[5]);
var request = new Request
{
MessageId = id,
Recipient = recipient,
Sender = sender,
Type = type,
Info = info,
Data = datagram.Skip(6).Select(buffer => Encoding.UTF8.GetString(buffer)).ToArray(),
Respond = response =>
{
var dgram = new List<byte[]> { peer, id.ToByteArray(), Encoding.UTF8.GetBytes(response.Type), Encoding.UTF8.GetBytes(response.Info) };
dgram.AddRange(response.Data.Select(_ => Encoding.UTF8.GetBytes(_)));
var dgrama = dgram.ToArray();
WaitToSend.Enqueue(() =>
{
for (int i = 0; i < dgram.Count - 1; i++) _server.SendMore(dgrama[i]);
_server.Send(dgrama[dgram.Count - 1]);
});
}
};
Action work = () =>
{
try
{
_handler.Handle(request);
}
catch (Exception ex)
{
Logger.Instanz.Exception("ZeroMqHost", request.Info, ex);
}
};
_dispatcher(work);
}
}
public class ZeroMqConnectionClient : IDisposable
{
private readonly string _host;
private readonly Thread _thread;
private ZmqSocket _server;
public ZeroMqConnectionClient(string host)
{
_host = host;
_thread = new Thread(Run);
_thread.IsBackground = true;
_thread.Start();
}
private void Run()
{
var ctx = ZmqContext.Create();
_server = ctx.CreateSocket(SocketType.DEALER);
_server.ReceiveReady += _server_ReceiveReady;
_server.Linger = TimeSpan.FromMilliseconds(250);
_server.Connect(_host);
var poller = new Poller();
poller.AddSocket(_server);
try
{
while (true)
{
poller.Poll(TimeSpan.FromMilliseconds(50));
if (_waitToSend.Any())
{
Action action;
if (_waitToSend.TryDequeue(out action)) action();
}
}
}
finally
{
_server.Close();
ctx.Terminate();
}
}
void _server_ReceiveReady(object sender, SocketEventArgs e)
{
var datagram = _server.ReceiveMessage();
var id = new Guid(datagram[0].Buffer);
var type = Encoding.UTF8.GetString(datagram[1].Buffer);
var info = Encoding.UTF8.GetString(datagram[2].Buffer);
var response = new Response
{
InReplyTo = id,
Type = type,
Info = info,
Data = datagram.Skip(3).Select(_ => Encoding.UTF8.GetString(_.Buffer)).ToArray()
};
if (!_sent.ContainsKey(id)) return;
var action = _sent[id];
_sent.Remove(id);
action(response);
}
private readonly ConcurrentQueue<Action> _waitToSend = new ConcurrentQueue<Action>();
private readonly Dictionary<Guid, Action<Response>> _sent = new Dictionary<Guid, Action<Response>>();
private void Request(Request request)
{
var dgram = new List<byte[]>
{
request.MessageId.ToByteArray(),
Encoding.UTF8.GetBytes(request.Recipient),
Encoding.UTF8.GetBytes(request.Sender),
Encoding.UTF8.GetBytes(request.Type),
Encoding.UTF8.GetBytes(request.Info)
};
dgram.AddRange(request.Data.Select(_ => Encoding.UTF8.GetBytes(_)));
var dgrama = dgram.ToArray();
_waitToSend.Enqueue(() =>
{
if (_sent.ContainsKey(request.MessageId)) return;
_sent.Add(request.MessageId, response => Detach(response, request.Respond));
for (int i = 0; i < dgram.Count - 1; i++) _server.SendMore(dgrama[i]);
_server.Send(dgrama[dgram.Count - 1]);
});
}
private void Detach(Response response, Action<Response> onresponse)
{
var thread = new Thread(() => onresponse(response));
thread.Start();
}
public void Dispose()
{
_thread.Abort();
_thread.Join();
}
}
}
@heimeshoff

This comment has been minimized.

Show comment
Hide comment
@heimeshoff

heimeshoff Apr 12, 2013

Thanks a lot,
Have my threaded querying up and running now.
Hey look, the system just got more responsive.... go figure :)

heimeshoff commented Apr 12, 2013

Thanks a lot,
Have my threaded querying up and running now.
Hey look, the system just got more responsive.... go figure :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment