Skip to content

Instantly share code, notes, and snippets.

@craigds
Created October 4, 2018 21:02
Show Gist options
  • Save craigds/4bb86df3b3ead343c22be6c7c9c14a57 to your computer and use it in GitHub Desktop.
Save craigds/4bb86df3b3ead343c22be6c7c9c14a57 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
###############################################################################
# $Id$
#
# Project: GDAL/OGR Test Suite
# Purpose: FGDB driver testing.
# Author: Even Rouault <even dot rouault at mines dash paris dot org>
#
###############################################################################
# Copyright (c) 2011-2014, Even Rouault <even dot rouault at mines-paris dot org>
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
###############################################################################
import os
import sys
import shutil
import gdaltest
import ogrtest
from osgeo import gdal
from osgeo import ogr
from osgeo import osr
###############################################################################
# Test if driver is available
def test_ogr_fgdb_init():
ogrtest.fgdb_drv = None
ogrtest.fgdb_drv = ogr.GetDriverByName('FileGDB')
if ogrtest.fgdb_drv is None:
return 'skip'
ogrtest.openfilegdb_drv = ogr.GetDriverByName('OpenFileGDB')
if ogrtest.openfilegdb_drv is not None:
ogrtest.openfilegdb_drv.Deregister()
try:
shutil.rmtree("tmp/test.gdb")
except OSError:
pass
return 'success'
###############################################################################
def ogr_fgdb_is_sdk_1_4_or_later():
if ogrtest.fgdb_drv is None:
return False
if hasattr(ogrtest, 'fgdb_is_sdk_1_4'):
return ogrtest.fgdb_is_sdk_1_4
ogrtest.fgdb_is_sdk_1_4 = False
try:
shutil.rmtree("tmp/ogr_fgdb_is_sdk_1_4_or_later.gdb")
except OSError:
pass
ds = ogrtest.fgdb_drv.CreateDataSource("tmp/ogr_fgdb_is_sdk_1_4_or_later.gdb")
srs = osr.SpatialReference()
srs.ImportFromProj4('+proj=tmerc +datum=WGS84 +no_defs')
with gdaltest.error_handler():
lyr = ds.CreateLayer('test', srs=srs, geom_type=ogr.wkbPoint)
if lyr is not None:
ogrtest.fgdb_is_sdk_1_4 = True
ds = None
shutil.rmtree("tmp/ogr_fgdb_is_sdk_1_4_or_later.gdb")
return ogrtest.fgdb_is_sdk_1_4
###############################################################################
# Write and read back various geometry types
def test_ogr_fgdb_1():
if ogrtest.fgdb_drv is None:
return 'skip'
srs = osr.SpatialReference()
srs.SetFromUserInput("WGS84")
ds = ogrtest.fgdb_drv.CreateDataSource("tmp/test.gdb")
datalist = [["none", ogr.wkbNone, None],
["point", ogr.wkbPoint, "POINT (1 2)"],
["multipoint", ogr.wkbMultiPoint, "MULTIPOINT (1 2,3 4)"],
["linestring", ogr.wkbLineString, "LINESTRING (1 2,3 4)", "MULTILINESTRING ((1 2,3 4))"],
["multilinestring", ogr.wkbMultiLineString, "MULTILINESTRING ((1 2,3 4),(5 6,7 8))"],
["polygon", ogr.wkbPolygon, "POLYGON ((0 0,0 1,1 1,1 0,0 0))", "MULTIPOLYGON (((0 0,0 1,1 1,1 0,0 0)))"],
["multipolygon", ogr.wkbMultiPolygon, "MULTIPOLYGON (((0 0,0 1,1 1,1 0,0 0),(0.25 0.25,0.75 0.25,0.75 0.75,0.25 0.75,0.25 0.25)),((2 0,2 1,3 1,3 0,2 0)))"],
["point25D", ogr.wkbPoint25D, "POINT (1 2 3)"],
["multipoint25D", ogr.wkbMultiPoint25D, "MULTIPOINT (1 2 -10,3 4 -20)"],
["linestring25D", ogr.wkbLineString25D, "LINESTRING (1 2 -10,3 4 -20)", "MULTILINESTRING ((1 2 -10,3 4 -20))"],
["multilinestring25D", ogr.wkbMultiLineString25D, "MULTILINESTRING ((1 2 -10,3 4 -20))"],
["polygon25D", ogr.wkbPolygon25D, "POLYGON ((0 0 -10,0 1 -10,1 1 -10,1 0 -10,0 0 -10))", "MULTIPOLYGON (((0 0 -10,0 1 -10,1 1 -10,1 0 -10,0 0 -10)))"],
["multipolygon25D", ogr.wkbMultiPolygon25D, "MULTIPOLYGON (((0 0 -10,0 1 -10,1 1 -10,1 0 -10,0 0 -10)))"],
["multipatch", ogr.wkbMultiPolygon25D, "GEOMETRYCOLLECTION Z (TIN Z (((0.0 0.0 0,0.0 1.0 0,1.0 0.0 0,0.0 0.0 0)),((0.0 1.0 0,1.0 0.0 0,1.0 1.0 0,0.0 1.0 0))),TIN Z (((10.0 0.0 0,10.0 1.0 0,11.0 0.0 0,10.0 0.0 0)),((10.0 0.0 0,11.0 0.0 0,10.0 -1.0 0,10.0 0.0 0))),TIN Z (((5.0 0.0 0,5.0 1.0 0,6.0 0.0 0,5.0 0.0 0))),MULTIPOLYGON Z (((100.0 0.0 0,100.0 1.0 0,101.0 1.0 0,101.0 0.0 0,100.0 0.0 0),(100.25 0.25 0,100.75 0.25 0,100.75 0.75 0,100.75 0.25 0,100.25 0.25 0))))"],
["tin", ogr.wkbTINZ, "TIN Z (((0.0 0.0 0,0.0 1.0 0,1.0 0.0 0,0.0 0.0 0)),((0.0 1.0 0,1.0 0.0 0,1.0 1.0 0,0.0 1.0 0)))"],
["null_polygon", ogr.wkbPolygon, None],
["empty_polygon", ogr.wkbPolygon, "POLYGON EMPTY", None],
]
options = ['COLUMN_TYPES=smallint=esriFieldTypeSmallInteger,float=esriFieldTypeSingle,guid=esriFieldTypeGUID,xml=esriFieldTypeXML']
for data in datalist:
if data[1] == ogr.wkbNone:
lyr = ds.CreateLayer(data[0], geom_type=data[1], options=options)
elif data[0] == 'multipatch':
lyr = ds.CreateLayer(data[0], geom_type=data[1], srs=srs, options=['CREATE_MULTIPATCH=YES', options[0]])
else:
lyr = ds.CreateLayer(data[0], geom_type=data[1], srs=srs, options=options)
lyr.CreateField(ogr.FieldDefn("id", ogr.OFTInteger))
lyr.CreateField(ogr.FieldDefn("str", ogr.OFTString))
lyr.CreateField(ogr.FieldDefn("smallint", ogr.OFTInteger))
lyr.CreateField(ogr.FieldDefn("int", ogr.OFTInteger))
lyr.CreateField(ogr.FieldDefn("float", ogr.OFTReal))
lyr.CreateField(ogr.FieldDefn("real", ogr.OFTReal))
lyr.CreateField(ogr.FieldDefn("adate", ogr.OFTDateTime))
lyr.CreateField(ogr.FieldDefn("guid", ogr.OFTString))
lyr.CreateField(ogr.FieldDefn("xml", ogr.OFTString))
lyr.CreateField(ogr.FieldDefn("binary", ogr.OFTBinary))
lyr.CreateField(ogr.FieldDefn("binary2", ogr.OFTBinary))
fld_defn = ogr.FieldDefn("smallint2", ogr.OFTInteger)
fld_defn.SetSubType(ogr.OFSTInt16)
lyr.CreateField(fld_defn)
fld_defn = ogr.FieldDefn("float2", ogr.OFTReal)
fld_defn.SetSubType(ogr.OFSTFloat32)
lyr.CreateField(fld_defn)
# We need at least 5 features so that test_ogrsf can test SetFeature()
for i in range(5):
feat = ogr.Feature(lyr.GetLayerDefn())
if data[1] != ogr.wkbNone and data[2] is not None:
feat.SetGeometry(ogr.CreateGeometryFromWkt(data[2]))
feat.SetField("id", i + 1)
feat.SetField("str", "foo_\xc3\xa9")
feat.SetField("smallint", -13)
feat.SetField("int", 123)
feat.SetField("float", 1.5)
feat.SetField("real", 4.56)
feat.SetField("adate", "2013/12/26 12:34:56")
feat.SetField("guid", "{12345678-9abc-DEF0-1234-567890ABCDEF}")
feat.SetField("xml", "<foo></foo>")
feat.SetFieldBinaryFromHexString("binary", "00FF7F")
feat.SetFieldBinaryFromHexString("binary2", "123456")
feat.SetField("smallint2", -32768)
feat.SetField("float2", 1.5)
lyr.CreateFeature(feat)
for data in datalist:
lyr = ds.GetLayerByName(data[0])
if data[1] != ogr.wkbNone:
if lyr.GetSpatialRef().IsSame(srs) != 1:
gdaltest.post_reason('fail')
print(lyr.GetSpatialRef())
return 'fail'
feat = lyr.GetNextFeature()
if data[1] != ogr.wkbNone:
try:
expected_wkt = data[3]
except IndexError:
expected_wkt = data[2]
if expected_wkt is None:
if feat.GetGeometryRef() is not None:
gdaltest.post_reason('fail')
print(data)
feat.DumpReadable()
return 'fail'
elif ogrtest.check_feature_geometry(feat, expected_wkt) != 0:
gdaltest.post_reason('fail')
print(data)
feat.DumpReadable()
return 'fail'
if feat.GetField('id') != 1 or \
feat.GetField('smallint') != -13 or \
feat.GetField('int') != 123 or \
feat.GetField('float') != 1.5 or \
feat.GetField('real') != 4.56 or \
feat.GetField('adate') != "2013/12/26 12:34:56" or \
feat.GetField('guid') != "{12345678-9ABC-DEF0-1234-567890ABCDEF}" or \
feat.GetField('xml') != "<foo></foo>" or \
feat.GetField('binary') != "00FF7F" or \
feat.GetField('binary2') != "123456" or \
feat.GetField('smallint2') != -32768:
gdaltest.post_reason('fail')
feat.DumpReadable()
return 'fail'
sql_lyr = ds.ExecuteSQL("GetLayerDefinition %s" % lyr.GetName())
if sql_lyr is None:
gdaltest.post_reason('failure')
return 'fail'
feat = sql_lyr.GetNextFeature()
if feat is None:
gdaltest.post_reason('failure')
return 'fail'
feat = sql_lyr.GetNextFeature()
if feat is not None:
gdaltest.post_reason('failure')
return 'fail'
lyr.ResetReading()
lyr.TestCapability("foo")
ds.ReleaseResultSet(sql_lyr)
sql_lyr = ds.ExecuteSQL("GetLayerMetadata %s" % lyr.GetName())
if sql_lyr is None:
gdaltest.post_reason('failure')
return 'fail'
feat = sql_lyr.GetNextFeature()
if feat is None:
gdaltest.post_reason('failure')
return 'fail'
ds.ReleaseResultSet(sql_lyr)
sql_lyr = ds.ExecuteSQL("GetLayerDefinition foo")
if sql_lyr is not None:
gdaltest.post_reason('failure')
return 'fail'
sql_lyr = ds.ExecuteSQL("GetLayerMetadata foo")
if sql_lyr is not None:
gdaltest.post_reason('failure')
return 'fail'
ds = None
return 'success'
###############################################################################
# Test DeleteField()
def test_ogr_fgdb_DeleteField():
if ogrtest.fgdb_drv is None:
return 'skip'
ds = ogr.Open("tmp/test.gdb", update=1)
lyr = ds.GetLayerByIndex(0)
if lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('smallint')).GetSubType() != ogr.OFSTInt16:
gdaltest.post_reason('fail')
return 'fail'
if lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('smallint2')).GetSubType() != ogr.OFSTInt16:
gdaltest.post_reason('fail')
return 'fail'
if lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('float')).GetSubType() != ogr.OFSTFloat32:
gdaltest.post_reason('fail')
return 'fail'
if lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('float2')).GetSubType() != ogr.OFSTFloat32:
gdaltest.post_reason('fail')
return 'fail'
if lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('smallint')).GetWidth() != 0:
gdaltest.post_reason('failure')
return 'fail'
if lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('str')).GetWidth() != 0:
gdaltest.post_reason('failure')
return 'fail'
if lyr.DeleteField(lyr.GetLayerDefn().GetFieldIndex('str')) != 0:
gdaltest.post_reason('failure')
return 'fail'
# Needed since FileGDB v1.4, otherwise crash/error ...
if True: # pylint: disable=using-constant-test
ds = ogr.Open("tmp/test.gdb", update=1)
lyr = ds.GetLayerByIndex(0)
fld_defn = ogr.FieldDefn("str2", ogr.OFTString)
fld_defn.SetWidth(80)
lyr.CreateField(fld_defn)
feat = lyr.GetNextFeature()
feat.SetField("str2", "foo2_\xc3\xa9")
lyr.SetFeature(feat)
# Test updating non-existing feature
feat.SetFID(-10)
if lyr.SetFeature(feat) != ogr.OGRERR_NON_EXISTING_FEATURE:
gdaltest.post_reason('Expected failure of SetFeature().')
return 'fail'
# Test deleting non-existing feature
if lyr.DeleteFeature(-10) != ogr.OGRERR_NON_EXISTING_FEATURE:
gdaltest.post_reason('Expected failure of DeleteFeature().')
return 'fail'
feat = None
ds = None
ds = ogr.Open("tmp/test.gdb")
lyr = ds.GetLayerByIndex(0)
if lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('str2')).GetWidth() != 80:
gdaltest.post_reason('failure')
return 'fail'
if lyr.GetLayerDefn().GetFieldIndex('str') != -1:
gdaltest.post_reason('failure')
return 'fail'
feat = lyr.GetNextFeature()
if feat.GetFieldAsString("str2") != "foo2_\xc3\xa9":
gdaltest.post_reason('failure')
return 'fail'
ds = None
return 'success'
###############################################################################
# Run test_ogrsf
def test_ogr_fgdb_2():
if ogrtest.fgdb_drv is None:
return 'skip'
import test_cli_utilities
if test_cli_utilities.get_test_ogrsf_path() is None:
return 'skip'
ret = gdaltest.runexternal(test_cli_utilities.get_test_ogrsf_path() + ' -ro tmp/test.gdb --config OGR_SKIP OpenFileGDB')
if ret.find('INFO') == -1 or ret.find('ERROR') != -1:
print(ret)
return 'fail'
return 'success'
###############################################################################
# Run ogr2ogr
def test_ogr_fgdb_3():
if ogrtest.fgdb_drv is None:
return 'skip'
import test_cli_utilities
if test_cli_utilities.get_ogr2ogr_path() is None:
return 'skip'
try:
shutil.rmtree("tmp/poly.gdb")
except OSError:
pass
gdaltest.runexternal(test_cli_utilities.get_ogr2ogr_path() + ' -f filegdb tmp/poly.gdb data/poly.shp -nlt MULTIPOLYGON -a_srs None')
ds = ogr.Open('tmp/poly.gdb')
if ds is None or ds.GetLayerCount() == 0:
gdaltest.post_reason('ogr2ogr failed')
return 'fail'
ds = None
if test_cli_utilities.get_test_ogrsf_path() is None:
return 'skip'
ret = gdaltest.runexternal(test_cli_utilities.get_test_ogrsf_path() + ' tmp/poly.gdb')
# print ret
if ret.find('INFO') == -1 or ret.find('ERROR') != -1:
print(ret)
return 'fail'
return 'success'
###############################################################################
# Test SQL support
def test_ogr_fgdb_sql():
if ogrtest.fgdb_drv is None:
return 'skip'
import test_cli_utilities
if test_cli_utilities.get_ogr2ogr_path() is None:
return 'skip'
ds = ogr.Open('tmp/poly.gdb')
ds.ExecuteSQL("CREATE INDEX idx_poly_eas_id ON poly(EAS_ID)")
sql_lyr = ds.ExecuteSQL("SELECT * FROM POLY WHERE EAS_ID = 170", dialect='FileGDB')
feat = sql_lyr.GetNextFeature()
if feat is None:
return 'fail'
feat = sql_lyr.GetNextFeature()
if feat is not None:
return 'fail'
feat = None
ds.ReleaseResultSet(sql_lyr)
ds = None
return 'success'
###############################################################################
# Test delete layer
def test_ogr_fgdb_4():
if ogrtest.fgdb_drv is None:
return 'skip'
for j in range(2):
# Create a layer
ds = ogr.Open("tmp/test.gdb", update=1)
srs = osr.SpatialReference()
srs.SetFromUserInput("WGS84")
lyr = ds.CreateLayer("layer_to_remove", geom_type=ogr.wkbPoint, srs=srs)
lyr.CreateField(ogr.FieldDefn("str", ogr.OFTString))
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetGeometry(ogr.CreateGeometryFromWkt('POINT(2 49)'))
feat.SetField("str", "foo")
feat = None
lyr = None
if j == 1:
ds = None
ds = ogr.Open("tmp/test.gdb", update=1)
# Delete it
for i in range(ds.GetLayerCount()):
if ds.GetLayer(i).GetName() == 'layer_to_remove':
ds.DeleteLayer(i)
break
# Check it no longer exists
lyr = ds.GetLayerByName('layer_to_remove')
ds = None
if lyr is not None:
gdaltest.post_reason('failed at iteration %d' % j)
return 'fail'
return 'success'
###############################################################################
# Test DeleteDataSource()
def test_ogr_fgdb_5():
if ogrtest.fgdb_drv is None:
return 'skip'
if ogrtest.fgdb_drv.DeleteDataSource("tmp/test.gdb") != 0:
gdaltest.post_reason('DeleteDataSource() failed')
return 'fail'
try:
os.stat("tmp/test.gdb")
gdaltest.post_reason("tmp/test.gdb still existing")
return 'fail'
except OSError:
pass
return 'success'
###############################################################################
# Test adding a layer to an existing feature dataset
def test_ogr_fgdb_6():
if ogrtest.fgdb_drv is None:
return 'skip'
srs = osr.SpatialReference()
srs.SetFromUserInput("WGS84")
ds = ogrtest.fgdb_drv.CreateDataSource('tmp/test.gdb')
ds.CreateLayer('layer1', srs=srs, geom_type=ogr.wkbPoint, options=['FEATURE_DATASET=featuredataset'])
ds.CreateLayer('layer2', srs=srs, geom_type=ogr.wkbPoint, options=['FEATURE_DATASET=featuredataset'])
ds = None
ds = ogr.Open('tmp/test.gdb')
if ds.GetLayerCount() != 2:
return 'fail'
ds = None
return 'success'
###############################################################################
# Test bulk loading (#4420)
def test_ogr_fgdb_7():
if ogrtest.fgdb_drv is None:
return 'skip'
try:
shutil.rmtree("tmp/test.gdb")
except OSError:
pass
srs = osr.SpatialReference()
srs.SetFromUserInput("WGS84")
ds = ogrtest.fgdb_drv.CreateDataSource('tmp/test.gdb')
lyr = ds.CreateLayer('test', srs=srs, geom_type=ogr.wkbPoint)
lyr.CreateField(ogr.FieldDefn('id', ogr.OFTInteger))
gdal.SetConfigOption('FGDB_BULK_LOAD', 'YES')
for i in range(1000):
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetField(0, i)
geom = ogr.CreateGeometryFromWkt('POINT(0 1)')
feat.SetGeometry(geom)
lyr.CreateFeature(feat)
feat = None
lyr.ResetReading()
feat = lyr.GetNextFeature()
if feat.GetField(0) != 0:
return 'fail'
ds = None
gdal.SetConfigOption('FGDB_BULK_LOAD', None)
return 'success'
###############################################################################
# Test field name laundering (#4458)
def test_ogr_fgdb_8():
if ogrtest.fgdb_drv is None:
return 'skip'
try:
shutil.rmtree("tmp/test.gdb")
except OSError:
pass
srs = osr.SpatialReference()
srs.SetFromUserInput("WGS84")
ds = ogrtest.fgdb_drv.CreateDataSource('tmp/test.gdb')
lyr = ds.CreateLayer('test', srs=srs, geom_type=ogr.wkbPoint)
gdal.PushErrorHandler('CPLQuietErrorHandler')
lyr.CreateField(ogr.FieldDefn('FROM', ogr.OFTInteger)) # reserved keyword
lyr.CreateField(ogr.FieldDefn('1NUMBER', ogr.OFTInteger)) # starting with a number
lyr.CreateField(ogr.FieldDefn('WITH SPACE AND !$*!- special characters', ogr.OFTInteger)) # unallowed characters
lyr.CreateField(ogr.FieldDefn('A123456789012345678901234567890123456789012345678901234567890123', ogr.OFTInteger)) # 64 characters : ok
lyr.CreateField(ogr.FieldDefn('A1234567890123456789012345678901234567890123456789012345678901234', ogr.OFTInteger)) # 65 characters : nok
lyr.CreateField(ogr.FieldDefn('A12345678901234567890123456789012345678901234567890123456789012345', ogr.OFTInteger)) # 66 characters : nok
gdal.PopErrorHandler()
lyr_defn = lyr.GetLayerDefn()
expected_names = ['FROM_', '_1NUMBER', 'WITH_SPACE_AND_______special_characters',
'A123456789012345678901234567890123456789012345678901234567890123',
'A1234567890123456789012345678901234567890123456789012345678901_1',
'A1234567890123456789012345678901234567890123456789012345678901_2']
for i in range(5):
if lyr_defn.GetFieldIndex(expected_names[i]) != i:
gdaltest.post_reason('did not find %s' % expected_names[i])
return 'fail'
return 'success'
###############################################################################
# Test layer name laundering (#4466)
def test_ogr_fgdb_9():
if ogrtest.fgdb_drv is None:
return 'skip'
try:
shutil.rmtree("tmp/test.gdb")
except OSError:
pass
srs = osr.SpatialReference()
srs.SetFromUserInput("WGS84")
_160char = 'A123456789' * 16
in_names = ['FROM', # reserved keyword
'1NUMBER', # starting with a number
'WITH SPACE AND !$*!- special characters', # banned characters
'sde_foo', # reserved prefixes
_160char, # OK
_160char + 'A', # too long
_160char + 'B', # still too long
]
ds = ogrtest.fgdb_drv.CreateDataSource('tmp/test.gdb')
gdal.PushErrorHandler('CPLQuietErrorHandler')
for in_name in in_names:
lyr = ds.CreateLayer(in_name, srs=srs, geom_type=ogr.wkbPoint)
gdal.PopErrorHandler()
lyr.GetLayerDefn()
expected_names = ['FROM_',
'_1NUMBER',
'WITH_SPACE_AND_______special_characters',
'_sde_foo',
_160char,
_160char[0:158] + '_1',
_160char[0:158] + '_2']
for i, exp_name in enumerate(expected_names):
if ds.GetLayerByIndex(i).GetName() != exp_name:
gdaltest.post_reason('did not find %s' % exp_name)
return 'fail'
return 'success'
###############################################################################
# Test SRS support
def test_ogr_fgdb_10():
if ogrtest.fgdb_drv is None:
return 'skip'
try:
shutil.rmtree("tmp/test.gdb")
except OSError:
pass
srs_exact_4326 = osr.SpatialReference()
srs_exact_4326.ImportFromEPSG(4326)
srs_approx_4326 = srs_exact_4326.Clone()
srs_approx_4326.MorphToESRI()
srs_approx_4326.MorphFromESRI()
srs_exact_2193 = osr.SpatialReference()
srs_exact_2193.ImportFromEPSG(2193)
srs_approx_2193 = srs_exact_2193.Clone()
srs_approx_2193.MorphToESRI()
srs_approx_2193.MorphFromESRI()
srs_not_in_db = osr.SpatialReference("""PROJCS["foo",
GEOGCS["foo",
DATUM["foo",
SPHEROID["foo",6000000,300]],
PRIMEM["Greenwich",0],
UNIT["Degree",0.017453292519943295]],
PROJECTION["Transverse_Mercator"],
PARAMETER["latitude_of_origin",0],
PARAMETER["central_meridian",0],
PARAMETER["scale_factor",1],
PARAMETER["false_easting",500000],
PARAMETER["false_northing",0],
UNIT["Meter",1]]""")
srs_exact_4230 = osr.SpatialReference()
srs_exact_4230.ImportFromEPSG(4230)
srs_approx_4230 = srs_exact_4230.Clone()
srs_approx_4230.MorphToESRI()
srs_approx_4230.MorphFromESRI()
srs_approx_intl = osr.SpatialReference()
srs_approx_intl.ImportFromProj4('+proj=longlat +ellps=intl +no_defs')
srs_exact_4233 = osr.SpatialReference()
srs_exact_4233.ImportFromEPSG(4233)
ds = ogrtest.fgdb_drv.CreateDataSource('tmp/test.gdb')
lyr = ds.CreateLayer("srs_exact_4326", srs=srs_exact_4326, geom_type=ogr.wkbPoint)
lyr = ds.CreateLayer("srs_approx_4326", srs=srs_approx_4326, geom_type=ogr.wkbPoint)
lyr = ds.CreateLayer("srs_exact_2193", srs=srs_exact_2193, geom_type=ogr.wkbPoint)
lyr = ds.CreateLayer("srs_approx_2193", srs=srs_approx_2193, geom_type=ogr.wkbPoint)
lyr = ds.CreateLayer("srs_approx_4230", srs=srs_approx_4230, geom_type=ogr.wkbPoint)
# will fail
gdal.PushErrorHandler('CPLQuietErrorHandler')
lyr = ds.CreateLayer("srs_approx_intl", srs=srs_approx_intl, geom_type=ogr.wkbPoint)
gdal.PopErrorHandler()
# will fail: 4233 doesn't exist in DB
gdal.PushErrorHandler('CPLQuietErrorHandler')
lyr = ds.CreateLayer("srs_exact_4233", srs=srs_exact_4233, geom_type=ogr.wkbPoint)
gdal.PopErrorHandler()
# will fail
gdal.PushErrorHandler('CPLQuietErrorHandler')
lyr = ds.CreateLayer("srs_not_in_db", srs=srs_not_in_db, geom_type=ogr.wkbPoint)
gdal.PopErrorHandler()
ds = None
ds = ogr.Open('tmp/test.gdb')
lyr = ds.GetLayerByName("srs_exact_4326")
if lyr.GetSpatialRef().ExportToWkt().find('4326') == -1:
gdaltest.post_reason('fail')
return 'fail'
lyr = ds.GetLayerByName("srs_approx_4326")
if lyr.GetSpatialRef().ExportToWkt().find('4326') == -1:
gdaltest.post_reason('fail')
return 'fail'
lyr = ds.GetLayerByName("srs_exact_2193")
if lyr.GetSpatialRef().ExportToWkt().find('2193') == -1:
gdaltest.post_reason('fail')
return 'fail'
lyr = ds.GetLayerByName("srs_approx_2193")
if lyr.GetSpatialRef().ExportToWkt().find('2193') == -1:
gdaltest.post_reason('fail')
return 'fail'
lyr = ds.GetLayerByName("srs_approx_4230")
if lyr.GetSpatialRef().ExportToWkt().find('4230') == -1:
gdaltest.post_reason('fail')
return 'fail'
ds = None
return 'success'
###############################################################################
# Test all data types
def test_ogr_fgdb_11():
if ogrtest.fgdb_drv is None:
return 'skip'
try:
shutil.rmtree("tmp/test.gdb")
except OSError:
pass
f = open('data/test_filegdb_field_types.xml', 'rt')
xml_def = f.read()
f.close()
ds = ogrtest.fgdb_drv.CreateDataSource('tmp/test.gdb')
lyr = ds.CreateLayer("test", geom_type=ogr.wkbNone, options=['XML_DEFINITION=%s' % xml_def])
feat = ogr.Feature(lyr.GetLayerDefn())
feat.SetField("esriFieldTypeSmallInteger", 12)
feat.SetField("esriFieldTypeInteger", 3456)
feat.SetField("esriFieldTypeSingle", 78.9)
feat.SetField("esriFieldTypeDouble", 1.23)
feat.SetField("esriFieldTypeDate", "2012/12/31 12:34:56")
feat.SetField("esriFieldTypeString", "astr")
feat.SetField("esriFieldTypeGlobalID", "{12345678-9ABC-DEF0-1234-567890ABCDEF}") # This is ignored and value is generated by FileGDB SDK itself
feat.SetField("esriFieldTypeGUID", "{12345678-9abc-DEF0-1234-567890ABCDEF}")
lyr.CreateFeature(feat)
feat = None
feat = ogr.Feature(lyr.GetLayerDefn())
lyr.CreateFeature(feat)
feat = None
# Create a esriFieldTypeGlobalID field
lyr = ds.CreateLayer('test2', geom_type=ogr.wkbNone, options=['COLUMN_TYPES=global_id=esriFieldTypeGlobalID'])
lyr.CreateField(ogr.FieldDefn('global_id', ogr.OFTString))
feat = ogr.Feature(lyr.GetLayerDefn())
lyr.CreateFeature(feat)
feat = None
ds = None
ds = ogr.Open('tmp/test.gdb')
lyr = ds.GetLayerByName('test')
feat = lyr.GetNextFeature()
if feat.GetField('esriFieldTypeSmallInteger') != 12 or \
feat.GetField('esriFieldTypeInteger') != 3456 or \
abs(feat.GetField('esriFieldTypeSingle') - 78.9) > 1e-2 or \
feat.GetField('esriFieldTypeDouble') != 1.23 or \
feat.GetField('esriFieldTypeDate') != '2012/12/31 12:34:56' or \
feat.GetField('esriFieldTypeString') != 'astr' or \
feat.GetField('esriFieldTypeGUID') != '{12345678-9ABC-DEF0-1234-567890ABCDEF}' or \
(not feat.IsFieldSet('esriFieldTypeGlobalID')):
feat.DumpReadable()
return 'fail'
feat = lyr.GetNextFeature()
if not feat.IsFieldSet('esriFieldTypeGlobalID'):
feat.DumpReadable()
return 'fail'
lyr = ds.GetLayerByName('test2')
feat = lyr.GetNextFeature()
if not feat.IsFieldSet('global_id'):
feat.DumpReadable()
return 'fail'
ds = None
return 'success'
###############################################################################
# Test failed Open()
def test_ogr_fgdb_12():
if ogrtest.fgdb_drv is None:
return 'skip'
ds = ogr.Open('tmp/non_existing.gdb')
if ds is not None:
gdaltest.post_reason('fail')
return 'fail'
gdal.Unlink('tmp/dummy.gdb')
try:
shutil.rmtree('tmp/dummy.gdb')
except OSError:
pass
f = open('tmp/dummy.gdb', 'wb')
f.close()
ds = ogr.Open('tmp/dummy.gdb')
if ds is not None:
gdaltest.post_reason('fail')
return 'fail'
os.unlink('tmp/dummy.gdb')
os.mkdir('tmp/dummy.gdb')
gdal.PushErrorHandler('CPLQuietErrorHandler')
ds = ogr.Open('tmp/dummy.gdb')
gdal.PopErrorHandler()
if ds is not None:
gdaltest.post_reason('fail')
return 'fail'
shutil.rmtree('tmp/dummy.gdb')
return 'success'
###############################################################################
# Test failed CreateDataSource() and DeleteDataSource()
def test_ogr_fgdb_13():
if ogrtest.fgdb_drv is None:
return 'skip'
gdal.PushErrorHandler('CPLQuietErrorHandler')
ds = ogrtest.fgdb_drv.CreateDataSource('tmp/foo')
gdal.PopErrorHandler()
if ds is not None:
gdaltest.post_reason('fail')
return 'fail'
f = open('tmp/dummy.gdb', 'wb')
f.close()
gdal.PushErrorHandler('CPLQuietErrorHandler')
ds = ogrtest.fgdb_drv.CreateDataSource('tmp/dummy.gdb')
gdal.PopErrorHandler()
if ds is not None:
gdaltest.post_reason('fail')
return 'fail'
os.unlink('tmp/dummy.gdb')
try:
shutil.rmtree("/nonexistingdir")
except OSError:
pass
name = '/nonexistingdrive:/nonexistingdir/dummy.gdb'
gdal.PushErrorHandler('CPLQuietErrorHandler')
ds = ogrtest.fgdb_drv.CreateDataSource(name)
gdal.PopErrorHandler()
if ds is not None:
gdaltest.post_reason('fail')
return 'fail'
gdal.PushErrorHandler('CPLQuietErrorHandler')
ret = ogrtest.fgdb_drv.DeleteDataSource(name)
gdal.PopErrorHandler()
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
return 'success'
###############################################################################
# Test interleaved opening and closing of databases (#4270)
def test_ogr_fgdb_14():
if ogrtest.fgdb_drv is None:
return 'skip'
for _ in range(3):
ds1 = ogr.Open("tmp/test.gdb")
if ds1 is None:
gdaltest.post_reason('fail')
return 'fail'
ds2 = ogr.Open("tmp/test.gdb")
if ds2 is None:
gdaltest.post_reason('fail')
return 'fail'
ds2 = None
ds1 = None
return 'success'
###############################################################################
# Test opening a FGDB with both SRID and LatestSRID set (#5638)
def test_ogr_fgdb_15():
if ogrtest.fgdb_drv is None:
return 'skip'
try:
shutil.rmtree('tmp/test3005.gdb')
except OSError:
pass
gdaltest.unzip('tmp', 'data/test3005.gdb.zip')
ds = ogr.Open('tmp/test3005.gdb')
lyr = ds.GetLayer(0)
got_wkt = lyr.GetSpatialRef().ExportToWkt()
sr = osr.SpatialReference()
sr.ImportFromEPSG(3005)
expected_wkt = sr.ExportToWkt()
if got_wkt != expected_wkt:
gdaltest.post_reason('fail')
print(got_wkt)
print(expected_wkt)
return 'fail'
ds = None
return 'success'
###############################################################################
# Test fix for #5674
def test_ogr_fgdb_16():
if ogrtest.fgdb_drv is None or ogrtest.openfilegdb_drv is None:
return 'skip'
try:
gdaltest.unzip('tmp/cache', 'data/ESSENCE_NAIPF_ORI_PROV_sub93.gdb.zip')
except OSError:
pass
try:
os.stat('tmp/cache/ESSENCE_NAIPF_ORI_PROV_sub93.gdb')
except OSError:
return 'skip'
ogrtest.fgdb_drv.Deregister()
# Force FileGDB first
ogrtest.fgdb_drv.Register()
ogrtest.openfilegdb_drv.Register()
ds = ogr.Open('tmp/cache/ESSENCE_NAIPF_ORI_PROV_sub93.gdb')
if ds is None:
ret = 'fail'
else:
ret = 'success'
# Deregister OpenFileGDB again
ogrtest.openfilegdb_drv.Deregister()
shutil.rmtree('tmp/cache/ESSENCE_NAIPF_ORI_PROV_sub93.gdb')
return ret
###############################################################################
# Test not nullable fields
def test_ogr_fgdb_17():
if ogrtest.fgdb_drv is None:
return 'skip'
try:
shutil.rmtree("tmp/test.gdb")
except OSError:
pass
ds = ogrtest.fgdb_drv.CreateDataSource('tmp/test.gdb')
sr = osr.SpatialReference()
sr.ImportFromEPSG(4326)
lyr = ds.CreateLayer('test', geom_type=ogr.wkbPoint, srs=sr, options=['GEOMETRY_NULLABLE=NO'])
if lyr.GetLayerDefn().GetGeomFieldDefn(0).IsNullable() != 0:
gdaltest.post_reason('fail')
return 'fail'
field_defn = ogr.FieldDefn('field_not_nullable', ogr.OFTString)
field_defn.SetNullable(0)
lyr.CreateField(field_defn)
field_defn = ogr.FieldDefn('field_nullable', ogr.OFTString)
lyr.CreateField(field_defn)
f = ogr.Feature(lyr.GetLayerDefn())
f.SetField('field_not_nullable', 'not_null')
f.SetGeometryDirectly(ogr.CreateGeometryFromWkt('POINT(0 0)'))
ret = lyr.CreateFeature(f)
if ret != 0:
gdaltest.post_reason('fail')
return 'fail'
f = None
# Error case: missing geometry
f = ogr.Feature(lyr.GetLayerDefn())
f.SetField('field_not_nullable', 'not_null')
gdal.PushErrorHandler()
ret = lyr.CreateFeature(f)
gdal.PopErrorHandler()
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
f = None
# Error case: missing non-nullable field
f = ogr.Feature(lyr.GetLayerDefn())
f.SetGeometryDirectly(ogr.CreateGeometryFromWkt('POINT(0 0)'))
gdal.PushErrorHandler()
ret = lyr.CreateFeature(f)
gdal.PopErrorHandler()
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
f = None
ds = None
ds = ogr.Open('tmp/test.gdb', update=1)
lyr = ds.GetLayerByName('test')
if lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('field_not_nullable')).IsNullable() != 0:
gdaltest.post_reason('fail')
return 'fail'
if lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('field_nullable')).IsNullable() != 1:
gdaltest.post_reason('fail')
return 'fail'
if lyr.GetLayerDefn().GetGeomFieldDefn(0).IsNullable() != 0:
gdaltest.post_reason('fail')
return 'fail'
ds = None
return 'success'
###############################################################################
# Test default values
def test_ogr_fgdb_18():
if ogrtest.fgdb_drv is None:
return 'skip'
try:
shutil.rmtree("tmp/test.gdb")
except OSError:
pass
ds = ogrtest.fgdb_drv.CreateDataSource('tmp/test.gdb')
lyr = ds.CreateLayer('test', geom_type=ogr.wkbNone)
field_defn = ogr.FieldDefn('field_string', ogr.OFTString)
field_defn.SetDefault("'a''b'")
lyr.CreateField(field_defn)
field_defn = ogr.FieldDefn('field_int', ogr.OFTInteger)
field_defn.SetDefault('123')
lyr.CreateField(field_defn)
field_defn = ogr.FieldDefn('field_real', ogr.OFTReal)
field_defn.SetDefault('1.23')
lyr.CreateField(field_defn)
field_defn = ogr.FieldDefn('field_nodefault', ogr.OFTInteger)
lyr.CreateField(field_defn)
field_defn = ogr.FieldDefn('field_datetime', ogr.OFTDateTime)
field_defn.SetDefault("CURRENT_TIMESTAMP")
lyr.CreateField(field_defn)
field_defn = ogr.FieldDefn('field_datetime2', ogr.OFTDateTime)
field_defn.SetDefault("'2015/06/30 12:34:56'")
lyr.CreateField(field_defn)
f = ogr.Feature(lyr.GetLayerDefn())
lyr.CreateFeature(f)
f = None
ds = None
if ogrtest.openfilegdb_drv is not None:
ogrtest.openfilegdb_drv.Register()
ret = ogr_fgdb_18_test_results()
if ogrtest.openfilegdb_drv is not None:
ogrtest.openfilegdb_drv.Deregister()
return ret
def ogr_fgdb_18_test_results():
ds = ogr.Open('tmp/test.gdb', update=1)
lyr = ds.GetLayerByName('test')
if lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('field_string')).GetDefault() != "'a''b'":
gdaltest.post_reason('fail')
return 'fail'
if ogrtest.openfilegdb_drv is not None:
if lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('field_int')).GetDefault() != '123':
gdaltest.post_reason('fail')
return 'fail'
if lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('field_real')).GetDefault() != '1.23':
print(lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('field_real')).GetDefault())
gdaltest.post_reason('fail')
return 'fail'
if lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('field_nodefault')).GetDefault() is not None:
gdaltest.post_reason('fail')
return 'fail'
# if lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('field_datetime')).GetDefault() != 'CURRENT_TIMESTAMP':
# gdaltest.post_reason('fail')
# return 'fail'
# if lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('field_datetime2')).GetDefault() != "'2015/06/30 12:34:56'":
# gdaltest.post_reason('fail')
# print(lyr.GetLayerDefn().GetFieldDefn(lyr.GetLayerDefn().GetFieldIndex('field_datetime2')).GetDefault())
# return 'fail'
f = lyr.GetNextFeature()
if f.GetField('field_string') != 'a\'b' or f.GetField('field_int') != 123 or \
f.GetField('field_real') != 1.23 or \
not f.IsFieldNull('field_nodefault') or not f.IsFieldSet('field_datetime') or \
f.GetField('field_datetime2') != '2015/06/30 12:34:56':
gdaltest.post_reason('fail')
f.DumpReadable()
return 'fail'
ds = None
return 'success'
###############################################################################
# Test transaction support
def ogr_fgdb_19_open_update(filename):
# We need the OpenFileGDB driver for Linux improved StartTransaction()
bPerLayerCopyingForTransaction = False
if ogrtest.openfilegdb_drv is not None:
ogrtest.openfilegdb_drv.Register()
if os.name != 'nt':
val = gdal.GetConfigOption('FGDB_PER_LAYER_COPYING_TRANSACTION', 'TRUE')
if val == 'TRUE' or val == 'YES' or val == 'ON':
bPerLayerCopyingForTransaction = True
ds = ogr.Open(filename, update=1)
if ogrtest.openfilegdb_drv is not None:
ogrtest.openfilegdb_drv.Deregister()
ogrtest.fgdb_drv.Deregister()
# Force OpenFileGDB first
ogrtest.openfilegdb_drv.Register()
ogrtest.fgdb_drv.Register()
return (bPerLayerCopyingForTransaction, ds)
def test_ogr_fgdb_19():
if ogrtest.fgdb_drv is None:
return 'skip'
# FIXME likely due to too old FileGDB SDK on those targets
# fails with ERROR 1: Failed to open Geodatabase (The system cannot find the file specified.)
# File "ogr_fgdb.py", line 1664, in ogr_fgdb_19
# if ds.StartTransaction(force=True) != 0:
if gdaltest.is_travis_branch('ubuntu_1804') or gdaltest.is_travis_branch('ubuntu_1604') or gdaltest.is_travis_branch('trusty_clang') or gdaltest.is_travis_branch('python3') or gdaltest.is_travis_branch('trunk_with_coverage'):
return 'skip'
try:
shutil.rmtree("tmp/test.gdb.ogrtmp")
except OSError:
pass
try:
shutil.rmtree("tmp/test.gdb.ogredited")
except OSError:
pass
# Error case: try in read-only
ds = ogr.Open('tmp/test.gdb')
gdal.PushErrorHandler()
ret = ds.StartTransaction(force=True)
gdal.PopErrorHandler()
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
ds = None
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update('tmp/test.gdb')
if ds.TestCapability(ogr.ODsCEmulatedTransactions) != 1:
gdaltest.post_reason('fail')
return 'fail'
# Error case: try in non-forced mode
gdal.PushErrorHandler()
ret = ds.StartTransaction(force=False)
gdal.PopErrorHandler()
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
# Error case: try StartTransaction() with a ExecuteSQL layer still active
sql_lyr = ds.ExecuteSQL('SELECT * FROM test')
gdal.PushErrorHandler()
ret = ds.StartTransaction(force=True)
gdal.PopErrorHandler()
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
ds.ReleaseResultSet(sql_lyr)
# Error case: call CommitTransaction() while there is no transaction
gdal.PushErrorHandler()
ret = ds.CommitTransaction()
gdal.PopErrorHandler()
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
# Error case: call RollbackTransaction() while there is no transaction
gdal.PushErrorHandler()
ret = ds.RollbackTransaction()
gdal.PopErrorHandler()
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
# Error case: try StartTransaction() with another active connection
ds2 = ogr.Open('tmp/test.gdb', update=1)
gdal.PushErrorHandler()
ret = ds2.StartTransaction(force=True)
gdal.PopErrorHandler()
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
ds2 = None
# Successful StartTransaction() finally!
lyr = ds.GetLayer(0)
lyr = ds.GetLayer(0) # again
old_count = lyr.GetFeatureCount()
lyr_defn = lyr.GetLayerDefn()
layer_created_before_transaction = ds.CreateLayer('layer_created_before_transaction', geom_type=ogr.wkbNone)
layer_created_before_transaction_defn = layer_created_before_transaction.GetLayerDefn()
if ds.StartTransaction(force=True) != 0:
gdaltest.post_reason('fail')
return 'fail'
try:
os.stat('tmp/test.gdb.ogredited')
except OSError:
gdaltest.post_reason('fail')
return 'fail'
try:
os.stat('tmp/test.gdb.ogrtmp')
gdaltest.post_reason('fail')
return 'fail'
except OSError:
pass
ret = lyr.CreateField(ogr.FieldDefn('foobar', ogr.OFTString))
if ret != 0:
gdaltest.post_reason('fail')
return 'fail'
ret = lyr.DeleteField(lyr.GetLayerDefn().GetFieldIndex('foobar'))
if ret != 0:
gdaltest.post_reason('fail')
return 'fail'
gdal.PushErrorHandler()
ret = lyr.CreateGeomField(ogr.GeomFieldDefn('foobar', ogr.wkbPoint))
gdal.PopErrorHandler()
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
gdal.PushErrorHandler()
ret = lyr.ReorderFields([i for i in range(lyr.GetLayerDefn().GetFieldCount())])
gdal.PopErrorHandler()
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
gdal.PushErrorHandler()
ret = lyr.AlterFieldDefn(0, ogr.FieldDefn('foo', ogr.OFTString), 0)
gdal.PopErrorHandler()
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
f = ogr.Feature(lyr_defn)
f.SetField('field_string', 'foo')
lyr.CreateFeature(f)
lyr.SetFeature(f)
fid = f.GetFID()
if fid <= 0:
gdaltest.post_reason('fail')
print(fid)
return 'fail'
lyr.ResetReading()
for i in range(fid):
f = lyr.GetNextFeature()
if f.GetFID() != fid or f.GetField('field_string') != 'foo':
gdaltest.post_reason('fail')
return 'fail'
f = lyr.GetFeature(fid)
if f.GetFID() != fid or f.GetField('field_string') != 'foo':
gdaltest.post_reason('fail')
return 'fail'
f = ogr.Feature(layer_created_before_transaction_defn)
layer_created_before_transaction.CreateFeature(f)
# Error case: call StartTransaction() while there is an active transaction
gdal.PushErrorHandler()
ret = ds.StartTransaction(force=True)
gdal.PopErrorHandler()
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
# Error case: try CommitTransaction() with a ExecuteSQL layer still active
sql_lyr = ds.ExecuteSQL('SELECT * FROM test')
gdal.PushErrorHandler()
ret = ds.CommitTransaction()
gdal.PopErrorHandler()
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
ds.ReleaseResultSet(sql_lyr)
# Error case: try RollbackTransaction() with a ExecuteSQL layer still active
sql_lyr = ds.ExecuteSQL('SELECT * FROM test')
gdal.PushErrorHandler()
ret = ds.RollbackTransaction()
gdal.PopErrorHandler()
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
ds.ReleaseResultSet(sql_lyr)
# Test that CommitTransaction() works
if ds.CommitTransaction() != 0:
gdaltest.post_reason('fail')
return 'fail'
try:
os.stat('tmp/test.gdb.ogredited')
gdaltest.post_reason('fail')
return 'fail'
except OSError:
pass
try:
os.stat('tmp/test.gdb.ogrtmp')
gdaltest.post_reason('fail')
return 'fail'
except OSError:
pass
lst = gdal.ReadDir('tmp/test.gdb')
for filename in lst:
if filename.find('.tmp') >= 0:
gdaltest.post_reason('fail')
print(lst)
return 'fail'
lyr_tmp = ds.GetLayer(0)
lyr_tmp = ds.GetLayer(0)
new_count = lyr_tmp.GetFeatureCount()
if new_count != old_count + 1:
gdaltest.post_reason('fail')
return 'fail'
old_count = new_count
if layer_created_before_transaction.GetFeatureCount() != 1:
gdaltest.post_reason('fail')
return 'fail'
for i in range(ds.GetLayerCount()):
if ds.GetLayer(i).GetName() == layer_created_before_transaction.GetName():
ds.DeleteLayer(i)
break
layer_created_before_transaction = None
# Test suppression of layer within transaction
lyr_count = ds.GetLayerCount()
ds.CreateLayer('layer_tmp', geom_type=ogr.wkbNone)
ret = ds.StartTransaction(force=True)
if ret != 0:
gdaltest.post_reason('fail')
return 'fail'
ds.DeleteLayer(ds.GetLayerCount() - 1)
if ds.CommitTransaction() != 0:
gdaltest.post_reason('fail')
return 'fail'
new_lyr_count = ds.GetLayerCount()
if new_lyr_count != lyr_count:
gdaltest.post_reason('fail')
return 'fail'
# Test that RollbackTransaction() works
ret = ds.StartTransaction(force=True)
if ret != 0:
gdaltest.post_reason('fail')
return 'fail'
f = ogr.Feature(lyr_defn)
lyr.CreateFeature(f)
layer_created_during_transaction = ds.CreateLayer('layer_created_during_transaction', geom_type=ogr.wkbNone)
layer_created_during_transaction.CreateField(ogr.FieldDefn('foo', ogr.OFTString))
if ds.RollbackTransaction() != 0:
gdaltest.post_reason('fail')
return 'fail'
try:
os.stat('tmp/test.gdb.ogredited')
gdaltest.post_reason('fail')
return 'fail'
except OSError:
pass
try:
os.stat('tmp/test.gdb.ogrtmp')
gdaltest.post_reason('fail')
return 'fail'
except OSError:
pass
if lyr.GetFeatureCount() != old_count:
gdaltest.post_reason('fail')
print(lyr.GetFeatureCount())
print(old_count)
return 'fail'
# Cannot retrieve the layer any more from fresh
if ds.GetLayerByName('layer_created_during_transaction') is not None:
gdaltest.post_reason('fail')
return 'fail'
# Pointer is in ghost state
if layer_created_during_transaction.GetLayerDefn().GetFieldCount() != 0:
gdaltest.post_reason('fail')
return 'fail'
# Simulate an error case where StartTransaction() cannot copy backup files
lyr_count = ds.GetLayerCount()
gdal.SetConfigOption('FGDB_SIMUL_FAIL', 'CASE1')
gdal.PushErrorHandler()
ret = ds.StartTransaction(force=True)
gdal.PopErrorHandler()
gdal.SetConfigOption('FGDB_SIMUL_FAIL', None)
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
if ds.GetLayerCount() != lyr_count:
gdaltest.post_reason('fail')
print(lyr_count)
print(ds.GetLayerCount())
return 'fail'
# Simulate an error case where StartTransaction() cannot reopen database
gdal.SetConfigOption('FGDB_SIMUL_FAIL', 'CASE2')
gdal.PushErrorHandler()
ret = ds.StartTransaction(force=True)
gdal.PopErrorHandler()
gdal.SetConfigOption('FGDB_SIMUL_FAIL', None)
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
if ds.GetLayerCount() != 0:
gdaltest.post_reason('fail')
return 'fail'
shutil.rmtree('tmp/test.gdb.ogredited')
# Test method on ghost datasource and layer
ds.GetName()
ds.GetLayerCount()
ds.GetLayer(0)
ds.GetLayerByName("test")
ds.DeleteLayer(0)
ds.TestCapability('foo')
ds.CreateLayer('bar', geom_type=ogr.wkbNone)
ds.CopyLayer(lyr, 'baz')
ds.GetStyleTable()
# ds.SetStyleTableDirectly(None)
ds.SetStyleTable(None)
sql_lyr = ds.ExecuteSQL('SELECT * FROM test')
ds.ReleaseResultSet(sql_lyr)
ds.FlushCache()
ds.GetMetadata()
ds.GetMetadataItem('foo')
ds.SetMetadata(None)
ds.SetMetadataItem('foo', None)
lyr.GetSpatialFilter()
lyr.SetSpatialFilter(None)
lyr.SetSpatialFilterRect(0, 0, 0, 0)
lyr.SetSpatialFilter(0, None)
lyr.SetSpatialFilterRect(0, 0, 0, 0, 0)
lyr.SetAttributeFilter(None)
lyr.ResetReading()
lyr.GetNextFeature()
lyr.SetNextByIndex(0)
lyr.GetFeature(0)
lyr.SetFeature(ogr.Feature(lyr.GetLayerDefn()))
lyr.CreateFeature(ogr.Feature(lyr.GetLayerDefn()))
lyr.DeleteFeature(0)
lyr.GetName()
lyr.GetGeomType()
lyr.GetLayerDefn()
lyr.GetSpatialRef()
lyr.GetFeatureCount()
lyr.GetExtent()
lyr.GetExtent(0)
lyr.TestCapability('foo')
lyr.CreateField(ogr.FieldDefn('foo', ogr.OFTString))
lyr.DeleteField(0)
lyr.ReorderFields([i for i in range(lyr.GetLayerDefn().GetFieldCount())])
lyr.AlterFieldDefn(0, ogr.FieldDefn('foo', ogr.OFTString), 0)
lyr.SyncToDisk()
lyr.GetStyleTable()
# lyr.SetStyleTableDirectly(None)
lyr.SetStyleTable(None)
lyr.StartTransaction()
lyr.CommitTransaction()
lyr.RollbackTransaction()
lyr.SetIgnoredFields([])
lyr.GetMetadata()
lyr.GetMetadataItem('foo')
lyr.SetMetadata(None)
lyr.SetMetadataItem('foo', None)
ds = None
if bPerLayerCopyingForTransaction:
# Test an error case where we simulate a failure of destroying a
# layer destroyed during transaction
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update('tmp/test.gdb')
layer_tmp = ds.CreateLayer('layer_tmp', geom_type=ogr.wkbNone)
layer_tmp.CreateField(ogr.FieldDefn('foo', ogr.OFTString))
if ds.StartTransaction(force=True) != 0:
gdaltest.post_reason('fail')
return 'fail'
ds.DeleteLayer(ds.GetLayerCount() - 1)
gdal.SetConfigOption('FGDB_SIMUL_FAIL', 'CASE1')
gdal.PushErrorHandler()
ret = ds.CommitTransaction()
gdal.PopErrorHandler()
gdal.SetConfigOption('FGDB_SIMUL_FAIL', None)
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
ds = None
shutil.rmtree('tmp/test.gdb.ogredited')
lst = gdal.ReadDir('tmp/test.gdb')
for filename in lst:
if filename.find('.tmp') >= 0:
gdaltest.post_reason('fail')
print(lst)
return 'fail'
# Test an error case where we simulate a failure in renaming
# a file in original directory
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update('tmp/test.gdb')
for i in range(ds.GetLayerCount()):
if ds.GetLayer(i).GetName() == 'layer_tmp':
ds.DeleteLayer(i)
break
if ds.StartTransaction(force=True) != 0:
gdaltest.post_reason('fail')
return 'fail'
lyr = ds.GetLayer(0)
f = lyr.GetNextFeature()
lyr.SetFeature(f)
f = None
gdal.SetConfigOption('FGDB_SIMUL_FAIL', 'CASE2')
gdal.PushErrorHandler()
ret = ds.CommitTransaction()
gdal.PopErrorHandler()
gdal.SetConfigOption('FGDB_SIMUL_FAIL', None)
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
ds = None
shutil.rmtree('tmp/test.gdb.ogredited')
lst = gdal.ReadDir('tmp/test.gdb')
for filename in lst:
if filename.find('.tmp') >= 0:
gdaltest.post_reason('fail')
print(lst)
return 'fail'
# Test an error case where we simulate a failure in moving
# a file into original directory
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update('tmp/test.gdb')
if ds.StartTransaction(force=True) != 0:
gdaltest.post_reason('fail')
return 'fail'
lyr = ds.GetLayer(0)
f = lyr.GetNextFeature()
lyr.SetFeature(f)
f = None
gdal.SetConfigOption('FGDB_SIMUL_FAIL', 'CASE3')
gdal.PushErrorHandler()
ret = ds.CommitTransaction()
gdal.PopErrorHandler()
gdal.SetConfigOption('FGDB_SIMUL_FAIL', None)
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
ds = None
shutil.rmtree('tmp/test.gdb.ogredited')
# Remove left over .tmp files
lst = gdal.ReadDir('tmp/test.gdb')
for filename in lst:
if filename.find('.tmp') >= 0:
os.remove('tmp/test.gdb/' + filename)
# Test not critical error in removing a temporary file
for case in ('CASE4', 'CASE5'):
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update('tmp/test.gdb')
if ds.StartTransaction(force=True) != 0:
gdaltest.post_reason('fail')
return 'fail'
lyr = ds.GetLayer(0)
f = lyr.GetNextFeature()
lyr.SetFeature(f)
f = None
gdal.SetConfigOption('FGDB_SIMUL_FAIL', case)
gdal.PushErrorHandler()
ret = ds.CommitTransaction()
gdal.PopErrorHandler()
gdal.SetConfigOption('FGDB_SIMUL_FAIL', None)
if ret != 0:
gdaltest.post_reason('fail')
print(case)
return 'fail'
ds = None
if case == 'CASE4':
try:
os.stat('tmp/test.gdb.ogredited')
gdaltest.post_reason('fail')
print(case)
return 'fail'
except OSError:
pass
else:
shutil.rmtree('tmp/test.gdb.ogredited')
# Remove left over .tmp files
lst = gdal.ReadDir('tmp/test.gdb')
for filename in lst:
if filename.find('.tmp') >= 0:
os.remove('tmp/test.gdb/' + filename)
else:
# Test an error case where we simulate a failure of rename from .gdb to .gdb.ogrtmp during commit
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update('tmp/test.gdb')
lyr = ds.GetLayer(0)
lyr_defn = lyr.GetLayerDefn()
if ds.StartTransaction(force=True) != 0:
gdaltest.post_reason('fail')
return 'fail'
gdal.SetConfigOption('FGDB_SIMUL_FAIL', 'CASE1')
gdal.PushErrorHandler()
ret = ds.CommitTransaction()
gdal.PopErrorHandler()
gdal.SetConfigOption('FGDB_SIMUL_FAIL', None)
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
ds = None
# Test an error case where we simulate a failure of rename from .gdb.ogredited to .gdb during commit
(bPerLayerCopyingForTransaction, ds) = ogr_fgdb_19_open_update('tmp/test.gdb')
lyr = ds.GetLayer(0)
lyr_defn = lyr.GetLayerDefn()
if ds.StartTransaction(force=True) != 0:
gdaltest.post_reason('fail')
return 'fail'
gdal.SetConfigOption('FGDB_SIMUL_FAIL', 'CASE2')
gdal.PushErrorHandler()
ret = ds.CommitTransaction()
gdal.PopErrorHandler()
gdal.SetConfigOption('FGDB_SIMUL_FAIL', None)
if ret == 0:
gdaltest.post_reason('fail')
return 'fail'
ds = None
os.rename('tmp/test.gdb.ogrtmp', 'tmp/test.gdb')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment