Created
April 6, 2022 14:37
-
-
Save abkfenris/f49144cbf907101853b971034c7f092d to your computer and use it in GitHub Desktop.
Fsspec context manager and dagster resource
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import posixpath
from tempfile import TemporaryDirectory

from dagster import (
    resource,
    EventMetadata,
    EventMetadataEntry,
    Field,
    Noneable,
)
import fsspec
from fsspec.implementations import local
import s3fs

from .io_managers import pandas_io_manager
class FsSpecFilesystem:
    """
    Base of shared filesystem access,
    using FSSpec to unify filesystem implementations.

    Attributes:
        protocol: fsspec protocol name the filesystem was created with.
        base_path: Path that relative paths are resolved under (may be
            None or empty, in which case paths are used unchanged).
        fs: The underlying fsspec filesystem object.
    """

    def __init__(
        self, protocol: str, base_path: str = None, storage_options: dict = None
    ):
        """
        Create the fsspec filesystem for the given protocol.

        arguments:
            protocol: Protocol name passed to `fsspec.filesystem`.
            base_path: Optional path to resolve all other paths under.
            storage_options: Optional keyword arguments forwarded to
                `fsspec.filesystem`.
        """
        self.protocol = protocol
        self.base_path = base_path
        # A missing (or empty) options mapping means "no extra keyword arguments".
        self.fs = fsspec.filesystem(protocol, **(storage_options or {}))

    def with_base_path(self, path: str) -> str:
        """ Return a path on the filesystem with the base_path included """
        # base_path defaults to None; joining None would raise a TypeError,
        # so without a base path the given path is returned unchanged.
        if not self.base_path:
            return path
        return os.path.join(self.base_path, path)

    # The annotation is quoted so it is not evaluated when the class is defined.
    def open(self, path: str, mode: str = "rb") -> "fsspec.core.OpenFile":
        """ Return an OpenFile for the specified file system """
        fs_path = self.with_base_path(path)
        return self.fs.open(fs_path, mode=mode)

    def put(self, local_path: str, destination_path: str):
        """ Put a local file on the file system """
        fs_path = self.with_base_path(destination_path)
        # str() allows path-like objects to be given for local_path.
        return self.fs.put(str(local_path), fs_path)

    def info_metadata(self, path):
        """
        Return a selection of info metadata about the given path.

        The path is used as given; base_path is not applied here.
        """
        info = self.fs.info(path)
        return {"file size (in bytes)": info["size"]}

    def yield_info_metadata(self, path):
        """
        Yield metadata entries about a given path.

        The path is used as given; base_path is not applied here.
        """
        info = self.fs.info(path)
        yield EventMetadataEntry.int(info["size"], "File Size (in bytes)")

    def PutFile(self, file_name: str):
        """
        Use a temporary file with a context manager locally to write a file,
        then copy it to underlying storage system

        Usage:
            netcdf_file = context.resources.fs.PutFile(f"{date}.nc")
            with netcdf_file as temp_path:
                ds.to_netcdf(temp_path)
            yield Output(
                netcdf_file.dest_path,
                metadata={
                    **netcdf_file.metadata(),
                    "dataset_summary": EventMetadata.text(str(ds))
                }
            )
        """
        return PutFile(file_name, self)

    def GetFile(self, path: str):
        """
        Use a temporary file with a context manager to locally read a file,
        no matter the source filesystem, or the underlying tools ability
        to access remote file systems.
        """
        return GetFile(path, self)
class LocalFsSpecFilesystem(FsSpecFilesystem):
    """
    Local filesystem
    """

    def __init__(self, base_path: str = None):
        """
        Initialize a local filesystem with a specified base_path
        to prepend to created files.

        arguments:
            base_path: Optional directory that paths are resolved under.
                When omitted, paths are used unchanged.
        """
        self.protocol = "file"
        # Normalise a missing base path to "" so that joining it with a
        # file name never receives None (which would raise a TypeError).
        self.base_path = base_path or ""
        # auto_mkdir=True asks fsspec's local filesystem to create missing
        # parent directories when writing.
        self.fs = local.LocalFileSystem(auto_mkdir=True)
class S3FsSpecFilesystem(FsSpecFilesystem):
    """
    S3 compatible filesystem
    """

    # Lifetime requested for generated download URLs: 30 days, in seconds.
    # NOTE(review): some S3 signing schemes are reported to cap presigned URL
    # lifetimes at 7 days -- confirm against the target service.
    _URL_EXPIRY_SECONDS = 60 * 60 * 24 * 30

    def __init__(
        self,
        bucket: str,
        prefix: str,
        credential_prefix: str = "AWS",
        endpoint_url: str = "",
    ):
        """
        Initialize a new S3 compatible filesystem

        arguments:
            bucket: S3 bucket to store data in.
            prefix: Prefix to add to the beginning of paths created within the bucket.
            credential_prefix:
                Credentials are by default found at `AWS_ACCESS_KEY_ID` and
                `AWS_SECRET_ACCESS_KEY`. Specify a credential prefix, such as `DO`
                to find credentials with a different name (`DO_ACCESS_KEY_ID`).
            endpoint_url:
                Alternate endpoint if using a non-AWS S3 compatible service.
                URL should include protocol `https://`.
                An empty string or None means no alternate endpoint.
        """
        self.protocol = "s3"
        self.bucket = bucket
        self.prefix = prefix

        # Credentials come from the environment; a missing variable yields None.
        access_key = os.environ.get(f"{credential_prefix}_ACCESS_KEY_ID")
        secret_access_key = os.environ.get(f"{credential_prefix}_SECRET_ACCESS_KEY")

        client_kwargs = {}
        # Truthiness check: both "" and None mean "no alternate endpoint".
        if endpoint_url:
            client_kwargs["endpoint_url"] = endpoint_url

        # A "filecache" filesystem layered over the "s3" protocol, keeping
        # its cache under a per-bucket directory in /tmp.
        self.fs = fsspec.filesystem(
            "filecache",
            target_protocol="s3",
            target_options={
                "key": access_key,
                "secret": secret_access_key,
                "client_kwargs": client_kwargs,
            },
            cache_storage=f"/tmp/s3/{bucket}/",
        )

    def with_base_path(self, path: str) -> str:
        """ Prepend the bucket and prefix to created files. """
        # Object keys always use "/" separators, independent of the host
        # platform, so join with posixpath rather than os.path.
        return posixpath.join(self.bucket, self.prefix, path)

    def info_metadata(self, path):
        """
        Return a selection of metadata about the specified path,
        including a URL to download it.
        """
        return {
            **super().info_metadata(path),
            "expiring url": EventMetadata.url(
                self.fs.url(path, expires=self._URL_EXPIRY_SECONDS)
            ),
        }

    def yield_info_metadata(self, path):
        """ Yield metadata about a given path, followed by an expiring download URL """
        yield from super().yield_info_metadata(path)
        yield EventMetadataEntry.url(
            self.fs.url(path, expires=self._URL_EXPIRY_SECONDS), "expiring URL"
        )
class PutFile:
    """
    Use a temporary file with a context manager locally to write a file,
    then copy it to underlying storage system

    The upload only happens when the `with` body finishes without raising;
    the temporary directory is removed in every case.

    Usage:
        netcdf_file = context.resources.fs.PutFile(f"{date}.nc")
        with netcdf_file as temp_path:
            ds.to_netcdf(temp_path)
        yield Output(
            netcdf_file.dest_path,
            metadata={
                **netcdf_file.metadata(),
                "dataset_summary": EventMetadata.text(str(ds))
            }
        )
    """

    def __init__(self, file_name: str, fs: "FsSpecFilesystem"):
        """
        arguments:
            file_name: Name (or relative path) of the file to create.
            fs: Filesystem wrapper; its `with_base_path` decides the
                destination path and its `fs` attribute performs the upload.
        """
        self.file_name = file_name
        self.fs = fs
        self.dest_path = fs.with_base_path(file_name)

    def __enter__(self):
        """ Create a temporary directory and return a local path to write to """
        self.temp_dir = TemporaryDirectory()
        self.temp_file = os.path.join(
            self.temp_dir.name, os.path.basename(self.file_name)
        )
        return self.temp_file

    def __exit__(self, exc_type, exc_value, traceback):
        """ Upload the written file (on success) and remove the temporary directory """
        try:
            # Skip the upload when the body raised: the local file may be
            # missing or incomplete, and an upload error would hide the
            # original exception.
            if exc_type is None:
                self.fs.fs.put(self.temp_file, self.dest_path)
                self.stat = os.stat(self.temp_file)
        finally:
            # Always remove the temporary directory, even if the upload fails.
            self.temp_dir.cleanup()

    def metadata(self):
        """ Generate default metadata after creating a file """
        return {
            **self.fs.info_metadata(self.dest_path),
            "path": EventMetadata.path(self.dest_path),
        }

    def yield_metadata(self):
        """ Yield default metadata entries after creating a file """
        yield from self.fs.yield_info_metadata(self.dest_path)
        yield EventMetadataEntry.path(self.dest_path, "Path")
class GetFile:
    """
    Use a temporary file with a context manager to locally read a file,
    no matter the source filesystem, or the underlying tools ability
    to access remote file systems.

    The path is passed to the filesystem as given; no base path is added.
    """

    def __init__(self, path: str, fs: "FsSpecFilesystem"):
        """
        arguments:
            path: Path of the file on the source filesystem.
            fs: Filesystem wrapper whose `fs` attribute performs the download.
        """
        self.path = path
        self.fs = fs

    def __enter__(self):
        """ Download the file into a temporary directory and return the local path """
        self.temp_dir = TemporaryDirectory()
        self.temp_file = os.path.join(self.temp_dir.name, os.path.basename(self.path))
        try:
            self.fs.fs.get(self.path, self.temp_file)
        except BaseException:
            # __exit__ is not called when __enter__ raises, so the temporary
            # directory has to be removed here before re-raising.
            self.temp_dir.cleanup()
            raise
        return self.temp_file

    def __exit__(self, exc_type, exc_value, traceback):
        """ Remove the temporary directory and the downloaded copy """
        self.temp_dir.cleanup()
# Dagster resource that builds a generic FsSpecFilesystem from resource config:
# a required protocol, plus an optional base path and fsspec storage options.
@resource(
    config_schema={
        "protocol": Field(str, description="fsspec protocol"),
        "base_path": Field(
            str,
            description="Path to perform all filesystem operations under",
            is_required=False,
            default_value="",
        ),
        "storage_options": Field(
            dict,
            description="Option arguments for spawning an fsspec filesystem",
            is_required=False,
            default_value={},
        ),
    }
)
def fsspec_resource(context):
    config = context.resource_config
    return FsSpecFilesystem(
        protocol=config["protocol"],
        base_path=config["base_path"],
        storage_options=config["storage_options"],
    )
# Dagster resource that builds a LocalFsSpecFilesystem; the only setting is an
# optional base path, which defaults to an empty string.
@resource(
    config_schema={
        "base_path": Field(
            str,
            description="Path to perform all filesystem operations under",
            is_required=False,
            default_value="",
        )
    }
)
def local_fsspec_resource(context):
    config = context.resource_config
    return LocalFsSpecFilesystem(base_path=config["base_path"])
# Dagster resource that builds an S3FsSpecFilesystem from resource config:
# a required bucket, plus optional prefix, credential prefix and endpoint URL.
@resource(
    config_schema={
        "bucket": Field(str, description="S3 bucket to store data in"),
        "prefix": Field(
            str,
            default_value="",
            is_required=False,
            description="Bucket prefix before files",
        ),
        "credential_prefix": Field(
            str,
            default_value="AWS",
            is_required=False,
            description="Prefix for AWS credential environment variables. E.x.: AWS for AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY",
        ),
        "endpoint_url": Field(
            Noneable(str),
            default_value=None,
            is_required=False,
            description="Domain for non-S3 buckets",
        ),
    }
)
def s3_fsspec_resource(context):
    config = context.resource_config
    return S3FsSpecFilesystem(
        config["bucket"],
        config["prefix"],
        config["credential_prefix"],
        # The config allows None, but the filesystem class uses "" as its
        # "no alternate endpoint" value, so normalise before passing it on.
        config["endpoint_url"] or "",
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment