Created
March 25, 2017 02:50
-
-
Save humanchimp/45ec4fbf2bb11f5e249b433a8dd610a1 to your computer and use it in GitHub Desktop.
observable toy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Adapted from https://github.com/tc39/proposal-observable/blob/master/demo/mouse-drags.js
// Emits each element of the input stream until the control stream has emitted an | |
// element. | |
function takeUntil(stream, control) { | |
return new Observable(sink => { | |
let source = stream.subscribe(sink); | |
let input = control.subscribe({ | |
next: x => sink.complete(x), | |
error: x => sink.error(x), | |
complete: x => sink.complete(x), | |
}); | |
return _=> { | |
source.unsubscribe(); | |
input.unsubscribe(); | |
}; | |
}); | |
} | |
// For a nested stream, emits the elements of the inner stream contained within the | |
// most recent outer stream | |
function switchLatest(stream) { | |
return new Observable(sink => { | |
let inner = null; | |
let outer = stream.subscribe({ | |
next(value) { | |
if (inner) | |
inner.unsubscribe(); | |
inner = value.subscribe({ | |
next: x => sink.next(x), | |
error: x => sink.error(x), | |
}); | |
}, | |
error: x => sink.error(x), | |
complete: x => sink.complete(x), | |
}); | |
return _=> { | |
if (inner) | |
inner.unsubscribe(); | |
outer.unsubscribe(); | |
}; | |
}); | |
} | |
// Returns an observable of DOM element events | |
function listen(element, eventName) { | |
return new Observable(sink => { | |
function handler(event) { sink.next(event) } | |
element.addEventListener(eventName, handler); | |
return _=> element.removeEventListener(eventName, handler); | |
}); | |
} | |
// Returns an observable of drag move events for the specified element | |
function mouseDrags(element) { | |
// For each mousedown, emit a nested stream of mouse move events which stops | |
// when a mouseup event occurs | |
let moveStreams = listen(element, "mousedown").map(e => { | |
e.preventDefault(); | |
return takeUntil( | |
listen(element, "mousemove"), | |
listen(document, "mouseup")); | |
}); | |
// Return a stream of mouse moves nested within the most recent mouse down | |
return switchLatest(moveStreams); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!doctype html>
<html>
<title>observable</title>
<body>
<canvas></canvas>
<style>body {margin:0;}</style>
<script src="./zen-observable.js"></script>
<script src="./mouse-drags.js"></script>
<script>
const canvas = document.querySelector('canvas'); | |
const context = canvas.getContext('2d'); | |
resizeCanvas(); | |
listen(canvas, "mousedown").subscribe({ | |
next() { | |
context.beginPath(); | |
}, | |
}); | |
const mouseMoves = mouseDrags(canvas); | |
mouseMoves.subscribe({ | |
next(e) { | |
context.strokeStyle = randomRgb(); | |
if (!e.shiftKey) { | |
context.lineTo(e.offsetX, e.offsetY); | |
} | |
context.stroke(); | |
}, | |
}); | |
let radius = 10; | |
mouseMoves.filter(e => e.metaKey).subscribe({ | |
next() { | |
radius = r(100); | |
} | |
}); | |
const τ = Math.PI * 2; | |
mouseMoves.filter(e => e.shiftKey).subscribe({ | |
next(e) { | |
context.arc(e.offsetX, e.offsetY, radius, 0, τ, true); | |
} | |
}); | |
mouseMoves.filter(e => e.altKey).subscribe({ | |
next() { | |
context.fillStyle = randomRgba(); | |
context.fill(); | |
} | |
}); | |
listen(canvas, "mouseup").subscribe({ | |
next(e) { | |
if (!e.shiftKey) { | |
context.fillStyle = randomRgba(); | |
context.fill(); | |
} | |
context.closePath(); | |
}, | |
}); | |
function resizeCanvas() { | |
canvas.width = window.innerWidth; | |
canvas.height = window.innerHeight; | |
} | |
function r(max, decimals) { | |
return (Math.random() * max).toFixed(decimals); | |
} | |
function randomRgb() { | |
return `rgb(${r(255)},${r(255)},${r(255)}`; | |
} | |
function randomRgba() { | |
return `rgba(${r(255)},${r(255)},${r(255)},${r(1, 2)}`; | |
} | |
</script>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; (function(fn, name) { if (typeof exports !== 'undefined') fn(exports, module); else if (typeof self !== 'undefined') fn(name === '*' ? self : (name ? self[name] = {} : {})); })(function(exports, module) { // === Symbol Support === | |
function hasSymbol(name) { | |
return typeof Symbol === "function" && Boolean(Symbol[name]); | |
} | |
function getSymbol(name) { | |
return hasSymbol(name) ? Symbol[name] : "@@" + name; | |
} | |
// Ponyfill Symbol.observable for interoperability with other libraries | |
if (typeof Symbol === "function" && !Symbol.observable) { | |
Symbol.observable = Symbol("observable"); | |
} | |
// === Abstract Operations === | |
function getMethod(obj, key) { | |
var value = obj[key]; | |
if (value == null) | |
return undefined; | |
if (typeof value !== "function") | |
throw new TypeError(value + " is not a function"); | |
return value; | |
} | |
function getSpecies(obj) { | |
var ctor = obj.constructor; | |
if (ctor !== undefined) { | |
ctor = ctor[getSymbol("species")]; | |
if (ctor === null) { | |
ctor = undefined; | |
} | |
} | |
return ctor !== undefined ? ctor : Observable; | |
} | |
function addMethods(target, methods) { | |
Object.keys(methods).forEach(function(k) { | |
var desc = Object.getOwnPropertyDescriptor(methods, k); | |
desc.enumerable = false; | |
Object.defineProperty(target, k, desc); | |
}); | |
} | |
function cleanupSubscription(subscription) { | |
// Assert: observer._observer is undefined | |
var cleanup = subscription._cleanup; | |
if (!cleanup) | |
return; | |
// Drop the reference to the cleanup function so that we won't call it | |
// more than once | |
subscription._cleanup = undefined; | |
// Call the cleanup function | |
cleanup(); | |
} | |
function subscriptionClosed(subscription) { | |
return subscription._observer === undefined; | |
} | |
function closeSubscription(subscription) { | |
if (subscriptionClosed(subscription)) | |
return; | |
subscription._observer = undefined; | |
cleanupSubscription(subscription); | |
} | |
function cleanupFromSubscription(subscription) { | |
return function() { subscription.unsubscribe() }; | |
} | |
function Subscription(observer, subscriber) { | |
// Assert: subscriber is callable | |
// The observer must be an object | |
if (Object(observer) !== observer) | |
throw new TypeError("Observer must be an object"); | |
this._cleanup = undefined; | |
this._observer = observer; | |
var start = getMethod(observer, "start"); | |
if (start) | |
start.call(observer, this); | |
if (subscriptionClosed(this)) | |
return; | |
observer = new SubscriptionObserver(this); | |
try { | |
// Call the subscriber function | |
var cleanup$0 = subscriber.call(undefined, observer); | |
// The return value must be undefined, null, a subscription object, or a function | |
if (cleanup$0 != null) { | |
if (typeof cleanup$0.unsubscribe === "function") | |
cleanup$0 = cleanupFromSubscription(cleanup$0); | |
else if (typeof cleanup$0 !== "function") | |
throw new TypeError(cleanup$0 + " is not a function"); | |
this._cleanup = cleanup$0; | |
} | |
} catch (e) { | |
// If an error occurs during startup, then attempt to send the error | |
// to the observer | |
observer.error(e); | |
return; | |
} | |
// If the stream is already finished, then perform cleanup | |
if (subscriptionClosed(this)) | |
cleanupSubscription(this); | |
} | |
addMethods(Subscription.prototype = {}, { | |
get closed() { return subscriptionClosed(this) }, | |
unsubscribe: function() { closeSubscription(this) }, | |
}); | |
function SubscriptionObserver(subscription) { | |
this._subscription = subscription; | |
} | |
addMethods(SubscriptionObserver.prototype = {}, { | |
get closed() { return subscriptionClosed(this._subscription) }, | |
next: function(value) { | |
var subscription = this._subscription; | |
// If the stream is closed, then return undefined | |
if (subscriptionClosed(subscription)) | |
return undefined; | |
var observer = subscription._observer; | |
var m = getMethod(observer, "next"); | |
// If the observer doesn't support "next", then return undefined | |
if (!m) | |
return undefined; | |
// Send the next value to the sink | |
return m.call(observer, value); | |
}, | |
error: function(value) { | |
var subscription = this._subscription; | |
// If the stream is closed, throw the error to the caller | |
if (subscriptionClosed(subscription)) | |
throw value; | |
var observer = subscription._observer; | |
subscription._observer = undefined; | |
try { | |
var m$0 = getMethod(observer, "error"); | |
// If the sink does not support "error", then throw the error to the caller | |
if (!m$0) | |
throw value; | |
value = m$0.call(observer, value); | |
} catch (e) { | |
try { cleanupSubscription(subscription) } | |
finally { throw e } | |
} | |
cleanupSubscription(subscription); | |
return value; | |
}, | |
complete: function(value) { | |
var subscription = this._subscription; | |
// If the stream is closed, then return undefined | |
if (subscriptionClosed(subscription)) | |
return undefined; | |
var observer = subscription._observer; | |
subscription._observer = undefined; | |
try { | |
var m$1 = getMethod(observer, "complete"); | |
// If the sink does not support "complete", then return undefined | |
value = m$1 ? m$1.call(observer, value) : undefined; | |
} catch (e) { | |
try { cleanupSubscription(subscription) } | |
finally { throw e } | |
} | |
cleanupSubscription(subscription); | |
return value; | |
}, | |
}); | |
function Observable(subscriber) { | |
// The stream subscriber must be a function | |
if (typeof subscriber !== "function") | |
throw new TypeError("Observable initializer must be a function"); | |
this._subscriber = subscriber; | |
} | |
addMethods(Observable.prototype, { | |
subscribe: function(observer) { for (var args = [], __$0 = 1; __$0 < arguments.length; ++__$0) args.push(arguments[__$0]); | |
if (typeof observer === 'function') { | |
observer = { | |
next: observer, | |
error: args[0], | |
complete: args[1], | |
}; | |
} | |
return new Subscription(observer, this._subscriber); | |
}, | |
forEach: function(fn) { var __this = this; | |
return new Promise(function(resolve, reject) { | |
if (typeof fn !== "function") | |
return Promise.reject(new TypeError(fn + " is not a function")); | |
__this.subscribe({ | |
_subscription: null, | |
start: function(subscription) { | |
if (Object(subscription) !== subscription) | |
throw new TypeError(subscription + " is not an object"); | |
this._subscription = subscription; | |
}, | |
next: function(value) { | |
var subscription = this._subscription; | |
if (subscription.closed) | |
return; | |
try { | |
return fn(value); | |
} catch (err) { | |
reject(err); | |
subscription.unsubscribe(); | |
} | |
}, | |
error: reject, | |
complete: resolve, | |
}); | |
}); | |
}, | |
map: function(fn) { var __this = this; | |
if (typeof fn !== "function") | |
throw new TypeError(fn + " is not a function"); | |
var C = getSpecies(this); | |
return new C(function(observer) { return __this.subscribe({ | |
next: function(value) { | |
if (observer.closed) | |
return; | |
try { value = fn(value) } | |
catch (e) { return observer.error(e) } | |
return observer.next(value); | |
}, | |
error: function(e) { return observer.error(e) }, | |
complete: function(x) { return observer.complete(x) }, | |
}); }); | |
}, | |
filter: function(fn) { var __this = this; | |
if (typeof fn !== "function") | |
throw new TypeError(fn + " is not a function"); | |
var C = getSpecies(this); | |
return new C(function(observer) { return __this.subscribe({ | |
next: function(value) { | |
if (observer.closed) | |
return; | |
try { if (!fn(value)) return undefined } | |
catch (e) { return observer.error(e) } | |
return observer.next(value); | |
}, | |
error: function(e) { return observer.error(e) }, | |
complete: function() { return observer.complete() }, | |
}); }); | |
}, | |
reduce: function(fn) { var __this = this; | |
if (typeof fn !== "function") | |
throw new TypeError(fn + " is not a function"); | |
var C = getSpecies(this); | |
var hasSeed = arguments.length > 1; | |
var hasValue = false; | |
var seed = arguments[1]; | |
var acc = seed; | |
return new C(function(observer) { return __this.subscribe({ | |
next: function(value) { | |
if (observer.closed) | |
return; | |
var first = !hasValue; | |
hasValue = true; | |
if (!first || hasSeed) { | |
try { acc = fn(acc, value) } | |
catch (e) { return observer.error(e) } | |
} else { | |
acc = value; | |
} | |
}, | |
error: function(e) { observer.error(e) }, | |
complete: function() { | |
if (!hasValue && !hasSeed) { | |
observer.error(new TypeError("Cannot reduce an empty sequence")); | |
return; | |
} | |
observer.next(acc); | |
observer.complete(); | |
}, | |
}); }); | |
}, | |
flatMap: function(fn) { var __this = this; | |
if (typeof fn !== "function") | |
throw new TypeError(fn + " is not a function"); | |
var C = getSpecies(this); | |
return new C(function(observer) { | |
var completed = false; | |
var subscriptions = []; | |
// Subscribe to the outer Observable | |
var outer = __this.subscribe({ | |
next: function(value) { | |
if (fn) { | |
try { | |
value = fn(value); | |
} catch (x) { | |
observer.error(x); | |
return; | |
} | |
} | |
// Subscribe to the inner Observable | |
Observable.from(value).subscribe({ | |
_subscription: null, | |
start: function(s) { subscriptions.push(this._subscription = s) }, | |
next: function(value) { observer.next(value) }, | |
error: function(e) { observer.error(e) }, | |
complete: function() { | |
var i = subscriptions.indexOf(this._subscription); | |
if (i >= 0) | |
subscriptions.splice(i, 1); | |
closeIfDone(); | |
} | |
}); | |
}, | |
error: function(e) { | |
return observer.error(e); | |
}, | |
complete: function() { | |
completed = true; | |
closeIfDone(); | |
} | |
}); | |
function closeIfDone() { | |
if (completed && subscriptions.length === 0) | |
observer.complete(); | |
} | |
return function() { | |
subscriptions.forEach(function(s) { return s.unsubscribe(); }); | |
outer.unsubscribe(); | |
}; | |
}); | |
}, | |
}); | |
Object.defineProperty(Observable.prototype, getSymbol("observable"), { | |
value: function() { return this }, | |
writable: true, | |
configurable: true, | |
}); | |
addMethods(Observable, { | |
from: function(x) { | |
var C = typeof this === "function" ? this : Observable; | |
if (x == null) | |
throw new TypeError(x + " is not an object"); | |
var method = getMethod(x, getSymbol("observable")); | |
if (method) { | |
var observable$0 = method.call(x); | |
if (Object(observable$0) !== observable$0) | |
throw new TypeError(observable$0 + " is not an object"); | |
if (observable$0.constructor === C) | |
return observable$0; | |
return new C(function(observer) { return observable$0.subscribe(observer); }); | |
} | |
if (hasSymbol("iterator") && (method = getMethod(x, getSymbol("iterator")))) { | |
return new C(function(observer) { | |
for (var __$0 = (method.call(x))[Symbol.iterator](), __$1; __$1 = __$0.next(), !__$1.done;) { var item$0 = __$1.value; | |
observer.next(item$0); | |
if (observer.closed) | |
return; | |
} | |
observer.complete(); | |
}); | |
} | |
if (Array.isArray(x)) { | |
return new C(function(observer) { | |
for (var i$0 = 0; i$0 < x.length; ++i$0) { | |
observer.next(x[i$0]); | |
if (observer.closed) | |
return; | |
} | |
observer.complete(); | |
}); | |
} | |
throw new TypeError(x + " is not observable"); | |
}, | |
of: function() { for (var items = [], __$0 = 0; __$0 < arguments.length; ++__$0) items.push(arguments[__$0]); | |
var C = typeof this === "function" ? this : Observable; | |
return new C(function(observer) { | |
for (var i$1 = 0; i$1 < items.length; ++i$1) { | |
observer.next(items[i$1]); | |
if (observer.closed) | |
return; | |
} | |
observer.complete(); | |
}); | |
}, | |
}); | |
Object.defineProperty(Observable, getSymbol("species"), { | |
get: function() { return this }, | |
configurable: true, | |
}); | |
exports.Observable = Observable;
}, "*");
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment