Skip to content

Instantly share code, notes, and snippets.

@spion
Last active October 14, 2015 17:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save spion/42528d641779983de252 to your computer and use it in GitHub Desktop.
Save spion/42528d641779983de252 to your computer and use it in GitHub Desktop.
var observer = require('promise-observer')
var Promise = require('bluebird');
var assert = require('assert');
function mkTransaction(tId) {
var queries = []
return {
query: function query(q) { queries.push(q); return query },
close: function() { queries.push('close ' + tId); return queries; }
}
}
function makeEnvironment() {
var emit;
var onTransaction = observer.create(function(e) { emit = e; })
return {
subscribe: onTransaction,
start: function(id) {
var tx = mkTransaction(id);
return emit(tx.query).then(tx.close)
}
}
}
function test() {
var env = makeEnvironment();
var q1 = env.subscribe(function(q) {
return Promise.resolve(1).delay(1).then(q)
});
var q2 = env.subscribe(function(q) {
return Promise.resolve(2).delay(9).then(q)
});
var q3 = env.subscribe(function(q){
return Promise.resolve(3).delay(20).then(q)
});
env.start(1).then(function(queries) {
console.log(queries);
assert.deepEqual(queries, [1,2,3,'close 1']);
})
env.start(2).then(function(queries) {
console.log(queries);
assert.deepEqual(queries, [1,2,3,'close 2']);
})
}
test();
@ysmood
Copy link

ysmood commented Oct 14, 2015

I think you are doing a simple thing in a wrong direction, but I will show you Yaku can do it any way.

var Observable = require('yaku/lib/Observable')
var Promise = require('bluebird');
var assert = require('assert');

function mkTransaction(tId) {
    var queries = []
    return {
        query: function query(q) { queries.push(q); return query },
        close: function() { queries.push('close ' + tId); return queries; }
    }
}

function makeEnvironment() {
    var trans = new Observable();
    return {
        subscribe: trans.subscribe.bind(trans),
        start: function(id) {
            var tx = mkTransaction(id);

            var out = Observable.tree(trans).subscribe(tx.close);
            trans.emit(tx.query);

            return new Promise(out.subscribe.bind(out));
        }
    }
}

function test() {
    var env = makeEnvironment();

    var q1 = env.subscribe(function(q) {
        return Promise.resolve(1).delay(1).then(q)
    });
    var q2 = env.subscribe(function(q) {
        return Promise.resolve(2).delay(9).then(q)
    });
    var q3 = env.subscribe(function(q){
        return Promise.resolve(3).delay(20).then(q)
    });


    env.start(1).then(function(queries) {
        console.log(queries);
        assert.deepEqual(queries, [1,2,3,'close 1']);
    })

    env.start(2).then(function(queries) {
        console.log(queries);
        assert.deepEqual(queries, [1,2,3,'close 2']);
    })
}

test();

@ysmood
Copy link

ysmood commented Oct 14, 2015

Your implementation's biggest problem is that you are simply using function to wrap a list of promises, people cannot see pub-sub pattern in your code. I would like to call your lib promise-all rather than promise-observer.

@ysmood
Copy link

ysmood commented Oct 14, 2015

Your lib cannot control how to subscribe other observables, you simply subscribe all of them, for example your lib can do this? Only the makeEnvironment is allowed to be changed:

var Observable = require('yaku/lib/Observable')
var Promise = require('bluebird');
var assert = require('assert');

function mkTransaction(tId) {
    var queries = []
    return {
        query: function query(q) { queries.push(q); return query },
        close: function() { queries.push('close ' + tId); return queries; }
    }
}

function makeEnvironment() {
    var trans = new Observable();
    return {
        subscribe: trans.subscribe.bind(trans),
        start: function(id) {
            var tx = mkTransaction(id);

            var out = Observable.all(trans.subscribers.slice(0, 3)).subscribe(tx.close);
            trans.emit(tx.query);

            return new Promise(out.subscribe.bind(out));
        }
    }
}

function test() {
    var env = makeEnvironment();

    var q1 = env.subscribe(function(q) {
        return Promise.resolve(1).delay(1).then(q)
    });
    var q2 = env.subscribe(function(q) {
        return Promise.resolve(2).delay(9).then(q)
    });
    var q3 = env.subscribe(function(q){
        return Promise.resolve(3).delay(20).then(q)
    });
    var q4 = env.subscribe(function(q){
        return new Promise(function () {});
    });


    env.start(1).then(function(queries) {
        console.log(queries);
        assert.deepEqual(queries, [1,2,3,'close 1']);
    })

    env.start(2).then(function(queries) {
        console.log(queries);
        assert.deepEqual(queries, [1,2,3,'close 2']);
    })
}

test();

@spion
Copy link
Author

spion commented Oct 14, 2015

The name is adequate. Its meant to resemble typical synchronous Java / NET observers, where once you notify your subscribers you have to wait for their update functions to execute. I might want to add that to the readme though, thanks.

For that use case, there is no reason to wait only for a subset of observers. As I've said before, its meant to solve a different problem.

Also, it seems that your solution doesn't really work right if I change the event timings a little:

var YObservable = require('yaku/lib/Observable')
var PObservable = require('promise-observer')
var Promise = require('bluebird');
var assert = require('assert');

function mkTransaction(tId) {
    var queries = []
    return {
        query: function query(q) { queries.push(q); return query },
        close: function() { queries.push('close ' + tId); return queries; }
    }
}

function makeYakuEnvironment() {
    var trans = new YObservable();
    return {
        subscribe: trans.subscribe.bind(trans),
        start: function(id) {
            var tx = mkTransaction(id);
            var out = YObservable.tree(trans).subscribe(tx.close);
            trans.emit(tx.query);
            return new Promise(out.subscribe.bind(out));
        }
    }
}

function makePOEnvironment() {
    var emit;
    var onTransaction = PObservable.create(function(e) { emit = e; })
    return {
        subscribe: onTransaction,
        start: function(id) {
          var tx = mkTransaction(id);
          return emit(tx.query).then(tx.close)
        }
    }
}




function test(tag, makeEnvironment) {
    var env = makeEnvironment();

    var q1 = env.subscribe(function(q) {
        return Promise.resolve(1).delay(1).then(q)
    });
    var q2 = env.subscribe(function(q) {
        return Promise.resolve(2).delay(9).then(q)
    });
    var q3 = env.subscribe(function(q){
        return Promise.resolve(3).delay(20).then(q)
    });


    var p1 = env.start(1);
    var p2 = Promise.delay(25).then(function() {
        return env.start(2)
    });
    var p3 = Promise.delay(50).then(function() {
        return env.start(3);
    })
    var all = Promise.all([p1, p2, p3])
    all.then(function(items) {
        console.log(tag, items)
        assert.deepEqual(items[0], [1,2,3,'close 1']);
        assert.deepEqual(items[1], [1,2,3,'close 2']);
        assert.deepEqual(items[2], [1,2,3,'close 3']);
    })
}

test("Yaku", makeYakuEnvironment);
test("PO", makePOEnvironment)

Output:

PO [ [ 1, 2, 3, 'close 1' ],
  [ 1, 2, 3, 'close 2' ],
  [ 1, 2, 3, 'close 3' ] ]
Yaku [ [ 1, 2, 3, 'close 1', 'close 1', 'close 1' ],
  [ 1, 2, 'close 2', 3, 'close 2' ],
  [ 1, 2, 3, 'close 3' ] ]

@ysmood
Copy link

ysmood commented Oct 14, 2015

No problem with Yaku. Which version are you using?

PO [ [ 1, 2, 3, 'close 1' ],
  [ 1, 2, 3, 'close 2' ],
  [ 1, 2, 3, 'close 3' ] ]
Yaku [ [ 1, 2, 3, 'close 1' ],
  [ 1, 2, 3, 'close 2' ],
  [ 1, 2, 3, 'close 3' ] ]

@spion
Copy link
Author

spion commented Oct 14, 2015

@ysmood npm ls says

├── bluebird@2.10.2
├── promise-observer@1.0.12
└── yaku@0.10.0

Works with the original example, but not with the updated one where the starts are delayed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment