Skip to content

Instantly share code, notes, and snippets.

@janderit
Created September 10, 2012 15:48
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save janderit/3691675 to your computer and use it in GitHub Desktop.
Save janderit/3691675 to your computer and use it in GitHub Desktop.
ZeroMQ 3.2 Single threaded container
/// <summary>
/// Encapsulates a zeroMQ context and a worker thread.
/// Use the ZmqPollPool to serve multiple 0mq sockets with single-threaded semantics.
///
/// Use MarshalAndWait or MarshalAsync to marshal 0mq context access to the working thread.
/// Call Dispose to close all registered sockets and terminate the context and thread.
/// </summary>
public class ZmqPollPool : IDisposable
{
    // Only read and written on the worker thread once the worker has started (see SetTimeout).
    private TimeSpan _pollTimeout;
    private ZmqContext _ctx;
    private readonly Thread _thread;

    // Written by Dispose (any thread), polled by the worker loop: must be volatile.
    private volatile bool _stop;

    // Cancelled by the worker as soon as it has left its command loop, i.e. once it is certain
    // that no queued command will ever be executed. Releases callers blocked in MarshalAndWait.
    // Deliberately never disposed, so MarshalAndWait stays callable after Dispose.
    private readonly CancellationTokenSource _workerExited = new CancellationTokenSource();

    // Pending commands; every access is guarded by lock (_commands).
    private readonly List<Action> _commands = new List<Action>();
    private Poller _poller;
    private readonly List<ZmqSocket> _sockets = new List<ZmqSocket>();

    // True once at least one socket has been added to the poller.
    private bool _any;

    /// <summary>
    /// Creates the pool and starts its worker thread. Returns once the worker has created
    /// the 0mq context and the poller.
    /// </summary>
    /// <param name="pollTimeout">Timeout passed to each poll (or sleep, while no socket is polled).</param>
    public ZmqPollPool(TimeSpan pollTimeout)
    {
        _pollTimeout = pollTimeout;
        var ready = new ManualResetEventSlim();
        _thread = new Thread(() => Runner(ready)) { IsBackground = true };
        _thread.Start();
        ready.Wait();
    }

    /// <summary>
    /// True when the calling thread is the pool's worker thread.
    /// </summary>
    private bool IsWorkerThread
    {
        get { return _thread.ManagedThreadId == Thread.CurrentThread.ManagedThreadId; }
    }

    /// <summary>
    /// Sets the timeout for polling to a new value. Threadsafe.
    /// </summary>
    /// <param name="pollTimeout">The new poll timeout.</param>
    public void SetTimeout(TimeSpan pollTimeout)
    {
        Action action = () => _pollTimeout = pollTimeout;
        if (IsWorkerThread)
        {
            action();
        }
        else
        {
            MarshalAsync(action);
        }
    }

    /// <summary>
    /// Worker thread body: creates context and poller, then alternates between polling the
    /// registered sockets and executing queued commands until a stop is requested. On exit it
    /// closes all registered sockets and terminates the context.
    /// </summary>
    /// <param name="wait">Signalled once context and poller have been created.</param>
    private void Runner(ManualResetEventSlim wait)
    {
        _ctx = ZmqContext.Create();
        _poller = new Poller();
        wait.Set();

        while (!_stop)
        {
            if (_any)
            {
                _poller.Poll(_pollTimeout);
            }
            else
            {
                Thread.Sleep(_pollTimeout);
            }

            // Take a snapshot under the lock and run it outside the lock: commands may
            // themselves call MarshalAsync, which must not modify a list being enumerated
            // and must not make other threads wait for a long-running command.
            Action[] pending;
            lock (_commands)
            {
                if (_commands.Count == 0)
                {
                    continue;
                }
                pending = _commands.ToArray();
                _commands.Clear();
            }

            foreach (var command in pending)
            {
                command();
            }
        }

        // From here on no command will be executed any more; release blocked waiters
        // before the (potentially blocking) context termination below.
        _workerExited.Cancel();

        _poller.ClearSockets();
        _sockets.ForEach(socket => socket.Close());
        _poller.Dispose();
        _ctx.Terminate();
        _ctx.Dispose();
    }

    /// <summary>
    /// Marshal an action into the working thread. Use this for socket operations.
    /// Blocks until the action has been executed. When called on the worker thread itself
    /// the action is executed inline. Returns without the action having run if the worker
    /// has already stopped processing commands (pool disposed).
    /// </summary>
    /// <param name="action">The action to execute on the worker thread.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public void MarshalAndWait(Action action)
    {
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        // Queueing and then waiting on the worker thread would wait for ourselves forever.
        if (IsWorkerThread)
        {
            action();
            return;
        }

        if (_workerExited.IsCancellationRequested)
        {
            return;
        }

        // Not disposed on purpose: the worker may still be inside Set() when the waiter wakes
        // up, and as long as WaitHandle is never touched the event owns no OS handle.
        var done = new ManualResetEventSlim();
        MarshalAsync(() =>
        {
            action();
            done.Set();
        });

        try
        {
            done.Wait(_workerExited.Token);
        }
        catch (OperationCanceledException)
        {
            // The worker left its loop before running the command; nothing left to wait for.
        }
    }

    /// <summary>
    /// Marshal an action into the working thread. Use this for sending messages via a socket.
    /// The action is queued and executed after the worker's next poll; this call does not block.
    /// </summary>
    /// <param name="action">The action to execute on the worker thread.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public void MarshalAsync(Action action)
    {
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        lock (_commands)
        {
            _commands.Add(action);
        }
    }

    public delegate void RegisterSocket(ZmqSocket socket);

    /// <summary>
    /// Threadsafe access to the 0mq context. Use this to create a new socket. Register the socket with the provided callback to allow the PollPool to close the socket when disposed.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public void MarshalAsync(Action<ZmqContext, RegisterSocket> action)
    {
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        MarshalAsync(() => action(_ctx, Register));
    }

    /// <summary>
    /// Threadsafe access to the 0mq context. Use this to create a new socket. Register the socket with the provided callback to allow the PollPool to close the socket when disposed.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public void MarshalAndWait(Action<ZmqContext, RegisterSocket> action)
    {
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        MarshalAndWait(() => action(_ctx, Register));
    }

    public delegate void MessageCallbackByteArray(byte[][] datagram);
    public delegate void RegisterAndSubscribeForByteArrayEncodedMessages(ZmqSocket socket, MessageCallbackByteArray callback);

    /// <summary>
    /// Threadsafe access to the 0mq context. Use this to create a new socket. Register the socket with the provided callback to allow the PollPool to close the socket when disposed.
    /// The callback will be called on the worker thread whenever a new complete message has been received.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public void MarshalAsync(Action<ZmqContext, RegisterAndSubscribeForByteArrayEncodedMessages> action)
    {
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        MarshalAsync(() => action(_ctx, Register));
    }

    /// <summary>
    /// Threadsafe access to the 0mq context. Use this to create a new socket. Register the socket with the provided callback to allow the PollPool to close the socket when disposed.
    /// The callback will be called on the worker thread whenever a new complete message has been received.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public void MarshalAndWait(Action<ZmqContext, RegisterAndSubscribeForByteArrayEncodedMessages> action)
    {
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        MarshalAndWait(() => action(_ctx, Register));
    }

    public delegate void MessageCallbackString(string[] datagram);
    public delegate void RegisterAndSubscribeForStringEncodedMessages(ZmqSocket socket, MessageCallbackString callback, Encoding stringEncoding);

    /// <summary>
    /// Threadsafe access to the 0mq context. Use this to create a new socket. Register the socket with the provided callback to allow the PollPool to close the socket when disposed.
    /// The callback will be called on the worker thread whenever a new complete message has been received.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public void MarshalAsync(Action<ZmqContext, RegisterAndSubscribeForStringEncodedMessages> action)
    {
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        MarshalAsync(() => action(_ctx, Register));
    }

    /// <summary>
    /// Threadsafe access to the 0mq context. Use this to create a new socket. Register the socket with the provided callback to allow the PollPool to close the socket when disposed.
    /// The callback will be called on the worker thread whenever a new complete message has been received.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public void MarshalAndWait(Action<ZmqContext, RegisterAndSubscribeForStringEncodedMessages> action)
    {
        if (action == null)
        {
            throw new ArgumentNullException("action");
        }

        MarshalAndWait(() => action(_ctx, Register));
    }

    /// <summary>
    /// Remembers the socket so that it gets closed when the worker shuts down.
    /// </summary>
    private void Register(ZmqSocket sock)
    {
        _sockets.Add(sock);
    }

    /// <summary>
    /// Subscribes to the socket's ReceiveReady event, adds the socket to the poller and
    /// remembers it for closing. Each received message is handed to
    /// <paramref name="onMessage"/> as one byte array per frame.
    /// </summary>
    private void Register(ZmqSocket sock, MessageCallbackByteArray onMessage)
    {
        sock.ReceiveReady += (s, e) =>
        {
            var frames = e.Socket.ReceiveMessage().Select(frame => frame.Buffer).ToArray();
            onMessage(frames);
        };
        _poller.AddSocket(sock);
        Register(sock);
        _any = true; // switches the worker loop from sleeping to polling
    }

    /// <summary>
    /// Same as the byte array registration, but decodes every frame to a string using
    /// <paramref name="stringEncoding"/> before invoking <paramref name="onMessage"/>.
    /// </summary>
    private void Register(ZmqSocket sock, MessageCallbackString onMessage, Encoding stringEncoding)
    {
        Register(sock, datagram => onMessage(datagram.Select(stringEncoding.GetString).ToArray()));
    }

    /// <summary>
    /// Closes all registered sockets, terminates the worker thread and calls zmq_term to terminate the context.
    /// ATTN: if this method does not return, you have probably created a registered socket on the context without registering it with the poll pool.
    /// When called on the worker thread itself, only requests the stop and returns immediately;
    /// the worker then shuts down after the current command or callback returns.
    /// </summary>
    public void Dispose()
    {
        _stop = true;

        // A thread cannot join itself; doing so would block forever.
        if (IsWorkerThread)
        {
            return;
        }

        _thread.Join();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment