Skip to content
View fromPromise.js
var FromPromiseObservable = (function(__super__) {
inherits(FromPromiseObservable, __super__);
function FromPromiseObservable(p) {
this.p = p;
__super__.call(this);
}
FromPromiseObservable.prototype.subscribeCore = function(o) {
this.p
.then(function (data) {
View currentthread.js
// Emit 1, 2, 3 on the currentThread scheduler, flat-map each element x to
// Rx.Observable.repeat(x, x), and log every resulting value.
Rx.Observable
  .fromArray([1, 2, 3], Rx.Scheduler.currentThread)
  .flatMap((value) => Rx.Observable.repeat(value, value))
  .subscribe((value) => {
    console.log('Next: %s', value);
  });
// => Next 1
// => Next 2
View zip.js
/* CombineLatest with staggering intervals */
// Two interval-driven sources with different periods (100 vs. 150); each one
// maps its tick value to a string labelled with the source it came from.
var labelFirst = (tick) => 'First: ' + tick;
var labelSecond = (tick) => 'Second: ' + tick;

var source1 = Rx.Observable.interval(100).map(labelFirst);
var source2 = Rx.Observable.interval(150).map(labelSecond);
// Combine latest of source1 and source2 whenever either gives a value with selector
var source = source1.combineLatest(
source2
View tcpmultiplex.js
'use strict';
var multiplex = require('multiplex');
function noop () { }
function muxServerBridge(tcpEndpointServerPort) {
var serverPlex = multiplex({}, function(stream, id) {
var clientSocket = net.createConnection({port: tcpEndpointServerPort});
stream.pipe(clientSocket).pipe(stream);
View spawn.js
var spawn = require('child_process').spawn;
var Rx = require('rx');
function spawnAsObservable(command, args, options) {
return new Rx.AnonymousObservable(function (o) {
var cmd = spawn(command, args, options);
var dataHandler = function (data) { o.onNext(data); };
var errHandler = function (err) { o.onError(err); };
var closeHandler = function () { o.onCompleted(); }
View tests.js
var test = require('tape');
var net = require('net');
var randomstring = require('randomstring');
var ThaliEmitter = require('../thali/thaliemitter');
test('ThaliEmitter can call startBroadcasting and endBroadcasting without error', function (t) {
var e = new ThaliEmitter();
console.log('about to call startBroadcasting');
e.startBroadcasting((+ new Date()).toString(), 5000, function (err1) {
View tap-and-hold.js
// Streams of raw touch events on window. Start and end are timestamped so the
// press duration can be computed; move is used only as a cancellation signal.
const touchStart = Rx.Observable.fromEvent(window, 'touchstart').timestamp();
// Event name corrected from 'touchmmove': with the misspelling this stream
// never fired, so moving the finger never cancelled the hold.
const touchMove = Rx.Observable.fromEvent(window, 'touchmove');
const touchEnd = Rx.Observable.fromEvent(window, 'touchend').timestamp();

// For every touch start, wait for a touch end unless a touch move arrives
// first, then pair the two timestamps and keep only sufficiently long presses.
const touchAndHold = touchStart
  .flatMap(
    () => touchEnd.takeUntil(touchMove),
    // Parenthesised so the braces are an object literal. Without the parens
    // they are parsed as a function body and the original line was a
    // SyntaxError.
    (x, y) => ({ start: x.timestamp, end: y.timestamp })
  )
  // Keeps pairs whose (end - start) / 1000 exceeds 2; presumably a hold longer
  // than two seconds, assuming millisecond timestamps (confirm against Rx docs).
  .filter((ts) => (ts.end - ts.start) / 1000 > 2);
const subscription = touchAndHold.subscribe(
() => console.log('touch and hold!')
View bacon-vs-rx.md

Answer the Question: Q: What's your opinion on the claims that RxJS has too cumbersome an API? AFAIK, e.g., Bacon.js was born from that frustration.

Bacon.js was created for a number of reasons, the first of which had to do with the state of the RxJS source. When RxJS was created in 2010 it was not open source, and in fact it wasn't open sourced until 2012. That was the first frustration, because Juha, the author of Bacon.js, could not fully understand the code, nor was it well documented. We have since remedied that in a number of ways, including our ReactiveX.io site, which covers all implementations of Rx and their operators. In addition, we have added extensive documentation for RxJS on the RxJS Documentation Page.

The second reason is that RxJS was not idiomatic JavaScript at the time of its inception, in fact it more looked like C# just transla

View retrywhen.js
// Demonstrates retryWhen with a growing delay: the source errors on every
// subscription, and each error is paired with the next value of
// Rx.Observable.range(1, 3) to decide how long to wait before re-subscribing.
Rx.Observable.create((o) => {
  console.log("subscribing");
  o.onError(new Error("always fails"));
})
  // Fixed: the original used `->` here, which is not JavaScript syntax; arrow
  // functions are written with `=>`.
  .retryWhen((attempts) => {
    return attempts
      // Keep only the counter i from each (error, counter) pair.
      .zip(Rx.Observable.range(1, 3), (n, i) => i)
      .flatMap((i) => {
        console.log("delay retry by " + i + " second(s)");
        // i * 1000 is passed to timer as the delay before the retry signal.
        return Rx.Observable.timer(i * 1000);
      });
  })
  .subscribe();
View sample.js
function noop() { }
var ThaliAclDb = require('./lib/thaliacldb');
var express = require('express');
var app = express();
var acl = new ThaliAclDb('acl', { db: require('memdown')});
acl.addRole('/guest')
.then(function () {
Something went wrong with that request. Please try again.