Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?

If I want to create an async queue with concurrency = 1, I use promises like so:

let p = Promise.resolve()

export const runTask(run, onerror){
    return p = p.then(run, onerror);
}

this will run each promise after the previous one completes, so a queue with concurrency of 1.

Is there a way to achieve this with Observables? Something like this:

let curr = of(null)

export const runTask = (o: Observable<any>) => {
  return curr = curr.pipe()  // .... but what?
}

First attempt:

This seems to work, but I can't get the concurrency to be anything other than 1

import {Observable, of, merge} from "rxjs";
import {mergeAll, mergeMap} from "rxjs/operators";

let curr = of(null)

export const runTask = (o: Observable<any>) => {
  return curr = curr.pipe(
    mergeMap(() => o, 3)  // concurrency is set to 3, but it still functions as 1
  )
}

runTask(new Observable(obs => {
  setTimeout(() => {
    obs.next('boop 1')
  }, 1000);

})).subscribe(v => {
  console.log(v);
})

runTask(new Observable(obs => {
  setTimeout(() => {
    obs.next('boop 2')
  }, 3000);

})).subscribe(v => {
  console.log(v);
})

runTask(new Observable(obs => {
  obs.next('boop 3')
})).subscribe(v => {
  console.log(v);
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment