Skip to content

Instantly share code, notes, and snippets.

@DylanLukes
Created July 22, 2014 14:48
Show Gist options
  • Save DylanLukes/62e6449f8eb1088dbb0a to your computer and use it in GitHub Desktop.
Save DylanLukes/62e6449f8eb1088dbb0a to your computer and use it in GitHub Desktop.
var noflo = require('noflo');
var fbp = require('fbp');
var fs = require('fs');
var path = require('path');
var util = require('util');
var browserify = require('browserify');
var coffeeify = require('coffeeify');
var through = require('through2');
var stream = require('stream');
var wait = require('wait.for');
var _ = require('lodash');
var tmp = require('tmp');
tmp.setGracefulCleanup();
// Tests whether `file` carries exactly the extension `ext`.
//   file: path of the file to inspect
//   ext:  extension to match, including the leading dot (e.g. '.fbp')
// Returns true when path.extname(file) is strictly equal to ext.
var isOfExt = function (file, ext) {
  return path.extname(file) === ext;
};

// Single-argument predicates for the two graph formats.
// They are written as explicit wrappers (rather than right-partial
// applications) so that any additional arguments a caller passes -- for
// example the (value, index, collection) triple handed to iterator
// callbacks -- are ignored instead of displacing the extension.
var isFBP = function (file) {
  return isOfExt(file, '.fbp');
};
var isJSON = function (file) {
  return isOfExt(file, '.json');
};
// A variant of wait.forMethod for methods that do not use Node-style callbacks
// (e.g. (err, data)) but just call back with data. It is used for the Loader.
//   obj:        object owning the method
//   methodName: name of the async method on obj
//   ...         any further arguments are forwarded to the method, ahead of
//               the callback
// Returns whatever wait.applyAndWait returns for the adapted call.
// Throws an Error when obj[methodName] is not a function.
wait.forSimpleMethod = function (obj, methodName) {
  // Validate the real target method; the adapter below is always a function,
  // so checking it would never catch a bad method name.
  if (!obj || typeof obj[methodName] !== 'function') {
    throw new Error('wait.forSimpleMethod: second argument must be the async method name (string)');
  }
  // Adapter invoked by wait.applyAndWait, which appends its own Node-style
  // callback as the LAST argument. Swap that callback for a data-only one
  // and forward every preceding argument unchanged.
  var method = function () {
    var args = Array.prototype.slice.call(arguments);
    var cb = args.pop();
    args.push(function (data) { cb(null, data); });
    obj[methodName].apply(obj, args);
  };
  var newargs = Array.prototype.slice.call(arguments, 2); // remove obj and method name from args
  return wait.applyAndWait(obj, method, newargs);
};
// Reads a codegen/shim template from the ../templates/ directory (relative to
// this file) and returns the raw result of fs.readFileSync. Wrapped in
// _.memoize, so repeated requests for the same name reuse the first result.
var loadTemplate = _.memoize(function (name) {
  var relativeLocation = path.join('../templates/', name);
  var absoluteLocation = path.resolve(__dirname, relativeLocation);
  return fs.readFileSync(absoluteLocation);
});
// Browserify transform that converts an FBP definition into a component module.
//   file: path of the file being bundled
// Returns a stream: a plain pass-through for non-.fbp files, otherwise a
// transform that buffers the whole input, parses it with fbp.parse and emits
// the rendered 'subgraph.js.tpl' template as its only output.
//
// `through` is the through2 module here, so the callbacks follow its
// contract: the transform function receives (chunk, encoding, callback) and
// must invoke the callback, the flush function receives (callback), and
// output is produced with this.push.
var fbpify = function (file) {
  if (!isFBP(file)) { return through(); }

  var data = '';

  // Accumulate the source; nothing is emitted until the input has ended.
  function write(buf, enc, next) {
    data += buf;
    next();
  }

  // Input exhausted: render the module source, or fail the stream.
  function end(finish) {
    var src;
    try {
      src = _.template(loadTemplate('subgraph.js.tpl'), {
        graphDefinition: fbp.parse(data)
      });
    } catch (error) {
      // Hand the error to the stream and stop; there is no output to push.
      finish(error);
      return;
    }
    this.push(src);
    finish();
  }

  return through(write, end);
};
// FBP JSON graph expansion utility.
// Intended to replace a process with the processes of a subgraph, bridging
// the exported in- and out-ports of that process.
// Currently an unimplemented stub: both arguments are ignored and the call
// yields undefined.
var expandVertex = function (graph, subgraph) {
  return undefined;
};
// Builds a Readable stream that emits the JavaScript source wiring up a graph.
//   payload.graph:    graph object; its `processes` map (name -> {component})
//                     and `connections` list are read
//   payload.resolved: map of component name -> module path, used as the
//                     require() target for each process
// Returns the Readable stream. Output order: preamble, one line per process,
// then proper edges (src -> tgt), then initial-data edges (data -> tgt).
//
// Every name, port and path is inserted into the generated code as a JSON
// string literal with bracket access, so process names that are not valid
// identifiers (e.g. containing '/') and paths containing backslashes or
// quotes still produce valid JavaScript.
var generateGraphJs = function (payload) {
  var rs = stream.Readable();

  // We make clones because we will remove nodes and edges as they are added.
  rs._graph = _.cloneDeep(payload.graph);

  // We convert processes into an assoc-list for convenience.
  rs._nodes = _.transform(rs._graph.processes, function (result, properties, name) {
    result.push({name: name, component: properties.component});
  }, []);

  rs._initialEdges = [];
  rs._properEdges = [];

  // Classify edges so we can codegen proper edges first.
  _.each(rs._graph.connections, function (conn) {
    // Test for the presence of `data`, not its truthiness, so that initial
    // packets such as 0, '' or false are not silently dropped.
    var hasData = _.has(conn, 'data');
    if (hasData && conn.src) {
      // todo: throw
    }
    if (hasData) { rs._initialEdges.push(conn); }
    if (conn.src) { rs._properEdges.push(conn); }
  });

  rs._lookup = _.cloneDeep(payload.resolved);

  // All interpolated values except `id` arrive pre-quoted via JSON.stringify.
  rs._templates = {
    preamble: "var processes = {}, sockets = [], noflo = require(<%= noflo %>);\n",
    process: "processes[<%= process %>] = require(<%= path %>).getComponent();\n",
    properEdge: "sockets[<%= id %>] = noflo.internalSocket.createSocket();\n" +
      "processes[<%= srcProcess %>].outPorts[<%= srcPort %>].attach(sockets[<%= id %>]);\n" +
      "processes[<%= tgtProcess %>].inPorts[<%= tgtPort %>].attach(sockets[<%= id %>]);\n",
    initialEdge: "sockets[<%= id %>] = noflo.internalSocket.createSocket();\n" +
      "processes[<%= tgtProcess %>].inPorts[<%= tgtPort %>].attach(sockets[<%= id %>]);\n" +
      "sockets[<%= id %>].send(<%= data %>);\n" +
      "sockets[<%= id %>].disconnect();\n"
  };

  // We respect .push returning false, so we need a way to keep track of where we were.
  // 0: nothing done
  // 1: preamble done
  // 2: processes done
  // 3: proper edges done
  // 4: initial data edges done (totally done)
  rs._state = 0;

  // Monotonic counter used as the index into the generated `sockets` array.
  rs._id = 0;
  rs.uniqueId = function () {
    return rs._id++;
  };

  // We're only pushing UTF-8 strings, so this is cleaner.
  rs.push = _.partialRight(rs.push, 'utf-8');

  rs._read = function () {
    var more = true;
    codegen:
    do {
      switch (rs._state) {
        case 0:
          more = rs.push(_.template(rs._templates.preamble, {
            noflo: JSON.stringify(require.resolve('noflo'))
          }));
          rs._state = 1;
          break;
        case 1:
          // `continue` re-tests `more` (still true here) and re-enters the
          // switch in the next state.
          if (_.isEmpty(rs._nodes)) { rs._state = 2; continue; }
          var node = rs._nodes.pop();
          more = rs.push(_.template(rs._templates.process, {
            process: JSON.stringify(node.name),
            path: JSON.stringify(rs._lookup[node.component])
          }));
          break;
        case 2:
          if (_.isEmpty(rs._properEdges)) { rs._state = 3; continue; }
          var properEdge = rs._properEdges.pop();
          more = rs.push(_.template(rs._templates.properEdge, {
            srcProcess: JSON.stringify(properEdge.src.process),
            srcPort: JSON.stringify(properEdge.src.port),
            tgtProcess: JSON.stringify(properEdge.tgt.process),
            tgtPort: JSON.stringify(properEdge.tgt.port),
            id: rs.uniqueId()
          }));
          break;
        case 3:
          if (_.isEmpty(rs._initialEdges)) { rs._state = 4; continue; }
          var edge = rs._initialEdges.pop();
          more = rs.push(_.template(rs._templates.initialEdge, {
            tgtProcess: JSON.stringify(edge.tgt.process),
            tgtPort: JSON.stringify(edge.tgt.port),
            data: JSON.stringify(edge.data),
            id: rs.uniqueId()
          }));
          break;
        case 4:
          // Signal end-of-stream, then fall through to leave the loop.
          rs.push(null);
          // falls through
        default:
          break codegen;
      }
    } while (more);
    return;
  };

  return rs;
};
module.exports = function (grunt) {
grunt.registerMultiTask('noflo', 'Grunt plugin for creating browserified NoFlo builds for running a graph', function () {
var options = this.options({
platform: 'noflo-browser',
exclude: [
'emitter',
'btoa',
'atob',
'fs',
'read-installed',
'path',
'coffee-script',
'buffer',
],
ignore: [
'node_modules/noflo/lib/nodejs/ComponentLoader.js',
'node_modules/noflo/lib/Network.js',
'node_modules/noflo/lib/Graph.js',
'node_modules/noflo/lib/Journal.js',
'node_modules/noflo/lib/ComponentLoader.js'
],
baseDir: process.cwd(),
// Recognized extensions
extensions: ['.js', '.json', '.coffee', '.fbp'],
transforms: ['coffeeify'],
browserifyOptions: {
debug: false,
insertGlobals: false,
detectGlobals: true
},
flattenSubgraphs: true,
});
var done = this.async();
var bundler = new browserify({
extensions: options.extensions
});
_.each(options.exclude, function (file) { bundler.exclude(file); });
_.each(options.ignore, function (file) { bundler.ignore(file); });
_.each(options.transforms, function (t) { bundler.transform(t); });
bundler.transform(fbpify);
// The task's body proper is wrapped in a fiber, to keep the code clean
// even with asychrony. Coroutines > pipes for this use case.
wait.launchFiber(function () {
_.each(this.files, function (f) {
_(f.src)
// PHASE 0 (PRE)
// Filter out files that cannot be found, warning along the way.
.filter(function (filePath) {
if (!grunt.file.exists(filePath)) {
grunt.log.warn('Source file "' + filePath + '" not found.');
return false;
} else {
return true;
}
})
// PHASE 1 (LOAD)
// Load all graphs normalized to JSON.
.map(function (graphPath) {
if (isFBP(graphPath)) {
try {
return fbp.parse(grunt.file.read(graphPath));
} catch(err) {
grunt.log.error('Source graph "' + graphPath + '" is invalid.');
grunt.log.error(err);
}
// fs.createReadStream(graphPath).pipe(fbpify).pipe(process.stdout);
// process.exit()
} else {
if (isJSON(graphPath)) {
grunt.log.warn('Source graph "' + graphPath + '" has an unrecognized extension. Assuming json.');
}
return grunt.file.readJSON(graphPath);
}
})
// PHASE 2 (NORMALIZE)
// Resolve and annotate a list of required (for use) components, treating
// subgraphs as components. Additionally, warn on missing components.
.map(function (graph) {
return {
graph: graph,
required: _(graph.processes).values().pluck('component').unique().valueOf()
};
})
// PHASE 3 (ANALYZE/RESOLVE)
// From this point on, the pipeline contains an annotatable payload, rather than a bare graph.
// Compile a list of files to bundle, checking that each components exists.
.map(function (payload) {
var loader = new noflo.ComponentLoader(options.baseDir);
var components;
try {
// The loader doesn't use node-style callbacks, so we resort to
components = wait.forSimpleMethod(loader, 'listComponents');
} catch (err) {
// This is as absurd as it looks. Seems like a waitfor/fiber bug.
if (!util.isError(err)) { components = err; }
else { throw err; }
}
var resolved = {};
_(payload.required).each(function (componentName) {
if (!_.has(components, componentName)) {
grunt.log.error('Source component "' + componentName + '" is missing.');
}
resolved[componentName] = components[componentName];
});
return _.merge(payload, {
resolved: resolved
});
})
// PHASE 4 (OPTIMIZE)
.map(function (payload) {
if (options.flattenSubgraphs) {
}
return payload;
})
// PHASE 4 (GENERATE)
// Generate graph js and bundle
.map(function (payload) {
_(payload.resolved).values().each(function (componentPath) {
bundler.add(componentPath);
});
var tmpPath = wait.forMethod(tmp, 'tmpName');
generateGraphJs(payload).pipe(fs.createWriteStream(tmpPath, {encoding: 'utf-8'}));
bundler.require(tmpPath, {
basedir: options.baseDir,
expose: 'main'
});
var d = bundler.deps();
var seen = [];
d.on('data', function (data) {
if (seen.indexOf(data.id) !== -1 || data.id === tmpPath) {
return;
}
grunt.log.writeln('Bundling ' + data.id);
seen.push(data.id);
});
bundler.bundle(options.browserifyOptions, function (err, src) {
if (err) { grunt.log.error(err); }
grunt.file.write(f.dest, src);
grunt.log.writeln('Target bundle "' + f.dest + '" built.');
done();
});
})
.valueOf(); // not that we really care...
});
}.bind(this));
});
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment