Skip to content

Instantly share code, notes, and snippets.

@sshopov
Last active August 29, 2015 13:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sshopov/10079217 to your computer and use it in GitHub Desktop.
Save sshopov/10079217 to your computer and use it in GitHub Desktop.
The Data Interoperability Extension for ArcGIS Desktop, also known as Safe FME, is great mainly thanks to its hundreds of transformers. However, for simple use cases it's easier to perform such tasks in Python. The script below appends records from one feature class to another while applying transformers and mappings as defined in the config. Tag…
import sys
import os
import copy
import arcpy
def get_field_values(fc, field, where=""):
    '''
    Collect the distinct values of one field of a feature class.

    @param fc: Feature class (or table) to read from.
    @param field: Name of the single field whose values are collected.
    @param where: Optional where clause restricting the rows that are read.
    @return: The distinct values found in the field (duplicates are collapsed).
    @rtype: set
    '''
    with arcpy.da.SearchCursor(fc, field, where) as rows:
        # Only one field is requested, so its value sits at index 0 of each row.
        return set(row[0] for row in rows)
def rows_as_dicts(cursor):
    r'''Yield every row of the cursor as a dict keyed by the cursor's field names.

    USAGE:
        with arcpy.da.SearchCursor(r'c:\data\world.gdb\world_cities', '*') as sc:
            for row in rows_as_dicts(sc):
                print row['CITY_NAME']

    Reference: http://arcpy.wordpress.com/2012/07/12/getting-arcpy-da-rows-back-as-dictionaries
    '''
    field_names = cursor.fields
    for values in cursor:
        # Pair each value with the field name found at the same position.
        yield {name: value for name, value in zip(field_names, values)}
def get_field_names(fc):
    '''Return the names of all fields of the given feature class as a sorted list.'''
    names = [field.name for field in arcpy.ListFields(fc)]
    names.sort()
    return names
def copy_field(src_fc, src_field, dstn_fc):
    '''
    Add a field to dstn_fc using the definition of src_field found in src_fc.

    Field names are compared case-insensitively on both the source and the
    destination side. Nothing is added (a message is printed instead) when the
    source field does not exist or the destination already has a field with
    that name.

    @param src_fc: Feature class that holds the field definition to copy.
    @param src_field: Name of the field to copy (any case).
    @param dstn_fc: Feature class that receives the new field.
    '''
    wanted = src_field.lower()
    matches = [f for f in arcpy.Describe(src_fc).Fields if f.Name.lower() == wanted]
    if not matches:
        # Previously this surfaced as a bare IndexError from "[...][0]".
        print('field could not be copied because it does not exist in the source %s' % src_field)
        return
    field = matches[0]

    existing = set(f.Name.lower() for f in arcpy.Describe(dstn_fc).Fields)
    if field.Name.lower() in existing:
        print('field could not be copied because it already exists %s' % field.Name)
        return

    print('copying %s' % field.Name)
    arcpy.AddField_management(dstn_fc, field.Name, field.Type, field.Precision,
                              field.Scale, field.Length, field.AliasName,
                              field.IsNullable, field.Required)
def check_for_missing_fields(src_fc, target_fc, src_fields, target_fields):
    '''
    Verify that the fields named in a mapping definition exist in both tables.

    All comparisons are case-insensitive. Every problem found is printed.
    As a side effect, each source field that exists in src_fc and whose name
    is not among target_fields is copied to target_fc via copy_field.

    @param src_fc: Source feature class.
    @param target_fc: Target feature class.
    @param src_fields: Source field names used by the definition.
    @param target_fields: Target field names used by the definition.
    @return: True when at least one defined field is missing from its table.
    @rtype: bool
    '''
    wanted_src = [name.lower() for name in src_fields]
    wanted_target = set(name.lower() for name in target_fields)

    actual_src = set(name.lower() for name in get_field_names(src_fc))
    src_diff = set(wanted_src) - actual_src
    if src_diff:
        # Sorted so the message is stable from run to run.
        print('There are src fields in the definition which do not exist in the src table: %s' % ','.join(sorted(src_diff)))

    # Read the target fields before any field is copied so that the check
    # reflects the table as it was found.
    actual_target = set(name.lower() for name in get_field_names(target_fc))
    target_diff = wanted_target - actual_target
    if target_diff:
        print('There are target fields in the definition which do not exist in the target table: %s' % ','.join(sorted(target_diff)))

    copied = set()
    for name in wanted_src:
        if name in wanted_target or name in copied:
            continue
        copied.add(name)
        # A field that is missing from the source has no definition to copy.
        if name not in src_diff:
            copy_field(src_fc, name, target_fc)

    return bool(src_diff or target_diff)
#TRANSFORMERS
def voltage_transformer(row, src_fc, target_fc):
    '''Translate the row's OPERATING_VOLTAGE text into its integer code.

    src_fc and target_fc are part of the transformer signature but are not
    used here. A KeyError is raised for a voltage text that is not listed.
    '''
    voltage_codes = {
        '11kV': 11,
        '132kV': 132,
        '19kV': 19,
        '240V': 240,
        '3300V': 3300,
        '33kV': 33,
        '433V': 433,
        '480V': 480,
        '66kV': 66,
        '7.6kV': 7,
        'Unknown': 0,
    }
    voltage_text = row['OPERATING_VOLTAGE']
    return voltage_codes[voltage_text]
def phase_transformer(row, src_fc, target_fc):
    '''Translate the row's PHASING text into its integer code.

    src_fc and target_fc are part of the transformer signature but are not
    used here. A KeyError is raised for a phasing text that is not listed.
    '''
    phase_codes = {
        'Single Phase': 4,
        'Single Phase Four Wire': 4,
        'Single Phase Three Wire': 3,
        'SWER': 3,
        'SWER 2 Wire': 2,
        'Three Phase': 7,
        'Unknown': 0,
    }
    phasing_text = row['PHASING']
    return phase_codes[phasing_text]
def name_transformer(row, src_fc, target_fc):
    '''Build a name of the form <src_fc>_<OPERATING_VOLTAGE>_<FEEDER1_ID>.

    target_fc is part of the transformer signature but is not used here.
    '''
    voltage = row['OPERATING_VOLTAGE']
    feeder = row['FEEDER1_ID']
    return '%s_%s_%s' % (src_fc, voltage, feeder)
def material_id_transformer(row, src_fc, target_fc):
    '''Placeholder transformer: always yields None, whatever the arguments.'''
    return None
def subtype_lookup_transformer(row, src_fc, target_fc):
    '''Return the subtype code for a source/target feature class pair.

    The code is looked up by source feature class name and then by target
    feature class name; any pair that is not listed gets the code 1.
    row is part of the transformer signature but is not used here.
    '''
    subtypes_by_source = {
        'S_HV_SWITCH': {'Fuse': 3},
        'OH_HV_SWITCH': {'Fuse': 2},
        'UG_HV_SWITCH': {'Fuse': 3},
        'S_TRANSMISSION_NODE': {'Fuse': 2},
    }
    # Chained .get replaces dict.has_key, which no longer exists in Python 3.
    return subtypes_by_source.get(src_fc, {}).get(target_fc, 1)
def subtype_lookup_from_phasing_transformer(row, src_fc, target_fc):
    '''Placeholder transformer: always yields the subtype code 1.'''
    subtype_code = 1
    return subtype_code
def is_feeder_head_transformer(row, src_fc, target_fc):
    '''Tell whether the row's SCH_DESCRIPT equals 'Feeder Change Point'.

    src_fc and target_fc are part of the transformer signature but are not
    used here.
    '''
    description = row["SCH_DESCRIPT"]
    return description == 'Feeder Change Point'
def conductor_material_transformer(row, src_fc, target_fc):
    '''Placeholder transformer: always yields None, whatever the arguments.'''
    return None
def conductor_size_transformer(row, src_fc, target_fc):
    '''Placeholder transformer: always yields None, whatever the arguments.'''
    return None
def normal_position_transformer(row, src_fc, target_fc):
    '''Translate the row's OPERATING_STATUS into a position code.

    'Closed' gives 1; 'Open' and None give 0. Any other status raises a
    KeyError. src_fc and target_fc are part of the transformer signature but
    are not used here.
    '''
    position_codes = {
        'Closed': 1,
        'Open': 0,
        None: 0,
    }
    status = row['OPERATING_STATUS']
    return position_codes[status]
def code_gen():
    '''Print code snippets generated from two hard-coded lists of names.

    First a single line holding a dict-literal skeleton ("d={'name':,...}")
    with one key per line of the first list, then one
    "('NAME', 'NAME', None)" mapping tuple per line of the second list.
    '''
    labels = '''Single Phase
Single Phase Four Wire
Single Phase Three Wire
SWER
SWER 2 Wire
Three Phase
Unknown
480V
66kV
7.6kV
Unknown'''
    keys = ["'%s':" % label for label in labels.splitlines()]
    print('d={%s}' % ','.join(keys))

    field_names = '''PROTECTION_DEVICE
SCH_DESCRIPT
SCH_TYPE
SW_NO
SWITCH_TYPE'''
    for line in field_names.splitlines():
        name = line.strip()
        print("('%s', '%s', None)" % (name, name))
# Common field mapping definitions.
# Every entry is a (source field, target field, transformer) tuple:
#   - a source of None means the value comes from the transformer alone;
#   - a transformer of None means the source value is copied unchanged.
common_fields = [
    # straight copies (some under a different target name)
    ('DATE_CREATED', 'DATECREATED', None),
    ('DATE_MODIFIED', 'DATEMODIFIED', None),
    ('FACILITY_OWNER', 'FACILITY_OWNER', None),
    ('FEEDER1_ID', 'FEEDERID', None),
    ('FEEDER2_ID', 'FEEDERID2', None),
    ('G3E_FID', 'FacilityID', None),
    ('G3E_FNO', 'G3E_FNO', None),
    ('JOB_CREATED', 'CREATIONUSER', None),
    ('MOD_USER', 'LASTUSER', None),
    ('OPERATING_STATE', 'OPERATING_STATE', None),
    ('OPERATING_STATUS', 'OPERATING_STATUS', None),
    # values translated through a lookup
    ('OPERATING_VOLTAGE', 'OPERATINGVOLTAGE', voltage_transformer),
    ('PHASING', 'PHASEDESIGNATION', phase_transformer),
    ('SOURCE', 'SOURCE', None),
    ('STATE', 'STATE', None),
    ('Shape', 'SHAPE', None),
    # target-only fields computed by a transformer
    (None, 'SUBTYPECD', subtype_lookup_transformer),
    (None, 'NAME', name_transformer),
    (None, 'MATERIAL_ID', material_id_transformer),
]

# The three NORMALPOSITION_* target fields are all derived from the same
# OPERATING_STATUS source field.
normal_position_fields = [
    ('OPERATING_STATUS', 'NORMALPOSITION_A', normal_position_transformer),
    ('OPERATING_STATUS', 'NORMALPOSITION_B', normal_position_transformer),
    ('OPERATING_STATUS', 'NORMALPOSITION_C', normal_position_transformer),
]
# Per-feature-class mappings: the common mappings plus class-specific extras.
# List concatenation already builds a new list and the entries are immutable
# tuples, so no deep copy of common_fields is needed.
busbar_fields = common_fields + [
    ('IS_EARTHED', 'IS_EARTHED', None),
]

# NOTE: the spelling "tranformer_fields" is kept because the fcs table refers
# to the list by this name.
tranformer_fields = common_fields + [
    ('EARTHING', 'EARTHING', None),
    ('OUTPUT_VOLTAGE', 'OUTPUT_VOLTAGE', None),
    ('TF_TYPE', 'TF_TYPE', None),
    ('TRANS_TYPE', 'TRANS_TYPE', None),
    ('TRANSFORMER_ID', 'DEVICE_ID', None),
]

capacitor_fields = common_fields + [
    ('CAPACITOR_ID', 'DEVICE_ID', None),
]

regulator_fields = common_fields + [
    ('REGULATOR_ID', 'DEVICE_ID', None),
]

hv_bulk_supply_no_fields = common_fields + [
    ('BULK_SUPPLY_NO', 'DEVICE_ID', None),
]

# Lines bring their own SUBTYPECD mapping, so the common SUBTYPECD entry is
# left out; otherwise the target field would be listed twice for one row.
line_fields = list(
    entry for entry in common_fields if entry[1] != 'SUBTYPECD'
) + [
    ('COMMON_NAME', 'COMMON_NAME', None),
    ('CONDUCTOR_SIZE', 'CONDUCTOR_SIZE', None),
    ('CONDUCTOR_TYPE', 'CONDUCTOR_TYPE', None),
    (None, 'CONDUCTORMATERIAL', conductor_material_transformer),
    (None, 'CONDUCTORSIZE', conductor_size_transformer),
    (None, 'SUBTYPECD', subtype_lookup_from_phasing_transformer),
]

# Dynamic protective devices.
dpd_fields = common_fields + [
    ('PROTECTION_DEVICE', 'PROTECTION_DEVICE', None),
    ('SCH_DESCRIPT', 'SCH_DESCRIPT', None),
    ('SCH_TYPE', 'SCH_TYPE', None),
    ('SW_NO', 'SW_NO', None),
    ('SWITCH_TYPE', 'SWITCH_TYPE', None),
    (None, 'isFeederHead', is_feeder_head_transformer),
] + normal_position_fields

fuse_fields = common_fields + normal_position_fields
switch_fields = common_fields + normal_position_fields
# Append jobs, one per entry:
# (source feature class, target feature class, field mapping, where clause).
# A where clause of None selects every source row.
fcs = [
    ('S_HV_BUSBAR', 'BusBar', busbar_fields, None),
    ('S_HV_ZONE_BUSBAR', 'BusBar', busbar_fields, None),
    ('S_HV_CONNECT_LINE', 'BusBar', common_fields, None),
    ('S_HV_ZONE_CONNECT_LINE', 'BusBar', common_fields, None),

    ('CAPACITOR', 'Transformer', capacitor_fields, None),
    ('HV_BULKSUPPLY', 'Transformer', hv_bulk_supply_no_fields, None),
    ('OH_TRANSFORMER', 'Transformer', tranformer_fields, None),
    ('PAD_TRANSFORMER', 'Transformer', tranformer_fields, None),
    ('REGULATOR', 'Transformer', regulator_fields, None),
    ('S_HV_TRANSFORMER', 'Transformer', tranformer_fields, None),
    ('S_HV_ZONE_TRANSFORMER', 'Transformer', tranformer_fields, None),

    ('HV_CABLE', 'PriUGElectricLineSegment', line_fields, None),
    ('HV_LINE', 'PriOHElectricLineSegment', line_fields, None),

    ('OH_HV_RECLOSER', 'DynamicProtectiveDevice', dpd_fields, None),
    ('OH_HV_SECTIONALISER', 'DynamicProtectiveDevice', dpd_fields, None),
    ('S_HV_CIRCUIT_BREAKER', 'DynamicProtectiveDevice', dpd_fields, None),
    ('S_TRANSMISSION_NODE', 'DynamicProtectiveDevice', dpd_fields, "SCH_DESCRIPT = 'Feeder Change Point'"),
    ('S_HV_SWITCH', 'DynamicProtectiveDevice', dpd_fields, "SCH_DESCRIPT = 'Feeder Change Point' OR SCH_TYPE IN ('CB', 'Circuit Breaker', 'R', 'Recloser (Substation)') "),
    ('OH_HV_SWITCH', 'DynamicProtectiveDevice', dpd_fields, "SCH_DESCRIPT = 'Feeder Change Point'"),
    ('UG_HV_SWITCH', 'DynamicProtectiveDevice', dpd_fields, "SCH_DESCRIPT = 'Feeder Change Point'"),

    ('S_TRANSMISSION_NODE', 'Fuse', fuse_fields, "SCH_DESCRIPT = 'fused'"),
    ('S_HV_SWITCH', 'Fuse', fuse_fields, "SCH_DESCRIPT = 'fused'"),
    ('OH_HV_SWITCH', 'Fuse', fuse_fields, "SCH_DESCRIPT = 'fused'"),
    ('UG_HV_SWITCH', 'Fuse', fuse_fields, "SCH_DESCRIPT = 'fused'"),

    ('S_TRANSMISSION_NODE', 'Switch', switch_fields, "SCH_DESCRIPT NOT IN ('fused', 'Feeder Change Point')"),
    ('S_HV_SWITCH', 'Switch', switch_fields, "SCH_DESCRIPT NOT IN ('fused', 'Feeder Change Point') AND SCH_TYPE NOT IN ('CB', 'Circuit Breaker', 'R', 'Recloser (Substation)')"),
    ('OH_HV_SWITCH', 'Switch', switch_fields, "SCH_DESCRIPT NOT IN ('fused', 'Feeder Change Point')"),
    ('UG_HV_SWITCH', 'Switch', switch_fields, "SCH_DESCRIPT NOT IN ('fused', 'Feeder Change Point')"),
    ('OH_HV_TERM_JOINT', 'Switch', switch_fields, None),
    ('OHUG_HV_JOINT', 'Switch', switch_fields, None),
    # Target was misspelled 'Swich'; every other joint is appended to 'Switch'.
    ('UG_HV_CABLE_JOINT', 'Switch', switch_fields, None),
    ('UG_HV_TERM_JOINT', 'Switch', switch_fields, None),
]
# Workspaces that hold the source and the target feature classes.
src_root = r'C:\temp\input.gdb'
target_root = r'C:\temp\ArcFM_Schema.gdb\ElectricDataset'

# Run every append job: read the selected source rows, build one target row
# per source row according to the mapping, and insert it into the target.
for src_fc, target_fc, mapping, where in fcs:
    src_fc_path = os.path.join(src_root, src_fc)
    target_fc_path = os.path.join(target_root, target_fc)

    # A source field may feed several target fields; request it only once.
    # Rows are turned into dicts keyed by field name, so nothing is lost.
    src_fields = []
    for src, target, transformer in mapping:
        if src and src not in src_fields:
            src_fields.append(src)
    target_fields = [target for src, target, transformer in mapping if target]

    fields_are_missing = check_for_missing_fields(src_fc_path, target_fc_path, src_fields, target_fields)
    if fields_are_missing:
        print('Append operation did not complete because there are missing fields')
        continue

    # NOTE(review): the geometry is requested through the plain 'Shape' field
    # name - confirm this yields the full geometry for line features; the
    # SHAPE@ token may be required instead.
    with arcpy.da.SearchCursor(src_fc_path, src_fields, where) as rows:
        with arcpy.da.InsertCursor(target_fc_path, target_fields) as new_rows:
            for row in rows_as_dicts(rows):
                try:
                    new_row = []
                    for src, target, transformer in mapping:
                        if not target:  # may be None
                            continue
                        if transformer is None:
                            new_row.append(row[src])
                        elif callable(transformer):
                            new_row.append(transformer(row, src_fc, target_fc))
                        else:  # must be a constant value
                            new_row.append(transformer)
                    # Was misspelled "inserRow", which made every row fail.
                    new_rows.insertRow(new_row)
                except Exception as exc:
                    # Best effort: report the failing row and carry on with
                    # the next one.
                    print('%s -> %s: %s' % (src_fc, target_fc, exc))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment