Skip to content

Instantly share code, notes, and snippets.

@evxn
Last active March 26, 2018 07:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save evxn/6b9e292a7755219ff4d10b0eb43f6aeb to your computer and use it in GitHub Desktop.
Save evxn/6b9e292a7755219ff4d10b0eb43f6aeb to your computer and use it in GitHub Desktop.
Add Observable-like entities to a queue (Promises, Observables, Subjects, Arrays, generators, Iterables). After each stream is completed (i.e. Promise resolved) it's elements are emitted to the result stream in the order of addition (first in first out). Each subscriber receives result only for items added to queue after the subscription.
import {Subject} from 'rxjs/Subject';
import {BehaviorSubject} from 'rxjs/BehaviorSubject';
import {Observable, ObservableInput} from 'rxjs/Observable';
import {concatMap, switchMap} from 'rxjs/operators';
enum QueueActions {
RESET = 'RESET',
ADD = 'ADD',
}
interface QueueCommand {
action: QueueActions;
payload?: any;
}
export class ObservableQueue<T> {
private _commands: Subject<QueueCommand> = new Subject<QueueCommand>();
private _queues: BehaviorSubject<Subject<ObservableInput<T>>> = new BehaviorSubject<Subject<ObservableInput<T>>>(new Subject<ObservableInput<T>>());
constructor() {
this._commands.asObservable()
.subscribe((command) => {
switch (command && command.action) {
case QueueActions.RESET:
this._queues.next(new Subject<ObservableInput<T>>());
break;
case QueueActions.ADD:
this._queues.value.next(command.payload);
break;
default:
this._queues.value.error(`[ObservableQueue] unknown command: ${JSON.stringify(command)}`);
}
});
}
add(item: ObservableInput<T>): this {
this._commands.next({action: QueueActions.ADD, payload: item});
return this;
}
reset(): this {
this._commands.next({action: QueueActions.RESET});
return this;
}
asObservable(): Observable<T> {
return this._queues.asObservable().pipe(
switchMap(queue =>
queue.pipe(
concatMap(item => item)
)
)
);
}
}
// Example 1: Basic usage
import {ObservableQueue} from './observable-queue'
// function producing promises
function delay(ms) {
return new Promise((resolve, reject) => {
console.log('sending ' + ms);
setTimeout(() => {
console.log('resolving ' + ms);
resolve(ms);
}, ms);
});
}
const queue = new ObservableQueue();
queue.asObservable().subscribe(res => console.log('result: ' + res));
queue.add(delay(100)) // supports fluent style calls
.add(delay(3000))
.add(delay(1000))
.add([42,43]);
setTimeout(()=>queue.add(delay(111)),5000)
// Output:
// "sending 100"
// "sending 3000"
// "sending 1000"
// "resolving 100"
// "result: 100"
// "resolving 1000"
// "resolving 3000"
// "result: 3000"
// "result: 1000"
// "result: 42"
// "result: 43"
// "sending 111"
// "resolving 111"
// "result: 111"
// Example 2: Multiple subscriptions
import {ObservableQueue} from './observable-queue'
// function producing promises
function delay(ms) {
return new Promise((resolve, reject) => {
console.log('sending ' + ms);
setTimeout(() => {
console.log('resolving ' + ms);
resolve(ms);
}, ms);
});
}
const queue = new ObservableQueue();
queue.asObservable().subscribe(res => console.log('result: ' + res));
queue
.add(delay(100)) // supports fluent style calls
.reset()
.add(delay(3000));
queue.asObservable().subscribe(res => console.log('result2: ' + res));
queue
.add(delay(1000))
.add([42,43]);
setTimeout(()=>queue.add(delay(333)),5000);
setTimeout(()=>queue.add(delay(222)),5000);
setTimeout(()=>queue.asObservable().subscribe(res => console.log('result3: ' + res)),3000)
// Output
// sending 100
// sending 3000
// sending 1000
// resolving 100
// resolving 1000
// result2: 1000
// result2: 42
// result2: 43
// resolving 3000
// result: 3000
// result: 1000
// result: 42
// result: 43
// sending 333
// sending 222
// resolving 222
// resolving 333
// result: 333
// result2: 333
// result3: 333
// result: 222
// result2: 222
// result3: 222
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment