Created
August 5, 2014 20:52
-
-
Save bryanluman/e7d782a75c34e8d2b561 to your computer and use it in GitHub Desktop.
Mini-implementation of ArcSDE I made for ArcGIS 9 when ArcPy really sucked
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class FeatureClass(object):
    """Query a versioned ArcSDE feature class with plain SQL.

    On construction the table's ``registration_id`` and geometry (feature)
    table name are read from the SDE system tables.  The public methods then
    combine rows of the base table with a version's adds table
    (``a<registration_id>``) and deletes table (``D<registration_id>``).

    Only rows recorded at exactly ``version.state_id`` are taken from the
    adds/deletes tables; no state lineage is walked.

    Values are sent as bound ``?`` parameters rather than spliced into the
    SQL text.  NOTE(review): this assumes ``conn`` is a DB-API connection
    with the qmark paramstyle (e.g. pyodbc), which the attribute-style row
    access used throughout suggests -- confirm against the real driver.
    Table names cannot be bound and are still interpolated.
    """

    def __init__(self, conn, database, schema, table_name):
        """Store the identifiers and look up the SDE metadata.

        conn       -- open database connection exposing ``cursor()``
        database   -- database (catalog) name holding the feature class
        schema     -- schema / owner of the feature class
        table_name -- unqualified name of the feature class table

        Raises AttributeError if either metadata lookup returns no row.
        """
        self.conn = conn
        self.database = database
        self.schema = schema
        self.table_name = table_name
        self._set_registration_id()
        self._set_geometry_table()

    # ------------------------------------------------------------------
    # Fully qualified table names
    # ------------------------------------------------------------------

    @property
    def sql_name(self):
        """Three-part name of the base table: database.schema.table."""
        return ".".join([self.database, self.schema, self.table_name])

    @property
    def geometry_sql_name(self):
        """Three-part name of the geometry table found at construction."""
        return ".".join([self.database, self.schema, self.geometry_table_name])

    @property
    def adds_table(self):
        """Three-part name of the adds table, ``a<registration_id>``."""
        return "%s.%s.a%d" % (self.database, self.schema, self.registration_id)

    @property
    def deletes_table(self):
        """Three-part name of the deletes table, ``D<registration_id>``."""
        return "%s.%s.D%d" % (self.database, self.schema, self.registration_id)

    # ------------------------------------------------------------------
    # Query plumbing
    # ------------------------------------------------------------------

    def _query(self, sql, params=None, one=False):
        """Run ``sql`` on a fresh cursor and return its rows.

        Returns ``fetchone()`` when ``one`` is true, otherwise ``fetchall()``.
        The cursor is always closed, including when the driver raises.
        """
        cursor = self.conn.cursor()
        try:
            if params is None:
                cursor.execute(sql)
            else:
                cursor.execute(sql, params)
            if one:
                return cursor.fetchone()
            return cursor.fetchall()
        finally:
            cursor.close()

    @staticmethod
    def _in_list(shape_ids):
        """Render the distinct shape ids as a comma-separated SQL IN list.

        The ids are kept inline rather than bound one by one; presumably the
        list can be long enough to hit a driver's bound-parameter limit, and
        the ids come from ``_shape_ids_in_extent`` rather than from callers.
        """
        return ", ".join(map(str, set(shape_ids)))

    # ------------------------------------------------------------------
    # Metadata lookups (run once from __init__)
    # ------------------------------------------------------------------

    def _set_registration_id(self):
        """Read ``registration_id`` for this table from SDE_table_registry."""
        # NOTE(review): this table is addressed as sde.SDE_table_registry
        # while the geometry lookup uses sde.sde.SDE_geometry_columns; the
        # lookup also ignores self.schema.  Left as-is -- confirm intent.
        row = self._query(
            "SELECT registration_id "
            "FROM sde.SDE_table_registry "
            "WHERE database_name = ? "
            "AND table_name = ?",
            (self.database, self.table_name),
            one=True)
        self.registration_id = row.registration_id

    def _set_geometry_table(self):
        """Read the geometry table name from SDE_geometry_columns."""
        row = self._query(
            "SELECT f_geometry_column, g_table_name, f_table_schema "
            "FROM sde.sde.SDE_geometry_columns "
            "WHERE f_table_catalog = ? "
            "AND f_table_name = ?",
            (self.database, self.table_name),
            one=True)
        self.geometry_table_name = row.g_table_name

    # ------------------------------------------------------------------
    # Public queries
    # ------------------------------------------------------------------

    def get_textanno_in_extent(self, extent, version):
        """Return ``[TEXTVALUE, TEXTVALUE2]`` pairs for features in ``extent``.

        extent  -- object with ``xmin``, ``ymin``, ``xmax``, ``ymax``
        version -- object with ``name`` and ``state_id``

        Pairs are keyed by TEXTVALUE, so a later row with the same TEXTVALUE
        replaces an earlier one and rows from the adds table win over base
        rows.  For the version named 'DEFAULT' only the base table is read.
        Returns an empty list when no shape lies inside the extent.
        """
        shape_ids = self._shape_ids_in_extent(extent)
        if not shape_ids:
            return []

        base_rows = self._textvalues_in_default_version_matching_shapes(shape_ids)
        if version.name == 'DEFAULT':
            deleted_ids = set()
            added_rows = []
        else:
            # A set keeps the per-row membership test below constant-time.
            deleted_ids = set(self._deleted_object_ids(version))
            added_rows = self._added_textvalues_matching_shapes(shape_ids, version)

        text_pairs = {}
        for row in base_rows:
            if row.OBJECTID not in deleted_ids:
                text_pairs[row.TEXTVALUE] = row.TEXTVALUE2
        for row in added_rows:
            text_pairs[row.TEXTVALUE] = row.TEXTVALUE2
        return [[key, value] for key, value in text_pairs.items()]

    def object_ids_in_extent(self, extent, version):
        """Return the OBJECTIDs of features in ``extent`` as seen by ``version``.

        For the version named 'DEFAULT' the base-table ids are returned
        unchanged.  Otherwise the result is (base ids - deleted ids) united
        with the ids added in the version, in no particular order.
        Returns an empty list when no shape lies inside the extent.
        """
        shape_ids = self._shape_ids_in_extent(extent)
        if not shape_ids:
            return []

        default_ids = self._ids_in_default_version_matching_shapes(shape_ids)
        if version.name == 'DEFAULT':
            return default_ids

        deleted_ids = self._deleted_object_ids(version)
        added_ids = self._added_object_ids_matching_shapes(shape_ids, version)
        ids_in_version = (set(default_ids) - set(deleted_ids)).union(added_ids)
        return list(ids_in_version)

    # ------------------------------------------------------------------
    # Single-table queries
    # ------------------------------------------------------------------

    def _ids_in_default_version_matching_shapes(self, shape_ids):
        """Return distinct base-table OBJECTIDs whose Shape is in ``shape_ids``."""
        rows = self._query(
            "SELECT DISTINCT(OBJECTID) "
            "FROM %s "
            "WHERE Shape IN (%s)" % (self.sql_name, self._in_list(shape_ids)))
        return [row.OBJECTID for row in rows]

    def _textvalues_in_default_version_matching_shapes(self, shape_ids):
        """Return base-table rows (OBJECTID, TEXTVALUE, TEXTVALUE2) by shape."""
        rows = self._query(
            "SELECT OBJECTID, TEXTVALUE, TEXTVALUE2 "
            "FROM %s "
            "WHERE Shape IN (%s) "
            "ORDER BY TEXTVALUE" % (self.sql_name, self._in_list(shape_ids)))
        return list(rows)

    def _shape_ids_in_extent(self, extent):
        """Return fids whose stored envelope lies strictly inside ``extent``."""
        rows = self._query(
            "SELECT fid "
            "FROM %s "
            "WHERE eminx > ? "
            "AND eminy > ? "
            "AND emaxx < ? "
            "AND emaxy < ?" % self.geometry_sql_name,
            (extent.xmin, extent.ymin, extent.xmax, extent.ymax))
        return [row.fid for row in rows]

    def _deleted_object_ids(self, version):
        """Return distinct row ids deleted at exactly ``version.state_id``."""
        rows = self._query(
            "SELECT DISTINCT(SDE_DELETES_ROW_ID) "
            "FROM %s "
            "WHERE DELETED_AT = ?" % self.deletes_table,
            (version.state_id,))
        return [row[0] for row in rows]

    def _added_object_ids_matching_shapes(self, shape_ids, version):
        """Return OBJECTIDs added at ``version.state_id`` for the given shapes."""
        rows = self._query(
            "SELECT OBJECTID, Shape "
            "FROM %s "
            "WHERE Shape IN (%s) "
            "AND SDE_STATE_ID = ?" % (self.adds_table, self._in_list(shape_ids)),
            (version.state_id,))
        return [row[0] for row in rows]

    def _added_textvalues_matching_shapes(self, shape_ids, version):
        """Return adds-table text rows at ``version.state_id`` for the shapes."""
        rows = self._query(
            "SELECT OBJECTID, TEXTVALUE, TEXTVALUE2, Shape "
            "FROM %s "
            "WHERE Shape IN (%s) "
            "AND SDE_STATE_ID = ? "
            "ORDER BY TEXTVALUE" % (self.adds_table, self._in_list(shape_ids)),
            (version.state_id,))
        return list(rows)
class Version(object):
    """A named geodatabase version and the state id read for it.

    ``state_id`` is looked up once, at construction, from
    ``sde.SDE_versions`` by exact match on ``name``.

    The name is sent as a bound ``?`` parameter rather than spliced into the
    SQL text.  NOTE(review): this assumes ``conn`` is a DB-API connection
    with the qmark paramstyle (e.g. pyodbc), which the attribute-style row
    access suggests -- confirm against the real driver.
    """

    def __init__(self, conn, name):
        """Store the connection and name, then resolve ``state_id``.

        conn -- open database connection exposing ``cursor()``
        name -- version name as stored in SDE_versions

        Raises AttributeError if no version with that name exists.
        """
        self.conn = conn
        self.name = name
        self._set_state_id()

    def _set_state_id(self):
        """Read this version's ``state_id`` from SDE_versions."""
        cursor = self.conn.cursor()
        try:
            cursor.execute(
                "SELECT state_id FROM sde.SDE_versions WHERE name = ?",
                (self.name,))
            row = cursor.fetchone()
        finally:
            # Release the cursor even when the driver raises.
            cursor.close()
        self.state_id = row.state_id
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Wrote this years ago, slightly proud, slightly horrified.