Skip to content

Instantly share code, notes, and snippets.

@jeremyben
Last active November 24, 2018 23:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jeremyben/35e8a9cf637075c510456746fc4936dd to your computer and use it in GitHub Desktop.
Save jeremyben/35e8a9cf637075c510456746fc4936dd to your computer and use it in GitHub Desktop.
// pipe's operator function maps an Observable<T> to an Observable<R>
// lift's operator function maps an Observer<R> to an Observer<T>
//
// This is just another way to represent the idea of either:
// building an Observable chain down from the source to the sink
// or building an Observer chain up from the sink to the source
//
// Pipe Implementation
//
/**
 * Composes unary functions left to right.
 *
 * The returned function feeds `source` into the first function, passes each
 * result to the next one, and returns the final result. With no functions
 * supplied, `source` is returned untouched.
 */
const pipe = (...fns: Function[]) => (source) => {
  let result = source
  for (const step of fns) {
    result = step(result)
  }
  return result
}
//
// Map Implementation
//
/**
 * Subscriber behind `map`: sits between the source and the downstream
 * subscriber, applies `fn` to every value it receives and pushes the result
 * to `destination`.
 */
class MyMapSubscriber extends Subscriber<any> {
  /** Projection applied to each incoming value. */
  fn: Function

  /**
   * @param subscriber downstream subscriber that receives the mapped values
   * @param fn projection function called with each source value
   */
  constructor(subscriber, fn) {
    super(subscriber)
    this.fn = fn
  }

  /**
   * Maps one source value and forwards the result.
   *
   * A throwing projection is reported through the downstream `error`
   * channel instead of escaping into whoever called `next` on us.
   */
  _next(value) {
    let mapped
    try {
      mapped = this.fn(value)
    } catch (err) {
      this.destination.error(err)
      return
    }
    this.destination.next(mapped)
  }
}
/**
 * Builds a `map` operator from the projection `fn`.
 *
 * The returned function takes a source observable and lifts it with an
 * operator that, when called, subscribes a `MyMapSubscriber` (wrapping the
 * downstream subscriber) to the upstream observable.
 */
const map = (fn) => (source) => {
  const mapOperator = {
    call(sink, upstream) {
      upstream.subscribe(new MyMapSubscriber(sink, fn))
    },
  }
  return source.lift(mapOperator)
  // `lift` is shorthand for the deprecated long-hand form:
  // const o$ = new Observable()
  // o$.source = source
  // o$.operator = {
  //   call(subscriber, source) {
  //     source.subscribe(new MyMapSubscriber(subscriber, fn))
  //   },
  // }
  // return o$
}
//
// MergeMap Implementation
//
/**
 * Subscriber behind `myMergeMap`: projects every outer value to an inner
 * observable with `fn`, subscribes to it immediately, and forwards all inner
 * values to `destination` (inner observables run concurrently).
 */
class MyMergeMapSubscriber extends Subscriber<any> {
  /** Projection turning an outer value into an inner observable. */
  fn: Function
  /** Number of inner observables that have not completed yet. */
  private activeInners = 0
  /** Set once the outer source has completed. */
  private outerDone = false

  /**
   * @param sub downstream subscriber that receives the merged inner values
   * @param fn projection called with each outer value; expected to return an observable
   */
  constructor(sub, fn) {
    super(sub)
    this.fn = fn
  }

  /** Handles one outer value: project it, then subscribe to the inner observable. */
  _next(value) {
    console.log('outer', value)
    let o$
    try {
      o$ = this.fn(value)
    } catch (err) {
      // A throwing projection terminates the stream through the error channel.
      this.destination.error(err)
      return
    }
    // Count the inner *before* subscribing so a synchronously completing
    // inner observable leaves the counter balanced.
    this.activeInners++
    // Registering the inner subscription on `this` ensures it is torn down
    // when this subscriber is unsubscribed.
    this.add(
      o$.subscribe({
        next: (innerValue) => {
          console.log(' inner', innerValue)
          this.destination.next(innerValue)
        },
        error: (err) => {
          // Called on `destination` directly: `this.error` would be ignored
          // if the outer source has already completed.
          this.destination.error(err)
          this.unsubscribe()
        },
        complete: () => {
          this.activeInners--
          if (this.outerDone && this.activeInners === 0) {
            this.destination.complete()
            this.unsubscribe()
          }
        },
      }),
    )
  }

  /**
   * Outer source completed. Downstream completion is deferred until every
   * inner observable has completed too, so late inner values are not lost.
   */
  _complete() {
    this.outerDone = true
    if (this.activeInners === 0) {
      this.destination.complete()
      this.unsubscribe()
    }
  }
}
/**
 * Builds a merge-map operator from the projection `fn`.
 *
 * The returned function lifts the source with an operator that subscribes a
 * `MyMergeMapSubscriber` (wrapping the downstream subscriber) to the
 * upstream observable.
 */
const myMergeMap = (fn) => (source) => {
  const mergeMapOperator = {
    call(sink, upstream) {
      upstream.subscribe(new MyMergeMapSubscriber(sink, fn))
    },
  }
  return source.lift(mergeMapOperator)
}
//
// SwitchMap Implementation
//
/**
 * Subscriber behind `mySwitchMap`: projects every outer value to an inner
 * observable with `fn` and forwards that inner observable's values to
 * `destination`. A new outer value unsubscribes from the previous inner
 * observable before subscribing to the new one.
 */
class MySwitchMapSubscriber extends Subscriber<any> {
  /** Subscription to the most recently projected inner observable. */
  innerSubscription: Subscription
  /** Projection turning an outer value into an inner observable. */
  fn: Function
  /** True while the current inner observable has not completed. */
  private innerActive = false
  /** Set once the outer source has completed. */
  private outerDone = false

  /**
   * @param sub downstream subscriber that receives the inner values
   * @param fn projection called with each outer value; expected to return an observable
   */
  constructor(sub, fn) {
    super(sub)
    this.fn = fn
  }

  /** Handles one outer value: switch from the previous inner observable to a new one. */
  _next(value) {
    console.log('outer', value)
    let o$: Observable<any>
    try {
      o$ = this.fn(value) as Observable<any>
    } catch (err) {
      // A throwing projection terminates the stream through the error channel.
      this.destination.error(err)
      return
    }
    const previous = this.innerSubscription
    if (previous) {
      previous.unsubscribe()
      // Drop the stale entry so the teardown list does not grow per outer value.
      this.remove(previous)
    }
    // Flag is set before subscribing so a synchronously completing inner
    // observable can reset it from its `complete` callback.
    this.innerActive = true
    const inner = o$.subscribe({
      next: (innerValue) => {
        console.log(' inner', innerValue)
        this.destination.next(innerValue)
      },
      error: (err) => {
        // Called on `destination` directly: `this.error` would be ignored
        // if the outer source has already completed.
        this.destination.error(err)
        this.unsubscribe()
      },
      complete: () => {
        this.innerActive = false
        if (this.outerDone) {
          this.destination.complete()
          this.unsubscribe()
        }
      },
    })
    this.innerSubscription = inner
    // Tie the inner subscription to this subscriber's lifetime so that
    // unsubscribing downstream also stops the active inner observable.
    this.add(inner)
  }

  /**
   * Outer source completed. Downstream completion is deferred while an inner
   * observable is still running so its remaining values are delivered.
   */
  _complete() {
    this.outerDone = true
    if (!this.innerActive) {
      this.destination.complete()
      this.unsubscribe()
    }
  }
}
/**
 * Builds a switch-map operator from the projection `fn`.
 *
 * The returned function lifts the source with an operator that subscribes a
 * `MySwitchMapSubscriber` (wrapping the downstream subscriber) to the
 * upstream observable.
 */
const mySwitchMap = (fn) => (source) => {
  const switchMapOperator = {
    call(sink, upstream) {
      upstream.subscribe(new MySwitchMapSubscriber(sink, fn))
    },
  }
  return source.lift(switchMapOperator)
}
//
// ConcatMap Implementation
//
/**
 * Subscriber behind `myConcatMap`: projects outer values to inner
 * observables with `fn` and runs them strictly one after another. Outer
 * values that arrive while an inner observable is still running are queued
 * in `buffer` and processed in arrival order.
 */
class MyConcatMapSubscriber extends Subscriber<any> {
  /** Projection turning an outer value into an inner observable. */
  fn: Function
  /** Subscription to the inner observable that was started most recently. */
  innerSubscription: Subscription
  /** Outer values waiting for the running inner observable to complete. */
  buffer = []
  /** True while an inner observable is running. */
  private innerActive = false
  /** Set once the outer source has completed. */
  private outerDone = false

  /**
   * @param sub downstream subscriber that receives the inner values
   * @param fn projection called with each outer value; expected to return an observable
   */
  constructor(sub, fn) {
    super(sub)
    this.fn = fn
  }

  /** Handles one outer value: queue it if an inner is running, otherwise start it. */
  _next(value) {
    if (this.innerActive) {
      this.buffer = [...this.buffer, value]
      return
    }
    let o$: Observable<any>
    try {
      o$ = this.fn(value) as Observable<any>
    } catch (err) {
      // A throwing projection terminates the stream through the error channel.
      this.destination.error(err)
      return
    }
    // The explicit flag (rather than the presence of `innerSubscription`)
    // decides whether to queue: the subscription reference stays set after
    // completion, which previously caused every later value to be buffered
    // forever.
    this.innerActive = true
    const inner = o$.subscribe({
      next: (innerValue) => {
        this.destination.next(innerValue)
      },
      error: (err) => {
        // Called on `destination` directly: `this.error` would be ignored
        // if the outer source has already completed.
        this.destination.error(err)
        this.unsubscribe()
      },
      complete: () => {
        console.log(this.buffer)
        this.innerActive = false
        if (this.buffer.length) {
          const [first, ...rest] = this.buffer
          this.buffer = rest
          this._next(first)
        } else if (this.outerDone) {
          this.destination.complete()
          this.unsubscribe()
        }
      },
    })
    // If the inner completed synchronously, `complete` may already have
    // started the next queued inner; do not overwrite that newer reference
    // with this finished subscription.
    if (!inner.closed) {
      this.innerSubscription = inner
      this.add(inner)
    }
  }

  /**
   * Outer source completed. Downstream completion is deferred until the
   * running inner observable and all queued values have been processed.
   */
  _complete() {
    this.outerDone = true
    if (!this.innerActive) {
      this.destination.complete()
      this.unsubscribe()
    }
  }
}
/**
 * Builds a concat-map operator from the projection `fn`.
 *
 * The returned function lifts the source with an operator that subscribes a
 * `MyConcatMapSubscriber` (wrapping the downstream subscriber) to the
 * upstream observable.
 */
const myConcatMap = (fn) => (source) => {
  const concatMapOperator = {
    call(sink, upstream) {
      upstream.subscribe(new MyConcatMapSubscriber(sink, fn))
    },
  }
  return source.lift(concatMapOperator)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment