Last active
December 22, 2022 04:04
-
-
Save jspanchu/129400e5085c295bc5b5f4a33f54e15e to your computer and use it in GitHub Desktop.
rxcpp subjects
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// clang-format off
/**
 * Description:
 *  Difference between various rxcpp subjects (SimpleSubject, BehaviorSubject and ReplaySubject)
 * Compile:
 *  $ g++ -I/path/to/RxCpp/Rx/v2/src -g -Wall -Wextra -Werror -Wunused -std=gnu++17 rxcpp_subjects.cpp -o rxcpp_subjects -lpthread -ldl
 * Run:
 *  $ ./rxcpp_subjects [-s] [-r] [-b] [-t <wait_before_subscribe (in ms)>]
 *   -b: BehaviorSubject
 *   -r: ReplaySubject
 *   -s: SimpleSubject
 *
 * Legend (event types):
 *  S - subscribed
 *  + - new event
 *  -> - event pushed into a subject
 *  <- - event emitted from a subject
 *  U - source unsubbed
 *
 * Ex: Try replay subject and subscribe 1 second after value emission begins.
 *  $ ./rxcpp_subjects -r -t 1000
 */
// clang-format on
#include "rxcpp/rx-includes.hpp" | |
#include <chrono> | |
#include <future> | |
#include <memory> | |
#include <ostream> | |
#include <thread> | |
namespace {

// One-shot stop signal: main() sets its value, run() polls the matching
// future to learn when to unsubscribe.
std::promise<bool> promise;

// How long run() sleeps between wiring the source into the subject and
// subscribing to the subject's observable. Defaults to 300 ms; the -t
// command line option replaces it.
std::chrono::milliseconds wait_before_subscribe = std::chrono::milliseconds(300);

} // namespace
template <class SType> | |
void run(std::shared_ptr<SType> subject, rxcpp::schedulers::run_loop &runLoop) { | |
auto source = | |
rxcpp::observable<>::create<int>([](rxcpp::subscriber<int> s) { | |
std::cout << "S | Thread: " << std::this_thread::get_id() << '\n'; | |
for (int i = 0; i < 50; ++i) { | |
if (!s.is_subscribed()) { | |
std::cout << "U | Thread: " << std::this_thread::get_id() << '\n'; | |
break; | |
} | |
std::this_thread::sleep_for(std::chrono::milliseconds(100)); | |
std::cout << "+ | Thread: " << std::this_thread::get_id() << " | " | |
<< i << '\n'; | |
std::flush(std::cout); | |
s.on_next(i); | |
} | |
s.on_completed(); | |
}).subscribe_on(rxcpp::observe_on_new_thread()); // emit integers from | |
// another thread. | |
auto subscription1 = source.subscribe([subject](int v) { | |
std::cout << "-> | Thread: " << std::this_thread::get_id() << " | " << v | |
<< '\n'; | |
std::flush(std::cout); | |
subject->get_subscriber().on_next(v); | |
}); | |
std::this_thread::sleep_for(wait_before_subscribe); | |
// receive integers on thread that called `run(subject, runLoop)` | |
auto subscription2 = | |
subject->get_observable() | |
.observe_on(rxcpp::observe_on_run_loop(runLoop)) | |
.subscribe([](int v) { | |
std::cout << "<- | Thread: " << std::this_thread::get_id() << " | " | |
<< v << '\n'; | |
std::flush(std::cout); | |
}); | |
auto fut = promise.get_future(); | |
while (true) { | |
while (!runLoop.empty() && runLoop.peek().when < runLoop.now()) { | |
runLoop.dispatch(); | |
} | |
auto status = fut.wait_for(std::chrono::milliseconds(10)); | |
if (status == std::future_status::ready) { | |
if (fut.get()) { | |
std::cout << "Unsub\n"; | |
std::flush(std::cout); | |
subscription2.unsubscribe(); | |
subscription1.unsubscribe(); | |
break; | |
} | |
} | |
} | |
} | |
void simple_subject() { | |
auto runLoop = rxcpp::schedulers::run_loop(); | |
auto subject = std::make_shared<rxcpp::subjects::subject<int>>(); | |
run(subject, runLoop); | |
} | |
void behavior_subject() { | |
auto runLoop = rxcpp::schedulers::run_loop(); | |
auto behavior = std::make_shared<rxcpp::subjects::behavior<int>>(-1); | |
run(behavior, runLoop); | |
} | |
void replay_subject() { | |
auto runLoop = rxcpp::schedulers::run_loop(); | |
auto replay = std::make_shared< | |
rxcpp::subjects::replay<int, rxcpp::observe_on_one_worker>>( | |
10, rxcpp::observe_on_run_loop(runLoop)); | |
run(replay, runLoop); | |
} | |
int main(int argc, char *argv[]) { | |
std::unique_ptr<std::thread> t; | |
for (int i = 0; i < argc; ++i) { | |
const char *arg = argv[i]; | |
switch (arg[1]) { | |
case 'b': | |
t = std::make_unique<std::thread>(&behavior_subject); | |
break; | |
case 'r': | |
t = std::make_unique<std::thread>(&replay_subject); | |
break; | |
case 't': | |
wait_before_subscribe = std::chrono::milliseconds(std::atoi(argv[i + 1])); | |
break; | |
case 's': | |
t = std::make_unique<std::thread>(&simple_subject); | |
break; | |
default: | |
break; | |
} | |
} | |
if (t == nullptr) { | |
t = std::make_unique<std::thread>(&simple_subject); | |
} | |
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); | |
promise.set_value(true); | |
t->join(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment