Created
December 3, 2016 07:37
-
-
Save swingley/17ef14373e7c564d7ed65f8e7a0a8b88 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const fs = require('fs') | |
const glob = require('glob') | |
const unzip = require('unzip') | |
const parse = require('csv-parse') | |
const exec = require('child_process').exec | |
const rimraf = require('rimraf') | |
// Census blocks downloaded via FTP from:
// http://www2.census.gov/geo/tiger/TIGER2016/TABBLOCK/
// The blocksFolder has all census blocks from that FTP server.
const blocksFolder = 'TABBLOCK/'
const zipFiles = '*.zip'
// Census blocks are zipped in files named like:
//   tl_2016_05_tabblock10.shp
// The 05 in that name is a state FIPS code; after .split('_')-ing the
// file name, the FIPS code sits at index 2.
const fipsPosition = 2
const csvsFolder = 'out/'
const csvFiles = '*.csv'
const unzippedFolder = 'unzipped/'
// Lookup table loaded from disk: state_abbr -> fips_code.
const stateToFips = JSON.parse(fs.readFileSync('state-fips.json'))
// Inverse lookup built from the table above: fips_code -> state_abbr.
let fipsToState = {}
for (const abbr of Object.keys(stateToFips)) {
  fipsToState[stateToFips[abbr]] = abbr
}
// Accumulators filled in while processing: state_abbr -> census_block_ids[][].
let blockIds = {}
let filesToUnzip = []
// ogr2ogr writes batches of filtered census blocks to this folder.
// const filteredBlocksFolder = 'filtered-census-blocks/'
const filteredBlocksFolder = 'testable/'
// Start by reading all .csv files. | |
// Start by reading all .csv files. Each CSV (named <state_abbr>.csv) lists
// census block IDs in its first column. Once every CSV is read, the zipped
// TIGER block shapefiles are unzipped one state at a time and ogr2ogr pulls
// out just the blocks of interest.
glob(csvsFolder + csvFiles, null, (err, matches) => {
  if (err) {
    console.error('Failed to list .csv files:', err)
    return
  }
  if (matches.length === 0) {
    console.log('No .csv files found matching', csvsFolder + csvFiles)
    return
  }
  // Pull out blockIds from .csv files.
  // Keep track of total number of .csv files and how many have been processed.
  let csvsRead = 0
  let totalCsvs = matches.length
  // .csv files are read one at a time with readCsv().
  // After reading a file, checkTotalCsvsProcessed is called.
  // If there are more .csv files to read, readCsv is called again.
  // This continues (recursion) until all CSVs are processed.
  // Once all CSVs are read, filterCensusBlocks is called.
  // The global blockIds keeps track of all the block IDs to pull out of the
  // census block shapefiles.
  let checkTotalCsvsProcessed = () => {
    csvsRead += 1
    if ( csvsRead < totalCsvs ) {
      readCsv(matches[csvsRead], checkTotalCsvsProcessed)
    } else {
      console.log('Read all .csv files with block IDs.')
      console.log(Object.keys(blockIds).map(id => [id, blockIds[id].length]))
      console.log('Doing shapefiles now.')
      filterCensusBlocks()
    }
  }
  // Read one CSV, store its block IDs (split into 1k-item batches) in the
  // global blockIds keyed by state abbreviation, then invoke callback.
  let readCsv = (csv, callback) => {
    console.log('Reading: ', csv)
    let csvData = [];
    fs.createReadStream(csv).pipe(parse())
      .on('data', row => {
        csvData.push(row)
      })
      .on('end', () => {
        // Keep only the value from the first column because that is the block ID.
        csvData = csvData.map(row => row[0])
        // Remove header row.
        csvData.shift()
        // Split data into batches of 1k.
        //
        // The lists of block IDs will be used in an ogr2ogr where clause.
        // The ogr2ogr command will be very long.
        // If it's over `getconf ARG_MAX`, trying to exec() it will throw:
        //   Error: spawn E2BIG
        // OSX's limit is 262144.
        // Using 1k block IDs per batch stays way under that.
        //
        // ogr2ogr was also throwing:
        //   ogr2ogr stderr ERROR 1: SQL Expression Parsing Error: memory exhausted.
        //
        // Using batches of 1k avoids that error too.
        let blockIdBatches = []
        let batchSize = 1000
        let batchStart = 0
        let batchEnd = batchSize
        if ( csvData.length > batchSize ) {
          // Pull out chunks of 1k items.
          while ( batchStart < csvData.length ) {
            blockIdBatches.push(csvData.slice(batchStart, batchEnd))
            batchStart += batchSize
            batchEnd += batchSize
          }
        } else {
          blockIdBatches.push(csvData)
        }
        let currentState = csv.replace(csvsFolder, '').replace('.csv', '')
        blockIds[currentState] = blockIdBatches
        callback()
      });
  }
  readCsv(matches[csvsRead], checkTotalCsvsProcessed)
  // Create an array of state abbreviations from the CSV file names.
  let stateCsvs = matches.map(m => m.replace(csvsFolder, '').replace('.csv', ''))
  // Unzip each relevant state's block shapefile, run ogr2ogr against it
  // batch by batch, then wipe the unzipped files before the next state.
  let filterCensusBlocks = () => {
    // Find all the zipped Census block shapefiles.
    glob(blocksFolder + zipFiles, null, (globErr, zipMatches) => {
      if (globErr) {
        console.error('Failed to list zipped shapefiles:', globErr)
        return
      }
      zipMatches.forEach(f => {
        // Strip out the folder name, split the file name to find the FIPS code.
        let pieces = f.replace(blocksFolder, '').split('_')
        // Keep track of zip files to know what to unzip — only states that
        // had a corresponding CSV of block IDs.
        if ( stateCsvs.indexOf(fipsToState[pieces[fipsPosition]]) > -1 ) {
          filesToUnzip.push({
            state: fipsToState[pieces[fipsPosition]],
            name: f
          })
        }
      })
      if (filesToUnzip.length === 0) {
        console.log('No zipped shapefiles matched the CSV states.')
        return
      }
      // Same pattern as above where .csv files were read and processed.
      // Keep track of how many shapefiles there are to unzip, then:
      // - unzip Census block shapefiles one at a time
      // - use ogr2ogr to create a new shapefile with blocks of interest
      // - delete original shapefile
      // - repeat until all zipped shapefiles are processed.
      let unzipped = 0
      let total = filesToUnzip.length
      let checkTotalUnzipped = () => {
        unzipped += 1
        if ( unzipped < total ) {
          unzipBlocks(filesToUnzip[unzipped], checkTotalUnzipped)
        } else {
          console.log('Unzipped and processed all!', unzipped, total)
        }
      }
      let unzipBlocks = (f, callback) => {
        console.log('unzipBlocks', f)
        fs.createReadStream(f.name).pipe(unzip.Parse()).on('entry', (entry) => {
          // Rename each archive entry to <state>_all_blocks.<ext> so the
          // ogr2ogr input path below is predictable.
          let parts = entry.path.split('.');
          let outName = `${f.state}_all_blocks.${parts[parts.length - 1]}`
          let stateBlocks = unzippedFolder + outName
          // NOTE(review): the zip stream's 'close' can fire before these
          // write streams flush; a fully robust version would wait for each
          // write's 'finish' event — TODO confirm in practice.
          entry.pipe(fs.createWriteStream(stateBlocks));
        }).on('close', () => {
          // Unzipping is finished.
          // Refer to the corresponding array of block IDs to keep in blockIds.
          // Again, track total things to process, call a function recursively
          // until everything is processed, then wipe and recreate the
          // unzipped folder before moving on to the next state.
          let batchesProcessed = 0
          let checkBatchesProcessed = () => {
            batchesProcessed += 1
            if ( batchesProcessed < blockIds[f.state].length ) {
              exportBlocks(batchesProcessed, checkBatchesProcessed)
            } else {
              console.log('DELETING', f.state)
              rimraf(unzippedFolder, () => {
                fs.mkdir(unzippedFolder, callback)
              })
            }
          }
          // Run ogr2ogr with a where clause selecting this batch's block IDs.
          // Parameter is named `done` so it does not shadow the enclosing
          // unzipBlocks `callback`.
          let exportBlocks = (batch, done) => {
            // Build an ogr2ogr command. Wrap block IDs in single-quotes.
            let ids = blockIds[f.state][batch].map(id => `'${id}'`).join(',')
            // Build a where clause.
            let where = `"GEOID10 IN (${ids})"`
            // Execute ogr2ogr -where "GEOID10 IN (${ids})" output input.
            let output = `${filteredBlocksFolder}CAF_${f.state}_${batch}.shp`
            let input = `${unzippedFolder}${f.state}_all_blocks.shp`
            let ogr2ogr = `ogr2ogr -where ${where} ${output} ${input}`
            exec(ogr2ogr, (error, stdout, stderr) => {
              // Surface failures instead of silently continuing.
              if (error) {
                console.error('ogr2ogr failed:', stderr || error)
              }
              console.log(f.state, 'batch', batch, 'finished.')
              done()
            })
          }
          exportBlocks(batchesProcessed, checkBatchesProcessed)
        })
      }
      unzipBlocks(filesToUnzip[unzipped], checkTotalUnzipped)
    })
  }
})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment