// Lisk wsChannel workaround
// Gist: corbifex/1f7e275358c2e8a7c21f3a182e133719 (last active December 2, 2020 16:48).
// Note: GitHub flagged this file as containing bidirectional Unicode text that may be
// interpreted or compiled differently than what appears below. To review, open the file
// in an editor that reveals hidden Unicode characters.
import { EventEmitter } from 'events';
import WebSocket from 'isomorphic-ws';
// All timeouts are expressed in milliseconds.

/** Maximum time to wait for the WebSocket connection to be established. */
const CONNECTION_TIMEOUT = 2000;
/** Maximum time to wait for an outgoing request to be handed to the socket. */
const ACKNOWLEDGMENT_TIMEOUT = 2000;
/** Maximum time to wait for the response to a sent request. */
const RESPONSE_TIMEOUT = 3000;
/**
 * Creates a promise that never fulfils and rejects once `ms` milliseconds have elapsed.
 * It is meant to be raced against another promise to bound how long that promise may take.
 *
 * @param {number} ms - Delay in milliseconds before the rejection.
 * @param {string} [message] - Error message; defaults to `Timed out in ${ms}ms.`.
 * @returns {Promise<never>} Promise that rejects with an `Error` after `ms` milliseconds.
 */
const timeout = (ms, message) =>
  new Promise((_, reject) => {
    // No clearTimeout is needed here: a timer that has already fired cannot fire again.
    setTimeout(() => {
      reject(new Error(message ?? `Timed out in ${ms}ms.`));
    }, ms);
  });
/**
 * Creates a promise together with the functions that settle it, so the promise can be
 * resolved or rejected from outside its executor.
 *
 * @returns {{promise: Promise<*>, resolve: function(*): void, reject: function(*): void}}
 *   The promise and its `resolve` / `reject` functions.
 */
const defer = () => {
  let resolve;
  let reject;
  // The executor runs synchronously, so both functions are assigned before we return.
  const promise = new Promise((_resolve, _reject) => {
    resolve = _resolve;
    reject = _reject;
  });
  return { promise, resolve, reject };
};
/**
 * Tells whether a parsed JSON-RPC message is a notification, i.e. it carries a `method`
 * but no `id` (`undefined` or `null`).
 *
 * @param {*} input - Parsed JSON value received from the socket.
 * @returns {boolean} `true` for a notification; `false` otherwise, including when `input`
 *   is `null` or `undefined` (for example the result of `JSON.parse('null')`).
 */
const messageIsNotification = (input) => input?.id == null && Boolean(input?.method);
/**
 * Minimal JSON-RPC 2.0 client over a WebSocket.
 *
 * Requests are sent with `invoke()` and matched to their responses by a numeric id;
 * messages without an id are treated as notifications and re-emitted to the callbacks
 * registered with `subscribe()`.
 */
export class WSChannel {
  /** `true` after a successful `connect()`; reset to `false` on socket error or close. */
  isAlive = false;
  _url = '';
  _ws = null;
  /** Id that will be assigned to the next request. */
  _requestCounter = 0;
  /** Deferred responses (`{promise, resolve, reject}`) keyed by request id. */
  _pendingRequests = {};
  _emitter = null;

  /**
   * Opens a WebSocket to `url` immediately; call `connect()` to wait until it is usable.
   *
   * @param {string} url - WebSocket endpoint URL.
   */
  constructor(url) {
    this._url = url;
    this._emitter = new EventEmitter();
    this._ws = new WebSocket(this._url);
  }

  /**
   * Waits until the socket is open and then for a further one-second grace period
   * before marking the channel alive.
   *
   * @returns {Promise<void>}
   * @throws {Error} When the socket reports an error, when the channel is not alive
   *   within `CONNECTION_TIMEOUT` ms, or when the channel was already disconnected.
   */
  async connect() {
    const ws = this._ws;
    if (!ws) {
      throw new Error('WebSocket has been closed; create a new WSChannel to reconnect');
    }

    let graceTimer;
    const opened = new Promise((resolve) => {
      ws.onopen = () => {
        // Deliberate delay kept from the original workaround: the channel is only
        // reported alive one second after the socket opened.
        graceTimer = setTimeout(() => {
          this.isAlive = true;
          resolve();
        }, 1000);
      };
    });
    const failed = new Promise((_, reject) => {
      ws.onerror = (event) => {
        this.isAlive = false;
        // Browser error events carry no `.error`; never reject with `undefined`.
        reject(event?.error ?? new Error(`WebSocket error on ${this._url}`));
      };
    });

    // Installed before waiting so messages arriving during the grace period are not lost.
    ws.onmessage = (event) => {
      this._handleMessage(event.data);
    };

    try {
      await this._withTimeout(
        Promise.race([opened, failed]),
        CONNECTION_TIMEOUT,
        `Could not connect in ${CONNECTION_TIMEOUT}ms`,
      );
    } finally {
      // If connecting failed while the grace period was still running, make sure it
      // cannot flip `isAlive` to true afterwards. A no-op after a successful connect.
      clearTimeout(graceTimer);
    }
  }

  /**
   * Closes the socket and resets the request bookkeeping. Requests that are still
   * waiting for a response are rejected right away.
   *
   * @returns {Promise<void>} Resolves once the socket is closed (immediately when there
   *   is no socket or it is already closed).
   */
  async disconnect() {
    const abandoned = Object.values(this._pendingRequests);
    this._requestCounter = 0;
    this._pendingRequests = {};
    for (const request of abandoned) {
      request.reject(new Error('Channel disconnected before a response was received'));
    }

    const ws = this._ws;
    if (!ws) {
      return;
    }

    // readyState 3 is CLOSED in the WebSocket API; no further `close` event will be
    // delivered for such a socket, so waiting for one would hang forever.
    const CLOSED = 3;
    if (ws.readyState === CLOSED) {
      this.isAlive = false;
      this._ws = undefined;
      return;
    }

    await new Promise((resolve) => {
      ws.onclose = () => {
        this.isAlive = false;
        this._ws = undefined;
        resolve();
      };
      ws.close();
    });
  }

  /**
   * Sends a JSON-RPC request and waits for its response.
   *
   * The id is allocated and the pending entry registered synchronously, before any
   * `await`, so concurrent calls can never share an id.
   *
   * @param {string} actionName - JSON-RPC method name.
   * @param {object} [params] - Method parameters; defaults to `{}`.
   * @returns {Promise<*>} The `result` member of the response.
   * @throws {Error} When the channel has no socket, sending fails, the server answers
   *   with an error, the channel is disconnected, or no response arrives within
   *   `RESPONSE_TIMEOUT` ms.
   */
  async invoke(
    actionName,
    params,
  ) {
    if (!this._ws) {
      throw new Error('WebSocket is not connected');
    }

    const id = this._requestCounter;
    this._requestCounter += 1;
    const request = {
      jsonrpc: '2.0',
      id,
      method: actionName,
      params: params ?? {},
    };

    const response = defer();
    this._pendingRequests[id] = response;
    try {
      // `send` is synchronous (it throws on failure), so there is nothing to wait for
      // between sending and awaiting the response.
      this._ws.send(JSON.stringify(request));
      return await this._withTimeout(
        response.promise,
        RESPONSE_TIMEOUT,
        `Response not received in ${RESPONSE_TIMEOUT}ms`,
      );
    } finally {
      // Drop the entry on every exit path (timeout, send failure, ...). The identity
      // check protects a newer request that reuses this id after `disconnect()`.
      if (this._pendingRequests[id] === response) {
        delete this._pendingRequests[id];
      }
    }
  }

  /**
   * Registers a callback for notifications with the given method name.
   *
   * @param {string} eventName - Notification method name.
   * @param {function(*): void} cb - Called with the notification's `params`.
   */
  subscribe(eventName, cb) {
    this._emitter.on(eventName, cb);
  }

  /**
   * Dispatches one raw message: notifications go to subscribers, responses settle the
   * matching pending request. Responses with an unknown id are ignored.
   *
   * @param {string} message - Raw JSON text received from the socket.
   */
  _handleMessage(message) {
    const res = JSON.parse(message);
    // Valid JSON that is not an object (e.g. `null`) cannot be a JSON-RPC message.
    if (res === null || typeof res !== 'object') {
      return;
    }

    if (messageIsNotification(res)) {
      this._emitter.emit(res.method, res.params);
      return;
    }

    const id = typeof res.id === 'number' ? res.id : Number.parseInt(res.id, 10);
    const pending = this._pendingRequests[id];
    if (!pending) {
      return;
    }
    delete this._pendingRequests[id];

    if (res.error) {
      const { data, message: errorMessage } = res.error;
      // Prefer the textual `data` detail when present; otherwise fall back to the
      // `message` member of the JSON-RPC error object.
      pending.reject(
        new Error(typeof data === 'string' ? data : errorMessage ?? 'JSON-RPC request failed'),
      );
    } else {
      pending.resolve(res.result);
    }
  }

  /**
   * Awaits `promise`, rejecting if it has not settled within `ms` milliseconds. The
   * timer is always cleared, so it never outlives the operation it guards.
   *
   * @param {Promise<*>} promise - Operation to bound.
   * @param {number} ms - Time limit in milliseconds.
   * @param {string} message - Error message used when the limit is exceeded.
   * @returns {Promise<*>} The value `promise` fulfils with.
   */
  async _withTimeout(promise, ms, message) {
    let timer;
    const expired = new Promise((_, reject) => {
      timer = setTimeout(() => reject(new Error(message)), ms);
    });
    try {
      return await Promise.race([promise, expired]);
    } finally {
      clearTimeout(timer);
    }
  }
}
// Author: corbifex (commented Dec 2, 2020).