@scottpersinger
Last active November 23, 2023 04:34
Node-based ETL pipeline

Node.js offers a great environment for building ETL scripts: it is easy to program and work with, and it has interface libraries for almost everything under the sun.

We need a framework that makes writing ETL scripts easy:

  • Supports creation and re-use of components
  • Cleanly divides input/output concerns from transformation concerns
  • Works identically in both event-driven and batch scenarios
  • Supports an easy development/testing mode
  • Provides good visibility when running in production
  • Has a simple DSL for expressing transformation logic
  • Supports SQL easily as a transformation/aggregation tool
  • Provides a strong prescriptive framework to make future maintenance easier

Data pipeline

The framework is built around the simple concept of a pipeline. Components are assembled as nodes in a DAG to construct the pipeline, and data passes through the pipeline from inputs to outputs. This is similar to the concept of UNIX pipes and to flows in Node-RED.

Data is contained in messages, and messages are passed down the pipeline. By default, messages passed as input to a component are emitted unchanged as output from that component. However, a component may choose to edit or remove messages from the pipeline, or it may generate new messages to be added to the pipeline.
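
The three behaviours described above (pass through, remove, add) can be sketched as plain functions. This is only an illustration: the function names and the `email` and `count` payload fields are hypothetical, and the empty `meta` object stands in for a schema this document does not spell out.

```javascript
// Hypothetical components; each takes (msgs, context) and returns
// the array of messages to hand to the next component.

// Pass-through: the input array is returned unchanged.
function passThrough(msgs, context) {
  return msgs;
}

// Remove: drop any message whose payload has no email value.
function dropMissingEmail(msgs, context) {
  return msgs.filter(function (msg) {
    return Boolean(msg.payload && msg.payload.email);
  });
}

// Add: append one new message carrying the number of input messages.
// The empty meta object is a placeholder, not the real meta schema.
function appendCount(msgs, context) {
  return msgs.concat([{ meta: {}, payload: { count: msgs.length } }]);
}
```

None of these mutate the input array, which keeps each component easy to test in isolation.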

Messages have a fixed "meta" schema, but may contain loosely-typed data.

In addition to messages, the pipeline contains a single global context. This dictionary contains values which may be set up during initialization of the pipeline or set by components during processing. The context lives for the duration of the pipeline processing (until the program exits).
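
As a rough sketch of how components might use the context, assume it is a plain object passed as the second argument to every component. The keys `startedAt`, `rowsSeen` and `runStartedAt` are made up for this illustration.

```javascript
// Hypothetical context, set up once when the pipeline is initialized.
var context = { startedAt: Date.now(), rowsSeen: 0 };

// Writes to the context: keeps a running count of messages seen.
function countRows(msgs, ctx) {
  ctx.rowsSeen += msgs.length;
  return msgs;
}

// Reads from the context: stamps each payload with the run start time.
function tagWithRunStart(msgs, ctx) {
  return msgs.map(function (msg) {
    return {
      meta: msg.meta,
      payload: Object.assign({}, msg.payload, { runStartedAt: ctx.startedAt })
    };
  });
}
```

Because the context is shared, a value written by one component is visible to every component that runs after it.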

Components

The pipeline is

Example

// Read table "roles":
//            firstname, lastname, email_address, salary, bonus
// Write to table "users":
//            name, email, compensation
var dbquery = require('dbquery');
var dbwriter = require('dbwriter');
var Pipeline = require('pipeline');
var Scheduler = require('scheduler');

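// Pipeline is called as a factory, without "new".
var pipeline = Pipeline();
// Input: emit one message per row of "roles".
pipeline.use(dbquery('select * from roles'));
// Transform: reshape each role row into a "users" row.
pipeline.use(function(msgs, context) {
  return msgs.map(function(role) {
    return {
      meta: role.meta,
      payload: {
        name: role.payload.firstname + ' ' + role.payload.lastname,
        email: role.payload.email_address,
        compensation: role.payload.salary + role.payload.bonus
      }
    };
  });
});
// Output: write each message payload to "users".
pipeline.use(dbwriter('users'));

Scheduler.run(pipeline, {interval: 60*60});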
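
The `pipeline` module used in the example is not shown here. As a minimal sketch of what it could look like, assume a linear chain of synchronous components only (no DAG branching, no async support). The `run` method is an assumed name; the example above only shows `use`.

```javascript
// Hypothetical sketch of a Pipeline factory for synchronous components.
function Pipeline() {
  var components = [];
  return {
    // Register a component; returns the pipeline so calls can be chained.
    use: function (component) {
      components.push(component);
      return this;
    },
    // Assumed method: start with no messages and feed each component's
    // output into the next one, sharing a single context object.
    run: function (context) {
      return components.reduce(function (msgs, component) {
        return component(msgs, context);
      }, []);
    }
  };
}

// Usage with in-memory stand-ins for the database components.
var demo = Pipeline();
demo.use(function (msgs, context) {
  return [{ meta: {}, payload: { firstname: 'Ada', lastname: 'Lovelace' } }];
});
demo.use(function (msgs, context) {
  return msgs.map(function (m) {
    return {
      meta: m.meta,
      payload: { name: m.payload.firstname + ' ' + m.payload.lastname }
    };
  });
});
var out = demo.run({});
```

Starting the reduce with an empty array means the first component acts as the input source, the way `dbquery` does in the example.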

Component interface

Components can be written as either synchronous or async functions, depending on the signature of the function:

f(msg_array, context) - Indicates a synchronous function. The return value will be added to the pipeline. An error should be reported by throwing an exception.

f(msg_array, context, callback) - Indicates an async function. Invoke the callback as callback(err, msgs).
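
One way a framework could tell the two signatures apart is the function's `length` property, which reports how many parameters it declares. The helper below is a sketch under that assumption, not the framework's actual dispatch code, and `runComponent` is a hypothetical name.

```javascript
// Hypothetical helper: run one component and always report the result
// through a Node-style callback, done(err, msgs).
function runComponent(component, msgs, context, done) {
  // Three or more declared parameters: treat it as async and let the
  // component invoke the callback itself.
  if (component.length >= 3) {
    component(msgs, context, done);
    return;
  }
  // Otherwise treat it as synchronous: a thrown exception becomes err,
  // and the return value becomes the output messages.
  var result;
  try {
    result = component(msgs, context);
  } catch (err) {
    done(err);
    return;
  }
  done(null, result);
}
```

Note that `length` does not count rest parameters or parameters with default values, so a component written as `function(msgs, context, callback = noop)` would be treated as synchronous by this check.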

Example

// "db" is assumed to be an already-connected database client.
function dbquery(msg_array, context, callback) {
  db.select("select * from users", function(err, rows) {
    if (err) {
      callback(err);
    } else {
      // Wrap each row in a message.
      callback(null, rows.map(function(row) {return {payload:row}}));
    }
  });
}