Skip to content

Instantly share code, notes, and snippets.

Created July 27, 2015 06:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattpodwysocki/33bfa5628e31b3cd3276 to your computer and use it in GitHub Desktop.
Save mattpodwysocki/33bfa5628e31b3cd3276 to your computer and use it in GitHub Desktop.
var spawn = require('child_process').spawn;
var Rx = require('rx');
/**
 * Spawns a child process and exposes its output as an Rx observable.
 *
 * Each `data` event on the child's stdout is forwarded with `onNext`.
 * Note that ANY `data` event on stderr terminates the sequence through
 * `onError`, with the emitted chunk as the error value. The sequence
 * completes when the child emits `close`.
 *
 * Disposing the subscription only detaches the listeners registered here;
 * it does not kill the child process.
 *
 * @param {string} command - Program to run, passed to `spawn`.
 * @param {string[]} [args] - Arguments passed to `spawn`.
 * @param {Object} [options] - Options passed to `spawn`.
 * @returns {Rx.Observable} Observable of the chunks emitted on stdout.
 */
function spawnAsObservable(command, args, options) {
  return new Rx.AnonymousObservable(function (o) {
    var cmd = spawn(command, args, options);

    // Named handlers so the exact same references can be removed on dispose.
    var dataHandler = function (data) { o.onNext(data); };
    var errHandler = function (err) { o.onError(err); };
    var closeHandler = function () { o.onCompleted(); };

    cmd.stdout.addListener('data', dataHandler);
    cmd.stderr.addListener('data', errHandler);
    cmd.addListener('close', closeHandler);

    // Teardown: detach every listener attached above.
    return Rx.Disposable.create(function () {
      cmd.stdout.removeListener('data', dataHandler);
      cmd.stderr.removeListener('data', errHandler);
      cmd.removeListener('close', closeHandler);
    });
  });
}
Copy link

I found one of the internal ones that might have been what I was thinking of but I swear it isn't:

  // Private: Maps an Update.exe process into an {Observable}
  // params - Arguments passed to the process
  // Returns an {Observable} with a single value, that is the output of the
  // spawned process
  spawnUpdateDotExe(params) {
    let updateDotExe = path.resolve(path.dirname(process.execPath), '..', 'update.exe');
    if (!fs.statSyncNoException(updateDotExe)) {
      return rx.Observable.return('');

    let spawnUpdate = rx.Observable.create((subj) => {
      let proc = spawn(updateDotExe, params);

      var stdout = '';
      let bufHandler = (b) => {
        let line = b.toString();
        stdout += line;

      proc.stdout.on('data', bufHandler);
      proc.stderr.on('data', bufHandler);
      proc.on('error', (e) => subj.onError(e));

      proc.on('close', (code) => {
        if (code === 0) {
        } else {
          subj.onError(new Error("Failed with exit code: " + code + "\nOutput:\n" + stdout));

    return spawnUpdate
      .reduce((acc, x) => acc += x, '');

Copy link

Why is stderr directed to an onNext call? Is the overall process error the only error you want to surface?

Copy link

It's not ideal, but lots of people write non-fatal warnings to stderr, so taking "any stderr output == onError" is probably not a good tack in the general sense. The exit code should always be the final word on failed vs. succeeded.

Copy link

Alright, here's a general one:

import rx from 'rx';
// Aliased so the exported function below can keep the name `spawn` without
// colliding with (or recursing into) the child_process binding.
import {spawn as spawnProcess} from 'child_process';

// Public: Maps a process's output into an {Observable}
//
// exe - The program to execute
// params - Arguments passed to the process
// opts - Options that will be passed to child_process.spawn (optional;
//        null means "no options")
//
// Returns an {Observable} with a single value, that is the combined stdout
// and stderr output of the spawned process. The Observable errors when the
// process fails to spawn or exits with a non-zero code.
export default function spawn(exe, params, opts=null) {
  const spawnObs = rx.Observable.create((subj) => {
    // Normalize the null default to undefined rather than forwarding null
    // to child_process.spawn as its options argument.
    const proc = spawnProcess(exe, params, opts || undefined);

    // Everything seen so far, kept so a failure can report the full output.
    let stdout = '';
    const bufHandler = (b) => {
      const chunk = b.toString();
      stdout += chunk;
      subj.onNext(chunk);
    };

    // stderr is treated as ordinary output; only the exit code (or a spawn
    // failure) decides whether the sequence errors.
    proc.stdout.on('data', bufHandler);
    proc.stderr.on('data', bufHandler);
    proc.on('error', (e) => subj.onError(e));

    proc.on('close', (code) => {
      if (code === 0) {
        subj.onCompleted();
      } else {
        subj.onError(new Error("Failed with exit code: " + code + "\nOutput:\n" + stdout));
      }
    });
  });

  // Collapse the stream of chunks into one string.
  return spawnObs.reduce((acc, x) => acc + x, '');
}

Copy link

Typo: subj.onNext(line) should be subj.onNext(chunk).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment