Skip to content

Instantly share code, notes, and snippets.

@lancejpollard
Created February 5, 2024 22:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lancejpollard/9158415049896d0b96192d4c34761436 to your computer and use it in GitHub Desktop.
Save lancejpollard/9158415049896d0b96192d4c34761436 to your computer and use it in GitHub Desktop.
AWS Lambda file uploader
import _ from 'lodash'
import { ReadStream } from 'fs'
import busboy, { FileInfo } from 'busboy'
import DEBUG from 'debug'
import kink from './kink.js'
import { Event } from './router.js'
const debug = DEBUG('load-file')
/**
 * Argument object handed to a `HandleFile` hook for a single uploaded file.
 */
export type HandleFileInput = {
// The request event being processed (the same object given to `loadFiles`).
event: Event
// Stream of the file's contents, as delivered by busboy's 'file' event.
stream: ReadStream
// Metadata record for this file; `loadFiles` fills in `buffer` or `path`
// from the hook's resolved value.
file: LoadFile
}
/**
 * Description of one uploaded file as stored on `event.json` by `loadFiles`.
 */
export type LoadFile = {
// File name reported by busboy (`info.filename`).
name: string
// Transfer encoding reported by busboy (`info.encoding`).
encoding: string
// MIME type reported by busboy (`info.mimeType`).
mimeType: string
// Set when the upload hook resolves with a Buffer.
buffer?: Buffer
// Set when the upload hook resolves with a string.
path?: string
}
/**
 * Declares one multipart field that is allowed to carry file uploads.
 */
export type LoadFileLink = {
// Multipart field name; file parts whose field name matches no `path` are
// rejected by `loadFiles`.
path: string
// NOTE(review): not read anywhere in the visible code — presumably a minimum
// file count for the field; confirm whether it is enforced elsewhere.
min?: number
// Maximum number of files accepted for this field. A falsy value (undefined
// or 0) means no limit is applied.
max?: number
}
/**
 * Upload hook called by `loadFiles` once per accepted file.
 *
 * Resolve with a Buffer to have it stored as `file.buffer`, or with a string
 * to have it stored as `file.path`. A rejection aborts the whole upload.
 */
export type HandleFile = (
props: HandleFileInput,
) => Promise<string | Buffer>
/**
 * Parses a `multipart/form-data` request body with busboy and merges the
 * result into `event.json`.
 *
 * Text fields are written to `event.json` at the path given by their field
 * name. Each file part whose field name is declared in `link` is handed to
 * `hook`; the hook's result (Buffer -> `file.buffer`, string -> `file.path`)
 * is stored on a `LoadFile` record written to `event.json` under the field
 * name. When several files arrive for the same field, the last one written
 * wins.
 *
 * While a file is being handled, the hook may observe these custom events on
 * the stream it was given:
 *   - `'abort'` (with the error) — the upload failed; emitted at most once
 *     per stream.
 *   - `'clear'` — emitted after the hook resolved; the hook may also emit it
 *     itself to signal it is done with the stream.
 *
 * NOTE(review): `LoadFileLink.min` is not enforced here.
 *
 * @param event - Request event; reads `headers` and `body`, writes `json`.
 * @param options.link - Fields that may carry files, with optional `max`.
 * @param options.hook - Called once per accepted file.
 * @param options.halt - Passed to busboy as its `limits` option.
 * @returns A promise that resolves once the body is fully parsed and every
 *   hook has settled.
 * @throws Rejects with the first error encountered: a `kink` error for an
 *   invalid/truncated field, an undeclared file field, too many files or a
 *   file size limit; a busboy parser error; or whatever `hook` rejected with.
 */
export default function loadFiles(
  event: Event,
  {
    link,
    hook,
    halt,
  }: {
    hook: HandleFile
    link: Array<LoadFileLink>
    halt?: {}
  },
): Promise<void> {
  return new Promise<void>((res, rej) => {
    // Field names come from the client. Maps are used instead of plain
    // objects so names like "constructor" or "toString" cannot match
    // inherited Object.prototype members and slip past validation.
    const fieldMap = new Map<string, LoadFileLink>()
    const sizes = new Map<string, number>()
    for (const x of link) {
      fieldMap.set(x.path, x)
      sizes.set(x.path, 0)
    }

    event.json ??= {}

    const bb: busboy.Busboy = busboy({
      headers: event.headers,
      limits: halt,
    })

    // `failed` is tracked separately from `uploadError` so that a hook
    // rejecting with a falsy value is still treated as a failure.
    let failed = false
    let uploadError: unknown
    // Number of file hooks that have started but not yet settled.
    let processing = 0
    let isDone = false
    let readFinished = false

    // Streams are tracked individually (not per field name) so every
    // in-flight stream is notified on abort even when one field carries
    // several files.
    const activeStreams = new Set<ReadStream>()
    const abortedStreams = new Set<ReadStream>()

    // Emits 'abort' on a stream at most once.
    const notifyAbort = (stream: ReadStream, err: unknown) => {
      if (abortedStreams.has(stream)) {
        return
      }
      abortedStreams.add(stream)
      stream.emit('abort', err)
    }

    // handle text field data
    bb.on(
      'field',
      (
        fieldName: string,
        value: string,
        fieldNameTruncated: boolean,
        valueTruncated: boolean,
      ) => {
        if (fieldName == null) {
          return abort(kink('missing_field_name'))
        }
        if (fieldNameTruncated) {
          return abort(kink('field_name_truncated'))
        }
        if (valueTruncated) {
          return abort(
            kink('field_value_truncated', { name: fieldName }),
          )
        }
        _.set(event.json, fieldName, value)
      },
    )

    // handle files
    bb.on(
      'file',
      async (fieldName: string, stream: ReadStream, info: FileInfo) => {
        // Once the upload has failed there is no point running more hooks;
        // drain the part so the parser is not held up by unread data.
        if (failed) {
          stream.resume()
          return
        }

        const field = fieldMap.get(fieldName)
        if (!field) {
          stream.resume()
          return abort(
            kink('invalid_file_field_name', {
              name: fieldName,
            }),
          )
        }

        const size = (sizes.get(fieldName) ?? 0) + 1
        sizes.set(fieldName, size)
        if (field.max && size > field.max) {
          stream.resume()
          return abort(kink('too_many_files'))
        }

        processing++
        activeStreams.add(stream)

        const file: LoadFile = {
          name: info.filename,
          encoding: info.encoding,
          mimeType: info.mimeType,
        }

        // Guarantees `processing` is decremented exactly once per file, no
        // matter whether the hook resolved, rejected, or emitted 'clear'
        // itself before returning.
        let settled = false
        const settle = () => {
          if (settled) {
            return
          }
          settled = true
          activeStreams.delete(stream)
          processing--
          process.nextTick(finish)
        }

        const handleClear = () => {
          debug('handle upload stream clear')
          stream.off('clear', handleClear)
          settle()
        }

        // The 'error' listener is intentionally never removed, so a late
        // stream error cannot surface as an unhandled 'error' event.
        const handleError = (err: unknown) => {
          debug('handle upload stream error')
          notifyAbort(stream, err)
          abort(err)
        }

        const handleLimit = () => {
          debug('handle upload stream limit')
          stream.off('limit', handleLimit)
          const err = kink('file_size_limit_reached')
          notifyAbort(stream, err)
          abort(err)
        }

        stream.on('clear', handleClear)
        stream.on('error', handleError)
        stream.on('limit', handleLimit)

        try {
          debug('before upload handling')
          const data = await hook({ event, file, stream })
          debug('after upload handling')

          if (data instanceof Buffer) {
            file.buffer = data
          } else if (typeof data === 'string') {
            file.path = data
          }

          _.set(event.json, fieldName, file)
          stream.emit('clear')
        } catch (e) {
          debug('error uploading')
          // The hook may have failed before reading the whole part; drain
          // the remainder so it does not sit buffered.
          stream.resume()
          abort(e)
        } finally {
          // Without this a rejected hook would leave `processing` above
          // zero forever and the returned promise would never settle.
          settle()
        }
      },
    )

    bb.on('error', (err: unknown) => {
      abort(err)
    })

    // Exceeding these busboy limits is currently not treated as an error.
    bb.on('partsLimit', () => {
      // abort(kink('LIMIT_PART_COUNT'))
    })

    bb.on('filesLimit', () => {
      // abort(kink('LIMIT_FILE_COUNT'))
    })

    bb.on('fieldsLimit', () => {
      // abort(kink('LIMIT_FIELD_COUNT'))
    })

    bb.on('finish', () => {
      readFinished = true
      finish()
    })

    bb.write(event.body, 'binary')
    bb.end()

    // Records the first failure, notifies every in-flight stream, and
    // schedules completion. Later calls are ignored.
    function abort(err: unknown) {
      if (failed) {
        return
      }

      debug('stream abort')
      failed = true
      uploadError = err

      for (const stream of [...activeStreams]) {
        notifyAbort(stream, err)
      }

      // A parser error is not followed by 'finish', so completion must be
      // triggered from here. Deferred so it never runs in the middle of a
      // synchronous busboy callback.
      process.nextTick(finish)
    }

    // Settles the returned promise exactly once.
    function finalize() {
      if (isDone) {
        return
      }

      isDone = true
      // debug('upload finalize')
      bb.removeAllListeners()
      // Keep a no-op 'error' listener: a parser error arriving after
      // settlement must not become an uncaught exception.
      bb.on('error', () => {})

      if (failed) {
        return rej(uploadError)
      }

      res()
    }

    // Finalizes once no hook is in flight and either the upload failed or
    // the parser has consumed the whole body.
    function finish() {
      debug('upload finish')

      if ((failed || readFinished) && processing === 0) {
        finalize()
      }
    }
  })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment