Skip to content

Instantly share code, notes, and snippets.

@benlesh
Created May 19, 2016 18:44
Show Gist options
  • Save benlesh/9a114da51a04e77be353b4a1f36996ae to your computer and use it in GitHub Desktop.
Save benlesh/9a114da51a04e77be353b4a1f36996ae to your computer and use it in GitHub Desktop.
esnextbin sketch
<!doctype html>
<html>
<head>
<meta charset="utf-8">
<title>ESNextbin Sketch</title>
<!-- put additional styles and scripts here -->
</head>
<body>
<!-- put markup and other contents here -->
</body>
</html>
// Clear any previous console output before the sketch below runs.
console.clear();
/**
 * A unit of work that runs synchronously when `execute()` is called.
 *
 * The action is invoked with the task as `this` and the observer as its
 * only argument. Anything the action throws is handed to `observer.error`.
 */
class Task {
  /**
   * @param {Function} action - Work to run; called as `action.call(task, observer)`.
   * @param {object} observer - Passed to the action; its `error` receives thrown values.
   */
  constructor(action, observer) {
    Object.assign(this, { action, observer });
  }

  /** Run the action now, routing any thrown value to `observer.error`. */
  execute() {
    const work = this.action;
    const target = this.observer;
    try {
      work.call(this, target);
    } catch (failure) {
      target.error(failure);
    }
  }

  /** Nothing to cancel: a synchronous task has already run by the time it is returned. */
  unsubscribe() {}
}
/**
 * A Task whose action is deferred with `setTimeout`.
 *
 * `execute()` arms the timer and stores its handle in `this.id`;
 * `unsubscribe()` cancels the timer if one was armed.
 */
class AsyncTask extends Task {
  /**
   * @param {Function} action - Work to run once the timer fires.
   * @param {number} delay - Timeout passed to `setTimeout`.
   * @param {object} observer - Forwarded to the base Task.
   */
  constructor(action, delay, observer) {
    super(action, observer);
    this.delay = delay;
  }

  /** Arm the timer; the base-class `execute` runs when it fires. */
  execute() {
    const runNow = () => super.execute();
    this.id = setTimeout(runNow, this.delay);
  }

  /** Cancel the pending timer, if `execute()` has stored one. */
  unsubscribe() {
    if (!this.id) {
      return;
    }
    clearTimeout(this.id);
  }
}
/**
 * Scheduler that runs work synchronously unless a positive delay is given,
 * in which case the work is deferred through an AsyncTask.
 */
class RecursiveScheduler {
  /**
   * @param {Function} action - Work to run.
   * @param {number} delay - Values greater than 0 select an AsyncTask.
   * @param {object} state - Passed to the task as its observer.
   * @returns {Task} The task, after `execute()` has been called on it.
   */
  schedule(action, delay, state) {
    const task = delay > 0
      ? new AsyncTask(action, delay, state)
      : new Task(action, state);
    task.execute();
    return task;
  }
}
/** Scheduler that always defers work through an AsyncTask. */
class AsyncScheduler {
  /**
   * @param {Function} action - Work to run.
   * @param {number} delay - Timeout handed to the AsyncTask.
   * @param {object} state - Passed to the task as its observer.
   * @returns {AsyncTask} The task, after `execute()` has been called on it.
   */
  schedule(action, delay, state) {
    const pending = new AsyncTask(action, delay, state);
    pending.execute();
    return pending;
  }
}
// Shared scheduler instances, used as default arguments further down the file.
const recursive = new RecursiveScheduler();
const asyncScheduler = new AsyncScheduler();

// Do-nothing handler used where an observer omits `next` or `complete`.
const noop = () => undefined;
/**
 * Default `error` handler for observers that do not supply one:
 * logs the error and rethrows it unchanged so it is not lost.
 *
 * @param {*} err - The value to log and rethrow.
 * @throws {*} Always rethrows `err` itself.
 */
const justThrow = (err) => {
  // Fixed the misspelled log label ('thowing!').
  console.log('throwing!', err);
  throw err;
};
/**
 * Normalize the arguments of `subscribe` into a single observer object.
 *
 * @param {object|Function|null|undefined} observerOrNext - Either an observer
 *   object (returned as-is) or the `next` callback.
 * @param {Function} [error] - `error` callback, used only in the callback form.
 * @param {Function} [complete] - `complete` callback, used only in the callback form.
 * @returns {object} An object carrying `next`, `error` and `complete` properties
 *   (any of which may be missing or not a function).
 */
function ensureObserver(observerOrNext, error, complete) {
  // `typeof null` is 'object'; without this guard `null` would be returned as
  // the observer and crash on the first property access downstream.
  if (observerOrNext !== null && typeof observerOrNext === 'object') {
    return observerOrNext;
  }
  return {
    next: observerOrNext,
    error,
    complete,
  };
}
/**
 * Guarantee that an observer exposes `next`, `error` and `complete`.
 *
 * An observer that already has all three is returned untouched. Otherwise a
 * new object is built: handlers that exist are bound to the original
 * observer, a missing `next`/`complete` becomes `noop`, and a missing
 * `error` becomes `justThrow`.
 *
 * @param {object} observer - Possibly partial observer.
 * @returns {object} An observer with all three handlers.
 */
function ensureHandlers(observer) {
  const { next, error, complete } = observer;
  if (next && error && complete) {
    return observer;
  }

  // Bind an existing handler to its observer, or fall back to the default.
  const boundOr = (handler, fallback) => (handler && handler.bind(observer)) || fallback;

  return {
    next: boundOr(next, noop),
    error: boundOr(error, justThrow),
    complete: boundOr(complete, noop),
  };
}
/**
 * Observer handed to a producer function. Every notification is delivered to
 * the destination through the scheduler, and nothing is delivered once
 * `error` or `complete` has been called.
 */
class ProducerObserver {
  /**
   * @param {object} destination - Observer to deliver to (normalized by `ensureHandlers`).
   * @param {object} [scheduler=recursive] - Scheduler used for every delivery.
   */
  constructor(destination, scheduler = recursive) {
    this.scheduler = scheduler;
    this.destination = ensureHandlers(destination);
  }

  /** Deliver a value unless this observer has stopped. */
  next(value) {
    if (this.isStopped) {
      return;
    }
    const deliver = () => this.destination.next(value);
    this.scheduler.schedule(deliver, 0, this);
  }

  /** Deliver an error and stop; ignored if already stopped. */
  error(err) {
    if (this.isStopped) {
      return;
    }
    this.isStopped = true;
    const deliver = () => this.destination.error(err);
    this.scheduler.schedule(deliver, 0, this);
  }

  /** Deliver completion and stop; ignored if already stopped. */
  complete(value) {
    if (this.isStopped) {
      return;
    }
    this.isStopped = true;
    const deliver = () => this.destination.complete(value);
    this.scheduler.schedule(deliver, 0, this);
  }
}
/**
 * Base class for operator observers: forwards every notification unchanged
 * to the destination. Subclasses override the methods they need to alter.
 */
class OperatorObserver {
  /** @param {object} destination - Downstream observer (normalized by `ensureHandlers`). */
  constructor(destination) {
    this.destination = ensureHandlers(destination);
  }

  /** Pass a value downstream. */
  next(value) {
    const { destination } = this;
    destination.next(value);
  }

  /** Pass an error downstream. */
  error(err) {
    const { destination } = this;
    destination.error(err);
  }

  /** Pass completion downstream. */
  complete(value) {
    const { destination } = this;
    destination.complete(value);
  }
}
/** Operator that wires a MapObserver between a source and a destination. */
class MapOperator {
  /** @param {Function} project - Transformation applied to each value. */
  constructor(project) {
    this.project = project;
  }

  /**
   * Subscribe to `source` through a MapObserver wrapping `destination`.
   * @returns {*} Whatever `source.subscribe` returns.
   */
  call(source, destination) {
    const mapping = new MapObserver(destination, this.project);
    return source.subscribe(mapping);
  }
}
/** Observer that forwards `project(value)` instead of the raw value. */
class MapObserver extends OperatorObserver {
  /**
   * @param {object} destination - Downstream observer.
   * @param {Function} project - Transformation applied to each value.
   */
  constructor(destination, project) {
    super(destination);
    this.project = project;
  }

  /** Transform the value, then forward the result downstream. */
  next(value) {
    const projected = this.project(value);
    super.next(projected);
  }
}
/** Operator that wires a FilterObserver between a source and a destination. */
class FilterOperator {
  /** @param {Function} predicate - Test applied to each value. */
  constructor(predicate) {
    this.predicate = predicate;
  }

  /**
   * Subscribe to `source` through a FilterObserver wrapping `destination`.
   * @returns {*} Whatever `source.subscribe` returns.
   */
  call(source, destination) {
    const filtering = new FilterObserver(destination, this.predicate);
    return source.subscribe(filtering);
  }
}
/** Observer that forwards only the values for which `predicate` is truthy. */
class FilterObserver extends OperatorObserver {
  /**
   * @param {object} destination - Downstream observer.
   * @param {Function} predicate - Test applied to each value.
   */
  constructor(destination, predicate) {
    super(destination);
    this.predicate = predicate;
  }

  /** Drop the value unless the predicate accepts it. */
  next(value) {
    const accepted = this.predicate(value);
    if (!accepted) {
      return;
    }
    super.next(value);
  }
}
/** Operator that wires a DoObserver between a source and a destination. */
class DoOperator {
  /** @param {Function} callback - Side effect run for each value. */
  constructor(callback) {
    this.callback = callback;
  }

  /**
   * Subscribe to `source` through a DoObserver wrapping `destination`.
   * @returns {*} Whatever `source.subscribe` returns.
   */
  call(source, destination) {
    const tapping = new DoObserver(destination, this.callback);
    return source.subscribe(tapping);
  }
}
/**
 * Observer that runs a side-effect callback on each value and then forwards
 * the value unchanged.
 */
class DoObserver extends OperatorObserver {
  /**
   * @param {object} destination - Downstream observer.
   * @param {Function} callback - Side effect invoked with each value.
   */
  constructor(destination, callback) {
    super(destination);
    Object.assign(this, { callback });
  }

  /**
   * Side effect first, forwarding second: if the callback throws, the value
   * is never forwarded.
   */
  next(value) {
    this.callback(value);
    return void super.next(value);
  }
}
/** Operator that wires a TakeObserver between a source and a destination. */
class TakeOperator {
  /** @param {number} count - Limit handed to the TakeObserver. */
  constructor(count) {
    this.count = count;
  }

  /**
   * Subscribe to `source` through a TakeObserver wrapping `destination`.
   * @returns {*} Whatever `source.subscribe` returns.
   */
  call(source, destination) {
    const limiting = new TakeObserver(destination, this.count);
    return source.subscribe(limiting);
  }
}
/**
 * Observer that forwards at most `count` values and then completes.
 *
 * Fixes relative to the previous version:
 *  - `take(n)` now emits n values; before, the n-th value was swallowed and
 *    replaced by the completion (off-by-one).
 *  - completion is delivered exactly once; values, errors and completions
 *    arriving after that are ignored instead of completing the destination again.
 */
class TakeObserver extends OperatorObserver {
  // Number of values forwarded so far.
  counter = 0;
  // True once a terminal notification (complete or error) has been forwarded.
  isDone = false;

  /**
   * @param {object} destination - Downstream observer.
   * @param {number} count - Maximum number of values to forward.
   */
  constructor(destination, count) {
    super(destination);
    this.count = count;
  }

  /** Forward the value if the limit is not reached; complete once it is. */
  next(value) {
    if (this.isDone) {
      return;
    }
    if (this.counter < this.count) {
      this.counter++;
      super.next(value);
    }
    // Written as a negation so that `take(0)` and a non-numeric count still
    // complete on the first value, as they did before.
    if (!(this.counter < this.count)) {
      this.isDone = true;
      super.complete();
    }
  }

  /** Forward an error unless a terminal notification was already sent. */
  error(err) {
    if (this.isDone) {
      return;
    }
    this.isDone = true;
    super.error(err);
  }

  /** Forward completion unless a terminal notification was already sent. */
  complete(value) {
    if (this.isDone) {
      return;
    }
    this.isDone = true;
    super.complete(value);
  }
}
/**
 * Minimal push-based Observable.
 *
 * An Observable is either a *producer* (built from a `_subscribe` function)
 * or a *lifted* observable (built by `lift`, carrying a `source` and an
 * `operator`). Subscribing to a producer wraps the observer in a
 * ProducerObserver and runs `_subscribe` through the scheduler; subscribing
 * to a lifted observable delegates to `operator.call(source, observer)`.
 */
class Observable {
  /**
   * @param {Function} [_subscribe] - Producer function; receives the
   *   ProducerObserver and may return a teardown function or an object with
   *   an `unsubscribe` method. Omitted for lifted observables.
   * @param {object} [scheduler=recursive] - Scheduler used to run
   *   `_subscribe` and to deliver notifications.
   */
  constructor(_subscribe, scheduler = recursive) {
    this._subscribe = _subscribe;
    this.scheduler = scheduler;
  }

  /**
   * Observable that emits a promise's fulfillment value and then completes,
   * or errors with the rejection reason.
   *
   * @param {Promise} promise
   * @param {object} [scheduler=recursive]
   * @returns {Observable}
   */
  static fromPromise(promise, scheduler = recursive) {
    return new Observable((observer) => {
      promise
        .then((value) => {
          observer.next(value);
          // A promise yields a single value, so the stream ends here.
          observer.complete();
        })
        .catch((err) => observer.error(err));
    }, scheduler);
  }

  /**
   * Observable that emits every value of an iterable and then completes.
   *
   * @param {Iterable} iterable
   * @param {object} [scheduler=recursive]
   * @returns {Observable}
   */
  static fromIterable(iterable, scheduler = recursive) {
    return new Observable((observer) => {
      // `for...of` yields the values; `for...in` (used previously) yielded keys.
      for (const value of iterable) {
        // Stop pulling once the observer has errored or completed.
        if (observer.isStopped) {
          break;
        }
        observer.next(value);
      }
      observer.complete();
    }, scheduler);
  }

  /**
   * Build an Observable from an iterable or a promise-like value.
   *
   * @param {Iterable|Promise} source
   * @param {object} [scheduler=recursive]
   * @returns {Observable}
   * @throws {TypeError} If `source` is neither iterable nor thenable.
   */
  static from(source, scheduler = recursive) {
    if (source != null) {
      if (typeof source[Symbol.iterator] === 'function') {
        return Observable.fromIterable(source, scheduler);
      }
      if (typeof source.then === 'function') {
        return Observable.fromPromise(source, scheduler);
      }
    }
    throw new TypeError('Observable.from expects an iterable or a promise');
  }

  /**
   * Instance form of `Observable.from`, kept so existing
   * `someObservable.from(...)` callers continue to work.
   */
  from(source, scheduler = recursive) {
    return Observable.from(source, scheduler);
  }

  /**
   * Start the observable.
   *
   * @param {object|Function} observerOrNext - Observer object or `next` callback.
   * @param {Function} [error] - `error` callback (callback form only).
   * @param {Function} [complete] - `complete` callback (callback form only).
   * @returns {{unsubscribe: Function}} Subscription that runs the teardown.
   */
  subscribe(observerOrNext, error, complete) {
    const observer = ensureObserver(observerOrNext, error, complete);
    let teardown;
    if (this.operator) {
      teardown = this.operator.call(this.source, observer);
    } else {
      // Distinct name on purpose: redeclaring `observer` here made the
      // initializer read its own uninitialized binding and throw.
      const producerObserver = new ProducerObserver(observer, this.scheduler);
      this.scheduler.schedule(() => {
        teardown = this._subscribe(producerObserver);
      }, 0, producerObserver);
    }
    return {
      unsubscribe() {
        if (typeof teardown === 'function') {
          teardown();
        } else if (teardown && teardown.unsubscribe) {
          teardown.unsubscribe();
        }
      },
    };
  }

  /**
   * Observable that emits `0` after `delay` and then completes.
   *
   * @param {number} [delay=0]
   * @param {object} [scheduler=asyncScheduler] - Scheduler that runs the emission.
   * @returns {Observable}
   */
  static timer(delay = 0, scheduler = asyncScheduler) {
    return new Observable((observer) =>
      // The observer is passed as the task state so that an exception during
      // delivery reaches `observer.error`. The returned task is the teardown.
      scheduler.schedule(() => {
        observer.next(0);
        observer.complete();
      }, delay, observer));
  }

  /**
   * Create a lifted observable that applies `operator` to this one.
   *
   * @param {{call: Function}} operator
   * @returns {Observable}
   */
  lift(operator) {
    const lifted = new Observable();
    lifted.source = this;
    lifted.operator = operator;
    return lifted;
  }

  /** Transform each value with `project`. */
  map(project) {
    return this.lift(new MapOperator(project));
  }

  /** Keep only the values for which `predicate` is truthy. */
  filter(predicate) {
    return this.lift(new FilterOperator(predicate));
  }

  /** Run `callback` for each value, then pass the value through. */
  do(callback) {
    return this.lift(new DoOperator(callback));
  }

  /** Forward values through a TakeOperator built with `count`. */
  take(count) {
    return this.lift(new TakeOperator(count));
  }

  /**
   * On error, switch to the observable returned by `handler(err)`.
   *
   * @param {Function} handler - Receives the error, returns an Observable.
   * @returns {Observable}
   */
  catch(handler) {
    const source = this;
    return new Observable((observer) => {
      let innerSubscription;
      const catchObserver = {
        next(value) {
          observer.next(value);
        },
        error(err) {
          let replacement;
          try {
            replacement = handler(err);
          } catch (handlerErr) {
            // The source has already stopped, so an error thrown by the
            // handler must be reported downstream explicitly.
            observer.error(handlerErr);
            return;
          }
          innerSubscription = replacement.subscribe(observer);
        },
        // Forward completion; it was previously dropped, so a source that
        // finished without error never completed downstream.
        complete() {
          observer.complete();
        },
      };
      const subscription = source.subscribe(catchObserver);
      return () => {
        subscription.unsubscribe();
        if (innerSubscription) {
          innerSubscription.unsubscribe();
        }
      };
    });
  }
}
// Example producer: emits an incrementing counter every 100 ms until the
// returned teardown clears the interval. Defined but not subscribed below.
const test = new Observable((observer) => {
  let count = 0;
  const timerId = setInterval(() => {
    observer.next(count);
    count += 1;
  }, 100);
  return () => clearInterval(timerId);
});

// Example producer: emits 0..49 synchronously, stopping early once the
// observer reports it has stopped, then completes.
const test2 = new Observable((observer) => {
  let i = 0;
  while (i < 50 && !observer.isStopped) {
    observer.next(i);
    i += 1;
  }
  observer.complete();
});

// Demo pipeline: double each value, keep multiples of three, log them and
// throw once a value exceeds 18, then recover with a one-element observable.
const sub = test2
  .map((x) => {
    return x + x;
  })
  .filter((x) => {
    return x % 3 === 0;
  })
  .do((x) => {
    console.log('do', x);
    if (x > 18) {
      throw new Error('ha ha');
    }
  })
  .catch(() => {
    return Observable.from(['weee']);
  })
  .subscribe({
    next(x) {
      console.log(x);
    },
  });
{
"name": "esnextbin-sketch",
"version": "0.0.0"
}
/*
unknown: We found a path that isn't a NodePath instance. Possiblly due to bad serialisation.
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment