Created
May 18, 2020 13:21
-
-
Save Akashleena/e115d874fd4a3f3f3c3fab3f972b6dd2 to your computer and use it in GitHub Desktop.
Gazebo plugin implementing ZMQ pub/sub
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// \brief socket_com.cpp : Definition of SocketCom Class | |
#include <atomic> | |
#include <csignal> | |
#include <iostream> | |
#include <chrono> | |
#include <string> | |
#include <thread> | |
#include "socket_com.hpp" | |
// #include <proto/my_msg.pb.h> | |
using namespace gazebo; | |
/** \brief signal_handler: Function callback executed when a SIGINT or SIGTERM signals are captured. | |
* | |
* This is used to break the infinite loop that handles messages and exit the program smoothly. | |
* \param _signal | |
*/ | |
void SocketCom::signal_handler(int _signal) | |
{ | |
if (_signal == SIGINT || _signal == SIGTERM) | |
{ | |
s_terminate = true; | |
} | |
} | |
/// \var s_instance_cnt Running count of SocketCom instances; the constructor
/// copies its value into agv_id so each model can be told apart.
int SocketCom::s_instance_cnt{0};
/// \var s_terminate Shared flag polled by the worker loop; true asks it to stop.
std::atomic<bool> SocketCom::s_terminate{false};
/// \brief Constructor: takes the next instance number as this AGV's id and
/// installs the SIGINT/SIGTERM handler.
SocketCom::SocketCom()
{
  // Bump the shared counter first, then use the new value as the id.
  this->agv_id = ++s_instance_cnt;
  // Route both termination signals to the shared handler.
  for (const int sig : {SIGINT, SIGTERM})
  {
    std::signal(sig, SocketCom::signal_handler);
  }
}
/// \brief Destructor: stops the worker threads and releases the Gazebo publisher.
///
/// Raises the shared s_terminate flag, waits for any worker thread that was
/// started, finalises the Gazebo publisher if one exists, and lowers the flag
/// again so a later instance can start cleanly.
SocketCom::~SocketCom()
{
  std::lock_guard<std::mutex> lock_guard(this->signal_mutex);
  std::cout << "[AGV" << agv_id << "] >> Detaching AGV" << std::endl;
  --s_instance_cnt;
  s_terminate = true;
  // Join only threads that are actually running: thread_pub is not launched
  // while the Publish() thread is disabled, and thread_sub is not started
  // when Load() fails before reaching the thread creation.
  if (this->thread_sub.joinable())
  {
    this->thread_sub.join();
  }
  if (this->thread_pub.joinable())
  {
    this->thread_pub.join();
  }
  // The Advertise() call in Load() is commented out, so gz_publisher can
  // still be null here; calling Fini() through it would dereference null.
  if (this->gz_publisher)
  {
    this->gz_publisher->Fini();
  }
  // wait until everything cleaned up
  boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
  std::cout << "[AGV" << agv_id << "] >> Detached AGV" << std::endl;
  s_terminate = false;
}
/// \brief Plugin Load function.
///
/// Stores the model/world/SDF handles, initialises the Gazebo transport node,
/// creates the ZMQ subscriber and publisher sockets, starts the subscriber
/// thread and hooks OnUpdate() into the world-update event.
///
/// The ZMQ endpoints can be overridden per model through two optional SDF
/// elements, so that several plugin instances do not all try to bind the same
/// port:
///   - <sub_endpoint>: address the subscriber connects to (default "tcp://localhost:5563")
///   - <pub_endpoint>: address the publisher binds to (default "tcp://*:5564")
///
/// A zmq::error_t raised during socket setup is caught and logged; in that
/// case the subscriber thread and the update connection are not started.
/// \param[in] _model pointer to the model defining this plugin
/// \param[in] _sdf pointer to the SDF of the model
void SocketCom::Load(physics::ModelPtr _model,
                     sdf::ElementPtr _sdf)
{
  std::lock_guard<std::mutex> lock_guard(this->signal_mutex);
  GZ_ASSERT(_model, "model element is null");
  GZ_ASSERT(_sdf, "sdf element is null");
  this->model = _model;
  this->world = _model->GetWorld();
  this->sdf = _sdf;
  // In case of deleting and reinserting model to world
  s_terminate = false;

  // Endpoints default to the historical hard-coded values and may be
  // overridden from the plugin's SDF block.
  std::string sub_endpoint = "tcp://localhost:5563";
  if (_sdf->HasElement("sub_endpoint"))
  {
    sub_endpoint = _sdf->Get<std::string>("sub_endpoint");
  }
  std::string pub_endpoint = "tcp://*:5564";
  if (_sdf->HasElement("pub_endpoint"))
  {
    pub_endpoint = _sdf->Get<std::string>("pub_endpoint");
  }

  try
  {
    /// \brief Establish Gazebo node (the Gazebo publisher itself is currently disabled)
    this->node = transport::NodePtr(new transport::Node());
    this->node->Init(world->Name());
    // this->gz_publisher = this->node->Advertise<gazebo::msgs::AGVBase>("~/" + model->GetName());

    /// \brief Establish 0MQ Subscriber and poll data from endpoint
    this->context = boost::shared_ptr<zmq::context_t>(new zmq::context_t(1));
    this->zmq_subscriber = boost::shared_ptr<zmq::socket_t>(new zmq::socket_t(*context, ZMQ_SUB));
    // Empty filter: subscribe to every message.
    this->zmq_subscriber->setsockopt(ZMQ_SUBSCRIBE, "", 0);
    this->zmq_subscriber->connect(sub_endpoint.c_str());
    boost::this_thread::sleep_for(boost::chrono::milliseconds(100)); //pause to connect

    /// \brief Establish 0MQ Publisher used by OnUpdate()
    this->context_pub = boost::shared_ptr<zmq::context_t>(new zmq::context_t(1));
    this->zmq_publisher = boost::shared_ptr<zmq::socket_t>(new zmq::socket_t(*context_pub, ZMQ_PUB));
    this->zmq_publisher->bind(pub_endpoint.c_str());

    /// \brief Start subscription
    this->thread_sub = boost::thread(boost::bind(&SocketCom::Subscribe, this));
    // this->thread_pub = boost::thread( boost::bind(&SocketCom::Publish, this) );
    this->_updateConnection = event::Events::ConnectWorldUpdateBegin(
        std::bind(&SocketCom::OnUpdate, this));
    std::cout << "[AGV" << agv_id << "][SocketCom]: Started!" << std::endl;
  }
  catch (const zmq::error_t &e)
  {
    std::cout << "[AGV" << agv_id << "][SocketCom]: " << e.what() << " >> Could not connect to Publisher!" << std::endl;
  }
}
/// \brief Publish(): placeholder for sending data to the endpoint.
///
/// Currently a no-op; the intended send loop is kept below, disabled.
void SocketCom::Publish()
{
  // while (!s_terminate)
  // {
  // }
}
void SocketCom::OnUpdate() | |
{ | |
gzmsg << "send msg\n"; | |
boost::thread t([this]() { | |
std::string buf = "hello from gz"; | |
zmq::message_t msg(buf.length()); | |
memcpy ((void *) msg.data (), buf.c_str(), buf.size()); | |
this->zmq_publisher->send(msg); | |
gzmsg<<"message sended\n"; }); | |
} | |
/// \brief Subscribe(): Handling ZMQ recv Data | |
void SocketCom::Subscribe(void) | |
{ | |
while (!s_terminate) | |
{ | |
zmq::message_t zmq_msg; | |
this->zmq_subscriber->recv(&zmq_msg); | |
if (zmq_msg.size() > 0) | |
{ | |
std::string response = std::string( | |
static_cast<const char *>(zmq_msg.data()), | |
zmq_msg.size()); | |
gzmsg << response << "\n"; | |
} | |
} | |
} | |
/// Registers SocketCom with Gazebo as a model plugin.
GZ_REGISTER_MODEL_PLUGIN(SocketCom)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment