Skip to content

Instantly share code, notes, and snippets.

@peerreynders
Created April 21, 2018 18:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save peerreynders/cb963be283b950d65e77bd52943a425f to your computer and use it in GitHub Desktop.
Save peerreynders/cb963be283b950d65e77bd52943a425f to your computer and use it in GitHub Desktop.
RxJS in Action Ch10 4A: Building your middleware
// file: src/index.js - Derived from:
// RxJS in Action (2017, Manning), 978-1-617-29341-2
// by Paul P. Daniels and Luis Atencio
// Listings:
// 10.7 Implementing custom ofType operator (p.299)
// 10.8 Building your middleware (p.299)
import Rx from 'rxjs';
import {createStore} from 'redux';
// -- Listing 10.7 (p.299) Implementing custom ofType operator
// ** Instead: Use filterOnType in lieu of monkeypatching
// ** Rx.Observable.prototype.ofType
// Builds a predicate that reports whether an action's `type` is one of `types`.
//   types   - array of accepted action types (filterOnType passes its rest arguments)
//   returns - function taking an object with a `type` property and returning a boolean
//   throws  - Error when `types` is empty
const ofType = (types) => {
  switch (types.length) {
    case 0:
      throw new Error('Must specify at least one type!');
    case 1:
      // Single accepted type: a direct comparison avoids searching the array.
      return ({type}) => type === types[0];
    default:
      return ({type}) => types.includes(type);
  }
};
// Narrows `action$` to the actions whose `type` matches one of the given types.
//   action$ - object exposing a `filter` method (an action stream in this file)
//   types   - one or more action types to let through
const filterOnType = (action$, ...types) => {
  const matchesType = ofType(types);
  return action$.filter(matchesType);
};
// --- Listing 10.7 End
// Action type handled by the reducer and the epic in this file.
const LOG = 'LOG';
// p.300
// Reducer: for a LOG action, returns a copy of `state` whose `messages` is the
// action's payload followed by 'in Redux'; any other action returns `state` as is.
const simpleReducer = (state, action) => {
  if (action.type !== LOG) {
    return state;
  }
  const messages = [...action.payload, 'in Redux'];
  return {...state, messages};
};
// Returns a copy of `action` whose payload has 'in Rx!' appended; the input
// action and its payload array are left untouched.
const wrapPayload = (action) => {
  const payload = [...action.payload, 'in Rx!'];
  return {...action, payload};
};
// Epic: takes the LOG actions from `action$`, delays each by one second and
// re-emits it with 'in Rx!' appended to its payload.
// `store` is part of the epic signature but is not used here.
const simpleEpic = (action$, store) => {
  const logAction$ = filterOnType(action$, LOG);
  const delayed$ = logAction$.delay(1000);
  return delayed$.map(wrapPayload);
};
// --- Listing 10.8 Building your middleware
// Wires a list of epics between a dispatch function and the store.
//   store   - object with a `dispatch` method; also handed to every epic
//   epics   - array of functions `(action$, store) => observable of actions`
//   returns - [connectable merged epic output, dispatch function]
// Nothing flows until the caller invokes `connect()` on the returned observable.
const createMiddleware = (store, epics) => {
  const action$ = new Rx.Subject();
  const epicOutputs = epics.map((epic) => epic(action$, store));
  const output$ = Rx.Observable.merge(...epicOutputs).publish();

  // Ouroboros: everything the epics emit is fed back in as epic input, so an
  // epic that emits an action type it also listens for will re-trigger itself.
  output$.subscribe(action$);
  // The store only ever sees epic output, never the raw dispatched action.
  output$.subscribe((action) => store.dispatch(action));

  return [output$, (action) => action$.next(action)];
};
// --- Listing 10.8 End
// p.290
// Exposes the store's state as an observable.
//   store   - object with `getState`, convertible via Rx.Observable.from
//             (presumably through the store's observable interop - confirm)
//   returns - [state$, getState]
const createStreamFromStore = (store) => {
  const getState = () => store.getState();
  // Read once up front; used as the seed value for publishBehavior below.
  const seed = getState();
  const storeChange$ = Rx.Observable.from(store);
  // Whatever the store stream emits is replaced by a fresh getState() read.
  const current$ = storeChange$.map(getState);
  const state$ = current$.publishBehavior(seed).refCount();
  return [state$, getState];
};
// Seed state for the store: no messages yet.
const initialState = {messages: []};
// Epics handed to createMiddleware; each one is called with the action stream and the store.
const epics = [simpleEpic];
const store = createStore(simpleReducer, initialState);
// epics$ is the published (not yet connected) merged epic output;
// dispatch pushes an action into the epics' source stream rather than into the store.
const [epics$, dispatch] = createMiddleware(store, epics);
// state$ is the observable view of the store's state; getState is unused below,
// hence the lint suppression.
const [state$, getState] = createStreamFromStore(store); // eslint-disable-line no-unused-vars
// Subscribes an observer to the state stream and connects the epics.
//   args    - forwarded unchanged to state$.subscribe (callbacks or an observer)
//   returns - function that unsubscribes from state$ and disconnects the epics
// The state subscription is set up before connect() is called.
// Note that connect() is invoked on every call to subscribe.
const subscribe = (...observerArgs) => {
  const stateSubscription = state$.subscribe(...observerArgs);
  const epicsConnection = epics$.connect();
  return () => {
    stateSubscription.unsubscribe();
    epicsConnection.unsubscribe();
  };
};
// Logs a state's `messages` array as a single line joined with '=>'.
const logMessages = (state) => {
  const {messages} = state;
  const line = messages.join('=>');
  console.log(line);
};
// Start logging state and connect the epics; the returned function tears both down.
const unsubscribe = subscribe(logMessages); // eslint-disable-line no-unused-vars
// Kick things off: the action enters the epics' source stream via dispatch.
dispatch({type: LOG, payload: ['Hello']});
// cleanup
(() => {
  // Tear down after ten seconds or when the page unloads, whichever comes first.
  const tenSecondsMs = 10000;
  const shutDown = () => {
    unsubscribe();
    console.log('Done!');
  };
  Rx.Observable
    .race(
      Rx.Observable.timer(tenSecondsMs),
      Rx.Observable.fromEvent(window, 'beforeunload')
    )
    .take(1)
    .subscribe(shutDown);
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment