@spaceribs
Created October 2, 2020 13:00

I've been exploring how to combine the concept of "Behavior Trees" (https://en.wikipedia.org/wiki/Behavior_Trees) with RxJS. Doing so revealed a combiner that I don't think currently exists, so I'd like to discuss whether that's the case and whether I should put up a PR to add it.

At minimum, a basic behavior tree contains an AND (sequence) combiner and an OR (selector) combiner. An AND combiner runs its children in sequence and stops as soon as any child fails. This translates to RxJS very easily: it's literally just a concat.

An OR combiner instead runs its children until one succeeds, and fails only if all of its children fail. This is very similar to onErrorResumeNext, except for the following attributes:

  • When all sources are exhausted (every one has errored), throw an error.
  • Stop executing as soon as any observable completes.
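The two claims above (that concat behaves like an AND, and that onErrorResumeNext falls short of an OR) can be checked with a small sketch. The `failing` source and the `record` helper are hypothetical names introduced only for this illustration, and all sources are synchronous so the logs can be inspected right away:

```typescript
import { Observable, concat, of, onErrorResumeNext } from "rxjs";

// Hypothetical helper for illustration: a source that errors immediately.
const failing = new Observable<string>((subscriber) =>
  subscriber.error(new Error("failed"))
);

// Hypothetical helper: subscribe and record every notification as a string.
function record(source: Observable<unknown>): string[] {
  const log: string[] = [];
  source.subscribe({
    next: (value) => log.push(`next:${String(value)}`),
    error: () => log.push("error"),
    complete: () => log.push("complete"),
  });
  return log;
}

// AND (sequence): concat stops at the first error, so "b" never runs.
const andLog = record(concat(of("a"), failing, of("b")));
// andLog is ["next:a", "error"]

// onErrorResumeNext keeps going after a source completes...
const resumeLog = record(onErrorResumeNext(of("a"), of("b")));
// resumeLog is ["next:a", "next:b", "complete"]

// ...and completes silently even when every source has failed.
const allFailedLog = record(onErrorResumeNext(failing, failing));
// allFailedLog is ["complete"]
```

An OR combiner would instead need `resumeLog` to stop after "next:a" and `allFailedLog` to end in an error, which is exactly the pair of differences listed above.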

The code for the combiner in TypeScript/RxJS is below:

import {
  Observable,
  ObservableInput,
  throwError,
  onErrorResumeNext,
  from,
} from "rxjs";

/**
 * This combiner is very similar to `onErrorResumeNext`, except
 * for the following attributes:
 *   - When all sources are exhausted, throw an error.
 *   - Stop executing when any observable completes.
 *
 * @see {@link onErrorResumeNext}
 *
 * @param {...ObservableInput} sources Observables (or anything that *is* observable) passed either directly or as an array.
 * @return {Observable} An Observable that subscribes to the sources one after the other,
 * moving on to the next source whenever the current one errors. It completes as soon as
 * any source completes, and errors if every source has errored.
 */
export function selector<T, R>(
  ...sources: Array<
    | ObservableInput<any>
    | Array<ObservableInput<any>>
    | ((...values: Array<any>) => R)
  >
): Observable<R> {
  if (sources.length === 0) {
    return throwError(new Error("No selections were successfully executed."));
  }

  const [first, ...remainder] = sources;

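  // A single array argument is unpacked and treated as the list of sources.
  // Recursing (rather than delegating to onErrorResumeNext) keeps the
  // selector semantics: stop on completion, error when all sources fail.
  if (sources.length === 1 && Array.isArray(first)) {
    return selector<T, R>(...first);
  }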

  return new Observable((subscriber) => {
    const subNext = () =>
      subscriber.add(selector(...remainder).subscribe(subscriber));

    return from(first).subscribe({
      next(value) {
        subscriber.next(value);
      },
      error: subNext,
      complete: () => {
        subscriber.complete();
      },
    });
  });
}

The tests for this combiner are below:

import { selector } from "./selector";
import { cold } from "jest-marbles";

describe("selector()", () => {
  it("finds the first successful selection from a set.", () => {
    const firstObs$ = cold("--#");
    const secondObs$ = cold("--z--#");
    const thirdObs$ = cold("--x--y|");
    expect(selector(firstObs$, secondObs$, thirdObs$)).toBeMarble(
      "----z----x--y|"
    );
  });

  it("completes on the first successful selection.", () => {
    const firstObs$ = cold("--y#");
    const secondObs$ = cold("--z--|");
    const thirdObs$ = cold("--x--#");
    expect(selector(firstObs$, secondObs$, thirdObs$)).toBeMarble("--y--z--|");
  });

  it("errors if no selections are successful", () => {
    const firstObs$ = cold("--y#");
    const secondObs$ = cold("z--#");
    const thirdObs$ = cold("--x--#");
    expect(selector(firstObs$, secondObs$, thirdObs$)).toBeMarble(
      "--yz----x--#"
    );
  });
});
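
For comparison, roughly the same behavior can be sketched with operators that already exist, by chaining `catchError` so that each source falls back to the ones after it. This is only an illustrative alternative under simplifying assumptions, not the combiner proposed above: `selectorViaCatchError` is a hypothetical name, and it accepts only Observables rather than any ObservableInput or an array.

```typescript
import { Observable, of } from "rxjs";
import { catchError } from "rxjs/operators";

// Hypothetical name; not part of RxJS or of the combiner above.
export function selectorViaCatchError<T>(
  ...sources: Array<Observable<T>>
): Observable<T> {
  // Terminal fallback: reached only when every source has errored.
  const exhausted = new Observable<T>((subscriber) =>
    subscriber.error(new Error("No selections were successfully executed."))
  );

  // Fold from the right so each source falls back to the ones after it.
  return sources.reduceRight<Observable<T>>(
    (fallback, source) => source.pipe(catchError(() => fallback)),
    exhausted
  );
}

// Usage: the first source emits "y" and then fails, the second succeeds,
// so the third is never subscribed to.
const failsAfterY = new Observable<string>((subscriber) => {
  subscriber.next("y");
  subscriber.error(new Error("failed"));
});

const emitted: string[] = [];
selectorViaCatchError(failsAfterY, of("z"), of("never reached")).subscribe({
  next: (value) => emitted.push(value),
  complete: () => emitted.push("complete"),
});
// emitted is ["y", "z", "complete"]
```

A dedicated combiner still reads more clearly at the call site than a hand-written fold, which is part of the case for proposing one.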