Skip to content

Instantly share code, notes, and snippets.

@pLeBlanc93
Last active September 11, 2020 18:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pLeBlanc93/1c7d189a3d0cbb550d2fb4502b754d9d to your computer and use it in GitHub Desktop.
Save pLeBlanc93/1c7d189a3d0cbb550d2fb4502b754d9d to your computer and use it in GitHub Desktop.
ArcGIS Pro Python toolbox for adding text descriptions to Export Subnetwork JSON
# -*- coding: utf-8 -*-
import json
import os
from typing import Union
import arcpy
def save_json(data: dict, file: str):
with open(file, 'w', encoding='utf-8') as writer:
json.dump(data, writer, indent=' ' * 2, separators=(',', ' : '))
class Schema:
""" Creates a JSON payload to be used in export subnetwork to convert codes to descriptions """
SORT_OUTER = True # Sort top level arrays
SORT_INNER = True # Sort inner arrays (codedValue domains, subtypes, fields, etc)
SKIP_NO_DOMAINS = False # Exclude network attributes and fields without domains
def __init__(self, path: str):
self.utility_network = path
print("Describing Utility Network")
self.describe = arcpy.Describe(self.utility_network)
self.is_service = path.lower().startswith('http')
# When describing a service, the layer name is returned -- we need to map this map to layer ID
self.layer_mapping = {}
if self.is_service:
print("Describing the Service")
for layer in arcpy.da.Describe(self.describe.path)['children']:
if 'aliasName' in layer:
self.layer_mapping[layer['aliasName']] = layer['name']
# Cached copy of domain assignment per source.
self.lookup = {}
self.payload = dict(domains=[],
terminals=[],
networkAttributes=[],
sources=[])
def _join(self, name: str):
""" path joining to support different workspaces """
if not self.is_service:
return os.path.join(os.path.dirname(self.describe.path), name)
return "/".join([self.describe.path, self.layer_mapping[name]])
def query_domains(self):
domains = self.payload['domains']
# FGDB UN will be .gdb/FD/UN
path = self.describe.path
if not self.is_service:
path = os.path.dirname(path)
print("Querying domains")
for domain in arcpy.da.ListDomains(path):
# We're only interested in codedValue domains because we cannot replace range domains with a description.
if domain.domainType == 'Range':
continue
coded_values = [dict(name=v, code=k) for k, v in domain.codedValues.items()]
domains.append(dict(domainName=domain.name, codedValues=coded_values))
if self.SORT_INNER:
coded_values.sort(key=lambda f: f['code'])
if self.SORT_OUTER:
domains.sort(key=lambda f: f['domainName'])
def query_terminals(self):
terminals = self.payload['terminals']
for config in self.describe.terminalConfigurations:
for terminal in config.terminals:
terminals.append(dict(terminalName=terminal.terminalName, terminalId=terminal.terminalID))
if self.SORT_OUTER:
terminals.sort(key=lambda f: f['terminalId'])
def query_network_attributes(self):
attributes = self.payload['networkAttributes']
for na in self.describe.networkAttributes:
domain = na.domainName or None
# For NA without a domain, we look at the assignments so see if the assigned field has a domain.
if domain is None:
for assignment in na.assignments:
domain_assignments = self.lookup.get(assignment.networkSourceName.lower(), {})
domain = domain_assignments.get(assignment.evaluator.fieldName.lower(), None)
if domain:
break
if self.SKIP_NO_DOMAINS and domain is None:
continue
attributes.append(dict(networkAttributeName=na.name, domainName=domain))
if self.SORT_OUTER:
attributes.sort(key=lambda f: f['networkAttributeName'])
def query_sources(self):
sources = self.payload['sources']
# The system tables (and subnetLine) and reported here for completeness.
for name, code in [('Associations', 1),
('SystemJunctions', 2)]:
sources.append(dict(sourceName=name, sourceId=code, subtypes=[]))
for network in self.describe.domainNetworks:
for source in network.junctionSources + network.edgeSources:
asset_groups = []
sources.append(dict(sourceName=source.sourceName, sourceId=source.sourceID, subtypes=asset_groups))
if source.utilityNetworkFeatureClassUsageType == 'SubnetLine':
continue
# We need both fields and subtypes because if a field at the subtype level does not have a subtype,
# we revert to the field at the root level.
print(f'Describing {source.sourceName}')
source_path = self._join(source.sourceName)
subtypes = arcpy.da.ListSubtypes(source_path)
field_lookup = {f.name.lower(): f.domain or None for f in arcpy.ListFields(source_path)}
self.lookup[source.sourceName.lower()] = field_lookup
for subtype_code, info in subtypes.items():
fields = []
for field, (_, domain) in info['FieldValues'].items():
if domain is None:
domain = field_lookup.get(field.lower(), None)
else:
domain = domain.name
if self.SKIP_NO_DOMAINS and domain is None:
continue
fields.append(dict(fieldName=field, domainName=domain))
if self.SORT_INNER:
fields.sort(key=lambda f: f['fieldName'])
asset_groups.append(dict(subtypeName=info['Name'], subtypeCode=subtype_code, fields=fields))
if self.SORT_INNER:
asset_groups.sort(key=lambda f: f['subtypeCode'])
if self.SORT_OUTER:
sources.sort(key=lambda f: f['sourceId'])
def main(self):
import uuid
# Creating a layer on the service territory will speedup calls to the DE (eg, ListFields, ListSubtypes, etc)
layer = None
if not self.is_service:
fc = self._join(self.describe.serviceTerritoryFeatureClassName)
layer = arcpy.MakeFeatureLayer_management(in_features=fc,
out_layer=f'svc_{uuid.uuid4().hex}',
where_clause='1=0')[0]
# We need to query sources before NA, because we use NA assignments to lookup field assignments
self.query_domains()
self.query_terminals()
self.query_sources()
self.query_network_attributes()
if layer:
arcpy.Delete_management(layer)
class Inject:
""" Injects or replaces the codes in Export Subnetwork payload with descriptions """
def __init__(self,
export_subnetwork_payload: Union[dict, str],
mapping_payload: Union[dict, str],
add_keys: bool = True):
self.export = self._load(export_subnetwork_payload)
self.lookup = self._load(mapping_payload)
self.insert = add_keys
@staticmethod
def _load(data: Union[dict, str]):
if isinstance(data, dict):
return data
return json.load(open(data, encoding='utf-8'))
@staticmethod
def flatten(data: dict) -> dict:
terminals = {t['terminalId']: t['terminalName']
for t in data.get('terminals', [])}
domains = {d['domainName'].lower(): {c['code']: c['name'] for c in d['codedValues']}
for d in data.get('domains', [])}
attributes = {n['networkAttributeName'].lower(): domains.get((n['domainName'] or '').lower(), {})
for n in data.get('networkAttributes', [])}
sources = {}
for source in data.get('sources', []):
subtypes = {}
for subtype in source.get('subtypes', []):
fields = {field['fieldName'].lower(): domains.get((field['domainName'] or '').lower(), {})
for field in subtype.get('fields', [])}
subtypes[subtype['subtypeCode']] = dict(subtypeName=subtype['subtypeName'], fields=fields)
sources[source['sourceId']] = dict(networkSourceName=source['sourceName'], subtypes=subtypes)
return dict(terminals=terminals,
networkAttributes=attributes,
sources=sources)
def lookup_value(self, data: dict, old_key: str, new_key: str, lookup: dict):
old = data.get(old_key, None)
if old is None:
return
data[new_key if self.insert else old_key] = lookup.get(old, old)
def replace(self):
lookup = self.flatten(self.lookup)
record: dict
terminal_lookup = lookup['terminals']
na_lookup = lookup['networkAttributes']
source_lookup = {}
group_lookup = {}
field_lookup = {}
for source_id, values in lookup.get('sources', {}).items():
source_lookup[source_id] = values['networkSourceName']
for code, records in values.get('subtypes', {}).items():
group_lookup.setdefault(source_id, {})[code] = records['subtypeName']
for field_name, fields in records.get('fields', {}).items():
(field_lookup
.setdefault(source_id, {})
.setdefault(code, {}))[field_name] = fields
for record in self.export.get('controllers', []):
src = record['networkSourceId']
ag = record['assetGroupCode']
self.lookup_value(record, 'networkSourceId', 'networkSourceName', source_lookup)
self.lookup_value(record, 'assetGroupCode', 'assetGroupName', group_lookup[src])
self.lookup_value(record, 'assetTypeCode', 'assetTypeName', field_lookup[src][ag]['assettype'])
self.lookup_value(record, 'terminalId', 'terminalName', terminal_lookup)
for record in self.export.get('featureElements', []):
src = record['networkSourceId']
ag = record['assetGroupCode']
# Feature Elements from the system classes (Assocations, SystemJunctions) report AG as 0.
# This can only exist in feature space as an error, so it's safe to assume system if we find a 0.
if record['assetGroupCode'] == 0:
is_system = True
g = f = t = {0: 'Unknown'}
else:
is_system = False
g = group_lookup[src]
f = field_lookup[src][ag]['assettype']
t = terminal_lookup
self.lookup_value(record, 'networkSourceId', 'networkSourceName', source_lookup)
self.lookup_value(record, 'assetGroupCode', 'assetGroupName', g)
self.lookup_value(record, 'assetTypeCode', 'assetTypeName', f)
self.lookup_value(record, 'terminalId', 'terminalName', t)
if is_system:
continue
fields = field_lookup[src][ag]
for field in record.get('fieldValues', []):
self.lookup_value(field, 'value', 'description', fields[field['fieldName'].lower()])
# Network Attributes use their name as the key.
for na in record.get('networkAttributeValues', []):
na_name = list(na)[0]
self.lookup_value(na, na_name, '0', na_lookup[na_name.lower()])
for record in self.export.get('connectivity', []):
for prefix in ('via', 'from', 'to'):
self.lookup_value(record, f'{prefix}NetworkSourceId', f'{prefix}NetworkSourceName', source_lookup)
self.lookup_value(record, f'{prefix}TerminalId', f'{prefix}TerminalName', terminal_lookup)
class Toolbox(object):
def __init__(self):
self.label = "Export Subnetwork Name Mapping"
self.alias = "sub"
# List of tool classes associated with this toolbox
self.tools = [CreateLookup, InjectCodes]
class CreateLookup(object):
def __init__(self):
self.label = "Create Lookup File"
self.description = "Creates a JSON lookup file to convert codes to names in the Export Subnetwork output"
self.canRunInBackground = False
def getParameterInfo(self):
un = arcpy.Parameter(name='in_utility_network',
displayName='Utility Network',
direction='Input',
datatype='DEUtilityNetwork',
parameterType='Required')
file = arcpy.Parameter(name='output_json',
displayName='Output Lookup File',
direction='Output',
datatype='DEFile',
parameterType='Required')
file.filter.list = ['json']
return [un, file]
def execute(self, parameters, messages):
un, file = parameters
s = Schema(un.valueAsText)
s.main()
save_json(s.payload, file.valueAsText)
class InjectCodes(object):
def __init__(self):
self.label = "Lookup Subnetwork Codes"
self.description = "Uses the subnetwork lookup file to replace or add names"
self.canRunInBackground = False
def getParameterInfo(self):
lookup = arcpy.Parameter(name='lookup_file',
displayName='Lookup File',
direction='Input',
datatype='DEFile',
parameterType='Required')
files = arcpy.Parameter(name='subnetwork_files',
displayName='Export Subnetwork results',
direction='Input',
datatype='DEFile',
parameterType='Required',
multiValue=True)
lookup.filter.list = files.filter.list = ['json']
return [lookup, files]
def execute(self, parameters, messages):
lookup, files = parameters
lookup = Inject._load(lookup.valueAsText)
arcpy.SetProgressor('STEP', 'Exporting...', 0, len(files.values))
for file in files.values:
file = file.value
i = Inject(export_subnetwork_payload=file,
mapping_payload=lookup,
add_keys=True)
i.replace()
save_json(i.export, file)
arcpy.SetProgressorPosition()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment