Skip to content

Instantly share code, notes, and snippets.

@Panchatcharam
Last active Jan 28, 2018
Embed
What would you like to do?
This is a modified version of the Ironhouse pattern example at https://github.com/zeromq/czmq/blob/master/examples/security/ironhouse2.c. The original example uses PUSH/PULL sockets; this version uses PUB/SUB instead. It works, but there is still plenty of room for improvement.
// The Ironhouse Pattern
//
// This is exactly the same example but broken into two threads
// so you can better see what client and server do, separately.
#include <czmq.h>
#include <iostream>
#include <sstream>
#include <sys/time.h>
#include <string>
// File-scope scratch buffer: server_task fills it with gettimeofday() and
// copies tv_sec / tv_usec into each Quote it publishes.
struct timeval timeofday;
// A single market quote exchanged between the publisher and the subscriber.
//
// The stream operators below define the text wire format: the seven fields
// in declaration order, separated by single spaces. Extraction is
// whitespace-delimited, so a string field that is empty or contains
// whitespace will not round-trip.
struct Quote {
    std::string ticker;
    std::string exchange;
    std::string full_name;
    double value = 0.0;
    std::string data;
    long long timestampus = 0;   // server_task stores timeval::tv_usec here
    long long timestamps = 0;    // server_task stores timeval::tv_sec here

    // Write the quote as space-separated fields. Takes the quote by const
    // reference so that const objects and temporaries can be streamed too.
    friend std::ostream& operator<< (std::ostream& out, const Quote& object)
    {
        out << object.ticker << " "
            << object.exchange << " "
            << object.full_name << " "
            << object.value << " "
            << object.data << " "
            << object.timestampus << " "
            << object.timestamps;
        return out;
    }

    // Read the fields back in the same order they are written. If any
    // extraction fails the stream's failbit is set and the remaining fields
    // are left untouched; callers should test the stream afterwards.
    friend std::istream& operator>> (std::istream& in, Quote& object)
    {
        in >> object.ticker
           >> object.exchange
           >> object.full_name
           >> object.value
           >> object.data
           >> object.timestampus
           >> object.timestamps;
        return in;
    }
};
// Block on the actor pipe until a "$TERM" command arrives or the receive is
// interrupted (zmsg_recv returns NULL). Any other message is reported as an
// error and trips an assertion.
//
// pipe: the actor's pipe socket, as handed to the actor function.
void await_term (zsock_t *pipe)
{
    bool terminated = false;
    while (!terminated) {
        zmsg_t *msg = zmsg_recv (pipe);
        if (!msg)
            break;              // Interrupted

        // zmsg_popstr yields NULL when the message has no frame to pop;
        // guard it because streq() would dereference the pointer.
        char *command = zmsg_popstr (msg);

        // All actors must handle $TERM in this way
        if (command && streq (command, "$TERM"))
            terminated = true;
        else {
            puts ("E: invalid message to actor");
            assert (false);
        }
        free (command);         // free (NULL) is a no-op
        zmsg_destroy (&msg);
    }
}
// The client task runs in its own context, and receives the
// server public key as an argument.
static void
client_task (zsock_t *pipe, void *args)
{
// Load our persistent certificate from disk
zcert_t *client_cert = zcert_load ("client_cert.txt");
assert (client_cert);
// Create client socket and configure it to use full encryption
zsock_t *client = zsock_new (ZMQ_SUB);
assert (client);
zcert_apply (client_cert, client);
zsock_set_curve_serverkey (client, (char *) args);
int rc = zsock_connect (client, "tcp://127.0.0.1:9000");
assert (rc == 0);
zsock_set_subscribe(client, "SECURITY");
zsock_signal (pipe, 0);
// Wait for our message, that signals the test was successful
// char *message = "";
for( unsigned int idx = 0; idx < 600; ++idx )
{
{
std::string Topic = zstr_recv (client);
std::cout<<"Topic : " << Topic << std::endl;
// printf("\n Message Received : %s",Topic.c_str());
// free (message);
Quote quote;
std::stringstream ss;
std::string content = zstr_recv (client);
ss << content;
ss >> quote;
std::cout<<"ticker : " << quote.ticker << std::endl;
std::cout<<"exchange : " << quote.exchange << std::endl;
std::cout<<"full_name : " << quote.full_name << std::endl;
std::cout<<"value : " << quote.value << std::endl;
std::cout<<"data : " << quote.data << std::endl;
std::cout<<"timestamp : " << quote.timestampus << std::endl << std::endl << std::endl;
//sleep(1);
}
}
puts ("\n Ironhouse test OK");
await_term (pipe);
// Free all memory we used
zsock_destroy (&client);
zcert_destroy (&client_cert);
}
// The server task runs the authenticator and publishes quotes.
//
// pipe: actor pipe; signalled once the socket is bound, then watched for
//       $TERM via await_term().
// args: the server's zcert_t *, applied to the PUB socket. The certificate
//       is not destroyed here.
//
// Every call with a side effect is made outside assert() so that the task
// behaves the same when the program is built with NDEBUG.
static void
server_task (zsock_t *pipe, void *args)
{
    // Start the authenticator and tell it to authenticate clients
    // via the certificates stored in the .curve directory.
    zactor_t *auth = zactor_new (zauth, NULL);
    assert (auth);

    int rc = zstr_send (auth, "VERBOSE");
    assert (rc == 0);
    rc = zsock_wait (auth);
    assert (rc >= 0);

    rc = zstr_sendx (auth, "ALLOW", "127.0.0.1", NULL);
    assert (rc == 0);
    rc = zsock_wait (auth);
    assert (rc >= 0);

    rc = zstr_sendx (auth, "CURVE", ".curve", NULL);
    assert (rc == 0);
    rc = zsock_wait (auth);
    assert (rc >= 0);

    // Create server socket and configure it to use full encryption
    zsock_t *server = zsock_new (ZMQ_PUB);
    assert (server);
    zcert_apply (static_cast<zcert_t *> (args), server);
    zsock_set_curve_server (server, 1);
    rc = zsock_bind (server, "tcp://*:9000");
    assert (rc != -1);
    zsock_signal (pipe, 0);

    // NOTE(review): presumably this pause gives the subscriber time to
    // connect and subscribe before the first quote is published.
    sleep (5);

    for (unsigned int idx = 0; idx < 600; ++idx) {
        // Build the quote for this tick
        Quote quote = {};
        quote.ticker = "BHELL";
        quote.exchange = "BSE";
        quote.full_name = "BombayStockExchange";
        quote.value = 0.0;
        quote.data = "10020";
        gettimeofday (&timeofday, NULL);
        quote.timestampus = timeofday.tv_usec;
        quote.timestamps = timeofday.tv_sec;

        std::ostringstream ss;
        ss << quote;
        const std::string payload = ss.str ();

        // One two-frame message per quote: the topic frame, sent exactly
        // once, followed by the serialised quote.
        rc = zstr_sendm (server, "SECURITY");
        assert (rc == 0);
        rc = zstr_send (server, payload.c_str ());
        assert (rc == 0);

        sleep (1);
    }
    (void) rc;                  // only read by asserts
    await_term (pipe);

    // Free all memory we used
    zsock_destroy (&server);
    zactor_destroy (&auth);
}
// Entry point: generates a client certificate (secret copy in
// client_cert.txt, public copy in the .curve store), creates a server
// certificate, runs the server and client actors, and tears everything down.
// Returns 0 on success; setup failures trip assertions.
int main (void)
{
    // Create the certificate store directory and client certs
    zcert_t *client_cert = zcert_new ();
    assert (client_cert);
    int rc = zsys_dir_create (".curve");
    assert (rc == 0);
    zcert_set_meta (client_cert, "name", "Client test certificate");
    rc = zcert_save_public (client_cert, ".curve/testcert.pub");
    assert (rc == 0);
    rc = zcert_save (client_cert, "client_cert.txt");
    assert (rc == 0);
    (void) rc;                  // only read by asserts
    zcert_destroy (&client_cert);

    // Create the server certificate
    zcert_t *server_cert = zcert_new ();
    assert (server_cert);

    // Now start the two detached threads; each of these has their
    // own ZeroMQ context.
    zactor_t *server_actor = zactor_new (server_task, server_cert);
    assert (server_actor);
    // The actor API takes a mutable void *, so strip const from the key text;
    // client_task only reads it.
    char *server_key = const_cast<char *> (zcert_public_txt (server_cert));
    zactor_t *client_actor = zactor_new (client_task, server_key);
    assert (client_actor);

    // Free the memory we used
    // and coordinate process termination with actor termination
    zactor_destroy (&client_actor);
    zactor_destroy (&server_actor);
    // Both actors borrow data owned by the certificate, so release it last.
    zcert_destroy (&server_cert);
    return 0;
}
@Panchatcharam

This comment has been minimized.

Copy link
Owner Author

@Panchatcharam Panchatcharam commented Sep 26, 2017

Build this with the following command:
# g++ -o iron iron.cpp -lczmq -lzmq -lsodium
You need libsodium, CZMQ, and ZeroMQ (libzmq) installed on your machine.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment