Skip to content

Instantly share code, notes, and snippets.

@JosephLaurino
Last active December 25, 2015 11:49
Show Gist options
  • Save JosephLaurino/6971996 to your computer and use it in GitHub Desktop.
Save JosephLaurino/6971996 to your computer and use it in GitHub Desktop.
Simple ZeroMQ sample: the CLIENT sends char input from the console to the SERVER, while the SERVER transmits any new char input to all subscribed CLIENT(s). This sample uses C++11 for the portable std::thread.
////////////////////////////////////////////////////////////////////
// client_transmit_input.cpp FILE
////////////////////////////////////////////////////////////////////
#include "client_transmit_input.h"
// -------------------------------------------------------------------
// Reads characters from std::cin and forwards each one to the relay server.
//
// context    - ZeroMQ context used to create the request socket.
// threadName - label for this worker; only referenced by the disabled trace
//              line below.
//
// Each character is sent as a 2-byte message (the character followed by a
// NUL) over a ZMQ_REQ socket connected to tcp://localhost:5555. Every send is
// paired with a recv of the server's reply before the next character is read.
// Returns once std::cin hits end-of-file or a failed extraction.
void ClientRequest_SendCharHandler(zmq::context_t* context, std::string threadName)
{
    zmq::socket_t socket (*context, ZMQ_REQ);
    std::cout << std::endl << "Connecting to relay char server..." ;
    socket.connect ("tcp://localhost:5555");

    char input;
    // Stop when extraction fails instead of looping forever and re-sending a
    // stale or uninitialized character.
    while (std::cin >> input)
    {
        zmq::message_t request (2);
        // Fill both payload bytes explicitly: copying 2 bytes out of the
        // single char `input` would read one byte past the end of the object.
        char* payload = static_cast<char*>(request.data());
        payload[0] = input;
        payload[1] = '\0';
        // std::cout << threadName << " Sending " << input << std::endl;
        socket.send (request);

        // Get the reply.
        zmq::message_t reply; // empty reply
        socket.recv (&reply);
    }
}
// -------------------------------------------------------------------
// Subscribes to the server's publisher socket and echoes received input.
//
// pContext - ZeroMQ context used to create the subscriber socket.
//
// Connects a ZMQ_SUB socket to tcp://localhost:5556, subscribes to messages
// whose payload starts with "0", and for every message prints the second
// whitespace-separated token to std::cout. Never returns.
void ClientSubscription_InputHandler(zmq::context_t* pContext)
{
    // Socket to talk to server
    std::cout << std::endl << "Subscribing to input updates from server";
    zmq::socket_t subscriber (*pContext, ZMQ_SUB);
    subscriber.connect("tcp://localhost:5556");

    // Only messages beginning with this prefix are delivered. To subscribe to
    // everything, pass an empty filter (zero length) instead.
    const char *filter = "0";
    subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));

    while( 1 )
    {
        zmq::message_t update;
        subscriber.recv(&update);

        // A ZeroMQ payload is a sized byte buffer, not a C string. Stop at the
        // first NUL if there is one, but never read past update.size().
        const char* data = static_cast<const char*>(update.data());
        std::string::size_type length = 0;
        while (length < update.size() && data[length] != '\0')
        {
            ++length;
        }
        std::string text;
        if (length > 0)
        {
            text.assign(data, length);
        }

        // Payload layout is "<topic> <char>"; the topic is parsed and dropped.
        std::string topic;
        std::string input;
        std::istringstream iss(text);
        iss >> topic >> input;
        std::cout << input;
    }
}
// -------------------------------------------------------------------
// Starts the client's two worker threads and blocks until both have finished.
//
// pContext - ZeroMQ context shared by both workers.
//
// One thread forwards console input to the server
// (ClientRequest_SendCharHandler); the other prints updates published by the
// server (ClientSubscription_InputHandler).
void test_client_transmit_input(zmq::context_t *pContext)
{
    std::string threadName("thread_");
    std::thread clientRequestSendCharThread(ClientRequest_SendCharHandler, pContext, threadName);
    std::thread clientSubscriptionThread(ClientSubscription_InputHandler, pContext);

    // Join so that main does not exit while the workers are running. Both
    // threads are joined: destroying a std::thread that is still joinable
    // calls std::terminate.
    clientSubscriptionThread.join();
    clientRequestSendCharThread.join();
}
/////////////////////////////////////////////////////
// client_transmit_input.h FILE
#ifndef CLIENT_TRANSMIT_INPUT_H
#define CLIENT_TRANSMIT_INPUT_H
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <thread>
#include <iostream>
#include <sstream>
#include <vector>
// Thread entry point: loops reading characters from std::cin and sending each
// one over a ZMQ_REQ socket connected to tcp://localhost:5555, receiving the
// reply after every send. threadName is only referenced by a disabled trace line.
void ClientRequest_SendCharHandler(zmq::context_t* context, std::string threadName);
// Thread entry point: subscribes (filter "0") to tcp://localhost:5556 on a
// ZMQ_SUB socket and prints the input carried by each received message.
void ClientSubscription_InputHandler(zmq::context_t* pContext);
// Launches both handlers above on their own std::thread and joins the
// subscription thread so the caller blocks.
void test_client_transmit_input(zmq::context_t *pContext);
#endif
///////////////////////////////////
// main.cpp FILE for client
#include "client_transmit_input.h"
// -------------------------------------------------------------------
int main ()
{
zmq::context_t context(1);
test_client_transmit_input(&context);
return 0;
}
//////////////////////////////////////
// main.cpp FILE for server
#include "server_transmit_input.h"
// -------------------------------------------------------------------
int main ()
{
zmq::context_t context (1);
test_transmit_input(&context);
return 0;
}
//////////////////////////////////////////////
// server_transmit_input.cpp FILE
//////////////////////////////////////////////
#include "server_transmit_input.h"
// State shared between the responder thread (writer) and the publisher
// thread (reader).
// TODO: learn how to do multi-threaded signaling the ZeroMQ way instead;
// ZeroMQ recommends not sharing data between threads.
std::mutex g_hasNewInputDataMutex;  // guards the two variables below
bool g_hasNewInputData{false};      // true while g_inputData is still unpublished
char g_inputData{'\0'};             // most recent character received from a client
// -------------------------------------------------------------------
// Receives characters from clients and records the latest one for publishing.
//
// pContext - ZeroMQ context used to create the reply socket.
//
// Binds a ZMQ_REP socket on tcp://*:5555. For every request, the first payload
// byte is stored in g_inputData (under g_hasNewInputDataMutex),
// g_hasNewInputData is raised, the byte is echoed to std::cout, and the
// request is sent back as the reply. Never returns.
//
// Note: only the most recent character is kept; one that arrives before the
// previous one was consumed overwrites it.
void ServerRespond_SendCharHandler(zmq::context_t* pContext)
{
    zmq::socket_t socket (*pContext, ZMQ_REP);
    socket.bind ("tcp://*:5555");
    std::cout << std::endl << "ServerRespond_SendCharHandler listening at " << "tcp://*:5555" ;

    while (true)
    {
        zmq::message_t request;
        socket.recv (&request);

        // An empty message carries no character; reading its data would be an
        // out-of-bounds access, so only record input when there is a byte.
        if (request.size() > 0)
        {
            std::lock_guard<std::mutex> lock(g_hasNewInputDataMutex);
            g_hasNewInputData = true;
            g_inputData = *static_cast<const char*>(request.data());
            std::cout << g_inputData;
        }

        // A REP socket must answer every request, so reply even when the
        // request was empty. The request itself is echoed back.
        socket.send (request);
    }
}
// -------------------------------------------------------------------
// Publishes newly received characters to all subscribers.
//
// pContext - ZeroMQ context used to create the publisher socket.
//
// Binds a ZMQ_PUB socket on tcp://*:5556 and polls g_hasNewInputData about
// once per millisecond. When input is pending, it sends a 10-byte message
// whose content is "0 <char>" followed by NUL padding. Never returns.
void ServerPublisher_InputHandler(zmq::context_t* pContext)
{
    zmq::socket_t publisher (*pContext, ZMQ_PUB);
    publisher.bind("tcp://*:5556");
    std::cout << std::endl<< "ServerPublisher_InputHandler listening at " << "tcp://*:5556";

    const int kMessageSize = 10;

    while (1)
    {
        bool hasPending = false;
        char pending = '\0';
        {
            // Hold the mutex only long enough to take the pending character.
            // Keeping it locked across the send and the sleep would leave the
            // responder thread almost no chance to acquire it.
            std::lock_guard<std::mutex> lock(g_hasNewInputDataMutex);
            if( g_hasNewInputData == true )
            {
                pending = g_inputData;
                g_hasNewInputData = false;
                hasPending = true;
            }
        }

        if( hasPending )
        {
            // Send message to all subscribers. The payload is built by hand
            // (same bytes "0 %c" would produce) so that no non-portable
            // formatting call is needed and the padding is zeroed rather than
            // sent uninitialized.
            zmq::message_t message(kMessageSize);
            char* out = static_cast<char*>(message.data());
            out[0] = '0';
            out[1] = ' ';
            out[2] = pending;
            for (int i = 3; i < kMessageSize; ++i)
            {
                out[i] = '\0';
            }
            publisher.send(message);
        }

        ThreadSleepInMilliseconds(1);
    }
}
// -------------------------------------------------------------------
// Starts the server's two worker threads and blocks until both have finished.
//
// pContext - ZeroMQ context shared by both workers.
//
// One thread answers client requests (ServerRespond_SendCharHandler); the
// other publishes received characters (ServerPublisher_InputHandler).
void test_transmit_input(zmq::context_t* pContext)
{
    std::thread serverRespondThread(ServerRespond_SendCharHandler, pContext);
    std::thread serverPublisherThread(ServerPublisher_InputHandler, pContext);

    // Wait for the workers to finish. Both threads are joined: destroying a
    // std::thread that is still joinable calls std::terminate.
    serverPublisherThread.join();
    serverRespondThread.join();
}
///////////////////////////////////////
// server_transmit_input.h FILE
///////////////////////////////////////
#ifndef SERVER_TRANSMIT_INPUT_H
#define SERVER_TRANSMIT_INPUT_H
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <thread>
#include <chrono>
#include <math.h>
#include <mutex>
#include "shared_code.h"
// Thread entry point: binds a ZMQ_REP socket on tcp://*:5555, stores the first
// byte of each request in shared state, and echoes the request back as the reply.
void ServerRespond_SendCharHandler(zmq::context_t* pContext);
// Thread entry point: binds a ZMQ_PUB socket on tcp://*:5556 and publishes
// "0 <char>" whenever the shared state holds a new character.
void ServerPublisher_InputHandler(zmq::context_t* pContext);
// Launches both handlers above on their own std::thread and joins the
// publisher thread so the caller blocks.
void test_transmit_input(zmq::context_t* pContext);
#endif
////////////////////////////////////////////////////
// shared_code.h FILE
#ifndef SHARED_CODE
#define SHARED_CODE
#include <thread>
#include <chrono>
// -------------------------------------------------------------------
// Blocks the calling thread for at least `milliseconds` milliseconds.
// Thin wrapper over std::this_thread::sleep_for.
inline void ThreadSleepInMilliseconds(int milliseconds)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds));
}
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment