I've been exploring how to combine "Behavior Trees" (https://en.wikipedia.org/wiki/Behavior_Trees) with RxJS, and in doing so I ran into a combiner that I don't think currently exists. I'd like to check whether that's the case and, if so, whether I should put up a PR to add it.
At a minimum, a behavior tree contains an AND (sequence) combiner and an OR (selector) combiner. An AND combiner runs its children in sequence and stops at the first failure. It is very easy to translate to RxJS: it's just a concat, which subscribes to each source only after the previous one completes and stops at the first error.
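To make the sequence semantics concrete, here is a minimal sketch in plain TypeScript with no RxJS involved. It assumes a simplified synchronous model in which each child returns the values it produced plus a success flag; the names `StepResult`, `Step`, and `runSequence` are hypothetical and exist only for this illustration. In the observable version, an error plays the role of failure and completion plays the role of success.

```typescript
// Hypothetical synchronous model of a behavior-tree child:
// it produces some values and then either succeeds or fails.
interface StepResult<T> {
  values: T[];
  ok: boolean;
}
type Step<T> = () => StepResult<T>;

// AND (sequence): run children in order and stop at the first failure.
function runSequence<T>(steps: Step<T>[]): StepResult<T> {
  const values: T[] = [];
  for (const step of steps) {
    const result = step();
    values.push(...result.values);
    if (!result.ok) {
      return { values, ok: false }; // later children never run
    }
  }
  return { values, ok: true };
}

const sequenceDemo = runSequence<string>([
  () => ({ values: ["a"], ok: true }),
  () => ({ values: ["b"], ok: false }),
  () => ({ values: ["c"], ok: true }), // never reached
]);
// sequenceDemo.values is ["a", "b"] and sequenceDemo.ok is false
console.log(sequenceDemo);
```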
An OR combiner instead runs its children until one succeeds, and fails only if all of its children fail. This is very similar to onErrorResumeNext, with the following differences:
- When all sources are exhausted without one completing, it throws an error.
- It stops executing as soon as any observable completes.
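The two differences above can be sketched in plain TypeScript, again without RxJS. This assumes a simplified synchronous model where each child returns its values and a success flag; `Attempt`, `Child`, `runSelector`, and `runResumeNext` are hypothetical names used only here. `runResumeNext` stands in for the onErrorResumeNext behavior, which moves on to the next source on error or completion and never reports an error.

```typescript
// Hypothetical synchronous model: a child produces values, then succeeds or fails.
interface Attempt<T> {
  values: T[];
  ok: boolean;
}
type Child<T> = () => Attempt<T>;

// OR (selector): try children in order, stop at the first success,
// and fail only when every child has failed.
function runSelector<T>(children: Child<T>[]): Attempt<T> {
  const values: T[] = [];
  for (const child of children) {
    const attempt = child();
    values.push(...attempt.values);
    if (attempt.ok) {
      return { values, ok: true }; // remaining children never run
    }
  }
  return { values, ok: false }; // every child failed (or there were none)
}

// onErrorResumeNext-like behavior in the same model:
// always run every child and never report failure.
function runResumeNext<T>(children: Child<T>[]): Attempt<T> {
  const values: T[] = [];
  for (const child of children) {
    values.push(...child().values);
  }
  return { values, ok: true };
}

const failsWithY: Child<string> = () => ({ values: ["y"], ok: false });
const succeedsWithZ: Child<string> = () => ({ values: ["z"], ok: true });
const failsWithX: Child<string> = () => ({ values: ["x"], ok: false });

// Stops after the second child: values y, z and ok is true.
const selected = runSelector([failsWithY, succeedsWithZ, failsWithX]);
// Runs all three children: values y, z, x and ok is true.
const resumed = runResumeNext([failsWithY, succeedsWithZ, failsWithX]);
// No child succeeds: values y, x and ok is false.
const allFailed = runSelector([failsWithY, failsWithX]);
```

The only places the two functions diverge are the early return on success and the failure result at the end, which correspond to the two bullet points.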
The code for the combiner in TypeScript/RxJS is below:
import { Observable, ObservableInput, throwError, from } from "rxjs";
/**
 * This combiner is very similar to `onErrorResumeNext`, except
 * for the following differences:
 * - When all sources are exhausted without one completing, it throws an error.
 * - It stops executing as soon as any source completes.
 *
 * @see {@link onErrorResumeNext}
 *
 * @param {...ObservableInput} sources Observables (or anything that *is* observable) passed either directly or as an array.
 * @return {Observable} An Observable that subscribes to the sources one after the other,
 * moving on to the next source when the current one errors, completing as soon as any
 * source completes, and erroring if every source errors.
 */
export function selector<R>(
  ...sources: Array<ObservableInput<any> | Array<ObservableInput<any>>>
): Observable<R> {
  // Nothing left to try: every source errored (or none were given).
  if (sources.length === 0) {
    return throwError(new Error("No selections were successfully executed."));
  }
  const [first, ...remainder] = sources;
  // A single array argument is treated as the list of sources.
  // Recurse into selector here; delegating to onErrorResumeNext would swallow the final error.
  if (sources.length === 1 && Array.isArray(first)) {
    return selector<R>(...first);
  }
  return new Observable((subscriber) => {
    // On error, fall through to a selector over the remaining sources.
    const subNext = () =>
      subscriber.add(selector<R>(...remainder).subscribe(subscriber));
    return from(first).subscribe({
      next(value) {
        subscriber.next(value);
      },
      error: subNext,
      // The first source to complete ends the whole selection.
      complete: () => {
        subscriber.complete();
      },
    });
  });
}
The tests for this combiner are below:
import { selector } from "./selector";
import { cold, hot, time } from "jest-marbles";
describe("selector()", () => {
  it("finds the first successful selection from a set.", () => {
    const firstObs$ = cold("--#");
    const secondObs$ = cold("--z--#");
    const thirdObs$ = cold("--x--y|");
    expect(selector(firstObs$, secondObs$, thirdObs$)).toBeMarble(
      "----z----x--y|"
    );
  });
  it("completes on the first successful selection.", () => {
    const firstObs$ = cold("--y#");
    const secondObs$ = cold("--z--|");
    const thirdObs$ = cold("--x--#");
    expect(selector(firstObs$, secondObs$, thirdObs$)).toBeMarble(
      "--y--z--|"
    );
  });
  it("errors if no selections are successful", () => {
    const firstObs$ = cold("--y#");
    const secondObs$ = cold("z--#");
    const thirdObs$ = cold("--x--#");
    expect(selector(firstObs$, secondObs$, thirdObs$)).toBeMarble(
      "--yz----x--#"
    );
  });
});