-
-
Save pLeBlanc93/1c7d189a3d0cbb550d2fb4502b754d9d to your computer and use it in GitHub Desktop.
ArcGIS Pro Python toolbox for adding text descriptions to Export Subnetwork JSON
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import json | |
import os | |
from typing import Union | |
import arcpy | |
def save_json(data: dict, file: str):
    """ Write ``data`` to ``file`` as UTF-8 encoded JSON.

    The output is indented by two spaces, items are separated by ',' and keys are
    separated from their values by ' : '.

    :param data: JSON-serializable dictionary to write.
    :param file: Path of the file to create or overwrite.
    """
    two_spaces = '  '
    key_value_layout = (',', ' : ')
    with open(file, mode='w', encoding='utf-8') as stream:
        json.dump(data, stream, indent=two_spaces, separators=key_value_layout)
class Schema:
    """ Creates a JSON payload to be used in export subnetwork to convert codes to descriptions

    Calling main() fills ``self.payload``, a dict with the keys 'domains', 'terminals',
    'networkAttributes' and 'sources'. Each ``query_*`` method appends to one of those lists.
    """
    SORT_OUTER = True  # Sort top level arrays
    SORT_INNER = True  # Sort inner arrays (codedValue domains, subtypes, fields, etc)
    SKIP_NO_DOMAINS = False  # Exclude network attributes and fields without domains

    def __init__(self, path: str):
        """ Describe the utility network and prepare an empty payload.

        :param path: Path to the utility network. A value starting with 'http' (any case)
                     is treated as a service.
        """
        self.utility_network = path
        print("Describing Utility Network")
        self.describe = arcpy.Describe(self.utility_network)
        self.is_service = path.lower().startswith('http')

        # When describing a service, the layer name is returned -- we need to map that name
        # (the child's 'aliasName') to the child's 'name', which _join appends to the service path.
        # NOTE(review): presumably 'name' is the layer ID for service children -- confirm.
        self.layer_mapping = {}
        if self.is_service:
            print("Describing the Service")
            for layer in arcpy.da.Describe(self.describe.path)['children']:
                if 'aliasName' in layer:
                    self.layer_mapping[layer['aliasName']] = layer['name']

        # Cached copy of domain assignment per source: {source name (lower): {field name (lower): domain}}.
        # Filled by query_sources and read by query_network_attributes.
        self.lookup = {}
        self.payload = dict(domains=[],
                            terminals=[],
                            networkAttributes=[],
                            sources=[])

    def _join(self, name: str):
        """ path joining to support different workspaces

        :param name: Source name as reported by the utility network describe.
        :return: For a service, the describe path joined with the mapped layer name using '/';
                 otherwise ``name`` joined onto the parent of the describe path.
        :raises KeyError: for a service when ``name`` is not present in ``self.layer_mapping``.
        """
        if not self.is_service:
            return os.path.join(os.path.dirname(self.describe.path), name)
        return "/".join([self.describe.path, self.layer_mapping[name]])

    def query_domains(self):
        """ Append every non-range domain and its coded values to payload['domains'] """
        domains = self.payload['domains']

        # FGDB UN will be .gdb/FD/UN, so domains are listed from the parent of the describe path
        path = self.describe.path
        if not self.is_service:
            path = os.path.dirname(path)

        print("Querying domains")
        for domain in arcpy.da.ListDomains(path):
            # We're only interested in codedValue domains because we cannot replace range domains with a description.
            if domain.domainType == 'Range':
                continue
            coded_values = [dict(name=v, code=k) for k, v in domain.codedValues.items()]
            domains.append(dict(domainName=domain.name, codedValues=coded_values))
            if self.SORT_INNER:
                # Sorting in place also orders the list already stored in the payload entry above.
                coded_values.sort(key=lambda f: f['code'])
        if self.SORT_OUTER:
            domains.sort(key=lambda f: f['domainName'])

    def query_terminals(self):
        """ Append the name and ID of every terminal of every terminal configuration to payload['terminals'] """
        terminals = self.payload['terminals']
        for config in self.describe.terminalConfigurations:
            for terminal in config.terminals:
                terminals.append(dict(terminalName=terminal.terminalName, terminalId=terminal.terminalID))
        if self.SORT_OUTER:
            terminals.sort(key=lambda f: f['terminalId'])

    def query_network_attributes(self):
        """ Append each network attribute and its domain name (or None) to payload['networkAttributes']

        Relies on ``self.lookup`` having been filled by query_sources beforehand.
        """
        attributes = self.payload['networkAttributes']
        for na in self.describe.networkAttributes:
            # An empty domain name is normalised to None
            domain = na.domainName or None
            # For NA without a domain, we look at the assignments to see if the assigned field has a domain.
            if domain is None:
                for assignment in na.assignments:
                    domain_assignments = self.lookup.get(assignment.networkSourceName.lower(), {})
                    domain = domain_assignments.get(assignment.evaluator.fieldName.lower(), None)
                    if domain:
                        # The first assignment that yields a domain wins
                        break
            if self.SKIP_NO_DOMAINS and domain is None:
                continue
            attributes.append(dict(networkAttributeName=na.name, domainName=domain))
        if self.SORT_OUTER:
            attributes.sort(key=lambda f: f['networkAttributeName'])

    def query_sources(self):
        """ Append every network source with its subtypes and per-subtype field domains to payload['sources']

        Also records each source's root-level field-to-domain mapping in ``self.lookup``.
        """
        sources = self.payload['sources']

        # The system tables (and subnetLine) are reported here for completeness.
        for name, code in [('Associations', 1),
                           ('SystemJunctions', 2)]:
            sources.append(dict(sourceName=name, sourceId=code, subtypes=[]))

        for network in self.describe.domainNetworks:
            for source in network.junctionSources + network.edgeSources:
                asset_groups = []
                sources.append(dict(sourceName=source.sourceName, sourceId=source.sourceID, subtypes=asset_groups))
                # SubnetLine is listed with an empty subtype array
                if source.utilityNetworkFeatureClassUsageType == 'SubnetLine':
                    continue

                # We need both fields and subtypes because if a field at the subtype level does not have a domain,
                # we revert to the field at the root level.
                print(f'Describing {source.sourceName}')
                source_path = self._join(source.sourceName)
                subtypes = arcpy.da.ListSubtypes(source_path)
                field_lookup = {f.name.lower(): f.domain or None for f in arcpy.ListFields(source_path)}
                self.lookup[source.sourceName.lower()] = field_lookup

                for subtype_code, info in subtypes.items():
                    fields = []
                    for field, (_, domain) in info['FieldValues'].items():
                        if domain is None:
                            domain = field_lookup.get(field.lower(), None)
                        else:
                            domain = domain.name
                        if self.SKIP_NO_DOMAINS and domain is None:
                            continue
                        fields.append(dict(fieldName=field, domainName=domain))
                    if self.SORT_INNER:
                        fields.sort(key=lambda f: f['fieldName'])
                    asset_groups.append(dict(subtypeName=info['Name'], subtypeCode=subtype_code, fields=fields))
                if self.SORT_INNER:
                    asset_groups.sort(key=lambda f: f['subtypeCode'])
        if self.SORT_OUTER:
            sources.sort(key=lambda f: f['sourceId'])

    def main(self):
        """ Populate every section of ``self.payload`` """
        import uuid

        # Creating a layer on the service territory will speedup calls to the DE (eg, ListFields, ListSubtypes, etc)
        layer = None
        if not self.is_service:
            fc = self._join(self.describe.serviceTerritoryFeatureClassName)
            layer = arcpy.MakeFeatureLayer_management(in_features=fc,
                                                      out_layer=f'svc_{uuid.uuid4().hex}',
                                                      where_clause='1=0')[0]
        try:
            # We need to query sources before NA, because we use NA assignments to lookup field assignments
            self.query_domains()
            self.query_terminals()
            self.query_sources()
            self.query_network_attributes()
        finally:
            # Remove the temporary layer even when one of the queries raises
            if layer:
                arcpy.Delete_management(layer)
class Inject:
    """ Injects or replaces the codes in Export Subnetwork payload with descriptions

    Codes that have no entry in the mapping payload are left as they are; with ``add_keys``
    the new key then simply repeats the original code.
    """

    def __init__(self,
                 export_subnetwork_payload: Union[dict, str],
                 mapping_payload: Union[dict, str],
                 add_keys: bool = True):
        """
        :param export_subnetwork_payload: Export Subnetwork result, as a dict or a path to a JSON file.
        :param mapping_payload: Lookup payload (the structure built by Schema), as a dict or a path to a JSON file.
        :param add_keys: When True the description is written to a new key next to the code;
                         when False the code itself is overwritten with the description.
        """
        self.export = self._load(export_subnetwork_payload)
        self.lookup = self._load(mapping_payload)
        self.insert = add_keys

    @staticmethod
    def _load(data: Union[dict, str]):
        """ Return ``data`` unchanged when it is a dict, otherwise parse it as a path to a UTF-8 JSON file """
        if isinstance(data, dict):
            return data
        # Context manager so the file handle is closed as soon as parsing finishes
        with open(data, encoding='utf-8') as reader:
            return json.load(reader)

    @staticmethod
    def flatten(data: dict) -> dict:
        """ Convert the list-based lookup payload into nested dictionaries keyed by code or name.

        :param data: Lookup payload with optional 'terminals', 'domains', 'networkAttributes' and 'sources' lists.
        :return: dict with
                 'terminals': {terminalId: terminalName},
                 'networkAttributes': {attribute name (lower): {code: name}},
                 'sources': {sourceId: {'networkSourceName': str,
                                        'subtypes': {subtypeCode: {'subtypeName': str,
                                                                   'fields': {field name (lower): {code: name}}}}}}.
                 A field or attribute without a (known) domain maps to an empty dict.
        """
        terminals = {t['terminalId']: t['terminalName']
                     for t in data.get('terminals', [])}
        domains = {d['domainName'].lower(): {c['code']: c['name'] for c in d['codedValues']}
                   for d in data.get('domains', [])}
        attributes = {n['networkAttributeName'].lower(): domains.get((n['domainName'] or '').lower(), {})
                      for n in data.get('networkAttributes', [])}

        sources = {}
        for source in data.get('sources', []):
            subtypes = {}
            for subtype in source.get('subtypes', []):
                fields = {field['fieldName'].lower(): domains.get((field['domainName'] or '').lower(), {})
                          for field in subtype.get('fields', [])}
                subtypes[subtype['subtypeCode']] = dict(subtypeName=subtype['subtypeName'], fields=fields)
            sources[source['sourceId']] = dict(networkSourceName=source['sourceName'], subtypes=subtypes)

        return dict(terminals=terminals,
                    networkAttributes=attributes,
                    sources=sources)

    def lookup_value(self, data: dict, old_key: str, new_key: str, lookup: dict):
        """ Translate ``data[old_key]`` through ``lookup`` and store the result in ``data``.

        Nothing happens when ``old_key`` is absent or None. The result goes to ``new_key`` when
        ``self.insert`` is set, otherwise it overwrites ``old_key``. A value that is not in
        ``lookup`` is stored unchanged.
        """
        old = data.get(old_key, None)
        if old is None:
            return
        data[new_key if self.insert else old_key] = lookup.get(old, old)

    def replace(self):
        """ Translate the codes in the 'controllers', 'featureElements' and 'connectivity' arrays of self.export in place """
        lookup = self.flatten(self.lookup)
        terminal_lookup = lookup['terminals']
        na_lookup = lookup['networkAttributes']

        # Re-index the flattened sources into one dict per kind of lookup
        source_lookup = {}  # {sourceId: source name}
        group_lookup = {}   # {sourceId: {subtypeCode: subtype name}}
        field_lookup = {}   # {sourceId: {subtypeCode: {field name (lower): {code: name}}}}
        for source_id, values in lookup.get('sources', {}).items():
            source_lookup[source_id] = values['networkSourceName']
            for code, records in values.get('subtypes', {}).items():
                group_lookup.setdefault(source_id, {})[code] = records['subtypeName']
                field_lookup.setdefault(source_id, {})[code] = records.get('fields', {})

        for record in self.export.get('controllers', []):
            src = record['networkSourceId']
            ag = record['assetGroupCode']
            # Sources, subtypes or fields missing from the lookup (eg, a file created with
            # SKIP_NO_DOMAINS) fall back to an empty mapping so the code is kept as-is.
            fields = field_lookup.get(src, {}).get(ag, {})
            self.lookup_value(record, 'networkSourceId', 'networkSourceName', source_lookup)
            self.lookup_value(record, 'assetGroupCode', 'assetGroupName', group_lookup.get(src, {}))
            self.lookup_value(record, 'assetTypeCode', 'assetTypeName', fields.get('assettype', {}))
            self.lookup_value(record, 'terminalId', 'terminalName', terminal_lookup)

        for record in self.export.get('featureElements', []):
            src = record['networkSourceId']
            ag = record['assetGroupCode']

            # Feature Elements from the system classes (Associations, SystemJunctions) report AG as 0.
            # This can only exist in feature space as an error, so it's safe to assume system if we find a 0.
            is_system = ag == 0
            if is_system:
                fields = {}
                groups = asset_types = terminals = {0: 'Unknown'}
            else:
                fields = field_lookup.get(src, {}).get(ag, {})
                groups = group_lookup.get(src, {})
                asset_types = fields.get('assettype', {})
                terminals = terminal_lookup

            self.lookup_value(record, 'networkSourceId', 'networkSourceName', source_lookup)
            self.lookup_value(record, 'assetGroupCode', 'assetGroupName', groups)
            self.lookup_value(record, 'assetTypeCode', 'assetTypeName', asset_types)
            self.lookup_value(record, 'terminalId', 'terminalName', terminals)

            if is_system:
                continue

            for field in record.get('fieldValues', []):
                self.lookup_value(field, 'value', 'description', fields.get(field['fieldName'].lower(), {}))

            # Network Attributes use their name as the key.
            for na in record.get('networkAttributeValues', []):
                if not na:
                    continue
                na_name = next(iter(na))
                # NOTE(review): with add_keys the description is stored under the literal key '0' --
                # confirm this is the intended key name.
                self.lookup_value(na, na_name, '0', na_lookup.get(na_name.lower(), {}))

        for record in self.export.get('connectivity', []):
            for prefix in ('via', 'from', 'to'):
                self.lookup_value(record, f'{prefix}NetworkSourceId', f'{prefix}NetworkSourceName', source_lookup)
                self.lookup_value(record, f'{prefix}TerminalId', f'{prefix}TerminalName', terminal_lookup)
class Toolbox:
    """ Python toolbox definition: sets the toolbox label, alias and the tool classes it exposes """

    def __init__(self):
        # List of tool classes associated with this toolbox
        self.tools = [CreateLookup, InjectCodes]
        self.alias = "sub"
        self.label = "Export Subnetwork Name Mapping"
class CreateLookup(object):
    """ Tool that builds the lookup payload for a utility network and saves it as a JSON file """

    def __init__(self):
        self.canRunInBackground = False
        self.description = "Creates a JSON lookup file to convert codes to names in the Export Subnetwork output"
        self.label = "Create Lookup File"

    def getParameterInfo(self):
        """ Define the tool parameters: the input utility network, then the output JSON file """
        network_param = arcpy.Parameter(displayName='Utility Network',
                                        name='in_utility_network',
                                        datatype='DEUtilityNetwork',
                                        parameterType='Required',
                                        direction='Input')

        output_param = arcpy.Parameter(displayName='Output Lookup File',
                                       name='output_json',
                                       datatype='DEFile',
                                       parameterType='Required',
                                       direction='Output')
        # Restrict the output file to the json extension
        output_param.filter.list = ['json']

        return [network_param, output_param]

    def execute(self, parameters, messages):
        """ Build the Schema payload for the chosen utility network and write it to the output file """
        network_param, output_param = parameters
        schema = Schema(network_param.valueAsText)
        schema.main()
        save_json(schema.payload, output_param.valueAsText)
class InjectCodes(object):
    """ Tool that applies a lookup file to one or more Export Subnetwork result files """

    def __init__(self):
        self.canRunInBackground = False
        self.description = "Uses the subnetwork lookup file to replace or add names"
        self.label = "Lookup Subnetwork Codes"

    def getParameterInfo(self):
        """ Define the tool parameters: the lookup file, then the multi-value list of result files """
        lookup_param = arcpy.Parameter(displayName='Lookup File',
                                       name='lookup_file',
                                       datatype='DEFile',
                                       parameterType='Required',
                                       direction='Input')

        files_param = arcpy.Parameter(displayName='Export Subnetwork results',
                                      name='subnetwork_files',
                                      datatype='DEFile',
                                      parameterType='Required',
                                      direction='Input',
                                      multiValue=True)

        # Both parameters share the same json-only file filter list
        json_only = ['json']
        lookup_param.filter.list = json_only
        files_param.filter.list = json_only

        return [lookup_param, files_param]

    def execute(self, parameters, messages):
        """ Add the names from the lookup file to every selected result file, saving each file in place """
        lookup_param, files_param = parameters
        # Parse the lookup file once and reuse the dict for every result file
        mapping = Inject._load(lookup_param.valueAsText)

        arcpy.SetProgressor('STEP', 'Exporting...', 0, len(files_param.values))
        for entry in files_param.values:
            target = entry.value
            injector = Inject(export_subnetwork_payload=target,
                              mapping_payload=mapping,
                              add_keys=True)
            injector.replace()
            # The result file is overwritten with the augmented payload
            save_json(injector.export, target)
            arcpy.SetProgressorPosition()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment