Skip to content

Instantly share code, notes, and snippets.

@melbourne2991
Last active August 5, 2020 08:51
Show Gist options
  • Save melbourne2991/3a59cc569011e376fbfaa85cdd94c37c to your computer and use it in GitHub Desktop.
Save melbourne2991/3a59cc569011e376fbfaa85cdd94c37c to your computer and use it in GitHub Desktop.
ReasonML Observables Implementation
/**
 * Closed object type (`.`) describing the receiving end of a stream.
 *
 * 'a is the type of value handed to `next`; 'e is the type of value
 * handed to `error`. `complete` takes no payload.
 */
type observer('a, 'e) = {
.
next: 'a => unit,
complete: unit => unit,
error: 'e => unit,
};
/**
 * Closed object type returned by `subscribe`; its only member is
 * `unsubscribe`, a `unit => unit` callback.
 */
type subscription = {
.
unsubscribe: unit => unit
};
/**
 * Open object type (`..`): any object that exposes at least a `subscribe`
 * method taking an `observer('a, 'e)` and returning a `subscription`.
 *
 * The third parameter 'c is bound (via `as 'c`) to the full object type,
 * which is what allows objects with additional methods to fit this type.
 */
type observable('a, 'e, 'c) = {
..
subscribe: observer('a, 'e) => subscription,
} as 'c;
/**
 * Closed object type that carries both the `subscribe` method of an
 * observable and the `next` / `complete` / `error` methods of an
 * `observer('a, 'e)`.
 */
type subject('a, 'e) = {
.
subscribe: observer('a, 'e) => subscription,
next: 'a => unit,
complete: unit => unit,
error: 'e => unit
};
/**
 * Producer function accepted by `Observable.create`: it receives an
 * observer and optionally returns a `unit => unit` function. When present,
 * `Observable.create` invokes that function on unsubscribe.
 */
type createFn('a, 'e) = observer('a, 'e) => option(unit => unit);
/**
 * Wraps a teardown callback in a `subscription` object.
 *
 * Each call to the resulting `unsubscribe` method forwards to the supplied
 * callback; no bookkeeping is done here.
 */
let createSubscription: (unit => unit) => subscription =
  teardown => {
    let handle: subscription = {
      pub unsubscribe = () => teardown()
    };
    handle;
  };
/**
 * Helpers for building `observer` objects from plain callbacks.
 */
module Observer = {
  /** Does nothing; used as the default `complete` handler. */
  let noop = () => ();

  /**
   * Builds an observer from callbacks.
   *
   * - `onNext`      required handler for emitted values.
   * - `~onError`    optional handler for errors; defaults to ignoring them.
   * - `~onComplete` optional completion handler; defaults to `noop`.
   * - the trailing `()` lets the optional labelled arguments be omitted.
   *
   * The default error handler is `_ => ()` rather than `noop`: `noop` has
   * type `unit => unit`, which would force the observer's error type 'e to
   * be `unit` and make the result unusable with any other error type.
   */
  let create = (onNext, ~onError=(_ => ()), ~onComplete=noop, ()) => {
    let observer: observer('a, 'e) = {
      pub next = onNext;
      pub complete = onComplete;
      pub error = onError;
    };
    observer;
  };
};
/**
 * Construction of observables from a producer function.
 */
module Observable = {
  /**
   * Creates an observable whose `subscribe` method runs `createFn` once per
   * subscriber.
   *
   * `createFn` receives a proxy observer that forwards `next`, `complete`
   * and `error` to the real observer only while the subscription is active.
   * If `createFn` returns `Some(teardown)`, `teardown` is run when the
   * subscription is unsubscribed.
   *
   * The subscription is considered active from the moment `subscribe` is
   * called, so values the producer emits synchronously inside `createFn`
   * are delivered. Unsubscribing is idempotent: the teardown runs at most
   * once, and nothing is forwarded afterwards.
   */
  let create: (createFn('a, 'e)) => observable('a, 'e, 'c) = (createFn) => {
    let observable: observable('a, 'e, 'c) = {
      pub subscribe = (observer) => {
        /* True until the first `unsubscribe`; gates every forwarded call. */
        let active = ref(true);
        let whenActive = (fn) =>
          if (active^) {
            fn(observer);
          };
        let proxyObserver = {
          pub next = value => {
            whenActive((obs) => obs#next(value));
          };
          pub complete = () => {
            whenActive((obs) => obs#complete());
          };
          pub error = (err) => {
            whenActive((obs) => obs#error(err));
          }
        };
        /* Run the producer after the gate is open so that synchronous
           emissions reach the observer. */
        let maybeUnsubscribeFn = createFn(proxyObserver);
        createSubscription(() =>
          if (active^) {
            /* Close the gate first so the teardown cannot re-enter. */
            active := false;
            switch (maybeUnsubscribeFn) {
            | Some(unsubscribeFn) => unsubscribeFn()
            | None => ()
            };
          }
        );
      };
    };
    observable;
  };
};
/**
 * A subject is both an observable and an observer: every value pushed into
 * it through `next` / `complete` / `error` is fanned out to the observers
 * currently subscribed.
 */
module Subject = {
  /**
   * Creates a fresh subject with no subscribers.
   *
   * Subscribers are kept in registration order and notified in that order.
   */
  let create: unit => subject('a, 'e) = () => {
    let subject: subject('a, 'e) = {
      /* Current subscribers, oldest first. */
      val subscribers: ref(list(observer('a, 'e))) = ref([]);
      pub subscribe = (observer) => {
        /* Go through Observable.create so the caller's observer is wrapped
           in a proxy; the proxy is what gets registered here and removed
           again on unsubscribe. */
        let innerObservable = Observable.create(innerObserver => {
          this#addSubscriber(innerObserver);
          Some(() => {
            this#removeSubscriber(innerObserver);
          });
        });
        innerObservable#subscribe(observer);
      };
      pub next = (value: 'a) => {
        this#eachSubscriber(subscriber => subscriber#next(value));
      };
      pub complete = () => {
        this#eachSubscriber(subscriber => subscriber#complete());
      };
      pub error = (err: 'e) => {
        this#eachSubscriber(subscriber => subscriber#error(err));
      };
      /* Applies `fn` to the subscriber list as it is when the call starts. */
      pri eachSubscriber = (fn) => {
        List.iter(fn, subscribers^);
      };
      pri addSubscriber = (observer) => {
        subscribers := subscribers^ @ [observer];
      };
      /* Physical inequality (`!==`) removes exactly the registered proxy. */
      pri removeSubscriber = (observer) => {
        subscribers :=
          List.filter(subscriber => subscriber !== observer, subscribers^);
      };
    };
    subject;
  };
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment