Skip to content

Instantly share code, notes, and snippets.

@rcoup
Created August 16, 2023 09:25
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rcoup/5f4b49149ca26084eed4dccfd1b12895 to your computer and use it in GitHub Desktop.
Save rcoup/5f4b49149ca26084eed4dccfd1b12895 to your computer and use it in GitHub Desktop.
GDAL VSI Python file wrappers and associated IO functions, and tests.
"""
Tests for VSI-File
Copyright Koordinates Limited
License MIT
"""
import io
import logging
import os
import shutil
import stat
import tempfile
import uuid
import zipfile
from collections import namedtuple
import pytest
from osgeo import gdal
from . import vsi
LOCALFILE_CONTENT = b"line one\nline two"
OTHER_CONTENT = b"some.test.file\n" * 10
TEST_DATA_BUCKET = "vsi-test-data-bucket"
NONEXISTANT_BUCKET = "i.do.not.exist.bucket"
PRIVATE_BUCKET = "a.private.bucket" # access via credentials
NO_ACCESS_BUCKET = "another.private.bucket" # no access
WRITE_BUCKET = "test.writing.bucket" # write access for test run
@pytest.fixture(autouse=True)
def vsireset():
# turn up logging for VSI tests
logging.getLogger("gdal").setLevel(logging.DEBUG)
try:
# clear the Curl cache to avoid weird side-effects
gdal.VSICurlClearCache()
yield
finally:
logging.getLogger("gdal").setLevel(logging.INFO)
@pytest.fixture()
def localfile():
with tempfile.NamedTemporaryFile() as t:
t.write(LOCALFILE_CONTENT)
t.flush()
t.seek(0)
yield t
@pytest.fixture()
def localzip():
with tempfile.NamedTemporaryFile(suffix=".zip") as t:
with zipfile.ZipFile(t, "w") as z:
z.writestr("file.txt", b"hey there good lookin'\n")
yield t
def touch_file(fname, times=None, makedirs=False):
"""
Creates file if it does not exist, and sets mtime on the file to now.
If makedirs=True is given, the directory is created also.
"""
if makedirs:
ensure_makedirs(os.path.dirname(fname))
with open(fname, 'a+'):
os.utime(fname, times)
def test_vsi_open_local(localfile):
# Succeed on existing file
assert vsi.open(localfile.name, "rb")
assert vsi.open(localfile.name, "r")
# Fail on non-existing file
with pytest.raises(OSError):
vsi.open("/no-such-file")
with pytest.raises(OSError):
vsi.open("/no-such-file", "r+")
assert vsi.open(localfile.name, "w")
with tempfile.TemporaryDirectory() as tempdir:
assert vsi.open(os.path.join(tempdir, "test.file.w"), "w")
assert vsi.open(os.path.join(tempdir, "test.file.a"), "a")
assert vsi.open(localfile.name, "a")
assert vsi.open(localfile.name, "r+")
def test_vsi_read_local(localfile):
f = vsi.open(localfile.name, mode="rb")
assert f.read() == b"line one\nline two"
assert f.seek(0) == 0
assert f.read(5) == b"line "
assert f.read(5) == b"one\nl"
assert f.read() == b"ine two"
assert f.read(1) == b""
assert f.read(1) == b""
assert isinstance(f.read(), bytes)
def test_vsi_seek(localfile):
f = vsi.open(localfile.name, mode="rb")
# # Seek relative to start of file
assert f.seek(5, 0) == 5
assert f.seek(5, 0) == 5
# # seek relative to current position
assert f.seek(10, 1) == 15
# Seek relative to end of file
assert f.seek(-5, 2) == 12
def test_vsi_write(localfile):
CONTENT = b"aaaaa\x00bbbbb"
f = vsi.open(localfile.name, "wb")
f.write(CONTENT)
f.close()
with open(localfile.name, "rb") as f:
assert f.read() == CONTENT
td = tempfile.mkdtemp()
try:
path = os.path.join(td, "test.file")
with vsi.open(path, "wb") as f:
f.write(CONTENT)
with open(path, "rb") as f:
assert f.read() == CONTENT
finally:
shutil.rmtree(td)
def test_vsi_close(localfile):
f = vsi.open(localfile.name)
f.close()
# Multiple calls to close() should be ignored, not cause an error
f.close()
# Not sure why this is a ValueError instead of a IOError, but that's
# out of our hands, it's thrown by io.RawIOBase
with pytest.raises(ValueError):
f.read(1)
# And we use ValueError in these ones too to match read()
with pytest.raises(ValueError):
f.seek(1)
with pytest.raises(ValueError):
f.tell()
def test_vsi_exists(localfile):
assert vsi.exists(localfile.name)
assert not vsi.exists("/blah blah blah")
def test_vsi_stat(localfile):
st = vsi.stat(localfile.name)
assert isinstance(st.st_mtime, int)
assert st.st_mode == 0o100600
assert st.st_size == 17
assert vsi.getsize(localfile.name) == 17
assert vsi.getmtime(localfile.name) == int(os.path.getmtime(localfile.name))
def test_vsi_isdir(localfile, localzip):
assert not vsi.isdir("/doesnotexist")
assert not vsi.isdir("/doesnotexist.zip")
assert not vsi.isdir("/vsizip/doesnotexist.zip")
assert not vsi.isdir("/vsizip/doesnotexist.zip/file.txt")
assert not vsi.isdir(localfile.name)
assert vsi.isdir(os.path.dirname(localfile.name))
assert not vsi.isdir(localzip.name)
assert vsi.isdir("/vsizip/%s" % localzip.name)
assert not vsi.isdir("/vsizip/%s/file.txt" % localzip.name)
def test_vsi_isfile(localfile, localzip):
assert not vsi.isfile("/doesnotexist")
assert not vsi.isfile("/doesnotexist.zip")
assert not vsi.isfile("/vsizip/doesnotexist.zip")
assert not vsi.isfile("/vsizip/doesnotexist.zip/file.txt")
assert vsi.isfile(localfile.name)
assert not vsi.isfile(os.path.dirname(localfile.name))
assert vsi.isfile(localzip.name)
assert not vsi.isfile("/vsizip/%s" % localzip.name)
assert vsi.isfile("/vsizip/%s/file.txt" % localzip.name)
def test_vsi_listdir():
with pytest.raises(OSError):
vsi.listdir("/doesnotexist")
with tempfile.TemporaryDirectory() as tempdir:
assert vsi.listdir(tempdir) == []
os.mkdir(os.path.join(tempdir, "foo"))
touch_file(os.path.join(tempdir, "foo", "bar"))
touch_file(os.path.join(tempdir, "baz"))
assert set(vsi.listdir(tempdir)) == {"foo", "baz"}
assert vsi.listdir(os.path.join(tempdir, "foo")) == ["bar"]
def test_vsi_listdir_unicode():
with tempfile.TemporaryDirectory() as tempdir:
touch_file(os.path.join(tempdir, "ಠ_ಠ"))
touch_file(os.path.join(tempdir, "bob"))
results = vsi.listdir(tempdir)
assert set(results) == {"bob", "ಠ_ಠ"}
assert isinstance(results[0], str)
assert isinstance(results[1], str)
def test_vsi_listdir_recursive():
with pytest.raises(OSError):
vsi.listdir("/doesnotexist")
with tempfile.TemporaryDirectory() as tempdir:
assert vsi.listdir_recursive(tempdir) == []
os.mkdir(os.path.join(tempdir, "foo"))
touch_file(os.path.join(tempdir, "foo", "bar"))
touch_file(os.path.join(tempdir, "baz"))
assert set(vsi.listdir_recursive(tempdir)) == {"foo", "foo/bar", "baz"}
def test_vsi_walk():
for use_os_walk_if_possible in (True, False):
with pytest.raises(OSError):
list(
vsi.walk(
"/doesnotexist", use_os_walk_if_possible=use_os_walk_if_possible
)
)
with tempfile.TemporaryDirectory() as tempdir:
assert list(
vsi.walk(tempdir, use_os_walk_if_possible=use_os_walk_if_possible)
) == [(tempdir, [], [])]
touch_file(os.path.join(tempdir, "foo"))
os.mkdir(os.path.join(tempdir, "bar"))
touch_file(os.path.join(tempdir, "bar", "blah"))
assert list(
vsi.walk(tempdir, use_os_walk_if_possible=use_os_walk_if_possible)
) == [
(tempdir, ["bar"], ["foo"]),
(os.path.join(tempdir, "bar"), [], ["blah"]),
]
def test_vsi_copy(localfile):
with tempfile.NamedTemporaryFile() as dest:
vsi.copy(localfile, dest)
localfile.seek(0)
dest.seek(0)
assert localfile.read() == dest.read()
def test_vsi_copy_max_size_ok(localfile):
with tempfile.NamedTemporaryFile() as dest:
vsi.copy(localfile, dest, max_size=1000)
def test_vsi_copy_max_size_exceeded(localfile):
with tempfile.NamedTemporaryFile() as dest:
with pytest.raises(IOError):
vsi.copy(localfile, dest, max_size=1)
def test_vsi_copy_fd(localfile):
with tempfile.NamedTemporaryFile() as dest:
vsi.copy(localfile, dest)
assert not localfile.closed
assert not dest.closed
with tempfile.TemporaryDirectory() as tempdir:
dest = os.path.join(tempdir, "foo")
vsi.copy(localfile, dest)
assert not localfile.closed
with tempfile.NamedTemporaryFile() as dest:
vsi.copy(localfile.name, dest)
assert not dest.closed
def test_vsi_smartcopy_local(localfile):
with tempfile.NamedTemporaryFile() as dest:
vsi.smart_copy(localfile, dest)
localfile.seek(0)
dest.seek(0)
assert localfile.read() == dest.read()
with tempfile.TemporaryDirectory() as tempdir:
dest = os.path.join(tempdir, "foo")
vsi.smart_copy(localfile.name, dest)
with open(localfile.name, "rb") as src_f:
with open(dest, "rb") as dest_f:
assert src_f.read() == dest_f.read() == LOCALFILE_CONTENT
# makes directories
dest = os.path.join(tempdir, "bar/bob")
vsi.smart_copy(localfile.name, dest)
assert os.path.isfile(dest)
def test_vsi_smartcopy_fd(localfile):
with tempfile.NamedTemporaryFile() as dest:
vsi.smart_copy(localfile, dest)
assert not localfile.closed
assert not dest.closed
with tempfile.TemporaryDirectory() as tempdir:
dest = os.path.join(tempdir, "foo")
vsi.smart_copy(localfile, dest)
assert not localfile.closed
with tempfile.NamedTemporaryFile() as dest:
vsi.smart_copy(localfile.name, dest)
assert not dest.closed
def test_vsi_remove():
with tempfile.TemporaryDirectory() as tempdir:
path = os.path.join(tempdir, "the.file")
touch_file(path)
vsi.remove(path)
assert not os.path.exists(path)
with pytest.raises(OSError):
vsi.remove(path)
WorkBucket = namedtuple(
"WorkBucket", ["client", "bucket", "bucket_name", "prefix", "vsi_path"]
)
@pytest.fixture(scope="session")
def s3bucket():
import time
import boto3
if boto3.session.Session().get_credentials() is None:
pytest.skip("No AWS Credentials")
bucket_prefix = f"vsi-tests-{uuid.uuid4()}"
vsi_path = f"/vsis3/{WRITE_BUCKET}/{bucket_prefix}"
s3 = boto3.resource("s3")
bucket = s3.Bucket(WRITE_BUCKET)
try:
yield WorkBucket(
client=s3,
bucket=bucket,
bucket_name=WRITE_BUCKET,
prefix=bucket_prefix,
vsi_path=vsi_path,
)
finally:
# clean up bucket
objects = bucket.objects.filter(Prefix=bucket_prefix)
objects.delete()
@pytest.mark.internet
def test_vsis3_copy_up(s3bucket, localfile):
dest = os.path.join(s3bucket.vsi_path, "test.up")
vsi.copy(localfile, dest)
with io.BytesIO() as data:
s3bucket.bucket.download_fileobj(os.path.join(s3bucket.prefix, "test.up"), data)
assert data.getvalue() == LOCALFILE_CONTENT
@pytest.mark.internet
def test_vsis3_copy_down(s3bucket):
s3bucket.bucket.upload_fileobj(
io.BytesIO(OTHER_CONTENT), os.path.join(s3bucket.prefix, "test.down")
)
s3src = os.path.join(s3bucket.vsi_path, "test.down")
with tempfile.TemporaryDirectory() as tempdir:
localdest = os.path.join(tempdir, "test.down.local")
vsi.copy(s3src, localdest)
with open(localdest, "rb") as f:
assert f.read() == OTHER_CONTENT
@pytest.mark.internet
def test_vsis3_copy_up_perm_denied(localfile):
dest = os.path.join(f"/vsis3/{TEST_DATA_BUCKET}/vsi-tests/perm.denied")
with pytest.raises(IOError):
vsi.copy(localfile, dest)
assert not vsi.exists(dest)
@pytest.mark.internet
def test_vsis3_copy_down_not_found(s3bucket):
s3src = os.path.join(f"/vsis3/{TEST_DATA_BUCKET}/vsi-tests/not.found")
with tempfile.TemporaryDirectory() as tempdir:
localdest = os.path.join(tempdir, "test.down.local")
with pytest.raises(OSError):
vsi.copy(s3src, localdest)
@pytest.mark.internet
def test_vsis3_copy_down_perm_denied(s3bucket):
# some file that exists but we'll never have access to
s3src = os.path.join(
f"/vsis3/{NO_ACCESS_BUCKET}/test.file"
)
with tempfile.TemporaryDirectory() as tempdir:
localdest = os.path.join(tempdir, "test.down.local")
with pytest.raises(
OSError
): # comes across as not-found, since the .exists() call fails.
vsi.copy(s3src, localdest)
@pytest.mark.internet
def test_vsis3_smartcopy_up(s3bucket, localfile):
dest = os.path.join(s3bucket.vsi_path, "sm.test.up")
vsi.smart_copy(localfile, dest)
with io.BytesIO() as data:
s3bucket.bucket.download_fileobj(
os.path.join(s3bucket.prefix, "sm.test.up"), data
)
assert data.getvalue() == LOCALFILE_CONTENT
@pytest.mark.internet
def test_vsis3_smartcopy_down(s3bucket):
s3bucket.bucket.upload_fileobj(
io.BytesIO(OTHER_CONTENT), os.path.join(s3bucket.prefix, "sm.test.down")
)
s3src = os.path.join(s3bucket.vsi_path, "sm.test.down")
with tempfile.TemporaryDirectory() as tempdir:
localdest = os.path.join(tempdir, "smm.test.down.local")
vsi.smart_copy(s3src, localdest)
with open(localdest, "rb") as f:
assert f.read() == OTHER_CONTENT
# makes directories
localdest = os.path.join(tempdir, "sm/test/down.local")
vsi.smart_copy(s3src, localdest)
assert os.path.isfile(localdest)
@pytest.mark.internet
def test_vsis3_smartcopy_server(s3bucket):
s3bucket.bucket.upload_fileobj(
io.BytesIO(OTHER_CONTENT), os.path.join(s3bucket.prefix, "sm.test.server")
)
s3src = os.path.join(s3bucket.vsi_path, "sm.test.server")
s3dest = os.path.join(s3bucket.vsi_path, "sm.test.server.dest")
vsi.smart_copy(s3src, s3dest)
with io.BytesIO() as data:
s3bucket.bucket.download_fileobj(
os.path.join(s3bucket.prefix, "sm.test.server.dest"), data
)
assert data.getvalue() == OTHER_CONTENT
@pytest.mark.internet
def test_vsis3_write(s3bucket, localfile):
dest = os.path.join(s3bucket.vsi_path, "write.up")
f = vsi.open(dest, "wb")
f.write(OTHER_CONTENT)
f.close()
with io.BytesIO() as data:
s3bucket.bucket.download_fileobj(
os.path.join(s3bucket.prefix, "write.up"), data
)
assert data.getvalue() == OTHER_CONTENT
@pytest.mark.internet
def test_vsis3_read(s3bucket):
s3bucket.bucket.upload_fileobj(
io.BytesIO(OTHER_CONTENT), os.path.join(s3bucket.prefix, "read.down")
)
s3src = os.path.join(s3bucket.vsi_path, "read.down")
f = vsi.open(s3src, "rb")
assert f.read() == OTHER_CONTENT
f.close()
@pytest.mark.internet
def test_vsis3_remove(s3bucket):
dest = os.path.join(s3bucket.vsi_path, "write.up")
with vsi.open(dest, "wb") as f:
f.write(OTHER_CONTENT)
assert vsi.exists(dest)
vsi.remove(dest)
assert not vsi.exists(dest)
with pytest.raises(OSError):
vsi.remove(dest)
def test_mkdir():
# get current umask value
umask = os.umask(0)
os.umask(umask)
exp_dir_perms = "%o" % (0o777 & ~umask)
with tempfile.TemporaryDirectory() as tempdir:
path = os.path.join(tempdir, "bob")
vsi.mkdir(path)
assert os.path.isdir(path)
assert "%o" % stat.S_IMODE(os.stat(path).st_mode) == exp_dir_perms
with pytest.raises(OSError):
vsi.mkdir(path)
path = os.path.join(tempdir, "jim")
vsi.mkdir(path, 0o700)
assert os.path.isdir(path)
assert "%o" % stat.S_IMODE(os.stat(path).st_mode) == "700"
path = os.path.join(tempdir, "dave") + "/"
vsi.mkdir(path)
assert os.path.isdir(path)
@pytest.mark.internet
def test_vsis3_mkdir(s3bucket):
path = os.path.join(s3bucket.vsi_path, "mkdir.1")
vsi.mkdir(path)
assert vsi.isdir(path)
with pytest.raises(OSError):
vsi.mkdir(path)
@pytest.mark.internet
def test_makedirs(s3bucket):
# get current umask value
umask = os.umask(0)
os.umask(umask)
exp_dir_perms = "%o" % (0o777 & ~umask)
with tempfile.TemporaryDirectory() as tempdir:
parent_path = os.path.join(tempdir, "bob")
path = os.path.join(parent_path, "alex")
vsi.makedirs(path)
assert os.path.isdir(parent_path)
assert os.path.isdir(path)
assert "%o" % stat.S_IMODE(os.stat(path).st_mode) == exp_dir_perms
assert "%o" % stat.S_IMODE(os.stat(parent_path).st_mode) == exp_dir_perms
with pytest.raises(OSError):
vsi.makedirs(path)
path = os.path.join(parent_path, "jim")
vsi.makedirs(path, 0o700)
assert os.path.isdir(path)
assert "%o" % stat.S_IMODE(os.stat(path).st_mode) == "700"
assert "%o" % stat.S_IMODE(os.stat(parent_path).st_mode) == exp_dir_perms
path = os.path.join(tempdir, "jim", "owen", "dave") + "/"
vsi.makedirs(path)
assert os.path.isdir(path)
@pytest.mark.internet
def test_vsis3_makedirs(s3bucket):
path = os.path.join(s3bucket.vsi_path, "mkdir.2", "bob", "jim", "alex")
vsi.makedirs(path)
assert vsi.isdir(path)
assert vsi.isdir(os.path.split(path)[0])
with pytest.raises(OSError):
vsi.makedirs(path)
def test_rmdir():
with tempfile.TemporaryDirectory() as tempdir:
path = os.path.join(tempdir, "bob")
vsi.mkdir(path)
fn = os.path.join(path, "a.file")
with vsi.open(fn, "w") as f:
f.write("hi")
with pytest.raises(OSError):
vsi.rmdir(path)
vsi.remove(fn)
assert not os.path.exists(fn)
vsi.rmdir(path)
assert not os.path.exists(path)
with pytest.raises(OSError):
vsi.rmdir(path)
@pytest.mark.internet
def test_vsis3_rmdir(s3bucket):
path = os.path.join(s3bucket.vsi_path, "rmdir", "bob")
vsi.mkdir(path)
fn = os.path.join(path, "a.file")
with vsi.open(fn, "w") as f:
f.write("hi")
with pytest.raises(OSError):
vsi.rmdir(path)
vsi.remove(fn)
assert not vsi.exists(fn)
vsi.rmdir(path)
assert not vsi.exists(path), [
o.key for o in s3bucket.bucket.objects.filter(Prefix=s3bucket.prefix)
]
with pytest.raises(OSError):
vsi.rmdir(path)
def test_rmtree():
with tempfile.TemporaryDirectory() as tempdir:
path1 = os.path.join(tempdir, "bob")
path2 = os.path.join(path1, "jim")
vsi.makedirs(path2)
with vsi.open(os.path.join(path1, "a.file"), "w") as f:
f.write("hi")
with vsi.open(os.path.join(path2, "b.file"), "w") as f:
f.write("ho")
vsi.rmtree(path1)
assert not os.path.exists(path1)
with pytest.raises(OSError):
vsi.rmtree(path1)
@pytest.mark.internet
def test_vsis3_rmtree(s3bucket):
path1 = os.path.join(s3bucket.vsi_path, "rmtree", "bob")
path2 = os.path.join(path1, "jim")
vsi.makedirs(path2)
with vsi.open(os.path.join(path1, "a.file"), "w") as f:
f.write("hi")
with vsi.open(os.path.join(path2, "b.file"), "w") as f:
f.write("ho")
vsi.rmtree(path1)
assert not vsi.exists(path1), [
o.key for o in s3bucket.bucket.objects.filter(Prefix=s3bucket.prefix)
]
assert not vsi.exists(path2)
assert not vsi.exists(os.path.join(path1, "a.file"))
assert not vsi.exists(os.path.join(path2, "b.file"))
with pytest.raises(OSError):
vsi.rmtree(path1)
@pytest.mark.internet
def test_vsis3_cache(s3bucket, localfile):
path = os.path.join(s3bucket.vsi_path, "cache0")
vsi.makedirs(path)
assert len(vsi.listdir(path)) == 0
# add something directly to the bucket
s3bucket.bucket.upload_fileobj(
io.BytesIO(OTHER_CONTENT), os.path.join(s3bucket.prefix, "cache0", "a.file")
)
assert len(vsi.listdir(path)) == 0
# check manual cache-clearing works
vsi.clear_cache(path)
assert len(vsi.listdir(path)) == 1
# vsi methods themselves should sort the cache out
with vsi.open(os.path.join(path, "b.file"), "w") as f:
f.write("hey")
assert len(vsi.listdir(path)) == 2
# smart copy clears the cache automatically
vsi.smart_copy(localfile, os.path.join(path, "c.file"))
assert len(vsi.listdir(path)) == 3
@pytest.mark.internet
def test_vsi_gdal_behaviour(s3bucket):
# VSI functions sometimes throw errors, other times use return codes. yay
# https://trac.osgeo.org/gdal/ticket/7145
if (gdal.GetConfigOption("CPL_DEBUG") or "NO").upper() in ("YES", "ON", "TRUE", "1"):
pytest.skip("With CPL_DEBUG on we get segfaults ¯\\_(ツ)_/¯")
assert gdal.GetUseExceptions()
assert gdal.VSIFOpenL("/non-existing", "r") is None
assert gdal.VSIGetLastErrorNo() == 0
# VSIFOpenExL has a flag to set error codes. VSIFOpenL doesn't set them.
assert gdal.VSIFOpenExL("/non-existing", "r", 1) is None
assert gdal.VSIGetLastErrorNo() == 1
assert gdal.VSIGetLastErrorMsg() == "/non-existing: No such file or directory"
gdal.VSIErrorReset()
assert gdal.VSIStatL("/non-existing") is None
assert gdal.VSIGetLastErrorNo() == 0
with tempfile.TemporaryDirectory() as tempdir:
path = os.path.join(tempdir, "a.file").encode()
fd = gdal.VSIFOpenExL(path, "w+", 1)
assert fd is not None
try:
assert gdal.VSIFWriteL("a" * 1000, 1, 1000, fd) == 1000
assert gdal.VSIFTellL(fd) == 1000
assert gdal.VSIFSeekL(fd, 500, os.SEEK_SET) == 0
assert gdal.VSIFTellL(fd) == 500
assert not gdal.VSIFEofL(fd)
finally:
assert gdal.VSIFCloseL(fd) == 0
del fd
assert gdal.VSIStatL(path) is not None
assert gdal.VSIGetLastErrorNo() == 0
fd = gdal.VSIFOpenExL(path, "r", 1)
assert fd is not None
try:
# GDAL returns None if you ask for 0 bytes (!?)
assert gdal.VSIFReadL(1, 0, fd) is None
data = gdal.VSIFReadL(1, 100, fd)
assert len(data) == 100
assert data == b"a" * 100
assert not gdal.VSIFEofL(fd)
assert gdal.VSIFTellL(fd) == 100
assert gdal.VSIFSeekL(fd, 995, os.SEEK_SET) == 0
assert not gdal.VSIFEofL(fd)
data = gdal.VSIFReadL(1, 2000, fd)
assert len(data) == 5
assert data == b"a" * 5
assert gdal.VSIFEofL(fd)
assert gdal.VSIFTellL(fd) == 1000
# keep reading past the end of the file
data = gdal.VSIFReadL(1, 2000, fd)
assert len(data) == 0
assert gdal.VSIFEofL(fd)
# seek past the end of the file
assert gdal.VSIFSeekL(fd, 9999999, os.SEEK_SET) == 0
assert gdal.VSIGetLastErrorNo() == 0
assert gdal.VSIFTellL(fd) == 9999999
assert gdal.VSIFEofL(fd) == 0 # ¯\_(ツ)_/¯
finally:
assert gdal.VSIFCloseL(fd) == 0
del fd
# this path is in a private bucket
s3path = f"/vsis3/{PRIVATE_BUCKET}/vsi-tests/perm.denied"
assert gdal.VSIStatL(s3path) is None
fd = gdal.VSIFOpenExL(s3path, "r", 1)
assert fd is not None
try:
data = gdal.VSIFReadL(1, 100, fd)
# huh, shouldn't this return a 403/404?
assert gdal.VSIGetLastErrorNo() == 0
assert len(data) == 0
finally:
assert gdal.VSIFCloseL(fd) == 0
assert gdal.VSIGetLastErrorNo() == 0
# this path doesn't exist
s3path = f"/vsis3/{NONEXISTANT_BUCKET}/vsi-tests/nope"
assert gdal.VSIStatL(s3path) is None
fd = gdal.VSIFOpenExL(s3path, "r", 1)
assert fd is not None
try:
data = gdal.VSIFReadL(1, 100, fd)
# huh, shouldn't this return a 403/404?
assert gdal.VSIGetLastErrorNo() == 0
assert len(data) == 0
finally:
assert gdal.VSIFCloseL(fd) == 0
del fd
assert gdal.VSIGetLastErrorNo() == 0
# small s3 writes happen (ie. error) at close time
fd = gdal.VSIFOpenExL(s3path, "wb", 1)
assert fd is not None
assert gdal.VSIFWriteL("a", 1, 1, fd) == 1
assert gdal.VSIGetLastErrorNo() == 0
assert gdal.VSIFFlushL(fd) == 0 # ¯\_(ツ)_/¯
assert gdal.VSIGetLastErrorNo() == 0
with pytest.raises(RuntimeError) as e:
gdal.VSIFCloseL(fd)
del fd
assert gdal.VSIGetLastErrorNo() == 0 # ¯\_(ツ)_/¯
assert "DoSinglePartPUT" in str(e)
# this path exists, we can write it
s3path = os.path.join(s3bucket.vsi_path, "write.up")
fd = gdal.VSIFOpenExL(s3path, "wb", 1)
assert fd is not None
assert gdal.VSIFWriteL("a" * 100, 1, 100, fd) == 100
assert gdal.VSIGetLastErrorNo() == 0
assert gdal.VSIFFlushL(fd) == 0
assert gdal.VSIGetLastErrorNo() == 0
assert gdal.VSIFCloseL(fd) == 0
assert gdal.VSIGetLastErrorNo() == 0
del fd
assert gdal.VSIStatL(s3path) is not None
fd = gdal.VSIFOpenExL(s3path, "r", 1)
assert fd is not None
try:
data = gdal.VSIFReadL(1, 50, fd)
# huh, shouldn't this return a 404?
assert gdal.VSIGetLastErrorNo() == 0
assert len(data) == 50
assert data == b"a" * 50
finally:
assert gdal.VSIFCloseL(fd) == 0
del fd
assert gdal.VSIGetLastErrorNo() == 0
# mkdir errors on paths with trailing /
path = os.path.join(tempdir, "jim") + "/"
with pytest.raises(RuntimeError) as e:
gdal.Mkdir(path, 0o777)
path = os.path.join(tempdir, "jim", "alex", "fred") + "/"
with pytest.raises(RuntimeError) as e:
gdal.MkdirRecursive(path, 0o777)
# mkdirrecursive doesn't error on existing paths
with tempfile.TemporaryDirectory() as tempdir:
path = os.path.join(tempdir, "bob")
assert gdal.MkdirRecursive(path, 0o777) == 0
assert os.path.isdir(path)
assert gdal.MkdirRecursive(path, 0o777) == 0
# VSIRmdir has weird behaviour with vsimem
assert gdal.MkdirRecursive("/vsimem/x/y/z", 0o777) == 0
assert "x" in gdal.ReadDir("/vsimem")
assert gdal.ReadDirRecursive("/vsimem/x") == ["y/", "y/z/"]
# err, this shouldn't succeed:
assert gdal.Rmdir("/vsimem/x") == 0
# the directory is gone...
assert "x" not in (gdal.ReadDir("/vsimem") or [])
# but the file still exists?!
assert gdal.ReadDirRecursive("/vsimem/x") == ["y/", "y/z/"]
assert gdal.ReadDirRecursive("/vsimem/x") == ["y/", "y/z/"]
"""
Implements I/O functions using GDAL's VSI functions, so that they can
operate on virtual filesystem paths.
Tries to mirror the standard python functions as closely as possible,
but see the docstrings on the individual functions.
Copyright Koordinates Limited
License MIT
"""
from collections import defaultdict, namedtuple
from contextlib import contextmanager
import tempfile
import errno
import io
import logging
import os
import re
from django.utils.encoding import force_str
from osgeo import gdal
__all__ = (
"copy",
"exists",
"getmtime",
"getsize",
"isdir",
"listdir",
"listdir_recursive",
"open",
"remove",
"smart_copy",
"stat",
"VSIFile",
"walk",
"mkdir",
"makedirs",
"rmdir",
"rmtree",
"clear_cache",
)
_VSI_CURL_PREFIX_RE = re.compile(
r"/vsi(s3|curl|gs|az|oss|swift|webhdfs|hdfs)(_streaming)?/"
)
L = logging.getLogger("vsi")
def _vsi_err_wrap(func, *args, **kwargs):
"""
Wrap VSI_RETVAL functions in exceptions
Pass an exception_class kwarg to raise something other than IOError
"""
exception_class = kwargs.pop("exception_class", IOError)
# clear any error code
gdal.VSIErrorReset()
try:
try:
ret = func(*args, **kwargs)
except RuntimeError as e:
raise exception_class(str(e))
if ret != 0:
# Note: the error codes from VSIGetLastErrorNo() aren't
# really mappable to IOError.errno in any way.
# List is here: https://github.com/koordinates/gdal/blob/trunk-kx-build/gdal/port/cpl_vsi_error.h#L45-L55
code = gdal.VSIGetLastErrorNo()
msg = gdal.VSIGetLastErrorMsg().decode()
raise exception_class("%s [%d]" % (msg, code))
else:
return ret
except:
# clear any error code
gdal.VSIErrorReset()
raise
class VSIFile(io.RawIOBase):
def __init__(self, path, mode="r"):
# Assuming we're not in 'w' or 'a' mode, the file must exist.
# Because stat calls are cached by VSI filesystems, whereas open() isn't,
# we can optimise by doing a stat first.
if "r" in mode and not exists(path):
raise OSError(errno.ENOENT, "No such file or directory: %r" % path)
# VSIFOpenExL has a flag to set error codes. VSIFOpenL doesn't set them.
fd = gdal.VSIFOpenExL(str(path), mode, 1)
if fd is None:
code = gdal.VSIGetLastErrorNo()
msg = gdal.VSIGetLastErrorMsg() or "Failed to open file: %s" % path
# Note: the error codes from VSIGetLastErrorNo() aren't
# really mappable to IOError.errno in any way.
# List is here: https://github.com/koordinates/gdal/blob/trunk-kx-build/gdal/port/cpl_vsi_error.h#L45-L55
gdal.VSIErrorReset()
raise OSError("%s [%d]" % (msg, code))
self._fd = fd
self._closed = False
self.mode = mode
# Some methods to handle closing sanely. Weird that RawIOBase doesn't handle this for us (?)
def _check_open(self):
if self._closed:
# Maybe should be IOError? But RawIOBase.read() throws a ValueError,
# so we use ValueError here too to be consistent.
raise ValueError("I/O operation on closed file")
def close(self):
if not self._closed:
try:
_vsi_err_wrap(gdal.VSIFCloseL, self._fd)
finally:
# if VSIFCloseL() errors the fd is broken, we can't call it again
self._closed = True
def flush(self):
self._check_open()
_vsi_err_wrap(gdal.VSIFFlushL, self._fd)
@property
def closed(self):
return self._closed
def readable(self):
return True
def seek(self, offset, whence=os.SEEK_SET):
self._check_open()
_vsi_err_wrap(gdal.VSIFSeekL, self._fd, offset, whence)
# Have to return the new absolute position (!)
try:
return gdal.VSIFTellL(self._fd)
except RuntimeError as e:
raise OSError(str(e))
def seekable(self):
return True
def tell(self):
self._check_open()
try:
return gdal.VSIFTellL(self._fd)
except RuntimeError as e:
raise OSError(str(e))
def readinto(self, b):
"""
Read into an existing writable buffer (memoryview, probably)
This is the underlying implementation of read(), read1(), readall(), readlines(), etc.
"""
self._check_open()
n = len(b)
if n == 0:
# GDAL returns None if you ask for 0 bytes (!?)
# Luckily, reading 0 bytes is pretty easy!
return 0
try:
bytez = gdal.VSIFReadL(1, n, self._fd)
except RuntimeError as e:
raise OSError(str(e))
num_bytes = len(bytez)
b[:num_bytes] = bytez
return num_bytes
def writable(self):
return "w" in self.mode or "a" in self.mode or "+" in self.mode
def write(self, b):
"""
Write b to the underlying raw stream, and return the number of bytes written.
The object b should be an array of bytes, either bytes, bytearray, or memoryview.
The return value can be less than len(b), depending on specifics of the underlying raw stream,
and especially if it is in non-blocking mode.
None is returned if the raw stream is set not to block and no single byte could be readily
written to it.
The caller may release or mutate b after this method returns,
so the implementation should only access b during the method call.
"""
self._check_open()
n = len(b)
try:
return gdal.VSIFWriteL(b.tobytes(), 1, n, self._fd)
except RuntimeError as e:
raise OSError(str(e))
def open(filename, mode="r", buffering=None, encoding=None, errors=None, newline=None):
"""
Open a file using gdal's VSI functions.
Returns a file-like object.
Drop-in replacement for __builtins__.open()
"""
# Much of this copied from https://www.python.org/dev/peps/pep-3116/#the-open-built-in-function
assert isinstance(filename, (str, int, os.PathLike)), (
"filename: Expected Path, str, or int, got: %r" % filename
)
assert isinstance(mode, str), "mode: Expected str, got: %r" % mode
assert buffering is None or isinstance(buffering, int), (
"buffering: Expected int, got: %r" % buffering
)
assert encoding is None or isinstance(encoding, str), (
"encoding: Expected str, got: %r" % encoding
)
assert newline in (None, "", "\n", "\r", "\r\n"), (
"newline: Invalid newline char: %r" % newline
)
modes = set(mode)
if modes - set("arwb+t") or len(mode) > len(modes):
raise ValueError("invalid mode: %r" % mode)
reading = "r" in modes
writing = "w" in modes
binary = "b" in modes
appending = "a" in modes
updating = "+" in modes
text = "t" in modes or not binary
if text and binary:
raise ValueError("can't have text and binary mode at once")
if reading + writing + appending > 1:
raise ValueError("can't have read/write/append mode at once")
if not (reading or writing or appending):
raise ValueError("must have exactly one of read/write/append mode")
if binary and encoding is not None:
raise ValueError("binary modes doesn't take an encoding arg")
if binary and errors is not None:
raise ValueError("binary modes doesn't take an errors arg")
if binary and newline is not None:
raise ValueError("binary modes doesn't take a newline arg")
raw = VSIFile(filename, mode=mode)
line_buffering = buffering == 1 or buffering is None and raw.isatty()
if line_buffering or buffering is None:
buffering = 8 * 1024 # International standard buffer size
# XXX Try setting it to fstat().st_blksize
if buffering < 0:
raise ValueError("invalid buffering size")
if buffering == 0:
if binary:
return raw
raise ValueError("can't have unbuffered text I/O")
if updating:
buffered = io.BufferedRandom(raw, buffering)
elif writing or appending:
buffered = io.BufferedWriter(raw, buffering)
else:
buffered = io.BufferedReader(raw, buffering)
if binary:
return buffered
return io.TextIOWrapper(buffered, encoding, errors, newline, line_buffering)
def _copyfileobj(fsrc, fdst, length=16 * 1024, max_size=None):
"""
Like shutil.copyfileobj, but takes an extra max_size arg.
If max_size is given, the copy will be aborted (IOError)
if the size reaches that number of bytes.
The dest file will be partially written though.
"""
ptr = 0
while 1:
buf = fsrc.read(length)
if not buf:
break
ptr += len(buf)
fdst.write(buf)
if max_size is not None and ptr >= max_size:
raise OSError(f"Copy exceeded {max_size} bytes")
def copy(src, dest, max_size=None, buffering=1 * 1024 * 1024):
"""
Copy a file.
src and dest can be path strings or open file-like objects (eg. from vsi.open())
If paths, the files are accessed using GDAL's VSI functions, so they can be on virtual/remote
filesystems. For S3 objects, you might want to use smart_copy() for additional speedups.
If src/dest are paths, the file will be opened & closed by this function. If they're file-like
objects the caller will need to close them. Note that some VSI functions don't do much work
(or error) until close() is called, so don't assume the dest file is ready on return from here.
If max_size is given, the copy will be aborted if the size reaches that number of bytes. The
dest file will still be written to, though.
"""
if isinstance(src, (str, os.PathLike)):
# VSI open()
fsrc = open(src, "rb")
else:
fsrc = src
try:
if isinstance(dest, (str, os.PathLike)):
# VSI open()
fdest = open(dest, "wb")
else:
fdest = dest
try:
_copyfileobj(fsrc, fdest, length=buffering, max_size=max_size)
# Debatable whether this is the caller's responsibility or not,
# but it's a gotcha I've hit *several* times while testing so it
# seems reasonable to do it here and save everyone some grief.
fdest.flush()
finally:
if isinstance(dest, (str, os.PathLike)):
fdest.close()
finally:
if isinstance(src, (str, os.PathLike)):
fsrc.close()
def _readdir(gdal_readdir, path):
listing = gdal_readdir(path)
if listing is None:
if isdir(path):
return []
# Probably? No actual error code exposed AFAICT
raise OSError(errno.ENOENT, "No such directory: %r" % path)
return [
# GDAL returns a *mixture* of bytes and unicode.
# force_str() decodes only as required.
force_str(
# GDAL lists directories with a trailing slash sometimes. Strip that
filename.rstrip("/")
)
for filename in listing
# Never include `.` and `..`. Different VSI drivers are quite inconsistent
# about whether they return these paths or not.
# But os.listdir() doesn't, so we don't either.
if filename not in {".", ".."}
]
def listdir(path):
"""
Lists files and directories in the given directory path.
Returns a list of filenames.
Raises OSError if the given directory doesn't exist.
POSIX entries '.' and '..' are not included in the returned value.
"""
return _readdir(gdal.ReadDir, path)
def listdir_recursive(path):
"""
Read a directory listing *including* all files in subfolders.
No direct equivalent in stdlib, but you could achieve a similar thing with a
thinnish wrapper around os.walk().
Returns a list of paths relative to the given directory path.
Raises OSError if the given directory doesn't exist.
POSIX entries '.' and '..' are not included in the returned value.
"""
try:
return _readdir(gdal.ReadDirRecursive, path)
except OSError:
# HACK workaround: ReadDirRecursive returns None for an empty dir,
# whereas ReadDir doesn't.
# TODO: ticket this. maybe related to:
# * https://github.com/OSGeo/gdal/issues/1739
# * https://trac.osgeo.org/gdal/ticket/7135
if listdir(path) == []:
return []
raise
def walk(root_dir, topdown=True, onerror=None, use_os_walk_if_possible=True):
"""
Generator. An implementation of os.walk on top of GDAL's VSI functions.
This calls listdir_recursive() but organises the results so that they
look like the results of os.walk().
Yields 3-tuples: (path, dirs, files)
This has some differences from os.walk():
* modifying `dirs` in place has no effect on future iterations
* errors from listdir_recursive and stat are not ignored by default. (see below!)
* always follows symlinks (in os.walk the default is *not* to follow symlinks).
Note that this can have nasty results if a symlink links to its parent dir.
If use_os_walk_if_possible (the default), then this may actually just
use os.walk() for improved performance, if the root filesystem
isn't actually a GDAL virtual filesystem.
NOTE: the `onerror` callback is a bit tricky:
* by default, all errors will be raised. This is *different* than os.walk() behaviour.
* If onerror is given:
* it will be ignored for VSI filesystems - errors are always raised.
* it will be passed to os.walk() for normal filesystems.
"""
if use_os_walk_if_possible and not root_dir.startswith("/vsi"):
if onerror is None:
# Raise all errors by default. (ie, same as for VSI filesystems)
def onerror(exc):
raise exc
for p, dirs, files in os.walk(
root_dir, topdown=topdown, followlinks=True, onerror=onerror
):
# Ensure that modifying dirs doesn't affect the next iteration.
dirs = dirs[:]
yield p, dirs, files
return
# GDAL treats 'foo' and 'foo/' slightly differently.
# If you *don't* put a trailing slash on, listdir_recursive()
# will yield an entry for *both* the trailing-slash and the non-trailing-slash
# versions of the root dir.
relative_paths = listdir_recursive(os.path.join(root_dir, ""))
dirs = defaultdict(list)
files = defaultdict(list)
all_dir_paths = {root_dir}
# We're assuming that isdir() calls here have been cached by GDAL, so shouldn't be too expensive.
for rel_path in relative_paths:
abs_path = os.path.join(root_dir, rel_path)
parent_dir, basename = os.path.split(abs_path)
all_dir_paths.add(parent_dir)
if isdir(abs_path):
dirs[parent_dir].append(basename)
all_dir_paths.add(abs_path)
else:
files[parent_dir].append(basename)
# Yield an entry for *every* directory, including the root, including empty ones.
for p in sorted(all_dir_paths, reverse=not topdown):
yield p, dirs[p], files[p]
VSIStatResult = namedtuple("VSIStatResult", ("st_mode", "st_mtime", "st_size"))
def stat(path, _flags=0):
"""
Stats the given VSI path. Returns an object which can be used directly for stat results,
The return value is an object whose attributes correspond to the members of the stat structure.
(i.e. https://docs.python.org/2/library/os.html#os.stat )
HOWEVER only basic information is available. Other results are not implemented
and will cause errors when accessed.
Currently implemented: st_mode, st_mtime, st_size.
"""
s = gdal.VSIStatL(path, _flags)
if s is None:
raise OSError(errno.ENOENT, "No such file or directory: %r" % path)
return VSIStatResult(s.mode, s.mtime, s.size)
def exists(path):
"""
Return True if path refers to an existing path.
This function may return False if permission is not granted to execute os.stat() on the requested file, even if the path physically exists.
"""
return gdal.VSIStatL(str(path), gdal.VSI_STAT_EXISTS_FLAG) is not None
def isdir(path):
"""
Return True if path is an existing directory.
"""
if isinstance(path, os.PathLike):
path = str(path)
if path.startswith("/vsizip") and path.rstrip("/").lower().endswith(".zip"):
# HACK: workaround https://github.com/OSGeo/gdal/issues/1739
return exists(path)
st = gdal.VSIStatL(path, (gdal.VSI_STAT_EXISTS_FLAG | gdal.VSI_STAT_NATURE_FLAG))
return (st is not None) and bool(st.IsDirectory())
def isfile(path):
"""
Returns True if path is an existing non-directory.
"""
if isinstance(path, os.PathLike):
path = str(path)
if path.startswith("/vsizip") and path.rstrip("/").lower().endswith(".zip"):
# HACK: workaround https://github.com/OSGeo/gdal/issues/1739
return False
st = gdal.VSIStatL(path, (gdal.VSI_STAT_EXISTS_FLAG | gdal.VSI_STAT_NATURE_FLAG))
return (st is not None) and not st.IsDirectory()
def getsize(path):
"""
Return the size, in bytes, of path.
Raise OSError if the file does not exist or is inaccessible.
"""
return stat(path, _flags=gdal.VSI_STAT_SIZE_FLAG).st_size
def getmtime(path):
"""
Return the time of last modification of path.
The return value is a number giving the number of seconds since the epoch (see the time module).
NOTE: this may not work on various non-local filesystems
Raise OSError if the file does not exist or is inaccessible.
"""
return stat(path).st_mtime
def smart_copy(
src, dest, max_size=None, buffering=1 * 1024 * 1024, s3_client=None, dir_mode=0o777
):
"""
Similar to copy(), but for /vsis3/ file paths uses boto for:
- multi-threaded/multi-part S3 uploads
- multi-threaded/multi-part S3 downloads
- server-side S3 copies
Falls back to copy() which uses VSI
Also uses VSI for chained (eg. /vsizip/vsis3/...) paths.
if src & dest are file-like objects, treats them as local(vsi)
If src/dest are paths, the file will be opened & closed by this function. If they're file-like
objects the caller will need to close them^. Note that some VSI functions don't do much work
(or error) until close() is called, so don't assume the dest file is ready on return from here.
If dest is a path, will call makedirs() to recursively create the directory hierarchy.
^ Boto has a bug where it may close source files by itself when uploading: https://github.com/boto/s3transfer/issues/80
"""
import boto3
re_s3key = r"^/vsis3/(?P<bucket>[^/]+)/(?P<key>.+)$"
m_src = re.match(re_s3key, src) if isinstance(src, str) else False
m_dest = re.match(re_s3key, dest) if isinstance(dest, str) else False
if m_src and m_dest:
# S3 -> S3 copy
s3_client = s3_client or boto3.client("s3")
copy_source = {"Bucket": m_src.group("bucket"), "Key": m_src.group("key")}
s3_client.copy(copy_source, m_dest.group("bucket"), m_dest.group("key"))
clear_cache(dest)
elif m_src:
# S3 -> local/vsi
s3_client = s3_client or boto3.client("s3")
if isinstance(dest, str):
dest_dir = os.path.split(dest)[0]
if dest_dir and not os.path.isdir(dest_dir):
makedirs(dest_dir, mode=dir_mode)
dest = open(dest, mode="wb", buffering=buffering)
try:
s3_client.download_fileobj(m_src.group("bucket"), m_src.group("key"), dest)
dest.flush()
finally:
if isinstance(dest, str):
dest.close()
elif m_dest:
# local/vsi -> S3
s3_client = s3_client or boto3.client("s3")
if isinstance(src, str):
src = open(src, mode="rb", buffering=buffering)
try:
s3_client.upload_fileobj(src, m_dest.group("bucket"), m_dest.group("key"))
clear_cache(dest)
finally:
if isinstance(src, str):
if not src.closed: # https://github.com/boto/s3transfer/issues/80
src.close()
else:
# fallback to vsi -> vsi
if isinstance(dest, str):
dest_dir = os.path.split(dest)[0]
if dest_dir and not os.path.isdir(dest_dir):
makedirs(dest_dir, mode=dir_mode)
dest = open(dest, mode="wb", buffering=buffering)
return copy(src, dest, max_size=max_size, buffering=buffering)
@contextmanager
def smart_download_to_local_path(path):
"""
Contextmanager.
Given a path, makes sure it's a local path and yields it.
If it's a VSI path, this copies it to a local temp file and yields the local path.
The local temp file is removed when the context exits.
"""
if path.startswith("/vsi"):
with tempfile.NamedTemporaryFile() as f:
smart_copy(path, f)
yield f.name
else:
yield path
def remove(path):
"""Remove (delete) the file path."""
_vsi_err_wrap(gdal.Unlink, str(path), exception_class=OSError)
def mkdir(path, mode=0o777):
"""
Create a directory named path with numeric mode mode.
The default mode is 0777 (octal). If the directory already exists, OSError is raised.
"""
path = os.path.abspath(path) # strips trailing slashes
if exists(path):
raise OSError(17, "File exists: '%s'" % path)
_vsi_err_wrap(gdal.Mkdir, path, mode, exception_class=OSError)
def makedirs(path, mode=0o777):
"""
Recursive directory creation function. Like mkdir(), but makes all intermediate-level
directories needed to contain the leaf directory. Raises an error exception if the leaf
directory already exists or cannot be created. The default mode is 0777 (octal).
"""
path = os.path.abspath(path) # strips trailing slashes
if exists(path):
raise OSError(17, "File exists: '%s'" % path)
_vsi_err_wrap(gdal.MkdirRecursive, path, mode, exception_class=OSError)
def rmdir(path):
"""
Remove (delete) the directory path. Only works when the directory is empty, otherwise, OSError is raised.
In order to remove whole directory trees, rmtree() can be used.
Note: rmdir() has odd behaviour on /vsimem/, sometimes it'll error and remove anyway, although the deeper
paths stay present.
"""
if not isdir(path):
raise OSError("Path isn't a directory: %s" % path)
try:
_vsi_err_wrap(gdal.Rmdir, path, exception_class=OSError)
finally:
# otherwise weird stuff happens
clear_cache(path)
def rmtree(path, ignore_errors=True):
"""
Delete an entire directory tree; path must point to a directory (but not a symbolic link to a directory).
If ignore_errors is true, errors resulting from failed removals will be ignored. This is the default
because RmdirRecursive() seems to raise errors for no reason.
Note: rmtree() has odd behaviour on /vsimem/, sometimes it'll error and remove anyway, although the deeper
paths stay present.
"""
if not isdir(path):
raise OSError("Path isn't a directory: %s" % path)
try:
_vsi_err_wrap(gdal.RmdirRecursive, path, exception_class=OSError)
except Exception:
if not ignore_errors:
raise
finally:
# otherwise weird stuff happens
clear_cache(path)
def clear_cache(path):
if _VSI_CURL_PREFIX_RE.search(path):
# TODO: Use VSICurlPartialClearCache() when it's in the Python bindings
L.debug("clear_cache")
gdal.VSICurlClearCache()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment