@cowboyd
Last active July 19, 2019 17:41
What is the relationship between tasks and execution trees?
import { execute, call } from 'effection';
// One of the things that struck me when I was implementing the
// `enqueue` function in the task-constraining functions
// (https://gist.github.com/cowboyd/19122da0c59c674cdea86cf6d70e9c75)
// was the eventual form it took.
//
// Like the other task constructors, it returned a function. Let's
// call it the "performance" function. Unlike the others, the
// performance function seemed really unique and cool. It doesn't
// really have any logic in and of itself; it only resumes execution
// of a loop that forks a sub-process. Here it is again.
export function enqueue(proc) {
  let loop = execute(function*() {
    let tail = function*() {};
    while (true) {
      let args = yield;
      // implementation
      tail = this.fork(function*(prior) {
        yield prior;
        yield call(proc, ...args);
      }, [tail]);
    }
  });
  // performance function
  return function(...args) {
    loop.resume(args);
  };
}
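// For intuition, the queueing behavior this loop achieves can be
// sketched without Effection at all, using a promise chain in place
// of the fork tree. The code below is illustrative only, not the
// library's API:

```javascript
// Each performance appends to a promise chain, so invocations of
// `proc` run strictly one after another, in call order.
function enqueuePromise(proc) {
  // `tail` plays the role of the prior fork that each run waits on
  let tail = Promise.resolve();
  return function perform(...args) {
    tail = tail.then(() => proc(...args));
    return tail;
  };
}

// usage: all three calls are queued; `proc` runs in call order
const log = [];
const perform = enqueuePromise(async (n) => { log.push(n); });
perform(1);
perform(2);
perform(3);
```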
// The unique shape of this performance function seemed so generic
// that it made me wonder if I could re-write the other, more trivial
// task modifiers using this structure. It turns out that you can, if
// you model each one as an infinite loop that forks off a
// sub-process with each performance.
//
// This is what it looked like before:
export function throttleBefore(proc, maxConcurrency) {
  let concurrency = 0;
  return function(...args) {
    if (concurrency < maxConcurrency) {
      concurrency++;
      execute(function*() {
        try {
          yield call(proc, ...args);
        } finally {
          concurrency--;
        }
      });
    }
  };
}
// And this is what it looks like after.
export function throttle(proc, maxConcurrency) {
  let loop = execute(function*() {
    // local state
    let concurrency = 0;
    while (true) {
      // props for this invocation
      let args = yield;
      this.fork(function*() {
        if (concurrency < maxConcurrency) {
          try {
            concurrency++;
            yield call(proc, ...args);
          } finally {
            concurrency--;
          }
        }
      });
    }
  });
  // performance function
  return (...args) => loop.resume(args);
}
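// The throttle semantics, stripped of the loop machinery, can
// likewise be sketched with plain async functions (illustrative
// names, not Effection's API); calls above the limit are simply
// ignored:

```javascript
// Runs at most `maxConcurrency` invocations of `proc` at once;
// a performance that arrives while full is dropped.
function throttlePromise(proc, maxConcurrency) {
  let concurrency = 0;
  return function perform(...args) {
    if (concurrency >= maxConcurrency) {
      return; // over the limit: drop this performance
    }
    concurrency++;
    return Promise.resolve(proc(...args)).finally(() => {
      concurrency--;
    });
  };
}

// usage: with a limit of 2, only the first two slow calls start
const started = [];
const perform = throttlePromise((n) => {
  started.push(n);
  return new Promise((resolve) => setTimeout(resolve, 5));
}, 2);
perform(1);
perform(2);
perform(3); // dropped
```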
// By moving all of the logic out of the performance function, we make
// the overall function more complex, but the trade-off is that we get
// our performance function for free, and it's clear where to slot in
// the logic: it's just the generator that gets forked.
// Here is restart modelled the same way:
export function restartBefore(proc) {
  // state
  let current = { halt() {} };
  // performance function
  return function(...args) {
    // logic
    current.halt();
    current = execute(proc, ...args);
  };
}
// after
export function restart(proc) {
  let loop = execute(function*() {
    // state
    let current = { halt() {} };
    while (true) {
      let args = yield;
      this.fork(function*() {
        // logic
        current.halt();
        current = this.fork(proc, args);
      });
    }
  });
  // performance function
  return (...args) => loop.resume(args);
}
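// Restart's halt-the-prior behavior can be sketched with an
// AbortController standing in for halting an execution (again
// illustrative, not the library's API):

```javascript
// Each performance halts whatever was running before and starts fresh.
function restartAbortable(proc) {
  let current = new AbortController();
  return function perform(...args) {
    current.abort();                 // halt the prior run
    current = new AbortController();
    proc(current.signal, ...args);   // start a fresh one
  };
}

// usage: the second performance cancels the first's pending work
const finished = [];
const perform = restartAbortable((signal, n) => {
  const timer = setTimeout(() => finished.push(n), 5);
  signal.addEventListener('abort', () => clearTimeout(timer));
});
perform(1);
perform(2); // halts 1, so only 2 finishes
```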
// Again, the bits of the task fit into the familiar slots.
//
// For completeness, the drop task constructor:
// before
export function dropBefore(proc) {
  // state
  let current = { isPending: false };
  // performance function
  return function(...args) {
    if (!current.isPending) {
      current = execute(proc, ...args);
    }
  };
}
// after
export function drop(proc) {
  let loop = execute(function*() {
    let current = { isPending: false };
    while (true) {
      let args = yield;
      this.fork(function*() {
        if (!current.isPending) {
          current = this.fork(proc, args);
        }
      });
    }
  });
  return (...args) => loop.resume(args);
}
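// And drop, sketched the same way: while one run is pending, further
// performances are ignored (illustrative only, not Effection's API):

```javascript
// Ignores performances that arrive while a prior run is still pending.
function dropPromise(proc) {
  let pending = false;
  return function perform(...args) {
    if (pending) {
      return; // a run is in flight: drop this performance
    }
    pending = true;
    return Promise.resolve(proc(...args)).finally(() => {
      pending = false;
    });
  };
}

// usage: the second call arrives while the first is still pending
const started = [];
const perform = dropPromise((n) => {
  started.push(n);
  return new Promise((resolve) => setTimeout(resolve, 5));
});
perform(1);
perform(2); // dropped
```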
// By now, we can see enough of a pattern to pose the question: what
// if a Task, as we know it, is really just an execution tree "lifted"
// into the context of an infinite loop? We have enough information to
// take a stab at a `Task` class.
export class Task {
  constructor(proc) {
    this.proc = proc;
    this.loop = execute(function*() {
      while (true) {
        let args = yield;
        this.fork(proc, args);
      }
    });
  }

  perform(...args) {
    this.loop.resume(args);
  }

  throttle(maxConcurrency) {
    let concurrency = 0;
    return this.flatMap(proc => function*(...args) {
      if (concurrency < maxConcurrency) {
        try {
          concurrency++;
          yield call(proc, ...args);
        } finally {
          concurrency--;
        }
      }
    });
  }

  flatMap(sequence) {
    let next = sequence(this.proc);
    if (next instanceof Task) {
      return next;
    } else {
      return new Task(next);
    }
  }
}
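// To see what flatMap buys us in isolation, here is a stripped-down
// task whose perform just calls proc directly (no execute() loop) —
// a hypothetical reduction, enough to show how one proc is threaded
// into the next:

```javascript
// Minimal task: perform invokes proc directly; flatMap derives a new
// task by handing the current proc to a sequencing function.
class MiniTask {
  constructor(proc) {
    this.proc = proc;
  }
  perform(...args) {
    return this.proc(...args);
  }
  flatMap(sequence) {
    const next = sequence(this.proc);
    return next instanceof MiniTask ? next : new MiniTask(next);
  }
}

// usage: derive a new task that wraps the original proc
const double = new MiniTask((n) => n * 2);
const doublePlusOne = double.flatMap((proc) => (n) => proc(n) + 1);
```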
// We have a quasi-composable primitive in that now the main execution
// is always at the top level, instead of hidden somewhere inside of a
// performance function, and we can always derive new tasks using the
// flatMap function for sequencing tasks.
//
// Another alternative to flatMapping is to use the JavaScript
// pipeline operator when it becomes available. This pipeline is
// meaningless, but demonstrates what it might look like:
//
//   let task = new Task(countdownFrom |> throttle(2) |> enqueue |> drop);
//   task.perform(10);
//
// I don't know if this is on the right track, but it seems like a
// step forward. I'm wondering how each task might be composed into a
// greater tree.