import fnmatch
import logging
import os
from functools import reduce
from itertools import islice
from math import floor
from io import BytesIO

import numpy as np
import pandas as pd
import geopandas as gpd

from django import db
from django.conf import settings
from django.db import transaction
from rest_framework.utils import json
from mapbox import Uploader
from celery import shared_task

from core.utils import (
    DataClient,
    select_and_rename_columns,
    combine_data_frames,
    adapt_geojson_to_django,
    bulk_update_or_create,
    grouper,
)
from thermcert.constants import PROCESSED_DATA_PATH
from thermcert.models import ThermcertData
from thermcert.utils import lsoa_code_to_int

logger = logging.getLogger(__name__)

def batch_import(model_generator, batch_size, import_fn, verbose=None):
    """
    Performs bulk_create or bulk_update one "batch" at a time.
    """
    i = 0
    while True:
        model_batch = list(islice(model_generator, batch_size))
        if not model_batch:
            break
        if verbose: print(f"importing objects {i} to {i + len(model_batch)}...")
        # db.reset_queries()
        import_fn(model_batch)
        i += batch_size

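
# A usage sketch for batch_import (illustration only; "MyModel" and the rows below are
# hypothetical and not part of this project):
#
#   models = (MyModel(**row) for row in [{"name": "a"}, {"name": "b"}])
#   batch_import(
#       models,
#       batch_size=1000,
#       import_fn=lambda batch: MyModel.objects.bulk_create(batch, batch_size=1000),
#       verbose=True,
#   )
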
@shared_task
@transaction.atomic
def import_data(pattern="lsoa_*.csv", batch_size=0, active_only=False, verbose=False):
    """
    Imports data from the S3 Bucket specified in settings.  The bucket objects are filtered
    according to 'pattern'.  Combines the properties from all data files into a single
    data_frame, then works out which rows of that frame to UPDATE and which to INSERT.
    It creates a model generator for each subset of rows and then calls bulk_create or
    bulk_update, as appropriate, in batches.
    """
    n_created = 0
    n_updated = 0

    columns = ThermcertData.get_name_mapping("data_name", "field_name", active_only=active_only)
    index_name = "lsoa_code"
    data_frames = []

    pattern = f"^{PROCESSED_DATA_PATH}/lsoa_csv/{fnmatch.translate(pattern)}$"
    if verbose: print(f"looking for data files at '{pattern}'...")

    client = DataClient()
    for data_object in client.get_all_matching_objects(pattern):

        file_path = data_object.metadata["Key"]
        file_name, file_extension = os.path.splitext(file_path)

        if verbose: print(f"fetching '{file_path}' and converting it to a data_frame... ")

        # read the current data...
        if file_extension == ".geojson":
            geo_data_frame = gpd.read_file(data_object.stream)
            data_frame = pd.DataFrame(geo_data_frame.drop(columns="geometry"))
        elif file_extension == ".csv":
            data_frame = pd.read_csv(data_object.stream)
        else:
            msg = f"Unable to read data file of type: '{file_extension}'."
            raise NotImplementedError(msg)

        # restrict it to the desired columns...
        data_frame_columns = {
            k: v for k, v in columns.items()
            if k in data_frame.columns.tolist()
        }

        # rename those columns and re-index the data...
        data_frame = select_and_rename_columns(
            data_frame,
            data_frame_columns,
            index_name=index_name,
        )

        # replace NaN w/ None...
        data_frame = data_frame.where((pd.notnull(data_frame)), None)

        # make sure there are no duplicate rows...
        assert not any(data_frame.index.duplicated()), f"Error: {index_name} is duplicated in {file_path}."

        # store it for later...
        data_frames.append(data_frame)
    if data_frames:

        if verbose: print("combining all data into a single data_frame...")

        # combine all of the stored data...
        combined_data_frame = reduce(combine_data_frames, data_frames)
        # and put the index back...
        combined_data_frame[index_name] = combined_data_frame.index

        # ordinarily, this bit is done in the save method, but save doesn't get called during bulk operations;
        # I _could_ override bulk_create and loop through (pre-saved) django models,
        # but it's just faster to work directly on the data_frame here
        combined_data_frame["feature_id"] = combined_data_frame["lsoa_code"].apply(lsoa_code_to_int)
if verbose: print("determining which data to INSERT and which data to UPDATE...")
existing_lsoa_codes = ThermcertData.objects.values_list("lsoa_code", flat=True)
rows_to_update = combined_data_frame.filter(items=existing_lsoa_codes, axis=0)
columns_to_update = rows_to_update.drop(index_name, axis=1).columns.to_list()
columns_to_update_chunk_size = floor(len(columns_to_update) / 2)
columns_to_update_chunks = [list(filter(None, chunk)) for chunk in grouper(columns_to_update, columns_to_update_chunk_size)]
        # look up each existing pk by lsoa_code (rather than relying on the queryset's
        # unspecified ordering matching the data_frame's row order)
        existing_pks = dict(
            ThermcertData.objects.filter(lsoa_code__in=rows_to_update[index_name]).values_list("lsoa_code", "pk")
        )
        models_to_update = (
            ThermcertData(**row, pk=existing_pks[row[index_name]])
            for i, row in rows_to_update.iterrows()
        )

        rows_to_create = combined_data_frame[~combined_data_frame.index.isin(rows_to_update.index)]
        models_to_create = (
            ThermcertData(**row)
            for i, row in rows_to_create.iterrows()
        )
        ######################
        # BEGIN TECHNIQUE #1 #
        ######################
        # n_chunks = 1000
        # i = 0
        # for _, combined_data_frame_chunk in combined_data_frame.groupby(np.arange(len(combined_data_frame)) // n_chunks):
        #     if verbose:
        #         # the above loop on "combined_data_frame.groupby()" is a bit confusing;
        #         # it exists in order to provide feedback here on the progress of this long-running operation
        #         combined_data_frame_chunk_size = len(combined_data_frame_chunk)
        #         print(f"importing objects {i} to {i + combined_data_frame_chunk_size}...")
        #         i += combined_data_frame_chunk_size
        #     for index, row in combined_data_frame_chunk.iterrows():
        #         thermcert_data, created = ThermcertData.objects.update_or_create(
        #             lsoa_code=index,
        #             defaults=row.to_dict()
        #         )
        #         if created:
        #             n_created += 1
        #         else:
        #             n_updated += 1
        # # CREATE: 405.295362873s
        # # UPDATE: 455.6755572450056s
        ####################
        # END TECHNIQUE #1 #
        ####################

        ######################
        # BEGIN TECHNIQUE #2 #
        ######################
        # bulk_update_or_create(
        #     ThermcertData,
        #     (row.to_dict() for i, row in combined_data_frame.iterrows()),
        #     comparator_fn=None,
        # )
        # # CREATE: 239.54632384701108s
        # # UPDATE: fails
        ####################
        # END TECHNIQUE #2 #
        ####################
        ######################
        # BEGIN TECHNIQUE #3 #
        ######################
        # if verbose: print(f"going to perform INSERT on {len(rows_to_create)} objects...")
        # if not batch_size:
        #     create_batch_size = len(rows_to_create)
        # else:
        #     create_batch_size = batch_size
        # batch_import(
        #     models_to_create, create_batch_size,
        #     lambda model_batch: ThermcertData.objects.bulk_create(model_batch, batch_size=create_batch_size),
        #     verbose=verbose
        # )
        # n_created += len(rows_to_create)
        #
        # if verbose: print(f"going to perform UPDATE on {len(rows_to_update)} objects (in {len(columns_to_update_chunks)} chunks)...")
        # if not batch_size:
        #     update_batch_size = len(rows_to_update)
        # else:
        #     update_batch_size = batch_size
        # # have to update columns in chunks or I risk maxing out memory
        # for columns_to_update_chunk in columns_to_update_chunks:
        #     batch_import(
        #         models_to_update, update_batch_size,
        #         lambda model_batch: ThermcertData.objects.bulk_update(model_batch, columns_to_update_chunk, batch_size=update_batch_size),
        #         verbose=verbose
        #     )
        # n_updated += len(rows_to_update)
        # # CREATE (BATCH 0): 119.625746768s
        # # CREATE (BATCH 10000): 299.505621346s
        # # UPDATE (BATCH 0): unacceptably slow
        # # UPDATE (BATCH 10000): unacceptably slow
        ####################
        # END TECHNIQUE #3 #
        ####################

    return (n_created, n_updated)
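
# Example invocation (a sketch; assumes a configured celery broker/worker for the async case):
#
#   # run synchronously, e.g. from a shell or a management command...
#   n_created, n_updated = import_data(pattern="lsoa_*.csv", batch_size=10000, verbose=True)
#
#   # ...or enqueue it as a celery task
#   import_data.delay(pattern="lsoa_*.csv", batch_size=10000)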