Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
imports ACS 2012 1yr extension - rev9: has geodata fkeys; imports multiple states as previously; added 'c' continue load option
#!/usr/bin/python
##===========================================================
## Import ACS 2012 1 year extension
## GPLv3 Oct 2013 -dbb
##
## Usage: python import_master_20121yr_9.py Sequence_Number_and_Table_Number_Lookup.txt work_dir [c]
##   (pass 'c' as the third argument to continue an interrupted load)
## assumes an existing database 'census_acs_2012' ; currently writes to public schema
##
## Note: results in ~1480 tables plus 1478 more per state, so
## max_locks_per_transaction must be manually increased
## for example
## max_locks_per_transaction = 10000
## max_connections = 40
## shared_buffers = 2400MB
##
## a good completeness test is to pg_dump the tables once the load finishes
##
# change search_path to destination Postgres schema.
# NOTE(review): `schema` is never executed anywhere in this file, and the
# information_schema queries below filter on table_schema = 'public', so the
# tables go to the connection's default search_path (public, per the header).
schema = '''
SET search_path TO census_acs_20121;
'''
import psycopg2
import sys,os, re
import glob
##-----------------------------------------------------------
## SQL Strings misc
## one acs20121_tables_list row per table parsed from the lookup file:
## (table_indb_name, table_id, table_seq_num, table_title,
##  table_subject_area, table_universe)
tTableMetaDataSQL = '''
INSERT into acs20121_tables_list
VALUES ( %s,%s,%s,%s,%s,%s );
'''
## CREATE TABLE for one acs_mtable_<N>_raw table; the generated column
## list goes between _pre and _post, {0} is the table name
tCreate_ACSmtable_SQL_pre = '''
drop table if exists {0} cascade;
create table {0} (
'''
tCreate_ACSmtable_SQL_post = '''
) inherits (acs20121_base);
'''
## NOTE(review): tInsStr is not used in this file (do_process_input_file
## builds a local string of the same name); as written the COPY file name
## is unquoted and the state 'ca' is hard-coded.
tInsStr = '''
COPY {0} from e20121ca{1}000.txt with CSV delimiter E',';
'''
## one acs20121_table_cols_list row per generated column:
## (table_indb_name, table_col, table_col_orig)
tColsRecSQL = '''
INSERT into acs20121_table_cols_list VALUES ( %s,%s,%s );
'''
#--------------
## catalog tables filled while parsing the lookup file (do_emit_table):
##   acs20121_tables_list     - one row per ACS table
##   acs20121_table_cols_list - one row per generated mtbl_<N>_col_<k> column
## both are dropped (cascade) and recreated on every full (non 'c') run
tTablesKey_setup = '''
drop table if exists acs20121_tables_list cascade;
create table acs20121_tables_list (
table_indb_name text,
table_id text,
table_seq_num text,
table_title text,
table_subject_area text,
table_universe text,
pkey serial primary key );
'''
tTableCols_setup = '''
drop table if exists acs20121_table_cols_list cascade;
create table acs20121_table_cols_list (
table_indb_name text,
table_col text,
table_col_orig text,
pkey serial primary key );
'''
##------------------------------------------------------
##-- data loading SQL
## parent of every acs_mtable_<N>_raw table; each loaded data row stores
## the pkey of its acs20121_seq_metadata row in seq_metadata_key
## (no foreign-key constraint is declared)
tBaseRaw_setup = '''
drop table if exists acs20121_base cascade;
CREATE TABLE acs20121_base (
pkey serial PRIMARY KEY,
seq_metadata_key integer
);
'''
## one row per data line of an e*.txt sequence file: the file name plus
## the line's first six fields (FILEID,FILETYPE,STUSAB,CHARITER,SEQUENCE,LOGRECNO)
tSeqMetaData_setup = '''
drop table if exists acs20121_seq_metadata cascade;
CREATE TABLE acs20121_seq_metadata (
pkey serial PRIMARY KEY,
e_filename text,
e_file_id text,
e_file_type text,
e_stusab text,
e_chariter text,
e_sequence text,
e_logrecno text );
'''
## names of all generated parent data tables (acs_mtable_*_raw) in public
tGet_mtables = '''
select table_name from information_schema.tables
where table_name ~ '^acs_mtable_' and table_name ~ '_raw$' and table_schema = 'public';
'''
## sequence files already loaded ('c' continue mode). acs20121_seq_metadata
## holds one row per data line, so DISTINCT keeps the result to one row
## per file instead of one per line.
tGet_Existing_Inputs = '''
select distinct e_filename from acs20121_seq_metadata;
'''
## per-state child of a parent data table: {0} = parent table, {1} = state code;
## the child is named <parent>_<state> and adds a state_name column
tMakeChildTableSQL = '''
create table {0}_{1} ( state_name char(2) )
inherits ({0});
'''
## per-state child of the geography table: {0} = state code
tMakeChildGeoTableSQL = '''
create table acs20121_geo_defs_{0} ( state_name char(2) )
inherits (acs20121_geo_defs);
'''
## destination tables for one sequence ({0} = 4-char sequence number).
## PostgreSQL gives no row order without ORDER BY; pkey follows the order
## the tables were read from the lookup file, which the per-line offset
## walk in do_process_input_file assumes is the data-file column order.
tGetInDB_TablesSQL = '''
SELECT table_indb_name, table_id
FROM acs20121_tables_list
WHERE table_seq_num = '{0}'
ORDER BY pkey;
'''
## generated data columns of one parent table ({0} = table name), in
## CREATE TABLE order so they line up with the values in each data line
## (without ORDER BY the order returned is unspecified)
tGetColumnsForTableSQL = '''
SELECT column_name FROM information_schema.columns
WHERE table_name = '{0}' and column_name ~ '^mtbl_' and table_schema='public'
ORDER BY ordinal_position;
'''
## NOTE(review): generic INSERT template ({0} = table, {1} = value list);
## only referenced from commented-out code in do_process_input_file
tInsDataSQL = '''
INSERT into {0} VALUES ( {1} );
'''
##-- note the following column defs are lowercased,
##-- and name -> geo_name
## the per-state COPY in do_make_geo_child_table names these same columns
## explicitly, in this order, to map the g20121<st>.csv fields onto them.
## NOTE(review): PostgreSQL does not propagate PRIMARY KEY constraints to
## inheriting tables, so the per-state acs20121_geo_defs_<st> children are
## presumably not constrained by this key — confirm if uniqueness matters.
tGeoTable_setup = '''
drop table if exists acs20121_geo_defs cascade;
create table acs20121_geo_defs (
fileid text, stusab text, sumlevel text, component text,
logrecno text, us text, region text, division text, statece text,
state text, county text, cousub text, place text, tract text, blkgrp text,
concit text, aianhh text, aianhhfp text, aihhtli text, aitsce text,
aits text, anrc text, cbsa text, csa text, metdiv text, macc text,
memi text, necta text, cnecta text, nectadiv text, ua text, blank_0 text,
cdcurr text, sldu text, sldl text, blank_1 text, blank_2 text,
zcta5 text, submcd text, sdelm text, sdsec text, sduni text, ur text,
pci text, blank_3 text, blank_4 text, puma5 text, blank_5 text,
geoid text,
geo_name text, bttr text, btbg text, blank_6 text
);
alter table acs20121_geo_defs add PRIMARY KEY (logrecno);
'''
## example queries for inspecting the loaded catalog tables by hand;
## not referenced elsewhere in this file
tExampleSQLStr = '''
SELECT
table_indb_name,
table_id, table_seq_num,
table_title, table_subject_area,
table_universe
FROM acs20121_tables_list
WHERE
table_indb_name = 'acs_mtable_2_raw';
SELECT table_col, table_col_orig
FROM acs20121_table_cols_list
where
table_indb_name = 'acs_mtable_2_raw'
order by pkey;
-- will show 49 named columns for table_id B01001
-- a population table on age
--
-- adding the contents of col_1 and col_25 should equal col_0
-- in an example row 59368 + 61909 = 121277 check
'''
##--------------------------------------------------------
## some globals
## True in 'c' (continue) mode: the table definitions are not rebuilt and
## sequence files already recorded in acs20121_seq_metadata are skipped
gSkipExisting = False
## running number used to name acs_mtable_<N>_raw tables (do_emit_table)
gTableNum = 0
##--------------------------------------------------------
def do_process_lkup_defs( inFileName ):
    ''' build the primary table definitions from file

    Drops and recreates the base, catalog, geo and sequence-metadata
    tables, then reads the Sequence_Number_and_Table_Number_Lookup file
    (first line is a header). Each table description found in it is handed
    to do_emit_table(), which records its metadata and creates its
    acs_mtable_<N>_raw table.

    Line kinds, by field [3] (line number) and [4] (start position):
      [3] == ' ' and [4] != ' '  -> table header (id, seq, title, subject)
      [3] == ' ' and [4] == ' '  -> universe line of the current table
      otherwise                  -> one column title ([7]) of the current table

    Exits with status 1 if the setup SQL fails or the file cannot be opened.
    '''
    try:
        curs.execute( tBaseRaw_setup )
        curs.execute( tTablesKey_setup )
        curs.execute( tTableCols_setup )
        curs.execute( tGeoTable_setup )
        curs.execute( tSeqMetaData_setup )
    except Exception as E:
        print(str(E))
        sys.exit(1)
    try:
        tFile = open( inFileName, 'r' )
    except Exception as E:
        print(str(E))
        sys.exit(1)

    tTableID = tTableSeqID = tTableTitle = tSubjArea = None
    tUniverseStr = ''
    tColTitleaA = []
    tHaveTable = False  ## True once the first table header has been read
    tCommitCnt = 0      ## slight speedup, dont commit each loop
    with tFile:
        tFile.readline()  ## skip the header line
        for tLine in tFile:
            tLine = tLine.rstrip()
            if not tLine:
                ## blank (e.g. trailing) line: no fields to index
                continue
            ## quoted strings w/ commas means no simple split()
            tA = do_get_acs_line( tLine )
            if tA[3] != ' ':
                tColTitleaA.append( tA[7] )
                continue
            if tHaveTable and tA[4] == ' ':
                tUniverseStr = do_clean_univ_str( tA[7] )
                continue
            ## new table header: wrap up the previous table, if any
            if tHaveTable:
                do_emit_table( tTableID, tTableSeqID, tUniverseStr, tSubjArea, tTableTitle, tColTitleaA )
                if (tCommitCnt % 12) == 0:
                    conn.commit()
                tCommitCnt += 1
            tTableID = tA[1]
            tTableSeqID = tA[2]
            tTableTitle = tA[7]
            tSubjArea = tA[8]
            tUniverseStr = ''   ## do not inherit the previous table's universe
            tColTitleaA = []
            tHaveTable = True
    ## a table is only emitted when the NEXT header arrives, so the last
    ## table of the file must be emitted here
    if tHaveTable:
        do_emit_table( tTableID, tTableSeqID, tUniverseStr, tSubjArea, tTableTitle, tColTitleaA )
    conn.commit()
    return
##--------------------------------------------------------
def do_emit_table( inTableID, inSeqID, inUniv, inSubj, inTableTitle, inColA ):
    ''' record one parsed table description and create its data table

    Inserts one acs20121_tables_list row, one acs20121_table_cols_list row
    per entry of inColA (named mtbl_<N>_col_<k>), then creates
    acs_mtable_<N>_raw with those columns (all text) inheriting
    acs20121_base. N is the global gTableNum, incremented afterwards.
    Exits with status 1 if the metadata insert or the CREATE fails.
    '''
    global gTableNum
    tNumStr = str( gTableNum )
    tTableNameStr = 'acs_mtable_' + tNumStr + '_raw'
    tMetaRowA = [ tTableNameStr, inTableID, inSeqID, inTableTitle, inSubj, inUniv ]
    try:
        curs.execute( tTableMetaDataSQL, tMetaRowA )
    except Exception as E:
        print(str(E))
        exit(1)

    tColDefsA = []
    for tColIdx, tOrigTitle in enumerate( inColA ):
        tColName = 'mtbl_' + tNumStr + '_col_' + str( tColIdx )
        curs.execute( tColsRecSQL, [ tTableNameStr, tColName, tOrigTitle ] )
        tColDefsA.append( tColName + ' text,\n' )
    ## rstrip drops the trailing ',\n' (with no columns: the '\n' after '(')
    tHeadStr = ( tCreate_ACSmtable_SQL_pre + ''.join( tColDefsA ) ).rstrip( ',\n' )
    tCreateSQL = ( tHeadStr + tCreate_ACSmtable_SQL_post ).format( tTableNameStr )
    try:
        curs.execute( tCreateSQL )
    except Exception as E:
        print(str(E))
        exit(1)
    gTableNum += 1
    return
##------------------------------------------------------
def do_clean_univ_str( inStr ):
    ''' Return the table-universe string unchanged (placeholder).

    Meant to strip the universe prefix from universe descriptions; until
    that is written the input is passed through as-is.
    '''
    #TODO strip the annoying universe prefix, watching for anomalies;
    ## there also seems to be an implied major/minor universe
    tCleanStr = inStr
    return tCleanStr
def do_get_acs_line( inStr ):
    ''' split one comma-separated line into a list of field strings

    Used for lines of the Sequence_Number_and_Table_Number_Lookup file,
    whose quoted fields may contain commas. A '"' toggles quoted mode and
    is dropped; commas inside quotes are kept as text. Inside a quoted
    field a doubled quote ('""', the CSV escape) yields one literal '"'.
    Always returns at least one field ('' -> ['']).
    '''
    resA = []
    tCurChars = []
    tInQuotStr = False
    tInd = 0
    tLen = len( inStr )
    while tInd < tLen:
        tChar = inStr[tInd]
        if tChar == '"':
            if tInQuotStr and tInd + 1 < tLen and inStr[tInd + 1] == '"':
                ## escaped quote inside a quoted field: keep one '"'
                tCurChars.append( '"' )
                tInd += 2
                continue
            tInQuotStr = not tInQuotStr
        elif tChar == ',' and not tInQuotStr:
            resA.append( ''.join( tCurChars ) )
            tCurChars = []
        else:
            tCurChars.append( tChar )
        tInd += 1
    resA.append( ''.join( tCurChars ) )
    return resA
##===================================================================
## begin per-file sequence reading and loading
##------------------------------------------------------
## sequence files skipped for now because they hold multi-sequence tables
## (see TechDoc Appendix C.7): 0116-0117, 0122-0123 and 0133-0150,
## as zero-padded 4-digit strings
cNotYetSequences = list( '%04d' % tSeqNum for tSeqNum in
                         [ 116, 117, 122, 123 ] + list( range( 133, 151 ) ) )
#---
def do_process_dir( inDirName ):
    ''' load every e20121<st><seq>000.txt sequence file in inDirName

    Changes the process working directory to inDirName; later file opens
    are relative to it. Files are processed in sorted name order. For each
    state seen for the first time, creates and loads its geo child table
    and creates its per-table child tables, then loads the file with
    do_process_input_file(). Sequences in cNotYetSequences are skipped.

    In continue mode (gSkipExisting), files already recorded in
    acs20121_seq_metadata are skipped. Their states count as already set
    up, so processing order cannot trigger a second CREATE TABLE.

    Exits with status 1 if the directory cannot be entered, the metadata
    query fails, or an e*.txt file does not match the expected name.
    '''
    try:
        os.chdir( inDirName )
    except Exception as E:
        print('ERR working data directory ' + inDirName)
        print(str(E))
        sys.exit(1)
    ## basic defs of import file names for this release
    ## see ACS_2012_SF_Tech_Doc.pdf page 12
    ## e.g. e20121ca0001000.txt = prefix, state, sequence, chariter, suffix
    tPrefixStr = 'e20121'
    tCIter = '000'
    tSuffixStr = '.txt'

    ## fetch the already-loaded file names once, not once per file
    tLoadedFilesS = set()
    if gSkipExisting:
        try:
            curs.execute( tGet_Existing_Inputs )
        except Exception as E:
            print(str(E))
            sys.exit(1)
        tLoadedFilesS = set( tResT[0] for tResT in curs.fetchall() )

    ## states whose child tables already exist
    tStatesS = set( tName[6:8] for tName in tLoadedFilesS )
    for fileStr in sorted( glob.glob( "e*.txt" ) ):
        if not (
            len( fileStr ) == 19 and
            fileStr[0:6] == tPrefixStr and
            fileStr[12:15] == tCIter and
            fileStr[15:19] == tSuffixStr
        ):
            print("ERROR: filename {0} not in format in dir {1}".format( fileStr, inDirName ))
            sys.exit(1)
        tStateStr = fileStr[6:8]
        tSeqStr = fileStr[8:12]
        if fileStr in tLoadedFilesS:
            print('skipped ' + fileStr)
            continue
        if tSeqStr in cNotYetSequences:
            print('skipped file ' + fileStr)
            continue
        if tStateStr not in tStatesS:
            do_make_geo_child_table( tStateStr )
            do_make_states_child_tables( tStateStr )
            tStatesS.add( tStateStr )
        do_process_input_file( fileStr, tStateStr, tSeqStr )
    return
##-------------------------------------------------------------------
def do_make_geo_child_table( inStateStr ):
    ''' create acs20121_geo_defs_<st> and load it from g20121<st>.csv

    inStateStr -- 2-char state code taken from the sequence file names.

    The geography file is streamed from this client with COPY ... FROM
    STDIN (cursor.copy_expert). It is therefore opened relative to the
    current working directory, which do_process_dir() has already set to
    the data directory. A server-side COPY FROM '<path>' would instead
    resolve the path on the database server. The bytes are sent unchanged,
    so the server decodes them with the connection's client encoding.

    Exits with status 1 on any error.
    '''
    try:
        curs.execute( tMakeChildGeoTableSQL.format( inStateStr ) )
    except Exception as E:
        print(str(E))
        sys.exit(1)
    tInsGeoSQL = '''
COPY acs20121_geo_defs_{0} (
fileid, stusab, sumlevel, component,
logrecno, us, region, division, statece,
state, county, cousub, place, tract, blkgrp,
concit, aianhh, aianhhfp, aihhtli, aitsce,
aits, anrc, cbsa, csa, metdiv, macc,
memi, necta, cnecta, nectadiv, ua, blank_0,
cdcurr, sldu, sldl, blank_1, blank_2,
zcta5, submcd, sdelm, sdsec, sduni, ur,
pci, blank_3, blank_4, puma5, blank_5,
geoid, geo_name, bttr, btbg, blank_6) from
STDIN with CSV delimiter E',';
'''
    tGeoFileName = 'g20121{0}.csv'.format( inStateStr )
    try:
        with open( tGeoFileName, 'rb' ) as tGeoFile:
            curs.copy_expert( tInsGeoSQL.format( inStateStr ), tGeoFile )
    except Exception as E:
        print(str(E))
        sys.exit(1)
    return
##-------------------------------------------------------------------
def do_make_states_child_tables( inStateStr ):
    ''' create a per-state child for every acs_mtable_*_raw parent table

    Each child is <parent>_<inStateStr>, adds a state_name column and
    inherits from its parent. Commits every 100 tables and once at the end.
    If inStateStr is longer than 2 characters, prints an error and returns
    without creating anything (the parent-table query has already run).
    '''
    try:
        curs.execute( tGet_mtables )
    except Exception as E:
        print(str(E))
        exit(0)
    if len( inStateStr ) > 2:
        print('ERR: do_make_states_child_tables() inStateStr = ' + inStateStr)
        return
    ## one (table_name,) row per parent table
    for tMadeCnt, tParentT in enumerate( curs.fetchall(), 1 ):
        tChildSQL = tMakeChildTableSQL.format( tParentT[0], inStateStr )
        try:
            curs.execute( tChildSQL )
        except Exception as E:
            print(str(E))
            exit(0)
        if tMadeCnt % 100 == 0:
            conn.commit()
    conn.commit()
    return
##-------------------------------------------------------------------
## [file, parent_table, line_number] for every table slice skipped because
## a data line had too few fields
gErrTablesInfoA = []


def _get_seq_insert_plan( inSeqStr, inStateStr ):
    ''' return [(parent_table, n_cols, insert_sql)] for one sequence/state

    Tables are sorted by <N> in acs_mtable_<N>_raw and columns by <k> in
    mtbl_<N>_col_<k>. Both follow the lookup-file order, which the
    per-line offset walk assumes is the data-file layout. SQL without
    ORDER BY guarantees no row order, hence the explicit sort.
    Exits with status 1 on a query error.
    '''
    try:
        curs.execute( tGetInDB_TablesSQL.format( inSeqStr ) )
    except Exception as E:
        print(str(E))
        sys.exit(1)
    tTableNamesA = [ tRowT[0] for tRowT in curs.fetchall() ]
    tTableNamesA.sort( key=lambda tName: int( tName.split( '_' )[2] ) )
    tPlanA = []
    for tTable in tTableNamesA:
        try:
            curs.execute( tGetColumnsForTableSQL.format( tTable ) )
        except Exception as E:
            print(str(E))
            sys.exit(1)
        tColsA = [ tRowT[0] for tRowT in curs.fetchall() ]
        tColsA.sort( key=lambda tName: int( tName.rsplit( '_', 1 )[1] ) )
        tColListStr = ','.join( [ 'seq_metadata_key' ] + tColsA + [ 'state_name' ] )
        tMarksStr = ','.join( [ '%s' ] * ( len( tColsA ) + 2 ) )
        tInsSQL = 'INSERT into {0}_{1}({2}) VALUES ({3});'.format(
            tTable, inStateStr, tColListStr, tMarksStr )
        tPlanA.append( ( tTable, len( tColsA ), tInsSQL ) )
    return tPlanA


def do_process_input_file( inFileStr, inStateStr, inSeqStr ):
    ''' load one e*.txt sequence file into the per-state child tables

    inFileStr  -- sequence file name, relative to the working directory
    inStateStr -- 2-char state code; rows go to <parent>_<inStateStr>
    inSeqStr   -- 4-char sequence number selecting the destination tables

    For each line, the first six fields (FILEID,FILETYPE,STUSAB,CHARITER,
    SEQUENCE,LOGRECNO; ACS_2012_SF_Tech_Doc.pdf sec. 2.5) go into
    acs20121_seq_metadata. The remaining fields are then split into
    consecutive slices, one per destination table, and inserted. A line
    too short for a table is logged in gErrTablesInfoA and that table is
    skipped for the line. Everything is committed once at the end of the
    file, so an interrupted load leaves no rows for it. In particular it
    leaves no metadata rows, so continue mode reloads the file instead of
    skipping it.

    Prints and returns if the file cannot be opened; exits with status 1
    on a database error.
    '''
    try:
        tDF = open( inFileStr, 'r' )
    except Exception as E:
        print('ERR in do_process_input_file: file={0} state={1} seq={2}'.format( inFileStr, inStateStr, inSeqStr))
        print(str(E))
        return
    tSeqMDSQL = '''
INSERT into acs20121_seq_metadata (e_filename,e_file_id,e_file_type,e_stusab,e_chariter,e_sequence,e_logrecno)
VALUES (%s,%s,%s,%s,%s,%s,%s) returning pkey;
'''
    with tDF:
        ## destination tables/columns are fixed for the whole file
        tPlanA = _get_seq_insert_plan( inSeqStr, inStateStr )
        for tProcessedLineCnt, tLine in enumerate( tDF, 1 ):
            ## drop the line terminator so it is not stored in the last column
            tLine = tLine.rstrip( '\r\n' )
            if not tLine:
                continue
            ##NOTE: assumes data values contain no commas (plain split)
            tDataA = tLine.split( ',' )
            try:
                curs.execute( tSeqMDSQL, [ inFileStr ] + tDataA[0:6] )
            except Exception as E:
                print(str(E))
                sys.exit(1)
            tResSeqMD_pkey = curs.fetchone()[0]
            tInLineOffset = 6
            for tCurTable, tColsCnt, tInsSQL in tPlanA:
                tValsA = tDataA[ tInLineOffset : tInLineOffset + tColsCnt ]
                tInLineOffset += tColsCnt
                if len( tValsA ) < tColsCnt:
                    gErrTablesInfoA.append( [ inFileStr, tCurTable, tProcessedLineCnt ] )
                    continue
                try:
                    curs.execute( tInsSQL, [ tResSeqMD_pkey ] + tValsA + [ inStateStr ] )
                except Exception as E:
                    print(str(E))
                    sys.exit(1)
    conn.commit()
    return
##===========================================================================
## main - just do it
if __name__ == '__main__':
    ## optional args: lookup file, data directory, 'c' (continue mode);
    ## missing ones fall back to the defaults below
    if len( sys.argv ) > 4 or ( len( sys.argv ) > 3 and sys.argv[3] != 'c' ):
        print('USAGE: {0} [Sequence_Number_and_Table_Number_Lookup.txt [data_dir [c]]]'.format( sys.argv[0] ))
        sys.exit(1)
    if len( sys.argv ) > 1:
        inLkupDefsFileName = sys.argv[1]
    else:
        inLkupDefsFileName = 'Sequence_Number_and_Table_Number_Lookup.txt'
    if len( sys.argv ) > 2:
        inDataDir = sys.argv[2]
    else:
        inDataDir = 'acs2012_1yr_work/work_dir'
    ## 'c': keep the existing tables and skip files already loaded
    gSkipExisting = len( sys.argv ) > 3 and sys.argv[3] == 'c'
    ##-----------------------
    try:
        # change dbname to destination Postgres database.
        conn = psycopg2.connect( "dbname=census_acs_2012")
        conn.set_client_encoding( 'LATIN1' )
        curs = conn.cursor()
    except Exception as E:
        print(str(E))
        sys.exit(1)
    ##-------------------------------------------
    try:
        if not gSkipExisting:
            do_process_lkup_defs( inLkupDefsFileName )
        do_process_dir( inDataDir )
        if gErrTablesInfoA:
            print('WARNING: {0} table inserts skipped for short input lines; first: {1}'.format(
                len( gErrTablesInfoA ), gErrTablesInfoA[0] ))
    finally:
        ## also runs when a helper calls exit(); uncommitted work is rolled back
        conn.close()
    sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.