Created
December 7, 2015 03:07
-
-
Save cesarandreu/68f7b74d1f4767072b19 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Process uploaded images | |
*/ | |
import Rx from 'rx' | |
import mime from 'mime' | |
import pump from 'pump' | |
import sharp from 'sharp' | |
import debug from 'debug' | |
const log = debug('worker:processImages') | |
const pump$ = Rx.Observable.fromNodeCallback(pump) | |
export function initialize ({ processImageQueue, s3fs }) { | |
log('initializing') | |
const updateStream = createUpdateStream({ processImageQueue }) | |
updateStream.subscribe() | |
const jobStream = createJobProcessorStream({ processImageQueue, s3fs }) | |
jobStream.subscribe() | |
log('ready') | |
} | |
// Image processor update stream | |
export function createUpdateStream ({ processImageQueue }) { | |
log('creating job update stream') | |
const EVENT_ARGS = { | |
active: ['job', 'jobPromise'], | |
completed: ['job', 'jobPromise'], | |
error: ['error'], | |
failed: ['job', 'err'], | |
progress: ['job', 'progress'] | |
} | |
const EVENTS = Object.keys(EVENT_ARGS) | |
const streams = EVENTS.map(event => | |
Rx.Observable.fromEvent(processImageQueue, event, (...args) => | |
EVENT_ARGS[event].reduce((response, key, idx) => { | |
response[key] = args[idx] | |
return response | |
}, { | |
status: event | |
}) | |
) | |
) | |
return Rx.Observable.merge(streams) | |
} | |
// Creates a job stream and processes each image | |
export function createJobProcessorStream ({ processImageQueue, s3fs }) { | |
log('creating job processor stream') | |
return createJobStream({ processImageQueue }) | |
.flatMap(({ job, done }) => { | |
const { data, jobId } = job | |
log(`processing jobId=${jobId}`) | |
return Rx.Observable | |
.from(data.fileList) | |
.flatMap(file => imageMapper({ data, file, s3fs })) | |
.doOnNext(value => { | |
log(`progress jobId=${jobId}`, value) | |
// @TODO: Handle progress updates? | |
job.progress(value) | |
}) | |
.doOnError(error => { | |
log(`error jobId=${jobId}`, error) | |
done(error) | |
}) | |
.doOnCompleted(() => { | |
log(`completed jobId=${jobId}`) | |
done(null) | |
}) | |
}) | |
} | |
// Creates a job stream from a queue | |
export function createJobStream ({ processImageQueue }) { | |
log('creating job stream') | |
return Rx.Observable.create(observer => { | |
processImageQueue.process(10, (job, done) => { | |
observer.onNext({ job, done }) | |
}) | |
return async () => { | |
await processImageQueue.close() | |
observer.onCompleted() | |
} | |
}) | |
} | |
// Image mapper | |
// Reads the image metadata | |
// Creates a thumbnail version of the image | |
// Copies full image to output with stripped metadata | |
export function imageMapper ({ s3fs, data, file }) { | |
const { bikeshedId, userId } = data | |
const extension = mime.extension(file.mimetype) | |
function getOutputName (size) { | |
return `${userId}/${bikeshedId}/${size}/${file.fieldname}.${extension}` | |
} | |
// Shared image pipeline | |
// Receives the image stream and passes it to each clone | |
const imagePipeline = sharp() | |
// Thumbnail image | |
// Tries resizing it to 320x320 | |
// Maintains the aspect ratio and doesn't enlarge the image | |
const imageThumbnail$ = pump$([ | |
imagePipeline.clone() | |
.resize(320, 320) | |
.max() | |
.withoutEnlargement() | |
.toFormat(extension), | |
s3fs.createWriteStream(getOutputName('thumbnail')) | |
]) | |
// Full image | |
// Strips image metadata | |
const imageFull$ = pump$([ | |
imagePipeline.clone() | |
.toFormat(extension), | |
s3fs.createWriteStream(getOutputName('full')) | |
]) | |
// Image metadata | |
// Read the image metadata | |
const imageMetadata$ = Rx.Observable.fromPromise( | |
imagePipeline.clone() | |
.metadata() | |
) | |
// Read stream of the uploaded image on s3 | |
// This stream gets passed to the shared image pipeline | |
const imagePipeline$ = pump$([ | |
s3fs.createReadStream(file.key), | |
imagePipeline | |
]) | |
return Rx.Observable.forkJoin([ | |
imageMetadata$, | |
imageThumbnail$, | |
imageFull$, | |
imagePipeline$ | |
], metadata => { | |
return { metadata, file } | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment