Created
September 10, 2012 15:48
-
-
Save janderit/3691675 to your computer and use it in GitHub Desktop.
ZeroMQ 3.2 Single threaded container
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary> | |
/// Encapsulates a zeroMQ context and a worker thread. | |
/// Use the ZmqPollPool to server multiple 0mq sockets with single-threaded semantics. | |
/// | |
/// Use MarshalAndWait or MarshalAsync to marshal 0mq context access to the working thread. | |
/// Call Dispose to close all registered sockets and terminate the context and thread. | |
/// </summary> | |
public class ZmqPollPool : IDisposable | |
{ | |
private TimeSpan _pollTimeout; | |
private ZmqContext _ctx; | |
private readonly Thread _thread; | |
private bool _stop; | |
private readonly List<Action> _commands = new List<Action>(); | |
private Poller _poller; | |
private readonly List<ZmqSocket> _sockets = new List<ZmqSocket>(); | |
private bool _any; | |
public ZmqPollPool(TimeSpan pollTimeout) | |
{ | |
_pollTimeout = pollTimeout; | |
var wait = new ManualResetEventSlim(); | |
_thread = new Thread(()=>Runner(wait)); | |
_thread.Start(); | |
wait.Wait(); | |
} | |
/// <summary> | |
/// Sets the timeout for polling to a new value. Threadsafe. | |
/// </summary> | |
/// <param name="pollTimeout"></param> | |
public void SetTimeout(TimeSpan pollTimeout) | |
{ | |
Action action = () => _pollTimeout = pollTimeout; | |
if (_thread.ManagedThreadId == Thread.CurrentThread.ManagedThreadId) action(); | |
else MarshalAsync(action); | |
} | |
private void Runner(ManualResetEventSlim wait) | |
{ | |
Thread.CurrentThread.IsBackground = true; | |
_ctx = ZmqContext.Create(); | |
_poller = new Poller(); | |
wait.Set(); | |
wait = null; | |
while (!_stop) | |
{ | |
if (_any) _poller.Poll(_pollTimeout); else Thread.Sleep(_pollTimeout); | |
if (_commands.Count == 0) continue; | |
lock (_commands) | |
{ | |
foreach (var c in _commands) c(); | |
_commands.Clear(); | |
} | |
} | |
_poller.ClearSockets(); | |
_sockets.ForEach(_ => _.Close()); | |
_poller.Dispose(); | |
_ctx.Terminate(); | |
_ctx.Dispose(); | |
} | |
/// <summary> | |
/// Marshal an action into the working thread. Use this for socket operations. | |
/// </summary> | |
public void MarshalAndWait(Action action) | |
{ | |
var mux = new ManualResetEventSlim(); | |
lock (_commands) _commands.Add(()=> | |
{ | |
action(); | |
mux.Set(); | |
}); | |
if (!_stop) mux.Wait(); | |
} | |
/// <summary> | |
/// Marshal an action into the working thread. Use this for sending messages via a socket. | |
/// </summary> | |
public void MarshalAsync(Action action) | |
{ | |
lock (_commands) _commands.Add(action); | |
} | |
public delegate void RegisterSocket(ZmqSocket socket); | |
/// <summary> | |
/// Threadsafe access to the 0mq context. Use this to create a new socket. Register the socket with the provided callback to allow the PollPool to close the socket when disposed. | |
/// </summary> | |
public void MarshalAsync(Action<ZmqContext, RegisterSocket> action) | |
{ | |
MarshalAsync(() => action(_ctx, Register)); | |
} | |
/// <summary> | |
/// Threadsafe access to the 0mq context. Use this to create a new socket. Register the socket with the provided callback to allow the PollPool to close the socket when disposed. | |
/// </summary> | |
public void MarshalAndWait(Action<ZmqContext, RegisterSocket> action) | |
{ | |
MarshalAndWait(() => action(_ctx, Register)); | |
} | |
public delegate void MessageCallbackByteArray(byte[][] datagram); | |
public delegate void RegisterAndSubscribeForByteArrayEncodedMessages(ZmqSocket socket, MessageCallbackByteArray callback); | |
/// <summary> | |
/// Threadsafe access to the 0mq context. Use this to create a new socket. Register the socket with the provided callback to allow the PollPool to close the socket when disposed. | |
/// The callback will be called on the worker thread whenever a new complete message has been received. | |
/// </summary> | |
public void MarshalAsync(Action<ZmqContext, RegisterAndSubscribeForByteArrayEncodedMessages> action) | |
{ | |
MarshalAsync(() => action(_ctx, Register)); | |
} | |
/// <summary> | |
/// Threadsafe access to the 0mq context. Use this to create a new socket. Register the socket with the provided callback to allow the PollPool to close the socket when disposed. | |
/// The callback will be called on the worker thread whenever a new complete message has been received. | |
/// </summary> | |
public void MarshalAndWait(Action<ZmqContext, RegisterAndSubscribeForByteArrayEncodedMessages> action) | |
{ | |
MarshalAndWait(() => action(_ctx, Register)); | |
} | |
public delegate void MessageCallbackString(string[] datagram); | |
public delegate void RegisterAndSubscribeForStringEncodedMessages(ZmqSocket socket, MessageCallbackString callback, Encoding stringEncoding); | |
/// <summary> | |
/// Threadsafe access to the 0mq context. Use this to create a new socket. Register the socket with the provided callback to allow the PollPool to close the socket when disposed. | |
/// The callback will be called on the worker thread whenever a new complete message has been received. | |
/// </summary> | |
public void MarshalAsync(Action<ZmqContext, RegisterAndSubscribeForStringEncodedMessages> action) | |
{ | |
MarshalAsync(() => action(_ctx, Register)); | |
} | |
/// <summary> | |
/// Threadsafe access to the 0mq context. Use this to create a new socket. Register the socket with the provided callback to allow the PollPool to close the socket when disposed. | |
/// The callback will be called on the worker thread whenever a new complete message has been received. | |
/// </summary> | |
public void MarshalAndWait(Action<ZmqContext, RegisterAndSubscribeForStringEncodedMessages> action) | |
{ | |
MarshalAndWait(() => action(_ctx, Register)); | |
} | |
private void Register(ZmqSocket sock) | |
{ | |
_sockets.Add(sock); | |
} | |
private void Register(ZmqSocket sock, MessageCallbackByteArray onMessage) | |
{ | |
sock.ReceiveReady += (s, e) => | |
{ | |
var msg = e.Socket.ReceiveMessage().Select(_ => _.Buffer).ToArray(); | |
onMessage(msg); | |
}; | |
_poller.AddSocket(sock); | |
Register(sock); | |
_any = true; | |
} | |
private void Register(ZmqSocket sock, MessageCallbackString onMessage, Encoding stringEncoding) | |
{ | |
Register(sock, datagram => onMessage(datagram.Select(stringEncoding.GetString).ToArray())); | |
} | |
/// <summary> | |
/// Closes all registered sockets, terminates the worker thread and calls zmq_term to terminate the context. | |
/// ATTN: if this method does not return, you have probably created a registered socket on the context without registering it with the poll pool. | |
/// </summary> | |
public void Dispose() | |
{ | |
_stop = true; | |
_thread.Join(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment