/** * Sravz LLC **/ #ifndef SRAVZ_REDIS_SUBSCRIBER_H #define SRAVZ_REDIS_SUBSCRIBER_H #include "util.hpp" using RedisInstance = sw::redis::Redis; class RedisSubscriber: public std::enable_shared_from_this<RedisSubscriber> { public: RedisSubscriber(std::string topics, std::shared_ptr<boost::asio::thread_pool>& io) : topics_(topics) ,redis_consumer_(RedisInstance(getRedisConnectionOptions())) ,redis_publisher_(RedisInstance(getRedisConnectionOptions())) ,io_(io) { } ~RedisSubscriber() { std::cout << "Subscriber closed" << std::endl; } void subscribe(); void parse(); void publish(); void run(); private: std::string topics_; RedisInstance redis_consumer_; RedisInstance redis_publisher_; std::shared_ptr<boost::asio::thread_pool> io_; boost::lockfree::spsc_queue<std::string, boost::lockfree::capacity<1024> > spsc_queue_parse_; boost::lockfree::spsc_queue<boost::json::object, boost::lockfree::capacity<1024> > spsc_queue_publish_; const int REDIS_BATCH_SIZE = 10; const int EXPONENTIAL_BACKOFF_LIMIT = 5; template<typename T1, typename T2> void get_message_with_backoff_(boost::lockfree::spsc_queue<T1, boost::lockfree::capacity<1024> > &queue, T2 &msg); }; #endif