Created
October 13, 2020 20:50
-
-
Save schmidt-sebastian/302c19baa2eda89b23435da41afb077f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/dev/src/bulk-writer.ts b/dev/src/bulk-writer.ts | |
index e4f4ca7..fea0994 100644 | |
--- a/dev/src/bulk-writer.ts | |
+++ b/dev/src/bulk-writer.ts | |
@@ -276,9 +276,6 @@ class BulkCommitBatch { | |
const stack = Error().stack!; | |
let results: BatchWriteResult[] = []; | |
- for (let attempt = 0; attempt < MAX_RETRY_ATTEMPTS; attempt++) { | |
- await this.backoff.backoffAndWait(); | |
- | |
try { | |
results = await this.writeBatch.bulkCommit(); | |
} catch (err) { | |
@@ -290,69 +287,31 @@ class BulkCommitBatch { | |
}; | |
}); | |
} | |
- this.processResults(results, /* allowRetry= */ true); | |
- | |
- if (this.pendingOps.length > 0) { | |
- logger( | |
- 'BulkWriter.bulkCommit', | |
- null, | |
- `Current batch failed at retry #${attempt}. Num failures: ` + | |
- `${this.pendingOps.length}.` | |
- ); | |
- | |
- this.writeBatch = new WriteBatch( | |
- this.firestore, | |
- this.writeBatch, | |
- new Set(this.pendingOps.map(op => op.writeBatchIndex)) | |
- ); | |
- } else { | |
- this.completedDeferred.resolve(); | |
- return; | |
- } | |
- } | |
- | |
- this.processResults(results); | |
- this.completedDeferred.resolve(); | |
+ | |
+ this.processResults(results); | |
+ this.completedDeferred.resolve(); | |
} | |
/** | |
* Resolves the individual operations in the batch with the results. | |
*/ | |
private processResults( | |
- results: BatchWriteResult[], | |
- allowRetry = false | |
+ results: BatchWriteResult[] | |
): void { | |
- const newPendingOps: Array<PendingOp> = []; | |
for (let i = 0; i < results.length; i++) { | |
const result = results[i]; | |
const op = this.pendingOps[i]; | |
if (result.status.code === Status.OK) { | |
op.deferred.resolve(result); | |
- } else if (!allowRetry || !this.shouldRetry(result.status.code)) { | |
+ } else { | |
const error = new BulkWriterError( | |
op.operation, | |
(result.status.code as unknown) as GrpcStatus, | |
result.status.message | |
); | |
op.deferred.reject(error); | |
- } else { | |
- // Retry the operation if it has not been processed. | |
- // Store the current index of pendingOps to preserve the mapping of | |
- // this operation's index in the underlying WriteBatch. | |
- newPendingOps.push({ | |
- writeBatchIndex: i, | |
- deferred: op.deferred, | |
- operation: op.operation, | |
- }); | |
} | |
} | |
- | |
- this.pendingOps = newPendingOps; | |
- } | |
- | |
- private shouldRetry(code: Status | undefined): boolean { | |
- const retryCodes = getRetryCodes('batchWrite'); | |
- return code !== undefined && retryCodes.includes(code); | |
} | |
/** | |
@@ -404,7 +363,8 @@ export class BulkWriterError extends Error { | |
constructor( | |
readonly operation: BulkWriterOperation, | |
readonly code: GrpcStatus, | |
- readonly message: string | |
+ readonly message: string, | |
+ readonly attemptCount = 0 | |
) { | |
super(message); | |
} | |
@@ -464,7 +424,13 @@ export class BulkWriter { | |
* The user-provided callback to be run every time a BulkWriter operation | |
* fails. | |
*/ | |
- private errorFn: (error: BulkWriterError) => void; | |
+ private errorFn = (error: BulkWriterError) => { | |
+ const retryCodes = getRetryCodes('batchWrite'); | |
+ if (error.attemptCount < 5 && retryCodes.includes(error.code)) { | |
+ return this.retry(error.operation); | |
+ } | |
+ return Promise.reject(error); | |
+ }; | |
/** @hideconstructor */ | |
constructor( | |
@@ -472,7 +438,6 @@ export class BulkWriter { | |
options?: firestore.BulkWriterOptions | |
) { | |
this.firestore._incrementBulkWritersCount(); | |
- this.errorFn = () => {}; | |
this.successFn = () => {}; | |
validateBulkWriterOptions(options); | |
@@ -765,7 +730,7 @@ export class BulkWriter { | |
* @param callback A callback to be called every time a BulkWriter operation | |
* fails. | |
*/ | |
- onWriteError(callback: (error: BulkWriterError) => void): void { | |
+ onWriteError(callback: (error: BulkWriterError) => Promise<WriteResult>): void { | |
this.errorFn = callback; | |
} | |
diff --git a/dev/src/write-batch.ts b/dev/src/write-batch.ts | |
index 20ef4f9..b172c03 100644 | |
--- a/dev/src/write-batch.ts | |
+++ b/dev/src/write-batch.ts | |
@@ -142,32 +142,11 @@ export class WriteBatch implements firestore.WriteBatch { | |
* @hideconstructor | |
* | |
* @param firestore The Firestore Database client. | |
- * @param retryBatch The WriteBatch that needs to be retried. | |
- * @param indexesToRetry The indexes of the operations from the provided | |
- * WriteBatch that need to be retried. | |
*/ | |
- constructor( | |
- firestore: Firestore, | |
- retryBatch: WriteBatch, | |
- indexesToRetry: Set<number> | |
- ); | |
- constructor(firestore: Firestore); | |
- constructor( | |
- firestore: Firestore, | |
- retryBatch?: WriteBatch, | |
- indexesToRetry?: Set<number> | |
- ) { | |
+ constructor(firestore: Firestore) { | |
this._firestore = firestore; | |
this._serializer = new Serializer(firestore); | |
this._allowUndefined = !!firestore._settings.ignoreUndefinedProperties; | |
- | |
- if (retryBatch) { | |
- // Creates a new WriteBatch containing only the indexes from the provided | |
- // indexes to retry. | |
- for (const index of indexesToRetry!.values()) { | |
- this._ops.push(retryBatch._ops[index]); | |
- } | |
- } | |
} | |
/** |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment