Skip to content

Instantly share code, notes, and snippets.

@calvinmetcalf
Created April 2, 2020 13:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save calvinmetcalf/31bfb2c117eb7fb284b8638fbb12b34a to your computer and use it in GitHub Desktop.
Save calvinmetcalf/31bfb2c117eb7fb284b8638fbb12b34a to your computer and use it in GitHub Desktop.
const fs = require('fs');
const stream = require('stream')
const {Transform, Writable} = stream;
const util = require('util');
const pipeline = util.promisify(stream.pipeline);
const csvParser = = require('csv-parser'); // best csv parser
const db = require('./connection');
const readStream = fs.createReadStream('./path/to.csv');
const csv = csvParser(); // turns binary stream into object stream
const transform = new Transform({ // one way to do it
objectMode: true,
transform(chunk, _, next) {
const {lng, lat} = chunk;
chunk.geom = db.raw('ST_SetSRID( ST_Point( :lng, :lat), 4326)', {
lng, lat
});
next(null, chunk)
}
})
class CacheStream extends Transform {
constructor(size = 200) {
super({
objectMode: true
});
this.size = size;
this.cache = [];
}
_transform(chunk, _, next) {
this.cache.push(chunk);
if (this.cache.length < this.size) {
return next();
}
const cache = this.cache;
this.cache = [];
this.push(cache);
return next();
}
_final(done) {
if (this.cache.length) {
this.push(this.cache);
}
done();
}
}
const dbStream = (db, table) => new Writable({
objectMode: true,
write(chunk, _, next) {
db(table).insert(chunk).then(()=>{
next();
}, next())
}
});
const run = async () => {
const readStream = fs.createReadStream('./path/to.csv');
const csv = csvParser(); // turns binary stream into object stream
const transform = new Transform({ // one way to do it
objectMode: true,
transform(chunk, _, next) {
const {lng, lat} = chunk;
chunk.geom = db.raw('ST_SetSRID( ST_Point( :lng, :lat), 4326)', {
lng, lat
});
next(null, chunk)
}
})
await pipeline(
readStream,
csv,
transform,
new CacheStream(),
dbStream(db, 'table_name')
);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment