Skip to content

Instantly share code, notes, and snippets.

@matt212
Last active August 1, 2017 11:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save matt212/9aceee8d314b2a533c3598806ed06d74 to your computer and use it in GitHub Desktop.
Save matt212/9aceee8d314b2a533c3598806ed06d74 to your computer and use it in GitHub Desktop.
Promise-based code flow for unzipping, reading, formatting, and bulk streaming into Postgres
/*my normal promise function looks like this
normalfunc.then(function (data){
}).error(function (){
})
function normalfunc(data)
{
return new Promise(function(resolve, reject) {
resolve/reject
})
}
*/
//////////////////Please scroll down for code flow part/////////////////////////////
/******************************streambulkinsert codebase looks like this****************************************************/
/**
 * Bulk-load rows into the `srt` table through a single PostgreSQL COPY stream.
 *
 * Each row is joined with tab characters and rows are separated by newlines,
 * which is the layout `COPY ... FROM STDIN` reads in its default text format.
 * NOTE(review): values are not escaped here, so a tab, newline or backslash
 * inside a value would corrupt the row - confirm the caller sanitises them.
 *
 * @param {Array<Array>} data - Rows to insert; each row lists its values in
 *   the column order of the COPY statement below.
 * @returns {Promises} Resolves with `0` once the COPY stream has finished;
 *   rejects with the underlying error if connecting, formatting or copying
 *   fails.
 */
function streambulkinsert(data) {
    return new Promises(function(resolve, reject) {
        pg.connect(connectionString, function(err, client, done) {
            if (err) {
                // Without a client there is nothing to stream into; fail right away.
                console.log(err);
                reject(err);
                return;
            }

            var settled = false;

            // Release the client and settle the promise exactly once, no matter
            // how many stream events fire afterwards.
            function settle(failure) {
                if (settled) {
                    return;
                }
                settled = true;
                // NOTE(review): node-postgres documents that a truthy argument
                // makes the pool discard the client instead of reusing it -
                // confirm against the installed pg version.
                done(failure);
                if (failure) {
                    reject(failure);
                } else {
                    resolve(0);
                }
            }

            try {
                var sqlcopysyntax = 'COPY srt(starttime, endtime,content,showname,season,ep, createdat, updatedat) FROM STDIN ';
                var stream = client.query(copyFrom(sqlcopysyntax));

                // Put the newline in front of every row but the first so the
                // payload does not end with an empty line.
                var started = false;
                var internmap = through2.obj(function(arr, enc, cb) {
                    var rowText = (started ? '\n' : '') + arr.join('\t');
                    started = true;
                    cb(null, rowText);
                });

                // Only the COPY stream finishing means the rows were handed to
                // the server; the formatter finishing merely means they were
                // converted to text.
                // NOTE(review): older pg-copy-streams releases signalled
                // completion with 'end' - confirm for the installed version.
                stream.on('finish', function() {
                    settle();
                });
                stream.on('error', function(copyErr) {
                    console.log(copyErr);
                    settle(copyErr);
                });
                internmap.on('error', function(formatErr) {
                    console.log(formatErr);
                    settle(formatErr);
                });

                internmap.pipe(stream);
                data.forEach(function(r) {
                    internmap.write(r);
                });
                internmap.end();
            } catch (setupErr) {
                // Anything thrown in this callback would otherwise be an
                // uncaught exception and leave the promise pending.
                settle(setupErr);
            }
        });
    });
}
//////////////////////////code flow part commences//////////////////////////////////////////////////////////////////////////
/* Put the whole thing in a queue for background processing, and use the socket.io instance to update the DOM when the queued jobs have finished. */
// Job description handed to the queue: the folder to import and the socket.io
// instance used to notify the browser afterwards.
var baseobj = {
    testFolder: "folder path",
    ref: req.app.io
};

/**
 * Completion callback for a queued import job.
 *
 * NOTE(review): assumes the queue invokes push callbacks Node-style as
 * (err, result), as better-queue documents - confirm which Queue is in use.
 *
 * @param {*} err - Failure reported by the worker, if any.
 * @param {Object} task - The finished job; its `ref` is the socket.io instance.
 */
function jobcompleted(err, task) {
    if (err) {
        // The job did not finish, so do not announce completion.
        console.log(err);
        return;
    }
    task.ref.emit('news', "completed");
}

// Worker: unzip the archives, read and format the srt files, then load each
// file's rows one after another.
var q = new Queue(function(task, cb) {
    unzipfiles(task.testFolder).then(readsrtfiles).then(function(resultset) {
        // Each entry of resultset is handed to streambulkinsert on its own;
        // mapSeries waits for one entry to settle before starting the next.
        // Returning the promise makes the next step wait for every insert.
        return Promises.mapSeries(resultset, function(filemeh) {
            if (filemeh == null) {
                // Entries without data are skipped.
                return undefined;
            }
            //return conventionaldatadump(filemeh)
            return streambulkinsert(filemeh)
                .then(function(a) {
                    console.log(a);
                    return a;
                }).catch(function(err) {
                    // Best effort: a failing file is logged and the remaining
                    // files are still processed.
                    console.log("Promise Rejected" + err);
                });
        });
    }).then(function(allItems) {
        console.log("*************************************all process completed**********************************************************")
        cb(null, task);
    }, function(err) {
        // Unzipping or reading failed: report it so the job does not hang.
        cb(err);
    });
});

// Enqueue only after the queue exists; pushing earlier would call .push on undefined.
q.push(baseobj, jobcompleted);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment