Skip to content

Instantly share code, notes, and snippets.

@LibertyBeta
Last active December 20, 2017 18:07
Show Gist options
  • Save LibertyBeta/e1dc564ac256c57209b7e90f9f2fc677 to your computer and use it in GitHub Desktop.
Save LibertyBeta/e1dc564ac256c57209b7e90f9f2fc677 to your computer and use it in GitHub Desktop.
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Subject } from 'rxjs/Subject';
import { ReplaySubject } from 'rxjs/ReplaySubject';
import { Observable } from 'rxjs/Observable';
import { Subscription } from 'rxjs/Subscription';
import { Subscriber } from 'rxjs/Subscriber';
import { mergeMap, merge, catchError, take, filter, finalize, map, distinctUntilChanged } from 'rxjs/operators';
import 'rxjs/operator/map';
import 'rxjs/operator/merge';
import 'rxjs/add/observable/from';
interface PageRequest {
id: number;
request: Observable<any>;
}
interface Work {
source: Observable<any>;
subscriber: Subscriber<{}>;
}
@Injectable()
export class FetchBatchService {
private _que: Set<number> = new Set<number>();
readonly _activeWork = new Map<number, number>();
readonly completed = new Set<number>();
private _requests: ReplaySubject<any> = new ReplaySubject<any>();
constructor(private _alfred: HttpClient) {
}
get results() {
return this._requests.asObservable();
}
public query(id: number, file?) {
if (!this._activeWork.has(id) && !this.completed.has(id)) {
this._que.add(id);
}
this.canWeWork(5);
return this._requests.pipe(
filter(x => x.id === id),
map(data => data.payload),
take(1)
);
}
private canWeWork(n: Number): void {
while (this._que.size > 0 && this._activeWork.size < n) {
const work = this._que.keys().next().value;
this._que.delete(work);
this._activeWork.set(work, work);
const sub = this._alfred.get(`/thing/${work}`, {
...args
}).subscribe(
value => {
this._requests.next({ id: work, payload: value });
this.completed.add(work);
},
err => {
try {
this._requests.next(err);
} finally {
this.canWeWork(n);
}
},
() => {
this._activeWork.delete(work);
this.canWeWork(n);
}
);
// Track the subscription so it can be unsubscribed in the
// event of cancellation.
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment