Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Interactively invoke remote resources: REST function parameters
// awaitable queue
class AwaitableQueue {
constructor(){
this.queue = []
this.resolvers = []
}
pop(){
if(this.queue.length > 0) return Promise.resolve(this.queue.shift());
return new Promise((resolve, reject) => this.resolvers.push(resolve) )
}
push(payload){
if(this.resolvers.length > 0) this.resolvers.shift()(payload)
else this.queue.push(payload);
}
}
// session keeps track of closures and callbacks
class Session extends AwaitableQueue {
constructor(){
super()
this.closures = []
this.callbacks = []
}
pushClosure(fn){ return this.closures.push(fn) - 1 }
pushCallback(fn){ return this.callbacks.push(fn) - 1 }
}
// creates functions that create call messages
function dispatch(sess, func){
return (...args) => new Promise((resolve, reject) => {
let funcs = []
let param = args.map(k => typeof k === 'function' ?
(funcs.push(sess.pushClosure(k)), null) : (funcs.push(null), k))
sess.push({ type: 'call', func: func, param: param, funcs: funcs, callback: sess.pushCallback(resolve) })
})
}
// interprets messages, invokes functions and sends return messages
function handle(sess, msg, handler){
if(msg.type === 'call'){
let fn = msg.func === -1 ? handler : sess.closures[msg.func]
Promise.resolve(fn(...msg.param.map((k, i) => msg.funcs[i] === null ? k : dispatch(sess, msg.funcs[i]) )))
.then(value => sess.push({ type: 'return', value: value, callback: msg.callback, func: msg.func }))
}else if(msg.type === 'return'){
sess.callbacks[msg.callback](msg.value)
delete sess.callbacks[msg.callback]
}
}
// mock server / fetch api
// should be easy to replace with real functions
var endpoints = {}
async function fetch(path, opts){
let data = await endpoints[path](JSON.parse(opts.body))
console.log('POST ' + path + ':', opts.body, '-->', JSON.stringify(data))
return { async json(){ return data } }
}
async function serve(path, fn){
endpoints[path] = fn;
}
let sessions = []
// serverside register handler for a particular path
function register(path, handler){
serve(path, async function(data){
if(data.func === -1 && data.type === 'call')
data.sess = sessions.push(new Session()) - 1; // allocate a new queue
let sess = sessions[data.sess];
if(!sess) throw new Error('session not found')
handle(sess, data, handler)
let resp = await sess.pop()
if(resp.func === -1 && resp.type === 'return') delete sessions[data.sess]; // deallocate a queue
return { ...resp, sess: data.sess };
})
}
// clientside invoke function mounted at particular path
async function call(path, ...args){
let sess = new Session(), sid;
dispatch(sess, -1)(...args)
while(true){
let data = { ...(await sess.pop()), sess: sid }
let resp = await (await fetch(path, { body: JSON.stringify(data) })).json()
handle(sess, resp, x => { throw new Error('no default handler for client agent') })
sid = resp.sess;
if(resp.func === -1 && resp.type === 'return') return resp.value;
}
}
// register an example reduce endpoint
register('/reduce', async function(list, fn){
if(list.length === 0) return null;
if(list.length === 1) return list[0];
let init = await fn(list[0], list[1])
for(let i = 2; i < list.length; i++){
init = await fn(init, list[i])
}
return init
})
;(async function(){
let result = await call('/reduce', [1, 2, 3, 4], (a, b) => {
return a + b
})
console.log('final result: ', result)
})()
// register('/bidirectional', async function(list, fn){
// return await fn(list[0], list[1],
// (a, b) => a + b)
// })
// call('/bidirectional', [1, 2], async function(a, b, op){
// return await op(a, b)
// }).then(k => console.log(k))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.