Skip to content

Instantly share code, notes, and snippets.

@jamesikanos
Created April 22, 2021 12:32
Show Gist options
  • Save jamesikanos/9f4ba633fc4d15befa02134084324ad4 to your computer and use it in GitHub Desktop.
Save jamesikanos/9f4ba633fc4d15befa02134084324ad4 to your computer and use it in GitHub Desktop.
An Angular-compatible uploader for Azure Block Blobs. Requires a compatible SAS upload URL
/**
MIT License
Copyright (c) 2021, James Woodall
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
**/
import { HttpClient, HttpHeaders, HttpErrorResponse, HttpEvent, HttpEventType, HttpRequest } from '@angular/common/http';
import { environment } from 'src/environments/environment';
import { BehaviorSubject, Observable, combineLatest, Subscription } from 'rxjs';
import { shareReplay, map, filter, retry, tap, last, delay, throttleTime } from 'rxjs/operators';
/**
 * Text placed in front of the zero-padded block index to form a block's
 * plain-text ID; FileUploader base64-encodes that ID before using it.
 */
export const blockIdPrefix = "block-";
/**
 * Uploads a single `File` to an Azure Block Blob, one block at a time.
 *
 * The file is sliced into blocks, each slice is read with a `FileReader` and
 * sent with `PUT ...&comp=block&blockid=...`. When no bytes remain, the list
 * of block IDs is committed with `PUT ...&comp=blocklist`.
 *
 * Typical use: `initialise(url, file)`, then `startUpload()`; `cancel()` stops
 * the upload. Progress and state are exposed through the observable getters
 * and the public `BehaviorSubject` flags.
 *
 * The URL handed to `initialise` must already contain a query string, because
 * additional parameters are appended with `&`.
 */
export class FileUploader {
  /** Size in bytes of the next block to read; shrinks for the final block. */
  private _maxBlockSize: number;
  /** Base URL that the `&comp=...` parameters are appended to. */
  private _submitUrl: string;
  /** Total bytes of all blocks whose PUT request has finished. */
  private _completedUploadedBytes: number;
  private _accessTier: string = "Cool";
  /** Subscription of the in-flight block PUT, kept so `cancel()` can abort it. */
  private _currentBlockSubscription: Subscription;
  /** Offset of the first byte of the file that has not been queued for upload yet. */
  private readonly _currentFilePointer = new BehaviorSubject<number>(0);
  /** Base64-encoded block IDs, in the order the blocks were queued. */
  private readonly _blockIds: Array<string>;
  private readonly _reader: FileReader;
  private readonly _file = new BehaviorSubject<File>(null);
  private readonly _percent: Observable<number>;
  private readonly _friendlyFilename: Observable<string>;
  private readonly _isActive: Observable<boolean>;
  /** Latest value emitted by `_isActive`, for synchronous checks. */
  private _isActiveVal: boolean;
  /** Bytes uploaded so far, including partial progress of the current block. */
  private readonly _blockBytesUploaded = new BehaviorSubject<number>(0);
  private readonly _bytesProgress: Observable<number>;

  /** Emits `true` once the block list has been committed successfully. */
  public readonly completed = new BehaviorSubject<boolean>(false);
  /** Emits `true` once `cancel()` has been called. */
  public readonly cancelled = new BehaviorSubject<boolean>(false);
  /** Emits `true` when the upload has failed and will not be retried further. */
  public readonly failed = new BehaviorSubject<boolean>(false);
  /** Emits `true` when all blocks are uploaded and the commit phase begins. */
  public readonly committing = new BehaviorSubject<boolean>(false);
  /** Emits `true` once `startUpload()` has kicked off the upload. */
  public readonly started = new BehaviorSubject<boolean>(false);

  /**
   * @param http Angular HTTP client used for the block and commit requests.
   */
  constructor(private http: HttpClient) {
    this._friendlyFilename = this._file.pipe(filter(i => !!i), map(j => j.name));
    this._blockIds = new Array<string>();
    this._completedUploadedBytes = 0;

    // Byte progress, throttled with a 500 ms window
    this._bytesProgress = this._blockBytesUploaded.pipe(throttleTime(500));

    this._percent = combineLatest([this._bytesProgress, this.committing, this.completed, this.failed, this._file])
      .pipe(
        delay(0),
        filter(([, , , , file]) => !!file),
        map(([uploaded, committing, completed, failed, file]) => {
          if (completed || failed || committing) {
            return 1;
          }
          // A zero-byte file has nothing to upload; avoid emitting 0 / 0 = NaN
          return file.size > 0 ? uploaded / file.size : 0;
        })
      );

    this._isActive = combineLatest([this.completed, this.cancelled, this.failed])
      .pipe(
        map(([completed, cancelled, failed]) => !(completed || cancelled || failed)),
        shareReplay());

    // Subscribe to active changes to ensure that the observable is running
    this._isActive.subscribe(r => this._isActiveVal = r);

    this._reader = new FileReader();
    this._reader.onloadend = () => {
      if (this._reader.readyState !== FileReader.DONE) {
        return;
      }

      // loadend also fires when the read failed; never upload an empty block in that case
      const result = this._reader.result;
      if (this._reader.error || result == null) {
        debugLog(this._reader.error);
        debugLog("Unable to read block from file");
        this.failed.next(true);
        return;
      }

      // The block being read is always the most recently queued block ID
      const uri = this._submitUrl + '&comp=block&blockid=' + this._blockIds[this._blockIds.length - 1];
      this._tryBlock(uri, result as ArrayBuffer, 0);
    };
  }

  /** Access tier sent as `x-ms-access-tier` when the block list is committed. */
  get accessTier() {
    return this._accessTier;
  }

  set accessTier(value: string) {
    this._accessTier = value;
  }

  /**
   * Upload progress as a fraction of the file size (0 to 1). Emits 1 as soon as
   * the upload is committing, completed or failed.
   */
  get percent() {
    return this._percent;
  }

  /** Throttled stream of the number of bytes uploaded so far. */
  get bytesUploaded() {
    return this._bytesProgress;
  }

  /** Friendly name of the file we're uploading */
  get friendlyFilename() {
    return this._friendlyFilename;
  }

  /** File attached to this upload session */
  get file() {
    return this._file.value;
  }

  /** Number of bytes not yet queued for upload (0 when no file is attached). */
  get totalBytesRemaining() {
    if (!this._file.value) return 0;

    return Math.max(0, this._file.value.size - this._currentFilePointer.value);
  }

  /** `true` while the upload has not completed, been cancelled or failed. */
  get isActive() {
    return this._isActiveVal;
  }

  /** Observable for detecting if the upload is active */
  get onIsActiveChanged() {
    // Return the stream itself, not the boolean snapshot exposed by `isActive`
    return this._isActive;
  }

  /** Cancel the current upload */
  cancel() {
    if (this._currentBlockSubscription)
      this._currentBlockSubscription.unsubscribe();

    this.cancelled.next(true);
  }

  /**
   * PUT one block and, on success, continue with the next block.
   *
   * A 401/403 response fails the upload immediately. Any other error re-sends
   * the same block; the upload fails once `retryCount` exceeds 10.
   *
   * @param uri Full block URL including `comp=block` and the block ID.
   * @param buffer Block contents.
   * @param retryCount Number of retries already made for this block.
   */
  private _tryBlock(uri: string, buffer: ArrayBuffer | SharedArrayBuffer, retryCount: number) {
    if (!this.isActive)
      return;

    if (retryCount > 10) {
      debugLog("Too many retries");
      this.failed.next(true);
      return;
    }

    const req = new HttpRequest('PUT', uri, buffer, {
      reportProgress: true
    });

    // Save the subscription so we can cancel later
    this._currentBlockSubscription = this.http.request(req)
      .pipe(
        tap((j: HttpEvent<any>) => {
          if (j && j.type === HttpEventType.UploadProgress) {
            this._blockBytesUploaded.next(this._completedUploadedBytes + j.loaded);
          }
        }),
        // Only the final event of the request triggers the success handler
        last()
      )
      .subscribe((result) => {
        debugLog(result);

        this._completedUploadedBytes += buffer.byteLength;
        this._blockBytesUploaded.next(this._completedUploadedBytes);

        if (!this.isActive)
          return;

        this._uploadFileInBlocks();
      }, (error: HttpErrorResponse) => {
        // If we get a 401/403 error then fail the upload
        if (error.status === 401 || error.status === 403) {
          this.failed.next(true);
          return;
        }

        debugLog(error);
        debugLog("Error with block, trying again");
        this._tryBlock(uri, buffer, ++retryCount);
      });
  }

  /**
   * Attach a file and upload URL and work out the block size.
   *
   * The block size is at least 40 * 1024 * 1024 bytes, grows so that the file
   * fits in 48,000 blocks, and is capped at the file size for small files.
   *
   * @param baseUrl Upload URL; must already contain a query string.
   * @param file File to upload.
   */
  public initialise(baseUrl: string, file: File) {
    this._file.next(file);
    this._submitUrl = baseUrl;
    this._maxBlockSize = 1024 * 1024 * 40; // 40mb minimum size
    this._currentFilePointer.next(0);

    // TODO - Check for maximum file size (500gb)

    // Calculate the size of each block assuming 48,000 blocks (max is 50,000)
    const size = (file.size / 48000);

    // Maximum block size will be Math.max(maxBlockSize, size)
    this._maxBlockSize = Math.floor(Math.max(this._maxBlockSize, size));

    if (this._file.value.size < this._maxBlockSize) {
      this._maxBlockSize = this._file.value.size;
    }

    debugLog("max block size = " + this._maxBlockSize);
  }

  /**
   * Misspelled original name of `initialise`, kept so existing callers keep working.
   * @deprecated Use `initialise`.
   */
  public intialise(baseUrl: string, file: File) {
    this.initialise(baseUrl, file);
  }

  /**
   * Begin uploading. Does nothing if the upload was already started or if no
   * file has been attached yet.
   */
  public startUpload() {
    if (this.started.value) return;

    // Without a file there are no bytes remaining, which would jump straight to commit
    if (!this._file.value) {
      debugLog("startUpload called before a file was attached");
      return;
    }

    this._uploadFileInBlocks();
    this.started.next(true);
  }

  /**
   * Queue the next block for reading, or start the commit when nothing remains.
   * The read completes asynchronously in the reader's `onloadend` handler.
   */
  private _uploadFileInBlocks() {
    if (this.totalBytesRemaining > 0) {
      const start = this._currentFilePointer.value;
      debugLog("current file pointer = " + start + " bytes read = " + this._maxBlockSize);

      const fileContent = this._file.value.slice(start, start + this._maxBlockSize);
      const blockId = blockIdPrefix + this.pad(this._blockIds.length, 6);
      debugLog("block id = " + blockId);

      this._blockIds.push(btoa(blockId));
      this._reader.readAsArrayBuffer(fileContent);

      // Advance by block size
      this._currentFilePointer.next(start + this._maxBlockSize);

      // Shrink the block size so the final block ends exactly at the end of the file
      if (this.totalBytesRemaining < this._maxBlockSize) {
        this._maxBlockSize = this.totalBytesRemaining;
      }
    } else {
      this.committing.next(true);
      setTimeout(() => this._commitBlockList(), 500);
    }
  }

  /**
   * Commit a file via block list
   */
  private _commitBlockList() {
    // This runs from a timer: a cancel or failure in the meantime must not commit the blob
    if (!this.isActive)
      return;

    const uri = this._submitUrl + '&comp=blocklist';
    debugLog(uri);

    const entries = this._blockIds.map(id => '<Latest>' + id + '</Latest>').join('');
    const requestBody = '<?xml version="1.0" encoding="utf-8"?><BlockList>' + entries + '</BlockList>';
    debugLog(requestBody);

    // Set the headers (friendly filename, content type, disposition and access tier)
    // NOTE(review): the raw file name is sent in x-ms-meta-filename; names with characters
    // outside Latin-1 may be rejected when the header is set - confirm and encode if needed.
    const params = new HttpHeaders({
      "x-ms-meta-filename" : this.file.name,
      "x-ms-blob-content-type" : this.file.type,
      "x-ms-blob-content-disposition" : `attachment; filename=\"${encodeURIComponent(this.file.name)}\"`,
      "x-ms-access-tier" : this.accessTier
    });

    // Perform the request (retried up to 3 times on error)
    this.http.put(uri, requestBody, {
      headers: params
    }).pipe(retry(3)).subscribe((result) => {
      debugLog(result);
      this.completed.next(true);
    }, (error: HttpErrorResponse) => {
      debugLog(error);
      this.failed.next(true);
    });
  }

  /**
   * Pad a number for a Block ID
   * @param number Value to format.
   * @param length Minimum length of the result; shorter values are left-padded with '0'.
   * @returns The decimal string, at least `length` characters long.
   */
  pad(number: number, length: number) {
    let str = '' + number;

    while (str.length < length) {
      str = '0' + str;
    }

    return str;
  }
}
/**
 * Write a message to the console, but only in non-production builds.
 * @param msg Value passed straight to `console.log`
 */
export function debugLog(msg: any) {
  // Production builds stay silent
  if (environment.production) {
    return;
  }

  console.log(msg);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment