Last active
October 25, 2018 13:47
Star
You must be signed in to star a gist
gen-browser Pinger/Ponger refactor part 3 - RxJS 6 on Top of EventSource
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// file: src/Misc.js | |
// Message type used by the Pinger client for the messages it sends.
export const PING = 'ping'; | |
// Message type used by the Ponger client for its replies to a PING.
export const PONG = 'pong'; | |
/**
 * Produce the current time of day as display text.
 *
 * @returns {string} result of Date#toLocaleTimeString for the current instant
 */
export function now() {
  const timestamp = new Date();
  return timestamp.toLocaleTimeString();
}
/**
 * Shorten a sender address for display.
 *
 * The address is split on '--' and the second segment (the signature) is
 * truncated to 20 characters and suffixed with '...'.
 * When the address has no '--' delimiter the whole address is used instead of
 * throwing; a null/undefined address is treated as an empty string.
 *
 * @param {string} from - sender address, normally of the form `<prefix>--<signature>`
 * @returns {string} at most 20 characters of the signature followed by '...'
 */
export function toTextFrom(from) {
  const text = from == null ? '' : String(from);
  const [prefix, signature] = text.split('--');
  // No delimiter means there is no second segment; fall back to the full text.
  const source = signature === undefined ? prefix : signature;
  return `${source.substring(0, 20)}...`;
}
/**
 * Read a nested property by following a list of property names.
 *
 * @param {*} obj - root value to start from
 * @param {Iterable<string|number>} path - property names, outermost first
 * @param {*} [defaultValue] - returned when the path cannot be resolved
 * @returns {*} the value found at the end of the path, or `defaultValue` when
 *   an intermediate value is null/undefined or the final value is undefined
 */
export function getPropPathValue(obj, path, defaultValue) {
  let val = obj;
  for (const name of path) {
    // Only null/undefined cannot be indexed; 0, '' and false are legitimate values.
    if (val === null || val === undefined) {
      return defaultValue;
    }
    val = val[name];
  }
  // A missing final property must also yield the default.
  return val === undefined ? defaultValue : val;
}
/**
 * Build the message object that is sent to another client.
 *
 * @param {*} from - sender address placed in the `from` field
 * @param {*} type - message type placed in the `type` field
 * @returns {{type: *, from: *}} new message object
 */
export function makeMessage(from, type) {
  const message = {};
  message.type = type;
  message.from = from;
  return message;
}
/**
 * Create an event handler that releases the given subscription.
 *
 * @param {{unsubscribe: Function}} subscription - object whose unsubscribe() is called
 * @returns {Function} handler taking the triggering event; returns nothing
 */
export function makeCleanup(subscription) {
  const release = event => {
    subscription.unsubscribe();
    // For a leave prompt, event.returnValue could be set here.
  };
  return release;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// file: src/Pinger.js | |
/* | |
Client Apps: Pinger.js, Ponger.js | |
Miscellaneous shared: Misc.js | |
View Support: AddressInput.js, StatusDisplay.js, MsgTable.js | |
Observable (RxJS 6) SSE interface: ServerEvents.js | |
Server-side: https://github.com/CrowdHailer/gen_browser /examples/standalone.exs | |
*/ | |
import {setStatus} from './StatusDisplay'; | |
import {addSelectListener, getAddress} from './AddressInput'; | |
import {makeCleanup, makeMessage, now, PING, PONG, toTextFrom} from './Misc'; | |
import {prependMsgRow} from './MsgTable'; | |
import {makeEventsConnectable, send} from './ServerEvents'; | |
import {fromEvent} from "rxjs"; | |
import {filter, map, tap, switchMap} from "rxjs/operators"; | |
// Prepare the view and show the initial status.
addSelectListener();
setStatus('Connecting ...');

// Both subscriptions below are registered before connect() is called.
const events = makeEventsConnectable('http://localhost:8080');

// First tuple element: the source information.
const sourceInfoStream = events.pipe(
  map(([sourceInfo, _]) => sourceInfo)
);
sourceInfoStream.subscribe(
  onSourceInfo,
  onError,
  () => console.info('Configuration Complete')
);

// Second tuple element: the message stream; only PONG messages are handled.
const pongStream = events.pipe(
  switchMap(([_, messages]) => messages),
  tap(msg => console.info('received message: %O', msg)),
  filter(({type}) => type === PONG)
);
pongStream.subscribe(
  onPong,
  onError,
  () => console.info('Messages Complete')
);

const subscription = events.connect();

// Release the connection subscription when the page is about to unload.
const unloadEvents = fromEvent(window, 'beforeunload');
unloadEvents.subscribe(makeCleanup(subscription));
// --- === --- | |
// Subscription to the button's click events; replaced each time source info arrives.
let clickSubscription = null;

/**
 * Handle received source information: (re)bind the form button so that a
 * click sends a PING built from this source info, then report 'Ready'.
 *
 * @param {object} sourceInfo - value emitted by the events observable; its
 *   `address` is logged and the whole object is handed to makeSendPing
 */
function onSourceInfo(sourceInfo) {
  console.info(sourceInfo.address);

  // Drop the previous click binding before creating a new one.
  if (clickSubscription) {
    clickSubscription.unsubscribe();
    clickSubscription = null;
  }

  const sendButton = document.querySelector('form > button');
  const clicks = fromEvent(sendButton, 'click');
  clickSubscription = clicks.subscribe(makeSendPing(sourceInfo));

  setStatus('Ready');
}
/**
 * Show a received PONG as a new row at the top of the message table.
 *
 * @param {{type: string, from: string}} message - received message
 */
function onPong({type, from}) {
  const receivedAt = now();
  const sender = toTextFrom(from);
  prependMsgRow(type, receivedAt, sender);
}
/**
 * Show an error in the status display.
 *
 * @param {*} error - value whose toString() result is displayed
 */
function onError(error) {
  const text = error.toString();
  setStatus(text);
}
/**
 * Create the click handler that sends a PING.
 *
 * @param {{sendPath: string, address: string, config: object}} sourceInfo -
 *   `sendPath` is passed to send(), `address` becomes the message's "from",
 *   and `config.logger` is passed to getAddress() to obtain the "to" address
 * @returns {Function} handler for the button's click event
 */
function makeSendPing({sendPath, address, config}) {
  return event => {
    // Suppress the default action first, so that a synchronous failure while
    // building or sending the request cannot let the form submit.
    event.preventDefault();
    send(
      sendPath,
      getAddress(config.logger), // i.e. "to"
      makeMessage(address, PING) // message includes "from"
    ).then(onResponse, onPromiseError);
  };
}
/**
 * Check the response to a send request; any status other than 202 is
 * reported in the status display.
 *
 * @param {{status: number, statusText: string}} response - response of the send request
 */
function onResponse({status, statusText}) {
  if (status === 202) {
    return;
  }
  setStatus(`send not accepted: ${status} ${statusText}`);
}
/**
 * Rejection handler for the send promise: reports the error through onError.
 *
 * @param {*} error - rejection reason
 * @returns {Promise<undefined>} an already resolved promise, so the promise
 *   produced by `.then(onResponse, onPromiseError)` is fulfilled
 */
function onPromiseError(error) {
  onError(error);
  const settled = Promise.resolve();
  return settled;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// file: src/Ponger.js | |
/* | |
Client Apps: Pinger.js, Ponger.js | |
Miscellaneous shared: Misc.js | |
View Support: AddressInput.js, StatusDisplay.js, MsgTable.js | |
Observable (RxJS 6) SSE interface: ServerEvents.js | |
Server-side: https://github.com/CrowdHailer/gen_browser /examples/standalone.exs | |
*/ | |
import {setStatus} from './StatusDisplay'; | |
import {addSelectListener, setAddress} from './AddressInput'; | |
import {makeCleanup, makeMessage, now, PING, PONG, toTextFrom} from './Misc'; | |
import {prependMsgRow} from './MsgTable'; | |
import {makeEventsConnectable, send} from './ServerEvents'; | |
import {fromEvent} from "rxjs"; | |
import {filter, map, switchMap, tap} from "rxjs/operators"; | |
// Prepare the view and show the initial status.
addSelectListener();
setStatus('Connecting ...');

// Both subscriptions below are registered before connect() is called.
const events = makeEventsConnectable('http://localhost:8080');

// First tuple element: the source information.
const sourceInfoStream = events.pipe(
  map(([sourceInfo, _]) => sourceInfo)
);
sourceInfoStream.subscribe(
  onSourceInfo,
  onError,
  () => console.info('Configuration Complete')
);

// Second tuple element: the message stream; only PING messages are handled.
const pingStream = events.pipe(
  switchMap(([_, messages]) => messages),
  tap(msg => console.info('received message: %O', msg)),
  filter(({type}) => type === PING)
);
pingStream.subscribe(
  onPing,
  onError,
  () => console.info('Messages Complete')
);

const subscription = events.connect();

// Release the connection subscription when the page is about to unload.
const unloadEvents = fromEvent(window, 'beforeunload');
unloadEvents.subscribe(makeCleanup(subscription));
// --- === --- | |
// Most recently received source info; read by onPing when replying.
let currentSourceInfo = null;

/**
 * Remember the received source information, log and display its address,
 * and report 'Ready'.
 *
 * @param {object} sourceInfo - value emitted by the events observable
 */
function onSourceInfo(sourceInfo) {
  currentSourceInfo = sourceInfo;
  const {address} = sourceInfo;
  console.info(address);
  setAddress(address);
  setStatus('Ready');
}
/**
 * Show a received PING in the message table and answer its sender with a PONG.
 *
 * @param {{type: string, from: string}} message - received message; `from`
 *   is used as the destination of the reply
 */
function onPing({type, from}) {
  const {sendPath, address} = currentSourceInfo;
  prependMsgRow(type, now(), toTextFrom(from));

  const reply = makeMessage(address, PONG); // message includes "from"
  const pending = send(sendPath, from, reply); // `from` is the "to" of the reply
  pending.then(onResponse, onPromiseError);
}
/**
 * Check the response to a send request; any status other than 202 is
 * reported in the status display.
 *
 * @param {{status: number, statusText: string}} response - response of the send request
 */
function onResponse({status, statusText}) {
  if (status === 202) {
    return;
  }
  setStatus(`send not accepted: ${status} ${statusText}`);
}
/**
 * Show an error in the status display.
 *
 * @param {*} error - value whose toString() result is displayed
 */
function onError(error) {
  const text = error.toString();
  setStatus(text);
}
/**
 * Rejection handler for the send promise: reports the error through onError.
 *
 * @param {*} error - rejection reason
 * @returns {Promise<undefined>} an already resolved promise, so the promise
 *   produced by `.then(onResponse, onPromiseError)` is fulfilled
 */
function onPromiseError(error) {
  onError(error);
  const settled = Promise.resolve();
  return settled;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// file: src/ServerEvents.js | |
import {Observable} from "rxjs"; | |
import {publish} from "rxjs/operators"; | |
// `type` value that the first (configuration) message must carry; any other
// type on the first message is treated as a configuration failure.
const FIRST_MESSAGE_TYPE = '__gen_browser__/init'; | |
/**
 * Detach the listener stored on the context for the given event type (if any)
 * from `context.source` and clear the stored reference.
 *
 * @param {object} context - holds `source` and one listener slot per event type
 * @param {string} type - event type, also the name of the slot on the context
 */
function removeListener(context, type) {
  const {[type]: registered, source} = context;
  if (registered) {
    source.removeEventListener(type, registered, false);
  }
  context[type] = null;
}
/**
 * Replace the listener for the given event type: any previously stored
 * listener is detached, then the new one is stored and attached to
 * `context.source`.
 *
 * @param {object} context - holds `source` and one listener slot per event type
 * @param {string} type - event type, also the name of the slot on the context
 * @param {Function} newListener - listener to attach
 */
function setListener(context, type, newListener) {
  removeListener(context, type);
  context[type] = newListener;
  const {source} = context;
  source.addEventListener(type, newListener, false);
}
/**
 * Detach the context's 'message' listener.
 *
 * @param {object} context - listener bookkeeping object
 */
function removeMessageListener(context) {
  const eventType = 'message';
  removeListener(context, eventType);
}
/**
 * Install `listener` as the context's 'message' listener, replacing any previous one.
 *
 * @param {object} context - listener bookkeeping object
 * @param {Function} listener - handler for 'message' events
 */
function setMessageListener(context, listener) {
  const eventType = 'message';
  setListener(context, eventType, listener);
}
/**
 * Detach the context's 'error' listener.
 *
 * @param {object} context - listener bookkeeping object
 */
function removeErrorListener(context) {
  const eventType = 'error';
  removeListener(context, eventType);
}
/**
 * Install `listener` as the context's 'error' listener, replacing any previous one.
 *
 * @param {object} context - listener bookkeeping object
 * @param {Function} listener - handler for 'error' events
 */
function setErrorListener(context, listener) {
  const eventType = 'error';
  setListener(context, eventType, listener);
}
/**
 * Pass a value to the observer's next(); when there is no observer the value
 * is only logged.
 *
 * @param {{next: Function}|null} observer - receiving observer, may be absent
 * @param {*} value - value to deliver
 */
function forwardNext(observer, value) {
  if (!observer) {
    console.info('forwardNext: no observer. Value %O', value);
    return;
  }
  observer.next(value);
}
/**
 * Pass an error to the observer's error(); when there is no observer the
 * error is only logged.
 *
 * @param {{error: Function}|null} observer - receiving observer, may be absent
 * @param {*} error - error to deliver
 */
function forwardError(observer, error) {
  if (!observer) {
    console.info('forwardError: no observer. Error %O', error);
    return;
  }
  observer.error(error);
}
/**
 * Signal completion to the observer; when there is no observer the fact is
 * only logged.
 *
 * @param {{complete: Function}|null} observer - receiving observer, may be absent
 */
function forwardComplete(observer) {
  if (observer) {
    observer.complete();
  } else {
    // Log text corrected to match the function name (was 'fowardComplete').
    console.info('forwardComplete: no observer');
  }
}
/**
 * Tear down the context: detach both listeners and close the event source
 * (when one exists), then complete the message observer (when one exists) and
 * drop the message observable.
 *
 * @param {object} context - shared state with `source`, `observer`,
 *   `messageObservable` and the listener slots
 */
function closeEventSource (context) {
  const {source} = context;
  if (source) {
    removeErrorListener(context);
    removeMessageListener(context);
    source.close();
    context.source = null;
  }

  if (!context.messageObservable) {
    return;
  }

  const {observer} = context;
  if (observer) {
    forwardComplete(observer);
    context.observer = null;
  }
  context.messageObservable = null;
}
/**
 * Create the observable of regular (post-configuration) messages and store it
 * on `context.messageObservable`.
 *
 * On subscription the context's 'message' and 'error' listeners are replaced:
 * every message's JSON-parsed data is forwarded to the observer; an error
 * event either completes or errors the observer and then tears the context down.
 *
 * @param {object} context - shared state with `source`, `observer`,
 *   `messageObservable` and the listener slots
 */
function makeMessageObservable(context) {
  const connectMessageObserver = observer => {
    context.observer = observer;

    // Teardown: forget the observer and close the event source.
    const unsubscribe = () => {
      context.observer = null;
      closeEventSource(context);
    };

    const onMessage = ({data}) => {
      forwardNext(context.observer, JSON.parse(data));
    };

    const onError = e => {
      // readyState is a property of the EventSource (the event's target), not
      // of the error event itself; reading it from the event always gave
      // undefined, which made the "complete" branch unreachable.
      const source = (e && e.target) || context.source;
      if (source && source.readyState === EventSource.CLOSED) {
        forwardComplete(context.observer);
      } else {
        forwardError(context.observer, e);
      }
      unsubscribe();
    };

    // begin regular message processing
    setMessageListener(context, onMessage);
    setErrorListener(context, onError);
    return unsubscribe;
  };

  context.messageObservable = Observable.create(connectMessageObserver);
}
/**
 * Create a connectable observable over the server's event source at `${url}/mailbox`.
 *
 * It emits a single `[{address, config, sendPath}, messageObservable]` value
 * and then completes. The messageObservable emits the messages that arrive
 * after successful configuration.
 *
 * The observable errors when the first message is not valid configuration
 * data, when the event source reports an error during configuration, or when
 * no message arrives within `timeout` milliseconds.
 *
 * @param {string} url - base URL of the server
 * @param {number} [timeout=5000] - configuration timeout in ms; values <= 0 disable it
 * @returns {ConnectableObservable} result of `publish()` applied to the configuration observable
 */
export function makeEventsConnectable(url, timeout = 5000) {
  const connectConfigObservable = configurationObserver => {
    let configObserver = configurationObserver;
    let configTimeoutID = null;
    const context = {
      source: null,
      observer: null,
      message: null, // message listener
      error: null, // error listener
      messageObservable: null
    };

    // Cancel the pending configuration timeout, if any.
    const clearConfigTimeout = () => {
      if (configTimeoutID !== null) {
        clearTimeout(configTimeoutID);
        configTimeoutID = null;
      }
    };

    const unsubscribe = () => {
      // A timer left running would report a bogus timeout after teardown.
      clearConfigTimeout();
      configObserver = null;
      if (context.source && (!context.messageObservable)) {
        // source but no messageObservable - close source
        closeEventSource(context);
      }
    };

    const onError = error => {
      forwardError(configObserver, error);
      unsubscribe();
    };

    const onMessage = event => {
      clearConfigTimeout();

      // Unparsable data is handled like any other invalid configuration;
      // otherwise, with the timer already cleared, configuration would hang.
      let message = null;
      try {
        message = JSON.parse(event.data);
      } catch (parseError) {
        message = null;
      }

      if (message && message.type === FIRST_MESSAGE_TYPE) {
        const {address, config} = message;
        makeMessageObservable(context);
        const sendPath = `${url}/send/`;
        forwardNext(configObserver, [{address, config, sendPath}, context.messageObservable]);
        forwardComplete(configObserver);
        configObserver = null; // configuration complete
      } else {
        // Configuration failed
        onError(new Error(`Invalid configuration data ${event.data}`));
      }
    }; // end configListener

    const abortConfig = () => {
      // The timer has fired. The event source is closed by onError ->
      // unsubscribe; the former bare close() call resolved to the global
      // window.close rather than the event source.
      configTimeoutID = null;
      onError(new Error(`Configuration timed out: ${timeout} ms`));
    };

    try {
      context.source = new EventSource(`${url}/mailbox`);
      setErrorListener(context, onError);
      setMessageListener(context, onMessage);
      configTimeoutID = timeout > 0 ? setTimeout(abortConfig, timeout) : null;
    } catch(error) {
      onError(error);
    }

    return unsubscribe;
  }; // end connectConfigObservable

  const configObservable = Observable.create(connectConfigObservable);
  return (publish())(configObservable); // - Turn Observable into ConnectableObservable
}
/**
 * POST a JSON-serialized payload to the destination address.
 *
 * @param {string} sendPath - URL prefix; the address is appended to it
 * @param {string} address - destination address
 * @param {*} payload - value serialized with JSON.stringify as the request body
 * @returns {Promise<Response>} the promise returned by fetch
 */
export function send(sendPath, address, payload) {
  const options = {
    method: 'POST',
    body: JSON.stringify(payload)
  };
  return fetch(new Request(sendPath + address, options));
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment