Skip to content

Instantly share code, notes, and snippets.

@jvanbruegge
Last active August 12, 2017 20:42
Show Gist options
  • Save jvanbruegge/063c404c1b33443d47b7656b0b7759d6 to your computer and use it in GitHub Desktop.
Save jvanbruegge/063c404c1b33443d47b7656b0b7759d6 to your computer and use it in GitHub Desktop.
Stream implementation
export interface Source<T> {
subscribe(o: Observer<T>): any;
}
export class ArraySource<T> implements Source<T> {
constructor(private array: T[]) {}
public subscribe(observer: Observer<T>): any {
this.array.forEach(t => observer.next(t));
}
}
export class Stream<T> implements Source<T> {
constructor(public subscribe: (o: Observer<T>) => void) {
setupHelpers(this);
}
[$$observable](): Stream<T> {
return this as any as Stream<T>;
}
}
export interface Helpers<T> {
map<U>(fn: (t: T) => U): Stream<U>;
fold<U>(fn: (acc: U, curr: T) => U, seed: U): Stream<U>;
}
function setupHelpers(self: any): void {
self.map = mapStream.bind(null, this);
self.fold = foldStream.bind(null, this);
}
//++++++++++++++++++ streamOperators ++++++++++++++++++++++++++++//
export function mapStream<T, U>(stream: Stream<T>, fn: (t: T) => U): Stream<U> {
return new Stream(observer => {
stream.subscribe({
next: t => observer.next(fn(t)),
error: observer.error,
complete: observer.complete
});
});
}
export function foldStream<T, U>(stream: Stream<T>, fn: (acc: U, curr: T) => U, seed: U): Stream<U> {
return new Stream(observer => {
let accumulator = seed;
stream.subscribe({
next: t => {
accumulator = fn(accumulator, t);
observer.next(accumulator);
},
error: observer.error,
complete: observer.complete
});
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment