Skip to content

Instantly share code, notes, and snippets.

@SimonGoring
Last active September 28, 2018 05:15
Show Gist options
  • Save SimonGoring/4841d4ff9171e1082e605c5718008c29 to your computer and use it in GitHub Desktop.
Save SimonGoring/4841d4ff9171e1082e605c5718008c29 to your computer and use it in GitHub Desktop.
A script to build the database from the raw files.
""" Import libraries and create the connection. If the connection fails check the JSON file. """
import json
import csv
import re
import psycopg2
# When True, every table in the `onlineactivity` schema is dropped first, so
# the database is rebuilt from the raw files on each run.
dropall = True

# Connection settings are stored as a JSON object whose keys are passed to
# `psycopg2.connect` as keyword arguments (so their order does not matter).
with open('awsconnect.json') as conn_file:
    data = json.load(conn_file)
    conn = psycopg2.connect(**data)
cur = conn.cursor()
# Optionally wipe the schema. Only base tables are dropped, because
# information_schema.tables also lists views and DROP TABLE on a view raises
# an error.
if dropall:
    cur.execute("""
        SELECT table_name FROM information_schema.tables
        WHERE table_schema = 'onlineactivity'
          AND table_type = 'BASE TABLE'
        """)
    dbtables = [r[0] for r in cur.fetchall()]
    print(dbtables)
    for tab in dbtables:
        # Quote the identifier (doubling any embedded double quote) so the
        # table is dropped exactly as stored, including mixed-case names.
        quoted = '"' + tab.replace('"', '""') + '"'
        cur.execute("DROP TABLE onlineactivity." + quoted + " CASCADE;")

# Regardless, record which tables exist now; the build steps below only
# create tables missing from this list.
cur.execute("""
    SELECT table_name FROM information_schema.tables
    WHERE table_schema = 'onlineactivity'
    """)
# Convert the tuples returned by the DB to a list of names.
dbtables = [r[0] for r in cur.fetchall()]
# Input files, each paired with the label that selects its target fact table
# when the data are loaded further down.
files = [['./data/input/22100019.csv', 'Online Sales'],
         ['./data/input/27100285.csv', 'Financing Source'],
         ['./data/input/27100318.csv', 'Innovation']]
iso_c = './data/input/iso3166codes.csv'
# The NAICS structure path used to be bound to `naics`, which was immediately
# rebound to a set below, making the path unreachable; it now has its own name.
naics_c = './data/input/NAICS-SCIAN-2017-Structure-V1-eng.csv'

# Distinct values of each categorical column, gathered across all files.
# (`code` is not populated anywhere in this script.)
year, naics, size, code = set(), set(), set(), set()
financing, sales, innovation = set(), set(), set()
# [label, id] pairs in first-seen order. The *_seen sets give O(1) duplicate
# checks instead of scanning the lists on every row.
units, scalar = [], []
units_seen, scalar_seen = set(), set()


def _header_positions(headers, pattern):
    """Return the indices of every header entry matched by regex *pattern*.

    A regex search (not equality) is used so partial names such as
    'nterprise' still match the full column headings.
    """
    matcher = re.compile(pattern)
    return [i for i, item in enumerate(headers) if matcher.search(item)]


# Run through each file once. dinp is [path, label].
for dinp in files:
    # newline='' is how the csv module expects its input files to be opened.
    with open(dinp[0], 'r', newline='') as setup:
        inputdata = csv.reader(setup)
        headers = next(inputdata)
        # Optional category columns: an empty list when the file lacks them.
        finidx = _header_positions(headers, 'financing')
        saleidx = _header_positions(headers, 'Online sales')
        innoidx = _header_positions(headers, 'Type of innovation')
        # Columns every file is expected to have. 'UOM' also matches
        # 'UOM_ID', so uidx[0] relies on UOM preceding UOM_ID in the header.
        yindex = _header_positions(headers, 'REF_DATE')
        nacindex = _header_positions(headers, 'NAICS')
        entidx = _header_positions(headers, 'nterprise')
        scidx = _header_positions(headers, 'SCALAR_FACTOR')
        scid_idx = _header_positions(headers, 'SCALAR_ID')
        uidx = _header_positions(headers, 'UOM')
        uid_idx = _header_positions(headers, 'UOM_ID')
        for rows in inputdata:
            # Each data file carries a different optional category column.
            if finidx:
                financing.add(rows[finidx[0]])
            if saleidx:
                sales.add(rows[saleidx[0]])
            if innoidx:
                innovation.add(rows[innoidx[0]])
            naics.add(rows[nacindex[0]])
            year.add(rows[yindex[0]])
            size.add(rows[entidx[0]])
            unit_key = (rows[uidx[0]], rows[uid_idx[0]])
            if unit_key not in units_seen:
                units_seen.add(unit_key)
                units.append(list(unit_key))
            scalar_key = (rows[scidx[0]], rows[scid_idx[0]])
            if scalar_key not in scalar_seen:
                scalar_seen.add(scalar_key)
                scalar.append(list(scalar_key))
######################
# Single-column lookup tables (innovation type, sale class, financing source).
# Each gets a SERIAL id plus one text column, filled from the distinct values
# collected while scanning the input files. Values are inserted in sorted
# order: iterating a set of strings yields a different order on each run
# (string hashing is randomized), which made the generated ids differ from
# one build to the next.
lookup_specs = [
    # (table, id column, text column, distinct values)
    ('innovation', 'innoid', 'innovation', innovation),
    ('sales', 'saleid', 'saleclass', sales),
    ('finances', 'finid', 'financing', financing),
]
for table, idcol, textcol, distinct in lookup_specs:
    if table not in dbtables:
        # Identifiers come from the constant list above, never from input data.
        cur.execute("CREATE TABLE onlineactivity." + table + "("
                    + idcol + " SERIAL PRIMARY KEY, "
                    + textcol + " CHARACTER VARYING)")
        sql = ("INSERT INTO onlineactivity." + table + "(" + textcol
               + ") VALUES (%s)")
        cur.executemany(sql, [[value] for value in sorted(distinct)])
        conn.commit()
################################################################################
# ISO 3166 country table, keyed on the three-letter (alpha-3) code.
if 'isocountry' not in dbtables:
    cur.execute("""
        CREATE TABLE onlineactivity.isocountry(
            name CHARACTER VARYING,
            alpha2 CHARACTER(2),
            alpha3 CHARACTER(3) PRIMARY KEY,
            countrycode INTEGER,
            iso_3166_2 CHARACTER VARYING(13),
            region CHARACTER VARYING,
            subregion CHARACTER VARYING,
            intermediateregion CHARACTER VARYING,
            regioncode INTEGER,
            subregioncode INTEGER,
            intermediateregioncode INTEGER)
        """)
    sql = """
        INSERT INTO onlineactivity.isocountry
        VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
    # Positions of the INTEGER columns in the table above. An empty CSV cell
    # is not a valid integer literal for PostgreSQL, so such cells are sent
    # as NULL. Text columns are passed through unchanged.
    integer_cols = {3, 8, 9, 10}
    with open(iso_c, 'r', newline='') as iso:
        inputdata = csv.reader(iso)
        next(inputdata)  # Skip the header line.
        for rows in inputdata:
            cleaned = [None if (i in integer_cols and not cell.strip()) else cell
                       for i, cell in enumerate(rows)]
            cur.execute(sql, cleaned)
    conn.commit()
################################################################################
# NAICS 2017 structure table. After the header, each CSV row supplies, in
# order: level, hierstruct, code, title, superscript and definition. `nid` is
# generated by the database.
if 'naics' not in dbtables:
    create_naics = (
        " "
        "CREATE TABLE onlineactivity.naics( "
        "nid SERIAL PRIMARY KEY, "
        "level INTEGER, "
        "hierstruct CHARACTER VARYING, "
        "code CHARACTER VARYING, "
        "title CHARACTER VARYING, "
        "superscript CHARACTER VARYING, "
        "definition CHARACTER VARYING); "
    )
    cur.execute(create_naics)
    insert_naics = """
INSERT INTO onlineactivity.naics(level,
hierstruct,
code,
title,
superscript,
definition)
VALUES(%s, %s, %s, %s, %s, %s)
"""
    with open('./data/input/NAICS-SCIAN-2017-Structure-V1-eng.csv', 'r') as naics_file:
        reader = csv.reader(naics_file)
        next(reader)  # header row is not data
        for record in reader:
            cur.execute(insert_naics, record)
    conn.commit()
########################
# Units of measure. `units` holds [label, id] pairs gathered from the
# UOM / UOM_ID columns; the source id becomes the primary key.
if 'units' not in dbtables:
    cur.execute("""
        CREATE TABLE onlineactivity.units(unitid INTEGER PRIMARY KEY,
                                          units CHARACTER VARYING)
        """)
    cur.executemany(
        "INSERT INTO onlineactivity.units(units, unitid) VALUES (%s, %s)",
        [[label, unit_id] for label, unit_id in units])
    conn.commit()
###################################
# Scalar factors. `scalar` holds [label, id] pairs gathered from the
# SCALAR_FACTOR / SCALAR_ID columns; the source id is stored in `scale`.
if "scalar" not in dbtables:
    cur.execute("""
        CREATE TABLE onlineactivity.scalar(scaleid SERIAL PRIMARY KEY,
                                           scalar CHARACTER VARYING,
                                           scale INTEGER)
        """)
    cur.executemany(
        "INSERT INTO onlineactivity.scalar(scalar, scale) VALUES (%s, %s)",
        [[label, scale_id] for label, scale_id in scalar])
    conn.commit()
##################################
# Enterprise size classes. Inserted in sorted order so the SERIAL ids are
# the same on every run (a set of strings iterates in a different order on
# each interpreter run).
if "entsize" not in dbtables:
    cur.execute("""
        CREATE TABLE onlineactivity.entsize(sizeid SERIAL PRIMARY KEY,
                                            size CHARACTER VARYING)
        """)
    cur.executemany("INSERT INTO onlineactivity.entsize(size) VALUES (%s)",
                    [[sz] for sz in sorted(size)])
    conn.commit()
# All tables have been built.
print('All reference tables have now been built.')
################################################################################
# Check which NAICS labels used in the datasets have no exact title match in
# the NAICS table. In general we accept the NAICS classes as authoritative and
# treat mismatches as differently-worded entries in the datasets; the
# mismatches listed here are candidates for translation by naicsString.
cur.execute("SELECT title FROM onlineactivity.naics")
known_titles = {r[0] for r in cur.fetchall()}
test = sorted(naics.difference(known_titles))
if test:
    print(str(len(test)) + ' NAICS labels in the data have no exact match '
          'in the NAICS table:')
    for label in test:
        print('  ' + label)
# Patterns seen among the mismatches:
# - Several classes end in 'wholesaler-distributors'. The classification
#   system includes distributors in the general class, so that wording is
#   replaced with 'merchant wholesalers'.
# - 'Wholesale agents and brokers' becomes 'Wholesale trade agents and brokers'.
# - 'Petroleum and coal products manufacturing' uses the singular
#   'coal product' in the NAICS table.
# Exact-label replacements applied after the substring rewrites in
# _translate_naics. Keys are dataset labels; values are NAICS class titles.
_NAICS_ALIASES = {
    'All surveyed industries': 'Private sector',
    'Petroleum product merchant wholesalers':
        'Petroleum and petroleum products merchant wholesalers',
    'Selected chemical manufacturing subsectors': 'Chemical manufacturing',
    'Selected machinery manufacturing subsectors': 'Machinery manufacturing',
    'Motor vehicle and parts merchant wholesalers':
        'Motor vehicle and motor vehicle parts and accessories merchant wholesalers',
    'Selected industry groups within professional, scientific and technical services':
        'Professional, scientific and technical services',
    'Postal service and couriers and messengers': 'Postal service',
}


def _translate_naics(field):
    """Rewrite a dataset industry label into the NAICS table's wording.

    The rewrites are applied cumulatively. Previously, the 'coal products'
    rewrite restarted from the raw label and discarded an earlier
    'wholesaler-distributors' rewrite.
    """
    newfield = field
    if newfield == 'Wholesale agents and brokers':
        newfield = 'Wholesale trade agents and brokers'
    newfield = newfield.replace('wholesaler-distributors', 'merchant wholesalers')
    newfield = newfield.replace('coal products', 'coal product')
    return _NAICS_ALIASES.get(newfield, newfield)


def naicsString(field, connect):
    """Translate a dataset NAICS label and confirm it exists in the NAICS table.

    Parameters:
        field: industry label as read from a data file.
        connect: an open DB-API connection (psycopg2 in this script) whose
            database has the `onlineactivity.naics` table.

    Returns:
        The translated label if some NAICS title matches it via SQL LIKE
        (so '%' and '_' in the label act as wildcards). Otherwise a message
        is printed and -1 is returned.
    """
    newfield = _translate_naics(field)
    # This runs once per data row, so close the cursor rather than leaking one
    # per call.
    curs = connect.cursor()
    try:
        curs.execute("SELECT title FROM onlineactivity.naics WHERE title LIKE %s",
                     [newfield])
        results = curs.fetchall()
    finally:
        curs.close()
    if results:
        return newfield
    print('The field ' + field +
          ' could not be converted properly. Was converted to ' + newfield)
    return -1
################################################################################
# Build the fact tables, one per input file:
#   innovationfinancing <- 27100285.csv ('Financing Source')
#   innovationtype      <- 27100318.csv ('Innovation')
#   onlinesales         <- 22100019.csv ('Online Sales')
# Every row stores a year, a country code and foreign keys into the reference
# tables built above, plus the observed value.
_FACT_TABLE_DDL = (
    ('innovationfinancing', """
CREATE TABLE onlineactivity.innovationfinancing(
year INTEGER,
country CHARACTER(3) REFERENCES onlineactivity.isocountry(alpha3),
innofund INTEGER REFERENCES onlineactivity.finances(finid),
naics INTEGER REFERENCES onlineactivity.naics(nid),
size INTEGER REFERENCES onlineactivity.entsize(sizeid),
units INTEGER REFERENCES onlineactivity.units(unitid),
scalar INTEGER REFERENCES onlineactivity.scalar(scaleid),
values NUMERIC
)"""),
    ('innovationtype', """
CREATE TABLE onlineactivity.innovationtype(
year INTEGER,
country CHARACTER(3) REFERENCES onlineactivity.isocountry(alpha3),
innotype INTEGER REFERENCES onlineactivity.innovation(innoid),
naics INTEGER REFERENCES onlineactivity.naics(nid),
size INTEGER REFERENCES onlineactivity.entsize(sizeid),
units INTEGER REFERENCES onlineactivity.units(unitid),
scalar INTEGER REFERENCES onlineactivity.scalar(scaleid),
values NUMERIC
)"""),
    ('onlinesales', """
CREATE TABLE onlineactivity.onlinesales(
year INTEGER,
country CHARACTER(3) REFERENCES onlineactivity.isocountry(alpha3),
naics INTEGER REFERENCES onlineactivity.naics(nid),
size INTEGER REFERENCES onlineactivity.entsize(sizeid),
saletype INTEGER REFERENCES onlineactivity.sales(saleid),
units INTEGER REFERENCES onlineactivity.units(unitid),
scalar INTEGER REFERENCES onlineactivity.scalar(scaleid),
values NUMERIC
)"""),
)
for fact_name, fact_ddl in _FACT_TABLE_DDL:
    if fact_name not in dbtables:
        cur.execute(fact_ddl)
# Across all three files the columns we load are the same; only their order
# differs. `goodcols` picks them out, and each INSERT below lists its
# subqueries in the order those columns appear in that file.
goodcols = [0, 1, 3, 4, 5, 6, 8, 12]

online_sales_sql = """
    INSERT INTO onlineactivity.onlinesales
    VALUES(%s,
    (SELECT alpha3 FROM onlineactivity.isocountry AS iso WHERE iso.name LIKE %s),
    (SELECT MIN(nid) FROM onlineactivity.naics AS nc WHERE nc.title LIKE %s),
    (SELECT sizeid FROM onlineactivity.entsize AS sz WHERE sz.size LIKE %s),
    (SELECT saleid FROM onlineactivity.sales AS sls WHERE sls.saleclass LIKE %s),
    (SELECT unitid FROM onlineactivity.units AS unt WHERE unt.units LIKE %s),
    (SELECT scaleid FROM onlineactivity.scalar AS scl WHERE scl.scalar LIKE %s),
    %s)
    """
financing_sql = """
    INSERT INTO onlineactivity.innovationfinancing
    VALUES(%s,
    (SELECT alpha3 FROM onlineactivity.isocountry AS iso WHERE iso.name LIKE %s),
    (SELECT finid FROM onlineactivity.finances AS fin WHERE fin.financing LIKE %s),
    (SELECT MIN(nid) FROM onlineactivity.naics AS nc WHERE nc.title LIKE %s),
    (SELECT sizeid FROM onlineactivity.entsize AS sz WHERE sz.size LIKE %s),
    (SELECT unitid FROM onlineactivity.units AS unt WHERE unt.units LIKE %s),
    (SELECT scaleid FROM onlineactivity.scalar AS scl WHERE scl.scalar LIKE %s),
    %s)
    """
innovation_sql = """
    INSERT INTO onlineactivity.innovationtype
    VALUES(%s,
    (SELECT alpha3 FROM onlineactivity.isocountry AS iso WHERE iso.name LIKE %s),
    (SELECT innoid FROM onlineactivity.innovation AS inn WHERE inn.innovation LIKE %s),
    (SELECT MIN(nid) FROM onlineactivity.naics AS nc WHERE nc.title LIKE %s),
    (SELECT sizeid FROM onlineactivity.entsize AS sz WHERE sz.size LIKE %s),
    (SELECT unitid FROM onlineactivity.units AS unt WHERE unt.units LIKE %s),
    (SELECT scaleid FROM onlineactivity.scalar AS scl WHERE scl.scalar LIKE %s),
    %s)
    """

# label -> (INSERT statement,
#           position of the NAICS label within the `goodcols` projection,
#           whether to keep only rows whose second column is 'Canada')
load_rules = {
    'Online Sales': (online_sales_sql, 2, False),
    'Financing Source': (financing_sql, 3, True),
    'Innovation': (innovation_sql, 3, True),
}

for dinp in files:
    print("Opening the " + dinp[1] + " data file. . .")
    rule = load_rules.get(dinp[1])
    if rule is None:
        # Rows of a file with an unknown label were never loaded; say so.
        print("No loading rule for " + dinp[1] + "; skipping.")
        continue
    sql, naics_pos, canada_only = rule
    with open(dinp[0], 'r', newline='') as setup:
        inputdata = csv.reader(setup)
        headers = next(inputdata)  # The header row is not loaded.
        for rows in inputdata:
            if canada_only and rows[1] != 'Canada':
                continue
            newrow = [rows[i] for i in goodcols]
            title = naicsString(newrow[naics_pos], conn)
            # naicsString signals an unmatched label with -1. PostgreSQL has
            # no LIKE operator for (varchar, integer), so binding -1 made the
            # INSERT fail; send NULL so the row loads with no NAICS key.
            newrow[naics_pos] = None if title == -1 else title
            # NOTE(review): blank values are stored as 0; confirm that missing
            # observations should not be NULL instead.
            if newrow[-1] == '':
                newrow[-1] = '0'
            cur.execute(sql, newrow)
    print("Finished the " + dinp[1] + " data file. Closing.")
    # Commit each file's rows (and, on the first pass, any pending DDL), so a
    # failure in a later file does not discard earlier loads.
    conn.commit()
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment