Skip to content

Instantly share code, notes, and snippets.

@bryanluman
Created August 5, 2014 20:52
Show Gist options
  • Save bryanluman/e7d782a75c34e8d2b561 to your computer and use it in GitHub Desktop.
Save bryanluman/e7d782a75c34e8d2b561 to your computer and use it in GitHub Desktop.
Mini-implementation of ArcSDE I made for ArcGIS 9 when ArcPy really sucked
class FeatureClass(object):
def __init__(self, conn, database, schema, table_name):
self.conn = conn
self.database = database
self.schema = schema
self.table_name = table_name
self._set_registration_id()
self._set_geometry_table()
@property
def sql_name(self):
return ".".join([self.database, self.schema, self.table_name])
@property
def geometry_sql_name(self):
return ".".join([self.database, self.schema, self.geometry_table_name])
@property
def adds_table(self):
return "%s.%s.a%d" % (self.database, self.schema, self.registration_id)
@property
def deletes_table(self):
return "%s.%s.D%d" % (self.database, self.schema, self.registration_id)
def _set_registration_id(self):
cursor = self.conn.cursor()
cursor.execute("""SELECT registration_id
FROM sde.SDE_table_registry
WHERE database_name = '%s'
AND table_name = '%s'""" % (self.database, self.table_name))
row = cursor.fetchone()
self.registration_id = row.registration_id
def _set_geometry_table(self):
cursor = self.conn.cursor()
cursor.execute("""SELECT f_geometry_column, g_table_name, f_table_schema
FROM sde.sde.SDE_geometry_columns
WHERE f_table_catalog = '%s'
AND f_table_name = '%s'""" % (self.database, self.table_name))
row = cursor.fetchone()
self.geometry_table_name = row.g_table_name
def get_textanno_in_extent(self, extent, version):
cursor = self.conn.cursor()
shape_ids = self._shape_ids_in_extent(extent)
if len(shape_ids) == 0:
return []
default_text = self._textvalues_in_default_version_matching_shapes(shape_ids)
text_pairs = {}
if version.name == 'DEFAULT':
for row in default_text:
text_pairs[row.TEXTVALUE] = row.TEXTVALUE2
else:
deleted_ids = self._deleted_object_ids(version)
added_text = self._added_textvalues_matching_shapes(shape_ids, version)
for row in default_text:
if not row.OBJECTID in deleted_ids:
text_pairs[row.TEXTVALUE] = row.TEXTVALUE2
for row in added_text:
text_pairs[row.TEXTVALUE] = row.TEXTVALUE2
text_list = [ [k,text_pairs[k]] for k in text_pairs.keys() ]
return text_list
def object_ids_in_extent(self, extent, version):
cursor = self.conn.cursor()
shape_ids = self._shape_ids_in_extent(extent)
if len(shape_ids) == 0:
return []
default_ids = self._ids_in_default_version_matching_shapes(shape_ids)
if version.name == 'DEFAULT':
return default_ids
else:
deleted_ids = self._deleted_object_ids(version)
added_ids = self._added_object_ids_matching_shapes(shape_ids, version)
ids_in_version = (set(default_ids) - set(deleted_ids)).union(set(added_ids))
return list(ids_in_version)
def _ids_in_default_version_matching_shapes(self, shape_ids):
cursor = self.conn.cursor()
shape_ids_sql = ", ".join(map(str, set(shape_ids)))
cursor.execute("""SELECT DISTINCT(OBJECTID)
FROM %s
WHERE Shape IN (%s)""" % (self.sql_name, shape_ids_sql))
return [ row.OBJECTID for row in cursor.fetchall() ]
def _textvalues_in_default_version_matching_shapes(self, shape_ids):
cursor = self.conn.cursor()
shape_ids_sql = ", ".join(map(str, set(shape_ids)))
cursor.execute("""SELECT OBJECTID, TEXTVALUE, TEXTVALUE2
FROM %s
WHERE Shape IN (%s)
ORDER BY TEXTVALUE""" % (self.sql_name, shape_ids_sql))
return [ row for row in cursor.fetchall() ]
def _shape_ids_in_extent(self, extent):
cursor = self.conn.cursor()
cursor.execute(""" SELECT fid
FROM %s
WHERE eminx > %s
AND eminy > %s
AND emaxx < %s
AND emaxy < %s""" % (self.geometry_sql_name, extent.xmin,
extent.ymin, extent.xmax, extent.ymax))
return [ row.fid for row in cursor.fetchall() ]
def _deleted_object_ids(self, version):
cursor = self.conn.cursor()
cursor.execute("""SELECT DISTINCT(SDE_DELETES_ROW_ID)
FROM %s
WHERE DELETED_AT = '%d'""" % (self.deletes_table, version.state_id))
return [ row[0] for row in cursor.fetchall() ]
def _added_object_ids_matching_shapes(self, shape_ids, version):
cursor = self.conn.cursor()
shape_ids_sql = ", ".join(map(str, set(shape_ids)))
cursor.execute("""SELECT OBJECTID, Shape
FROM %s
WHERE Shape in (%s)
AND SDE_STATE_ID = %d""" % (self.adds_table, shape_ids_sql, version.state_id) )
return [ row[0] for row in cursor.fetchall() ]
def _added_textvalues_matching_shapes(self, shape_ids, version):
cursor = self.conn.cursor()
shape_ids_sql = ", ".join(map(str, set(shape_ids)))
cursor.execute("""SELECT OBJECTID, TEXTVALUE, TEXTVALUE2, Shape
FROM %s
WHERE Shape in (%s)
AND SDE_STATE_ID = %d
ORDER BY TEXTVALUE""" % (self.adds_table, shape_ids_sql, version.state_id) )
return [ row for row in cursor.fetchall() ]
class Version(object):
def __init__(self, conn, name):
self.conn = conn
self.name = name
self._set_state_id()
def _set_state_id(self):
cursor = self.conn.cursor()
cursor.execute("SELECT state_id FROM sde.SDE_versions WHERE name = '%s'" % (self.name))
row = cursor.fetchone()
self.state_id = row.state_id
@bryanluman
Copy link
Author

Wrote this years ago, slightly proud, slightly horrified.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment