Skip to content

Instantly share code, notes, and snippets.

@reznord
Created June 9, 2020 10:07
Show Gist options
  • Save reznord/128594a5ccbb8923c603c3444d8fddb4 to your computer and use it in GitHub Desktop.
Save reznord/128594a5ccbb8923c603c3444d8fddb4 to your computer and use it in GitHub Desktop.
import fetch from "isomorphic-unfetch";
import {
makeSubject,
share,
pipe,
merge,
filter,
buffer,
take,
fromArray,
fromPromise,
fromValue,
concat,
delay,
mergeMap,
takeUntil,
} from "wonka";
export const authExchange = ({
initialDelayMs,
maxDelayMs,
randomDelay,
maxNumberAttempts,
retryIf: retryIfOption,
}) => {
let getNewSession;
const MIN_DELAY = initialDelayMs || 1000;
const MAX_DELAY = maxDelayMs || 15000;
const MAX_ATTEMPTS = maxNumberAttempts || 2;
const RANDOM_DELAY = randomDelay || true;
const retryIf = retryIfOption || ((err) => err && err.networkError);
// this is where the API call to get new session will be.
if (retryIf) {
const fetchOpts = {
method: "PUT",
body: JSON.stringify({}),
};
getNewSession = (opCtx) => {
console.log(opCtx);
return fetch(process.env.api_authentication_base_url, fetchOpts)
.then((r) => r.json)
.then((res) => console.log(res));
};
}
return ({ forward, dispatchDebug }) => (ops$) => {
let bufferedOps$ = [];
const sharedOps$ = pipe(ops$, share);
// bufferedOps$ = pipe(
// sharedOps$,
// buffer(fromPromise(getNewSession)),
// take(1),
// mergeMap(fromArray),
// );
const inputOps$ = pipe(concat([bufferedOps$, sharedOps$]), share);
const { source: retry$, next: nextRetryOperation } = makeSubject();
const retryWithBackoff$ = pipe(
retry$,
mergeMap((op) => {
const { key, context } = op;
const retryCount = (context.retryCount || 0) + 1;
let delayAmount = context.retryDelay || MIN_DELAY;
bufferedOps$ = pipe(
sharedOps$,
buffer(fromPromise(getNewSession(op))),
take(1),
mergeMap(fromArray),
);
const backoffFactor = Math.random() + 1.5;
// if randomDelay is enabled and it won't exceed the max delay, apply a random
// amount to the delay to avoid thundering herd problem
if (RANDOM_DELAY && delayAmount * backoffFactor < MAX_DELAY) {
delayAmount *= backoffFactor;
}
// We stop the retries if a teardown event for this operation comes in
// But if this event comes through regularly we also stop the retries, since it's
// basically the query retrying itself, no backoff should be added!
const teardown$ = pipe(
inputOps$,
filter((opt) => {
return (
(opt.operationName === "query" ||
opt.operationName === "teardown") &&
opt.key === key
);
}),
);
dispatchDebug({
type: "retryAttempt",
message: `The operation has failed and a retry has been triggered (${retryCount} / ${MAX_ATTEMPTS})`,
operation: op,
data: {
retryCount,
},
});
// Add new retryDelay and retryCount to operation
return pipe(
fromValue({
...op,
context: {
...op.context,
retryDelay: delayAmount,
retryCount,
},
}),
delay(delayAmount),
// Stop retry if a teardown comes in
takeUntil(teardown$),
);
}),
);
const result$ = pipe(
merge([inputOps$, retryWithBackoff$]),
forward,
share,
filter((res) => {
// Only retry if the error passes the conditional retryIf function (if passed)
// or if the error contains a networkError
if (!res.error || !retryIf(res.error)) {
return true;
}
const maxNumberAttemptsExceeded =
(res.operation.context.retryCount || 0) >= MAX_ATTEMPTS - 1;
if (!maxNumberAttemptsExceeded) {
// Send failed responses to be retried by calling next on the retry$ subject
// Exclude operations that have been retried more than the specified max
nextRetryOperation(res.operation);
return false;
}
dispatchDebug({
type: "retryExhausted",
message:
"Maximum number of retries has been reached. No further retries will be performed.",
operation: res.operation,
});
return true;
}),
);
return result$;
};
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment