Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star (0): you must be signed in to star a gist.
  • Fork (0): you must be signed in to fork a gist.
  • Save schmidt-sebastian/302c19baa2eda89b23435da41afb077f to your computer and use it in GitHub Desktop.
diff --git a/dev/src/bulk-writer.ts b/dev/src/bulk-writer.ts
index e4f4ca7..fea0994 100644
--- a/dev/src/bulk-writer.ts
+++ b/dev/src/bulk-writer.ts
@@ -276,9 +276,6 @@ class BulkCommitBatch {
const stack = Error().stack!;
let results: BatchWriteResult[] = [];
- for (let attempt = 0; attempt < MAX_RETRY_ATTEMPTS; attempt++) {
- await this.backoff.backoffAndWait();
-
try {
results = await this.writeBatch.bulkCommit();
} catch (err) {
@@ -290,69 +287,31 @@ class BulkCommitBatch {
};
});
}
- this.processResults(results, /* allowRetry= */ true);
-
- if (this.pendingOps.length > 0) {
- logger(
- 'BulkWriter.bulkCommit',
- null,
- `Current batch failed at retry #${attempt}. Num failures: ` +
- `${this.pendingOps.length}.`
- );
-
- this.writeBatch = new WriteBatch(
- this.firestore,
- this.writeBatch,
- new Set(this.pendingOps.map(op => op.writeBatchIndex))
- );
- } else {
- this.completedDeferred.resolve();
- return;
- }
- }
-
- this.processResults(results);
- this.completedDeferred.resolve();
+
+ this.processResults(results);
+ this.completedDeferred.resolve();
}
/**
* Resolves the individual operations in the batch with the results.
*/
private processResults(
- results: BatchWriteResult[],
- allowRetry = false
+ results: BatchWriteResult[]
): void {
- const newPendingOps: Array<PendingOp> = [];
for (let i = 0; i < results.length; i++) {
const result = results[i];
const op = this.pendingOps[i];
if (result.status.code === Status.OK) {
op.deferred.resolve(result);
- } else if (!allowRetry || !this.shouldRetry(result.status.code)) {
+ } else {
const error = new BulkWriterError(
op.operation,
(result.status.code as unknown) as GrpcStatus,
result.status.message
);
op.deferred.reject(error);
- } else {
- // Retry the operation if it has not been processed.
- // Store the current index of pendingOps to preserve the mapping of
- // this operation's index in the underlying WriteBatch.
- newPendingOps.push({
- writeBatchIndex: i,
- deferred: op.deferred,
- operation: op.operation,
- });
}
}
-
- this.pendingOps = newPendingOps;
- }
-
- private shouldRetry(code: Status | undefined): boolean {
- const retryCodes = getRetryCodes('batchWrite');
- return code !== undefined && retryCodes.includes(code);
}
/**
@@ -404,7 +363,8 @@ export class BulkWriterError extends Error {
constructor(
readonly operation: BulkWriterOperation,
readonly code: GrpcStatus,
- readonly message: string
+ readonly message: string,
+ readonly attemptCount = 0
) {
super(message);
}
@@ -464,7 +424,13 @@ export class BulkWriter {
* The user-provided callback to be run every time a BulkWriter operation
* fails.
*/
- private errorFn: (error: BulkWriterError) => void;
+ private errorFn = (error: BulkWriterError) => {
+ const retryCodes = getRetryCodes('batchWrite');
+ if (error.attemptCount < 5 && retryCodes.includes(error.code)) {
+ return this.retry(error.operation);
+ }
+ return Promise.reject(error);
+ };
/** @hideconstructor */
constructor(
@@ -472,7 +438,6 @@ export class BulkWriter {
options?: firestore.BulkWriterOptions
) {
this.firestore._incrementBulkWritersCount();
- this.errorFn = () => {};
this.successFn = () => {};
validateBulkWriterOptions(options);
@@ -765,7 +730,7 @@ export class BulkWriter {
* @param callback A callback to be called every time a BulkWriter operation
* fails.
*/
- onWriteError(callback: (error: BulkWriterError) => void): void {
+ onWriteError(callback: (error: BulkWriterError) => Promise<WriteResult>): void {
this.errorFn = callback;
}
diff --git a/dev/src/write-batch.ts b/dev/src/write-batch.ts
index 20ef4f9..b172c03 100644
--- a/dev/src/write-batch.ts
+++ b/dev/src/write-batch.ts
@@ -142,32 +142,11 @@ export class WriteBatch implements firestore.WriteBatch {
* @hideconstructor
*
* @param firestore The Firestore Database client.
- * @param retryBatch The WriteBatch that needs to be retried.
- * @param indexesToRetry The indexes of the operations from the provided
- * WriteBatch that need to be retried.
*/
- constructor(
- firestore: Firestore,
- retryBatch: WriteBatch,
- indexesToRetry: Set<number>
- );
- constructor(firestore: Firestore);
- constructor(
- firestore: Firestore,
- retryBatch?: WriteBatch,
- indexesToRetry?: Set<number>
- ) {
+ constructor(firestore: Firestore) {
this._firestore = firestore;
this._serializer = new Serializer(firestore);
this._allowUndefined = !!firestore._settings.ignoreUndefinedProperties;
-
- if (retryBatch) {
- // Creates a new WriteBatch containing only the indexes from the provided
- // indexes to retry.
- for (const index of indexesToRetry!.values()) {
- this._ops.push(retryBatch._ops[index]);
- }
- }
}
/**
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.