Created
October 30, 2018 16:18
-
-
Save peerreynders/10634838fa91373db8aee11755e790fc to your computer and use it in GitHub Desktop.
gen-browser Pinger/Ponger refactor part 4 - Using RxJS fromEvent, using, etc. to remove some boilerplate
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
  file: src/ServerEvents.js
  Previous:
  - part 3: https://gist.github.com/peerreynders/d59d37dd47016a932ee9c786e3817560
  - part 2: https://gist.github.com/peerreynders/a0965c47cce31fb6a6677b90f0ca71cc
  - part 1: https://gist.github.com/peerreynders/474145762df0708fe78bf39b548fab00
  see also: https://gist.github.com/peerreynders/412911579e4a0c485f67a122634cffe2
*/
import {EMPTY, fromEvent, throwError, using} from "rxjs"; | |
import {catchError, map, publish, take, timeout} from "rxjs/operators"; | |
// Event name subscribed to on the EventSource via fromEvent().
const EVENT_TYPE_MESSAGE = 'message';
// `type` value that the first message must carry for initialization to succeed.
const MESSAGE_TYPE_INITIALIZATION = '__gen_browser__/init';
// Default `timeout` argument of the exported factory functions; it is handed
// to the RxJS `timeout` operator (which interprets a number as milliseconds).
const TIMEOUT_DEFAULT = 5000;
// Module-private property keys used by EventSourceHolder.
const _source = Symbol('source');
const _transfer = Symbol('transfer');
/**
 * Owns a single EventSource-like object and closes it on `unsubscribe()`.
 *
 * Instances are created inside the resource factory passed to RxJS `using()`,
 * which is why the clean-up method is named `unsubscribe`. Ownership can be
 * handed over with `transfer()`, after which this holder no longer closes
 * the source.
 */
class EventSourceHolder {
  /**
   * @param {object|Function} sourceOrFactory - the source to hold, or a
   *   zero-argument function that is called immediately to create it.
   */
  constructor(sourceOrFactory) {
    this[_source] =
      typeof sourceOrFactory === 'function' ? sourceOrFactory() : sourceOrFactory;

    // Releases the held source (if any) and leaves the holder empty.
    this[_transfer] = () => {
      const released = this[_source];
      this[_source] = null;
      return released;
    };
  }

  /**
   * The currently held source.
   * @throws {Error} when the holder is empty (never set, transferred or closed).
   */
  get eventSource() {
    const held = this[_source];
    if (!held) {
      throw new Error('EventSourceHolder property eventSource: No EventSource');
    }
    return held;
  }

  /**
   * Gives up ownership of the held source and returns it.
   * @throws {Error} when the holder is already empty.
   */
  transfer() {
    const released = this[_transfer]();
    if (!released) {
      throw new Error('EventSourceHolder.transfer: No EventSource');
    }
    return released;
  }

  /**
   * Closes the held source, if there still is one. Safe to call repeatedly
   * and after `transfer()`; in those cases it does nothing.
   */
  unsubscribe() {
    const released = this[_transfer]();
    if (released) {
      released.close();
    }
  }
} // end class EventSourceHolder
/**
 * Derives the base path for sending messages from the mailbox URL.
 *
 * @param {string} mailboxUrl - absolute URL of the mailbox EventSource.
 * @returns {string} the URL's origin followed by '/send/'.
 * @throws {TypeError} when `mailboxUrl` cannot be parsed by `URL`.
 */
function makeSendPath(mailboxUrl) {
  const { origin } = new URL(mailboxUrl);
  return `${origin}/send/`;
}
/**
 * Builds the Observable of parsed messages for the source held by `holder`.
 *
 * Each 'message' event's `data` is run through `JSON.parse` and the result is
 * emitted. An error whose `readyState` equals `EventSource.CLOSED` completes
 * the stream instead of failing it; any other error is passed on.
 *
 * NOTE(review): only 'message' events are subscribed to here, so an
 * EventSource 'error' event does not reach `catchError` through this
 * pipeline - confirm whether the close detection works as intended.
 *
 * @param {EventSourceHolder} holder - its `eventSource` is read once, here.
 * @returns {Observable} stream of parsed message payloads.
 */
function makeMessageObservable(holder) {
  const parseData = ({ data }) => JSON.parse(data);
  // just complete it when the server closed; rethrow everything else
  const completeWhenClosed = (e) =>
    e.readyState === EventSource.CLOSED ? EMPTY : throwError(e);

  return fromEvent(holder.eventSource, EVENT_TYPE_MESSAGE, false).pipe(
    map(parseData),
    catchError(completeWhenClosed),
  );
}
/**
 * Creates the projection applied (via `map`) to the first 'message' event.
 *
 * The returned function parses `event.data` as JSON. When the message `type`
 * is the initialization type it takes the EventSource out of `holder` and
 * returns `[{address, config, sendPath}, messageObservable]`. Otherwise it
 * throws.
 *
 * @param {EventSourceHolder} holder - current owner of the EventSource.
 * @returns {function(MessageEvent): Array} projection for the `map` operator.
 */
function makeOnInitialization(holder) {
  return (event) => {
    const { type, address, config } = JSON.parse(event.data);

    if (type !== MESSAGE_TYPE_INITIALIZATION) {
      // Initialization failed.
      // Throw rather than return throwError(...): a value returned from a
      // `map` projection is emitted as an ordinary item, so the error
      // Observable would have been delivered as if it were the success tuple.
      // A thrown error is reported through the error channel instead.
      throw new Error(`Invalid initialization data ${event.data}`);
    }

    // transfer EventSource to new holder to transfer ownership for "unsubscribe"
    const source = holder.transfer();
    const sendPath = makeSendPath(source.url);
    const messageObservable = using(
      () => new EventSourceHolder(source),
      makeMessageObservable,
    );
    return [{ address, config, sendPath }, messageObservable]; // SUCCESS
  };
}
/**
 * Creates the observable factory used with `using()` for the initialization
 * phase.
 *
 * The produced factory listens for 'message' events on the holder's source,
 * maps the event through `makeOnInitialization(holder)` and takes one value.
 * When `due` is greater than zero a `timeout(due)` operator is placed in
 * front of those operators.
 *
 * @param {number} due - value for the RxJS `timeout` operator; `<= 0` disables it.
 * @returns {function(EventSourceHolder): Observable}
 */
function makeMakeInitializationObservable(due) {
  return (holder) => {
    const operators = [
      map(makeOnInitialization(holder)), // default operators
      take(1),
    ];

    if (due > 0) {
      // only add timeout if requested; it goes first so it sees the raw events
      operators.unshift(timeout(due));
    }

    return fromEvent(holder.eventSource, EVENT_TYPE_MESSAGE, false).pipe(...operators);
  };
}
//
// Emits a single [{address, config, sendPath}, messageObservable] value and then completes
// The messageObservable emits the messages after successful initialization
//
/**
 * @param {Function} eventSourcefactory - zero-argument function creating the
 *   EventSource; it is handed to a new EventSourceHolder inside `using()`.
 * @param {number} [timeout=TIMEOUT_DEFAULT] - forwarded to
 *   makeMakeInitializationObservable. Note that this parameter shadows the
 *   RxJS `timeout` operator inside this function.
 * @returns the initialization observable passed through `publish()`.
 */
export function makeEventsConnectableFromFactory(eventSourcefactory, timeout = TIMEOUT_DEFAULT) {
  const createHolder = () => new EventSourceHolder(eventSourcefactory);
  const initializationObservable = using(
    createHolder,
    makeMakeInitializationObservable(timeout),
  );
  const toConnectable = publish(); // - Turn into ConnectableObservable
  return toConnectable(initializationObservable);
}
/**
 * Convenience wrapper around makeEventsConnectableFromFactory that creates
 * the EventSource for `${url}/mailbox`.
 *
 * The EventSource is not constructed here; construction is deferred to the
 * factory that is passed on.
 *
 * @param {string} url - base URL; '/mailbox' is appended to it.
 * @param {number} [timeout=TIMEOUT_DEFAULT] - forwarded unchanged.
 * @returns whatever makeEventsConnectableFromFactory returns.
 */
export function makeEventsConnectable(url, timeout = TIMEOUT_DEFAULT) {
  const mailboxUrl = `${url}/mailbox`;
  return makeEventsConnectableFromFactory(() => new EventSource(mailboxUrl), timeout);
}
/**
 * POSTs `payload` as a JSON string to `sendPath` followed by `address`.
 *
 * @param {string} sendPath - base path, as produced during initialization.
 * @param {string} address - appended to `sendPath` as-is (no encoding applied).
 * @param {*} payload - serialized with `JSON.stringify` to form the body.
 * @returns {Promise<Response>} the promise returned by `fetch`.
 */
export function send(sendPath, address, payload) {
  const request = new Request(`${sendPath}${address}`, {
    method: 'POST',
    body: JSON.stringify(payload),
  });
  return fetch(request);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment