Skip to content

Instantly share code, notes, and snippets.

@corbifex
Last active December 2, 2020 16:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save corbifex/1f7e275358c2e8a7c21f3a182e133719 to your computer and use it in GitHub Desktop.
Save corbifex/1f7e275358c2e8a7c21f3a182e133719 to your computer and use it in GitHub Desktop.
Lisk wsChannel workaround
import WebSocket from 'isomorphic-ws';
import { EventEmitter } from 'events';
const CONNECTION_TIMEOUT = 2000;
const ACKNOWLEDGMENT_TIMEOUT = 2000;
const RESPONSE_TIMEOUT = 3000;
const timeout = async (ms, message) => new Promise((_, reject) => {
const id = setTimeout(() => {
clearTimeout(id);
reject(new Error(message ?? `Timed out in ${ms}ms.`));
}, ms);
});
const defer = () => {
let resolve = (res) => {
};
let reject = (error) => {
};
const promise = new Promise((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});
return {promise, resolve, reject};
};
const messageIsNotification = (input) => !!((input.id === undefined || input.id === null) && input.method);
export class WSChannel {
isAlive = false;
_url = "";
_ws = null;
_requestCounter = 0;
_pendingRequests = {};
_emitter = null;
constructor(url) {
this._url = url;
this._emitter = new EventEmitter();
this._ws = new WebSocket(this._url);
}
async connect() {
const connect = new Promise(resolve => {
this._ws.onopen = () => {
setTimeout(() => {
this.isAlive = true;
resolve();
}, 1000)
}
});
const error = new Promise((_, reject) => {
this._ws.onerror = (err) => {
this.isAlive = false;
reject(err.error);
};
});
await Promise.race([
connect,
error,
timeout(CONNECTION_TIMEOUT, `Could not connect in ${CONNECTION_TIMEOUT}ms`),
]);
this._ws.onmessage = (data) => {
this._handleMessage(data.data);
};
}
async disconnect() {
this._requestCounter = 0;
this._pendingRequests = {};
if (!this._ws) {
return Promise.resolve();
}
return new Promise(resolve => {
this._ws.onclose = () => {
this.isAlive = false;
this._ws = undefined;
resolve();
};
// eslint-disable-next-line no-unused-expressions
this._ws?.close();
});
}
async invoke(
actionName,
params,
) {
const request = {
jsonrpc: '2.0',
id: this._requestCounter,
method: actionName,
params: params ?? {},
};
const send = new Promise((resolve, reject) => {
try {
this._ws.send(JSON.stringify(request))
return resolve();
} catch (err) {
if (err) {
return reject(err);
}
}
});
await Promise.race([
send,
timeout(ACKNOWLEDGMENT_TIMEOUT, `Request is not acknowledged in ${ACKNOWLEDGMENT_TIMEOUT}ms`),
]);
const response = defer();
this._pendingRequests[this._requestCounter] = response;
this._requestCounter += 1;
return Promise.race([
response.promise,
timeout(RESPONSE_TIMEOUT, `Response not received in ${RESPONSE_TIMEOUT}ms`),
]);
}
subscribe(eventName, cb) {
this._emitter.on(eventName, cb);
}
_handleMessage(message) {
const res = JSON.parse(message);
// Its an event
if (messageIsNotification(res)) {
this._emitter.emit(res.method, res.params);
} else {
const id = typeof res.id === 'number' ? res.id : parseInt(res.id, 10);
if (this._pendingRequests[id]) {
if (res.error) {
this._pendingRequests[id].reject(new Error(res.error.data ?? res.error.data));
} else {
this._pendingRequests[id].resolve(res.result);
}
delete this._pendingRequests[id];
}
}
}
}
@corbifex
Copy link
Author

corbifex commented Dec 2, 2020

import { createClient } from "@liskhq/lisk-api-client";
import { WSChannel } from './wsChannel';
const wsChannel = new WSChannel('ws://localhost:4001/ws');
await wsChannel.connect();
const client = await createClient(wsChannel));

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment