Created
May 30, 2017 01:03
-
-
Save Arlen22/e3b2cf8c01a32507fa62ef5809455ee6 to your computer and use it in GitHub Desktop.
A rather obtuse RxJS operator to take a path and list all the folders in it, doing a stat on each one and checking whether it contains a certain file. It should use stat on the file instead of readdir on the folder, but I'm not using this code!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
export function statFolderInBatch(subscriber, input: Observable<any>) { | |
const signal = new Subject<number>(); | |
var count = 0; | |
//use set timeout to fire after the buffer recieves this item | |
const sendSignal = (item) => setTimeout(() => { count = 0; signal.next(item); }); | |
return input.concatMap(([folder, tag]) => { | |
return obs_readdir({ folder, tag })(folder); | |
}).lift({ | |
call: (subs: Subscriber<any>, source: Observable<any>) => { | |
const signalFunction = (count) => signal.mapTo(1), forwardWhenEmpty = true; | |
const waiting = []; | |
const _output = new Subject(); | |
var _count = new Subject<number>() | |
const countFactory = Observable.defer(() => { | |
return Observable.create(subscriber => { | |
_count.subscribe(subscriber); | |
}) | |
}); | |
var isEmpty = true; | |
const sourceSubs = source.subscribe(item => { | |
if (isEmpty && forwardWhenEmpty) { | |
_output.next(item); | |
} else { | |
waiting.push(item) | |
} | |
isEmpty = false; | |
}) | |
const pulse = new Subject<any>(); | |
const signalSubs = pulse.switchMap(() => { | |
return signalFunction(countFactory) | |
}).subscribe(count => { | |
//act on the closing observable value | |
var i = 0; | |
while (waiting.length > 0 && i++ < count) | |
_output.next(waiting.shift()); | |
//if nothing was output, then we are empty | |
//if something was output then we are not | |
//this is meant to be used with bufferWhen | |
if (i === 0) isEmpty = true; | |
_count.next(i); | |
_count.complete(); | |
_count = new Subject<number>(); | |
pulse.next(); | |
}) | |
pulse.next(); //prime the pump | |
const outputSubs = Observable.create((subscriber) => { | |
return _output.subscribe(subscriber); | |
}).subscribe(subs) as Subscription; | |
return function () { | |
outputSubs.unsubscribe(); | |
signalSubs.unsubscribe(); | |
sourceSubs.unsubscribe(); | |
} | |
} | |
}).mergeMap(([err, files, { folder, tag }]) => { | |
if (err) { sendSignal(err); return Observable.empty(); } | |
return Observable.from(files.map(a => [a, folder, files.length, tag])) as any; | |
}).mergeMap((res: any) => { | |
let [file, folder, fileCount, tag] = res as [string, string, number, any]; | |
return obs_stat([file, folder, fileCount, tag])(path.join(folder, file)) | |
}, 20).map(statFolderEntryCB).mergeMap<any, any>((res) => { | |
let [entry, [name, folder, fileCount, tag]] = res as [any, [string, string, number, any]]; | |
if (entry.type === 'folder') | |
return obs_readdir([entry, fileCount, tag])(path.join(entry.folder, entry.name)); | |
else return Observable.of([true, entry, fileCount, tag]); | |
}, 20).map((res) => { | |
//if (res === false) return (false); | |
if (res[0] === true) return (res); | |
let [err, files, [entry, fileCount, tag]] = res as [any, string[], [FolderEntry, number, any]]; | |
if (err) { | |
entry.type = "error"; | |
} else if (files.indexOf('tiddlywiki.json') > -1) | |
entry.type = 'datafolder'; | |
return ([true, entry, fileCount, tag]); | |
}).map(([dud, entry, fileCount, tag]) => { | |
count++; | |
if (count === fileCount) { | |
sendSignal([count, tag]); | |
} | |
return entry; | |
}).bufferWhen(() => signal).withLatestFrom(signal).map(([files, [sigResult, tag]]: any) => { | |
return [ | |
typeof sigResult !== 'number' ? sigResult : null, //error object | |
files, //file list | |
typeof sigResult === 'number' ? sigResult : null, //file count | |
tag //tag | |
]; | |
}).subscribe(subscriber); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment