Skip to content

Instantly share code, notes, and snippets.

@DomiR
Forked from evxn/observable-queue.ts
Created March 11, 2018 18:54
Show Gist options
  • Save DomiR/cf2f521a53b3d2e793fad5e7eabb6f77 to your computer and use it in GitHub Desktop.
Save DomiR/cf2f521a53b3d2e793fad5e7eabb6f77 to your computer and use it in GitHub Desktop.
Add Observable-like entities to a queue (Promises, Observables, Subjects, Arrays, generators, Iterables). After each stream completes (e.g. a Promise is resolved), its elements are emitted to the result stream in the order of addition (first in, first out). Each subscriber receives results only for items added to the queue after the subscription.
import {Subject} from 'rxjs/Subject';
import {BehaviorSubject} from 'rxjs/BehaviorSubject';
import {Observable, ObservableInput} from 'rxjs/Observable';
import {concatMap, switchMap} from 'rxjs/operators';
/** Kinds of commands that ObservableQueue pushes onto its internal command stream. */
enum QueueActions {
// Replace the current queue with a fresh, empty one.
RESET = 'RESET',
// Append the command's payload to the current queue.
ADD = 'ADD',
}
/** Message sent on ObservableQueue's command stream: an action plus an optional payload. */
interface QueueCommand {
// Which operation the queue should perform.
action: QueueActions;
// Item to enqueue: set by add() for ADD commands, omitted by reset() for RESET commands.
// NOTE(review): typed `any`, so it is not checked against the queue's element type.
payload?: any;
}
/**
 * FIFO queue of observable-like inputs (anything accepted as an
 * `ObservableInput`, e.g. Promises, Observables, arrays, iterables).
 *
 * Items are flattened one at a time with `concatMap`, so the values of an item
 * are emitted only after every item added before it has completed.
 * `reset()` swaps in a fresh queue; subscribers are switched over to it
 * (`switchMap`) and stop receiving values from the previous queue.
 */
export class ObservableQueue<T> {
  /** Internal command stream fed by `add()` and `reset()`. */
  private readonly _commands: Subject<QueueCommand> = new Subject<QueueCommand>();

  /**
   * Holds the current queue. A `BehaviorSubject` is used so that a new
   * subscriber is handed the current queue immediately on subscription.
   */
  private readonly _queues: BehaviorSubject<Subject<ObservableInput<T>>> =
    new BehaviorSubject<Subject<ObservableInput<T>>>(new Subject<ObservableInput<T>>());

  /** Wires the command stream to the queue: RESET replaces the queue, ADD appends to it. */
  constructor() {
    this._commands.asObservable().subscribe((command) => {
      switch (command && command.action) {
        case QueueActions.RESET:
          this._queues.next(new Subject<ObservableInput<T>>());
          break;
        case QueueActions.ADD:
          this._queues.value.next(command.payload);
          break;
        default:
          // Fail the current queue with a real Error (not a bare string) so
          // subscribers get a stack trace and can rely on `instanceof Error`.
          this._queues.value.error(
            new Error(`[ObservableQueue] unknown command: ${JSON.stringify(command)}`)
          );
      }
    });
  }

  /**
   * Appends an item to the current queue.
   *
   * @param item Observable-like source whose values should be emitted in turn.
   * @returns This queue, to allow fluent chaining.
   */
  add(item: ObservableInput<T>): this {
    this._commands.next({action: QueueActions.ADD, payload: item});
    return this;
  }

  /**
   * Replaces the current queue with a new, empty one.
   *
   * @returns This queue, to allow fluent chaining.
   */
  reset(): this {
    this._commands.next({action: QueueActions.RESET});
    return this;
  }

  /**
   * Builds the result stream.
   *
   * @returns An Observable that follows the current queue and emits the values
   * of its items sequentially, in the order the items were added.
   */
  asObservable(): Observable<T> {
    return this._queues.asObservable().pipe(
      switchMap(queue =>
        queue.pipe(
          concatMap(item => item)
        )
      )
    );
  }
}
// Example 1: Basic usage
import {ObservableQueue} from './observable-queue'
// function producing promises
/**
 * Creates a Promise that resolves with `ms` after `ms` milliseconds.
 * Logs "sending <ms>" when the Promise is created and "resolving <ms>" just
 * before it resolves.
 *
 * @param ms Delay in milliseconds; also the value the Promise resolves with.
 * @returns A Promise resolving to `ms`.
 */
function delay(ms: number): Promise<number> {
  return new Promise<number>((resolve) => {
    console.log('sending ' + ms);
    setTimeout(() => {
      console.log('resolving ' + ms);
      resolve(ms);
    }, ms);
  });
}
// Create a queue and subscribe before adding anything, so this subscriber sees every item added below.
const queue = new ObservableQueue();
queue.asObservable().subscribe(res => console.log('result: ' + res));
// Every delay(...) call starts its timer immediately while the chain is built;
// the results are still printed in the order the items were added (see output below).
queue.add(delay(100)) // supports fluent style calls
.add(delay(3000))
.add(delay(1000))
// A plain array is accepted as an item too; its elements are emitted one by one.
.add([42,43]);
// Add one more item at the 5 second mark, after the earlier items have been delivered.
setTimeout(()=>queue.add(delay(111)),5000)
// Output:
// "sending 100"
// "sending 3000"
// "sending 1000"
// "resolving 100"
// "result: 100"
// "resolving 1000"
// "resolving 3000"
// "result: 3000"
// "result: 1000"
// "result: 42"
// "result: 43"
// "sending 111"
// "resolving 111"
// "result: 111"
// Example 2: Multiple subscriptions
import {ObservableQueue} from './observable-queue'
// function producing promises
/**
 * Creates a Promise that resolves with `ms` after `ms` milliseconds.
 * Logs "sending <ms>" when the Promise is created and "resolving <ms>" just
 * before it resolves.
 *
 * @param ms Delay in milliseconds; also the value the Promise resolves with.
 * @returns A Promise resolving to `ms`.
 */
function delay(ms: number): Promise<number> {
  return new Promise<number>((resolve) => {
    console.log('sending ' + ms);
    setTimeout(() => {
      console.log('resolving ' + ms);
      resolve(ms);
    }, ms);
  });
}
const queue = new ObservableQueue();
// First subscriber: attached before any item is added.
queue.asObservable().subscribe(res => console.log('result: ' + res));
queue
.add(delay(100)) // supports fluent style calls
// reset() replaces the current queue, so the result of delay(100) is never printed (see output below).
.reset()
.add(delay(3000));
// Second subscriber: attached after delay(3000) was added, so it only receives items added from here on.
queue.asObservable().subscribe(res => console.log('result2: ' + res));
queue
.add(delay(1000))
.add([42,43]);
// Two more items are added at the 5 second mark, in this order: 333 first, then 222.
setTimeout(()=>queue.add(delay(333)),5000);
setTimeout(()=>queue.add(delay(222)),5000);
// Third subscriber: attached at the 3 second mark, so it only receives the two items added at 5 seconds.
setTimeout(()=>queue.asObservable().subscribe(res => console.log('result3: ' + res)),3000)
// Output
// sending 100
// sending 3000
// sending 1000
// resolving 100
// resolving 1000
// result2: 1000
// result2: 42
// result2: 43
// resolving 3000
// result: 3000
// result: 1000
// result: 42
// result: 43
// sending 333
// sending 222
// resolving 222
// resolving 333
// result2: 333
// result3: 333
// result2: 222
// result3: 222
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment