Last active
August 25, 2022 00:16
-
-
Save Dr-Emann/a774bad40fc6f7035b82256b75dcd8aa to your computer and use it in GitHub Desktop.
A script to export attachments from ArcGIS Pro 3, pulling fields from either the attach table or the base table. Will create files based on the passed format string (expecting a string like "C:\Path\{CleanID}-{ATT_NAME}")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import os.path | |
import string | |
import sys | |
from functools import lru_cache | |
import arcpy | |
from arcpy import da | |
_orig_stdout = sys.stdout | |
_orig_stderr = sys.stderr | |
log_file = open(r"C:\Users\JordanFox\OneDrive - Center for Watershed Protection\Desktop\fancylog.txt", "w", buffering=1) | |
print("Python Version: {}".format(sys.version), file=log_file) | |
sys.stdout = log_file | |
sys.stderr = log_file | |
base_table: str = arcpy.GetParameterAsText(0) | |
# Pass a filename format like `C:\dir\FILE_NAME-{CleanID}-{ATT_NAME} | |
format_str: str = arcpy.GetParameterAsText(1) | |
print("base_table: {}".format(base_table)) | |
print("format_str: {}".format(format_str)) | |
if not (base_table and format_str): | |
arcpy.AddError("Expected two arguments: the base table, and a filename pattern") | |
if base_table.endswith("__ATTACH"): | |
arcpy.AddWarning("Expected a base table as first parameter, does it really end in __ATTACH?") | |
attach_table = base_table + "__ATTACH" | |
def which_table(column: str) -> str: | |
print("Trying to find column {}".format(column)) | |
for t in (attach_table, base_table): | |
try: | |
with da.SearchCursor(t, [column]) as cursor: | |
item = next(iter(cursor)) | |
if item[0] is None or item[0] == "": | |
raise Exception("Got None") | |
# Success | |
table_name = "base" if t == base_table else "attach" | |
print("Found column '{}' in {}".format(column, table_name)) | |
return table_name | |
except Exception: | |
print("No column '{}' in {}".format(column, t)) | |
pass | |
raise Exception("Found no column '{}' on either table".format(column)) | |
base_table_columns = [] | |
attach_table_columns = [] | |
format_args = set( | |
[column for _, column, _, _ in string.Formatter().parse(format_str) if column is not None] | |
) | |
print("Format args: {}".format(format_args)) | |
if not format_args: | |
raise Exception("expected some format arguments (like `{SimpleID}`)") | |
for format_arg in format_args: | |
if format_arg == "": | |
raise "Expected all named format args! {FieldName}, not {}" | |
if which_table(format_arg) == "base": | |
base_table_columns.append(format_arg) | |
else: | |
attach_table_columns.append(format_arg) | |
def dict_from_columns(columns, row) -> dict: | |
result = {} | |
for i, column in enumerate(columns): | |
result[column] = row[i] | |
return result | |
@lru_cache(maxsize=1024) | |
def from_base_table(global_id) -> dict: | |
if not base_table_columns: | |
return {} | |
print("Finding {} in base table".format(global_id)) | |
global_id = str(global_id) | |
field = arcpy.AddFieldDelimiters(base_table, "GlobalID") | |
where_clause = "{} = '{}'".format(field, global_id) | |
with da.SearchCursor(base_table, base_table_columns, where_clause) as cursor: | |
items = list(cursor) | |
if len(items) != 1: | |
raise Exception("Expected a single item in the base table with global id") | |
item = items[0] | |
return dict_from_columns(base_table_columns, item) | |
EXTRA_COLUMNS = ["ATT_NAME", 'DATA', "REL_GLOBALID"] | |
for column in EXTRA_COLUMNS: | |
if column in attach_table_columns: | |
attach_table_columns.remove(column) | |
with da.SearchCursor(attach_table, EXTRA_COLUMNS + attach_table_columns) as cursor: | |
for item in cursor: | |
att_name, data, rel_globalid = item[:3] | |
if "." in att_name: | |
period_pos = att_name.find(".") | |
extension = att_name[period_pos:] | |
else: | |
# Take a guess | |
extension = ".jpg" | |
arg_values = dict_from_columns(attach_table_columns, item[3:]) | |
arg_values.update(from_base_table(rel_globalid)) | |
arg_values["ATT_NAME"] = att_name | |
arg_values["DATA"] = data | |
arg_values["REL_GLOBALID"] = rel_globalid | |
filename = format_str.format(**arg_values) | |
if "." not in filename: | |
filename += extension | |
directory = os.path.dirname(filename) | |
if not os.path.exists(directory): | |
print("{} does not exist yet, trying to create".format(directory)) | |
os.makedirs(directory, exist_ok=True) | |
with open(filename, "wb") as f: | |
f.write(data.tobytes()) | |
print("Wrote {}", filename) | |
del data | |
del item | |
sys.stdout = _orig_stdout | |
sys.stderr = _orig_stderr | |
log_file.flush() | |
del log_file |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment