Skip to content

Instantly share code, notes, and snippets.

@jspanchu
Last active December 22, 2022 04:04
Show Gist options
  • Save jspanchu/129400e5085c295bc5b5f4a33f54e15e to your computer and use it in GitHub Desktop.
Save jspanchu/129400e5085c295bc5b5f4a33f54e15e to your computer and use it in GitHub Desktop.
rxcpp subjects
// clang-format off
/**
* Description:
* Difference between various rxcpp subjects (SimpleSubject, BehaviorSubject and ReplaySubject)
* Compile:
* $ g++ -I/path/to/RxCpp/Rx/v2/src -g -Wall -Wextra -Werror -Wunused -std=gnu++17 rxcpp_subjects.cpp -o rxcpp_subjects -lpthread -ldl
* Run:
* $ ./rxcpp_subjects [-s] [-r] [-b] [-t <wait_before_subscribe (in ms)>]
* -b: BehaviorSubject
* -r: ReplaySubject
* -s: SimpleSubject
*
* Legend (event types):
* S - subscribed
* + - new event
* -> - event pushed into a subject
* <- - event emitted from a subject
* U - source unsubbed
*
* Ex: Try replay subject and subscribe 1 second after value emission begins.
* $ ./rxcpp_subjects -r -t 1000
*/
// clang-format on
#include <chrono>
#include <cstdlib>
#include <future>
#include <iostream>
#include <memory>
#include <ostream>
#include <thread>

#include "rxcpp/rx-includes.hpp"
// One-shot shutdown signal: main() fulfils it and run() polls the matching
// future to know when to unsubscribe and return.
static std::promise<bool> promise;

// How long run() waits after starting the source before it subscribes to the
// subject's output. Defaults to 300 ms; main() overrides it from `-t <ms>`.
static std::chrono::milliseconds wait_before_subscribe =
    std::chrono::milliseconds(300);
template <class SType>
void run(std::shared_ptr<SType> subject, rxcpp::schedulers::run_loop &runLoop) {
auto source =
rxcpp::observable<>::create<int>([](rxcpp::subscriber<int> s) {
std::cout << "S | Thread: " << std::this_thread::get_id() << '\n';
for (int i = 0; i < 50; ++i) {
if (!s.is_subscribed()) {
std::cout << "U | Thread: " << std::this_thread::get_id() << '\n';
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "+ | Thread: " << std::this_thread::get_id() << " | "
<< i << '\n';
std::flush(std::cout);
s.on_next(i);
}
s.on_completed();
}).subscribe_on(rxcpp::observe_on_new_thread()); // emit integers from
// another thread.
auto subscription1 = source.subscribe([subject](int v) {
std::cout << "-> | Thread: " << std::this_thread::get_id() << " | " << v
<< '\n';
std::flush(std::cout);
subject->get_subscriber().on_next(v);
});
std::this_thread::sleep_for(wait_before_subscribe);
// receive integers on thread that called `run(subject, runLoop)`
auto subscription2 =
subject->get_observable()
.observe_on(rxcpp::observe_on_run_loop(runLoop))
.subscribe([](int v) {
std::cout << "<- | Thread: " << std::this_thread::get_id() << " | "
<< v << '\n';
std::flush(std::cout);
});
auto fut = promise.get_future();
while (true) {
while (!runLoop.empty() && runLoop.peek().when < runLoop.now()) {
runLoop.dispatch();
}
auto status = fut.wait_for(std::chrono::milliseconds(10));
if (status == std::future_status::ready) {
if (fut.get()) {
std::cout << "Unsub\n";
std::flush(std::cout);
subscription2.unsubscribe();
subscription1.unsubscribe();
break;
}
}
}
}
void simple_subject() {
auto runLoop = rxcpp::schedulers::run_loop();
auto subject = std::make_shared<rxcpp::subjects::subject<int>>();
run(subject, runLoop);
}
void behavior_subject() {
auto runLoop = rxcpp::schedulers::run_loop();
auto behavior = std::make_shared<rxcpp::subjects::behavior<int>>(-1);
run(behavior, runLoop);
}
void replay_subject() {
auto runLoop = rxcpp::schedulers::run_loop();
auto replay = std::make_shared<
rxcpp::subjects::replay<int, rxcpp::observe_on_one_worker>>(
10, rxcpp::observe_on_run_loop(runLoop));
run(replay, runLoop);
}
/**
 * Entry point: parses the command line, runs the selected subject demo on a
 * worker thread for one second, then signals it to stop and joins it.
 *
 * Options (each is a single dash followed by one letter):
 *   -b            run behavior_subject()
 *   -r            run replay_subject()
 *   -s            run simple_subject() (also the default)
 *   -t <ms>       set wait_before_subscribe, in milliseconds
 *
 * If several of -b/-r/-s are given, the last one wins. Unknown arguments are
 * ignored.
 *
 * @return 0 always.
 */
int main(int argc, char *argv[]) {
  // Demo to run; only one can run because they all share the global promise.
  void (*demo)() = &simple_subject;

  // Start at 1: argv[0] is the program name, not an option.
  for (int i = 1; i < argc; ++i) {
    const char *arg = argv[i];
    // Require "-x"; this also keeps arg[1] in bounds for an empty argument.
    if (arg[0] != '-' || arg[1] == '\0') {
      continue;
    }
    switch (arg[1]) {
    case 'b':
      demo = &behavior_subject;
      break;
    case 'r':
      demo = &replay_subject;
      break;
    case 's':
      demo = &simple_subject;
      break;
    case 't':
      if (i + 1 < argc) {
        // Consume the value so it is not parsed as an option afterwards.
        wait_before_subscribe =
            std::chrono::milliseconds(std::atoi(argv[++i]));
      } else {
        std::cerr << "-t requires a value in milliseconds; keeping default\n";
      }
      break;
    default:
      break;
    }
  }

  // Start the worker only after all options are applied, so it never reads
  // wait_before_subscribe while this thread is still writing it.
  std::thread worker(demo);

  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  promise.set_value(true); // tell run() to unsubscribe and return
  worker.join();
  return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment