Skip to content

Instantly share code, notes, and snippets.

@mbe24
Last active March 31, 2019 11:56
Show Gist options
  • Save mbe24/14850d46dbeb9069aba42247d275e371 to your computer and use it in GitHub Desktop.
Save mbe24/14850d46dbeb9069aba42247d275e371 to your computer and use it in GitHub Desktop.
ZMQ Control Pipe Example
package org.beyene.zmqpipe;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates how to stop a blocking ZeroMQ poll loop through a control pipe.
 *
 * <p>A worker ({@link Controlled}) polls two sockets: a SUB socket carrying the
 * data feed and a PAIR socket bound to an {@code inproc://} endpoint. After five
 * seconds a scheduled task connects the matching PAIR socket and sends a single
 * message, which makes the worker leave its loop and release its sockets.
 *
 * <p>Both ends of the pipe are created from the same {@link ZContext}, which is
 * owned (and eventually closed) by this class.
 */
public class ZmqControlPipe {

    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    private final ZContext context = new ZContext();
    private final ZMQ.Socket emitter = context.createSocket(SocketType.PAIR);
    private final String controlPipe = "inproc://pipe";

    public static void main(String[] args) {
        new ZmqControlPipe().run();
    }

    /**
     * Starts the worker, schedules the shutdown signal, waits for both tasks to
     * finish and finally closes the shared context.
     */
    public void run() {
        try {
            // The constructor binds the control pipe, so the bind is guaranteed
            // to precede the connect performed later by close().
            Controlled controlled = new Controlled(context, controlPipe);
            executor.submit(controlled);
            executor.schedule(this::close, 5, TimeUnit.SECONDS);

            // With the default executor policy, already scheduled delayed tasks
            // still run after shutdown(); it only rejects new submissions.
            executor.shutdown();
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                // The worker never saw the control message; interrupt it so the
                // non-daemon pool threads do not keep the JVM alive.
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            // Preserve the interrupt for whoever called run().
            Thread.currentThread().interrupt();
        } finally {
            // The context is owned by this class, so it is closed here and only here.
            if (!context.isClosed()) {
                context.close();
            }
        }
    }

    /** Connects the emitter to the control pipe and tells the worker to stop. */
    private void close() {
        emitter.connect(controlPipe);
        if (!emitter.send("close")) {
            System.err.println("Failed to send close command on control pipe.");
        }
    }

    /**
     * Poll loop that prints the first frame of every message received on the
     * subscriber socket until a message arrives on the control pipe.
     */
    private static class Controlled implements Runnable {

        private final ZMQ.Poller poller;
        private final int subscriberIndex;
        private final int controlIndex;

        /**
         * Creates and registers the subscriber socket and the control pipe receiver.
         *
         * @param context     shared context used to create the sockets and the poller;
         *                    it remains owned by the caller and is not closed here
         * @param controlPipe endpoint the control receiver binds to
         */
        public Controlled(ZContext context, String controlPipe) {
            this.poller = context.createPoller(2);

            ZMQ.Socket sub = context.createSocket(SocketType.SUB);
            // A SUB socket delivers nothing until it has at least one subscription.
            sub.subscribe(ZMQ.SUBSCRIPTION_ALL);
            sub.connect("tcp://zmq.devnet.iota.org:5556");
            this.subscriberIndex = poller.register(sub, ZMQ.Poller.POLLIN);

            ZMQ.Socket receiver = context.createSocket(SocketType.PAIR);
            receiver.bind(controlPipe);
            this.controlIndex = poller.register(receiver, ZMQ.Poller.POLLIN);
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // A negative result is treated as a polling failure
                    // (e.g. interruption or a closed context): stop polling.
                    if (poller.poll() < 0) {
                        break;
                    }

                    // Check the control pipe first so that a busy data feed
                    // can never starve the shutdown signal.
                    if (poller.pollin(controlIndex)) {
                        System.out.println("Received message on control pipe. Breaking...");
                        break;
                    }

                    if (poller.pollin(subscriberIndex)) {
                        ZMsg message = ZMsg.recvMsg(poller.getSocket(subscriberIndex));
                        // recvMsg may return null and a message may carry no frames.
                        if (message != null && !message.isEmpty()) {
                            System.out.println(message.poll().getString(ZMQ.CHARSET));
                        }
                    }
                }
            } finally {
                // Release the sockets even if the loop ended with an exception.
                for (int i = 0; i < poller.getSize(); i++) {
                    ZMQ.Socket socket = poller.getSocket(i);
                    if (socket != null) {
                        socket.close();
                    }
                }
            }
        }
    }
}
@mbe24
Copy link
Author

mbe24 commented Mar 31, 2019

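Revision 2 is the inproc:// version that uses a shared context. Both ends of an inproc:// connection must be created from the same context, because inproc endpoints are only visible within the context that bound them; this is how ZeroMQ's inproc transport is designed, not something specific to jeromq.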

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment