@rinthel
Created June 10, 2019 11:47
rxcpp scheduler example code
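// Fragment from inside a function. It relies on names defined elsewhere in the
// author's project and not shown here: `Rx` (presumably a namespace that pulls
// in rxcpp), `console` (an spdlog-style logger) and `getThreadId()`.
// Standard headers needed: <atomic>, <chrono>, <thread>.
console->info("===== scheduler =====");
console->info("main thread id: {}", getThreadId());

// The run loop only queues work; nothing runs until some thread calls dispatch().
Rx::schedulers::run_loop runloop;
Rx::subject<int> subject;
auto observable = subject.get_observable();
observable
    // map sits upstream of observe_on, so it runs on the thread that calls on_next().
    .map([&](int v) {
        console->info("thread[{}] - published value: {}", getThreadId(), v);
        return v;
    })
    // Everything downstream of observe_on is delivered through the run loop.
    .observe_on(Rx::observe_on_run_loop(runloop))
    .subscribe([&](int v) {
        console->info("subscriptionThread[{}] - subscription started: {}", getThreadId(), v);
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        console->info("subscriptionThread[{}] - subscription ended: {}", getThreadId(), v);
    });

// Written by the main thread and read by the run-loop thread, so it must be
// atomic; a plain bool here is a data race.
std::atomic<bool> runlooping{true};
std::thread runloopThread([&] {
    console->info("start runloop thread");
    // Busy-wait: dispatch queued work until the main thread clears the flag.
    while (runlooping) {
        if (!runloop.empty())
            runloop.dispatch();
    }
});

auto subscriber = subject.get_subscriber();
console->info("start to publish values");
subscriber.on_next(1);
subscriber.on_next(2);
console->info("stop publishing");

// Poll until the run loop has no queued work left.
while (!runloop.empty()) {
    console->info("wait until runloop queue is empty...");
    std::this_thread::sleep_for(std::chrono::milliseconds(400));
}

// Stop the run-loop thread; join() also waits for any dispatch still in progress.
runlooping = false;
runloopThread.join();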
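
The snippet above hands values from the publishing thread to a dedicated thread that drains a run loop. For readers without rxcpp at hand, the same hand-off can be sketched with the standard library only. This is a minimal illustration, not rxcpp's implementation: `MiniRunLoop`, `DemoResult` and `run_demo` are hypothetical names invented for this sketch, and a condition variable is used in place of the busy-wait loop.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a run loop: a thread-safe FIFO of tasks.
// This is NOT rxcpp's run_loop, only an illustration of the idea.
class MiniRunLoop {
public:
    // Enqueue a task from any thread.
    void post(std::function<void()> task) {
        assert(task && "task must be callable");
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

    // Ask run() to return once the queue has drained.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        cv_.notify_one();
    }

    // Dispatch tasks on the calling thread until stop() has been
    // requested and no queued work remains.
    void run() {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            cv_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
            if (tasks_.empty()) return;  // only reachable once stopping_ is set
            auto task = std::move(tasks_.front());
            tasks_.pop();
            lock.unlock();  // run the task without holding the lock
            task();
            lock.lock();
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
    bool stopping_ = false;
};

struct DemoResult {
    std::vector<int> received;          // values in the order they were handled
    bool handled_off_publisher_thread;  // true if no handler ran on the publisher
};

// Publish 1 and 2 from the calling thread; handle them on a loop thread.
DemoResult run_demo() {
    MiniRunLoop loop;
    DemoResult result{{}, true};
    const std::thread::id publisher = std::this_thread::get_id();

    std::thread loop_thread([&loop] { loop.run(); });

    for (int v : {1, 2}) {
        loop.post([&result, publisher, v] {
            result.received.push_back(v);
            if (std::this_thread::get_id() == publisher)
                result.handled_off_publisher_thread = false;
        });
    }

    loop.stop();
    loop_thread.join();  // after join, reading result from this thread is safe
    return result;
}
```

Calling `run_demo()` from `main` returns the values in publication order, each handled on the loop thread rather than on the caller. Blocking on a condition variable avoids spinning a core while the queue is empty, at the cost of a little more code than the polling loop in the gist.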