Skip to content

Instantly share code, notes, and snippets.

@evz
Created March 10, 2016 22:27
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save evz/4c3aad41569049140ae3 to your computer and use it in GitHub Desktop.
Faster loader for open ee meter datastore
import csv
import sqlalchemy as sa
import os
import pytz
from datetime import datetime
import itertools
# All timestamps written by the loaders are localized to this zone.
tz = pytz.timezone('America/Chicago')

# DB_CONN = os.environ['DATABASE_URL']
DB_CONN = 'postgres://eric:@localhost:5432/oeem_test'
engine = sa.create_engine(DB_CONN)

# Reflect the existing datastore_project table so bulk inserts can use
# its column definitions without redeclaring them here.
project_table = sa.Table('datastore_project',
                         sa.MetaData(),
                         autoload=True,
                         autoload_with=engine)
# Seed rows for datastore_projectattributekey; the ids here are referenced
# as key_id values when loading project attributes in loadProjects().
attribute_keys = [
    {
        'id': 1,
        'name': 'project_cost',
        'data_type': 'float_value',
        'display_name': 'Project Cost',
    },
    {
        'id': 2,
        'name': 'predicted_electricity_savings',
        'data_type': 'float_value',
        'display_name': 'Estimated Electricity Savings',
    },
]
def setupTables():
    """Seed the rows the loaders depend on: one superuser, one project
    owner, and the project attribute keys.

    Each insert runs in its own transaction; an IntegrityError (the row
    already exists from a previous run) rolls back only that transaction
    and moves on, so the function is safe to re-run.
    """
    conn = engine.connect()
    try:
        trans = conn.begin()
        try:
            conn.execute('''
                INSERT INTO auth_user (
                    id,
                    password,
                    is_superuser,
                    username,
                    first_name,
                    last_name,
                    email,
                    is_staff,
                    is_active,
                    date_joined
                ) VALUES (
                    1,
                    'test',
                    TRUE,
                    'test',
                    'tester',
                    'mctesterson',
                    'test@test.com',
                    TRUE,
                    TRUE,
                    NOW()
                )
            ''')
            trans.commit()
        except sa.exc.IntegrityError:
            # User already seeded; keep going.
            trans.rollback()

        trans = conn.begin()
        try:
            conn.execute('''
                INSERT INTO datastore_projectowner (
                    id,
                    user_id,
                    added,
                    updated
                ) VALUES (
                    1,
                    1,
                    NOW(),
                    NOW()
                )
            ''')
            trans.commit()
        except sa.exc.IntegrityError:
            # Owner already seeded; keep going.
            trans.rollback()

        trans = conn.begin()
        try:
            conn.execute('TRUNCATE datastore_projectattributekey CASCADE')
            # executemany: one parameter dict per attribute key.
            conn.execute(sa.text('''
                INSERT INTO datastore_projectattributekey (
                    id,
                    name,
                    data_type,
                    display_name
                ) VALUES (
                    :id,
                    :name,
                    :data_type,
                    :display_name
                )
            '''), *attribute_keys)
            trans.commit()
        except sa.exc.IntegrityError:
            trans.rollback()
    finally:
        # Original leaked the connection; always return it to the pool.
        conn.close()
def loadProjects():
    """Bulk-load projects and their attributes from project-processed.csv.

    First pass: the first 7 CSV columns become datastore_project rows
    (plus owner id and timestamps), inserted in 50,000-row batches.
    Second pass (after seeking back to the top of the file): columns 7-8
    (savings, cost) become datastore_projectattribute rows keyed to the
    attribute ids seeded by setupTables().
    """
    with open("project-processed.csv", 'r') as project_f:
        reader = csv.reader(project_f)
        header = next(reader)
        project_header = header[:7]
        project_header = project_header + ['project_owner_id', 'added', 'updated']
        projects = []
        with engine.begin() as conn:
            conn.execute('TRUNCATE datastore_project CASCADE')
            for row in reader:
                this_row = row[:7]
                now = tz.localize(datetime.now())
                this_row = this_row + [1, now, now]
                projects.append(dict(zip(project_header, this_row)))
                # Flush every 50,000 accumulated rows.
                if len(projects) % 50000 == 0:
                    conn.execute(project_table.insert(), *projects)
                    projects = []
            if projects:
                conn.execute(project_table.insert(), *projects)

        # Rewind and re-skip the header for the attribute pass.
        project_f.seek(0)
        header = next(reader)

        # float_value, key_id, project_id
        attr_insert = '''
            INSERT INTO datastore_projectattribute (
                float_value,
                key_id,
                project_id
            ) VALUES (
                :float_value,
                :key_id,
                (SELECT id FROM datastore_project WHERE project_id = :project_id)
            )
        '''
        attributes = []
        with engine.begin() as conn:
            conn.execute('TRUNCATE datastore_projectattribute CASCADE')
            for row in reader:
                savings, cost = row[7:]
                attributes.append({
                    'float_value': savings,
                    'key_id': 2,
                    'project_id': row[0]
                })
                attributes.append({
                    'float_value': cost,
                    'key_id': 1,
                    'project_id': row[0]
                })
                # BUG FIX: original tested len(projects) here, which either
                # flushed on every row or never batched; batch on the list
                # actually being accumulated.
                if len(attributes) % 50000 == 0:
                    conn.execute(sa.text(attr_insert), *attributes)
                    attributes = []
            if attributes:
                conn.execute(sa.text(attr_insert), *attributes)
def loadConsumption():
    """Bulk-load consumption metadata and records from consumption-processed.csv.

    For each group of CSV rows sharing a project id (column 0), one
    datastore_consumptionmetadata row is inserted (RETURNING its id), and
    each row's columns 4+ become datastore_consumptionrecord rows tied to
    that metadata id, flushed in 50,000-row batches.

    NOTE(review): itertools.groupby only groups *adjacent* rows, so this
    assumes the CSV is already sorted by project id — confirm upstream.
    """
    grouper = lambda x: x[0]
    metadata_insert = '''
        INSERT INTO datastore_consumptionmetadata (
            project_id,
            energy_unit,
            fuel_type,
            added,
            updated
        )
        SELECT
            id,
            'KWH' AS energy_unit,
            'E' AS fuel_type,
            NOW() AS added,
            NOW() AS updated
        FROM datastore_project
        WHERE project_id = :project_id
        RETURNING id
    '''
    record_insert = '''
        INSERT INTO datastore_consumptionrecord (
            start,
            value,
            estimated,
            metadata_id
        ) VALUES (
            :start,
            :value,
            :estimated,
            :metadata_id
        )
    '''
    with open('consumption-processed.csv', 'r') as f:
        reader = csv.reader(f)
        next(reader)  # skip header
        record_header = ['start', 'value', 'estimated']
        total = 0
        with engine.begin() as conn:
            conn.execute('TRUNCATE datastore_consumptionmetadata CASCADE')
            conn.execute('TRUNCATE datastore_consumptionrecord CASCADE')
        with engine.begin() as conn:
            record_inserts = []
            for project_id, project_group in itertools.groupby(reader, key=grouper):
                # Empty result means no matching datastore_project row;
                # the whole group is skipped in that case.
                metadata_id = list(conn.execute(sa.text(metadata_insert),
                                                project_id=project_id))
                if metadata_id:
                    for record in project_group:
                        row = dict(zip(record_header, record[4:]))
                        row['metadata_id'] = metadata_id[0].id
                        record_inserts.append(row)
                        if len(record_inserts) % 50000 == 0:
                            total += 50000
                            print('inserted', total)
                            conn.execute(sa.text(record_insert), *record_inserts)
                            record_inserts = []
            if record_inserts:
                conn.execute(sa.text(record_insert), *record_inserts)
                print('inserted', (total + len(record_inserts)))
if __name__ == "__main__":
    # Seed reference rows first; project attributes and consumption
    # metadata both depend on rows created by the earlier steps.
    setupTables()
    loadProjects()
    loadConsumption()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment