Skip to content

Instantly share code, notes, and snippets.

@kosich
Last active September 27, 2021 14:28
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save kosich/cef1572743cbf3f46105ec2ba56228cd to your computer and use it in GitHub Desktop.
Save kosich/cef1572743cbf3f46105ec2ba56228cd to your computer and use it in GitHub Desktop.
Pausable Observable with buffer
const { rxObserver, palette } = require('api/v0.3');
const { merge, timer, Subject, from, empty } = require('rxjs');
const { filter, startWith, bufferToggle, take, flatMap, zip, distinctUntilChanged, share, skip, map, windowToggle } = require('rxjs/operators');
// Pause control: values pushed into pauseSubj$ are de-duplicated and shared,
// so every consumer of pause$ sees the same sequence of state changes.
const pauseSubj$ = new Subject();
const pause$ = pauseSubj$.pipe(
  distinctUntilChanged(),
  share()
);

// on$ passes only falsy pause states, off$ passes only truthy ones.
const on$ = pause$.pipe(filter(isPaused => !isPaused));
const off$ = pause$.pipe(filter(isPaused => isPaused));

// Stream of palette entries, used to color the source items.
const palette$ = from(palette);

// Source: the first ten ticks of a 10ms timer, each zipped with a palette
// entry and turned into a Marble.
const source$ = timer(0, 10).pipe(
  take(10),
  zip(palette$, Marble)
);
// Paused branch: buffer source values from each off$ emission until the next
// on$ emission, then flatten the emitted array back into single values.
const heldWhilePaused$ = source$.pipe(
  bufferToggle(off$, () => on$),
  flatMap(buffered => buffered)
);

// Running branch: open a window on each on$ emission, close it on the next
// off$ emission, and flatten the windows back into single values.
const passedWhileRunning$ = source$.pipe(
  windowToggle(on$, () => off$),
  flatMap(window$ => window$)
);

// Note: each branch subscribes to source$ on its own.
const result$ = merge(heldWhilePaused$, passedWhileRunning$);
// Pause schedule as [value, delay in ms] pairs, registered in this order.
const pauseSchedule = [
  [false, 0],
  [true, 30],
  [false, 60],
];
for (const [value, delay] of pauseSchedule) {
  turnPause(value, delay);
}

// Subscriptions
source$.subscribe(rxObserver());

// Maps a pause state to a white marker marble: '||' for truthy, '▶' for falsy.
const toPauseMarble = isPaused =>
  Marble(isPaused ? '||' : '▶', '#ffffff');

// skip(1) drops the first pause$ emission, so it gets no marker.
pause$
  .pipe(skip(1), map(toPauseMarble))
  .subscribe(rxObserver());

result$.subscribe(rxObserver());
// helpers
/**
 * Schedules a value to be pushed into pauseSubj$ after a delay.
 *
 * @param {*} value - value passed to pauseSubj$.next().
 * @param {number} delay - timeout in milliseconds, passed to setTimeout.
 */
function turnPause(value, delay) {
  setTimeout(() => pauseSubj$.next(value), delay);
}
/**
 * Creates a colored marble: an object carrying a color whose valueOf()
 * yields the wrapped value.
 *
 * @param {*} value - value returned by the marble's valueOf().
 * @param {*} color - stored as-is on the marble's `color` property.
 * @returns {{valueOf: function(): *, color: *}} the marble object.
 */
function Marble(value, color) {
  const marble = {
    valueOf() {
      return value;
    },
    color,
  };
  return marble;
}
@kosich
Copy link
Author

kosich commented Mar 5, 2019

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment