/**
 * Sravz LLC
 * TODO: Handle multiple topics
 **/
#include <redis_subscriber.hpp>

void
RedisSubscriber::
subscribe()
{
    // Create a Subscriber.
    auto sub = redis_consumer_.subscriber();

    // Set callback functions.
    sub.on_message([=](std::string channel, std::string msg) {
        spsc_queue_parse_.push(msg);
    });

    // Subscribe to channels and patterns.
    std::vector<std::string> topics;
    boost::algorithm::split(topics, topics_, boost::is_any_of(","));
    BOOST_FOREACH( std::string topic, topics )
    {
        sub.subscribe(topic);
    }

    // Consume messages in a loop.
    while (true) {
        try {
            sub.consume();
        } catch (const sw::redis::Error &err) {
            std::cout << err.what() << std::endl;
        }
    }
}

void
RedisSubscriber::
parse()
{
    std::string msg;
    boost::json::parse_options opt {};
    opt.allow_comments = true;
    opt.allow_trailing_commas = true;
    long last_timestamp = 0;
    while(true) {
        get_message_with_backoff_(spsc_queue_parse_, msg);
        try{
            boost::json::value val = boost::json::parse(msg, {}, opt);
            auto obj = val.as_object();
            auto cur_last_timestamp = boost::json::value_to<long>(obj.at("t"));
            if (last_timestamp > cur_last_timestamp) {
                continue;
            }
            last_timestamp = cur_last_timestamp;
            std::cout << "Parse: " << obj.at("s") << " " << obj.at("p") << " " << obj.at("t") << " thread ID: " << boost::this_thread::get_id() << std::endl;
            spsc_queue_publish_.push(obj);
        } catch(std::exception const&  ex)
        {
            std::cout << "Cannot parse datapoint: " <<  ex.what() << std::endl;
        }
    }
}

void
RedisSubscriber::
publish()
{
    boost::json::object obj;
    int count = 0;
    std::vector<std::string> result;
    result.push_back("TS.MADD");
    while(true) {
        get_message_with_backoff_(spsc_queue_publish_, obj);
        std::cout << "Publish: " << obj.at("s") << " " << obj.at("p") << " " << obj.at("t") << " thread ID: " << boost::this_thread::get_id() << std::endl;
        try {
            result.push_back(boost::json::value_to<std::string>(obj.at("s")));
            result.push_back(boost::lexical_cast<std::string>(boost::json::value_to<long>(obj.at("t"))));
            result.push_back(boost::json::value_to<std::string>(obj.at("p")));
            if (count >= REDIS_BATCH_SIZE) {
                redis_publisher_.command(result.begin(), result.end());
                count=0;
                result.resize(1);
            } else {
                count++;
            }
        } catch(std::exception const&  ex)
        {
            std::cout << "Cannot push datapoint: " <<  ex.what() << std::endl;
        }
    }
}

void
RedisSubscriber::
run() {
    boost::asio::post(io_->get_executor(), beast::bind_front_handler(&RedisSubscriber::subscribe, shared_from_this()));
    boost::asio::post(io_->get_executor(), beast::bind_front_handler(&RedisSubscriber::parse, shared_from_this()));
    boost::asio::post(io_->get_executor(), beast::bind_front_handler(&RedisSubscriber::publish, shared_from_this()));
}

template <typename T1, typename T2>
void
RedisSubscriber::
get_message_with_backoff_(boost::lockfree::spsc_queue<T1, boost::lockfree::capacity<1024> > &queue, T2 &msg) {
    int backoff = 0;
    while(true) {
        if (queue.pop(msg))
        {
            return;
        } else {
            sleep(1 << backoff++);
            if (backoff > EXPONENTIAL_BACKOFF_LIMIT) {
                backoff = 0;
            }
        }
    }
}