Skip to content

Instantly share code, notes, and snippets.

@ORESoftware
Last active December 27, 2020 01:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ORESoftware/e73257ee495f949d2e2ce262af4bac9e to your computer and use it in GitHub Desktop.
Save ORESoftware/e73257ee495f949d2e2ce262af4bac9e to your computer and use it in GitHub Desktop.

If I want to create an async queue with concurrency = 1, I use promises like so:

let p = Promise.resolve()

export const runTask(run, onerror){
    return p = p.then(run, onerror);
}

this will run each promise after the previous one completes, so a queue with concurrency of 1.

Is there a way to achieve this with Observables? Something like this:

let curr = of(null)

export const runTask = (o: Observable<any>) => {
  return curr = curr.pipe()  // .... but what?
}

First attempt:

This seems to work, but I can't get the concurrency to be anything other than 1

import {Observable, of, merge} from "rxjs";
import {mergeAll, mergeMap} from "rxjs/operators";

let curr = of(null)

export const runTask = (o: Observable<any>) => {
  return curr = curr.pipe(
    mergeMap(() => o, 3)  // concurrency is set to 3, but it still functions as 1
  )
}

runTask(new Observable(obs => {
  setTimeout(() => {
    obs.next('boop 1')
  }, 1000);

})).subscribe(v => {
  console.log(v);
})

runTask(new Observable(obs => {
  setTimeout(() => {
    obs.next('boop 2')
  }, 3000);

})).subscribe(v => {
  console.log(v);
})

runTask(new Observable(obs => {
  obs.next('boop 3')
})).subscribe(v => {
  console.log(v);
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment