Created
October 18, 2018 10:28
-
-
Save windworst/f69ab72c583acfe46ff5e71970c6fc5e to your computer and use it in GitHub Desktop.
Observable
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef OBSERVABLE_H | |
#define OBSERVABLE_H | |
#include <functional> | |
template<typename T, typename E> | |
class Observer | |
{ | |
public: | |
const std::function<void(T)> onNext; | |
const std::function<void(E)> onError; | |
const std::function<void()> onComplete; | |
Observer( | |
std::function<void(T)> onNext, | |
std::function<void(E)> onError = [] (E) {}, | |
std::function<void()> onComplete = [] () {} | |
) : | |
onNext(onNext), | |
onError(onError), | |
onComplete(onComplete) | |
{ | |
} | |
}; | |
template<typename T, typename E> | |
class Observable; | |
template<typename T, typename E> | |
class OnSubscribe | |
{ | |
private: | |
bool isFinished = false; | |
Observer<T,E> mObserver; | |
public: | |
OnSubscribe(Observer<T,E> observer) : mObserver(observer) | |
{ | |
} | |
void onNext(T value) | |
{ | |
if(!isFinished) | |
{ | |
mObserver.onNext(value); | |
} | |
} | |
void onComplete() | |
{ | |
if(!isFinished) | |
{ | |
isFinished = true; | |
mObserver.onComplete(); | |
} | |
} | |
void onError(E error) | |
{ | |
if(!isFinished) | |
{ | |
isFinished = true; | |
mObserver.onError(error); | |
} | |
} | |
template<typename RT, typename RE> | |
OnSubscribe<RT, RE> map(std::function<T(RT)> op, std::function<E(RE)> opE) const | |
{ | |
auto onNext = mObserver.onNext; | |
auto onError = mObserver.onError; | |
auto onComplete = mObserver.onComplete; | |
auto newOnNext = [=] (RT t) { onNext(op(t)); }; | |
auto newOnError = [=] (RE e) { onError(opE(e)); }; | |
return OnSubscribe<RT, RE>(Observer<RT, RE>(newOnNext,newOnError,onComplete)); | |
} | |
template<typename RT> | |
OnSubscribe<RT, E> map(std::function<T(RT)> op) const | |
{ | |
return map(op, [] (E _) { return _;}); | |
} | |
template<typename RT> | |
OnSubscribe<RT, E> flatMap(std::function<Observable<T, E>(RT)> op) const | |
{ | |
auto onNext = mObserver.onNext; | |
auto onError = mObserver.onError; | |
auto onComplete = mObserver.onComplete; | |
auto newOnNext = [=] (RT rt) { | |
op(rt).subscribe([=] (T t) { onNext(t); }, onError, onComplete); | |
}; | |
return OnSubscribe<RT, E>(Observer<RT, E>(newOnNext,onError,onComplete)); | |
} | |
}; | |
template<typename T, typename E> | |
class Observable | |
{ | |
private: | |
std::function<void(OnSubscribe<T,E>)> onSubscribe; | |
public: | |
Observable(std::function<void(OnSubscribe<T,E>)> onSubscribe) : onSubscribe(onSubscribe) | |
{ | |
} | |
template<typename RT, typename RE> | |
Observable<RT,RE> lift(std::function<OnSubscribe<T,E>(const OnSubscribe<RT,RE>&)> lifter) const | |
{ | |
auto onSubscribe = this->onSubscribe; | |
return Observable<RT,RE>([=] (OnSubscribe<RT,RE> subscriber) { onSubscribe(lifter(subscriber)); }); | |
} | |
template<typename RT, typename RE> | |
Observable<RT,RE> map(std::function<RT(T)> op, std::function<RE(E)> opE) const | |
{ | |
return lift<RT,RE>( [=] (const OnSubscribe<RT,RE>& subscriber) { return subscriber.template map<T,E>(op, opE); } ); | |
} | |
template<typename RT> | |
Observable<RT, E> map(std::function<RT(T)> op) const | |
{ | |
return map<RT, E>(op, [] (E _) { return _;}); | |
} | |
template<typename RT> | |
Observable<RT, E> flatMap(std::function<Observable<RT, E>(T)> op) const | |
{ | |
return lift<RT,E>( [=] (const OnSubscribe<RT,E>& subscriber) { return subscriber.template flatMap<T>(op); }); | |
} | |
template<typename RT, typename RE> | |
Observable<RT, RE> compose(std::function<Observable<RT, RE>(const Observable<T, E>&)> composer) const | |
{ | |
return composer(*this); | |
} | |
void subscribe(const Observer<T, E>& observer) const | |
{ | |
onSubscribe(OnSubscribe<T,E>(observer)); | |
} | |
void subscribe( | |
std::function<void(T)> onNext, | |
std::function<void(E)> onError = [] (E) {}, | |
std::function<void()> onComplete = [] () {} | |
) const | |
{ | |
onSubscribe(OnSubscribe<T,E>(Observer<T, E>(onNext, onError, onComplete))); | |
} | |
}; | |
#endif // OBSERVABLE_H |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment