Skip to content

Instantly share code, notes, and snippets.

@dantodev
Created August 27, 2021 09:24
Show Gist options
  • Save dantodev/49ad4f12cabae82938830a8ff91cac79 to your computer and use it in GitHub Desktop.
Save dantodev/49ad4f12cabae82938830a8ff91cac79 to your computer and use it in GitHub Desktop.
function createObservable(observerCallback) {
if (typeof observerCallback !== "function") {
throw new TypeError("createObservable() must be be called with callback function");
}
let started = false;
let observable = {};
let disposeCallback = null;
let errorCallback = null;
let completeCallback = null;
let subscribeCallback = null;
observable.startObserver = () => {
if (started) {
return;
}
started = true;
let observer = {
next(data = {}) {
if (typeof subscribeCallback === "function") {
subscribeCallback(data);
}
},
complete() {
if (typeof completeCallback === "function") {
completeCallback();
}
observable.dispose();
},
error(error) {
if (typeof errorCallback === "function") {
errorCallback(error);
}
observable.dispose();
}
}
try {
disposeCallback = observerCallback(observer);
} catch (error) {
observer(error);
}
};
observable.transform = callback => {
if (typeof callback !== "function") {
throw new TypeError("transform() must be be called with callback function");
}
return createObservable(transformObserver => {
observable.subscribe(data => callback(data, transformObserver.next));
observable.subscribeError(error => transformObserver.error(error));
observable.subscribeComplete(() => transformObserver.complete());
return observable.dispose;
});
};
observable.filter = (callback) => {
return observable.transform((data, next) => {
if (callback(data)) {
next();
}
});
}
observable.map = (callback) => {
return observable.transform((data, next) => {
next(callback(data));
});
}
observable.subscribe = callback => {
if (typeof callback !== "function") {
throw new TypeError("observable.subscribe() must be be called with callback function");
}
subscribeCallback = callback;
observable.startObserver();
return observable;
};
observable.subscribeError = callback => {
if (typeof callback !== "function") {
throw new TypeError("observable.subscribeError() must be be called with callback function");
}
errorCallback = callback;
observable.startObserver();
return observable;
};
observable.subscribeComplete = callback => {
if (typeof callback !== "function") {
throw new TypeError("observable.subscribeComplete() must be be called with callback function");
}
completeCallback = callback;
observable.startObserver();
return observable;
};
observable.dispose = () => {
if (typeof disposeCallback === "function") {
disposeCallback();
}
};
return observable;
}
// EXAMPLE SECTION
function createEventObservable(context, eventTypes = []) {
return createObservable(observer => {
eventTypes.forEach(eventType => {
context.addEventListener(eventType, observer.next);
});
});
}
function bufferTransform(bufferDelay) {
let buffer = [];
let bufferTimeout;
return (data, next) => {
let isFirst = buffer.length === 0;
buffer.push(data);
if (isFirst) {
bufferTimeout = setTimeout(() => {
next(buffer);
buffer = [];
}, bufferDelay);
}
};
}
createEventObservable(document, ["click"])
.transform(bufferTransform(250))
.filter(data => data.length === 2)
.subscribe(() => {
console.log(`double click at ${performance.now()}ms`);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment