Skip to content

Instantly share code, notes, and snippets.

@alexander-alvarez
Created May 19, 2017 21:15
Show Gist options
  • Save alexander-alvarez/a944011f16e67ce9373436ca8dbbb983 to your computer and use it in GitHub Desktop.
Save alexander-alvarez/a944011f16e67ce9373436ca8dbbb983 to your computer and use it in GitHub Desktop.
Coalescing async behavior implementation
import { module, test } from 'ember-qunit';
import run from 'ember-runloop'
import EmberObject from 'ember-object'
import RSVP from 'rsvp';
import { all } from 'ember-concurrency';
import { BufferedChannel } from 'pool/classes';
module('Unit | Utility | BufferedChannel');

/**
 * Verifies that two requests performed inside the same run loop are coalesced
 * into a single call of the async function, and that each caller receives
 * only its own slice of the shared result.
 */
test('BufferedChannel test', async function(assert) {
  assert.expect(3);

  // GIVEN
  // An async function that returns results for coalesced requests.
  const fn = (abc, ids) => {
    assert.deepEqual([abc, ids], ['abc', [1, 2]], 'the requests are coalesced');
    return new RSVP.Promise((resolve) => resolve({
      'abc1': 'something that is only for 1',
      'abc2': 'something that is only for 2'
    }));
  };

  // An object that uses the buffered channel.
  // The three callbacks below read and write `this.queue`, `this.queueId` and
  // `this.results`, i.e. they expect to be invoked on the channel's request state.
  const Obj = EmberObject.extend({
    task: new BufferedChannel(fn, {
      // Records the request and hands back the id used to trace it.
      enqueue(groupBy, num) {
        const id = this.queueId++;
        this.queue.push({ id, num, groupBy });
        return id;
      },

      // Returns this request's slice of the shared result, or null while it
      // has not been registered yet.
      dequeue({ args, id }) {
        if (this.results.hasOwnProperty(id)) {
          // Drop only the entry being dequeued. The callback parameter must
          // not be named `id`, otherwise it shadows the outer `id` and the
          // comparison is always false, wiping the whole queue.
          this.queue = this.queue.filter((entry) => entry.id !== id);
          return this.results[id][`${args[0]}${args[1]}`];
        }
        return null;
      },

      // Merges every unresolved queued request that shares the first
      // `groupBy` encountered into a single argument list.
      coalesceEnqueuedRequests() {
        const req = this.queue
          .filter(({ id }) => !this.results.hasOwnProperty(id))
          .reduce((accum, { id, num, groupBy }) => {
            if (accum.groupBy && accum.groupBy !== groupBy) {
              // Belongs to a different group; leave it for a later request.
              return accum;
            }
            accum.groupBy = groupBy;
            accum.ids.push(id);
            accum.nums.push(num);
            return accum;
          }, { groupBy: null, nums: [], ids: [] });
        return { args: [req.groupBy, req.nums], ids: req.ids };
      }
    })
  });

  let ref1;
  let ref2;
  // A plain (non-async) callback: nothing is awaited here, and a synchronous
  // failure should surface in the test rather than as an unhandled rejection.
  run(function() {
    const obj = Obj.create();
    // WHEN we perform requests within the allotted timeframe to be coalesced
    ref1 = obj.get('task.go').perform('abc', 1);
    ref2 = obj.get('task.go').perform('abc', 2);
  });
  const [res1, res2] = await all([ref1, ref2]);

  // THEN
  // the requests are proxied and pull from the same result.
  assert.equal(res1, 'something that is only for 1');
  assert.equal(res2, 'something that is only for 2');
});
import EmberObject from 'ember-object';
import { task, timeout } from 'ember-concurrency';
import { assert } from 'ember-metal/utils'
/**
 * Funnels individual calls through a shared RequestState so that several
 * calls can be served by one invocation of `asyncFunction`.
 *
 * Flow for each `go.perform(...args)`:
 *  1. the arguments are enqueued and an id is obtained,
 *  2. the task waits `timeout` ms,
 *  3. `bulkRequestHelper` issues the coalesced request unless this id is
 *     already pending or resolved,
 *  4. `blockUntilDelegated` polls `dequeue` until it returns a non-null value.
 *
 * NOTE(review): if `asyncFunction` rejects, the ids stay registered as pending
 * and no result is ever registered, so other callers polling in
 * `blockUntilDelegated` appear to wait indefinitely — confirm and handle.
 *
 * @class BufferedChannel
 */
export default class BufferedChannel extends EmberObject.extend({
  requestState: null,
  timeout: null,
  pollInterval: null,

  /**
   * Public entry point: enqueue the arguments, wait out the buffering
   * window, then resolve with this request's result.
   */
  go: task(function*(...args) {
    const id = this.get('requestState').enqueue(...args);
    yield timeout(this.get('timeout'));
    return this.get('bulkRequestHelper').perform({ args, id });
  }),

  /**
   * Issues the coalesced request when nobody has done so for `id` yet, then
   * hands over to `blockUntilDelegated` to fetch this request's result.
   */
  bulkRequestHelper: task(function*({ args, id }) {
    assert('id must be returned from enqueue', id !== undefined);
    // if request is pending block, otherwise make request
    const requestState = this.get('requestState');
    if (!requestState.isRequestPendingOrResolved({ args, id })) {
      const { args: coalescedArgs, ids } = requestState.coalesceEnqueuedRequests();
      // Mark the ids before yielding so other task instances skip the request.
      requestState.registerPending(ids);
      const data = yield this.asyncFunction(...coalescedArgs);
      requestState.registerResult(ids, data);
    }
    return this.get('blockUntilDelegated').perform({ args, id });
  }),

  /**
   * Polls `dequeue` every `pollInterval` ms until it returns something other
   * than null, and returns that value.
   */
  blockUntilDelegated: task(function*({ args, id }) {
    let results = null;
    while (results === null) {
      // polling frequency to see if bulk request has updated
      yield timeout(this.get('pollInterval'));
      results = this.get('requestState').dequeue({ args, id });
    }
    return results;
  })
}) {
  /**
   * @param {Function} asyncFunction called with the coalesced arguments
   * @param {Object} handlers `enqueue`, `dequeue` and `coalesceEnqueuedRequests`
   *   implementations, forwarded to RequestState
   * @param {Number} [timeout=500] ms to wait after enqueueing before requesting
   * @param {Number} [pollInterval=100] ms between polls for a result
   */
  constructor(asyncFunction, { enqueue, dequeue, coalesceEnqueuedRequests }, timeout = 500, pollInterval = 100) {
    super();
    assert('Enqueue, dequeue, and coalesceEnqueuedRequests must be defined', enqueue && dequeue && coalesceEnqueuedRequests);
    const requestState = new RequestState({ enqueue, dequeue, coalesceEnqueuedRequests });
    this.setProperties({
      asyncFunction,
      requestState,
      timeout,
      pollInterval
    });
  }
}
/**
 * Bookkeeping for queued, pending and resolved requests.
 *
 * The `enqueue`, `dequeue` and `coalesceEnqueuedRequests` functions passed to
 * the constructor are stored as instance properties, so they shadow the empty
 * prototype methods of the same name below; those stubs only document the
 * expected contract.
 */
class RequestState {
  /**
   * @param {Object} handlers
   * @param {Function} handlers.enqueue
   * @param {Function} handlers.dequeue
   * @param {Function} handlers.coalesceEnqueuedRequests
   */
  constructor({ enqueue, dequeue, coalesceEnqueuedRequests }) {
    this.queueId = 0;
    this.queue = [];
    // Kept as plain objects: the supplied handlers may inspect them directly.
    this.requested = {};
    this.results = {};
    this.enqueue = enqueue;
    this.dequeue = dequeue;
    this.coalesceEnqueuedRequests = coalesceEnqueuedRequests;
  }

  /**
   * Queues up request arguments (on this.queue) and returns an id that corresponds to those arguments.
   *
   * @method enqueue
   * @return id to trace this request through its existence
   */
  enqueue() {}

  /**
   * Reads the queue and coalesces similar requests. Returns an object in the format:
   * ```
   * {
   *   args: [],
   *   ids: []
   * }
   * ```
   * Where args are the potentially coalesced arguments to apply to the asynchronous function and
   * ids are the argument ids that were coalesced into one request.
   *
   * @method coalesceEnqueuedRequests
   * @return Object `{ args, ids }` as described above
   */
  coalesceEnqueuedRequests() {}

  /**
   * Returns a boolean value indicating whether or not a request has been made that will contain the outputs
   * necessary for the incoming arguments, i.e. the id is registered as pending or already has a result.
   *
   * @method isRequestPendingOrResolved
   * @param id Arguments id (returned from enqueue method)
   * @return Boolean
   */
  isRequestPendingOrResolved({ id }) {
    // Go through Object.prototype so an id that happens to equal a built-in
    // property name (e.g. 'hasOwnProperty') cannot break the lookup.
    const owns = Object.prototype.hasOwnProperty;
    return owns.call(this.requested, id) || owns.call(this.results, id);
  }

  /**
   * Returns either a value if resolved or null if still not resolved.
   * The returned value will be returned as resolved value for the taskInstance for the original request.
   *
   * @method dequeue
   * @param args
   * @param id
   */
  dequeue({ args, id }) {}

  /**
   * Marks every given id as having a request in flight.
   *
   * @method registerPending
   * @param ids List of ids that map to request arguments
   */
  registerPending(ids) {
    ids.forEach((id) => {
      this.requested[id] = true;
    });
  }

  /**
   * Stores the same `data` object as the result for every given id.
   *
   * @method registerResult
   * @param ids List of ids that were served by one request
   * @param data Result of that request
   */
  registerResult(ids, data) {
    ids.forEach((id) => {
      this.results[id] = data;
    });
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment