Skip to content

Instantly share code, notes, and snippets.

@oeway
Last active May 4, 2022 04:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oeway/76f853a8e547e223a177bb549603393b to your computer and use it in GitHub Desktop.
Save oeway/76f853a8e547e223a177bb549603393b to your computer and use it in GitHub Desktop.
<docs>
[TODO: write documentation for this plugin.]
</docs>
<config lang="json">
{
"name": "WebPythonWorker",
"type": "web-worker",
"tags": [],
"ui": "",
"version": "0.2.9",
"cover": "",
"description": "[TODO: describe this plugin with one sentence.]",
"icon": "extension",
"inputs": null,
"outputs": null,
"api_version": "0.1.8",
"env": "",
"permissions": [],
"requirements": ["https://cdn.jsdelivr.net/pyodide/v0.20.1a1/full/pyodide.js"],
"dependencies": [],
"flags": ["engine"],
"runnable": false
}
</config>
<script lang="javascript">
function randId() {
return (
Math.random()
.toString(36)
.substr(2, 10) + new Date().getTime()
);
}
const typedArrayToDtypeMapping = {
Int8Array: "int8",
Int16Array: "int16",
Int32Array: "int32",
Uint8Array: "uint8",
Uint16Array: "uint16",
Uint32Array: "uint32",
Float32Array: "float32",
Float64Array: "float64",
Array: "array"
};
const dtypeToTypedArray = {
int8: Int8Array,
int16: Int16Array,
int32: Int32Array,
uint8: Uint8Array,
uint16: Uint16Array,
uint32: Uint32Array,
float32: Float32Array,
float64: Float64Array,
array: Array
};
class MessageEmitter {
constructor(debug) {
this._event_handlers = {};
this._once_handlers = {};
this._debug = debug;
}
emit() {
throw new Error("emit is not implemented");
}
on(event, handler) {
if (!this._event_handlers[event]) {
this._event_handlers[event] = [];
}
this._event_handlers[event].push(handler);
}
once(event, handler) {
handler.___event_run_once = true;
this.on(event, handler);
}
off(event, handler) {
if (!event && !handler) {
// remove all events handlers
this._event_handlers = {};
} else if (event && !handler) {
// remove all hanlders for the event
if (this._event_handlers[event]) this._event_handlers[event] = [];
} else {
// remove a specific handler
if (this._event_handlers[event]) {
const idx = this._event_handlers[event].indexOf(handler);
if (idx >= 0) {
this._event_handlers[event].splice(idx, 1);
}
}
}
}
_fire(event, data) {
if (this._event_handlers[event]) {
var i = this._event_handlers[event].length;
while (i--) {
const handler = this._event_handlers[event][i];
try {
handler(data);
} catch (e) {
console.error(e);
} finally {
if (handler.___event_run_once) {
this._event_handlers[event].splice(i, 1);
}
}
}
} else {
if (this._debug) {
console.warn("unhandled event", event, data);
}
}
}
}
const API_VERSION = "0.2.3";
const ArrayBufferView = Object.getPrototypeOf(
Object.getPrototypeOf(new Uint8Array())
).constructor;
function _appendBuffer(buffer1, buffer2) {
const tmp = new Uint8Array(buffer1.byteLength + buffer2.byteLength);
tmp.set(new Uint8Array(buffer1), 0);
tmp.set(new Uint8Array(buffer2), buffer1.byteLength);
return tmp.buffer;
}
function indexObject(obj, is) {
if (!is) throw new Error("undefined index");
if (typeof is === "string") return indexObject(obj, is.split("."));
else if (is.length === 0) return obj;
else return indexObject(obj[is[0]], is.slice(1));
}
/**
* RPC object represents a single site in the
* communication protocol between the application and the plugin
*
* @param {Object} connection a special object allowing to send
* and receive messages from the opposite site (basically it
* should only provide send() and onMessage() methods)
*/
class RPC extends MessageEmitter {
constructor(connection, config, codecs) {
super(config && config.debug);
this._connection = connection;
this.config = config || {};
this._codecs = codecs || {};
this._object_store = {};
this._method_weakmap = new WeakMap();
this._object_weakmap = new WeakMap();
this._local_api = null;
this._remote_set = false;
// make sure there is an execute function
const name = this.config.name;
this._connection.execute =
this._connection.execute ||
function() {
throw new Error(`connection.execute not implemented (in "${name}")`);
};
this._store = new ReferenceStore();
this._method_refs = new ReferenceStore();
this._method_refs.onReady(() => {
this._fire("remoteIdle");
});
this._method_refs.onBusy(() => {
this._fire("remoteBusy");
});
this._setupMessageHanlders();
}
init() {
this._connection.emit({
type: "initialized",
config: this.config,
peer_id: this._connection.peer_id
});
}
setConfig(config) {
if (config)
for (const k of Object.keys(config)) {
this.config[k] = config[k];
}
}
/**
* Set a handler to be called when received a responce from the
* remote site reporting that the previously provided interface
* has been successfully set as remote for that site
*
* @param {Function} handler
*/
getRemoteCallStack() {
return this._method_refs.getStack();
}
/**
* @returns {Object} set of remote interface methods
*/
getRemote() {
return this._remote_interface;
}
/**
* Sets the interface of this site making it available to the
* remote site by sending a message with a set of methods names
*
* @param {Object} _interface to set
*/
setInterface(_interface, config) {
config = config || {};
this.config.name = config.name || this.config.name;
this.config.description = config.description || this.config.description;
if (this.config.forwarding_functions) {
for (let func_name of this.config.forwarding_functions) {
const _remote = this._remote_interface;
if (_remote[func_name]) {
if (_interface.constructor === Object) {
if (!_interface[func_name]) {
_interface[func_name] = (...args) => {
_remote[func_name](...args);
};
}
} else if (_interface.constructor.constructor === Function) {
if (!_interface.constructor.prototype[func_name]) {
_interface.constructor.prototype[func_name] = (...args) => {
_remote[func_name](...args);
};
}
}
}
}
}
this._local_api = _interface;
if (!this._remote_set) this._fire("interfaceAvailable");
else this.sendInterface();
return new Promise(resolve => {
this.once("interfaceSetAsRemote", resolve);
});
}
/**
* Sends the actual interface to the remote site upon it was
* updated or by a special request of the remote site
*/
sendInterface() {
if (!this._local_api) {
throw new Error("interface is not set.");
}
this._encode(this._local_api, true).then(api => {
this._connection.emit({ type: "setInterface", api: api });
});
}
_disposeObject(objectId) {
if (this._object_store[objectId]) {
delete this._object_store[objectId];
} else {
throw new Error(`Object (id=${objectId}) not found.`);
}
}
disposeObject(obj) {
return new Promise((resolve, reject) => {
if (this._object_weakmap.has(obj)) {
const objectId = this._object_weakmap.get(obj);
this._connection.once("disposed", data => {
if (data.error) reject(new Error(data.error));
else resolve();
});
this._connection.emit({
type: "disposeObject",
object_id: objectId
});
} else {
throw new Error("Invalid object");
}
});
}
/**
* Handles a message from the remote site
*/
_setupMessageHanlders() {
this._connection.on("init", this.init);
this._connection.on("execute", data => {
Promise.resolve(this._connection.execute(data.code))
.then(() => {
this._connection.emit({ type: "executed" });
})
.catch(e => {
console.error(e);
this._connection.emit({
type: "executed",
error: String(e)
});
});
});
this._connection.on("method", async data => {
let resolve, reject, method, method_this, args, result;
try {
if (data.promise) {
[resolve, reject] = await this._unwrap(data.promise, false);
}
const _interface = this._object_store[data.object_id];
method = indexObject(_interface, data.name);
if (data.name.includes(".")) {
const tmp = data.name.split(".");
const intf_index = tmp.slice(0, tmp.length - 1).join(".");
method_this = indexObject(_interface, intf_index);
} else {
method_this = _interface;
}
args = await this._unwrap(data.args, true);
if (data.promise) {
result = method.apply(method_this, args);
if (
result instanceof Promise ||
(method.constructor && method.constructor.name === "AsyncFunction")
) {
result.then(resolve).catch(reject);
} else {
resolve(result);
}
} else {
method.apply(method_this, args);
}
} catch (err) {
console.error(this.config.name, err);
if (reject) {
reject(err);
}
}
});
this._connection.on("callback", async data => {
let resolve, reject, method, args, result;
try {
if (data.promise) {
[resolve, reject] = await this._unwrap(data.promise, false);
}
if (data.promise) {
method = this._store.fetch(data.id);
args = await this._unwrap(data.args, true);
if (!method) {
throw new Error(
"Callback function can only called once, if you want to call a function for multiple times, please make it as a plugin api function. See https://imjoy.io/docs for more details."
);
}
result = method.apply(null, args);
if (
result instanceof Promise ||
(method.constructor && method.constructor.name === "AsyncFunction")
) {
result.then(resolve).catch(reject);
} else {
resolve(result);
}
} else {
method = this._store.fetch(data.id);
args = await this._unwrap(data.args, true);
if (!method) {
throw new Error(
"Please notice that callback function can only called once, if you want to call a function for multiple times, please make it as a plugin api function. See https://imjoy.io/docs for more details."
);
}
method.apply(null, args);
}
} catch (err) {
console.error(this.config.name, err);
if (reject) {
reject(err);
}
}
});
this._connection.on("disposeObject", data => {
try {
this._disposeObject(data.object_id);
this._connection.emit({
type: "disposed"
});
} catch (e) {
console.error(e);
this._connection.emit({
type: "disposed",
error: String(e)
});
}
});
this._connection.on("setInterface", data => {
this._setRemoteInterface(data.api);
});
this._connection.on("getInterface", () => {
this._fire("getInterface");
if (this._local_api) {
this.sendInterface();
} else {
this.once("interfaceAvailable", () => {
this.sendInterface();
});
}
});
this._connection.on("interfaceSetAsRemote", () => {
this._remote_set = true;
this._fire("interfaceSetAsRemote");
});
this._connection.on("disconnect", () => {
this._fire("beforeDisconnect");
this._connection.disconnect();
this._fire("disconnected");
});
}
/**
* Sends a requests to the remote site asking it to provide its
* current interface
*/
requestRemote() {
this._connection.emit({ type: "getInterface" });
}
_ndarray(typedArray, shape, dtype) {
const _dtype = typedArrayToDtype(typedArray);
if (dtype && dtype !== _dtype) {
throw "dtype doesn't match the type of the array: " +
_dtype +
" != " +
dtype;
}
shape = shape || [typedArray.length];
return {
_rtype: "ndarray",
_rvalue: typedArray.buffer,
_rshape: shape,
_rdtype: _dtype
};
}
/**
* Sets the new remote interface provided by the other site
*
* @param {Array} names list of function names
*/
_setRemoteInterface(api) {
this._decode(api).then(intf => {
// update existing interface instead of recreating it
// this will preserve the object reference
if (this._remote_interface) {
// clear the interface
for (let k in this._remote_interface) delete this._remote_interface[k];
// then assign the new interfaces
Object.assign(this._remote_interface, intf);
} else this._remote_interface = intf;
this._fire("remoteReady");
this._reportRemoteSet();
});
}
/**
* Generates the wrapped function corresponding to a single remote
* method. When the generated function is called, it will send the
* corresponding message to the remote site asking it to execute
* the particular method of its interface
*
* @param {String} name of the remote method
*
* @returns {Function} wrapped remote method
*/
_genRemoteMethod(targetId, name, objectId) {
const me = this;
const remoteMethod = function() {
return new Promise(async (resolve, reject) => {
let id = null;
try {
id = me._method_refs.put(objectId ? objectId + "/" + name : name);
const wrapped_resolve = function() {
if (id !== null) me._method_refs.fetch(id);
return resolve.apply(this, arguments);
};
const wrapped_reject = function() {
if (id !== null) me._method_refs.fetch(id);
return reject.apply(this, arguments);
};
const encodedPromise = await me._wrap([
wrapped_resolve,
wrapped_reject
]);
// store the key id for removing them from the reference store together
wrapped_resolve.__promise_pair = encodedPromise[1]._rvalue;
wrapped_reject.__promise_pair = encodedPromise[0]._rvalue;
let args = Array.prototype.slice.call(arguments);
const argLength = args.length;
// if the last argument is an object, mark it as kwargs
const withKwargs =
argLength > 0 &&
typeof args[argLength - 1] === "object" &&
args[argLength - 1] !== null &&
args[argLength - 1]._rkwargs;
if (withKwargs) delete args[argLength - 1]._rkwargs;
if (
name === "register" ||
name === "registerService" ||
name === "register_service" ||
name === "export" ||
name === "on"
) {
args = await me._wrap(args, true);
} else {
args = await me._wrap(args);
}
const transferables = args.__transferables__;
if (transferables) delete args.__transferables__;
me._connection.emit(
{
type: "method",
target_id: targetId,
name: name,
object_id: objectId,
args: args,
promise: encodedPromise,
with_kwargs: withKwargs
},
transferables
);
} catch (e) {
if (id) me._method_refs.fetch(id);
reject(
`Failed to exectue remote method (interface: ${objectId ||
me.id}, method: ${name}), error: ${e}`
);
}
});
};
remoteMethod.__remote_method = true;
return remoteMethod;
}
/**
* Sends a responce reporting that interface just provided by the
* remote site was successfully set by this site as remote
*/
_reportRemoteSet() {
this._connection.emit({ type: "interfaceSetAsRemote" });
}
/**
* Prepares the provided set of remote method arguments for
* sending to the remote site, replaces all the callbacks with
* identifiers
*
* @param {Array} args to wrap
*
* @returns {Array} wrapped arguments
*/
async _encode(aObject, asInterface, objectId) {
const aType = typeof aObject;
if (
aType === "number" ||
aType === "string" ||
aType === "boolean" ||
aObject === null ||
aObject === undefined ||
aObject instanceof ArrayBuffer
) {
return aObject;
}
let bObject;
if (typeof aObject === "function") {
if (asInterface) {
if (!objectId) throw new Error("objectId is not specified.");
bObject = {
_rtype: "interface",
_rtarget_id: this._connection.peer_id,
_rintf: objectId,
_rvalue: asInterface
};
this._method_weakmap.set(aObject, bObject);
} else if (this._method_weakmap.has(aObject)) {
bObject = this._method_weakmap.get(aObject);
} else {
const cid = this._store.put(aObject);
bObject = {
_rtype: "callback",
_rtarget_id: this._connection.peer_id,
_rname: (aObject.constructor && aObject.constructor.name) || cid,
_rvalue: cid
};
}
return bObject;
}
// skip if already encoded
if (aObject.constructor instanceof Object && aObject._rtype) {
// make sure the interface functions are encoded
if (aObject._rintf) {
const temp = aObject._rtype;
delete aObject._rtype;
bObject = await this._encode(aObject, asInterface, objectId);
bObject._rtype = temp;
} else {
bObject = aObject;
}
return bObject;
}
const transferables = [];
const _transfer = aObject._transfer;
const isarray = Array.isArray(aObject);
for (let tp of Object.keys(this._codecs)) {
const codec = this._codecs[tp];
if (codec.encoder && aObject instanceof codec.type) {
// TODO: what if multiple encoders found
let encodedObj = await Promise.resolve(codec.encoder(aObject));
if (encodedObj && !encodedObj._rtype) encodedObj._rtype = codec.name;
// encode the functions in the interface object
if (encodedObj && encodedObj._rintf) {
const temp = encodedObj._rtype;
delete encodedObj._rtype;
encodedObj = await this._encode(encodedObj, asInterface, objectId);
encodedObj._rtype = temp;
}
bObject = encodedObj;
return bObject;
}
}
if (
/*global tf*/
typeof tf !== "undefined" &&
tf.Tensor &&
aObject instanceof tf.Tensor
) {
const v_buffer = aObject.dataSync();
if (aObject._transfer || _transfer) {
transferables.push(v_buffer.buffer);
delete aObject._transfer;
}
bObject = {
_rtype: "ndarray",
_rvalue: v_buffer.buffer,
_rshape: aObject.shape,
_rdtype: aObject.dtype
};
} else if (
/*global nj*/
typeof nj !== "undefined" &&
nj.NdArray &&
aObject instanceof nj.NdArray
) {
const dtype = typedArrayToDtype(aObject.selection.data);
if (aObject._transfer || _transfer) {
transferables.push(aObject.selection.data.buffer);
delete aObject._transfer;
}
bObject = {
_rtype: "ndarray",
_rvalue: aObject.selection.data.buffer,
_rshape: aObject.shape,
_rdtype: dtype
};
} else if (aObject instanceof Error) {
console.error(aObject);
bObject = { _rtype: "error", _rvalue: aObject.toString() };
} else if (typeof File !== "undefined" && aObject instanceof File) {
bObject = {
_rtype: "file",
_rvalue: aObject,
_rpath: aObject._path || aObject.webkitRelativePath
};
}
// send objects supported by structure clone algorithm
// https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm
else if (
aObject !== Object(aObject) ||
aObject instanceof Boolean ||
aObject instanceof String ||
aObject instanceof Date ||
aObject instanceof RegExp ||
aObject instanceof ImageData ||
(typeof FileList !== "undefined" && aObject instanceof FileList) ||
(typeof FileSystemDirectoryHandle !== "undefined" &&
aObject instanceof FileSystemDirectoryHandle) ||
(typeof FileSystemFileHandle !== "undefined" &&
aObject instanceof FileSystemFileHandle) ||
(typeof FileSystemHandle !== "undefined" &&
aObject instanceof FileSystemHandle) ||
(typeof FileSystemWritableFileStream !== "undefined" &&
aObject instanceof FileSystemWritableFileStream)
) {
bObject = aObject;
// TODO: avoid object such as DynamicPlugin instance.
} else if (typeof File !== "undefined" && aObject instanceof File) {
bObject = {
_rtype: "file",
_rname: aObject.name,
_rmime: aObject.type,
_rvalue: aObject,
_rpath: aObject._path || aObject.webkitRelativePath
};
} else if (aObject instanceof Blob) {
bObject = { _rtype: "blob", _rvalue: aObject };
} else if (aObject instanceof ArrayBufferView) {
if (aObject._transfer || _transfer) {
transferables.push(aObject.buffer);
delete aObject._transfer;
}
const dtype = typedArrayToDtype(aObject);
bObject = {
_rtype: "typedarray",
_rvalue: aObject.buffer,
_rdtype: dtype
};
} else if (aObject instanceof DataView) {
if (aObject._transfer || _transfer) {
transferables.push(aObject.buffer);
delete aObject._transfer;
}
bObject = { _rtype: "memoryview", _rvalue: aObject.buffer };
} else if (aObject instanceof Set) {
bObject = {
_rtype: "set",
_rvalue: await this._encode(Array.from(aObject), asInterface)
};
} else if (aObject instanceof Map) {
bObject = {
_rtype: "orderedmap",
_rvalue: await this._encode(Array.from(aObject), asInterface)
};
} else if (
aObject.constructor instanceof Object ||
Array.isArray(aObject)
) {
bObject = isarray ? [] : {};
let keys;
// an object/array
if (aObject.constructor === Object || Array.isArray(aObject)) {
keys = Object.keys(aObject);
}
// a class
else if (aObject.constructor === Function) {
throw new Error("Please instantiate the class before exportting it.");
}
// instance of a class
else if (aObject.constructor.constructor === Function) {
keys = Object.getOwnPropertyNames(
Object.getPrototypeOf(aObject)
).concat(Object.keys(aObject));
// TODO: use a proxy object to represent the actual object
// always encode class instance as interface
asInterface = true;
} else {
throw Error("Unsupported interface type");
}
let hasFunction = false;
// encode interfaces
if (aObject._rintf || asInterface) {
if (!objectId) {
if (typeof aObject._rintf === "string" && aObject._rintf.length > 0) {
objectId = aObject._rintf; // enable custom object id
} else {
objectId = randId();
}
// Note: object with the same id will be overwritten
if (this._object_store[objectId])
console.warn(
`Overwritting interface object with the same id: ${objectId}`
);
this._object_store[objectId] = aObject;
}
for (let k of keys) {
if (k === "constructor") continue;
if (k.startsWith("_")) {
continue;
}
bObject[k] = await this._encode(
aObject[k],
typeof asInterface === "string" ? asInterface + "." + k : k,
objectId
);
if (typeof aObject[k] === "function") {
hasFunction = true;
}
}
// object id for dispose the object remotely
if (hasFunction) bObject._rintf = objectId;
// remove interface when closed
if (aObject.on && typeof aObject.on === "function") {
aObject.on("close", () => {
delete this._object_store[objectId];
});
}
} else {
for (let k of keys) {
if (["hasOwnProperty", "constructor"].includes(k)) continue;
bObject[k] = await this._encode(aObject[k]);
}
}
// for example, browserFS object
} else if (typeof aObject === "object") {
const keys = Object.getOwnPropertyNames(
Object.getPrototypeOf(aObject)
).concat(Object.keys(aObject));
const objectId = randId();
for (let k of keys) {
if (["hasOwnProperty", "constructor"].includes(k)) continue;
// encode as interface
bObject[k] = await this._encode(aObject[k], k, bObject);
}
// object id, used for dispose the object
bObject._rintf = objectId;
} else {
throw "imjoy-rpc: Unsupported data type:" + aObject;
}
if (transferables.length > 0) {
bObject.__transferables__ = transferables;
}
if (!bObject) {
throw new Error("Failed to encode object");
}
return bObject;
}
async _decode(aObject, withPromise) {
if (!aObject) {
return aObject;
}
let bObject;
if (aObject["_rtype"]) {
if (
this._codecs[aObject._rtype] &&
this._codecs[aObject._rtype].decoder
) {
if (aObject._rintf) {
const temp = aObject._rtype;
delete aObject._rtype;
aObject = await this._decode(aObject, withPromise);
aObject._rtype = temp;
}
bObject = await Promise.resolve(
this._codecs[aObject._rtype].decoder(aObject)
);
} else if (aObject._rtype === "callback") {
bObject = this._genRemoteCallback(
aObject._rtarget_id,
aObject._rvalue,
withPromise
);
} else if (aObject._rtype === "interface") {
bObject = this._genRemoteMethod(
aObject._rtarget_id,
aObject._rvalue,
aObject._rintf
);
} else if (aObject._rtype === "ndarray") {
/*global nj tf*/
//create build array/tensor if used in the plugin
if (typeof nj !== "undefined" && nj.array) {
if (Array.isArray(aObject._rvalue)) {
aObject._rvalue = aObject._rvalue.reduce(_appendBuffer);
}
bObject = nj
.array(new Uint8(aObject._rvalue), aObject._rdtype)
.reshape(aObject._rshape);
} else if (typeof tf !== "undefined" && tf.Tensor) {
if (Array.isArray(aObject._rvalue)) {
aObject._rvalue = aObject._rvalue.reduce(_appendBuffer);
}
const arraytype = dtypeToTypedArray[aObject._rdtype];
bObject = tf.tensor(
new arraytype(aObject._rvalue),
aObject._rshape,
aObject._rdtype
);
} else {
//keep it as regular if transfered to the main app
bObject = aObject;
}
} else if (aObject._rtype === "error") {
bObject = new Error(aObject._rvalue);
} else if (aObject._rtype === "file") {
if (aObject._rvalue instanceof File) {
bObject = aObject._rvalue;
//patch _path
bObject._path = aObject._rpath;
} else {
bObject = new File([aObject._rvalue], aObject._rname, {
type: aObject._rmime
});
bObject._path = aObject._rpath;
}
} else if (aObject._rtype === "typedarray") {
const arraytype = dtypeToTypedArray[aObject._rdtype];
if (!arraytype)
throw new Error("unsupported dtype: " + aObject._rdtype);
bObject = new arraytype(aObject._rvalue);
} else if (aObject._rtype === "memoryview") {
bObject = new DataView(aObject._rvalue);
} else if (aObject._rtype === "blob") {
if (aObject._rvalue instanceof Blob) {
bObject = aObject._rvalue;
} else {
bObject = new Blob([aObject._rvalue], { type: aObject._rmime });
}
} else if (aObject._rtype === "orderedmap") {
bObject = new Map(await this._decode(aObject._rvalue, withPromise));
} else if (aObject._rtype === "set") {
bObject = new Set(await this._decode(aObject._rvalue, withPromise));
} else {
// make sure all the interface functions are decoded
if (aObject._rintf) {
const temp = aObject._rtype;
delete aObject._rtype;
bObject = await this._decode(aObject, withPromise);
bObject._rtype = temp;
} else bObject = aObject;
}
} else if (aObject.constructor === Object || Array.isArray(aObject)) {
const isarray = Array.isArray(aObject);
bObject = isarray ? [] : {};
for (let k of Object.keys(aObject)) {
if (isarray || aObject.hasOwnProperty(k)) {
const v = aObject[k];
bObject[k] = await this._decode(v, withPromise);
}
}
} else {
bObject = aObject;
}
if (bObject === undefined) {
throw new Error("Failed to decode object");
}
// store the object id for dispose
if (aObject._rintf) {
this._object_weakmap.set(bObject, aObject._rintf);
}
return bObject;
}
async _wrap(args, asInterface) {
return await this._encode(args, asInterface);
}
/**
* Unwraps the set of arguments delivered from the remote site,
* replaces all callback identifiers with a function which will
* initiate sending that callback identifier back to other site
*
* @param {Object} args to unwrap
*
* @param {Boolean} withPromise is true means this the callback should contain a promise
*
* @returns {Array} unwrapped args
*/
async _unwrap(args, withPromise) {
return await this._decode(args, withPromise);
}
/**
* Generates the wrapped function corresponding to a single remote
* callback. When the generated function is called, it will send
* the corresponding message to the remote site asking it to
* execute the particular callback previously saved during a call
* by the remote site a method from the interface of this site
*
* @param {Number} id of the remote callback to execute
* @param {Number} argNum argument index of the callback
* @param {Boolean} withPromise is true means this the callback should contain a promise
*
* @returns {Function} wrapped remote callback
*/
_genRemoteCallback(targetId, cid, withPromise) {
const me = this;
let remoteCallback;
if (withPromise) {
remoteCallback = function() {
return new Promise(async (resolve, reject) => {
const args = await me._wrap(Array.prototype.slice.call(arguments));
const argLength = args.length;
// if the last argument is an object, mark it as kwargs
const withKwargs =
argLength > 0 &&
typeof args[argLength - 1] === "object" &&
args[argLength - 1] !== null &&
args[argLength - 1]._rkwargs;
if (withKwargs) delete args[argLength - 1]._rkwargs;
const transferables = args.__transferables__;
if (transferables) delete args.__transferables__;
const encodedPromise = await me._wrap([resolve, reject]);
// store the key id for removing them from the reference store together
resolve.__promise_pair = encodedPromise[1]._rvalue;
reject.__promise_pair = encodedPromise[0]._rvalue;
try {
me._connection.emit(
{
type: "callback",
target_id: targetId,
id: cid,
args: args,
promise: encodedPromise,
with_kwargs: withKwargs
},
transferables
);
} catch (e) {
reject(`Failed to exectue remote callback ( id: ${cid}).`);
}
});
};
return remoteCallback;
} else {
remoteCallback = async function() {
const args = await me._wrap(Array.prototype.slice.call(arguments));
const argLength = args.length;
// if the last argument is an object, mark it as kwargs
const withKwargs =
argLength > 0 &&
typeof args[argLength - 1] === "object" &&
args[argLength - 1] !== null &&
args[argLength - 1]._rkwargs;
if (withKwargs) delete args[argLength - 1]._rkwargs;
const transferables = args.__transferables__;
if (transferables) delete args.__transferables__;
return me._connection.emit(
{
type: "callback",
target_id: targetId,
id: cid,
args: args,
with_kwargs: withKwargs
},
transferables
);
};
return remoteCallback;
}
}
reset() {
this._event_handlers = {};
this._once_handlers = {};
this._remote_interface = null;
this._object_store = {};
this._method_weakmap = new WeakMap();
this._object_weakmap = new WeakMap();
this._local_api = null;
this._store = new ReferenceStore();
this._method_refs = new ReferenceStore();
}
/**
* Sends the notification message and breaks the connection
*/
disconnect() {
this._connection.emit({ type: "disconnect" });
this.reset();
setTimeout(() => {
this._connection.disconnect();
}, 2000);
}
}
/**
* ReferenceStore is a special object which stores other objects
* and provides the references (number) instead. This reference
* may then be sent over a json-based communication channel (IPC
* to another Node.js process or a message to the Worker). Other
* site may then provide the reference in the responce message
* implying the given object should be activated.
*
* Primary usage for the ReferenceStore is a storage for the
* callbacks, which therefore makes it possible to initiate a
* callback execution by the opposite site (which normally cannot
* directly execute functions over the communication channel).
*
* Each stored object can only be fetched once and is not
* available for the second time. Each stored object must be
* fetched, since otherwise it will remain stored forever and
* consume memory.
*
* Stored object indeces are simply the numbers, which are however
* released along with the objects, and are later reused again (in
* order to postpone the overflow, which should not likely happen,
* but anyway).
*/
class ReferenceStore {
constructor() {
this._store = {}; // stored object
this._indices = [0]; // smallest available indices
this._readyHandler = function() {};
this._busyHandler = function() {};
this._readyHandler();
}
/**
* call handler when the store is empty
*
* @param {FUNCTION} id of a handler
*/
onReady(readyHandler) {
this._readyHandler = readyHandler || function() {};
}
/**
* call handler when the store is not empty
*
* @param {FUNCTION} id of a handler
*/
onBusy(busyHandler) {
this._busyHandler = busyHandler || function() {};
}
/**
* get the length of the store
*
*/
getStack() {
return Object.keys(this._store).length;
}
/**
* @function _genId() generates the new reference id
*
* @returns {Number} smallest available id and reserves it
*/
_genId() {
let id;
if (this._indices.length === 1) {
id = this._indices[0]++;
} else {
id = this._indices.shift();
}
return id;
}
/**
* Releases the given reference id so that it will be available by
* another object stored
*
* @param {Number} id to release
*/
_releaseId(id) {
for (let i = 0; i < this._indices.length; i++) {
if (id < this._indices[i]) {
this._indices.splice(i, 0, id);
break;
}
}
// cleaning-up the sequence tail
for (let i = this._indices.length - 1; i >= 0; i--) {
if (this._indices[i] - 1 === this._indices[i - 1]) {
this._indices.pop();
} else {
break;
}
}
}
/**
* Stores the given object and returns the refernce id instead
*
* @param {Object} obj to store
*
* @returns {Number} reference id of the stored object
*/
put(obj) {
if (this._busyHandler && Object.keys(this._store).length === 0) {
this._busyHandler();
}
const id = this._genId();
this._store[id] = obj;
return id;
}
/**
* Retrieves previously stored object and releases its reference
*
* @param {Number} id of an object to retrieve
*/
fetch(id) {
const obj = this._store[id];
if (obj && !obj.__remote_method) {
delete this._store[id];
this._releaseId(id);
if (this._readyHandler && Object.keys(this._store).length === 0) {
this._readyHandler();
}
}
if (obj && obj.__promise_pair) {
this.fetch(obj.__promise_pair);
}
return obj;
}
}
const startupScript = `
import js
import micropip
import sys
import traceback
import asyncio
import os
os.environ["IMJOY_RPC_CONNECTION"] = 'unknown'
async def install_requirements(requirements):
await micropip.install(requirements)
async def run():
try:
await micropip.install(["imjoy-rpc==0.3.42", "numpy"])
# map imjoy_rpc to imjoy
import imjoy_rpc
from imjoy_rpc import connect_to_pyodide
from types import ModuleType
m = ModuleType("imjoy")
sys.modules["imjoy"] = m
api = await connect_to_pyodide()
m.api = api
js.__resolve(api)
except Exception as e:
js.__reject(traceback.format_exc())
asyncio.get_event_loop().run_until_complete(run())
`;
function runStartup(){
return new Promise((resolve, reject)=>{
self.__resolve = resolve
self.__reject = reject
self.pyodide.runPython(startupScript)
})
}
const toObject = (x) => {
if(x===undefined || x===null) return x;
if(self.pyodide.isPyProxy(x))
x = x.toJs({dict_converter : Object.fromEntries})
if (x instanceof Map) {
return Object.fromEntries(Array.from(
x.entries(),
([k, v]) => [k, toObject(v)]
))
} else if (x instanceof Array) {
return x.map(toObject);
} else {
return x;
}
}
const installedRequirements = []
async function installRequirements(requirements) {
requirements =
typeof requirements === "string"
? [requirements]
: requirements;
if (Array.isArray(requirements)) {
const python_packages = [];
for (var i = 0; i < requirements.length; i++) {
if(installedRequirements.includes(requirements[i].toLowerCase())) continue;
if (
requirements[i].toLowerCase().endsWith(".css") ||
requirements[i].startsWith("css:")
) {
if (requirements[i].startsWith("css:")) {
requirements[i] = requirements[i].slice(4);
}
link_node = document.createElement("link");
link_node.rel = "stylesheet";
link_node.href = requirements[i];
document.head.appendChild(link_node);
} else if (
// requirements[i].toLowerCase().endsWith(".js") ||
requirements[i].startsWith("js:")
) {
if (requirements[i].startsWith("js:")) {
requirements[i] = requirements[i].slice(3);
}
importScripts(requirements[i]);
} else if (requirements[i].startsWith("cache:")) {
//ignore cache
} else if (
requirements[i].toLowerCase().endsWith(".js") ||
requirements[i].startsWith("package:")
) {
if (requirements[i].startsWith("package:")) {
requirements[i] = requirements[i].slice(8);
}
python_packages.push(requirements[i]);
} else if (
requirements[i].startsWith("http:") ||
requirements[i].startsWith("https:")
) {
console.log(
"Unprocessed requirements url: " + requirements[i]
);
} else {
python_packages.push(requirements[i]);
}
}
if(python_packages.length>0){
const pyinstallRequirements = pyodide.globals.get("install_requirements");
await pyinstallRequirements(python_packages)
python_packages.forEach(pack=>installedRequirements.push(pack))
}
} else {
throw "unsupported requirements definition";
}
}
async function setupPyodide() {
if(self.pyodide) return;
self.pyodide = await loadPyodide({ indexURL : 'https://cdn.jsdelivr.net/pyodide/v0.20.1a1/full/' });
await self.pyodide.loadPackage(['micropip']);
}
function initializeIfNeeded(connection, default_config) {
connection.once("imjoyRPCReady", async data => {
const config = data.config || {};
let forwarding_functions = ["close", "on", "off", "emit"];
if (["rpc-window", "window"].includes(config.type || default_config.type)) {
forwarding_functions = forwarding_functions.concat([
"resize",
"show",
"hide",
"refresh"
]);
}
let credential;
if (config.credential_required) {
if (!Array.isArray(config.credential_fields)) {
throw new Error(
"Please specify the `config.credential_fields` as an array of object."
);
}
if (default_config.credential_handler) {
credential = await default_config.credential_handler(
config.credential_fields
);
} else {
credential = {};
for (let k in config.credential_fields) {
credential[k.id] = window.prompt(k.label, k.value);
}
}
}
connection.emit({
type: "initialize",
config: {
name: default_config.name,
type: default_config.type,
allow_execution: true,
enable_service_worker: true,
forwarding_functions: forwarding_functions,
expose_api_globally: true,
credential: credential
},
peer_id: data.peer_id
});
});
}
async function setup() {
let pyodidePromise;
let connected = false;
await api.registerService({
"type": "engine",
"pluginType": "web-python",
"icon": "🐍",
"name": 'web-python',
"url": 'imjoy.io/web-python',
lazy_connection: true,
async connect() {
try {
if (pyodidePromise) await pyodidePromise
else
pyodidePromise = await setupPyodide()
connected = true;
}
catch (e) {
connected = false;
throw e;
}
},
disconnect() {
},
async startPlugin(config, imjoy_interface, engine_utils) {
return new Promise(async (resolve_interface, reject_interface) => {
if (pyodidePromise) await pyodidePromise;
const coreConnection = new MessageEmitter()
coreConnection.emit = function (data) {
data.peer_id = this.peer_id;
// convert js to python data
this.messageCallback(data)
}
coreConnection.execute = async function (code) {
this.emit({ type: "execute", code: code });
}
globalThis.sendMessage = async function (data) {
data = toObject(data)
// convert python to js data
coreConnection._fire(data.type, data);
}
globalThis.setMessageCallback = (cb) => {
coreConnection.messageCallback = cb
}
coreConnection.on("initialized", data => {
const pluginConfig = data.config;
if (data.error) {
console.error("Failed to initialize the plugin", data.error);
return;
}
if (!data.peer_id) {
throw "Please provide a peer_id for the connection.";
}
coreConnection.peer_id = data.peer_id;
console.log("plugin initialized:", pluginConfig);
const core = new RPC(coreConnection, { name: "core" });
core.on("disconnected", details => {
console.log("status: plugin is disconnected", details);
});
core.on("remoteReady", () => {
console.log("status: plugin is ready");
});
core.on("remoteIdle", () => {
console.log("status: plugin is now idle");
});
core.on("remoteBusy", () => {
console.log("status: plugin is busy");
});
core.setInterface(imjoy_interface);
core.on("interfaceSetAsRemote", async () => {
core.on("remoteReady", async () => {
const _interface = core.getRemote();
if (_interface && Object.keys(_interface).length > 0) {
resolve_interface(_interface)
}
});
core.requestRemote();
});
core.sendInterface();
});
initializeIfNeeded(coreConnection, {})
await runStartup();
if (config["requirements"]) {
const pyinstallRequirements = pyodide.globals.get("install_requirements");
await pyinstallRequirements(config["requirements"])
}
const script = config["scripts"][0]
const lang = config["scripts"][0]["attrs"]["lang"]
if (script) {
coreConnection.on("executed", data => {
if (data.error) {
debugger
reject_interface(new Error(data.error));
}
});
coreConnection.execute(script);
}
})
},
heartbeat() {
return connected
}
})
}
api.export({ "setup": setup })
</script>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment