Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Rerun observable until the response meets a given condition
/**
 * Signature of the `repeatUntil` operator factory.
 *
 * Given a predicate over responses and an optional delay (in ms), it yields
 * a function that maps a source observable to another observable of the
 * same element type.
 */
export type RepeatUntilOperator = <T>(
  conditionChecker: (response: T) => boolean,
  delayInterval?: number
) => (source$: Observable<T>) => Observable<T>;
/**
 * Re-subscribes to the source observable until one of its responses
 * satisfies `conditionChecker`.
 *
 * The source is subscribed as soon as the result is subscribed. Responses
 * that fail the check are dropped. Once the source completes without a
 * matching response, a new run is started `delayInterval` ms later. The
 * first matching response is emitted and the result then completes.
 *
 * @param conditionChecker Predicate applied to every response of the source.
 * @param delayInterval [Optional] Pause between the completion of one run and the start of the next. Unit: ms. Default: 5000.
 */
export const repeatUntil: RepeatUntilOperator = <T>(
  conditionChecker: (response: T) => boolean,
  delayInterval: number = 5000
) => {
  return (source$: Observable<T>) => {
    return new Observable((observer: Subscriber<T>) => {
      // Each value pushed here starts one run of the source; the initial
      // value of the BehaviorSubject starts the first run immediately.
      const trigger$ = new BehaviorSubject<null>(null);

      // Runs after the source has completed: waits, queues the next run,
      // and swallows its own placeholder value so nothing leaks downstream.
      const scheduleNextRun$ = of<null>(null).pipe(
        delay(delayInterval),
        tap(() => trigger$.next(null)),
        skip(1)
      );

      // One run = all responses of the source, followed by the rescheduling step.
      const singleRun$: Observable<T> = concat(source$, scheduleNextRun$);

      // concatMap keeps runs strictly sequential; take(1) ends everything
      // on the first response accepted by the checker.
      const firstMatch$ = trigger$.pipe(
        concatMap(() => singleRun$),
        filter((response) => conditionChecker(response)),
        take(1)
      );

      firstMatch$.subscribe(observer);
    });
  };
};
/**
 * Example service showing how `repeatUntil` can poll a status endpoint.
 *
 * Requests a report, polls its status until it is reported as done, and
 * then resolves the download URL of the finished report.
 */
@Injectable({
  providedIn: 'root'
})
export class YourHttpService {
  static readonly BASE_URL = `${environment.apigateway}/report`;
  // Derived URLs must reference this class; the previous `VeryImportantHttpService`
  // identifier is not defined anywhere in this file.
  static readonly STATUS_URL = `${YourHttpService.BASE_URL}/status`;
  static readonly DOWNLOAD_URL = `${YourHttpService.BASE_URL}/download`;

  constructor(private http: HttpClient) {}

  /**
   * Triggers report generation, waits for it to finish and returns the download URL.
   *
   * @returns Observable emitting the `url` field of the download response.
   */
  getImportantReport(): Observable<string> {
    return this.http.get<{ reportId: string }>(YourHttpService.BASE_URL).pipe(
      switchMap(({ reportId }) =>
        this.http
          // The report id has to go into `params`; the second argument of
          // HttpClient.get is an options object, not a query-parameter map.
          .get<{ status: 'IN_PROGRESS' | 'DONE' }>(YourHttpService.STATUS_URL, {
            params: { reportId }
          })
          .pipe(
            // Re-request the status every 2000 ms until it is 'DONE'.
            repeatUntil(({ status }) => status === 'DONE', 2000),
            // Hand the plain report id on to the download step.
            map(() => reportId)
          )
      ),
      // The previous step emits the id as a string, so it must not be destructured.
      switchMap((reportId) =>
        this.http
          .get<{ url: string }>(YourHttpService.DOWNLOAD_URL, {
            params: { reportId }
          })
          .pipe(map(({ url }) => url))
      )
    );
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment