Skip to content

Instantly share code, notes, and snippets.

@gustabot42
Last active August 25, 2017 21:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gustabot42/28dbfbaac1afd7f8b043805b941178b0 to your computer and use it in GitHub Desktop.
Save gustabot42/28dbfbaac1afd7f8b043805b941178b0 to your computer and use it in GitHub Desktop.
Script para comparar gdb
#!python3
# Using python3 Anaconda
# conda install -c conda-forge gdal
# gustavodiazjaimes@gmail.com
# versión 0.1
#usage:
#
#from gdb_cmp import *
#
#ORIGIN_PATH = "/home/kiwi/grupo2/AMB-Fase2/BACKUP FGDB/SIG_AMB15082017.gdb/"
#EDITED_PATH = ""
#ORIGIN_LAYER = "VisitaRESPEL"
#EDITED_LAYER = "VisitaRESPELHistorico"
#ID_ATTRNAME = "IDVISITARESPEL"
#
#print_compare(ORIGIN_PATH, EDITED_PATH, ORIGIN_LAYER, ID_ATTRNAME, EDITED_LAYER, rstrip=True)
from pathlib import Path
from contextlib import contextmanager
from collections import Counter
import pandas as pd
from osgeo import ogr
@contextmanager
def _get_dataset(gdb_path):
    """Open a File Geodatabase read-only and yield the OGR dataset.

    Args:
        gdb_path: Path of the ``.gdb`` directory. It is converted with
            ``str()`` before being handed to OGR.

    Yields:
        The dataset object returned by the "OpenFileGDB" driver.

    Raises:
        RuntimeError: If the "OpenFileGDB" driver is not available or the
            dataset cannot be opened.
    """
    driver = ogr.GetDriverByName("OpenFileGDB")
    if driver is None:
        raise RuntimeError("OGR driver 'OpenFileGDB' is not available")
    # False = no edition, OpenFileGDB driver is read only
    dataset = driver.Open(str(gdb_path), False)
    if dataset is None:
        # Fail here with a clear message instead of yielding None and letting
        # the caller crash later with an unrelated error.
        raise RuntimeError("Could not open geodatabase: {}".format(gdb_path))
    # The try block starts only once `dataset` is bound, so the cleanup below
    # can never raise UnboundLocalError and hide the original exception.
    try:
        yield dataset
    finally:
        # Drop this frame's reference to the dataset.
        del dataset
def get_layers(gdb_path):
    """Return the names of every layer found in the geodatabase.

    Args:
        gdb_path: Path of the geodatabase to inspect.

    Returns:
        A list with the result of ``GetName()`` for each layer, in the order
        the dataset iterates them.
    """
    with _get_dataset(gdb_path) as gdb:
        return [lyr.GetName() for lyr in gdb]
def get_layer_schema(gdb_path, layer_name):
    """Describe the fields of one layer.

    Args:
        gdb_path: Path of the geodatabase.
        layer_name: Name of the layer to look up in the geodatabase.

    Returns:
        A list of ``(field_name, field_type_name)`` tuples, one per entry of
        the layer's ``schema``.
    """
    with _get_dataset(gdb_path) as gdb:
        lyr = gdb.GetLayerByName(layer_name)
        described = []
        for field_defn in lyr.schema:
            described.append((field_defn.GetName(), field_defn.GetTypeName()))
        return described
def get_layer_ids(gdb_path, layer_name, id_attrname):
    """Collect the value of the id attribute for every feature of a layer.

    Args:
        gdb_path: Path of the geodatabase.
        layer_name: Name of the layer to read.
        id_attrname: Name of the field whose value is collected.

    Returns:
        A list with one value per feature, in layer iteration order;
        duplicates are kept.
    """
    collected = []
    with _get_dataset(gdb_path) as gdb:
        lyr = gdb.GetLayerByName(layer_name)
        for feature in lyr:
            collected.append(feature.GetField(id_attrname))
    return collected
def find_repeated_ids(gdb_path, layer_name, id_attrname):
    """Find the features whose id value appears more than once in a layer.

    The layer is read in a single pass: the (FID, id) pair of every feature
    is collected first, and the repeated ids are then selected in memory.

    Args:
        gdb_path: Path of the geodatabase.
        layer_name: Name of the layer to read.
        id_attrname: Name of the field used as identifier.

    Returns:
        A list of ``(fid, id_value)`` tuples for every feature whose id value
        occurs at least twice, sorted by id value and then by FID.
    """
    with _get_dataset(gdb_path) as dataset:
        layer = dataset.GetLayerByName(layer_name)
        pairs = [(feature.GetFID(), feature.GetField(id_attrname)) for feature in layer]
    occurrences = Counter(id_value for _, id_value in pairs)
    repeated_obj = [pair for pair in pairs if occurrences[pair[1]] > 1]
    return sorted(repeated_obj, key=lambda v: (v[1], v[0]))
def print_elements(origin_path, edited_path, origin_layer, id_attrname, edited_layer="", ids_toprint=None):
    """Gather the shared attribute values of selected features in both layers.

    Despite its name, this function prints nothing; it returns the rows.

    Args:
        origin_path: Path of the origin geodatabase.
        edited_path: Path of the edited geodatabase; a falsy value means
            "same as ``origin_path``".
        origin_layer: Layer name in the origin geodatabase.
        id_attrname: Name of the field used as identifier.
        edited_layer: Layer name in the edited geodatabase; a falsy value
            means "same as ``origin_layer``".
        ids_toprint: Container of id values to select. ``None`` (the default)
            selects nothing.

    Returns:
        A list whose first item is the list of shared attribute names,
        followed by one list of values per matching feature: first those of
        the origin layer, then those of the edited layer.
    """
    # None sentinel instead of a mutable default shared across calls.
    if ids_toprint is None:
        ids_toprint = []
    edited_path = edited_path or origin_path
    edited_layer = edited_layer or origin_layer
    *_, shared_schema = cmp_layer_schema(origin_path, edited_path, origin_layer, edited_layer)
    shared_attrs = [name for name, _ in shared_schema]
    elements = [shared_attrs]
    for gdb_path, layer_name in [(origin_path, origin_layer), (edited_path, edited_layer)]:
        with _get_dataset(gdb_path) as dataset:
            layer = dataset.GetLayerByName(layer_name)
            for feature in layer:
                if feature.GetField(id_attrname) in ids_toprint:
                    elements.append([feature.GetField(attr) for attr in shared_attrs])
    return elements
def gdb_to_pandas(gdb_path, layer_name, id_attrname, attrs=None, ids=None):
    """Load the features of a layer into a pandas DataFrame.

    Args:
        gdb_path: Path of the geodatabase.
        layer_name: Name of the layer to read.
        id_attrname: Name of the field used as identifier when filtering.
        attrs: Names of the fields to load, in column order. A falsy value
            (``None`` or empty) loads every field of the layer schema.
        ids: Container of id values to keep. A falsy value (``None`` or
            empty) disables the filter and keeps every feature.

    Returns:
        A DataFrame with one row per selected feature and one column per
        name in ``attrs``. When no feature is selected the DataFrame is
        empty but still has the ``attrs`` columns.
    """
    if not attrs:
        attrs = [name for name, _ in get_layer_schema(gdb_path, layer_name)]
    with _get_dataset(gdb_path) as dataset:
        layer = dataset.GetLayerByName(layer_name)
        data = [
            [feature.GetField(attr) for attr in attrs]
            for feature in layer
            if not ids or feature.GetField(id_attrname) in ids
        ]
    # Pass the columns to the constructor: assigning `df.columns` afterwards
    # raises ValueError when `data` is empty, because the frame has 0 columns.
    return pd.DataFrame(data, columns=attrs)
def cmp_layer_schema(origin_path, edited_path, origin_layer, edited_layer=""):
    """Compare the field schemas of two layers.

    Fields are compared as ``(name, type_name)`` pairs, so a field whose type
    differs between the layers counts as exclusive to each side.

    Args:
        origin_path: Path of the origin geodatabase.
        edited_path: Path of the edited geodatabase; a falsy value means
            "same as ``origin_path``".
        origin_layer: Layer name in the origin geodatabase.
        edited_layer: Layer name in the edited geodatabase; a falsy value
            means "same as ``origin_layer``".

    Returns:
        A tuple of three sets: fields only in the origin layer, fields only
        in the edited layer, and fields present in both.
    """
    target_path = edited_path if edited_path else origin_path
    target_layer = edited_layer if edited_layer else origin_layer
    before = set(get_layer_schema(origin_path, origin_layer))
    after = set(get_layer_schema(target_path, target_layer))
    return before.difference(after), after.difference(before), before.intersection(after)
def find_related_ids(origin_path, edited_path, origin_layer, id_attrname, edited_layer=""):
    """Classify the ids of two layers as origin-only, edited-only or shared.

    Each layer is read once; the ordered "only" lists are derived from the
    id lists already in memory instead of scanning the layers again.

    Args:
        origin_path: Path of the origin geodatabase.
        edited_path: Path of the edited geodatabase; a falsy value means
            "same as ``origin_path``".
        origin_layer: Layer name in the origin geodatabase.
        id_attrname: Name of the field used as identifier.
        edited_layer: Layer name in the edited geodatabase; a falsy value
            means "same as ``origin_layer``".

    Returns:
        A tuple ``(origin_list, edited_list, shared_ids)``. The two lists
        hold the ids found only in the respective layer, in layer order and
        with duplicates kept. ``shared_ids`` is the set of ids present in
        both layers.
    """
    edited_path = edited_path or origin_path
    edited_layer = edited_layer or origin_layer
    origin_all = get_layer_ids(origin_path, origin_layer, id_attrname)
    edited_all = get_layer_ids(edited_path, edited_layer, id_attrname)
    origin_ids = set(origin_all)
    edited_ids = set(edited_all)
    origin_only = origin_ids - edited_ids
    edited_only = edited_ids - origin_ids
    shared_ids = origin_ids & edited_ids
    # Filtering the full id lists preserves layer order and repetitions.
    origin_list = [_id for _id in origin_all if _id in origin_only]
    edited_list = [_id for _id in edited_all if _id in edited_only]
    return origin_list, edited_list, shared_ids
def cmp_related_ids(origin_path, edited_path, origin_layer, id_attrname, edited_layer="", rstrip=False):
    """List, for every id present in both layers, the attributes that differ.

    Only the attributes shared by both schemas are compared. Every origin
    feature is compared against every edited feature with the same id, so
    repeated ids produce one result per pairing.

    Args:
        origin_path: Path of the origin geodatabase.
        edited_path: Path of the edited geodatabase; a falsy value means
            "same as ``origin_path``".
        origin_layer: Layer name in the origin geodatabase.
        id_attrname: Name of the field used as identifier. It must be one of
            the shared attributes, otherwise the row lookup raises KeyError.
        edited_layer: Layer name in the edited geodatabase; a falsy value
            means "same as ``origin_layer``".
        rstrip: When true, trailing whitespace of string values is ignored.

    Returns:
        A list of ``(id, changed_attrs)`` tuples, where ``changed_attrs`` is
        the list of shared attribute names whose cleaned values differ
        (empty when the pair is identical).
    """
    def _clean(value):
        # Normalize a cell so that equivalent "empty" values compare equal.
        if value is None:
            return ""
        # pandas stores missing values of numeric columns as NaN, and
        # NaN != NaN, so without this every null numeric would be reported
        # as a modification.
        if isinstance(value, float) and value != value:
            return ""
        if rstrip and isinstance(value, str):
            value = value.rstrip()
        # The string "0" is treated the same as an empty value.
        if value == "0":
            return ""
        return value

    edited_path = edited_path or origin_path
    edited_layer = edited_layer or origin_layer
    *_, shared_schema = cmp_layer_schema(origin_path, edited_path, origin_layer, edited_layer)
    shared_attrs = [name for name, _ in shared_schema]
    *_, shared_ids = find_related_ids(origin_path, edited_path, origin_layer, id_attrname, edited_layer)
    origin_df = gdb_to_pandas(origin_path, origin_layer, id_attrname, shared_attrs, shared_ids)
    edited_df = gdb_to_pandas(edited_path, edited_layer, id_attrname, shared_attrs, shared_ids)
    compare = []
    for _, o_row in origin_df.iterrows():
        _id = o_row[id_attrname]
        matching = edited_df[edited_df[id_attrname] == _id]
        for _, e_row in matching.iterrows():
            # .iloc makes the positional access explicit; an integer key on a
            # string-labelled Series relies on deprecated fallback behavior.
            attr_diff = [
                attr
                for pos, attr in enumerate(shared_attrs)
                if _clean(o_row.iloc[pos]) != _clean(e_row.iloc[pos])
            ]
            compare.append((_id, attr_diff))
    return compare
def print_compare(origin_path, edited_path, origin_layer, id_attrname, edited_layer="", rstrip=False):
    """Print a text report comparing two geodatabase layers.

    The report has one section per layer (attributes exclusive to it, all
    its ids and its repeated ids), the attributes shared by both layers,
    and one line per id marked with ``<`` (only in origin), ``>`` (only in
    edited), ``=`` (shared, no differences) or ``+`` (shared, followed by
    the modified attributes separated by ``|``).

    Args:
        origin_path: Path of the origin geodatabase.
        edited_path: Path of the edited geodatabase; a falsy value means
            "same as ``origin_path``".
        origin_layer: Layer name in the origin geodatabase.
        id_attrname: Name of the field used as identifier.
        edited_layer: Layer name in the edited geodatabase; a falsy value
            means "same as ``origin_layer``".
        rstrip: Forwarded to ``cmp_related_ids``.
    """
    def _print_layer_info(attrs, ids, repeated):
        # Section for one layer: exclusive attributes, ids, repeated ids.
        print("# Layer")
        print("## Atributos únicos = {}".format(len(attrs)))
        print(", ".join(attrs))
        print("## Elementos en total = {}".format(len(ids)))
        print(",".join(map(str, ids)))
        print("## Elementos repetidos = {}".format(len(repeated)))
        print("objectid, {}".format(id_attrname))
        for fid, id_value in repeated:
            print("{}, {}".format(fid, id_value))
        print("")

    def _names(schema):
        # Keep only the field name of each (name, type) pair.
        return [field_name for field_name, _ in schema]

    if not edited_path:
        edited_path = origin_path
    if not edited_layer:
        edited_layer = origin_layer

    only_origin, only_edited, in_both = cmp_layer_schema(
        origin_path, edited_path, origin_layer, edited_layer
    )
    origin_attrs = _names(only_origin)
    edited_attrs = _names(only_edited)
    shared_attrs = _names(in_both)

    origin_ids = get_layer_ids(origin_path, origin_layer, id_attrname)
    edited_ids = get_layer_ids(edited_path, edited_layer, id_attrname)
    origin_repeated = find_repeated_ids(origin_path, origin_layer, id_attrname)
    edited_repeated = find_repeated_ids(edited_path, edited_layer, id_attrname)
    origin_unqids, edited_unqids, _shared_ids = find_related_ids(
        origin_path, edited_path, origin_layer, id_attrname, edited_layer
    )
    compared = cmp_related_ids(
        origin_path, edited_path, origin_layer, id_attrname, edited_layer, rstrip
    )

    _print_layer_info(origin_attrs, origin_ids, origin_repeated)
    _print_layer_info(edited_attrs, edited_ids, edited_repeated)

    print("# Compartidos")
    print("## Atributos = {}".format(len(shared_attrs)))
    print(", ".join(shared_attrs))
    print("")

    print("Comparación de IDs")
    print("{}, Tipo, Attr modificados".format(id_attrname))
    for marker, id_group in (("<", origin_unqids), (">", edited_unqids)):
        for nid in id_group:
            print("{}, {}".format(nid, marker))
    for nid, changed in compared:
        marker = "+" if changed else "="
        print("{}, {}, {}".format(nid, marker, "|".join(changed)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment