Skip to content

Instantly share code, notes, and snippets.

@gioragutt
Created June 5, 2021 18:07
Show Gist options
  • Save gioragutt/9f7658dafbb40c963a9b915b91fa548d to your computer and use it in GitHub Desktop.
Save gioragutt/9f7658dafbb40c963a9b915b91fa548d to your computer and use it in GitHub Desktop.
I'm an RXJS Pleb please help
import {
  defer,
  EMPTY,
  merge,
  NEVER,
  Observable,
  Subject,
  throwError,
} from "rxjs";
import {
  bufferTime,
  catchError,
  filter,
  mapTo,
  mergeMap,
  retry,
  // tap,
} from "rxjs/operators";
import type { Risk } from "./types";
import type { upsertInsights as upsertInsightsToGraph } from "./upsertInsights.mutation";
/**
 * Splits an array into two consecutive halves without mutating the input.
 *
 * For odd lengths the midpoint is rounded down, so the second half receives
 * the extra element (a one-element array yields `[[], [item]]`).
 *
 * @param arr the array to split
 * @returns a tuple of `[firstHalf, secondHalf]`
 */
function splitArrayInTwo<T>(arr: T[]): [T[], T[]] {
  const midpoint = Math.floor(arr.length / 2);
  return [arr.slice(0, midpoint), arr.slice(midpoint)];
}
/**
 * Batches incoming risk updates and upserts every batch through
 * `upsertInsights`.
 *
 * Updates are buffered for up to 1000 ms or 10 risks, whichever comes first;
 * empty buffers are skipped. Batches are upserted concurrently.
 *
 * Failure handling, per batch:
 * - An error with `code === 413` (Payload Too Large) splits the batch in two
 *   and upserts each half on its own, recursively. A single-risk batch that
 *   is still rejected with 413 cannot be made smaller and is dropped.
 * - Any other error is retried up to 3 times; if it keeps failing the batch
 *   is dropped (best effort) so one bad batch does not kill the stream.
 *
 * The pipeline is lazy: `updates` is only subscribed to when the returned
 * observable is subscribed, and unsubscribing tears everything down.
 *
 * @param updates stream of individual risks to upsert
 * @param upsertInsights function performing the upsert for one batch
 * @returns an observable emitting each batch (or split sub-batch) once it has
 *   been upserted successfully; it completes after `updates` completes and all
 *   in-flight upserts have settled
 */
export function handleRiskUpdates(
  updates: Subject<Risk>,
  upsertInsights: typeof upsertInsightsToGraph
): Observable<Risk[]> {
  const upsertBatch = (risks: Risk[]): Observable<Risk[]> =>
    // Defer so that every (re)subscription issues a fresh request instead of
    // reusing an already-settled promise.
    defer(() => upsertInsights(risks)).pipe(
      // Map before the error handling so split sub-batches emit themselves
      // rather than the parent batch.
      mapTo(risks),
      catchError((err) => {
        if (err?.code !== 413) {
          // Not a payload-size problem: let `retry` below deal with it.
          return throwError(err);
        }
        if (risks.length <= 1) {
          // Cannot be split any further; retrying would loop forever.
          return EMPTY;
        }
        // Payload Too Large: make the payload smaller and try each half.
        // `upsertBatch` never errors, so `retry` will not replay the halves.
        return merge(...splitArrayInTwo(risks).map(upsertBatch));
      }),
      retry(3),
      // Give up on this batch but complete, so the outer stream can complete.
      catchError(() => EMPTY)
    );

  return updates.pipe(
    bufferTime(1000, null, 10),
    filter((risks) => risks.length > 0),
    mergeMap(upsertBatch)
  );
}
import { subscribeSpyTo } from "@hirez_io/observer-spy";
import { splitEvery } from "ramda";
import { Subject } from "rxjs";
import { handleRiskUpdates } from "./handleRiskUpdates";
import { asyncFakeTime } from "./testing-utils";
import type { Risk } from "./types";
import type { upsertInsights as upsertInsightsToGraph } from "./upsertInsights.mutation";
// Sequence counter behind the fixture ids; reset by the suite before each test.
let riskId = 0;

/**
 * Builds a minimal risk fixture with a unique, sequential id
 * (`test_risk_1`, `test_risk_2`, ...).
 *
 * The fixture is a plain object: assigning `id` onto a primitive string throws
 * a TypeError in strict mode and is silently ignored otherwise.
 *
 * @returns a fixture that only carries `id`
 */
const createRisk = (): Risk<null> => {
  riskId++;
  const risk = { id: `test_risk_${riskId}` };
  // Deliberately partial test data, hence the cast to the full type.
  return risk as unknown as Risk<null>;
};
// Shared mock of the upsert function handed to handleRiskUpdates.
const upsertInsights: jest.MockedFunction<typeof upsertInsightsToGraph> =
  jest.fn();

describe("handleRiskUpdates", () => {
  beforeEach(() => {
    riskId = 0;
    // mockReset (not mockClear) also drops the implementation configured by
    // the previous test, so behaviors cannot leak between tests.
    upsertInsights.mockReset();
  });

  it(
    "happy path",
    asyncFakeTime(async (flush) => {
      const updates$ = new Subject<Risk>();
      const spy = subscribeSpyTo(handleRiskUpdates(updates$, upsertInsights));
      upsertInsights.mockResolvedValue();

      // 12 risks with a max buffer size of 10 => one full batch + the rest.
      const risks = Array.from({ length: 12 }).map(createRisk);
      const [firstTen, rest] = splitEvery(10, risks);

      risks.forEach((risk) => updates$.next(risk));
      updates$.complete();
      await flush();

      expect(spy.receivedComplete()).toBeTruthy();
      expect(spy.getValues()).toStrictEqual([firstTen, rest]);
    })
  );

  // Covers the Payload Too Large (413) path: a rejected batch of ten must be
  // split in two and each half upserted on its own.
  it(
    "bla",
    asyncFakeTime(async (flush) => {
      const updates$ = new Subject<Risk>();
      const spy = subscribeSpyTo(handleRiskUpdates(updates$, upsertInsights));
      // The batch is the first (and only) argument, matching both the call in
      // handleRiskUpdates and the toHaveBeenCalledWith assertions below.
      upsertInsights.mockImplementation(async (sentRisks) => {
        if (sentRisks.length >= 10) {
          const error = new Error();
          Object.assign(error, { code: 413 });
          throw error;
        }
      });

      const risks = Array.from({ length: 12 }).map(createRisk);
      const firstTen = risks.slice(0, 10);
      const [firstChunk, secondChunk] = splitEvery(5, firstTen);
      const rest = risks.slice(10);

      risks.forEach((risk) => updates$.next(risk));
      updates$.complete();
      await flush();

      expect(upsertInsights).toHaveBeenCalledWith(firstTen);
      expect(upsertInsights).toHaveBeenCalledWith(firstChunk);
      expect(upsertInsights).toHaveBeenCalledWith(secondChunk);
      // Batches are upserted concurrently, so the small trailing batch is
      // emitted before the halves of the rejected one.
      expect(spy.getValues()).toStrictEqual([rest, firstChunk, secondChunk]);
      // Should be this with something like concatMap & expand
      // expect(spy.getValues()).toStrictEqual([firstChunk, secondChunk, rest]);
    })
  );
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment