Skip to content

Instantly share code, notes, and snippets.

@steveoh
Created March 7, 2022 16:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save steveoh/7441758059c4eccdfb04e6d0af99247a to your computer and use it in GitHub Desktop.
Save steveoh/7441758059c4eccdfb04e6d0af99247a to your computer and use it in GitHub Desktop.
'''
AGOLPallet.py
A module that contains a pallet definition for the data that gets pushed to AGOL.
'''
import re
import traceback
from datetime import timedelta
from os import mkdir, rename
from os.path import basename, dirname, join
from pathlib import Path
from shutil import rmtree
from time import time
from uuid import UUID
import arcgis
import arcpy
import pydash
from forklift import config as forklift_config
from forklift import core
from forklift.lift import copy_with_overwrite, gift_wrap
from forklift.models import Crate, Pallet
from tqdm import tqdm
import agol
import sgid_secrets as secrets
statewide_parcels_name = 'StateWideParcels'
output_data_folder = join(dirname(forklift_config.get_config_prop('hashLocation')), 'agol-pallet-shelf')
sgid_write = join(secrets.SHARE, 'SGID.sde')
class AGOLPallet(Pallet):
process_on_fail = True
ship_on_fail = True
def __init__(self, manual_layer=None):
"""
manual_layer - string of SGID layer name that you want to run manually
e.g. SGID.WATER.Riparian
"""
super().__init__()
self.manual_layer = manual_layer
def build(self, config):
self.config = config
sgid = join(self.garage, 'SGID.sde')
self.agol_items_table = join(sgid, 'SGID.META.AGOLItems')
self.sgid_names_to_agol_infos = {}
full_names = []
where = None
if self.manual_layer is not None:
where = f'TABLENAME = \'{self.manual_layer}\''
with arcpy.da.SearchCursor(self.agol_items_table, ['TABLENAME', 'AGOL_ITEM_ID', 'AGOL_PUBLISHED_NAME'], where) as cursor:
for name, agol_id, agol_title in cursor:
#: agol_id may contain comments when a layer is hosted by another agency in AGOL
#: e.g. "hosted by SITLA" rather than a GUID.
#: We want to skip those rows for this pallet. Note that we *do* want to include empty
#: values since those are layers that need to be published for the first time to AGOL.
if agol_id is not None:
agol_id = agol_id.strip()
if agol_id is not None and agol_id != '':
try:
UUID(agol_id)
except ValueError:
#: skip rows with random comments in this field...
continue
self.sgid_names_to_agol_infos[name.split('.')[-1]] = (agol_id, agol_title)
full_names.append(name)
for full_name in full_names:
_, category, name = full_name.split('.')
destination_workspace = join(self.staging_rack, f'{category.lower()}.gdb')
self.add_crate([name, sgid, destination_workspace])
#: statewide parcels are not in the SGID but we can use the ParcelPallet output.
if config == 'Production' and (self.manual_layer is None or self.manual_layer == statewide_parcels_name):
self.sgid_names_to_agol_infos[statewide_parcels_name] = ('9df536b0835e4ee78e34d7cf6fe132c2', 'Utah Statewide Parcels')
parcels_gdb = join(self.staging_rack, 'parcels.gdb')
self.add_crate([statewide_parcels_name, parcels_gdb, join(self.staging_rack, 'parcels_agol_stage.gdb')])
def process(self):
self.cleanup_output_folder()
self.gift_wrap_crates(self.get_updated_crates())
def cleanup_output_folder(self):
self.log.debug('cleaning up output data folder')
try:
rmtree(output_data_folder)
except FileNotFoundError:
pass
mkdir(output_data_folder)
def gift_wrap_crates(self, crates):
self.log.info('giftwrapping data into the output data folder')
updated_fgdbs = set([crate.destination_workspace for crate in crates])
[copy_with_overwrite(fgdb, join(output_data_folder, basename(fgdb))) for fgdb in updated_fgdbs]
gift_wrap(output_data_folder)
def requires_processing(self):
return self.config == 'Dev' or super().requires_processing()
def is_ready_to_ship(self):
#: always attempt to process and ship so that an error with one layer doesn't ruin the entire process
return True
def get_updated_crates(self, update_tables_list=None, force_all=False):
if update_tables_list is not None:
return [crate for crate in self.get_crates() if crate.destination_name in update_tables_list]
if self.config == 'Dev' or force_all:
return [crate for crate in self.get_crates()]
else:
return [crate for crate in self.get_crates() if crate.was_updated()]
def ship(self, update_tables=None, is_manual=False):
try:
update_tables_list = update_tables.split(';')
except AttributeError:
update_tables_list = None
force_all = update_tables is None and is_manual == True
updated_crates = self.get_updated_crates(update_tables_list=update_tables_list, force_all=force_all)
if len(updated_crates) == 0:
self.log.info('no crates where updated')
return
if self.config == 'Dev':
pro_project_name = 'AGOL_Layers_TEST.aprx'
else:
pro_project_name = 'AGOL_Layers.aprx'
if is_manual:
temp_output_data_folder = f'{output_data_folder}_x'
#: this is a manual run of the pallet file and the output folder needs to be cleaned up and refreshed...
try:
rmtree(temp_output_data_folder)
except:
pass
rename(output_data_folder, temp_output_data_folder)
self.cleanup_output_folder()
self.gift_wrap_crates(updated_crates)
pro_project_path = str(Path(secrets.SHARE) / pro_project_name)
agol_instance = agol.AGOL(self.log, secrets.AGOL_USER, secrets.AGOL_PASSWORD, pro_project_path)
errors = []
services_updated = []
for crate in tqdm(updated_crates):
agol_id, agol_title = self.sgid_names_to_agol_infos[crate.destination_name]
staged_feature_class = str(Path(output_data_folder) / crate.destination_workspace / crate.destination_name)
try:
if agol_id is None or agol_id.strip() == '':
new_id = agol_instance.publish(agol_title, staged_feature_class)
self.update_table(crate.source_name, new_id)
else:
agol_instance.update(staged_feature_class, agol_id)
except:
import traceback
error_message = f'Error publishing/updating {agol_title}\n{traceback.format_exc()}'
errors.append(error_message)
self.log.error(error_message)
continue
services_updated.append(agol_title)
del agol_instance
services_updated_text = ', '.join(services_updated)
if len(errors) == 0:
self.success = (True, f'Services updated: \n{services_updated_text}')
else:
errors_message = ', '.join(errors)
self.success = (False, f'Errors: \n{errors_message}\n\nServices that were updated successfully: \n{services_updated_text}')
if is_manual:
rmtree(output_data_folder)
rename(temp_output_data_folder, output_data_folder)
def update_table(self, table_name, new_id):
#: this is so that edits are saved with each successful publish
with arcpy.da.Editor(sgid_write):
update_query = f'TABLENAME = \'{table_name}\''
with arcpy.da.UpdateCursor(str(Path(sgid_write) / Path(self.agol_items_table).name), ['AGOL_ITEM_ID'], update_query) as update_cursor:
for row in update_cursor:
update_cursor.updateRow((new_id,))
if __name__ == "__main__":
'''
optional arguments:
1 - forklift configuration (Production | Dev)
Defaults to Dev.
2 - semi-colon list of tables to force-update (e.g. StateWideParcels;OilGasWells)
If this argument is not passed, then the code updates all crates.
'''
import sys
try:
config = sys.argv[1]
except IndexError:
config = 'Dev'
try:
update_tables = sys.argv[2]
except IndexError:
update_tables = None
pallet = AGOLPallet()
pallet.configure_standalone_logging()
pallet.build(config)
pallet.ship(update_tables, is_manual=True)
print(pallet.success)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment