Skip to content

Instantly share code, notes, and snippets.

@Brooooooklyn
Created December 21, 2016 13:10
Show Gist options
  • Save Brooooooklyn/1cc45249701c11320f7c657845be72b2 to your computer and use it in GitHub Desktop.
Save Brooooooklyn/1cc45249701c11320f7c657845be72b2 to your computer and use it in GitHub Desktop.
RxJS file uploader demo
import { Fetch, Utils } from 'teambition-sdk'
import { Observable } from 'rxjs/Observable'
import { Subject } from 'rxjs/Subject'
import { ReplaySubject } from 'rxjs/ReplaySubject'
import { AjaxResponse } from 'rxjs/observable/dom/AjaxObservable'
import { Subscriber } from 'rxjs/Subscriber'
import * as config from 'config'
export interface ChunkMeta {
fileType: string
fileName: string
mimeType: string
fileCategory: string
fileSize: number
chunkSize: number
chunks: number
created: string
fileMD5: string
lastUpdated: string
uploadedChunks: number[]
storage: string
token: {
userId: string
exp: number
storage: string
}
fileKey: string
downloadUrl: string
thumbnailUrl: string
previewUrl: string
}
export class ChunkUploader {
static resume$ = new Subject<void>()
static pause$ = new Subject<void>()
private fetch = new Fetch()
private progress = new Subject<number>()
chunkUploader$: Observable<ChunkMeta> = this.getChunkMeta()
.concatMap((chunkMeta) => {
const progress$ = this.progress
const blobs = this.slice(this.file, chunkMeta.chunks, chunkMeta.chunkSize)
const uploaded: number[] = []
const dists = blobs.map((blob, index) => {
let currentLoaded = 0
const result = this.uploadChunk(chunkMeta, index, blob)
return result.progress$
.catch(() => Promise.resolve({ loaded: uploaded[index] * this.file.size }))
.do((r) => {
currentLoaded = r.loaded / this.file.size
uploaded[index] = currentLoaded
const percent = uploaded.reduce((acc, val) => acc + (val ? val : 0))
progress$.next(percent)
if (percent >= 0.99) {
progress$.complete()
}
})
})
const uploadStream = Observable.from(dists)
.mergeAll(this.concurrency)
return Observable.forkJoin(uploadStream)
.mapTo(chunkMeta)
})
progress$ = this.progress
.distinctUntilChanged((x, y) => x - y >= 0)
settleFile$ = this.chunkUploader$
.concatMap((meta) => {
return this.fetch.post(`/upload/chunk/${meta.fileKey}`)
.retryWhen(() => ChunkUploader.resume$)
.catch((e) => Promise.resolve(e))
})
constructor(
private file: File,
private strikerToken: string,
private chunkRetryCount = 2,
private concurrency = 3
) { }
slice(file: File, n: number, chunkSize: number): Blob[] {
const result: Blob[] = []
for (let i = 0; i < n; i ++) {
const startSize = i * chunkSize
const slice = file.slice(startSize, i === n - 1 ? startSize + (file.size - startSize) : (i + 1) * chunkSize)
result.push(slice)
}
return result
}
private getChunkMeta() {
this.fetch.setAPIHost(config.FILE_HOST)
this.fetch.setHeaders({
Authorization: this.strikerToken
})
return this.fetch
.post<ChunkMeta>(`/upload/chunk`, {
fileSize: this.file.size,
fileMD5: Utils.uuid(),
lastUpdated: this.file.lastModifiedDate,
fileName: this.file.name
})
}
private uploadChunk(meta: ChunkMeta, index: number, blob: Blob) {
const host = `${config.FILE_HOST}/upload/chunk/${meta.fileKey}?chunk=${index + 1}&chunks=${meta.chunks}`
const complete$ = new ReplaySubject<void>(1)
const progress$: Observable<ProgressEvent | AjaxResponse> = Observable
.create((subscriber: Subscriber<ProgressEvent | AjaxResponse>) => {
const ajax$ = Observable.ajax({
url: host,
body: blob,
method: 'post',
crossDomain: true,
headers: {
Authorization: this.strikerToken,
'Content-Type': 'application/octet-stream'
},
progressSubscriber: subscriber
})
.retry(this.chunkRetryCount)
.takeUntil(ChunkUploader.pause$)
.catch((e) => Promise.resolve(e))
.repeatWhen(() => {
return ChunkUploader.resume$
.takeUntil(complete$)
})
const subscription = ajax$.subscribe()
return () => subscription.unsubscribe()
})
.retryWhen(() => ChunkUploader.resume$)
.publish()
.refCount()
return { progress$, meta }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment