Skip to content

Instantly share code, notes, and snippets.

@tommedema
Created March 15, 2021 20:19
Show Gist options
  • Save tommedema/04de4004ee5b0f17fcb3fd29b2ad92b1 to your computer and use it in GitHub Desktop.
Save tommedema/04de4004ee5b0f17fcb3fd29b2ad92b1 to your computer and use it in GitHub Desktop.
/**
* A simplified and more robust version of the standard amplitude retry class
* with a few important changes:
*
* - resolved issue https://github.com/amplitude/Amplitude-Node/issues/86
* where on retry, a separate async chain would be initiated and the
* flush promise would resolve immediately
* - resolved issue where rate limit errors are swallowed and
* where not all rate limits are considered, see also
* https://github.com/amplitude/Amplitude-Node/issues/86
* - removed actual retry mechanism; instead consuming application
* can push events to a retry queue. In the future we can consider
* adding a simple retry layer here and then bubble up
* retry failures instead of flush failures.
*
* @see https://github.com/amplitude/Amplitude-Node/blob/main/packages/node/src/retry/defaultRetry.ts
*/
import {
Event,
Options,
Transport,
Payload,
PayloadOptions,
Status,
Response,
RetryClass,
} from '@amplitude/types'
import { DEFAULT_OPTIONS } from '.'
import { setupDefaultTransport } from './util'
import { AmplitudeFlushError } from './errors'
/**
* Number of milliseconds in one day; used as the suggested retry delay for
* events whose user or device exceeded the daily quota.
*/
const DAY_IN_MS = 1000 * 60 * 60 * 24
/**
* Suggested delay, in milliseconds, before retrying a throttled event.
* @see https://developers.amplitude.com/docs/http-api-v2#response-format
*/
const THROTTLE_RETRY_MS = 1000 * 30
/**
* Describes a single event that failed to flush, together with guidance on
* whether and when it can be retried.
*
* @param event the actual event that failed to send
* @param isRetryable whether the event can be retried. Retryable failures
* are not caused by application errors but rather by network issues or
* rate limits. Events that cannot be retried should be seen as application
* errors that should be handled, e.g. as a result of an event that was
* created with an invalid payload.
* @param retryInMs defined if isRetryable is true, in which case it's
* the suggested number of milliseconds after which the event can be retried
* @param errorResponse the response that caused the failure; optional, and
* in this file only populated for non-retryable (invalid payload / payload
* too large) failures
*/
export interface IAmplitudeFlushErrorEventDescription {
event: Event
isRetryable: boolean
retryInMs?: number
errorResponse?: Response
}
/**
 * `RetryClass` implementation that sends a batch of events exactly once and,
 * instead of retrying internally, reports every failed event through an
 * `AmplitudeFlushError` so the consuming application can decide how and when
 * to retry.
 */
export class AmplitudeFlushErrorHandler implements RetryClass {
  protected readonly _apiKey: string
  protected readonly _options: Options
  private readonly _transport: Transport

  /**
   * @param apiKey API key that is placed in every payload
   * @param options overrides that are merged on top of `DEFAULT_OPTIONS`;
   * when no `transportClass` is given, the default transport is set up
   */
  public constructor(apiKey: string, options: Partial<Options>) {
    this._apiKey = apiKey
    this._options = { ...DEFAULT_OPTIONS, ...options }
    this._transport = this._options.transportClass ?? setupDefaultTransport(this._options)
  }

  /**
   * @inheritDoc
   *
   * Despite the name (dictated by `RetryClass`), the payload is sent only
   * once; no retry is performed here.
   *
   * @param events the events to send in a single payload
   * @returns the response, only when its status is `Status.Success`
   * @throws AmplitudeFlushError when the transport throws or the response
   * status is not `Status.Success`
   * @throws Error when the response cannot be classified (e.g. a rate limit
   * response without a body, or an unexpected status)
   */
  public async sendEventsWithRetry(events: readonly Event[]): Promise<Response> {
    let response: Response = { status: Status.Unknown, statusCode: 0 }
    try {
      response = await this._transport.sendPayload(this._getPayload(events))
    } catch {
      // Sending failed before a response was obtained: keep the `Unknown`
      // placeholder so that every event is reported as immediately retryable.
    }
    if (response.status !== Status.Success) {
      throw this._getFlushError(events, response)
    }
    return response
  }

  /**
   * Given an error response, returns an `AmplitudeFlushError`
   * containing the events that failed to flush with the suggested
   * time in which to retry.
   *
   * @param events the events that were part of the failed payload
   * @param errorResponse the non-successful response (or the `Unknown`
   * placeholder when no response was received)
   * @throws Error when called with a successful response, a rate limit
   * response without a body, or a status that is not handled here
   */
  private _getFlushError(events: readonly Event[], errorResponse: Response): AmplitudeFlushError {
    if (errorResponse.status === Status.RateLimit) {
      if (!errorResponse.body) {
        throw new Error('received rate limit response without a body')
      }
      const {
        exceededDailyQuotaUsers = {},
        exceededDailyQuotaDevices = {},
        throttledEvents = [],
      } = errorResponse.body

      // Own-property lookup: the `in` operator also matches inherited keys,
      // so an id such as "constructor" or "toString" would wrongly count as
      // having exceeded the daily quota.
      const isListed = (quota: object, id: string | undefined): boolean =>
        typeof id === 'string' &&
        id !== '' &&
        Object.prototype.hasOwnProperty.call(quota, id)

      // NOTE(review): only events listed in `throttledEvents` are reported;
      // any other event of a rate-limited batch is not part of the flush
      // error. Confirm against the API whether those were actually ingested.
      const throttledDescriptions: IAmplitudeFlushErrorEventDescription[] = []
      for (const eventIndex of throttledEvents) {
        const event = events[eventIndex]
        if (event === undefined) {
          // Index does not refer to an event of this payload; nothing to retry.
          continue
        }
        const hasReachedDailyQuota =
          isListed(exceededDailyQuotaUsers, event.user_id) ||
          isListed(exceededDailyQuotaDevices, event.device_id)
        throttledDescriptions.push({
          event,
          isRetryable: true,
          retryInMs: hasReachedDailyQuota ? DAY_IN_MS : THROTTLE_RETRY_MS,
        })
      }
      return new AmplitudeFlushError(throttledDescriptions)
    }

    if (errorResponse.status === Status.Unknown || errorResponse.status === Status.Failed) {
      // Network error or Amplitude error, all events failed, retryable immediately
      return new AmplitudeFlushError(
        events.map(
          (event): IAmplitudeFlushErrorEventDescription => ({
            event,
            isRetryable: true,
            retryInMs: 0,
          })
        )
      )
    }

    if (
      errorResponse.status === Status.PayloadTooLarge ||
      errorResponse.status === Status.Invalid
    ) {
      // Application error because we've sent an invalid payload
      return new AmplitudeFlushError(
        events.map(
          (event): IAmplitudeFlushErrorEventDescription => ({
            event,
            errorResponse,
            isRetryable: false,
          })
        )
      )
    }

    if (errorResponse.status === Status.Success) {
      throw new Error('tried to create a flush error for a successful response')
    }

    throw new Error(
      'tried to create a flush error with an invalid errorResponse: ' +
        JSON.stringify(errorResponse)
    )
  }

  /**
   * Builds the optional `options` part of the payload.
   *
   * @returns an object holding `options.min_id_length` when `minIdLength`
   * is configured as a number, otherwise an empty object
   */
  protected _getPayloadOptions(): { options?: PayloadOptions } {
    if (typeof this._options.minIdLength === 'number') {
      return {
        options: {
          min_id_length: this._options.minIdLength,
        },
      }
    }
    return {}
  }

  /**
   * Builds the payload handed to the transport: the API key, the events and
   * any payload options.
   *
   * @param events the events to include in the payload
   */
  protected _getPayload(events: readonly Event[]): Payload {
    return {
      api_key: this._apiKey,
      events,
      ...this._getPayloadOptions(),
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment