Skip to content

Instantly share code, notes, and snippets.

@leebyron
Last active August 8, 2017 18:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save leebyron/85115c44c89bf2c202ae8e6450978654 to your computer and use it in GitHub Desktop.
Save leebyron/85115c44c89bf2c202ae8e6450978654 to your computer and use it in GitHub Desktop.
requirebin sketch
// Probe: how does zen-observable behave when an observer's `next` handler
// throws? Every event that fires is appended to `list`, printed at the end.
const Observable = require("zen-observable");

const list = [];
const error = new Error();
let sink;

// Capture the object passed to the subscriber so values can be pushed from
// outside the Observable.
const source = new Observable((observer) => {
  sink = observer;
  return () => list.push('cleanup');
});

source.subscribe({
  next(val) {
    list.push('next:' + val);
    throw error;
  },
  error(err) {
    list.push('error-handler:', err);
  },
  complete() {
    return list.push('complete');
  },
});

// Push two values, recording any exception that reaches the producer side.
for (const value of [1, 2]) {
  try {
    sink.next(value);
  } catch (e) {
    list.push('try-catch:', e);
  }
}

console.log(list);
setTimeout(function(){
;require=(function e(t,n,r){function s(o,u){if(!n[o]){if(!t[o]){var a=typeof require=="function"&&require;if(!u&&a)return a(o,!0);if(i)return i(o,!0);var f=new Error("Cannot find module '"+o+"'");throw f.code="MODULE_NOT_FOUND",f}var l=n[o]={exports:{}};t[o][0].call(l.exports,function(e){var n=t[o][1][e];return s(n?n:e)},l,l.exports,e,t,n,r)}return n[o].exports}var i=typeof require=="function"&&require;for(var o=0;o<r.length;o++)s(r[o]);return s})({1:[function(require,module,exports){
'use strict'; (function(fn, name) { if (typeof exports !== 'undefined') fn(exports, module); else if (typeof self !== 'undefined') fn(name === '*' ? self : (name ? self[name] = {} : {})); })(function(exports, module) { // === Symbol Support ===
// Reports whether the runtime has Symbol support and a truthy `Symbol[name]`.
function hasSymbol(name) {
  if (typeof Symbol !== "function")
    return false;
  return Boolean(Symbol[name]);
}
// Resolves the property key for `name`: `Symbol[name]` when hasSymbol reports
// it is available, otherwise the string "@@" + name.
function getSymbol(name) {
  if (hasSymbol(name))
    return Symbol[name];
  return "@@" + name;
}
// Define Symbol.observable on the global Symbol when it is missing, for
// interoperability with other libraries. Note this writes to the global.
if (typeof Symbol === "function") {
  if (!Symbol.observable)
    Symbol.observable = Symbol("observable");
}
// === Abstract Operations ===
// Reads `obj[key]` as an optional method.
// Returns undefined when the property is null or undefined, returns the
// function when it is callable, and throws a TypeError for anything else.
function getMethod(obj, key) {
  var candidate = obj[key];
  if (candidate == null)
    return undefined;
  if (typeof candidate === "function")
    return candidate;
  throw new TypeError(candidate + " is not a function");
}
// Picks the constructor used to build derived observables: the species
// property of `obj.constructor` when it is defined and non-null, otherwise
// the base Observable.
function getSpecies(obj) {
  var ctor = obj.constructor;
  if (ctor === undefined)
    return Observable;
  var species = ctor[getSymbol("species")];
  if (species === undefined || species === null)
    return Observable;
  return species;
}
// Copies every own enumerable string-keyed property of `methods` onto
// `target` as a non-enumerable property. Descriptors are copied, so
// getters stay getters instead of being evaluated.
function addMethods(target, methods) {
  var names = Object.keys(methods);
  for (var i = 0; i < names.length; ++i) {
    var descriptor = Object.getOwnPropertyDescriptor(methods, names[i]);
    descriptor.enumerable = false;
    Object.defineProperty(target, names[i], descriptor);
  }
}
// Runs the subscription's cleanup function, if one is set, at most once.
// The callers in this file clear `subscription._observer` before calling.
function cleanupSubscription(subscription) {
  var teardown = subscription._cleanup;
  if (teardown) {
    // Clear the slot before invoking so the function cannot be run twice
    subscription._cleanup = undefined;
    teardown();
  }
}
// A subscription counts as closed once its `_observer` slot is undefined.
function subscriptionClosed(subscription) {
  var observer = subscription._observer;
  return observer === undefined;
}
// Closes an open subscription: clears its observer, then runs its cleanup.
// Does nothing when the subscription is already closed.
function closeSubscription(subscription) {
  if (!subscriptionClosed(subscription)) {
    subscription._observer = undefined;
    cleanupSubscription(subscription);
  }
}
// Wraps a subscription-like object in a zero-argument cleanup function that
// calls its `unsubscribe` method and discards the result.
function cleanupFromSubscription(subscription) {
  return function() {
    subscription.unsubscribe();
  };
}
// Creates a subscription and immediately runs `subscriber` against `observer`.
//
// observer   - object whose optional "start", "next", "error" and "complete"
//              methods receive notifications; a non-object throws a TypeError
// subscriber - function called with a SubscriptionObserver; it may return
//              undefined, null, a cleanup function, or an object with an
//              `unsubscribe` method
//
// An exception thrown by `subscriber`, or an invalid return value from it, is
// routed to `observer.error` rather than thrown directly from here.
function Subscription(observer, subscriber) {
// Assert: subscriber is callable
// The observer must be an object
if (Object(observer) !== observer)
throw new TypeError("Observer must be an object");
this._cleanup = undefined;
this._observer = observer;
// Give the observer the subscription first, so it can unsubscribe before
// the subscriber function ever runs
var start = getMethod(observer, "start");
if (start)
start.call(observer, this);
// The "start" hook closed the subscription: skip the subscriber entirely
if (subscriptionClosed(this))
return;
// From here on `observer` is the wrapper handed to the subscriber function
observer = new SubscriptionObserver(this);
try {
// Call the subscriber function
var cleanup$0 = subscriber.call(undefined, observer);
// The return value must be undefined, null, a subscription object, or a function
if (cleanup$0 != null) {
if (typeof cleanup$0.unsubscribe === "function")
cleanup$0 = cleanupFromSubscription(cleanup$0);
else if (typeof cleanup$0 !== "function")
throw new TypeError(cleanup$0 + " is not a function");
this._cleanup = cleanup$0;
}
} catch (e) {
// If an error occurs during startup, then attempt to send the error
// to the observer
observer.error(e);
return;
}
// If the stream is already finished, then perform cleanup
// (the cleanup function was not yet stored when the stream closed)
if (subscriptionClosed(this))
cleanupSubscription(this);
}
// Public Subscription API, installed as non-enumerable members on a fresh
// prototype object.
Subscription.prototype = {};
addMethods(Subscription.prototype, {
  // True once the subscription has been closed
  get closed() {
    return subscriptionClosed(this);
  },
  // Closes the subscription and runs its cleanup; no-op when already closed
  unsubscribe: function() {
    closeSubscription(this);
  },
});
// Observer object passed to the subscriber function. It stores only a
// reference to the Subscription it belongs to.
function SubscriptionObserver(subscription) {
this._subscription = subscription;
}
addMethods(SubscriptionObserver.prototype = {}, {
// True once the owning subscription has been closed
get closed() { return subscriptionClosed(this._subscription) },
next: function(value) {
var subscription = this._subscription;
// If the stream is closed, then return undefined
if (subscriptionClosed(subscription))
return undefined;
var observer = subscription._observer;
var m = getMethod(observer, "next");
// If the observer doesn't support "next", then return undefined
if (!m)
return undefined;
// Send the next value to the sink
return m.call(observer, value);
},
// Terminates the stream with an error.
// Sends `value` to the observer's "error" handler and returns its result.
// `value` is thrown to the caller when the stream is already closed or the
// observer has no "error" handler. Cleanup runs in every case where the
// stream was still open.
error: function(value) {
var subscription = this._subscription;
// If the stream is closed, throw the error to the caller
if (subscriptionClosed(subscription))
throw value;
var observer = subscription._observer;
// Mark the stream closed before calling the handler
subscription._observer = undefined;
try {
var m$0 = getMethod(observer, "error");
// If the sink does not support "error", then throw the error to the caller
if (!m$0)
throw value;
value = m$0.call(observer, value);
} catch (e) {
// Still run cleanup; rethrowing from `finally` makes `e` win over any
// error thrown by the cleanup function itself
try { cleanupSubscription(subscription) }
finally { throw e }
}
cleanupSubscription(subscription);
return value;
},
// Terminates the stream normally.
// Passes `value` to the observer's "complete" handler and returns its result,
// or undefined when the stream is already closed or there is no handler.
// Cleanup runs in every case where the stream was still open.
complete: function(value) {
var subscription = this._subscription;
// If the stream is closed, then return undefined
if (subscriptionClosed(subscription))
return undefined;
var observer = subscription._observer;
// Mark the stream closed before calling the handler
subscription._observer = undefined;
try {
var m$1 = getMethod(observer, "complete");
// If the sink does not support "complete", then return undefined
value = m$1 ? m$1.call(observer, value) : undefined;
} catch (e) {
// Still run cleanup; rethrowing from `finally` makes `e` win over any
// error thrown by the cleanup function itself
try { cleanupSubscription(subscription) }
finally { throw e }
}
cleanupSubscription(subscription);
return value;
},
});
// Observable constructor. Stores the subscriber function, which is run once
// for each call to `subscribe`. Throws a TypeError for a non-function.
function Observable(subscriber) {
  // The stream subscriber must be a function
  if (typeof subscriber === "function") {
    this._subscriber = subscriber;
    return;
  }
  throw new TypeError("Observable initializer must be a function");
}
addMethods(Observable.prototype, {
subscribe: function(observer) { for (var args = [], __$0 = 1; __$0 < arguments.length; ++__$0) args.push(arguments[__$0]);
if (typeof observer === 'function') {
observer = {
next: observer,
error: args[0],
complete: args[1],
};
}
return new Subscription(observer, this._subscriber);
},
forEach: function(fn) { var __this = this;
return new Promise(function(resolve, reject) {
if (typeof fn !== "function")
return Promise.reject(new TypeError(fn + " is not a function"));
__this.subscribe({
_subscription: null,
start: function(subscription) {
if (Object(subscription) !== subscription)
throw new TypeError(subscription + " is not an object");
this._subscription = subscription;
},
next: function(value) {
var subscription = this._subscription;
if (subscription.closed)
return;
try {
return fn(value);
} catch (err) {
reject(err);
subscription.unsubscribe();
}
},
error: reject,
complete: resolve,
});
});
},
map: function(fn) { var __this = this;
if (typeof fn !== "function")
throw new TypeError(fn + " is not a function");
var C = getSpecies(this);
return new C(function(observer) { return __this.subscribe({
next: function(value) {
if (observer.closed)
return;
try { value = fn(value) }
catch (e) { return observer.error(e) }
return observer.next(value);
},
error: function(e) { return observer.error(e) },
complete: function(x) { return observer.complete(x) },
}); });
},
filter: function(fn) { var __this = this;
if (typeof fn !== "function")
throw new TypeError(fn + " is not a function");
var C = getSpecies(this);
return new C(function(observer) { return __this.subscribe({
next: function(value) {
if (observer.closed)
return;
try { if (!fn(value)) return undefined }
catch (e) { return observer.error(e) }
return observer.next(value);
},
error: function(e) { return observer.error(e) },
complete: function() { return observer.complete() },
}); });
},
reduce: function(fn) { var __this = this;
if (typeof fn !== "function")
throw new TypeError(fn + " is not a function");
var C = getSpecies(this);
var hasSeed = arguments.length > 1;
var hasValue = false;
var seed = arguments[1];
var acc = seed;
return new C(function(observer) { return __this.subscribe({
next: function(value) {
if (observer.closed)
return;
var first = !hasValue;
hasValue = true;
if (!first || hasSeed) {
try { acc = fn(acc, value) }
catch (e) { return observer.error(e) }
} else {
acc = value;
}
},
error: function(e) { observer.error(e) },
complete: function() {
if (!hasValue && !hasSeed) {
observer.error(new TypeError("Cannot reduce an empty sequence"));
return;
}
observer.next(acc);
observer.complete();
},
}); });
},
flatMap: function(fn) { var __this = this;
if (typeof fn !== "function")
throw new TypeError(fn + " is not a function");
var C = getSpecies(this);
return new C(function(observer) {
var completed = false;
var subscriptions = [];
// Subscribe to the outer Observable
var outer = __this.subscribe({
next: function(value) {
if (fn) {
try {
value = fn(value);
} catch (x) {
observer.error(x);
return;
}
}
// Subscribe to the inner Observable
Observable.from(value).subscribe({
_subscription: null,
start: function(s) { subscriptions.push(this._subscription = s) },
next: function(value) { observer.next(value) },
error: function(e) { observer.error(e) },
complete: function() {
var i = subscriptions.indexOf(this._subscription);
if (i >= 0)
subscriptions.splice(i, 1);
closeIfDone();
}
});
},
error: function(e) {
return observer.error(e);
},
complete: function() {
completed = true;
closeIfDone();
}
});
function closeIfDone() {
if (completed && subscriptions.length === 0)
observer.complete();
}
return function() {
subscriptions.forEach(function(s) { return s.unsubscribe(); });
outer.unsubscribe();
};
});
},
});
// Observable instances return themselves from their observable-keyed method.
// The property is non-enumerable but stays writable and configurable.
Object.defineProperty(Observable.prototype, getSymbol("observable"), {
  configurable: true,
  writable: true,
  value: function() {
    return this;
  },
});
addMethods(Observable, {
from: function(x) {
var C = typeof this === "function" ? this : Observable;
if (x == null)
throw new TypeError(x + " is not an object");
var method = getMethod(x, getSymbol("observable"));
if (method) {
var observable$0 = method.call(x);
if (Object(observable$0) !== observable$0)
throw new TypeError(observable$0 + " is not an object");
if (observable$0.constructor === C)
return observable$0;
return new C(function(observer) { return observable$0.subscribe(observer); });
}
if (hasSymbol("iterator") && (method = getMethod(x, getSymbol("iterator")))) {
return new C(function(observer) {
for (var __$0 = (method.call(x))[Symbol.iterator](), __$1; __$1 = __$0.next(), !__$1.done;) { var item$0 = __$1.value;
observer.next(item$0);
if (observer.closed)
return;
}
observer.complete();
});
}
if (Array.isArray(x)) {
return new C(function(observer) {
for (var i$0 = 0; i$0 < x.length; ++i$0) {
observer.next(x[i$0]);
if (observer.closed)
return;
}
observer.complete();
});
}
throw new TypeError(x + " is not observable");
},
of: function() { for (var items = [], __$0 = 0; __$0 < arguments.length; ++__$0) items.push(arguments[__$0]);
var C = typeof this === "function" ? this : Observable;
return new C(function(observer) {
for (var i$1 = 0; i$1 < items.length; ++i$1) {
observer.next(items[i$1]);
if (observer.closed)
return;
}
observer.complete();
});
},
});
// Species accessor: a constructor's species is the constructor it is read
// from, so a subclass that inherits this getter gets itself back.
Object.defineProperty(Observable, getSymbol("species"), {
  configurable: true,
  get: function() {
    return this;
  },
});
exports.Observable = Observable;
}, "*");
},{}],"zen-observable":[function(require,module,exports){
module.exports = require("./zen-observable.js").Observable;
},{"./zen-observable.js":1}]},{},[])
//# sourceMappingURL=data:application/json;charset=utf-8;base64,
// Probe: how does zen-observable behave when an observer's `next` handler
// throws? Every event that fires is appended to `list`, printed at the end.
const Observable = require("zen-observable");

const list = [];
const error = new Error();
let sink;

// Capture the object passed to the subscriber so values can be pushed from
// outside the Observable.
const source = new Observable((observer) => {
  sink = observer;
  return () => list.push('cleanup');
});

source.subscribe({
  next(val) {
    list.push('next:' + val);
    throw error;
  },
  error(err) {
    list.push('error-handler:', err);
  },
  complete() {
    return list.push('complete');
  },
});

// Push two values, recording any exception that reaches the producer side.
for (const value of [1, 2]) {
  try {
    sink.next(value);
  } catch (e) {
    list.push('try-catch:', e);
  }
}

console.log(list);
;}, 0)
{
"name": "requirebin-sketch",
"version": "1.0.0",
"dependencies": {
"zen-observable": "0.5.2"
}
}
<!-- contents of this file will be placed inside the <body> -->
<!-- contents of this file will be placed inside the <head> -->
@leebyron
Copy link
Author

leebyron commented Aug 8, 2017

xstream:

const xc = require("xstream").default;

xc.create({
  start: _sink => {
    sink = _sink;
  },
  stop: () => list.push('cleanup'),
})

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment