Created
January 10, 2019 22:48
-
-
Save nathanpeck/e54f1a7a61fcc87c49df7ca4416ed317 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const quargo = require('quargo'); | |
const request = require('request'); | |
const aw = require('awaitify-stream'); | |
const byline = require('byline'); | |
const MongoClient = require('mongodb').MongoClient; | |
// Command-line arguments (expects --file <csv-url>).
const argv = require('minimist')(process.argv.slice(2));
// Read in the certificate authority file used for the TLS DB connection.
const ca = [require('fs').readFileSync(process.cwd() + '/connection/rds-ca-beta-2015-root.pem')];
// Basic configuration (DATABASE_URL, DB_NAME, POOL_SIZE, ...).
const config = require(process.cwd() + '/config');
const COLLECTION_NAME = 'taxi-rides';
// Module-level state: populated by main() once connected, read by the
// dispatcher and the progress reporter.
let connection;
let db;
let ridesCollection;
let count = 0;
// Queue dispatcher, used to send batches of taxi ride data to the DB.
// Accumulates pushed rides into batches and writes each batch with an
// unordered bulk insert.
var dispatcher = quargo(
  function(rides, callback) {
    var batch = ridesCollection.initializeUnorderedBulkOp();
    for (var ride of rides) {
      batch.insert(ride);
    }
    // Execute the operations
    batch.execute(function(err) {
      if (err) {
        // Bug fix: the error argument was previously ignored, so failed
        // batches were silently counted as successes. Log and keep going
        // so one bad batch doesn't stall the whole import.
        console.error('Failed to write batch of rides:', err);
      } else {
        // Increment the number of rides successfully added to the DB
        count += rides.length;
      }
      callback();
    });
  },
  1000, // How many to save per batch
  config.POOL_SIZE, // How many parallel workers dispatching batches
  50 // Up to 50ms delay to wait for a batch to fill
);
// Helper function used for finding the index of a column, if given
// a list of potential names for the column.
// Returns the index (within `columns`) of the first candidate in `names`
// that is present, or undefined when none of the candidates match.
function findColumn(columns, names) {
  const match = names.find((candidate) => columns.indexOf(candidate) > -1);
  return match === undefined ? undefined : columns.indexOf(match);
}
// Initiate a download stream of one of the taxi data files, and
// dispatch records. Takes care of pausing the download if the database can't
// keep up with the speed of the raw download.
//
// file     - URL of an NYC taxi trip data CSV.
// tripType - label stored on each record as vendor.type (e.g. 'yellow').
async function loadFile(file, tripType) {
  console.log(`Loading file: ${file}`);
  // Open a read stream from the URL
  let stream = request(file);
  let lineStream = byline.createStream(stream, { keepEmptyLines: false });
  let rideLines = aw.createReader(lineStream);
  try {
    let line;
    // Read the header with the column names off the stream first.
    // Because the NYC taxi raw data has changed a lot over the years
    // we have to do some discovery to see what fields are available,
    // and what they are named in this file, and where they are positioned
    // in the CSV.
    line = await rideLines.readAsync();
    var columns = line.toString().toLowerCase().split(',');
    // Trim each column name because the raw data is dirty and
    // some column names have an extra space
    columns = columns.map(function(column) {
      return column.trim();
    });
    const VENDOR_ID = findColumn(columns, [
      'vendorid'
    ]);
    const PICKUP_DATETIME = findColumn(columns, [
      'tpep_pickup_datetime',
      'lpep_pickup_datetime',
      'trip_pickup_datetime',
      'pickup_datetime'
    ]);
    const DROPOFF_DATETIME = findColumn(columns, [
      'tpep_dropoff_datetime',
      'lpep_dropoff_datetime',
      'trip_dropoff_datetime',
      'dropoff_datetime'
    ]);
    const PICKUP_LOCATION_ID = findColumn(columns, [
      'pulocationid'
    ]);
    const DROPOFF_LOCATION_ID = findColumn(columns, [
      'dolocationid'
    ]);
    const PICKUP_LONGITUDE = findColumn(columns, [
      'start_lon',
      'pickup_longitude'
    ]);
    const PICKUP_LATITUDE = findColumn(columns, [
      'start_lat',
      'pickup_latitude'
    ]);
    const DROPOFF_LONGITUDE = findColumn(columns, [
      'end_lon',
      'dropoff_longitude'
    ]);
    const DROPOFF_LATITUDE = findColumn(columns, [
      'end_lat',
      // Bug fix: this list previously contained 'dropoff_longitude'
      // (copy-paste error), so the dropoff latitude column was never
      // found in files using the 'dropoff_latitude' header and dropoff
      // GeoJSON points were never built for them.
      'dropoff_latitude'
    ]);
    const PASSENGER_COUNT = findColumn(columns, [
      'passenger_count'
    ]);
    const TRIP_DISTANCE = findColumn(columns, [
      'trip_distance'
    ]);
    // Now start reading each line of the CSV and dispatching it to the DB.
    while (null !== (line = await rideLines.readAsync())) {
      let ride = line.toString().split(',');
      let rideData = {};
      rideData.vendor = {
        type: tripType
      };
      if (VENDOR_ID !== undefined) {
        rideData.vendor.vendorId = parseInt(ride[VENDOR_ID], 10);
      }
      // NOTE(review): new Date() on raw CSV date strings relies on
      // engine-specific parsing of non-ISO formats — TODO confirm the
      // source files use a format V8 parses consistently.
      rideData.pickup = {
        time: new Date(ride[PICKUP_DATETIME]),
      };
      if (PICKUP_LOCATION_ID !== undefined) {
        rideData.pickup.locationId = parseInt(ride[PICKUP_LOCATION_ID], 10);
      }
      if (PICKUP_LONGITUDE !== undefined && PICKUP_LATITUDE !== undefined) {
        // GeoJSON points are [longitude, latitude].
        rideData.pickup.location = {
          type: 'Point',
          coordinates: [parseFloat(ride[PICKUP_LONGITUDE]), parseFloat(ride[PICKUP_LATITUDE])]
        };
      }
      rideData.dropoff = {
        time: new Date(ride[DROPOFF_DATETIME]),
      };
      if (DROPOFF_LOCATION_ID !== undefined) {
        rideData.dropoff.locationId = parseInt(ride[DROPOFF_LOCATION_ID], 10);
      }
      if (DROPOFF_LONGITUDE !== undefined && DROPOFF_LATITUDE !== undefined) {
        rideData.dropoff.location = {
          type: 'Point',
          coordinates: [parseFloat(ride[DROPOFF_LONGITUDE]), parseFloat(ride[DROPOFF_LATITUDE])]
        };
      }
      rideData.passengerCount = parseInt(ride[PASSENGER_COUNT], 10);
      // Fixed: parseFloat takes no radix argument; the stray `10` was ignored.
      rideData.tripDistanceMiles = parseFloat(ride[TRIP_DISTANCE]);
      dispatcher.push(rideData, function() {
        if (dispatcher.length() < 50000) {
          // Resume downloading data if the database dispatch queue isn't backed up.
          stream.resume();
        }
      });
      // Stop downloading data if the database dispatch queue is too long
      if (dispatcher.length() > 50000) {
        stream.pause();
      }
    }
  } catch (e) {
    // Errors are logged, not rethrown, so one bad file doesn't crash the
    // importer; the finally block still reports "Done" in that case.
    console.error(e);
  } finally {
    // NOTE(review): the dispatcher queue may still hold unwritten batches
    // when this resolves — TODO confirm quargo drains before the caller
    // exits the process.
    console.log(`Done loading file: ${file}`);
  }
}
// Periodic progress logger: prints how many rides have been written to
// the database so far (module-level `count`, incremented by the dispatcher).
function reportProgress() {
  const total = count;
  console.log(`Imported ${total} taxi ride records`);
}
// Entry point: validates arguments, connects to the database over TLS,
// imports the --file CSV, and reports progress every two seconds.
async function main() {
  if (!argv.file) {
    console.error('Expected parameter --file which is URL of an NYC taxi data CSV.');
    // Bug fix: execution previously fell through here and went on to
    // connect and load `undefined`; bail out with a non-zero exit code.
    process.exit(1);
  }
  // Connect to DB
  try {
    connection = await MongoClient.connect(config.DATABASE_URL, {
      poolSize: config.POOL_SIZE,
      ssl: true,
      sslValidate: true,
      sslCA: ca,
      useNewUrlParser: true
    });
  } catch (e) {
    console.error(e);
    process.exit(255);
  }
  console.log('Connected to database');
  db = connection.db(config.DB_NAME);
  ridesCollection = db.collection(COLLECTION_NAME);
  var i = setInterval(reportProgress, 2000);
  console.log(`Loading NYC taxi data file: ${argv.file}`);
  await loadFile(argv.file, 'yellow');
  clearInterval(i);
  console.log(`Done loading NYC taxi data file: ${argv.file}`);
  // NOTE(review): rides still queued in the dispatcher may be dropped by
  // this exit — TODO confirm whether quargo offers a drain callback to
  // await before terminating.
  process.exit();
}
// Bug fix: main() was a floating promise; an unexpected rejection now
// logs and exits non-zero instead of raising an unhandled rejection.
main().catch(function(err) {
  console.error(err);
  process.exit(255);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment