Skip to content

Instantly share code, notes, and snippets.

@peerreynders
Last active October 25, 2018 13:47
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save peerreynders/d59d37dd47016a932ee9c786e3817560 to your computer and use it in GitHub Desktop.
gen-browser Pinger/Ponger refactor part 3 - RxJS 6 on Top of EventSource
// file: src/Misc.js
// Values used for the `type` field of the messages exchanged by the Pinger and Ponger clients.
export const PING = 'ping';
export const PONG = 'pong';
/**
 * Current wall-clock time formatted with the runtime's default locale.
 *
 * @returns {string} result of `Date.prototype.toLocaleTimeString()` for "now"
 */
export function now() {
  const current = new Date();
  return current.toLocaleTimeString();
}
/**
 * Builds the short display text for a sender address.
 *
 * The address is split on `--` and the second segment (the "signature") is
 * truncated to 20 characters and suffixed with `...`. When the address has no
 * `--` separator the whole address is used instead of throwing.
 *
 * @param {string} from - sender address
 * @returns {string} at most 20 characters followed by `...`
 */
export function toTextFrom(from) {
  // Default the signature to the first segment so a separator-less address
  // still yields a displayable value.
  const [head, signature = head] = from.split('--');
  return `${signature.slice(0, 20)}...`;
}
/**
 * Walks `path` through nested properties of `obj`.
 *
 * @param {*} obj - root value to start from
 * @param {Iterable<string|number>} path - property names, outermost first
 * @param {*} defaultValue - returned when the path cannot be resolved
 * @returns {*} the value found at the end of the path, or `defaultValue` when
 *   an intermediate value is null/undefined or the final value is undefined
 */
export function getPropPathValue(obj, path, defaultValue) {
  let val = obj;
  for (const name of path) {
    // Cannot descend into null/undefined.
    if (val == null) {
      return defaultValue;
    }
    val = val[name];
  }
  // A missing leaf property must also fall back to the default.
  return val === undefined ? defaultValue : val;
}
/**
 * Creates a message record.
 *
 * @param {*} from - sender address stored under `from`
 * @param {*} type - message type stored under `type`
 * @returns {{type: *, from: *}} new object with `type` first, then `from`
 */
export function makeMessage(from, type) {
  const message = {
    type: type,
    from: from
  };
  return message;
}
/**
 * Creates an event handler that unsubscribes the given subscription.
 *
 * @param {{unsubscribe: Function}} subscription - subscription to tear down
 * @returns {Function} handler taking the event (unused) and returning nothing
 */
export function makeCleanup(subscription) {
  const cleanup = event => {
    // The event itself is ignored. (Assigning event.returnValue here would be
    // the place to request a leave prompt.)
    subscription.unsubscribe();
  };
  return cleanup;
}
// file: src/Pinger.js
/*
Client Apps: Pinger.js, Ponger.js
Miscellaneous shared: Misc.js
View Support: AddressInput.js, StatusDisplay.js, MsgTable.js
Observable (RxJS 6) SSE interface: ServerEvents.js
Server-side: https://github.com/CrowdHailer/gen_browser /examples/standalone.exs
*/
import {setStatus} from './StatusDisplay';
import {addSelectListener, getAddress} from './AddressInput';
import {makeCleanup, makeMessage, now, PING, PONG, toTextFrom} from './Misc';
import {prependMsgRow} from './MsgTable';
import {makeEventsConnectable, send} from './ServerEvents';
import {fromEvent} from "rxjs";
import {filter, map, tap, switchMap} from "rxjs/operators";
// Bootstrap: prepare the view, then build the connectable event source.
addSelectListener();
setStatus('Connecting ...');

// Nothing is emitted until connect() is called further down.
const events = makeEventsConnectable('http://localhost:8080');

// Configuration: pick the source info out of the emitted pair.
const sourceInfos = events.pipe(
  map(([sourceInfo]) => sourceInfo)
);
sourceInfos.subscribe(
  onSourceInfo,
  onError,
  () => console.info('Configuration Complete')
);

// Messages: switch to the message observable from the emitted pair, log every
// message and keep only the PONG ones.
const pongs = events.pipe(
  switchMap(([, messages]) => messages),
  tap(msg => console.info('received message: %O', msg)),
  filter(msg => msg.type === PONG)
);
pongs.subscribe(
  onPong,
  onError,
  () => console.info('Messages Complete')
);

// Both subscribers are in place - start the source and release it on unload.
const subscription = events.connect();
const unloads = fromEvent(window, 'beforeunload');
unloads.subscribe(makeCleanup(subscription));
// --- === ---
// Subscription to the send button's click events; null until the first source info arrives.
let clickSubscription = null;

/**
 * Handles the source info: logs its address, replaces any previous click
 * subscription with one that sends pings for this source info, and marks the
 * status as ready.
 *
 * @param {object} sourceInfo - value handed to makeSendPing; `address` is logged
 */
function onSourceInfo(sourceInfo) {
  console.info(sourceInfo.address);

  // Drop the click handling bound to an earlier source info, if any.
  if (clickSubscription) {
    clickSubscription.unsubscribe();
    clickSubscription = null;
  }

  const sendButton = document.querySelector('form > button');
  const clicks = fromEvent(sendButton, 'click');
  const sendPing = makeSendPing(sourceInfo);
  clickSubscription = clicks.subscribe(sendPing);

  setStatus('Ready');
}
/**
 * Adds a row for a received message to the top of the message table.
 *
 * @param {{type: *, from: string}} message - received message
 */
function onPong(message) {
  const {type, from} = message;
  const receivedAt = now();
  const sender = toTextFrom(from);
  prependMsgRow(type, receivedAt, sender);
}
/**
 * Shows an error's string form in the status display.
 *
 * @param {*} error - value whose `toString()` result is displayed
 */
function onError(error) {
  const text = error.toString();
  setStatus(text);
}
/**
 * Creates the click handler that sends a PING.
 *
 * @param {{sendPath: string, address: string, config: object}} sourceInfo -
 *   `sendPath` and `address` are passed on to send/makeMessage,
 *   `config.logger` is passed to getAddress
 * @returns {Function} handler for the send button's click event
 */
function makeSendPing({sendPath, address, config}) {
  return event => {
    // Cancel the default action before doing anything that can throw; if
    // building or sending the request failed synchronously the default
    // action would otherwise still run.
    event.preventDefault();

    send(
      sendPath,
      getAddress(config.logger), // i.e. "to"
      makeMessage(address, PING) // message includes "from"
    ).then(onResponse, onPromiseError);
  };
}
/**
 * Checks the response to a send request; any status other than 202 is
 * reported in the status display.
 *
 * @param {{status: number, statusText: string}} response - response of the send request
 */
function onResponse(response) {
  const {status, statusText} = response;
  if (status === 202) {
    return;
  }
  setStatus(`send not accepted: ${status} ${statusText}`);
}
/**
 * Rejection handler for the send promise: reports the error via onError and
 * hands back an already-resolved promise.
 *
 * @param {*} error - rejection reason
 * @returns {Promise<undefined>} resolved promise
 */
function onPromiseError(error) {
  onError(error);
  const handled = Promise.resolve();
  return handled;
}
// file: src/Ponger.js
/*
Client Apps: Pinger.js, Ponger.js
Miscellaneous shared: Misc.js
View Support: AddressInput.js, StatusDisplay.js, MsgTable.js
Observable (RxJS 6) SSE interface: ServerEvents.js
Server-side: https://github.com/CrowdHailer/gen_browser /examples/standalone.exs
*/
import {setStatus} from './StatusDisplay';
import {addSelectListener, setAddress} from './AddressInput';
import {makeCleanup, makeMessage, now, PING, PONG, toTextFrom} from './Misc';
import {prependMsgRow} from './MsgTable';
import {makeEventsConnectable, send} from './ServerEvents';
import {fromEvent} from "rxjs";
import {filter, map, switchMap, tap} from "rxjs/operators";
// Bootstrap: prepare the view, then build the connectable event source.
addSelectListener();
setStatus('Connecting ...');

// Nothing is emitted until connect() is called further down.
const events = makeEventsConnectable('http://localhost:8080');

// Configuration: pick the source info out of the emitted pair.
const sourceInfos = events.pipe(
  map(([sourceInfo]) => sourceInfo)
);
sourceInfos.subscribe(
  onSourceInfo,
  onError,
  () => console.info('Configuration Complete')
);

// Messages: switch to the message observable from the emitted pair, log every
// message and keep only the PING ones.
const pings = events.pipe(
  switchMap(([, messages]) => messages),
  tap(msg => console.info('received message: %O', msg)),
  filter(msg => msg.type === PING)
);
pings.subscribe(
  onPing,
  onError,
  () => console.info('Messages Complete')
);

// Both subscribers are in place - start the source and release it on unload.
const subscription = events.connect();
const unloads = fromEvent(window, 'beforeunload');
unloads.subscribe(makeCleanup(subscription));
// --- === ---
// Most recent source info; read by onPing when replying.
let currentSourceInfo = null;

/**
 * Remembers the source info, logs and displays its address, and marks the
 * status as ready.
 *
 * @param {{address: string}} sourceInfo - stored as-is in currentSourceInfo
 */
function onSourceInfo(sourceInfo) {
  currentSourceInfo = sourceInfo;

  const {address} = sourceInfo;
  console.info(address);
  setAddress(address);

  setStatus('Ready');
}
/**
 * Handles a received PING: adds it to the message table and answers the
 * sender with a PONG.
 *
 * @param {{type: *, from: string}} message - received message; `from` is the reply target
 */
function onPing(message) {
  const {type, from} = message;
  const {sendPath, address} = currentSourceInfo;

  prependMsgRow(type, now(), toTextFrom(from));

  // Reply to the sender ("to" is the ping's "from"); the reply carries our own address as "from".
  const reply = makeMessage(address, PONG);
  const pending = send(sendPath, from, reply);
  pending.then(onResponse, onPromiseError);
}
/**
 * Checks the response to a send request; any status other than 202 is
 * reported in the status display.
 *
 * @param {{status: number, statusText: string}} response - response of the send request
 */
function onResponse(response) {
  const {status, statusText} = response;
  if (status === 202) {
    return;
  }
  setStatus(`send not accepted: ${status} ${statusText}`);
}
/**
 * Shows an error's string form in the status display.
 *
 * @param {*} error - value whose `toString()` result is displayed
 */
function onError(error) {
  const text = error.toString();
  setStatus(text);
}
/**
 * Rejection handler for the send promise: reports the error via onError and
 * hands back an already-resolved promise.
 *
 * @param {*} error - rejection reason
 * @returns {Promise<undefined>} resolved promise
 */
function onPromiseError(error) {
  onError(error);
  const handled = Promise.resolve();
  return handled;
}
// file: src/ServerEvents.js
import {Observable} from "rxjs";
import {publish} from "rxjs/operators";
// `type` the first mailbox message must carry; makeEventsConnectable treats any other type as a failed configuration.
const FIRST_MESSAGE_TYPE = '__gen_browser__/init';
/**
 * Detaches the listener stored on `context[type]` (if any) from
 * `context.source` and clears the slot.
 *
 * @param {object} context - holds `source` and one listener slot per event type
 * @param {string} type - event type, also the name of the slot on `context`
 */
function removeListener(context, type) {
  const registered = context[type];
  if (registered) {
    const {source} = context;
    source.removeEventListener(type, registered, false);
  }
  // The slot is reset even when nothing was registered.
  context[type] = null;
}
/**
 * Replaces the listener for `type`: removes the one currently stored on the
 * context, stores the new one and attaches it to `context.source`.
 *
 * @param {object} context - holds `source` and one listener slot per event type
 * @param {string} type - event type, also the name of the slot on `context`
 * @param {Function} newListener - listener to register
 */
function setListener(context, type, newListener) {
  removeListener(context, type);

  const {source} = context;
  context[type] = newListener;
  source.addEventListener(type, newListener, false);
}
/** Removes the context's 'message' listener via removeListener. */
function removeMessageListener(context) {
  const eventType = 'message';
  removeListener(context, eventType);
}
/** Installs `listener` as the context's 'message' listener via setListener. */
function setMessageListener(context, listener) {
  const eventType = 'message';
  setListener(context, eventType, listener);
}
/** Removes the context's 'error' listener via removeListener. */
function removeErrorListener(context) {
  const eventType = 'error';
  removeListener(context, eventType);
}
/** Installs `listener` as the context's 'error' listener via setListener. */
function setErrorListener(context, listener) {
  const eventType = 'error';
  setListener(context, eventType, listener);
}
/**
 * Passes `value` to `observer.next`; without an observer the value is only
 * logged.
 *
 * @param {{next: Function}|null} observer - target observer, may be absent
 * @param {*} value - value to deliver
 */
function forwardNext(observer, value) {
  if (!observer) {
    console.info('forwardNext: no observer. Value %O', value);
    return;
  }
  observer.next(value);
}
/**
 * Passes `error` to `observer.error`; without an observer the error is only
 * logged.
 *
 * @param {{error: Function}|null} observer - target observer, may be absent
 * @param {*} error - error to deliver
 */
function forwardError(observer, error) {
  if (!observer) {
    console.info('forwardError: no observer. Error %O', error);
    return;
  }
  observer.error(error);
}
/**
 * Calls `observer.complete`; without an observer the attempt is only logged.
 *
 * @param {{complete: Function}|null} observer - target observer, may be absent
 */
function forwardComplete(observer) {
  if (observer) {
    observer.complete();
  } else {
    // Log prefix spells the function name correctly so it can be searched for.
    console.info('forwardComplete: no observer');
  }
}
/**
 * Tears down the context: detaches both listeners from the source, closes and
 * forgets it, then completes and forgets the message observer and forgets the
 * message observable. Parts that are already absent are skipped.
 *
 * @param {object} context - shared state (`source`, `observer`, `messageObservable`, listener slots)
 */
function closeEventSource(context) {
  const {source} = context;
  if (source) {
    removeErrorListener(context);
    removeMessageListener(context);
    source.close();
    context.source = null;
  }

  if (!context.messageObservable) {
    return;
  }

  // The observer is only touched while a message observable exists.
  if (context.observer) {
    forwardComplete(context.observer);
    context.observer = null;
  }
  context.messageObservable = null;
}
/**
 * Creates the observable of regular (post-configuration) messages and stores
 * it on `context.messageObservable`.
 *
 * Subscribing installs message/error listeners on `context.source`; each
 * message's `data` is JSON-parsed and forwarded to the subscriber.
 * Unsubscribing clears the observer and closes the event source.
 *
 * @param {object} context - shared state (`source`, `observer`, listener slots)
 */
function makeMessageObservable(context) {
  const connectMessageObserver = observer => {
    context.observer = observer;

    const unsubscribe = () => {
      context.observer = null;
      closeEventSource(context);
    };

    const onMessage = ({data}) => {
      forwardNext(context.observer, JSON.parse(data));
    };

    const onError = e => {
      // readyState is a property of the EventSource (the event's target), not
      // of the error event itself; reading it from the event always yields
      // undefined, which made the "closed" branch unreachable.
      const source = (e && e.target) || context.source;
      const isClosed = Boolean(source) && source.readyState === EventSource.CLOSED;
      if (isClosed) {
        forwardComplete(context.observer);
      } else {
        forwardError(context.observer, e);
      }
      unsubscribe();
    };

    // begin regular message processing
    setMessageListener(context, onMessage);
    setErrorListener(context, onError);
    return unsubscribe;
  };

  context.messageObservable = Observable.create(connectMessageObserver);
}
//
// Emits a single [{address, config, sendPath}, messageObservable] value and then completes
// The messageObservable emits the messages after successful configuration
//
/**
 * Creates a connectable observable over the server's mailbox event stream.
 *
 * Once connected, an EventSource is opened on `${url}/mailbox`. The first
 * message must have the configuration type; it is emitted as
 * `[{address, config, sendPath}, messageObservable]`, after which the
 * observable completes. Any other first message, an error event, a failure to
 * create the EventSource, or the timeout elapsing is reported as an error.
 *
 * @param {string} url - base URL of the server
 * @param {number} [timeout=5000] - milliseconds to wait for the configuration
 *   message; values <= 0 disable the timeout
 * @returns {object} result of applying `publish()` to the configuration
 *   observable (call `connect()` to start it)
 */
export function makeEventsConnectable(url, timeout = 5000) {
  const connectConfigObservable = configurationObserver => {
    let configObserver = configurationObserver;
    let configTimeoutID = null;
    const context = {
      source: null,
      observer: null,
      message: null, // message listener
      error: null, // error listener
      messageObservable: null
    };

    const clearConfigTimeout = () => {
      if (configTimeoutID !== null) {
        clearTimeout(configTimeoutID);
        configTimeoutID = null;
      }
    };

    const unsubscribe = () => {
      configObserver = null;
      // Never let the configuration timer fire after teardown.
      clearConfigTimeout();
      if (context.source && (!context.messageObservable)) {
        // source but no messageObservable - close source
        closeEventSource(context);
      }
    };

    const onError = error => {
      forwardError(configObserver, error);
      unsubscribe();
    };

    const onMessage = event => {
      clearConfigTimeout();
      const {type, address, config} = JSON.parse(event.data);
      if (type === FIRST_MESSAGE_TYPE) {
        makeMessageObservable(context);
        const sendPath = `${url}/send/`;
        forwardNext(configObserver, [{address, config, sendPath}, context.messageObservable]);
        forwardComplete(configObserver);
        configObserver = null; // configuration complete
      } else {
        // Configuration failed
        onError(new Error(`Invalid configuration data ${event.data}`));
      }
    }; // end configListener

    const abortConfig = () => {
      configTimeoutID = null; // the timer has fired
      // onError -> unsubscribe closes the event source; no separate close
      // call is needed (a bare close() would resolve to the global one).
      onError(new Error(`Configuration timed out: ${timeout} ms`));
    };

    try {
      context.source = new EventSource(`${url}/mailbox`);
      setErrorListener(context, onError);
      setMessageListener(context, onMessage);
      configTimeoutID = timeout > 0 ? setTimeout(abortConfig, timeout) : null;
    } catch (error) {
      onError(error);
    }

    return unsubscribe;
  }; // end connectConfigObservable

  const configObservable = Observable.create(connectConfigObservable);
  return (publish())(configObservable); // - Turn Observable into ConnectableObservable
}
/**
 * POSTs a JSON-serialized payload to `sendPath + address`.
 *
 * @param {string} sendPath - URL prefix of the send endpoint
 * @param {string} address - appended to `sendPath` to form the request URL
 * @param {*} payload - value serialized with JSON.stringify as the request body
 * @returns {Promise<Response>} the promise returned by `fetch`
 */
export function send(sendPath, address, payload) {
  const request = new Request(sendPath + address, {
    method: 'POST',
    body: JSON.stringify(payload)
  });
  return fetch(request);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment