@sh0rtwave
Created August 16, 2021 15:01
import json
import csv
from geojson_rewind import rewind
def is_clockwise(poly):  # linear-ring analysis function: checks the winding direction of a ring via the shoelace formula
    total = poly[-1][0] * poly[0][1] - poly[0][0] * poly[-1][1]
    for i in range(len(poly) - 1):
        total += poly[i][0] * poly[i + 1][1] - poly[i + 1][0] * poly[i][1]
    return total <= 0  # a non-positive signed area means the ring winds clockwise
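# A quick sanity check of the winding test (illustrative coordinates, not from the
# dataset): a closed unit-square ring traced clockwise gives a negative shoelace
# sum, and the counterclockwise trace gives a positive one.
#   is_clockwise([[0, 0], [0, 1], [1, 1], [1, 0], [0, 0]])  # -> True  (clockwise)
#   is_clockwise([[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]])  # -> False (counterclockwise)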
def centroid(points):  # centroid calculation function: takes the mean of all points in the list to generate a
    x_coords = [p[0] for p in points]  # center point location
    y_coords = [p[1] for p in points]  # (every call site below passes a single list of [x, y] pairs)
    _len = len(points)
    centroid_x = sum(x_coords) / _len
    centroid_y = sum(y_coords) / _len
    return [centroid_x, centroid_y]
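# Illustrative check (made-up coordinates): the centroid of a 2x2 square's
# corners is its center.
#   centroid([[0, 0], [2, 0], [2, 2], [0, 2]])  # -> [1.0, 1.0]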
# Create a lookup table for Territories, so we can merge
# the coordinate tables with each Territory
csv_rows_by_zip = {}
print("Load SalesForce Territories")
with open('./Territories__c__Zip+ID.csv') as csvIn:  # open the territory data file
    dictReader = csv.DictReader(csvIn, delimiter=',', quotechar='"')
    for row in dictReader:  # load the entire list of territories into a lookup table, indexed by zip code (a string)
        csv_rows_by_zip[row['Five_Digit_Zipcode__c']] = row
print("Opening output file")
csvFileOut = open('./SF_Zips_With_GeoBounds.csv', 'w', newline='')  # open the output file - if it already exists, it gets overwritten
fieldNames = ['Id',
              'Five_Digit_Zipcode__c',
              'geoType',
              'centroid_x__c',
              'centroid_y__c',
              'GeoStr_A__c', 'GeoStr_B__c', 'GeoStr_C__c', 'GeoStr_D__c', 'GeoStr_E__c', 'GeoStr_F__c', 'GeoStr_G__c',
              'GeoStr_H__c', 'GeoStr_I__c', 'GeoStr_J__c', 'GeoStr_K__c', 'GeoStr_L__c', 'GeoStr_M__c', 'GeoStr_N__c',
              'GeoStr_O__c', 'GeoStr_P__c', 'GeoStr_Q__c', 'GeoStr_R__c', 'GeoStr_S__c', 'GeoStr_T__c', 'GeoStr_U__c',
              'GeoStr_V__c', 'GeoStr_W__c', 'GeoStr_X__c', 'GeoStr_Y__c', 'GeoStr_Z__c', 'GeoStr_AA__c', 'GeoStr_AB__c',
              'GeoStr_AC__c', 'GeoStr_AD__c', 'GeoStr_AE__c', 'GeoStr_AF__c', 'GeoStr_AG__c', 'GeoStr_AH__c', 'GeoStr_AI__c',
              'GeoStr_AJ__c', 'GeoStr_AK__c', 'GeoStr_AL__c', 'GeoStr_AM__c', 'GeoStr_AN__c', 'GeoStr_AO__c', 'GeoStr_AP__c',
              'GeoStr_AQ__c', 'GeoStr_AR__c', 'GeoStr_AS__c', 'GeoStr_AT__c', 'GeoStr_AU__c', 'GeoStr_AV__c', 'GeoStr_AW__c',
              'GeoStr_AX__c', 'GeoStr_AY__c', 'GeoStr_AZ__c'
              ]
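# Capacity note: the 5 metadata columns are followed by 52 GeoStr columns
# (A..Z plus AA..AZ). At the 32,000-character-per-field limit used below, that
# allows a serialized geometry of up to 52 * 32,000 = 1,664,000 characters per
# zip code across all of its continuation records.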
print("Generating CSV header")
print(fieldNames)
dictWriter = csv.DictWriter(csvFileOut,  # create a CSV writer over the output file handle
                            fieldnames=fieldNames,
                            delimiter=',',
                            quotechar='"',
                            dialect='unix')
dictWriter.writeheader()  # and write the CSV header to that file
print("loading GeoJSON")
with open('./zcta5.geo.json') as geoFile:  # open and load the GeoJSON file
    data = json.load(geoFile)
    features = data.get('features')  # get the feature list from it
geoStrMaxLength = 0
recordsWritten = 0
recordsSkippedMissingGeo = 0
recordsSkippedList = []
recordsSkippedMissingSF_Record = 0
cellsContinued = 0
# how many features we got to swizzle through?
featureCount = len(features)
for featureIdx in range(featureCount):  # iterate the feature list, extracting and correcting each
    feature = features[featureIdx]
    geo = feature['geometry']  # extract the geometry object from the feature. We can reconstitute everything else in the GeoJSON from field properties
    if geo is not None:  # make sure it's actually there...
        polygons = []
        # stage 1 is to test the geometry for the correct ring winding order.
        # Extract the geo-coordinate data: for a MultiPolygon, it's an array of polygons
        # that has to be iterated; for a Polygon, it's a single GeoJSON polygon. To treat
        # both the same way, a lone polygon is temporarily wrapped in a list shaped like a
        # MultiPolygon's coordinates and processed identically.
        if geo['type'] == 'MultiPolygon':
            try:
                polygons = geo['coordinates']  # a MultiPolygon contains an array of polygons, so it's already a list here
            except KeyError:
                recordsSkippedMissingGeo += 1
                continue
        else:
            try:
                polygons = [geo['coordinates']]  # wrap a single polygon's coordinates (an array) in a list,
                # so one list processor can do all the work, simulating a MultiPolygon
            except KeyError:
                recordsSkippedMissingGeo += 1
                continue
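        # For reference, the coordinate layouts being normalized above (per the GeoJSON spec):
        #   Polygon:      coordinates = [exterior_ring, hole_ring, ...]
        #   MultiPolygon: coordinates = [[exterior_ring, ...], [exterior_ring, ...], ...]
        # where each ring is a closed list of [longitude, latitude] positions.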
        polygonCount = len(polygons)  # how many polygons to iterate?
        for polyIndex in range(polygonCount):
            poly = polygons[polyIndex]  # get the polygon
            ring = poly[0]  # get the first ring (the exterior)
            if not is_clockwise(ring):  # test it for clockwise-ness
                polygons[polyIndex] = [r[::-1] for r in poly]  # repair if necessary by reversing the winding of every ring
        zip_code = feature['properties']['GEOID10']  # GEOID10 is the GEOID of the zone we're interested in, i.e. the ZIP code
        if zip_code in csv_rows_by_zip:
            row = csv_rows_by_zip[zip_code]  # grab the CSV row from SF
            row['geoType'] = geo['type'][0]  # first letter of the type specifier: 'M' (MultiPolygon) or 'P' (Polygon)
        else:
            recordsSkippedMissingSF_Record += 1
            continue
        # stage 2 is to generate a centroid value so that each record
        # can be queried for whether or not it falls within a geographical area.
        # Note that for MultiPolygons, we first generate a centroid
        # for each polygon's exterior ring, and then, using that group of centroids,
        # generate a centroid for the GROUP of polygons
        if geo['type'] == 'MultiPolygon':
            coords = polygons  # assign the polygon list to the geo data directly
            ringCentroids = []  # make a variable to hold the temporary list
            for poly in coords:  # loop through the polygons in the coordinate list
                cent_coord = poly[0]  # grab each polygon's exterior ring
                ringCentroids.append(centroid(cent_coord))  # generate a centroid and push it onto the ringCentroids list
            groupCentroid = centroid(ringCentroids)  # after crunching the individual rings, generate a new centroid from the group of per-polygon centroids
            row['centroid_x__c'] = groupCentroid[0]  # store the values in the row
            row['centroid_y__c'] = groupCentroid[1]
        else:
            coords = polygons[0][0]  # for a plain Polygon, take its exterior ring; we only need to generate one centroid
            polyCent = centroid(coords)  # get the centroid using just the bare list of coordinates
            row['centroid_x__c'] = polyCent[0]  # store the values in the row
            row['centroid_y__c'] = polyCent[1]
        # stage 3: merge the corrected polygon data with the Salesforce Territory objects via their zip code fields
        geoStr = json.dumps(coords, separators=(',', ':'))  # convert the geometry to a JSON string
        geoStrLength = len(geoStr)  # get the size of the resulting string
        if geoStrLength > geoStrMaxLength:
            geoStrMaxLength = geoStrLength  # accumulated maximum size of the geoStr, required to project the final required storage
        SF_Field_Size_Limit = 32000  # max possible string size PER FIELD
        steps = int(geoStrLength / SF_Field_Size_Limit) + 1  # add 1 extra step so the remainder gets its own slice
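        # Slicing example (illustrative numbers): a 70,000-character geoStr gives
        # steps = int(70000 / 32000) + 1 = 3, producing slices of 32,000, 32,000,
        # and 6,000 characters in GeoStr_A__c, GeoStr_B__c, and GeoStr_C__c.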
        for step in range(steps):  # iterate the steps
            fieldName = fieldNames[step + 5]  # the target field name, offset by 5 to skip the first 5 metadata columns
            stepOff = step * SF_Field_Size_Limit  # offset from the start of the geo string, depending on how far along we are
            geoSlice = geoStr[stepOff:stepOff + SF_Field_Size_Limit]  # extract a slice of the right size from the string
            # print("Writing field ", fieldName)
            row[fieldName] = geoSlice  # stuff that slice into the destination row
            # The SF DataLoader will only accept records < 400,000 characters in length.
            # Additionally, the Einstein Analytics dataset importer can only accept cells of
            # 32K characters, so we can't use the full width of the SF custom object in one
            # record: we're limited to cells of 32K AND at most 12 populated GeoStr fields
            # per output record (12 * 32,000 = 384,000 characters, safely under the limit).
            # For a long zip coordinate set that spans more than 12 fields, we have to emit
            # an export record for each group of up to 12 cells that the text data consumes:
            # every 12th slice closes out the current record, and the remaining slices
            # continue in follow-up records for the same territory.
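            # Continuation example (illustrative): a geometry needing 20 slices emits its
            # first record after filling GeoStr_A__c..GeoStr_L__c (12 cells), then a
            # continuation record with the same Id/zip carrying GeoStr_M__c..GeoStr_T__c.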
            if step == steps - 1:  # if we've run out of steps, write the final record for this feature
                # print("Writing end record: " + row['Five_Digit_Zipcode__c'])
                dictWriter.writerow(row)
                recordsWritten += 1
            elif (step + 1) % 12 == 0:  # otherwise, on every 12th step, emit a record before the DataLoader's record-length limit hits
                print("starting continuation record: " + row['Five_Digit_Zipcode__c'])
                dictWriter.writerow(row)
                cellsContinued += 1
                recordsWritten += 1
                newRow = {  # generate a fresh copy of this row's metadata to be the next export record
                    "Id": row["Id"],
                    "Five_Digit_Zipcode__c": row["Five_Digit_Zipcode__c"],
                    "geoType": row["geoType"],
                    "centroid_x__c": row["centroid_x__c"],
                    "centroid_y__c": row["centroid_y__c"]
                }
                row = newRow
                # this object replaces the row previously generated for this feature;
                # on the next iteration of this loop, the script operates on the
                # continuation record for this feature
    else:
        recordsSkippedMissingGeo += 1
print("Results:")
print('Records not processed:', recordsSkippedMissingGeo)
print('Missing SF Zip records: ', recordsSkippedMissingSF_Record)
print("Records processed:", featureCount)
print("Records exported: ", recordsWritten)
print("Contination Records:", cellsContinued)
print("Required GeoJSON Column Size", geoStrMaxLength)
quit()