Last active
May 17, 2019 14:38
-
-
Save jbardon/86deb49988f6e4ff8ab5e25bdd7de007 to your computer and use it in GitHub Desktop.
RxJS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!DOCTYPE html> | |
<html> | |
<head> | |
<meta charset="utf-8"> | |
<title>RxJS example</title> | |
<link rel="stylesheet" href="https://code.jquery.com/qunit/qunit-2.9.2.css"> | |
</head> | |
<body> | |
<div id="qunit"></div> | |
<div id="qunit-fixture"></div> | |
<script src="https://unpkg.com/qunit@2.9.2/qunit/qunit.js"></script> | |
<script type="module"> | |
/* eslint-disable */ | |
/* | |
http://reactivex.io/rxjs/manual/overview.html#choose-an-operator | |
https://blog.strongbrew.io/rxjs-best-practices-in-angular/ | |
https://blog.angularindepth.com/practical-rxjs-in-the-wild-requests-with-concatmap-vs-mergemap-vs-forkjoin-11e5b2efe293 | |
https://blog.angularindepth.com/switchmap-bugs-b6de69155524 | |
https://github.com/jeffbcross/bad-rxjs/blob/master/README.md | |
*/ | |
// Import operators from 'rxjs/operators' (lettable operators) | |
// not from 'rxjs/add/operators/{op}' (patch operators) | |
import { of, from, pairs, range, timer, interval, pipe, bindCallback, throwError, empty, Observable, Subject } from 'https://unpkg.com/rxjs@6.5.1/_esm2015/index.js?module'; | |
import { filter, map, take, takeUntil, concatMap, catchError, finalize, first, tap, switchMap } from 'https://unpkg.com/rxjs@6.5.1/_esm2015/operators/index.js?module'; | |
import { TestScheduler } from 'https://unpkg.com/rxjs@6.5.1/_esm2015/testing/index.js?module'; | |
const module = window.QUnit.module; | |
const test = window.QUnit.test; | |
const only = window.QUnit.only; | |
const log = window.QUnit.log; | |
module('Create stream', () => { | |
// https://blog.angularindepth.com/the-extensive-guide-to-creating-streams-in-rxjs-aaa02baaff9a | |
test('from value', function (assert) { | |
assert.ok(true, 'of (1, 2)'); | |
of(1, 2) | |
.subscribe(x => { | |
// Executes two times with x = 1 and x = 2 | |
assert.ok(true, x); | |
}); | |
}); | |
test('from array', function (assert) { | |
assert.ok(true, 'from ([1, 2])'); | |
from([1, 2]) | |
.subscribe(x => { | |
assert.ok(true, x); | |
}); | |
}); | |
test('from object', function (assert) { | |
assert.ok(true, 'pairs ({a: 1, b: 2})'); | |
pairs({a: 1, b: 2}) | |
.subscribe(([key, value]) => { | |
assert.ok(true, `${key}: ${value}`); | |
}); | |
}); | |
test('from promise', function (assert) { | |
assert.ok(true, 'from (Promise.resolve(1))'); | |
from(Promise.resolve(1)) | |
.subscribe(x => { | |
assert.ok(true, x); | |
}); | |
}); | |
test('from callback', function (assert) { | |
assert.ok(true, 'bindCallback ((1, fn) => fn(1)'); | |
const withCallback = (val, fn) => fn(val); | |
const withObservable = bindCallback(withCallback); | |
// val parameter of withCallback | |
// fn parameter doesn't matter, the val is passed to the stream | |
withObservable(1) | |
.subscribe(x => { | |
assert.ok(true, x); | |
}); | |
}); | |
}); | |
module('Generate stream', () => { | |
test(`from range`, function (assert) { | |
assert.ok(true, 'range (10, 2)'); | |
range(10, 2) | |
.subscribe(x => { | |
// Executes 2 times with x = 10 and x = 11 | |
assert.ok(true, x); | |
}); | |
}); | |
test('from timer', function (assert) { | |
const done = assert.async(); | |
assert.ok(true, 'timer (1000)'); | |
timer(1000) | |
.subscribe(() => { | |
// Executes once after 1000ms | |
assert.ok(true); | |
}, | |
null, done); | |
}); | |
test('from timer with offset', function (assert) { | |
const done = assert.async(); | |
assert.ok(true, `timer(5000, 1000).pipe(take(3))`); | |
timer(5000, 1000) | |
.pipe(take(3)) // to stop the infinite stream, limit to 2 values | |
.subscribe(x => { | |
// Executes after 5000ms, every 1000ms | |
assert.ok(true, x + ''); | |
}, null, done); | |
}); | |
test('from interval', function (assert) { | |
const done = assert.async(); | |
assert.ok(true, `interval(1000).pipe(takeUntil(timer(5000)))`); | |
interval(1000) | |
.pipe(takeUntil(timer(5000))) // another way to stop the stream, after 5000ms | |
.subscribe(x => { | |
// Executes every 1000 ms with value: 0, 1, 2, 3 | |
assert.ok(x >= 0, x + ''); | |
}, null, done); | |
}); | |
}); | |
module('Clean code', () => { | |
test('too many pipes [BAD]', assert => { | |
range(1, 10) | |
.pipe(filter(x => x > 5)) | |
.pipe(map(x => x + 100)) | |
.pipe(take(1)) | |
.subscribe(x => { | |
assert.ok(true, x) | |
}); | |
}); | |
test('too many pipes [GOOD]', assert => { | |
range(1, 10) | |
.pipe( | |
filter(x => x > 5), | |
map(x => x + 100), | |
take(1) | |
).subscribe(x => { | |
assert.ok(true, x) | |
}); | |
}); | |
test('nested subscribes [BAD]', assert => { | |
const returnObservable = (x) => of(x + 1); | |
of(1) | |
.subscribe(x => { | |
// A second observable need the first result | |
returnObservable(x).subscribe(y => { | |
assert.ok(true, y); | |
}); | |
}); | |
}); | |
test('nested subscribes [GOOD]', assert => { | |
const returnObservable = (x) => of(x + 1); | |
// Be careful, concatMap isn't always the better choice | |
// Later we'll compare other operators such as: mergeMap, switchMap, forkJoin and others | |
of(1) | |
.pipe(concatMap(x => returnObservable(x))) | |
.subscribe(y => { | |
assert.ok(true, y); | |
}); | |
}); | |
}); | |
module('From promises to observables', () => { | |
test('a resolved promise', assert => { | |
Promise.resolve('a') | |
.then(x => { | |
assert.ok(true, x); | |
}); | |
}); | |
test('an observable with one value', assert => { | |
of('a') | |
.subscribe(x => { | |
assert.ok(true, x); | |
}); | |
}); | |
test('an observable with multiple values', assert => { | |
of('a', 'b') | |
.subscribe(x => { | |
assert.ok(true, x); | |
}); | |
}); | |
test('a rejected promise', assert => { | |
Promise.reject('a') | |
.catch(x => { | |
assert.ok(true, x); | |
}); | |
}); | |
test('an observable with one error', assert => { | |
throwError('a') | |
.subscribe(null, x => { | |
assert.ok(true, x); | |
}); | |
}); | |
test('a completed observable', assert => { | |
empty() | |
.subscribe(null, null, () => { | |
assert.ok(true); | |
}); | |
}); | |
}); | |
module('Error handling', () => { | |
// https://blog.angular-university.io/rxjs-error-handling/ | |
test('an observable errors out', assert => { | |
var done = assert.async(); | |
const getObservable = () => Observable.create(observer => { | |
observer.next('a'); | |
observer.next('b'); | |
observer.error('c'); | |
observer.next('d'); // Never emitted because of the error | |
}); | |
// The stream errors out because there is no error handling | |
// An error out stream cannot be completed nor used again | |
getObservable() | |
.subscribe(x => { // First callback for values | |
assert.ok(x != 'd', `Value ${x}`); | |
}, | |
x => { // Second callback for errors | |
assert.ok(true, `Error ${x}`); | |
}, | |
() => { // Third callback for completion (never called) | |
assert.ok(false, `Completed`); | |
}); | |
// Hack to end the test and check | |
// 'd' value nor completed callbacks are triggered | |
timer(5000).subscribe(done); | |
}); | |
test('an observable locally catch error', assert => { | |
var done = assert.async(); | |
throwError('a') | |
.pipe(catchError(err => { | |
assert.ok(true, `Handling error`); | |
return of(err); // Replace error out stream with new stream and fallback value | |
})) | |
.subscribe(x => { | |
// Value called with catchError stream value | |
assert.ok(x === 'a', `Value ${x}`); | |
}, | |
null, | |
() => { // Called after the catchError value handling | |
assert.ok(true, `Completed`); | |
done(); | |
}); | |
}); | |
test('an observable rethrow errors', assert => { | |
var done = assert.async(); | |
throwError('a') | |
.pipe(catchError(err => { | |
assert.ok(true, `Handling error`); | |
return throwError(err); | |
})) | |
.subscribe( | |
null, | |
x => { | |
// Called because catchError rethrow an error out stream | |
assert.ok(true, `Error ${x}`); | |
}, | |
null | |
); | |
// Hack to end the test | |
timer(5000).subscribe(done); | |
}); | |
test('an observable finalize after errors', assert => { | |
throwError('a') | |
.pipe( | |
catchError(x => throwError(x)), | |
finalize(() => { | |
assert.ok(true, 'Finalize'); | |
}) | |
) | |
.subscribe(null, x => { | |
assert.ok(true, `Error ${x}`); | |
}); | |
}); | |
// Also see retryWhen, delayWhen | |
}); | |
module('Unsubscribe', () => { | |
// When you unsubscribe the last observer, the stream completes and stops | |
// https://blog.angularindepth.com/the-best-way-to-unsubscribe-rxjs-observable-in-the-angular-applications-d8f9aa42f6a0 | |
test('manually', assert => { | |
const done = assert.async(); | |
const subscription = interval(1000) // Save the subscription | |
.subscribe(x => { | |
assert.ok(true, x + '') | |
}, null, () => { | |
// Never triggered with manual method | |
}); | |
timer(5000) // Simulate quit page or angular component ngDestroy | |
.pipe(first()) | |
.subscribe(() => { | |
subscription.unsubscribe(); // Unsubscribe manually, but not complete stream | |
done(); | |
}); | |
}); | |
test('with another stream', assert => { | |
const done = assert.async(); | |
const unsubscribe$ = new Subject(); | |
interval(1000) | |
.pipe(takeUntil(unsubscribe$)) // Complete stream on first event, put at the end of pipeline | |
.subscribe(x => { | |
assert.ok(true, x + '') | |
}, null, | |
() => { | |
assert.ok(true, 'Complete'); | |
done(); | |
}); | |
timer(5000) | |
.pipe(first()) | |
.subscribe(null, null, () => { | |
unsubscribe$.next('a'); // Triggers takeUntil | |
unsubscribe$.complete(); // Don't let the stream opened | |
}); | |
}); | |
// Also see takeWhile(predicate) | |
test('auto with take', assert => { | |
const done = assert.async(); | |
interval(1000) | |
.pipe(take(2)) // Unsubscribe after two values | |
.subscribe(x => { | |
assert.ok(true, x + ''); | |
}, | |
null, | |
() => { | |
assert.ok(true, 'Complete'); | |
done(); | |
}); | |
}); | |
test('auto with first', assert => { | |
const done = assert.async(); | |
interval(1000) | |
.pipe(first()) // Same as take(1) | |
.subscribe(x => { | |
assert.ok(true, x + ''); | |
}, | |
null, | |
() => { | |
assert.ok(true, 'Complete'); | |
done(); | |
}); | |
}); | |
test('auto with first and condition', assert => { | |
const done = assert.async(); | |
interval(1000) | |
.pipe(first(x => x === 1)) // Not unsubscribe if no value sent | |
.subscribe(x => { | |
assert.ok(true, x + ''); | |
}, | |
null, | |
() => { | |
assert.ok(true, 'Complete'); | |
done(); | |
}); | |
}); | |
// With angular | async automatically unsubscribes | |
// no more subscribe | |
}); | |
module('Higher order observable', () => { | |
// combine, concat, flatMap, switchMap, mergeMap | |
const scheduler = new TestScheduler((actual, expected) => | |
QUnit.assert.deepEqual(actual, expected) | |
); | |
test('simple marble test', assert => scheduler.run(({ cold, expectObservable }) => { | |
const marbles = { | |
input: '-a-b-c', | |
output: '---(b|)', | |
}; | |
const values = { | |
input: { a: 0, b: 1, c: 2 }, | |
output: { b: 10 } | |
}; | |
const stream$ = cold(marbles.input, values.input); | |
const actual = stream$.pipe( | |
map(x => x * 10), | |
filter(x => x > 0), | |
take(1) | |
); | |
expectObservable(actual).toBe(marbles.output, values.output); | |
})); | |
test('concatMap', assert => scheduler.run(({ cold, expectObservable }) => { | |
// Errors, wait other requests | |
const marbles = { | |
input: '-a-b-c|', | |
other: 'x-y|', | |
output: '-a-bc-de-f|', // bc because when ay is triggered, b is already here | |
}; | |
const values = { | |
input: { a: 0, b: 1, c: 2 }, | |
other: { x: 'x', y: 'y'}, | |
output: { a: '0x', b: '0y', c: '1x', d: '1y', e: '2x', f: '2y' } | |
}; | |
const stream$ = cold(marbles.input, values.input); | |
const stream2$ = cold(marbles.other, values.other); | |
const actual = stream$.pipe( | |
concatMap(x => stream2$, (x, y) => x + y) | |
); | |
expectObservable(actual).toBe(marbles.output, values.output); | |
})); | |
only('switchMap', assert => scheduler.run(({ cold, expectObservable }) => { | |
const marbles = { | |
input: '-a-b-c|', | |
inner: 'x-y|', | |
output: '-a-b-c-d|', | |
}; | |
const values = { | |
input: { a: 0, b: 1, c: 2 }, | |
inner: { x: 'x', y: 'y'}, | |
output: { a: '0x', b: '1x', c: '2x', d: '2y' } | |
}; | |
const stream$ = cold(marbles.input, values.input); | |
const stream2$ = cold(marbles.inner, values.inner); | |
const actual = stream$.pipe( | |
switchMap(x => stream2$, (x, y) => x + y) | |
); | |
expectObservable(actual).toBe(marbles.output, values.output); | |
})); | |
}); | |
// tap => side effect in pipes (do in RX5) | |
</script> | |
</body> | |
</html> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment