Skip to content

Instantly share code, notes, and snippets.

@windworst
Created October 18, 2018 10:28
Show Gist options
  • Save windworst/f69ab72c583acfe46ff5e71970c6fc5e to your computer and use it in GitHub Desktop.
Save windworst/f69ab72c583acfe46ff5e71970c6fc5e to your computer and use it in GitHub Desktop.
Observable
#ifndef OBSERVABLE_H
#define OBSERVABLE_H
#include <functional>
#include <memory>
/**
 * Bundle of the three callbacks that consume a stream.
 *
 * @tparam T type of the values delivered through onNext.
 * @tparam E type of the error delivered through onError.
 *
 * The callbacks are const members: an Observer can be copied but its
 * callbacks cannot be replaced after construction.
 */
template<typename T, typename E>
class Observer
{
public:
    const std::function<void(T)> onNext;
    const std::function<void(E)> onError;
    const std::function<void()> onComplete;

    /**
     * @param onNext     invoked for every emitted value.
     * @param onError    invoked with the error; defaults to a no-op.
     * @param onComplete invoked on completion; defaults to a no-op.
     *
     * An empty std::function (for example one built from nullptr) is replaced
     * by a no-op, so invoking any of the three members never throws
     * std::bad_function_call.
     */
    Observer(
        std::function<void(T)> onNext,
        std::function<void(E)> onError = [] (E) {},
        std::function<void()> onComplete = [] () {}
    ) :
        onNext(orNoOp(onNext)),
        onError(orNoOp(onError)),
        onComplete(orNoOp(onComplete))
    {
    }

private:
    // Returns `callback` unchanged when it holds a target, otherwise a callable that does nothing.
    template<typename... Args>
    static std::function<void(Args...)> orNoOp(std::function<void(Args...)> callback)
    {
        if (callback)
        {
            return callback;
        }
        return [] (Args...) {};
    }
};
// Forward declaration: OnSubscribe::flatMap names Observable<T, E> in its
// signature before the full Observable definition appears further down.
template<typename T, typename E>
class Observable;
template<typename T, typename E>
class OnSubscribe
{
private:
bool isFinished = false;
Observer<T,E> mObserver;
public:
OnSubscribe(Observer<T,E> observer) : mObserver(observer)
{
}
void onNext(T value)
{
if(!isFinished)
{
mObserver.onNext(value);
}
}
void onComplete()
{
if(!isFinished)
{
isFinished = true;
mObserver.onComplete();
}
}
void onError(E error)
{
if(!isFinished)
{
isFinished = true;
mObserver.onError(error);
}
}
template<typename RT, typename RE>
OnSubscribe<RT, RE> map(std::function<T(RT)> op, std::function<E(RE)> opE) const
{
auto onNext = mObserver.onNext;
auto onError = mObserver.onError;
auto onComplete = mObserver.onComplete;
auto newOnNext = [=] (RT t) { onNext(op(t)); };
auto newOnError = [=] (RE e) { onError(opE(e)); };
return OnSubscribe<RT, RE>(Observer<RT, RE>(newOnNext,newOnError,onComplete));
}
template<typename RT>
OnSubscribe<RT, E> map(std::function<T(RT)> op) const
{
return map(op, [] (E _) { return _;});
}
template<typename RT>
OnSubscribe<RT, E> flatMap(std::function<Observable<T, E>(RT)> op) const
{
auto onNext = mObserver.onNext;
auto onError = mObserver.onError;
auto onComplete = mObserver.onComplete;
auto newOnNext = [=] (RT rt) {
op(rt).subscribe([=] (T t) { onNext(t); }, onError, onComplete);
};
return OnSubscribe<RT, E>(Observer<RT, E>(newOnNext,onError,onComplete));
}
};
template<typename T, typename E>
class Observable
{
private:
std::function<void(OnSubscribe<T,E>)> onSubscribe;
public:
Observable(std::function<void(OnSubscribe<T,E>)> onSubscribe) : onSubscribe(onSubscribe)
{
}
template<typename RT, typename RE>
Observable<RT,RE> lift(std::function<OnSubscribe<T,E>(const OnSubscribe<RT,RE>&)> lifter) const
{
auto onSubscribe = this->onSubscribe;
return Observable<RT,RE>([=] (OnSubscribe<RT,RE> subscriber) { onSubscribe(lifter(subscriber)); });
}
template<typename RT, typename RE>
Observable<RT,RE> map(std::function<RT(T)> op, std::function<RE(E)> opE) const
{
return lift<RT,RE>( [=] (const OnSubscribe<RT,RE>& subscriber) { return subscriber.template map<T,E>(op, opE); } );
}
template<typename RT>
Observable<RT, E> map(std::function<RT(T)> op) const
{
return map<RT, E>(op, [] (E _) { return _;});
}
template<typename RT>
Observable<RT, E> flatMap(std::function<Observable<RT, E>(T)> op) const
{
return lift<RT,E>( [=] (const OnSubscribe<RT,E>& subscriber) { return subscriber.template flatMap<T>(op); });
}
template<typename RT, typename RE>
Observable<RT, RE> compose(std::function<Observable<RT, RE>(const Observable<T, E>&)> composer) const
{
return composer(*this);
}
void subscribe(const Observer<T, E>& observer) const
{
onSubscribe(OnSubscribe<T,E>(observer));
}
void subscribe(
std::function<void(T)> onNext,
std::function<void(E)> onError = [] (E) {},
std::function<void()> onComplete = [] () {}
) const
{
onSubscribe(OnSubscribe<T,E>(Observer<T, E>(onNext, onError, onComplete)));
}
};
#endif // OBSERVABLE_H
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment