Skip to content

Instantly share code, notes, and snippets.

@Dorus
Forked from xgrommx/index.js
Last active May 6, 2021 11:18
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Dorus/a46a9f212d656ce95bd294fc5db7a3a0 to your computer and use it in GitHub Desktop.
Save Dorus/a46a9f212d656ce95bd294fc5db7a3a0 to your computer and use it in GitHub Desktop.
How Observable operators can be implemented in terms of other operators
/**
 * Flat-maps `stream` through `fn` and combines every outer/inner value pair
 * with `resultSelector`.
 *
 * @param {Function} fn - Maps an outer value to an inner stream exposing `map`.
 * @param {Object} stream - Source exposing `flatMap`.
 * @param {Function} [resultSelector] - Called as
 *   `(outer, inner, outerIndex, innerIndex)`. Defaults to returning the inner
 *   value, so the argument may be omitted instead of causing a TypeError.
 * @returns {*} Whatever `stream.flatMap` returns.
 */
const flatMap = (fn, stream, resultSelector = (outer, inner) => inner) =>
  stream.flatMap((outer, outerIndex) =>
    fn(outer).map((inner, innerIndex) =>
      resultSelector(outer, inner, outerIndex, innerIndex)));
/**
 * `flatMapLatest` expressed through `publish`, `flatMap` and `takeUntil`:
 * every inner stream produced by `fn` is cut off with `takeUntil(shared)`,
 * where `shared` is the published source itself.
 *
 * This single definition replaces two separate `const flatMapLatest`
 * declarations (a redeclaration is a SyntaxError), the second of which also
 * had an unbalanced parenthesis.
 *
 * @param {Function} fn - Maps a source value to an inner stream.
 * @param {Object} stream - Source exposing `publish(selector)`.
 * @param {Function} [resultSelector] - Forwarded to `flatMap` as its second
 *   argument when provided.
 * @returns {*} Whatever `stream.publish` returns.
 */
const flatMapLatest = (fn, stream, resultSelector) =>
  stream.publish((shared) => {
    const project = (value) => fn(value).takeUntil(shared);
    // Only pass the selector along when the caller supplied one, so the
    // two-argument call behaves exactly like the plain form.
    return resultSelector === undefined
      ? shared.flatMap(project)
      : shared.flatMap(project, resultSelector);
  });
/**
 * `delay` expressed through `flatMap`, `timer` and `mapTo`: each source value
 * is replaced by `Rx.Observable.timer(delay)` mapped back to that value.
 *
 * @param {Object} source - Source exposing `flatMap`.
 * @param {*} delay - Passed unchanged to `Rx.Observable.timer`.
 * @returns {*} Whatever `source.flatMap` returns.
 */
const delay = (source, delay) => {
  const postpone = (value) => {
    const tick = Rx.Observable.timer(delay);
    return tick.mapTo(value);
  };
  return source.flatMap(postpone);
};
/**
 * `debounceTime` expressed through `flatMapLatest`: each value becomes
 * `of(value).delay(time)`.
 *
 * @param {*} time - Passed unchanged to `.delay`.
 * @param {Object} stream - Source handed to `flatMapLatest`.
 * @returns {*} Whatever `flatMapLatest` returns.
 */
const debounceTime = (time, stream) => {
  const delayed = (value) => of(value).delay(time);
  return flatMapLatest(delayed, stream);
};
/**
 * `debounce` expressed through `flatMapLatest`: each value waits for the first
 * emission of the stream returned by `other`, then is emitted itself.
 *
 * Fixes the original `mapTo(() => v)`, which emitted a function wrapping the
 * value instead of the value.
 *
 * @param {Function} other - Factory returning the duration stream; it now
 *   receives the current value as its argument (callers that ignore it are
 *   unaffected).
 * @param {Object} stream - Source handed to `flatMapLatest`.
 * @returns {*} Whatever `flatMapLatest` returns.
 */
const debounce = (other, stream) =>
  flatMapLatest((value) => other(value).take(1).mapTo(value), stream);
/**
 * `sample` expressed through `buffer` / `bufferTime`: buffers the source and
 * emits the last element of each buffer.
 *
 * Changes from the original:
 * - empty buffers are dropped instead of being mapped to `undefined`;
 * - the last element is read by index, so the block no longer depends on an
 *   external `last` helper that accepts arrays.
 *
 * @param {number|Object} sampler - A number selects `bufferTime`; anything
 *   else selects `buffer`. Passed unchanged to the chosen operator.
 * @param {Object} source - Source exposing `buffer` and `bufferTime`.
 * @returns {*} Result of the `filter(...).map(...)` chain on the buffered source.
 */
const sample = (sampler, source) => {
  const bufferOperator = typeof sampler === 'number' ? 'bufferTime' : 'buffer';
  return source[bufferOperator](sampler)
    .filter((values) => values.length > 0)
    .map((values) => values[values.length - 1]);
};
/**
 * `sample` variant built with `create`: on subscription it publishes the
 * source, subscribes the observer to `sampler.flatMap(() => published.take(1))`
 * and then connects the published source.
 *
 * The original reassigned the captured `source` parameter inside the subscribe
 * callback (via the comma operator), so a second subscription published the
 * already-published stream. A local binding is used instead, so every
 * subscription starts from the original `source`.
 *
 * @param {Object} sampler - Stream exposing `flatMap`.
 * @param {Object} source - Stream exposing `publish()`.
 * @returns {*} Whatever `create` returns.
 */
const sample2 = (sampler, source) => create((o) => {
  const shared = source.publish();
  // Subscribe before connecting, matching the original evaluation order.
  const subscription = sampler.flatMap(() => shared.take(1)).subscribe(o);
  const connection = shared.connect();
  return new CompositeDisposable(subscription, connection);
});
/**
 * `sample` variant built on `publish(selector)`: for every `sampler` emission
 * it flat-maps to `take(1)` of the published source.
 *
 * @param {Object} sampler - Stream exposing `flatMap`.
 * @param {Object} source - Stream exposing `publish(selector)`.
 * @returns {*} Whatever `source.publish` returns.
 */
const sample3 = (sampler, source) => {
  const selector = (src) => {
    const nextValue = () => src.take(1);
    return sampler.flatMap(nextValue);
  };
  return source.publish(selector);
};
/**
 * `window` expressed through `publish`, `startWith`, `map` and `takeUntil`:
 * the boundary stream (prefixed with an initial `0`) is mapped to
 * `src.takeUntil(windowBoundry)`, where `src` is the published source.
 *
 * Changes from the original:
 * - `map` now receives a function; the original passed the `takeUntil` stream
 *   itself as the projection;
 * - the `source` parameter is no longer reassigned;
 * - the `publish(selector)` form replaces `publish()` + `let` + an eager
 *   `connect()` whose connection was never kept or disposed.
 *
 * @param {Object} source - Stream exposing `publish(selector)`.
 * @param {Object} windowBoundry - Boundary stream exposing `startWith`; also
 *   used as the `takeUntil` notifier for each window.
 * @returns {*} Whatever `source.publish` returns.
 */
const window$ = (source, windowBoundry) =>
  source.publish((src) =>
    windowBoundry
      .startWith(0)
      .map(() => src.takeUntil(windowBoundry)));
/**
 * `flatMapFirst` expressed through `publish`, `filter`, `map`, `do`,
 * `switchMap` and `finally`, using a boolean gate.
 *
 * @param {Object} source - Stream exposing `publish(selector)`.
 * @param {Function} fn - Passed directly to `map`; its result must expose
 *   `finally`.
 * @returns {*} Whatever `source.publish` returns.
 */
const flatMapFirst$ = (source, fn) => {
  const selector = (src) => {
    // Gate state is created inside the selector, once per selector call.
    let free = true;
    const isFree = () => free;
    const occupy = () => { free = false; };
    const release = () => { free = true; };

    return src
      .filter(isFree) // let values through only while the gate is open
      .map(fn)
      .do(occupy) // close the gate before the inner stream is switched to
      .switchMap((inner) => inner.finally(release)); // reopen in the inner `finally`
  };
  return source.publish(selector);
};
/**
 * `throttleTime` expressed through `flatMapFirst`: each accepted value becomes
 * `of(value)` concatenated with a `timer(delay)` whose emissions are all
 * filtered out, so the inner stream emits only the value.
 *
 * Removes the stray `;` inside the `concat(...)` argument list, which made the
 * original a SyntaxError.
 *
 * @param {Object} source - Stream exposing `flatMapFirst`.
 * @param {*} delay - Passed unchanged to `timer`.
 * @returns {*} Whatever `source.flatMapFirst` returns.
 */
const throttleTime = (source, delay) =>
  source.flatMapFirst((value) =>
    of(value).concat(timer(delay).filter(() => false)));
/**
 * `throttle` expressed through `flatMapFirst`: each accepted value becomes
 * `of(value)` concatenated with `other.take(1)` whose emissions are all
 * filtered out, so the inner stream emits only the value.
 *
 * Removes the stray `;` inside the `concat(...)` argument list, which made the
 * original a SyntaxError.
 *
 * @param {Object} source - Stream exposing `flatMapFirst`.
 * @param {Object} other - Stream exposing `take`.
 * @returns {*} Whatever `source.flatMapFirst` returns.
 */
const throttle = (source, other) =>
  source.flatMapFirst((value) =>
    of(value).concat(other.take(1).filter(() => false)));
/**
 * `interval` expressed through `timer`, with the same value used for both
 * arguments.
 *
 * @param {*} delay - Passed to `Rx.Observable.timer` as both arguments.
 * @returns {*} Whatever `Rx.Observable.timer` returns.
 */
const interval = (delay) => {
  const period = delay;
  return Rx.Observable.timer(delay, period);
};
/**
 * Like an interval, but calls `timer` with `0` as the first argument.
 *
 * @param {*} delay - Passed to `Rx.Observable.timer` as the second argument.
 * @returns {*} Whatever `Rx.Observable.timer` returns.
 */
const intervalQuick = (delay) => {
  const initialDelay = 0;
  return Rx.Observable.timer(initialDelay, delay);
};
/**
 * `filter` expressed through `flatMap`: a value that satisfies `f` becomes
 * `of(value)`, any other value becomes `empty()`.
 *
 * The original returned the raw value and the uncalled `empty` reference to
 * `flatMap`. Both branches now return streams, matching how the neighbouring
 * `map` wraps its result in `of(...)`.
 * NOTE(review): assumes `empty` is a creation function like `of` — confirm
 * against the Rx version in use.
 *
 * @param {Object} source - Stream exposing `flatMap`.
 * @param {Function} f - Predicate called with each value.
 * @returns {*} Whatever `source.flatMap` returns.
 */
const filter = (source, f) =>
  source.flatMap((value) => (f(value) ? of(value) : empty()));
/**
 * `map` expressed through `flatMap`: each value becomes `of(f(value))`.
 *
 * Adds the `=` missing from the original declaration, which was a SyntaxError.
 *
 * @param {Object} source - Stream exposing `flatMap`.
 * @param {Function} f - Projection called with each value.
 * @returns {*} Whatever `source.flatMap` returns.
 */
const map = (source, f) => source.flatMap((value) => of(f(value)));
/**
 * `startWith` expressed through `concat`: `of(el)` followed by `source`.
 *
 * @param {Object} source - Stream passed as the second argument to `concat`.
 * @param {*} el - Value wrapped with `of` and passed first.
 * @returns {*} Whatever `concat` returns.
 */
const startWith = (source, el) => {
  const head = of(el);
  return concat(head, source);
};
/**
 * `takeLast` expressed through `reduce` and `mergeAll`: the reducer keeps an
 * array of at most `n` trailing values, and `mergeAll` is then called on the
 * reduced stream.
 *
 * Fixes `n === 0`: `slice(-0)` equals `slice(0)`, so the original kept every
 * value. A non-positive `n` now keeps nothing.
 *
 * @param {number} n - Maximum number of trailing values to keep.
 * @param {Object} source - Stream exposing `reduce`.
 * @returns {*} Whatever `mergeAll()` returns.
 */
const takeLast = (n, source) =>
  source
    .reduce((tail, value) => (n > 0 ? [...tail, value].slice(-n) : tail), [])
    .mergeAll();
/**
 * `last` expressed through `filter` and `reduce`: the reducer always keeps the
 * most recent value.
 *
 * Changes from the original:
 * - the reducer returned its accumulator (first parameter), which produced the
 *   first matching value rather than the last;
 * - `pred` is now optional and defaults to accepting every value.
 *
 * @param {Object} source - Stream exposing `filter`.
 * @param {Function} [pred] - Predicate passed to `filter`.
 * @returns {*} Whatever the `filter(...).reduce(...)` chain returns.
 */
const last = (source, pred = () => true) =>
  source.filter(pred).reduce((previous, current) => current);
/**
 * `concat` expressed through `merge`, `last` and `flatMap`: `source1` is
 * merged with a stream that flat-maps `source1.last()` to `source2`.
 *
 * Adds the closing parenthesis missing from the original, which was a
 * SyntaxError.
 * NOTE(review): `source1` is used both directly and through `last()`; whether
 * that means two subscriptions, and how `last()` treats an empty `source1`,
 * depends on the Rx version — confirm before relying on it.
 *
 * @param {Object} source1 - Stream exposing `merge` and `last`.
 * @param {Object} source2 - Stream returned from the `flatMap` projection.
 * @returns {*} Whatever `source1.merge` returns.
 */
const concat = (source1, source2) =>
  source1.merge(source1.last().flatMap(() => source2));
@Dorus
Copy link
Author

Dorus commented Dec 23, 2018

Got to remember this one:

/**
 * `any` expressed through `filter`, `mapTo`, `concat` and `take`: matches are
 * mapped to `true`, `[false]` is concatenated as the fallback, and `take(1)`
 * is applied to the result.
 *
 * @param {Object} source - Stream exposing `filter`.
 * @param {Function} pred - Predicate passed to `filter`.
 * @returns {*} Whatever `take(1)` returns.
 */
const any = (source, pred) => {
  const matches = source.filter(pred);
  const hits = matches.mapTo(true);
  const withFallback = hits.concat([false]);
  return withFallback.take(1);
};

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment