Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
const fs = require('fs')
const glob = require('glob')
const unzip = require('unzip')
const parse = require('csv-parse')
const exec = require('child_process').exec
const rimraf = require('rimraf')
// Census blocks downloaded via FTP from:
// http://www2.census.gov/geo/tiger/TIGER2016/TABBLOCK/
// The blocksFolder has all census blocks from that FTP server.
const blocksFolder = 'TABBLOCK/'
const zipFiles = '*.zip'
// Census blocks are zipped in files named like:
// tl_2016_05_tabblock10.shp
// The 05 in that file name corresponds to a FIPS code for a state.
// When .split()'ing a file name, the FIPS code will be at index 2.
const fipsPosition = 2
const csvsFolder = 'out/'
const csvFiles = '*.csv'
const unzippedFolder = 'unzipped/'
// key: value corresponds to state_abbr: fips_code
const stateToFips = JSON.parse(fs.readFileSync('state-fips.json'))
// keys: values are fips_code: state_abbr
let fipsToState = {}
Object.keys(stateToFips).forEach(key => fipsToState[stateToFips[key]] = key)
// keys: values are state_abbr: census_block_ids[]
let blockIds = {}
let filesToUnzip = []
// ogr2ogr writes batches of filtered census blocks to this folder.
// const filteredBlocksFolder = 'filtered-census-blocks/'
const filteredBlocksFolder = 'testable/'
// Start by reading all .csv files.
glob(csvsFolder + csvFiles, null, (err, matches) => {
// Pull out blockIds from .csv files.
// Keep track of total number of .csv files and how many have been processed.
let csvsRead = 0
let totalCsvs = matches.length
// .csv files are read one at a time with readCsv().
// After reading a file, checkTotalCsvsProcessed is called.
// If there are more .csv files to read, readCsv is called.
// This continues (recursion) until all .CSVs are processed.
// Once all .CSVs are read, filterCensusBlocks is called.
// The global blockIds keeps track of all the block IDs to pull out of the
// census block shapefiles.
let checkTotalCsvsProcessed = () => {
csvsRead += 1
if ( csvsRead < totalCsvs ) {
readCsv(matches[csvsRead], checkTotalCsvsProcessed)
} else {
console.log('Read all .csv files with block IDs.')
// console.log(blockIds)
console.log(Object.keys(blockIds).map(id => [id, blockIds[id].length]))
console.log('Doing shapefiles now.')
filterCensusBlocks()
}
}
let readCsv = (csv, callback) => {
console.log('Reading: ', csv)
let csvData = [];
fs.createReadStream(csv).pipe(parse())
.on('data', row => {
csvData.push(row)
})
.on('end', () => {
// Keep only the value from the first column because that is the block ID.
csvData = csvData.map(row => row[0])
// Remove header row.
csvData.shift()
// Split data into batches of 1k.
//
// The lists of block IDs will be used in an ogr2ogr where clause.
// The ogr2ogr command will be very long.
// If it's over `getconf ARG_MAX`, trying to exec() it will throw:
// Error: spawn E2BIG
// OSX's limit is 262144.
// Using 1k block IDs per batch stays way under that.
//
// ogr2ogr was also throwing:
// ogr2ogr stderr ERROR 1: SQL Expression Parsing Error: memory exhausted. Occured around :
// 54','191315602001058','191315602001078','191315602001085','191315602001094','191
//
// Using batches of 1k avoids that error too.
let blockIdBatches = []
let batchSize = 1000
let batchCount = csvData.length / batchSize
let batchStart = 0
let batchEnd = batchSize
if ( csvData.length > batchSize ) {
// Pull out chunks of 10k items.
while ( batchStart < csvData.length ) {
blockIdBatches.push(csvData.slice(batchStart, batchEnd))
batchStart += batchSize
batchEnd += batchSize
}
} else {
blockIdBatches.push(csvData)
}
let currentState = csv.replace(csvsFolder, '').replace('.csv', '')
blockIds[currentState] = blockIdBatches
callback()
});
}
readCsv(matches[csvsRead], checkTotalCsvsProcessed)
// Create an array of state abbreviations.
let stateCsvs = matches.map(m => m.replace(csvsFolder, '').replace('.csv', ''))
let filterCensusBlocks = () => {
// Find all the zipped Census block shapefiles.
glob(blocksFolder + zipFiles, null, (err, matches) => {
matches.forEach(f => {
// Strip out the folder name, split the file name to find the FIPS code.
let pieces = f.replace(blocksFolder, '').split('_')
// console.log(f, 'corresponds to', fipsToState[pieces[fipsPosition]])
// Keep track of zip files to know what to unzip.
if ( stateCsvs.indexOf(fipsToState[pieces[fipsPosition]]) > -1 ) {
filesToUnzip.push({
state: fipsToState[pieces[fipsPosition]],
name: f
})
}
})
// Same pattern as above where .csv files were read and processed.
// Keep track of how many shapefiles there are to unzip, then:
// - unzip Census block shapefiles one at a time
// - use ogr2ogr to create a new shapefile with blocks of interest
// - delete original shapefile
// - repeat until all zipped shapefiles are processed.
let unzipped = 0
let total = filesToUnzip.length
// total = 1
let checkTotalUnzipped = () => {
unzipped += 1
if ( unzipped < total ) {
unzipBlocks(filesToUnzip[unzipped], checkTotalUnzipped)
} else {
console.log('Unzipped and processed all!', unzipped, total)
}
}
let unzipBlocks = (f, callback) => {
console.log('unzipBlocks', f)
fs.createReadStream(f.name).pipe(unzip.Parse()).on('entry', (entry) => {
let parts = entry.path.split('.');
let outName = `${f.state}_all_blocks.${parts[parts.length - 1]}`
// console.log('processing', outName)
let stateBlocks = unzippedFolder + outName
entry.pipe(fs.createWriteStream(stateBlocks));
}).on('close', () => {
// Unzipping is finished.
// Refer to the corresponding array of block IDs to keep in blockIds.
// Again, track total things to process, call a function recursively
// until everything is processed.
let batchesProcessed = 0
let checkBatchesProcessed = () => {
batchesProcessed += 1
if ( batchesProcessed < blockIds[f.state].length ) {
exportBlocks(batchesProcessed, checkBatchesProcessed)
} else {
console.log('DELETING', f.state)
rimraf(unzippedFolder, () => {
fs.mkdir(unzippedFolder, callback)
})
}
}
let exportBlocks = (batch, callback) => {
// Build an ogr2ogr command. Wrap block IDs in single-quotes.
let ids = blockIds[f.state][batch].map(id => `'${id}'`).join(',')
// Build a where clause.
let where = `"GEOID10 IN (${ids})"`
// Execute ogr2ogr -where "GEOID10 IN (${ids})" output input.
let output = `${filteredBlocksFolder}CAF_${f.state}_${batch}.shp`
let input = `${unzippedFolder}${f.state}_all_blocks.shp`
let ogr2ogr = `ogr2ogr -where ${where} ${output} ${input}`
exec(ogr2ogr, (error, stdout, stderr) => {
// console.log('ogr2ogr error', error)
// console.log('ogr2ogr stdout', stdout)
// console.log('ogr2ogr stderr', stderr)
console.log(f.state, 'batch', batch, 'finished.')
callback()
})
}
exportBlocks(batchesProcessed, checkBatchesProcessed)
})
}
unzipBlocks(filesToUnzip[unzipped], checkTotalUnzipped)
})
}
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.