/**
 * Process uploaded images
 * @author cesarandreu, December 7, 2015
 */
import Rx from 'rx'
import mime from 'mime'
import pump from 'pump'
import sharp from 'sharp'
import debug from 'debug'
const log = debug('worker:processImages')
const pump$ = Rx.Observable.fromNodeCallback(pump)
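// pump pipes the given streams together and calls back once the last one has
// finished (or any stream in the chain errors); fromNodeCallback lifts that
// node-style callback into a function returning an Observable, so
// pump$([source, dest]) yields an Observable that completes when the
// pipeline has flushed and errors if any stream fails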
export function initialize ({ processImageQueue, s3fs }) {
  log('initializing')
  const updateStream = createUpdateStream({ processImageQueue })
  updateStream.subscribe()
  const jobStream = createJobProcessorStream({ processImageQueue, s3fs })
  jobStream.subscribe()
  log('ready')
}
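// Usage sketch. Assumptions (neither object is constructed in this gist):
// `processImageQueue` looks like a bull queue (`.process(concurrency, handler)`,
// `.close()`, and the active/completed/failed/progress/error events), and
// `s3fs` is an instance of the s3fs package (createReadStream/createWriteStream):
//
//   import Queue from 'bull'
//   import S3FS from 's3fs'
//   initialize({
//     processImageQueue: new Queue('processImage'),
//     s3fs: new S3FS('uploads-bucket', { region: 'us-east-1' })
//   })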
// Image processor update stream
export function createUpdateStream ({ processImageQueue }) {
  log('creating job update stream')
  const EVENT_ARGS = {
    active: ['job', 'jobPromise'],
    completed: ['job', 'jobPromise'],
    error: ['error'],
    failed: ['job', 'err'],
    progress: ['job', 'progress']
  }
  const EVENTS = Object.keys(EVENT_ARGS)
  const streams = EVENTS.map(event =>
    Rx.Observable.fromEvent(processImageQueue, event, (...args) =>
      EVENT_ARGS[event].reduce((response, key, idx) => {
        response[key] = args[idx]
        return response
      }, {
        status: event
      })
    )
  )
  return Rx.Observable.merge(streams)
}
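// Each queue event is normalized into one object shape keyed by `status`, so
// a single subscriber can handle every update. For example, a 'progress'
// event is emitted as { status: 'progress', job, progress } and a 'failed'
// event as { status: 'failed', job, err }:
//
//   createUpdateStream({ processImageQueue })
//     .subscribe(update => log(`update status=${update.status}`))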
// Creates a job stream and processes each image
export function createJobProcessorStream ({ processImageQueue, s3fs }) {
  log('creating job processor stream')
  return createJobStream({ processImageQueue })
    .flatMap(({ job, done }) => {
      const { data, jobId } = job
      log(`processing jobId=${jobId}`)
      return Rx.Observable
        .from(data.fileList)
        .flatMap(file => imageMapper({ data, file, s3fs }))
        .doOnNext(value => {
          log(`progress jobId=${jobId}`, value)
          // @TODO: Handle progress updates?
          job.progress(value)
        })
        .doOnError(error => {
          log(`error jobId=${jobId}`, error)
          done(error)
        })
        .doOnCompleted(() => {
          log(`completed jobId=${jobId}`)
          done(null)
        })
        // Swallow the error after reporting it via done(error), so a single
        // failed job doesn't propagate through flatMap and tear down the
        // whole job stream
        .catch(() => Rx.Observable.empty())
    })
}
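// Note: the files of a single job are processed concurrently (flatMap merges
// the per-file streams without ordering), and `done` fires once per job:
// with the error as soon as any file fails, or with null once every file
// has completed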
// Creates a job stream from a queue
export function createJobStream ({ processImageQueue }) {
  log('creating job stream')
  return Rx.Observable.create(observer => {
    processImageQueue.process(10, (job, done) => {
      observer.onNext({ job, done })
    })
    return async () => {
      await processImageQueue.close()
      observer.onCompleted()
    }
  })
}
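// The stream is cold: subscribing registers the queue processor (up to 10
// concurrent jobs with this bull-style signature), and disposing the
// subscription closes the queue and completes the stream, e.g. for a
// graceful shutdown:
//
//   const subscription = createJobStream({ processImageQueue }).subscribe()
//   // later:
//   subscription.dispose()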
// Image mapper
// Reads the image metadata
// Creates a thumbnail version of the image
// Copies the full image to the output with stripped metadata
export function imageMapper ({ s3fs, data, file }) {
  const { bikeshedId, userId } = data
  const extension = mime.extension(file.mimetype)
  function getOutputName (size) {
    return `${userId}/${bikeshedId}/${size}/${file.fieldname}.${extension}`
  }
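  // For example, with hypothetical values userId=1, bikeshedId=2, a JPEG
  // upload, and fieldname 'leftImage', getOutputName('thumbnail') yields:
  //   '1/2/thumbnail/leftImage.jpeg'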
  // Shared image pipeline
  // Receives the image stream and passes it to each clone
  const imagePipeline = sharp()

  // Thumbnail image
  // Tries resizing it to 320x320
  // Maintains the aspect ratio and doesn't enlarge the image
  const imageThumbnail$ = pump$([
    imagePipeline.clone()
      .resize(320, 320)
      .max()
      .withoutEnlargement()
      .toFormat(extension),
    s3fs.createWriteStream(getOutputName('thumbnail'))
  ])

  // Full image
  // Strips image metadata
  const imageFull$ = pump$([
    imagePipeline.clone()
      .toFormat(extension),
    s3fs.createWriteStream(getOutputName('full'))
  ])

  // Image metadata
  // Reads the image metadata
  const imageMetadata$ = Rx.Observable.fromPromise(
    imagePipeline.clone()
      .metadata()
  )

  // Read stream of the uploaded image on s3
  // This stream gets passed to the shared image pipeline
  const imagePipeline$ = pump$([
    s3fs.createReadStream(file.key),
    imagePipeline
  ])

  // Wait for every branch to finish; the selector keeps only the metadata,
  // since the pump results merely signal that the writes completed
  return Rx.Observable.forkJoin([
    imageMetadata$,
    imageThumbnail$,
    imageFull$,
    imagePipeline$
  ], metadata => {
    return { metadata, file }
  })
}
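// imageMapper can also be exercised on its own. A hypothetical sketch, with
// field names matching what the code reads off `data` and `file` (the values
// are made up):
//
//   imageMapper({
//     s3fs,
//     data: { userId: '1', bikeshedId: '2' },
//     file: { fieldname: 'leftImage', mimetype: 'image/jpeg', key: 'tmp/abc123' }
//   }).subscribe(
//     ({ metadata, file }) => log(`${file.fieldname}: ${metadata.width}x${metadata.height}`),
//     error => log('failed', error)
//   )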