Skip to content

Instantly share code, notes, and snippets.

@humanchimp
Created March 25, 2017 02:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save humanchimp/45ec4fbf2bb11f5e249b433a8dd610a1 to your computer and use it in GitHub Desktop.
Save humanchimp/45ec4fbf2bb11f5e249b433a8dd610a1 to your computer and use it in GitHub Desktop.
observable toy
// from https://github.com/tc39/proposal-observable/blob/master/demo/mouse-drags.js
// Emits each element of the input stream until the control stream has emitted an
// element.
function takeUntil(stream, control) {
return new Observable(sink => {
let source = stream.subscribe(sink);
let input = control.subscribe({
next: x => sink.complete(x),
error: x => sink.error(x),
complete: x => sink.complete(x),
});
return _=> {
source.unsubscribe();
input.unsubscribe();
};
});
}
// For a nested stream, emits the elements of the inner stream contained within the
// most recent outer stream
function switchLatest(stream) {
return new Observable(sink => {
let inner = null;
let outer = stream.subscribe({
next(value) {
if (inner)
inner.unsubscribe();
inner = value.subscribe({
next: x => sink.next(x),
error: x => sink.error(x),
});
},
error: x => sink.error(x),
complete: x => sink.complete(x),
});
return _=> {
if (inner)
inner.unsubscribe();
outer.unsubscribe();
};
});
}
// Returns an observable of DOM element events
function listen(element, eventName) {
return new Observable(sink => {
function handler(event) { sink.next(event) }
element.addEventListener(eventName, handler);
return _=> element.removeEventListener(eventName, handler);
});
}
// Returns an observable of drag move events for the specified element
function mouseDrags(element) {
// For each mousedown, emit a nested stream of mouse move events which stops
// when a mouseup event occurs
let moveStreams = listen(element, "mousedown").map(e => {
e.preventDefault();
return takeUntil(
listen(element, "mousemove"),
listen(document, "mouseup"));
});
// Return a stream of mouse moves nested within the most recent mouse down
return switchLatest(moveStreams);
}
<!doctype html>
<html>
<title>observable</title>
<body>
<canvas></canvas>
<style>body {margin:0;}</style>
<script src="./zen-observable.js"></script>
<script src="./mouse-drags.js"></script>
<script>
const canvas = document.querySelector('canvas');
const context = canvas.getContext('2d');
resizeCanvas();
listen(canvas, "mousedown").subscribe({
next() {
context.beginPath();
},
});
const mouseMoves = mouseDrags(canvas);
mouseMoves.subscribe({
next(e) {
context.strokeStyle = randomRgb();
if (!e.shiftKey) {
context.lineTo(e.offsetX, e.offsetY);
}
context.stroke();
},
});
let radius = 10;
mouseMoves.filter(e => e.metaKey).subscribe({
next() {
radius = r(100);
}
});
const τ = Math.PI * 2;
mouseMoves.filter(e => e.shiftKey).subscribe({
next(e) {
context.arc(e.offsetX, e.offsetY, radius, 0, τ, true);
}
});
mouseMoves.filter(e => e.altKey).subscribe({
next() {
context.fillStyle = randomRgba();
context.fill();
}
});
listen(canvas, "mouseup").subscribe({
next(e) {
if (!e.shiftKey) {
context.fillStyle = randomRgba();
context.fill();
}
context.closePath();
},
});
function resizeCanvas() {
canvas.width = window.innerWidth;
canvas.height = window.innerHeight;
}
function r(max, decimals) {
return (Math.random() * max).toFixed(decimals);
}
function randomRgb() {
return `rgb(${r(255)},${r(255)},${r(255)}`;
}
function randomRgba() {
return `rgba(${r(255)},${r(255)},${r(255)},${r(1, 2)}`;
}
</script>
'use strict'; (function(fn, name) { if (typeof exports !== 'undefined') fn(exports, module); else if (typeof self !== 'undefined') fn(name === '*' ? self : (name ? self[name] = {} : {})); })(function(exports, module) { // === Symbol Support ===
function hasSymbol(name) {
return typeof Symbol === "function" && Boolean(Symbol[name]);
}
function getSymbol(name) {
return hasSymbol(name) ? Symbol[name] : "@@" + name;
}
// Ponyfill Symbol.observable for interoperability with other libraries
if (typeof Symbol === "function" && !Symbol.observable) {
Symbol.observable = Symbol("observable");
}
// === Abstract Operations ===
function getMethod(obj, key) {
var value = obj[key];
if (value == null)
return undefined;
if (typeof value !== "function")
throw new TypeError(value + " is not a function");
return value;
}
function getSpecies(obj) {
var ctor = obj.constructor;
if (ctor !== undefined) {
ctor = ctor[getSymbol("species")];
if (ctor === null) {
ctor = undefined;
}
}
return ctor !== undefined ? ctor : Observable;
}
function addMethods(target, methods) {
Object.keys(methods).forEach(function(k) {
var desc = Object.getOwnPropertyDescriptor(methods, k);
desc.enumerable = false;
Object.defineProperty(target, k, desc);
});
}
function cleanupSubscription(subscription) {
// Assert: observer._observer is undefined
var cleanup = subscription._cleanup;
if (!cleanup)
return;
// Drop the reference to the cleanup function so that we won't call it
// more than once
subscription._cleanup = undefined;
// Call the cleanup function
cleanup();
}
function subscriptionClosed(subscription) {
return subscription._observer === undefined;
}
function closeSubscription(subscription) {
if (subscriptionClosed(subscription))
return;
subscription._observer = undefined;
cleanupSubscription(subscription);
}
function cleanupFromSubscription(subscription) {
return function() { subscription.unsubscribe() };
}
function Subscription(observer, subscriber) {
// Assert: subscriber is callable
// The observer must be an object
if (Object(observer) !== observer)
throw new TypeError("Observer must be an object");
this._cleanup = undefined;
this._observer = observer;
var start = getMethod(observer, "start");
if (start)
start.call(observer, this);
if (subscriptionClosed(this))
return;
observer = new SubscriptionObserver(this);
try {
// Call the subscriber function
var cleanup$0 = subscriber.call(undefined, observer);
// The return value must be undefined, null, a subscription object, or a function
if (cleanup$0 != null) {
if (typeof cleanup$0.unsubscribe === "function")
cleanup$0 = cleanupFromSubscription(cleanup$0);
else if (typeof cleanup$0 !== "function")
throw new TypeError(cleanup$0 + " is not a function");
this._cleanup = cleanup$0;
}
} catch (e) {
// If an error occurs during startup, then attempt to send the error
// to the observer
observer.error(e);
return;
}
// If the stream is already finished, then perform cleanup
if (subscriptionClosed(this))
cleanupSubscription(this);
}
addMethods(Subscription.prototype = {}, {
get closed() { return subscriptionClosed(this) },
unsubscribe: function() { closeSubscription(this) },
});
function SubscriptionObserver(subscription) {
this._subscription = subscription;
}
addMethods(SubscriptionObserver.prototype = {}, {
get closed() { return subscriptionClosed(this._subscription) },
next: function(value) {
var subscription = this._subscription;
// If the stream is closed, then return undefined
if (subscriptionClosed(subscription))
return undefined;
var observer = subscription._observer;
var m = getMethod(observer, "next");
// If the observer doesn't support "next", then return undefined
if (!m)
return undefined;
// Send the next value to the sink
return m.call(observer, value);
},
error: function(value) {
var subscription = this._subscription;
// If the stream is closed, throw the error to the caller
if (subscriptionClosed(subscription))
throw value;
var observer = subscription._observer;
subscription._observer = undefined;
try {
var m$0 = getMethod(observer, "error");
// If the sink does not support "error", then throw the error to the caller
if (!m$0)
throw value;
value = m$0.call(observer, value);
} catch (e) {
try { cleanupSubscription(subscription) }
finally { throw e }
}
cleanupSubscription(subscription);
return value;
},
complete: function(value) {
var subscription = this._subscription;
// If the stream is closed, then return undefined
if (subscriptionClosed(subscription))
return undefined;
var observer = subscription._observer;
subscription._observer = undefined;
try {
var m$1 = getMethod(observer, "complete");
// If the sink does not support "complete", then return undefined
value = m$1 ? m$1.call(observer, value) : undefined;
} catch (e) {
try { cleanupSubscription(subscription) }
finally { throw e }
}
cleanupSubscription(subscription);
return value;
},
});
function Observable(subscriber) {
// The stream subscriber must be a function
if (typeof subscriber !== "function")
throw new TypeError("Observable initializer must be a function");
this._subscriber = subscriber;
}
addMethods(Observable.prototype, {
subscribe: function(observer) { for (var args = [], __$0 = 1; __$0 < arguments.length; ++__$0) args.push(arguments[__$0]);
if (typeof observer === 'function') {
observer = {
next: observer,
error: args[0],
complete: args[1],
};
}
return new Subscription(observer, this._subscriber);
},
forEach: function(fn) { var __this = this;
return new Promise(function(resolve, reject) {
if (typeof fn !== "function")
return Promise.reject(new TypeError(fn + " is not a function"));
__this.subscribe({
_subscription: null,
start: function(subscription) {
if (Object(subscription) !== subscription)
throw new TypeError(subscription + " is not an object");
this._subscription = subscription;
},
next: function(value) {
var subscription = this._subscription;
if (subscription.closed)
return;
try {
return fn(value);
} catch (err) {
reject(err);
subscription.unsubscribe();
}
},
error: reject,
complete: resolve,
});
});
},
map: function(fn) { var __this = this;
if (typeof fn !== "function")
throw new TypeError(fn + " is not a function");
var C = getSpecies(this);
return new C(function(observer) { return __this.subscribe({
next: function(value) {
if (observer.closed)
return;
try { value = fn(value) }
catch (e) { return observer.error(e) }
return observer.next(value);
},
error: function(e) { return observer.error(e) },
complete: function(x) { return observer.complete(x) },
}); });
},
filter: function(fn) { var __this = this;
if (typeof fn !== "function")
throw new TypeError(fn + " is not a function");
var C = getSpecies(this);
return new C(function(observer) { return __this.subscribe({
next: function(value) {
if (observer.closed)
return;
try { if (!fn(value)) return undefined }
catch (e) { return observer.error(e) }
return observer.next(value);
},
error: function(e) { return observer.error(e) },
complete: function() { return observer.complete() },
}); });
},
reduce: function(fn) { var __this = this;
if (typeof fn !== "function")
throw new TypeError(fn + " is not a function");
var C = getSpecies(this);
var hasSeed = arguments.length > 1;
var hasValue = false;
var seed = arguments[1];
var acc = seed;
return new C(function(observer) { return __this.subscribe({
next: function(value) {
if (observer.closed)
return;
var first = !hasValue;
hasValue = true;
if (!first || hasSeed) {
try { acc = fn(acc, value) }
catch (e) { return observer.error(e) }
} else {
acc = value;
}
},
error: function(e) { observer.error(e) },
complete: function() {
if (!hasValue && !hasSeed) {
observer.error(new TypeError("Cannot reduce an empty sequence"));
return;
}
observer.next(acc);
observer.complete();
},
}); });
},
flatMap: function(fn) { var __this = this;
if (typeof fn !== "function")
throw new TypeError(fn + " is not a function");
var C = getSpecies(this);
return new C(function(observer) {
var completed = false;
var subscriptions = [];
// Subscribe to the outer Observable
var outer = __this.subscribe({
next: function(value) {
if (fn) {
try {
value = fn(value);
} catch (x) {
observer.error(x);
return;
}
}
// Subscribe to the inner Observable
Observable.from(value).subscribe({
_subscription: null,
start: function(s) { subscriptions.push(this._subscription = s) },
next: function(value) { observer.next(value) },
error: function(e) { observer.error(e) },
complete: function() {
var i = subscriptions.indexOf(this._subscription);
if (i >= 0)
subscriptions.splice(i, 1);
closeIfDone();
}
});
},
error: function(e) {
return observer.error(e);
},
complete: function() {
completed = true;
closeIfDone();
}
});
function closeIfDone() {
if (completed && subscriptions.length === 0)
observer.complete();
}
return function() {
subscriptions.forEach(function(s) { return s.unsubscribe(); });
outer.unsubscribe();
};
});
},
});
Object.defineProperty(Observable.prototype, getSymbol("observable"), {
value: function() { return this },
writable: true,
configurable: true,
});
addMethods(Observable, {
from: function(x) {
var C = typeof this === "function" ? this : Observable;
if (x == null)
throw new TypeError(x + " is not an object");
var method = getMethod(x, getSymbol("observable"));
if (method) {
var observable$0 = method.call(x);
if (Object(observable$0) !== observable$0)
throw new TypeError(observable$0 + " is not an object");
if (observable$0.constructor === C)
return observable$0;
return new C(function(observer) { return observable$0.subscribe(observer); });
}
if (hasSymbol("iterator") && (method = getMethod(x, getSymbol("iterator")))) {
return new C(function(observer) {
for (var __$0 = (method.call(x))[Symbol.iterator](), __$1; __$1 = __$0.next(), !__$1.done;) { var item$0 = __$1.value;
observer.next(item$0);
if (observer.closed)
return;
}
observer.complete();
});
}
if (Array.isArray(x)) {
return new C(function(observer) {
for (var i$0 = 0; i$0 < x.length; ++i$0) {
observer.next(x[i$0]);
if (observer.closed)
return;
}
observer.complete();
});
}
throw new TypeError(x + " is not observable");
},
of: function() { for (var items = [], __$0 = 0; __$0 < arguments.length; ++__$0) items.push(arguments[__$0]);
var C = typeof this === "function" ? this : Observable;
return new C(function(observer) {
for (var i$1 = 0; i$1 < items.length; ++i$1) {
observer.next(items[i$1]);
if (observer.closed)
return;
}
observer.complete();
});
},
});
Object.defineProperty(Observable, getSymbol("species"), {
get: function() { return this },
configurable: true,
});
exports.Observable = Observable;
}, "*");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment