Skip to content

Instantly share code, notes, and snippets.

@jbardon
Last active May 17, 2019 14:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jbardon/86deb49988f6e4ff8ab5e25bdd7de007 to your computer and use it in GitHub Desktop.
Save jbardon/86deb49988f6e4ff8ab5e25bdd7de007 to your computer and use it in GitHub Desktop.
RxJS
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>RxJS example</title>
<link rel="stylesheet" href="https://code.jquery.com/qunit/qunit-2.9.2.css">
</head>
<body>
<div id="qunit"></div>
<div id="qunit-fixture"></div>
<script src="https://unpkg.com/qunit@2.9.2/qunit/qunit.js"></script>
<script type="module">
/* eslint-disable */
/*
http://reactivex.io/rxjs/manual/overview.html#choose-an-operator
https://blog.strongbrew.io/rxjs-best-practices-in-angular/
https://blog.angularindepth.com/practical-rxjs-in-the-wild-requests-with-concatmap-vs-mergemap-vs-forkjoin-11e5b2efe293
https://blog.angularindepth.com/switchmap-bugs-b6de69155524
https://github.com/jeffbcross/bad-rxjs/blob/master/README.md
*/
// Import operators from 'rxjs/operators' (pipeable operators, formerly called "lettable"),
// not from 'rxjs/add/operator/{op}' (operators patched onto Observable.prototype)
import { of, from, pairs, range, timer, interval, pipe, bindCallback, throwError, empty, Observable, Subject } from 'https://unpkg.com/rxjs@6.5.1/_esm2015/index.js?module';
import { filter, map, take, takeUntil, concatMap, catchError, finalize, first, tap, switchMap } from 'https://unpkg.com/rxjs@6.5.1/_esm2015/operators/index.js?module';
import { TestScheduler } from 'https://unpkg.com/rxjs@6.5.1/_esm2015/testing/index.js?module';
// Local aliases for the QUnit entry points used throughout this page.
const { module, test, only, log } = window.QUnit;
// Creation functions that wrap already-existing data (values, arrays, objects,
// promises, callback-style functions) into an observable stream.
// https://blog.angularindepth.com/the-extensive-guide-to-creating-streams-in-rxjs-aaa02baaff9a
module('Create stream', () => {
  test('from value', function (assert) {
    // One label assertion + one assertion per emitted value; without the
    // expected count the test would pass even if the subscriber never ran.
    assert.expect(3);
    assert.ok(true, 'of (1, 2)');

    of(1, 2).subscribe(x => {
      // Executes two times with x = 1 and x = 2
      assert.ok(true, String(x));
    });
  });

  test('from array', function (assert) {
    assert.expect(3);
    assert.ok(true, 'from ([1, 2])');

    from([1, 2]).subscribe(x => {
      // Each array element is emitted as its own value
      assert.ok(true, String(x));
    });
  });

  test('from object', function (assert) {
    assert.expect(3);
    assert.ok(true, 'pairs ({a: 1, b: 2})');

    pairs({ a: 1, b: 2 }).subscribe(([key, value]) => {
      // Each property is emitted as a [key, value] tuple
      assert.ok(true, `${key}: ${value}`);
    });
  });

  test('from promise', function (assert) {
    assert.expect(2);
    // The value is delivered from a promise callback, i.e. after this function
    // has returned, so QUnit must be told explicitly to wait for it.
    const done = assert.async();
    assert.ok(true, 'from (Promise.resolve(1))');

    from(Promise.resolve(1)).subscribe(
      x => {
        assert.ok(true, String(x));
      },
      null,
      done // the stream completes right after the resolved value
    );
  });

  test('from callback', function (assert) {
    assert.expect(2);
    assert.ok(true, 'bindCallback ((1, fn) => fn(1)');

    const withCallback = (val, fn) => fn(val);
    const withObservable = bindCallback(withCallback);

    // The argument below is the `val` parameter of withCallback.
    // The `fn` parameter is supplied by bindCallback: whatever it is called
    // with is pushed into the stream.
    withObservable(1).subscribe(x => {
      assert.ok(true, String(x));
    });
  });
});
// Creation functions that generate values themselves (counters and timers).
module('Generate stream', () => {
  test(`from range`, function (assert) {
    // Label + the two generated values
    assert.expect(3);
    assert.ok(true, 'range (10, 2)');

    range(10, 2).subscribe(x => {
      // Executes 2 times with x = 10 and x = 11
      assert.ok(true, String(x));
    });
  });

  test('from timer', function (assert) {
    // Label + the single timer emission
    assert.expect(2);
    const done = assert.async();
    assert.ok(true, 'timer (1000)');

    timer(1000).subscribe(
      () => {
        // Executes once after 1000ms
        assert.ok(true);
      },
      null,
      done
    );
  });

  test('from timer with offset', function (assert) {
    // Label + the three values let through by take(3)
    assert.expect(4);
    const done = assert.async();
    assert.ok(true, `timer(5000, 1000).pipe(take(3))`);

    timer(5000, 1000)
      .pipe(take(3)) // to stop the infinite stream, limit to 3 values
      .subscribe(
        x => {
          // First value after 5000ms, then one every 1000ms
          assert.ok(true, String(x));
        },
        null,
        done
      );
  });

  test('from interval', function (assert) {
    // No assert.expect() here: the number of ticks depends on two real timers
    // racing around the 5000ms mark, so only the value range is checked.
    const done = assert.async();
    assert.ok(true, `interval(1000).pipe(takeUntil(timer(5000)))`);

    interval(1000)
      .pipe(takeUntil(timer(5000))) // another way to stop the stream, after 5000ms
      .subscribe(
        x => {
          // Executes every 1000 ms with value: 0, 1, 2, 3
          assert.ok(x >= 0, String(x));
        },
        null,
        done
      );
  });
});
// Pipeline style guide: every [BAD] test shows an anti-pattern on purpose and
// is followed by the [GOOD] equivalent that produces the same result.
module('Clean code', () => {
  test('too many pipes [BAD]', function (assert) {
    // Anti-pattern: a separate pipe() call for every operator
    const aboveFive$ = range(1, 10).pipe(filter(n => n > 5));
    const shifted$ = aboveFive$.pipe(map(n => n + 100));
    const firstOnly$ = shifted$.pipe(take(1));

    firstOnly$.subscribe(value => {
      assert.ok(true, value);
    });
  });

  test('too many pipes [GOOD]', function (assert) {
    // Same operators, chained inside a single pipe() call
    const firstShifted$ = range(1, 10).pipe(
      filter(n => n > 5),
      map(n => n + 100),
      take(1)
    );

    firstShifted$.subscribe(value => {
      assert.ok(true, value);
    });
  });

  test('nested subscribes [BAD]', function (assert) {
    const plusOne$ = n => of(n + 1);

    of(1).subscribe(outer => {
      // Anti-pattern: the second observable needs the first result, so it is
      // subscribed to from inside the first subscriber
      plusOne$(outer).subscribe(inner => {
        assert.ok(true, inner);
      });
    });
  });

  test('nested subscribes [GOOD]', function (assert) {
    const plusOne$ = n => of(n + 1);

    // Be careful, concatMap isn't always the better choice.
    // Other flattening operators (mergeMap, switchMap, forkJoin, ...) are compared later.
    const chained$ = of(1).pipe(concatMap(n => plusOne$(n)));

    chained$.subscribe(result => {
      assert.ok(true, result);
    });
  });
});
// Promise outcomes next to their observable counterparts:
// resolve <-> next value(s), reject <-> error, and completion (observable only).
module('From promises to observables', () => {
  test('a resolved promise', assert => {
    assert.expect(1);
    // Returning the promise makes QUnit wait for the then-callback instead of
    // relying on it running before the test is considered finished.
    return Promise.resolve('a').then(x => {
      assert.ok(true, x);
    });
  });

  test('an observable with one value', assert => {
    assert.expect(1);
    of('a').subscribe(x => {
      assert.ok(true, x);
    });
  });

  test('an observable with multiple values', assert => {
    // Unlike a promise, the next callback runs once per value
    assert.expect(2);
    of('a', 'b').subscribe(x => {
      assert.ok(true, x);
    });
  });

  test('a rejected promise', assert => {
    assert.expect(1);
    // Returned for the same reason as in 'a resolved promise'
    return Promise.reject('a').catch(x => {
      assert.ok(true, x);
    });
  });

  test('an observable with one error', assert => {
    assert.expect(1);
    // Second subscribe argument is the error callback
    throwError('a').subscribe(null, x => {
      assert.ok(true, x);
    });
  });

  test('a completed observable', assert => {
    assert.expect(1);
    // Third subscribe argument is the completion callback; empty() emits no value
    empty().subscribe(null, null, () => {
      assert.ok(true);
    });
  });
});
// How errors travel through a stream and how catchError / finalize react.
// Every stream in this module is synchronous, so each test pins the exact
// number of assertions with assert.expect() instead of waiting on a timer:
// an unexpected extra callback (late value, completion) breaks the count.
module('Error handling', () => {
  // https://blog.angular-university.io/rxjs-error-handling/
  test('an observable errors out', assert => {
    // Values 'a' and 'b' + error 'c'
    assert.expect(3);

    const getObservable = () => new Observable(observer => {
      observer.next('a');
      observer.next('b');
      observer.error('c');
      observer.next('d'); // Never emitted because of the error
    });

    // The stream errors out because there is no error handling.
    // An errored stream cannot complete nor be used again.
    getObservable().subscribe(
      x => { // First callback for values
        assert.ok(x !== 'd', `Value ${x}`);
      },
      x => { // Second callback for errors
        assert.ok(true, `Error ${x}`);
      },
      () => { // Third callback for completion (never called)
        assert.ok(false, `Completed`);
      }
    );
  });

  test('an observable locally catch error', assert => {
    // Handler + fallback value + completion
    assert.expect(3);

    throwError('a')
      .pipe(catchError(err => {
        assert.ok(true, `Handling error`);
        return of(err); // Replace the errored stream with a new stream carrying a fallback value
      }))
      .subscribe(
        x => {
          // Receives the value emitted by the catchError replacement stream
          assert.ok(x === 'a', `Value ${x}`);
        },
        null,
        () => { // Called after the catchError value has been handled
          assert.ok(true, `Completed`);
        }
      );
  });

  test('an observable rethrow errors', assert => {
    // Handler + error callback
    assert.expect(2);

    throwError('a')
      .pipe(catchError(err => {
        assert.ok(true, `Handling error`);
        return throwError(err);
      }))
      .subscribe(
        null,
        x => {
          // Called because catchError returned another erroring stream
          assert.ok(true, `Error ${x}`);
        },
        null
      );
  });

  test('an observable finalize after errors', assert => {
    // Error callback + finalize callback
    assert.expect(2);

    throwError('a')
      .pipe(
        catchError(x => throwError(x)),
        finalize(() => {
          assert.ok(true, 'Finalize');
        })
      )
      .subscribe(null, x => {
        assert.ok(true, `Error ${x}`);
      });
  });
  // Also see retryWhen, delayWhen
});
// Ways to stop listening to an endless stream (interval): by hand through the
// Subscription, or by letting an operator (takeUntil, take, first) complete it.
// https://blog.angularindepth.com/the-best-way-to-unsubscribe-rxjs-observable-in-the-angular-applications-d8f9aa42f6a0
module('Unsubscribe', () => {
  test('manually', function (assert) {
    const done = assert.async();

    // Keep the Subscription handle so it can be cancelled later
    const ticks = interval(1000).subscribe({
      next: x => {
        assert.ok(true, String(x));
      },
      complete: () => {
        // Never triggered with manual method
      },
    });

    // Simulate leaving the page or an Angular component being destroyed
    const leave$ = timer(5000).pipe(first());
    leave$.subscribe(() => {
      ticks.unsubscribe(); // Unsubscribe manually; the stream is not completed
      done();
    });
  });

  test('with another stream', function (assert) {
    const done = assert.async();
    const unsubscribe$ = new Subject();

    // takeUntil completes the stream on the notifier's first event;
    // keep it as the last operator of the pipeline
    const ticks$ = interval(1000).pipe(takeUntil(unsubscribe$));
    ticks$.subscribe({
      next: x => {
        assert.ok(true, String(x));
      },
      complete: () => {
        assert.ok(true, 'Complete');
        done();
      },
    });

    const leave$ = timer(5000).pipe(first());
    leave$.subscribe({
      complete: () => {
        unsubscribe$.next('a'); // Triggers takeUntil
        unsubscribe$.complete(); // Don't leave the notifier open
      },
    });
  });

  // Also see takeWhile(predicate)
  test('auto with take', function (assert) {
    const done = assert.async();

    const firstTwo$ = interval(1000).pipe(take(2)); // Completes after two values
    firstTwo$.subscribe({
      next: x => {
        assert.ok(true, String(x));
      },
      complete: () => {
        assert.ok(true, 'Complete');
        done();
      },
    });
  });

  test('auto with first', function (assert) {
    const done = assert.async();

    const firstTick$ = interval(1000).pipe(first()); // Same as take(1)
    firstTick$.subscribe({
      next: x => {
        assert.ok(true, String(x));
      },
      complete: () => {
        assert.ok(true, 'Complete');
        done();
      },
    });
  });

  test('auto with first and condition', function (assert) {
    const done = assert.async();

    // Stays subscribed until a value satisfies the predicate
    const firstOne$ = interval(1000).pipe(first(x => x === 1));
    firstOne$.subscribe({
      next: x => {
        assert.ok(true, String(x));
      },
      complete: () => {
        assert.ok(true, 'Complete');
        done();
      },
    });
  });
  // In Angular templates, the `| async` pipe unsubscribes automatically,
  // so no explicit subscribe call is needed there
});
// Marble tests for operators that flatten an observable of observables.
module('Higher order observable', () => {
  // combine, concat, flatMap, switchMap, mergeMap

  /**
   * Runs a marble test body inside a brand-new TestScheduler.
   *
   * A scheduler is created per test (as the RxJS marble-testing guide does)
   * because its virtual clock is stateful, and results are reported through
   * the `assert` of the test that is actually running.
   *
   * @param {object} assert - QUnit assert object of the current test
   * @param {Function} body - callback receiving the TestScheduler run helpers
   * @returns {*} whatever `body` returns
   */
  const runMarbles = (assert, body) =>
    new TestScheduler((actual, expected) => assert.deepEqual(actual, expected)).run(body);

  test('simple marble test', assert => runMarbles(assert, ({ cold, expectObservable }) => {
    const marbles = {
      input: '-a-b-c',
      output: '---(b|)', // value and completion in the same frame, because of take(1)
    };
    const values = {
      input: { a: 0, b: 1, c: 2 },
      output: { b: 10 },
    };
    const stream$ = cold(marbles.input, values.input);
    const actual = stream$.pipe(
      map(x => x * 10),
      filter(x => x > 0),
      take(1)
    );
    expectObservable(actual).toBe(marbles.output, values.output);
  }));

  test('concatMap', assert => runMarbles(assert, ({ cold, expectObservable }) => {
    // concatMap queues source values: the next inner stream is subscribed to
    // only once the previous one has completed
    const marbles = {
      input: '-a-b-c|',
      other: 'x-y|',
      // 'b' and 'c' are adjacent: source value 1 arrived while the first inner
      // stream was still running, so its inner stream starts right after it
      output: '-a-bc-de-f|',
    };
    const values = {
      input: { a: 0, b: 1, c: 2 },
      other: { x: 'x', y: 'y' },
      output: { a: '0x', b: '0y', c: '1x', d: '1y', e: '2x', f: '2y' },
    };
    const stream$ = cold(marbles.input, values.input);
    const stream2$ = cold(marbles.other, values.other);
    const actual = stream$.pipe(
      // Inner map instead of the deprecated resultSelector argument
      concatMap(x => stream2$.pipe(map(y => x + y)))
    );
    expectObservable(actual).toBe(marbles.output, values.output);
  }));

  // Registered with test(), not only(): only() would skip every other test of the page
  test('switchMap', assert => runMarbles(assert, ({ cold, expectObservable }) => {
    // switchMap drops the running inner stream as soon as a new source value
    // arrives, so only the last inner stream gets to emit its 'y'
    const marbles = {
      input: '-a-b-c|',
      inner: 'x-y|',
      output: '-a-b-c-d|',
    };
    const values = {
      input: { a: 0, b: 1, c: 2 },
      inner: { x: 'x', y: 'y' },
      output: { a: '0x', b: '1x', c: '2x', d: '2y' },
    };
    const stream$ = cold(marbles.input, values.input);
    const stream2$ = cold(marbles.inner, values.inner);
    const actual = stream$.pipe(
      // Inner map instead of the deprecated resultSelector argument
      switchMap(x => stream2$.pipe(map(y => x + y)))
    );
    expectObservable(actual).toBe(marbles.output, values.output);
  }));
});
// tap => performs side effects inside a pipe without altering the values (named `do` in RxJS 5)
</script>
</body>
</html>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment