Skip to content

Instantly share code, notes, and snippets.

@drbobbeaty
Created October 19, 2010 18:25
Show Gist options
  • Save drbobbeaty/634738 to your computer and use it in GitHub Desktop.
Save drbobbeaty/634738 to your computer and use it in GitHub Desktop.
Simple ZeroMQ Receiver for Quotes
// System Headers
#include <iostream>
#include <stdio.h>
#include <string>
#include <stdint.h>
#include <sys/time.h>
// Third-Party Headers
#include <zmq.hpp>
// Other Headers
// Forward Declarations
// Public Constants
// Public DataTypes
// Public Data Constants
/**
 * Returns the current wall-clock time, as reported by gettimeofday(),
 * expressed as the number of microseconds since the epoch. It is used
 * to time the messages and bytes per second we receive.
 *
 * @return microseconds since epoch as an unsigned 64-bit value
 */
uint64_t usecSinceEpoch() {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    // Widen to 64 bits *before* multiplying: tv_sec has type time_t, and
    // where that is a 32-bit type the product sec * 1000000 would overflow
    // if it were computed in the native width.
    const uint64_t seconds = static_cast<uint64_t>(tv.tv_sec);
    const uint64_t micros = static_cast<uint64_t>(tv.tv_usec);
    return seconds * 1000000 + micros;
}
/**
* This is the main test frame -- open up a ZMQ socket to the right
* URLs and listen to what's being transmitted. Simple.
*/
int main(int argc, char *argv[]) {
bool error = false;
// make the ZMQ Context and Socket for what we need to "hear"
zmq::context_t *mContext = NULL;
if (!error) {
mContext = new zmq::context_t(1);
if (mContext == NULL) {
error = true;
std::cout << "could not create the ZMQ context" << std::endl;
}
}
zmq::socket_t *mSocket = NULL;
if (!error) {
mSocket = new zmq::socket_t(*mContext, ZMQ_SUB);
if (mSocket == NULL) {
error = true;
std::cout << "could not create the ZMQ socket" << std::endl;
} else {
// make sure we subscribe to all messages we get
mSocket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
}
}
// verify it's all initialized OK
if (!error) {
std::cout << "Initialization complete." << std::endl;
}
// now let's connect to the URLs we need to listen to
if (!error) {
mSocket->connect("epgm://eth0;239.22.3.1:11111");
mSocket->connect("epgm://eth0;239.22.3.2:11111");
mSocket->connect("epgm://eth0;239.22.3.3:11111");
mSocket->connect("epgm://eth0;239.22.3.4:11111");
mSocket->connect("epgm://eth0;239.22.3.5:11111");
mSocket->connect("epgm://eth0;239.22.3.6:11111");
mSocket->connect("epgm://eth0;239.22.3.7:11111");
mSocket->connect("epgm://eth0;239.22.3.8:11111");
mSocket->connect("epgm://eth0;239.22.3.9:11111");
mSocket->connect("epgm://eth0;239.22.3.10:11111");
mSocket->connect("epgm://eth0;239.22.3.11:11111");
mSocket->connect("epgm://eth0;239.22.3.12:11111");
mSocket->connect("epgm://eth0;239.22.3.13:11111");
mSocket->connect("epgm://eth0;239.22.3.14:11111");
mSocket->connect("epgm://eth0;239.22.3.15:11111");
mSocket->connect("epgm://eth0;239.22.3.16:11111");
mSocket->connect("epgm://eth0;239.22.3.17:11111");
mSocket->connect("epgm://eth0;239.22.3.18:11111");
mSocket->connect("epgm://eth0;239.22.3.19:11111");
mSocket->connect("epgm://eth0;239.22.3.20:11111");
mSocket->connect("epgm://eth0;239.22.3.21:11111");
mSocket->connect("epgm://eth0;239.22.3.22:11111");
mSocket->connect("epgm://eth0;239.22.3.23:11111");
mSocket->connect("epgm://eth0;239.22.3.24:11111");
mSocket->connect("epgm://eth0;239.22.3.25:11111");
mSocket->connect("epgm://eth0;239.22.3.26:11111");
mSocket->connect("epgm://eth0;239.22.3.27:11111");
}
// now let's listen for a while and just record the stats
if (!error) {
uint64_t msgCnt = 0;
uint64_t byteCnt = 0;
uint64_t usec = usecSinceEpoch();
while (true) {
// get a message from one of the connections
zmq::message_t msg;
mSocket->recv(&msg);
// count it and it's contents
++msgCnt;
byteCnt += msg.size();
// see if we need to report this all
if (msgCnt % 20000 == 0) {
// get the snapshot time and log what we have
uint64_t now = usecSinceEpoch();
printf("%ld kmsgs = %.1f kb in %.3f msec: %.1f kmsgs/sec... %.1f kb/sec\n",
msgCnt/1000, byteCnt/1024.0, ((now - usec)/1000.0),
(msgCnt/1000.0)/((now - usec) * 1.0e-6),
(byteCnt/1024.0)/((now - usec) * 1.0e-6));
// now reset the counters
msgCnt = 0;
byteCnt = 0;
usec = now;
}
}
}
// clean it all up
if (mSocket != NULL) {
delete mSocket;
mSocket = NULL;
}
if (mContext != NULL) {
delete mContext;
mContext = NULL;
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment