Skip to content

Instantly share code, notes, and snippets.

@westonpace
Created August 26, 2020 19:46
Show Gist options
  • Save westonpace/4107c1c492cdd78d611595d43e72964d to your computer and use it in GitHub Desktop.
Save westonpace/4107c1c492cdd78d611595d43e72964d to your computer and use it in GitHub Desktop.
Test case demonstrating the ResourceWarning emitted when OS files are read through a filesystem that is not a LocalFileSystem
import os
import pyarrow.filesystem as pafs
import pyarrow.parquet as pq
from pyarrow.util import implements
class RelativeFilesystem(pafs.FileSystem):
    """Filesystem that resolves paths against a base directory.

    Every path-taking operation joins the incoming path onto ``basedir``
    with ``os.path.join`` and forwards the result to a
    ``pafs.LocalFileSystem`` instance held in ``target``.

    Note that ``os.path.join`` discards ``basedir`` when the incoming path
    is itself absolute, so absolute paths are forwarded unchanged.
    """

    def __init__(self, basedir):
        """Store ``basedir`` and grab the local filesystem to delegate to."""
        self.basedir = basedir
        self.target = pafs.LocalFileSystem.get_instance()

    def _rebase(self, path):
        # Single place where a caller-supplied path is joined onto basedir.
        return os.path.join(self.basedir, path)

    # pylint: disable=invalid-name
    @implements(pafs.FileSystem.ls)
    def ls(self, path):
        # List the rebased directory via the wrapped filesystem.
        return self.target.ls(self._rebase(path))

    @implements(pafs.FileSystem.delete)
    def delete(self, path, recursive=False):
        # ``recursive`` is forwarded positionally, as the target expects.
        return self.target.delete(self._rebase(path), recursive)

    @implements(pafs.FileSystem.rename)
    def rename(self, path, new_path):
        # Both ends of the rename are resolved against basedir.
        source = self._rebase(path)
        destination = self._rebase(new_path)
        return self.target.rename(source, destination)

    @implements(pafs.FileSystem.stat)
    def stat(self, path):
        return self.target.stat(self._rebase(path))

    @implements(pafs.FileSystem.mkdir)
    def mkdir(self, path, create_parents=True):
        return self.target.mkdir(self._rebase(path), create_parents)

    @implements(pafs.FileSystem.isdir)
    def isdir(self, path):
        return self.target.isdir(self._rebase(path))

    @implements(pafs.FileSystem.isfile)
    def isfile(self, path):
        return self.target.isfile(self._rebase(path))

    # pylint: disable=protected-access
    @implements(pafs.FileSystem._isfilestore)
    def _isfilestore(self):
        # Takes no path; simply report whatever the wrapped filesystem says.
        return self.target._isfilestore()

    @implements(pafs.FileSystem.exists)
    def exists(self, path):
        return self.target.exists(self._rebase(path))

    @implements(pafs.FileSystem.open)
    def open(self, path, mode='rb'):
        # The file handle comes straight from the wrapped filesystem.
        return self.target.open(self._rebase(path), mode)

    @property
    def pathsep(self):
        """Path separator reported by the wrapped filesystem."""
        return self.target.pathsep

    def walk(self, path):
        """Walk the rebased ``path`` using the wrapped filesystem."""
        return self.target.walk(self._rebase(path))
# Two filesystems backed by the same local disk: pyarrow's LocalFileSystem
# and the RelativeFilesystem wrapper based at the C: drive root.
filesystem = pafs.LocalFileSystem()
rel_filesystem = RelativeFilesystem('C:\\')

# Parquet file(s) read through both filesystems.
files = [
    'C:\\ProgramData\\Keysight\\dfdb\\data\\e2e-iris\\batch=0\\data.parquet',
]

# Pass 1: read every piece through the plain LocalFileSystem.
print('Reading with LocalFileSystem')
dataset = pq.ParquetDataset(
    files,
    filesystem=filesystem,
    validate_schema=False,
)
for piece in dataset.pieces:
    table = piece.read(partitions=dataset.partitions, use_threads=True)
    print(f'Read {table.num_rows} rows')

# Pass 2: same files, but through the non-LocalFileSystem wrapper.
# Statement order and variable names are kept as-is so object lifetimes
# are unchanged.
print('Reading with relative filesystem')
dataset_two = pq.ParquetDataset(
    files,
    filesystem=rel_filesystem,
    validate_schema=False,
)
for piece in dataset_two.pieces:
    table = piece.read(partitions=dataset_two.partitions, use_threads=True)
    print(f'Read {table.num_rows} rows')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment