-
-
Save tommedema/04de4004ee5b0f17fcb3fd29b2ad92b1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* A simplified and more robust version of the standard amplitude retry class | |
* with a few important changes: | |
* | |
* - resolved issue https://github.com/amplitude/Amplitude-Node/issues/86 | |
* where on retry, a separate async chain would be initiated and the | |
* flush promise would resolve immediately | |
* - resolved issue where rate limit errors are swallowed and | |
* where not all rate limits are considered, see also | |
* https://github.com/amplitude/Amplitude-Node/issues/86 | |
* - removed actual retry mechanism; instead consuming application | |
* can push events to a retry queue. In the future we can consider | |
* adding a simple retry layer here and then bubble up | |
* retry failures instead of flush failures. | |
* | |
* @see https://github.com/amplitude/Amplitude-Node/blob/main/packages/node/src/retry/defaultRetry.ts | |
*/ | |
import { | |
Event, | |
Options, | |
Transport, | |
Payload, | |
PayloadOptions, | |
Status, | |
Response, | |
RetryClass, | |
} from '@amplitude/types' | |
import { DEFAULT_OPTIONS } from '.' | |
import { setupDefaultTransport } from './util' | |
import { AmplitudeFlushError } from './errors' | |
const DAY_IN_MS = 24 * 60 * 60 * 1000 | |
/** | |
* How quickly a throttled event should be retried. | |
* @see https://developers.amplitude.com/docs/http-api-v2#response-format | |
*/ | |
const THROTTLE_RETRY_MS = 30 * 1000 | |
/** | |
* @param event the actual event that failed to sent | |
* @param isRetryable whether the event can be retried, | |
* these are not caused by application errors but rather by network | |
* issues or rate limits. Events that cannot be retried should be seen as | |
* application errors that should be handled, e.g. as a result of an event | |
* that weas created with an invalid payload. | |
* @param retryInMs defined if isRetryable is true, in which case it's | |
* the suggested number of miliseconds in which the event can be retried | |
*/ | |
export interface IAmplitudeFlushErrorEventDescription { | |
event: Event | |
isRetryable: boolean | |
retryInMs?: number | |
errorResponse?: Response | |
} | |
export class AmplitudeFlushErrorHandler implements RetryClass { | |
protected readonly _apiKey: string | |
protected readonly _options: Options | |
private readonly _transport: Transport | |
public constructor(apiKey: string, options: Partial<Options>) { | |
this._apiKey = apiKey | |
this._options = { ...DEFAULT_OPTIONS, ...options } | |
this._transport = this._options.transportClass ?? setupDefaultTransport(this._options) | |
} | |
/** | |
* @inheritDoc | |
*/ | |
public async sendEventsWithRetry(events: readonly Event[]): Promise<Response> { | |
let response: Response = { status: Status.Unknown, statusCode: 0 } | |
try { | |
response = await this._transport.sendPayload(this._getPayload(events)) | |
if (response.status !== Status.Success) { | |
throw new Error(response.status) | |
} | |
} catch { | |
const flushError = this._getFlushError(events, response) | |
throw flushError | |
} | |
return response | |
} | |
/** | |
* Given an error response, returns an `AmplitudeFlushError` | |
* containing the events that failed to flush with the suggested | |
* time in which to retry. | |
*/ | |
private _getFlushError(events: readonly Event[], errorResponse: Response): AmplitudeFlushError { | |
let failedEventDescriptions: IAmplitudeFlushErrorEventDescription[] = [] | |
if (errorResponse.status === Status.RateLimit) { | |
if (!errorResponse.body) { | |
throw new Error('received rate limit response without a body') | |
} | |
const { | |
exceededDailyQuotaUsers = {}, | |
exceededDailyQuotaDevices = {}, | |
throttledEvents, | |
} = errorResponse.body | |
// Retry throttledEvents when applicable | |
failedEventDescriptions = throttledEvents.map((eventIndex) => { | |
const event = events[eventIndex] | |
const hasReachedDailyQuota = | |
(event.user_id && event.user_id in exceededDailyQuotaUsers) || | |
(event.device_id && event.device_id in exceededDailyQuotaDevices) | |
return { | |
event, | |
isRetryable: true, | |
retryInMs: hasReachedDailyQuota ? DAY_IN_MS : THROTTLE_RETRY_MS, | |
} | |
}) | |
} else if (errorResponse.status === Status.Unknown || errorResponse.status === Status.Failed) { | |
// Network error or Amplitude error, all events failed, retryable immediately | |
failedEventDescriptions = events.map((event) => ({ | |
event, | |
isRetryable: true, | |
retryInMs: 0, | |
})) | |
} else if ( | |
errorResponse.status === Status.PayloadTooLarge || | |
errorResponse.status === Status.Invalid | |
) { | |
// Application error because we've sent an invalid payload | |
failedEventDescriptions = events.map((event) => ({ | |
event, | |
errorResponse, | |
isRetryable: false, | |
})) | |
} else if (errorResponse.status === Status.Success) { | |
throw new Error('tried to create a flush error for a successful') | |
} else { | |
throw new Error( | |
'tried to create a flush error with an invalid errorResponse: ' + | |
JSON.stringify(errorResponse) | |
) | |
} | |
return new AmplitudeFlushError(failedEventDescriptions) | |
} | |
protected _getPayloadOptions(): { options?: PayloadOptions } { | |
if (typeof this._options.minIdLength === 'number') { | |
return { | |
options: { | |
min_id_length: this._options.minIdLength, | |
}, | |
} | |
} | |
return {} | |
} | |
protected _getPayload(events: readonly Event[]): Payload { | |
return { | |
api_key: this._apiKey, | |
events, | |
...this._getPayloadOptions(), | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment