@funkjunky
Created July 28, 2018 20:03
This is some fun code I wrote to handle uploading any number of files of any size, using compression and streaming. (Note: in Node you don't need to convert a stream to an iterator by hand, since Readable streams are already async-iterable.)
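To expand on that note, here is a minimal standalone sketch (not part of the gist) showing that in modern Node a Readable stream can be consumed directly with for await, without the manual iterator plumbing used below for the JSZip internal stream:

import fs from 'fs';

async function readInChunks(filename) {
  // Readable streams implement Symbol.asyncIterator in Node 10+,
  // so each iteration yields a Buffer of up to highWaterMark bytes.
  const stream = fs.createReadStream(filename, { highWaterMark: 1000000 });
  for await (const chunk of stream) {
    console.log('read', chunk.length, 'bytes');
  }
}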
import JSZip from 'jszip';
// getElectron, getToken, and xhrResult are project-specific helpers assumed to be defined elsewhere.

export const webFileToResumableDescriptor = (file, uploadId) => async function(chunkSize) {
  uploadId = uploadId || file.name;
  return {
    bytesCount: file.size,
    uploadId,
    filename: file.name,
    generator: async function*() {
      let chunkIndex = 0;
      // While the starting byte is less than the file size, keep yielding slices.
      while (chunkIndex * chunkSize < file.size) {
        yield file.slice(chunkIndex * chunkSize, Math.min((chunkIndex + 1) * chunkSize, file.size));
        ++chunkIndex;
      }
    },
  };
};
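A quick usage sketch (hypothetical, not from the gist): the descriptor factory is called with a chunk size, and its generator yields Blob slices that can be consumed with for await.

// Assuming `file` is a File taken from an <input type="file"> element:
const descriptor = await webFileToResumableDescriptor(file)(1000000);
for await (const blob of descriptor.generator()) {
  console.log('chunk of', blob.size, 'bytes for upload', descriptor.uploadId);
}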
export const nodeFileToResumableDescriptor = (filename, uploadId) => async function(chunkSize) {
  const { nodeModule, path, fs } = await getElectron;
  uploadId = uploadId || path.basename(filename);
  const mzfs = nodeModule('mz/fs');
  const bytesCount = (await mzfs.stat(filename)).size;
  return {
    bytesCount,
    uploadId,
    filename: path.basename(filename),
    generator: async function*() {
      let chunkIndex = 0;
      const stream = fs.createReadStream(filename, { highWaterMark: chunkSize });
      await new Promise(resolve => stream.on('readable', resolve));
      // While the starting byte is less than the file size, keep reading fixed-size chunks.
      while (chunkIndex * chunkSize < bytesCount) {
        const chunk = stream.read(chunkSize);
        yield chunk;
        ++chunkIndex;
      }
      stream.close();
    },
  };
};
export const filesToResumableDescriptor = (files, uploadId) => async function(chunkSize) {
  const { nodeModule } = await getElectron;
  const fs = nodeModule('mz/fs');
  let bytesCount = 0;
  for (let i = 0; i < files.length; ++i) {
    bytesCount += (await fs.stat(files[i])).size;
  }
  const filename = Date.now() + '-scanAssets.zip';
  uploadId = uploadId || filename;
  return {
    bytesCount,
    uploadId,
    filename,
    generator: async function*() {
      const zip = new JSZip();
      const streams = [];
      for (let i = 0; i !== files.length; ++i) {
        streams[i] = fs.createReadStream(files[i]);
        zip.file(files[i], streams[i]);
      }
      const stream = zip.generateInternalStream({ type: 'array', streamFiles: true })
        .on('error', e => console.log('JSZip stream error: ', e));
      yield* streamToAsyncGenerator(chunkSize, stream, streams);
    },
  };
};
function streamToAsyncIterator(chunkSize, stream) {
  let done = false;
  let endPromise = new Promise(resolve => {
    // Flush out the last of the data.
    stream.on('end', () => {
      resolve({ value: collector, done: true });
    });
  });
  let collector = []; // type of data? String
  let currentFile;
  // Two-track queue for expecting and sending data with promises.
  let dataPromises = [];
  let dataResolves = [];
  let dataRejects = [];
  stream.on('data', (data, meta) => {
    if (currentFile !== meta.currentFile) {
      // Handy for debugging purposes... could be removed eventually.
      currentFile = meta.currentFile;
    }
    collector = collector.concat(data);
    if (collector.length >= chunkSize) {
      const value = collector.slice(0, chunkSize);
      collector = collector.slice(chunkSize);
      const dataResolve = dataResolves.shift();
      if (dataResolve) {
        dataResolve({ value, done: false });
      } else {
        dataPromises.push(Promise.resolve({ value, done: false }));
      }
      stream.pause();
    }
  });
  stream.on('error', (...errArgs) => {
    console.error('Rejecting resolves. Error zipping: ', errArgs);
    dataRejects.forEach(reject => reject(errArgs));
  });
  return {
    [Symbol.asyncIterator]() {
      return this;
    },
    // TODO: handle return() to close the stream.
    next() {
      if (done) return Promise.resolve({ done });
      // Only resume the stream once we're down to the last buffered promise.
      if (dataPromises.length <= 1) {
        stream.resume();
      }
      let dataPromise = dataPromises.shift();
      // If there is no data available...
      if (!dataPromise) {
        // ...queue up a resolve for the next time data arrives.
        dataPromise = new Promise((resolve, reject) => {
          dataResolves.push(resolve);
          dataRejects.push(reject);
        });
      }
      return Promise.race([
        dataPromise,
        endPromise,
      ]).then(next => {
        if (next.done) {
          done = true;
          next.done = false;
        }
        return next;
      });
    },
  };
}
async function* streamToAsyncGenerator(chunkSize, stream, streams) {
  const iterator = streamToAsyncIterator(chunkSize, stream, streams);
  let next = await iterator.next();
  while (!next.done) {
    yield next.value;
    // Deleting the reference is needed to release the chunk's memory.
    delete next.value;
    next = await iterator.next();
  }
}
export async function chunkedUpload(url, formData, resumableDescriptor, onProgress) {
  const chunkSize = 1000000; // approx. 1 MB
  let { bytesCount, filename, generator, uploadId } = await resumableDescriptor(chunkSize);
  // Default uploadId to the url if one wasn't provided.
  uploadId = uploadId || url;
  const chunksLength = Math.ceil(bytesCount / chunkSize);
  formData.set('chunkSize', chunkSize);
  formData.set('chunksLength', chunksLength);
  formData.set('uploadId', uploadId);
  formData.set('bytesCount', bytesCount);
  formData.set('filename', filename);
  const onChunkProgress = chunkIndex => ({ loaded }) =>
    onProgress({ loaded: (chunkIndex - 1) * chunkSize + loaded, total: bytesCount });
  // Ask the server which chunks are already uploaded... maybe we can shortcut!
  const uploadStatus = await uploadPost('/api/files/', formData);
  let result;
  let chunkIndex = 0;
  for await (const chunkData of generator()) {
    formData.set('chunkIndex', ++chunkIndex);
    // Skip chunks the server already has (except the last one).
    if (chunkIndex < chunksLength && chunkIndex < uploadStatus.res.completedChunks) {
      continue;
    }
    if (chunkData instanceof Blob) {
      formData.set('chunkData', chunkData);
    } else {
      formData.set('chunkData', new File([new Uint8Array(chunkData)], filename));
    }
    result = await uploadPost(url, formData, onChunkProgress(chunkIndex));
    // Retry: if the request neither succeeded nor failed critically, try again.
    while (result.status && ![200, 201, 202, 404, 415, 500, 501].includes(result.status)) {
      result = await uploadPost(url, formData, onChunkProgress(chunkIndex));
    }
    // If the status falls outside the 200-400 range, treat it as a critical failure.
    if (result.status && (result.status < 200 || result.status > 400)) {
      return { err: { status: result.status, message: 'upload failed with a non-recoverable status code' } };
    }
  }
  return result.res;
}
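A hedged example of wiring the pieces together in the browser; the endpoint '/api/files/upload' and the progress handler are placeholders, not from the gist:

async function uploadScan(file) {
  const formData = new FormData();
  const onProgress = ({ loaded, total }) => console.log(`uploaded ${loaded}/${total} bytes`);
  // chunkedUpload drives the descriptor's generator, posting one chunk per request.
  return chunkedUpload('/api/files/upload', formData, webFileToResumableDescriptor(file), onProgress);
}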
export const uploadPost = (url, body, onProgress) => xhrResult(new Promise(resolve => {
  let xhr = new XMLHttpRequest();
  xhr.open('post', url);
  xhr.withCredentials = true;
  xhr.setRequestHeader('Authorization', 'Token ' + getToken());
  xhr.setRequestHeader('Accept', 'application/json');
  function onComplete(e) {
    return resolve({
      ok: true,
      status: xhr.status,
      json: async function() {
        return JSON.parse(e.target.responseText);
      },
    });
  }
  xhr.onload = xhr.onerror = onComplete;
  if (xhr.upload && onProgress) {
    xhr.upload.onprogress = onProgress;
  }
  xhr.send(body);
}));