Skip to content

Instantly share code, notes, and snippets.

@nathanpeck
Created January 10, 2019 22:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nathanpeck/e54f1a7a61fcc87c49df7ca4416ed317 to your computer and use it in GitHub Desktop.
Save nathanpeck/e54f1a7a61fcc87c49df7ca4416ed317 to your computer and use it in GitHub Desktop.
const quargo = require('quargo');
const request = require('request');
const aw = require('awaitify-stream');
const byline = require('byline');
const MongoClient = require('mongodb').MongoClient;
var argv = require('minimist')(process.argv.slice(2));
// Name of the collection that receives the imported ride documents.
const COLLECTION_NAME = 'taxi-rides';

// Both the certificate and the configuration module are resolved relative to
// the directory the script is launched from.
const workingDir = process.cwd();

// Certificate authority file, wrapped in an array because that is the shape
// handed to the driver's `sslCA` option when connecting.
const caPem = require('fs').readFileSync(workingDir + '/connection/rds-ca-beta-2015-root.pem');
const ca = [caPem];

// Basic configuration (main() reads DATABASE_URL, POOL_SIZE and DB_NAME from it).
const config = require(workingDir + '/config');

// Database handles; they stay undefined until main() has connected.
let connection;
let db;
let ridesCollection;

// Running total of ride records, bumped by the dispatcher after each batch
// and printed by reportProgress().
let count = 0;
// Queue dispatcher, used to send batches of taxi ride data to the database.
//
// The worker function receives an array of queued rides plus a callback that
// must be invoked once the batch has been handled. Each batch is written with
// a single unordered bulk insert into `ridesCollection`.
//
// If the bulk insert fails, the error is logged and forwarded to the callback
// instead of being silently ignored, and `count` only grows by the number of
// documents the driver reports as actually inserted.
const dispatcher = quargo(
  function(rides, callback) {
    const batch = ridesCollection.initializeUnorderedBulkOp();

    for (const ride of rides) {
      batch.insert(ride);
    }

    // Execute the operations
    batch.execute(function(err) {
      if (err) {
        console.error(err);

        // An unordered bulk write may have stored part of the batch before
        // failing. NOTE(review): assumes the driver exposes the partial result
        // as `err.result.nInserted` (BulkWriteError) — confirm for the driver
        // version in use. When it is absent nothing is counted.
        const inserted = err.result && err.result.nInserted;
        if (typeof inserted === 'number') {
          count += inserted;
        }

        return callback(err);
      }

      // Increment the number of rides successfully added to the DB
      count += rides.length;
      callback();
    });
  },
  1000, // How many to save per batch
  config.POOL_SIZE, // How many parallel workers dispatching batches
  50 // Up to 50ms delay to wait for a batch to fill
);
// Resolves the position of a column that may go by several names.
//
// `names` lists the candidate column names in priority order; the result is
// the index within `columns` of the first candidate that is present, or
// undefined when none of them is.
function findColumn(columns, names) {
  const positions = Array.from(names, function(candidate) {
    return columns.indexOf(candidate);
  });

  // indexOf reports -1 for a miss, so the first non-negative entry belongs
  // to the highest-priority candidate that exists.
  return positions.find(function(position) {
    return position >= 0;
  });
}
// Helper: resolves the position of every CSV column the importer reads.
// `columns` is the lower-cased, trimmed header row. Because the NYC taxi raw
// data has changed a lot over the years, each field is looked up under all of
// the names it has been given; a field that is missing maps to undefined.
function locateRideColumns(columns) {
  return {
    vendorId: findColumn(columns, [
      'vendorid'
    ]),
    pickupDatetime: findColumn(columns, [
      'tpep_pickup_datetime',
      'lpep_pickup_datetime',
      'trip_pickup_datetime',
      'pickup_datetime'
    ]),
    dropoffDatetime: findColumn(columns, [
      'tpep_dropoff_datetime',
      'lpep_dropoff_datetime',
      'trip_dropoff_datetime',
      'dropoff_datetime'
    ]),
    pickupLocationId: findColumn(columns, [
      'pulocationid'
    ]),
    dropoffLocationId: findColumn(columns, [
      'dolocationid'
    ]),
    pickupLongitude: findColumn(columns, [
      'start_lon',
      'pickup_longitude'
    ]),
    pickupLatitude: findColumn(columns, [
      'start_lat',
      'pickup_latitude'
    ]),
    dropoffLongitude: findColumn(columns, [
      'end_lon',
      'dropoff_longitude'
    ]),
    // The latitude must be looked up under its own name; looking it up as
    // 'dropoff_longitude' would store the longitude twice.
    dropoffLatitude: findColumn(columns, [
      'end_lat',
      'dropoff_latitude'
    ]),
    passengerCount: findColumn(columns, [
      'passenger_count'
    ]),
    tripDistance: findColumn(columns, [
      'trip_distance'
    ])
  };
}

// Helper: converts the comma-split fields of one CSV line into a ride document.
// Optional parts (vendor id, location ids, coordinates) are only added when
// the file has the corresponding columns.
function buildRideRecord(fields, columnIndex, tripType) {
  const rideData = {
    vendor: {
      type: tripType
    },
    pickup: {
      time: new Date(fields[columnIndex.pickupDatetime])
    },
    dropoff: {
      time: new Date(fields[columnIndex.dropoffDatetime])
    }
  };

  if (columnIndex.vendorId !== undefined) {
    rideData.vendor.vendorId = parseInt(fields[columnIndex.vendorId], 10);
  }

  if (columnIndex.pickupLocationId !== undefined) {
    rideData.pickup.locationId = parseInt(fields[columnIndex.pickupLocationId], 10);
  }
  if (columnIndex.pickupLongitude !== undefined && columnIndex.pickupLatitude !== undefined) {
    rideData.pickup.location = {
      type: 'Point',
      coordinates: [
        parseFloat(fields[columnIndex.pickupLongitude]),
        parseFloat(fields[columnIndex.pickupLatitude])
      ]
    };
  }

  if (columnIndex.dropoffLocationId !== undefined) {
    rideData.dropoff.locationId = parseInt(fields[columnIndex.dropoffLocationId], 10);
  }
  if (columnIndex.dropoffLongitude !== undefined && columnIndex.dropoffLatitude !== undefined) {
    rideData.dropoff.location = {
      type: 'Point',
      coordinates: [
        parseFloat(fields[columnIndex.dropoffLongitude]),
        parseFloat(fields[columnIndex.dropoffLatitude])
      ]
    };
  }

  rideData.passengerCount = parseInt(fields[columnIndex.passengerCount], 10);
  rideData.tripDistanceMiles = parseFloat(fields[columnIndex.tripDistance]);

  return rideData;
}

// Initiate a download stream of one of the taxi data files, and
// dispatch records. Takes care of pausing the download if the database can't
// keep up with the speed of the raw download.
//
// Parameters:
//   file     - URL of the CSV file to download.
//   tripType - value stored as `vendor.type` on every ride of this file.
//
// Returns a promise that resolves once every line has been read AND every
// ride pushed to the dispatcher has been acknowledged through its push
// callback, so the caller can safely exit afterwards. Errors are logged
// rather than rethrown.
//
// NOTE(review): waiting for the queue assumes the dispatcher invokes the
// push callback once per pushed ride; the resume logic below already depends
// on that callback being called.
async function loadFile(file, tripType) {
  // Queue length above which the download is paused, and below which it is
  // resumed again.
  const MAX_QUEUED_RIDES = 50000;

  console.log(`Loading file: ${file}`);

  // Open a read stream from the URL
  const stream = request(file);
  const lineStream = byline.createStream(stream, { keepEmptyLines: false });
  const rideLines = aw.createReader(lineStream);

  // Rides handed to the dispatcher whose callback has not fired yet.
  let pending = 0;
  // Set once reading has finished and we are waiting for `pending` to hit 0.
  let notifyDrained = null;

  try {
    // Read the header with the column names off the stream first.
    let line = await rideLines.readAsync();
    if (line === null) {
      throw new Error(`No header row found in file: ${file}`);
    }

    // Trim each column name because the raw data is dirty and
    // some column names have an extra space
    const columns = line.toString().toLowerCase().split(',').map(function(column) {
      return column.trim();
    });
    const columnIndex = locateRideColumns(columns);

    // Now start reading each line of the CSV and dispatching it to the DB.
    while (null !== (line = await rideLines.readAsync())) {
      const rideData = buildRideRecord(line.toString().split(','), columnIndex, tripType);

      pending += 1;
      dispatcher.push(rideData, function() {
        pending -= 1;

        if (dispatcher.length() < MAX_QUEUED_RIDES) {
          // Resume downloading data if the database dispatch queue isn't backed up.
          stream.resume();
        }

        if (pending === 0 && notifyDrained !== null) {
          notifyDrained();
        }
      });

      // Stop downloading data if the database dispatch queue is too long
      if (dispatcher.length() > MAX_QUEUED_RIDES) {
        stream.pause();
      }
    }
  } catch (e) {
    console.error(e);
  } finally {
    // Rides may still be queued or in flight after the last line was read
    // (or after a read error). Wait for them so they are not lost when the
    // caller exits the process.
    if (pending > 0) {
      await new Promise(function(resolve) {
        notifyDrained = resolve;
      });
    }
    console.log(`Done loading file: ${file}`);
  }
}
// Prints the running total of ride records held in `count`.
// main() schedules this on an interval while a file is being loaded.
function reportProgress() {
  const summary = 'Imported ' + count + ' taxi ride records';
  console.log(summary);
}
// Entry point: validates the command line, connects to the database, loads
// the CSV named by --file as 'yellow' taxi rides while reporting progress
// every two seconds, then exits the process.
//
// Exit codes: 255 when --file is missing, the database connection fails, or
// an unexpected error escapes; the default (0) otherwise.
async function main() {
  if (!argv.file) {
    console.error('Expected parameter --file which is URL of an NYC taxi data CSV.');
    // Without a file there is nothing to import; stop instead of connecting
    // to the database and trying to download `undefined`.
    process.exit(255);
  }

  // Connect to DB
  try {
    connection = await MongoClient.connect(config.DATABASE_URL, {
      poolSize: config.POOL_SIZE,
      ssl: true,
      sslValidate: true,
      sslCA: ca,
      useNewUrlParser: true
    });
  } catch (e) {
    console.error(e);
    process.exit(255);
  }

  console.log('Connected to database');
  db = connection.db(config.DB_NAME);
  ridesCollection = db.collection(COLLECTION_NAME);

  const progressTimer = setInterval(reportProgress, 2000);

  console.log(`Loading NYC taxi data file: ${argv.file}`);
  await loadFile(argv.file, 'yellow');
  clearInterval(progressTimer);

  // The interval may not have fired since the last batch completed, so print
  // the total one final time.
  reportProgress();
  console.log(`Done loading NYC taxi data file: ${argv.file}`);

  process.exit();
}

// Never leave the promise floating: report anything unexpected and fail.
main().catch(function(e) {
  console.error(e);
  process.exit(255);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment