Skip to content

Instantly share code, notes, and snippets.

@leebyron
Last active August 8, 2017 18:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save leebyron/85115c44c89bf2c202ae8e6450978654 to your computer and use it in GitHub Desktop.
Save leebyron/85115c44c89bf2c202ae8e6450978654 to your computer and use it in GitHub Desktop.
requirebin sketch
const Observable = require("zen-observable");
let sink;
const list = [];
const error = new Error();
new Observable(_sink => {
sink = _sink;
return () => list.push('cleanup');
}).subscribe({
next: val => {
list.push('next:' + val);
throw error;
},
error: err => {
list.push('error-handler:');
list.push(err);
},
complete: () => list.push('complete'),
});
try {
sink.next(1);
} catch (e) {
list.push('try-catch:');
list.push(e);
}
try {
sink.next(2);
} catch (e) {
list.push('try-catch:');
list.push(e);
}
console.log(list);
setTimeout(function(){
;require=(function e(t,n,r){function s(o,u){if(!n[o]){if(!t[o]){var a=typeof require=="function"&&require;if(!u&&a)return a(o,!0);if(i)return i(o,!0);var f=new Error("Cannot find module '"+o+"'");throw f.code="MODULE_NOT_FOUND",f}var l=n[o]={exports:{}};t[o][0].call(l.exports,function(e){var n=t[o][1][e];return s(n?n:e)},l,l.exports,e,t,n,r)}return n[o].exports}var i=typeof require=="function"&&require;for(var o=0;o<r.length;o++)s(r[o]);return s})({1:[function(require,module,exports){
'use strict'; (function(fn, name) { if (typeof exports !== 'undefined') fn(exports, module); else if (typeof self !== 'undefined') fn(name === '*' ? self : (name ? self[name] = {} : {})); })(function(exports, module) { // === Symbol Support ===
function hasSymbol(name) {
return typeof Symbol === "function" && Boolean(Symbol[name]);
}
function getSymbol(name) {
return hasSymbol(name) ? Symbol[name] : "@@" + name;
}
// Ponyfill Symbol.observable for interoperability with other libraries
if (typeof Symbol === "function" && !Symbol.observable) {
Symbol.observable = Symbol("observable");
}
// === Abstract Operations ===
function getMethod(obj, key) {
var value = obj[key];
if (value == null)
return undefined;
if (typeof value !== "function")
throw new TypeError(value + " is not a function");
return value;
}
function getSpecies(obj) {
var ctor = obj.constructor;
if (ctor !== undefined) {
ctor = ctor[getSymbol("species")];
if (ctor === null) {
ctor = undefined;
}
}
return ctor !== undefined ? ctor : Observable;
}
function addMethods(target, methods) {
Object.keys(methods).forEach(function(k) {
var desc = Object.getOwnPropertyDescriptor(methods, k);
desc.enumerable = false;
Object.defineProperty(target, k, desc);
});
}
function cleanupSubscription(subscription) {
// Assert: observer._observer is undefined
var cleanup = subscription._cleanup;
if (!cleanup)
return;
// Drop the reference to the cleanup function so that we won't call it
// more than once
subscription._cleanup = undefined;
// Call the cleanup function
cleanup();
}
function subscriptionClosed(subscription) {
return subscription._observer === undefined;
}
function closeSubscription(subscription) {
if (subscriptionClosed(subscription))
return;
subscription._observer = undefined;
cleanupSubscription(subscription);
}
function cleanupFromSubscription(subscription) {
return function() { subscription.unsubscribe() };
}
function Subscription(observer, subscriber) {
// Assert: subscriber is callable
// The observer must be an object
if (Object(observer) !== observer)
throw new TypeError("Observer must be an object");
this._cleanup = undefined;
this._observer = observer;
var start = getMethod(observer, "start");
if (start)
start.call(observer, this);
if (subscriptionClosed(this))
return;
observer = new SubscriptionObserver(this);
try {
// Call the subscriber function
var cleanup$0 = subscriber.call(undefined, observer);
// The return value must be undefined, null, a subscription object, or a function
if (cleanup$0 != null) {
if (typeof cleanup$0.unsubscribe === "function")
cleanup$0 = cleanupFromSubscription(cleanup$0);
else if (typeof cleanup$0 !== "function")
throw new TypeError(cleanup$0 + " is not a function");
this._cleanup = cleanup$0;
}
} catch (e) {
// If an error occurs during startup, then attempt to send the error
// to the observer
observer.error(e);
return;
}
// If the stream is already finished, then perform cleanup
if (subscriptionClosed(this))
cleanupSubscription(this);
}
addMethods(Subscription.prototype = {}, {
get closed() { return subscriptionClosed(this) },
unsubscribe: function() { closeSubscription(this) },
});
function SubscriptionObserver(subscription) {
this._subscription = subscription;
}
addMethods(SubscriptionObserver.prototype = {}, {
get closed() { return subscriptionClosed(this._subscription) },
next: function(value) {
var subscription = this._subscription;
// If the stream is closed, then return undefined
if (subscriptionClosed(subscription))
return undefined;
var observer = subscription._observer;
var m = getMethod(observer, "next");
// If the observer doesn't support "next", then return undefined
if (!m)
return undefined;
// Send the next value to the sink
return m.call(observer, value);
},
error: function(value) {
var subscription = this._subscription;
// If the stream is closed, throw the error to the caller
if (subscriptionClosed(subscription))
throw value;
var observer = subscription._observer;
subscription._observer = undefined;
try {
var m$0 = getMethod(observer, "error");
// If the sink does not support "error", then throw the error to the caller
if (!m$0)
throw value;
value = m$0.call(observer, value);
} catch (e) {
try { cleanupSubscription(subscription) }
finally { throw e }
}
cleanupSubscription(subscription);
return value;
},
complete: function(value) {
var subscription = this._subscription;
// If the stream is closed, then return undefined
if (subscriptionClosed(subscription))
return undefined;
var observer = subscription._observer;
subscription._observer = undefined;
try {
var m$1 = getMethod(observer, "complete");
// If the sink does not support "complete", then return undefined
value = m$1 ? m$1.call(observer, value) : undefined;
} catch (e) {
try { cleanupSubscription(subscription) }
finally { throw e }
}
cleanupSubscription(subscription);
return value;
},
});
function Observable(subscriber) {
// The stream subscriber must be a function
if (typeof subscriber !== "function")
throw new TypeError("Observable initializer must be a function");
this._subscriber = subscriber;
}
addMethods(Observable.prototype, {
subscribe: function(observer) { for (var args = [], __$0 = 1; __$0 < arguments.length; ++__$0) args.push(arguments[__$0]);
if (typeof observer === 'function') {
observer = {
next: observer,
error: args[0],
complete: args[1],
};
}
return new Subscription(observer, this._subscriber);
},
forEach: function(fn) { var __this = this;
return new Promise(function(resolve, reject) {
if (typeof fn !== "function")
return Promise.reject(new TypeError(fn + " is not a function"));
__this.subscribe({
_subscription: null,
start: function(subscription) {
if (Object(subscription) !== subscription)
throw new TypeError(subscription + " is not an object");
this._subscription = subscription;
},
next: function(value) {
var subscription = this._subscription;
if (subscription.closed)
return;
try {
return fn(value);
} catch (err) {
reject(err);
subscription.unsubscribe();
}
},
error: reject,
complete: resolve,
});
});
},
map: function(fn) { var __this = this;
if (typeof fn !== "function")
throw new TypeError(fn + " is not a function");
var C = getSpecies(this);
return new C(function(observer) { return __this.subscribe({
next: function(value) {
if (observer.closed)
return;
try { value = fn(value) }
catch (e) { return observer.error(e) }
return observer.next(value);
},
error: function(e) { return observer.error(e) },
complete: function(x) { return observer.complete(x) },
}); });
},
filter: function(fn) { var __this = this;
if (typeof fn !== "function")
throw new TypeError(fn + " is not a function");
var C = getSpecies(this);
return new C(function(observer) { return __this.subscribe({
next: function(value) {
if (observer.closed)
return;
try { if (!fn(value)) return undefined }
catch (e) { return observer.error(e) }
return observer.next(value);
},
error: function(e) { return observer.error(e) },
complete: function() { return observer.complete() },
}); });
},
reduce: function(fn) { var __this = this;
if (typeof fn !== "function")
throw new TypeError(fn + " is not a function");
var C = getSpecies(this);
var hasSeed = arguments.length > 1;
var hasValue = false;
var seed = arguments[1];
var acc = seed;
return new C(function(observer) { return __this.subscribe({
next: function(value) {
if (observer.closed)
return;
var first = !hasValue;
hasValue = true;
if (!first || hasSeed) {
try { acc = fn(acc, value) }
catch (e) { return observer.error(e) }
} else {
acc = value;
}
},
error: function(e) { observer.error(e) },
complete: function() {
if (!hasValue && !hasSeed) {
observer.error(new TypeError("Cannot reduce an empty sequence"));
return;
}
observer.next(acc);
observer.complete();
},
}); });
},
flatMap: function(fn) { var __this = this;
if (typeof fn !== "function")
throw new TypeError(fn + " is not a function");
var C = getSpecies(this);
return new C(function(observer) {
var completed = false;
var subscriptions = [];
// Subscribe to the outer Observable
var outer = __this.subscribe({
next: function(value) {
if (fn) {
try {
value = fn(value);
} catch (x) {
observer.error(x);
return;
}
}
// Subscribe to the inner Observable
Observable.from(value).subscribe({
_subscription: null,
start: function(s) { subscriptions.push(this._subscription = s) },
next: function(value) { observer.next(value) },
error: function(e) { observer.error(e) },
complete: function() {
var i = subscriptions.indexOf(this._subscription);
if (i >= 0)
subscriptions.splice(i, 1);
closeIfDone();
}
});
},
error: function(e) {
return observer.error(e);
},
complete: function() {
completed = true;
closeIfDone();
}
});
function closeIfDone() {
if (completed && subscriptions.length === 0)
observer.complete();
}
return function() {
subscriptions.forEach(function(s) { return s.unsubscribe(); });
outer.unsubscribe();
};
});
},
});
Object.defineProperty(Observable.prototype, getSymbol("observable"), {
value: function() { return this },
writable: true,
configurable: true,
});
addMethods(Observable, {
from: function(x) {
var C = typeof this === "function" ? this : Observable;
if (x == null)
throw new TypeError(x + " is not an object");
var method = getMethod(x, getSymbol("observable"));
if (method) {
var observable$0 = method.call(x);
if (Object(observable$0) !== observable$0)
throw new TypeError(observable$0 + " is not an object");
if (observable$0.constructor === C)
return observable$0;
return new C(function(observer) { return observable$0.subscribe(observer); });
}
if (hasSymbol("iterator") && (method = getMethod(x, getSymbol("iterator")))) {
return new C(function(observer) {
for (var __$0 = (method.call(x))[Symbol.iterator](), __$1; __$1 = __$0.next(), !__$1.done;) { var item$0 = __$1.value;
observer.next(item$0);
if (observer.closed)
return;
}
observer.complete();
});
}
if (Array.isArray(x)) {
return new C(function(observer) {
for (var i$0 = 0; i$0 < x.length; ++i$0) {
observer.next(x[i$0]);
if (observer.closed)
return;
}
observer.complete();
});
}
throw new TypeError(x + " is not observable");
},
of: function() { for (var items = [], __$0 = 0; __$0 < arguments.length; ++__$0) items.push(arguments[__$0]);
var C = typeof this === "function" ? this : Observable;
return new C(function(observer) {
for (var i$1 = 0; i$1 < items.length; ++i$1) {
observer.next(items[i$1]);
if (observer.closed)
return;
}
observer.complete();
});
},
});
Object.defineProperty(Observable, getSymbol("species"), {
get: function() { return this },
configurable: true,
});
exports.Observable = Observable;
}, "*");
},{}],"zen-observable":[function(require,module,exports){
module.exports = require("./zen-observable.js").Observable;
},{"./zen-observable.js":1}]},{},[])
//# sourceMappingURL=data:application/json;charset=utf-8;base64,{"version":3,"sources":["../../../../home/admin/browserify-cdn/node_modules/browserify/node_modules/browser-pack/_prelude.js","zen-observable.js","zen-observable"],"names":[],"mappings":"AAAA;ACAA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;AACA;;ACjfA;AACA","file":"generated.js","sourceRoot":"","sourcesContent":["(function e(t,n,r){function s(o,u){if(!n[o]){if(!t[o]){var a=typeof require==\"function\"&&require;if(!u&&a)return a(o,!0);if(i)return i(o,!0);var f=new Error(\"Cannot find module '\"+o+\"'\");throw f.code=\"MODULE_NOT_FOUND\",f}var l=n[o]={exports:{}};t[o][0].call(l.exports,function(e){var n=t[o][1][e];return s(n?n:e)},l,l.exports,e,t,n,r)}return n[o].exports}var i=typeof require==\"function\"&&require;for(var o=0;o<r.length;o++)s(r[o]);return s})","'use strict'; (function(fn, name) { if (typeof exports !== 'undefined') fn(exports, module); else if (typeof self !== 'undefined') fn(name === '*' ? self : (name ? self[name] = {} : {})); })(function(exports, module) { // === Symbol Support ===\n\nfunction hasSymbol(name) {\n  return typeof Symbol === \"function\" && Boolean(Symbol[name]);\n}\n\nfunction getSymbol(name) {\n  return hasSymbol(name) ? Symbol[name] : \"@@\" + name;\n}\n\n// Ponyfill Symbol.observable for interoperability with other libraries\nif (typeof Symbol === \"function\" && !Symbol.observable) {\n  Symbol.observable = Symbol(\"observable\");\n}\n\n// === Abstract Operations ===\n\nfunction getMethod(obj, key) {\n  var value = obj[key];\n\n  if (value == null)\n    return undefined;\n\n  if (typeof value !== \"function\")\n    throw new TypeError(value + \" is not a function\");\n\n  return value;\n}\n\nfunction getSpecies(obj) {\n  var ctor = obj.constructor;\n  if (ctor !== undefined) {\n    ctor = ctor[getSymbol(\"species\")];\n    if (ctor === null) {\n      ctor = undefined;\n    }\n  }\n  return ctor !== undefined ? ctor : Observable;\n}\n\nfunction addMethods(target, methods) {\n  Object.keys(methods).forEach(function(k) {\n    var desc = Object.getOwnPropertyDescriptor(methods, k);\n    desc.enumerable = false;\n    Object.defineProperty(target, k, desc);\n  });\n}\n\nfunction cleanupSubscription(subscription) {\n  // Assert:  observer._observer is undefined\n\n  var cleanup = subscription._cleanup;\n\n  if (!cleanup)\n    return;\n\n  // Drop the reference to the cleanup function so that we won't call it\n  // more than once\n  subscription._cleanup = undefined;\n\n  // Call the cleanup function\n  cleanup();\n}\n\nfunction subscriptionClosed(subscription) {\n  return subscription._observer === undefined;\n}\n\nfunction closeSubscription(subscription) {\n  if (subscriptionClosed(subscription))\n    return;\n\n  subscription._observer = undefined;\n  cleanupSubscription(subscription);\n}\n\nfunction cleanupFromSubscription(subscription) {\n  return function() { subscription.unsubscribe() };\n}\n\nfunction Subscription(observer, subscriber) {\n  // Assert: subscriber is callable\n\n  // The observer must be an object\n  if (Object(observer) !== observer)\n    throw new TypeError(\"Observer must be an object\");\n\n  this._cleanup = undefined;\n  this._observer = observer;\n\n  var start = getMethod(observer, \"start\");\n\n  if (start)\n    start.call(observer, this);\n\n  if (subscriptionClosed(this))\n    return;\n\n  observer = new SubscriptionObserver(this);\n\n  try {\n    // Call the subscriber function\n    var cleanup$0 = subscriber.call(undefined, observer);\n\n    // The return value must be undefined, null, a subscription object, or a function\n    if (cleanup$0 != null) {\n      if (typeof cleanup$0.unsubscribe === \"function\")\n        cleanup$0 = cleanupFromSubscription(cleanup$0);\n      else if (typeof cleanup$0 !== \"function\")\n        throw new TypeError(cleanup$0 + \" is not a function\");\n\n      this._cleanup = cleanup$0;\n    }\n  } catch (e) {\n    // If an error occurs during startup, then attempt to send the error\n    // to the observer\n    observer.error(e);\n    return;\n  }\n\n  // If the stream is already finished, then perform cleanup\n  if (subscriptionClosed(this))\n    cleanupSubscription(this);\n}\n\naddMethods(Subscription.prototype = {}, {\n  get closed() { return subscriptionClosed(this) },\n  unsubscribe: function() { closeSubscription(this) },\n});\n\nfunction SubscriptionObserver(subscription) {\n  this._subscription = subscription;\n}\n\naddMethods(SubscriptionObserver.prototype = {}, {\n\n  get closed() { return subscriptionClosed(this._subscription) },\n\n  next: function(value) {\n    var subscription = this._subscription;\n\n    // If the stream is closed, then return undefined\n    if (subscriptionClosed(subscription))\n      return undefined;\n\n    var observer = subscription._observer;\n    var m = getMethod(observer, \"next\");\n\n    // If the observer doesn't support \"next\", then return undefined\n    if (!m)\n      return undefined;\n\n    // Send the next value to the sink\n    return m.call(observer, value);\n  },\n\n  error: function(value) {\n    var subscription = this._subscription;\n\n    // If the stream is closed, throw the error to the caller\n    if (subscriptionClosed(subscription))\n      throw value;\n\n    var observer = subscription._observer;\n    subscription._observer = undefined;\n\n    try {\n      var m$0 = getMethod(observer, \"error\");\n\n      // If the sink does not support \"error\", then throw the error to the caller\n      if (!m$0)\n        throw value;\n\n      value = m$0.call(observer, value);\n    } catch (e) {\n      try { cleanupSubscription(subscription) }\n      finally { throw e }\n    }\n\n    cleanupSubscription(subscription);\n    return value;\n  },\n\n  complete: function(value) {\n    var subscription = this._subscription;\n\n    // If the stream is closed, then return undefined\n    if (subscriptionClosed(subscription))\n      return undefined;\n\n    var observer = subscription._observer;\n    subscription._observer = undefined;\n\n    try {\n      var m$1 = getMethod(observer, \"complete\");\n\n      // If the sink does not support \"complete\", then return undefined\n      value = m$1 ? m$1.call(observer, value) : undefined;\n    } catch (e) {\n      try { cleanupSubscription(subscription) }\n      finally { throw e }\n    }\n\n    cleanupSubscription(subscription);\n    return value;\n  },\n\n});\n\nfunction Observable(subscriber) {\n  // The stream subscriber must be a function\n  if (typeof subscriber !== \"function\")\n    throw new TypeError(\"Observable initializer must be a function\");\n\n  this._subscriber = subscriber;\n}\n\naddMethods(Observable.prototype, {\n\n  subscribe: function(observer) { for (var args = [], __$0 = 1; __$0 < arguments.length; ++__$0) args.push(arguments[__$0]); \n    if (typeof observer === 'function') {\n      observer = {\n        next: observer,\n        error: args[0],\n        complete: args[1],\n      };\n    }\n\n    return new Subscription(observer, this._subscriber);\n  },\n\n  forEach: function(fn) { var __this = this; \n    return new Promise(function(resolve, reject) {\n      if (typeof fn !== \"function\")\n        return Promise.reject(new TypeError(fn + \" is not a function\"));\n\n      __this.subscribe({\n        _subscription: null,\n\n        start: function(subscription) {\n          if (Object(subscription) !== subscription)\n            throw new TypeError(subscription + \" is not an object\");\n\n          this._subscription = subscription;\n        },\n\n        next: function(value) {\n          var subscription = this._subscription;\n\n          if (subscription.closed)\n            return;\n\n          try {\n            return fn(value);\n          } catch (err) {\n            reject(err);\n            subscription.unsubscribe();\n          }\n        },\n\n        error: reject,\n        complete: resolve,\n      });\n    });\n  },\n\n  map: function(fn) { var __this = this; \n    if (typeof fn !== \"function\")\n      throw new TypeError(fn + \" is not a function\");\n\n    var C = getSpecies(this);\n\n    return new C(function(observer) { return __this.subscribe({\n      next: function(value) {\n        if (observer.closed)\n          return;\n\n        try { value = fn(value) }\n        catch (e) { return observer.error(e) }\n\n        return observer.next(value);\n      },\n\n      error: function(e) { return observer.error(e) },\n      complete: function(x) { return observer.complete(x) },\n    }); });\n  },\n\n  filter: function(fn) { var __this = this; \n    if (typeof fn !== \"function\")\n      throw new TypeError(fn + \" is not a function\");\n\n    var C = getSpecies(this);\n\n    return new C(function(observer) { return __this.subscribe({\n      next: function(value) {\n        if (observer.closed)\n          return;\n\n        try { if (!fn(value)) return undefined }\n        catch (e) { return observer.error(e) }\n\n        return observer.next(value);\n      },\n\n      error: function(e) { return observer.error(e) },\n      complete: function() { return observer.complete() },\n    }); });\n  },\n\n  reduce: function(fn) { var __this = this; \n    if (typeof fn !== \"function\")\n      throw new TypeError(fn + \" is not a function\");\n\n    var C = getSpecies(this);\n    var hasSeed = arguments.length > 1;\n    var hasValue = false;\n    var seed = arguments[1];\n    var acc = seed;\n\n    return new C(function(observer) { return __this.subscribe({\n\n      next: function(value) {\n        if (observer.closed)\n          return;\n\n        var first = !hasValue;\n        hasValue = true;\n\n        if (!first || hasSeed) {\n          try { acc = fn(acc, value) }\n          catch (e) { return observer.error(e) }\n        } else {\n          acc = value;\n        }\n      },\n\n      error: function(e) { observer.error(e) },\n\n      complete: function() {\n        if (!hasValue && !hasSeed) {\n          observer.error(new TypeError(\"Cannot reduce an empty sequence\"));\n          return;\n        }\n\n        observer.next(acc);\n        observer.complete();\n      },\n\n    }); });\n  },\n\n  flatMap: function(fn) { var __this = this; \n    if (typeof fn !== \"function\")\n      throw new TypeError(fn + \" is not a function\");\n\n    var C = getSpecies(this);\n\n    return new C(function(observer) {\n      var completed = false;\n      var subscriptions = [];\n\n      // Subscribe to the outer Observable\n      var outer = __this.subscribe({\n\n        next: function(value) {\n          if (fn) {\n            try {\n              value = fn(value);\n            } catch (x) {\n              observer.error(x);\n              return;\n            }\n          }\n\n          // Subscribe to the inner Observable\n          Observable.from(value).subscribe({\n            _subscription: null,\n\n            start: function(s) { subscriptions.push(this._subscription = s) },\n            next: function(value) { observer.next(value) },\n            error: function(e) { observer.error(e) },\n\n            complete: function() {\n              var i = subscriptions.indexOf(this._subscription);\n\n              if (i >= 0)\n                subscriptions.splice(i, 1);\n\n              closeIfDone();\n            }\n          });\n        },\n\n        error: function(e) {\n          return observer.error(e);\n        },\n\n        complete: function() {\n          completed = true;\n          closeIfDone();\n        }\n      });\n\n      function closeIfDone() {\n        if (completed && subscriptions.length === 0)\n          observer.complete();\n      }\n\n      return function() {\n        subscriptions.forEach(function(s) { return s.unsubscribe(); });\n        outer.unsubscribe();\n      };\n    });\n  },\n\n});\n\nObject.defineProperty(Observable.prototype, getSymbol(\"observable\"), {\n  value: function() { return this },\n  writable: true,\n  configurable: true,\n});\n\naddMethods(Observable, {\n\n  from: function(x) {\n    var C = typeof this === \"function\" ? this : Observable;\n\n    if (x == null)\n      throw new TypeError(x + \" is not an object\");\n\n    var method = getMethod(x, getSymbol(\"observable\"));\n\n    if (method) {\n      var observable$0 = method.call(x);\n\n      if (Object(observable$0) !== observable$0)\n        throw new TypeError(observable$0 + \" is not an object\");\n\n      if (observable$0.constructor === C)\n        return observable$0;\n\n      return new C(function(observer) { return observable$0.subscribe(observer); });\n    }\n\n    if (hasSymbol(\"iterator\") && (method = getMethod(x, getSymbol(\"iterator\")))) {\n      return new C(function(observer) {\n        for (var __$0 = (method.call(x))[Symbol.iterator](), __$1; __$1 = __$0.next(), !__$1.done;) { var item$0 = __$1.value; \n          observer.next(item$0);\n          if (observer.closed)\n            return;\n        }\n\n        observer.complete();\n      });\n    }\n\n    if (Array.isArray(x)) {\n      return new C(function(observer) {\n        for (var i$0 = 0; i$0 < x.length; ++i$0) {\n          observer.next(x[i$0]);\n          if (observer.closed)\n            return;\n        }\n\n        observer.complete();\n      });\n    }\n\n    throw new TypeError(x + \" is not observable\");\n  },\n\n  of: function() { for (var items = [], __$0 = 0; __$0 < arguments.length; ++__$0) items.push(arguments[__$0]); \n    var C = typeof this === \"function\" ? this : Observable;\n\n    return new C(function(observer) {\n      for (var i$1 = 0; i$1 < items.length; ++i$1) {\n        observer.next(items[i$1]);\n        if (observer.closed)\n          return;\n      }\n\n      observer.complete();\n    });\n  },\n\n});\n\nObject.defineProperty(Observable, getSymbol(\"species\"), {\n  get: function() { return this },\n  configurable: true,\n});\n\nexports.Observable = Observable;\n\n\n}, \"*\");","module.exports = require(\"./zen-observable.js\").Observable;\n"]}
const Observable = require("zen-observable");
let sink;
const list = [];
const error = new Error();
new Observable(_sink => {
sink = _sink;
return () => list.push('cleanup');
}).subscribe({
next: val => {
list.push('next:' + val);
throw error;
},
error: err => {
list.push('error-handler:');
list.push(err);
},
complete: () => list.push('complete'),
});
try {
sink.next(1);
} catch (e) {
list.push('try-catch:');
list.push(e);
}
try {
sink.next(2);
} catch (e) {
list.push('try-catch:');
list.push(e);
}
console.log(list);
;}, 0)
{
"name": "requirebin-sketch",
"version": "1.0.0",
"dependencies": {
"zen-observable": "0.5.2"
}
}
<!-- contents of this file will be placed inside the <body> -->
<!-- contents of this file will be placed inside the <head> -->
@leebyron
Copy link
Author

leebyron commented Aug 8, 2017

xstream:

const xc = require("xstream").default;

xc.create({
  start: _sink => {
    sink = _sink;
  },
  stop: () => list.push('cleanup'),
})

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment