Skip to content

Instantly share code, notes, and snippets.

@mattpodwysocki
Created July 27, 2015 06:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattpodwysocki/33bfa5628e31b3cd3276 to your computer and use it in GitHub Desktop.
Save mattpodwysocki/33bfa5628e31b3cd3276 to your computer and use it in GitHub Desktop.
var spawn = require('child_process').spawn;
var Rx = require('rx');
/**
 * Spawns a child process and exposes its stdout as an Rx observable.
 *
 * The process is spawned when the observable is subscribed to. Every chunk
 * emitted on the child's stdout is forwarded through `onNext`, and the
 * sequence completes when the child's `close` event fires. The sequence is
 * terminated with `onError` when the child writes anything to stderr, or when
 * the process itself reports an `error` event (for example, the command
 * could not be spawned).
 *
 * Disposing the subscription detaches the stdout/stderr/close listeners; it
 * does not kill the child process.
 *
 * @param {string} command - Command to run, passed to `child_process.spawn`.
 * @param {string[]} [args] - Arguments passed to the command.
 * @param {Object} [options] - Options forwarded to `child_process.spawn`.
 * @returns {Rx.Observable} Observable of the child's stdout chunks.
 */
function spawnAsObservable(command, args, options) {
  return new Rx.AnonymousObservable(function (o) {
    var cmd = spawn(command, args, options);
    var disposed = false;

    var dataHandler = function (data) { o.onNext(data); };
    var errHandler = function (err) { o.onError(err); };
    var closeHandler = function () { o.onCompleted(); };
    // Without a listener, a ChildProcess 'error' event (e.g. ENOENT when the
    // command does not exist) is thrown as an uncaught exception.
    var spawnErrHandler = function (err) {
      if (!disposed) { o.onError(err); }
    };

    cmd.stdout.addListener('data', dataHandler);
    cmd.stderr.addListener('data', errHandler);
    cmd.addListener('close', closeHandler);
    cmd.addListener('error', spawnErrHandler);

    return Rx.Disposable.create(function () {
      // The 'error' listener is deliberately left attached (but muted) so a
      // late process error cannot crash the host after unsubscription.
      disposed = true;
      cmd.stdout.removeListener('data', dataHandler);
      cmd.stderr.removeListener('data', errHandler);
      cmd.removeListener('close', closeHandler);
    });
  });
}
@anaisbetts
Copy link

I found one of the internal ones that might have been what I was thinking of but I swear it isn't:

  // Private: Maps an Update.exe process into an {Observable}
  //
  // params - Arguments passed to the process
  //
  // update.exe is looked up one directory above the directory that contains
  // the running executable (process.execPath). When the stat check on that
  // path is falsy, no process is spawned and an {Observable} of '' is
  // returned instead.
  //
  // Returns an {Observable} with a single value, that is the output of the
  // spawned process (stdout and stderr chunks concatenated in arrival order).
  // The {Observable} errors if the process emits 'error' or closes with a
  // non-zero exit code.
  spawnUpdateDotExe(params) {
    let updateDotExe = path.resolve(path.dirname(process.execPath), '..', 'update.exe');
    // NOTE(review): statSyncNoException presumably returns a falsy value when
    // the file does not exist -- confirm against the fs module in use.
    if (!fs.statSyncNoException(updateDotExe)) {
      return rx.Observable.return('');
    }

    let spawnUpdate = rx.Observable.create((subj) => {
      let proc = spawn(updateDotExe, params);

      // Everything the process has written so far; only used to enrich the
      // error message when the exit code is non-zero.
      var stdout = '';
      let bufHandler = (b) => {
        let line = b.toString();
        stdout += line;
        subj.onNext(line);
      };

      // stderr is treated as ordinary output rather than as a failure; the
      // exit code alone decides between onCompleted and onError.
      proc.stdout.on('data', bufHandler);
      proc.stderr.on('data', bufHandler);
      proc.on('error', (e) => subj.onError(e));

      proc.on('close', (code) => {
        if (code === 0) {
          subj.onCompleted();
        } else {
          subj.onError(new Error("Failed with exit code: " + code + "\nOutput:\n" + stdout));
        }
      });
    });

    // Fold every emitted chunk into one string so subscribers get one value.
    return spawnUpdate
      .reduce((acc, x) => acc += x, '');

@mattpodwysocki
Copy link
Author

Why is stderr directed to an onNext call? Is the overall process error the only error you want to surface?

@anaisbetts
Copy link

It's not ideal, but lots of people write non-fatal warnings to stderr, so taking "any stderr output == onError" is probably not a good tack in the general sense. Exit code should always be the end-all for failed vs succeeded

@anaisbetts
Copy link

Alright, here's a general one:

import rx from 'rx';
import {spawn} from 'child_process';

// Public: Maps a process's output into an {Observable}
//
// exe - The program to execute
// params - Arguments passed to the process
// opts - Options that will be passed to child_process.spawn (optional)
//
// Both stdout and stderr are collected as output; output on stderr is not
// treated as a failure. Only the exit code (or a process 'error' event)
// decides whether the {Observable} errors.
//
// Returns an {Observable} with a single value, that is the output of the
// spawned process. The {Observable} errors if the process emits 'error' or
// closes with a non-zero exit code; the error message then includes the
// output collected so far.
//
// The function is exported as the module default. Its local name must differ
// from the imported child_process `spawn`, otherwise the two declarations
// collide.
export default function spawnObservable(exe, params, opts=null) {
  const spawnObs = rx.Observable.create((subj) => {
    // child_process.spawn throws on an explicit `null` options argument in
    // current Node versions, so "no options" is forwarded as `undefined`.
    const proc = spawn(exe, params, opts == null ? undefined : opts);

    // Everything written so far; used for the error message on failure.
    let output = '';
    const collect = (chunk) => {
      output += chunk;
      subj.onNext(chunk);
    };

    [proc.stdout, proc.stderr].forEach((stream) => {
      // A stream is null when opts.stdio does not pipe it back to us.
      if (!stream) {
        return;
      }
      // Decode at the stream level so a multi-byte UTF-8 character split
      // across two chunks is not corrupted.
      stream.setEncoding('utf8');
      stream.on('data', collect);
    });

    proc.on('error', (e) => subj.onError(e));

    proc.on('close', (code) => {
      if (code === 0) {
        subj.onCompleted();
      } else {
        subj.onError(new Error(`Failed with exit code: ${code}\nOutput:\n${output}`));
      }
    });
  });

  // Fold every emitted chunk into one string so subscribers get one value.
  return spawnObs.reduce((acc, x) => acc + x, '');
}

@mattpodwysocki
Copy link
Author

Typo: `subj.onNext(line)` should be `subj.onNext(chunk)`.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment