Skip to content

Instantly share code, notes, and snippets.

@Akashleena
Created May 18, 2020 13:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Akashleena/e115d874fd4a3f3f3c3fab3f972b6dd2 to your computer and use it in GitHub Desktop.
Save Akashleena/e115d874fd4a3f3f3c3fab3f972b6dd2 to your computer and use it in GitHub Desktop.
gazebo plugin implement zmq pub sub
/// \brief socket_com.cpp : Definition of SocketCom Class
#include <atomic>
#include <csignal>
#include <iostream>
#include <chrono>
#include <string>
#include <thread>
#include "socket_com.hpp"
// #include <proto/my_msg.pb.h>
using namespace gazebo;
/** \brief signal_handler: Function callback executed when a SIGINT or SIGTERM signals are captured.
*
* This is used to break the infinite loop that handles messages and exit the program smoothly.
* \param _signal
*/
void SocketCom::signal_handler(int _signal)
{
if (_signal == SIGINT || _signal == SIGTERM)
{
s_terminate = true;
}
}
/// \var s_instance_cnt: Counter that holds the number of indexes
/// to identify the model and match it to the coresponding publisher
int SocketCom::s_instance_cnt = 0;
/// \var s_terminate: semaphore to terminate thread
std::atomic<bool> SocketCom::s_terminate(false);
/// \class Constructor
SocketCom::SocketCom()
{
++s_instance_cnt;
this->agv_id = s_instance_cnt;
// Install a signal handler for SIGINT and SIGTERM.
std::signal(SIGINT, SocketCom::signal_handler);
std::signal(SIGTERM, SocketCom::signal_handler);
}
/// \class Desctructor
SocketCom::~SocketCom()
{
std::lock_guard<std::mutex> lock_guard(this->signal_mutex);
std::cout << "[AGV" << agv_id << "] >> Detaching AGV" << std::endl;
--s_instance_cnt;
s_terminate = true;
this->thread_sub.join();
this->thread_pub.join();
this->gz_publisher->Fini();
// wait until everything cleaned up
boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
std::cout << "[AGV" << agv_id << "] >> Detached AGV" << std::endl;
s_terminate = false;
}
/// \brief Plugin Load function
/// \param[in] _model pointer to the model defining this plugin
/// \param[in] _sdf pointer to the SDF of the model
void SocketCom::Load(physics::ModelPtr _model,
sdf::ElementPtr _sdf)
{
std::lock_guard<std::mutex> lock_guard(this->signal_mutex);
GZ_ASSERT(_model, "model element is null");
GZ_ASSERT(_sdf, "sdf element is null");
this->model = _model;
this->world = _model->GetWorld();
this->sdf = _sdf;
// In case of deleting and reinserting model to world
s_terminate = false;
try
{
/// \brief Establish Gazebo Publisher to advertise data gathered from Endpoint
this->node = transport::NodePtr(new transport::Node());
this->node->Init(world->Name());
std::string topic_name = "~/" + model->GetName();
// this->gz_publisher = this->node->Advertise<gazebo::msgs::AGVBase>(topic_name);
/// \brief Establish 0MQ Subscriber and poll data from endpoint
this->context = boost::shared_ptr<zmq::context_t>(new zmq::context_t(1));
this->zmq_subscriber = boost::shared_ptr<zmq::socket_t>(new zmq::socket_t(*context, ZMQ_SUB));
this->zmq_subscriber->setsockopt(ZMQ_SUBSCRIBE, "", 0);
this->zmq_subscriber->connect("tcp://localhost:5563");
boost::this_thread::sleep_for(boost::chrono::milliseconds(100)); //pause to connect
this->context_pub = boost::shared_ptr<zmq::context_t>(new zmq::context_t(1));
this->zmq_publisher = boost::shared_ptr<zmq::socket_t>(new zmq::socket_t(*context_pub, ZMQ_PUB));
this->zmq_publisher->bind("tcp://*:5564");
/// \brief Start subscription
this->thread_sub = boost::thread(boost::bind(&SocketCom::Subscribe, this));
// this->thread_pub = boost::thread( boost::bind(&SocketCom::Publish, this) );
this->_updateConnection = event::Events::ConnectWorldUpdateBegin(
std::bind(&SocketCom::OnUpdate, this));
std::cout << "[AGV" << agv_id << "][SocketCom]: Started!" << std::endl;
}
catch (const zmq::error_t &e)
{
std::cout << "[AGV" << agv_id << "][SocketCom]: " << e.what() << " >> Could not connect to Publisher!" << std::endl;
}
}
/// \brief Publish(): Sending Data to Endpoinrt
void SocketCom::Publish(void)
{
// while (!s_terminate)
// {
// }
}
void SocketCom::OnUpdate()
{
gzmsg << "send msg\n";
boost::thread t([this]() {
std::string buf = "hello from gz";
zmq::message_t msg(buf.length());
memcpy ((void *) msg.data (), buf.c_str(), buf.size());
this->zmq_publisher->send(msg);
gzmsg<<"message sended\n"; });
}
/// \brief Subscribe(): Handling ZMQ recv Data
void SocketCom::Subscribe(void)
{
while (!s_terminate)
{
zmq::message_t zmq_msg;
this->zmq_subscriber->recv(&zmq_msg);
if (zmq_msg.size() > 0)
{
std::string response = std::string(
static_cast<const char *>(zmq_msg.data()),
zmq_msg.size());
gzmsg << response << "\n";
}
}
}
GZ_REGISTER_MODEL_PLUGIN(SocketCom)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment