Skip to content

Instantly share code, notes, and snippets.

@reuben
Created May 19, 2012 19:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save reuben/2732194 to your computer and use it in GitHub Desktop.
Save reuben/2732194 to your computer and use it in GitHub Desktop.
0MQ subscriber code for a Triskle client
/*
* vim: set ts=4 :
* =============================================================================
* Triskle Commons
* Copyright (C) 2011 Reuben 'Seta00' Morais. All rights reserved.
* =============================================================================
*
* This program is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License, version 3.0, as published by the
* Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
#include "zmqsubscriber.h"
Subscriber::Subscriber(void *ctx, QString &addr) {
socket = zmq_socket(ctx, ZMQ_SUB);
controller = zmq_socket(ctx, ZMQ_REP);
address = addr;
}
bool Subscriber::addListener(IListener *listener) {
if (!listeners.contains(listener)) {
listeners.append(listener);
return true;
}
return false;
}
void Subscriber::run() {
zmq_connect(socket, address.toAscii().data());
zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);
zmq_bind(controller, "inproc://triskle-thread-controller");
zmq_pollitem_t items[] = {
{socket, 0, ZMQ_POLLIN, 0},
{controller, 0, ZMQ_POLLIN, 0}
};
while(zmq_poll(items, 1, -1)) {
if (items[1].revents & ZMQ_POLLIN)
break;
vector<zmq_msg_t*> msgs;
qint64 rcvmore = 1;
size_t szMore = sizeof(rcvmore);
while (rcvmore == 1) {
zmq_msg_t *msg = new zmq_msg_t;
zmq_msg_init(msg);
zmq_recvmsg(socket, msg, 0);
msgs.push_back(msg);
zmq_getsockopt(socket, ZMQ_RCVMORE, &rcvmore, &szMore);
}
DataMessage dataMsg = DataMessage::parseMessage(msgs);
foreach (IListener *l, listeners)
l->onMessage(dataMsg);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment