Skip to content

Instantly share code, notes, and snippets.

@abkfenris
Created April 6, 2022 14:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abkfenris/f49144cbf907101853b971034c7f092d to your computer and use it in GitHub Desktop.
Save abkfenris/f49144cbf907101853b971034c7f092d to your computer and use it in GitHub Desktop.
fsspec context managers and Dagster resources
import os
import posixpath
from tempfile import TemporaryDirectory
from typing import IO, Optional

import fsspec
import s3fs
from dagster import (
    EventMetadata,
    EventMetadataEntry,
    Field,
    Noneable,
    resource,
)
from fsspec.implementations import local

from .io_managers import pandas_io_manager
class FsSpecFilesystem:
    """
    Base of shared filesystem access,
    using FSSpec to unify filesystem implementations.

    ``open``, ``put`` and ``PutFile`` take paths relative to ``base_path``.
    ``info_metadata``, ``yield_info_metadata`` and ``GetFile`` pass their
    path to the underlying filesystem unchanged.
    """

    def __init__(
        self,
        protocol: str,
        base_path: Optional[str] = None,
        storage_options: Optional[dict] = None,
    ):
        """
        Create a filesystem for the given fsspec ``protocol``.

        Args:
            protocol: fsspec protocol name passed to ``fsspec.filesystem``.
            base_path: Prefix joined onto relative paths. ``None`` or ``""``
                means paths are used unchanged.
            storage_options: Keyword arguments forwarded to
                ``fsspec.filesystem``. Defaults to no options.
        """
        self.protocol = protocol
        self.base_path = base_path
        self.fs = fsspec.filesystem(protocol, **(storage_options or {}))

    def with_base_path(self, path: str) -> str:
        """Return ``path`` with ``base_path`` prepended, if one is set."""
        if not self.base_path:
            # base_path defaults to None, and os.path.join(None, ...) would
            # raise TypeError, so an unset base path leaves paths untouched.
            return path
        # fsspec paths are "/"-separated on every OS (os.path.join would
        # insert backslashes on Windows, corrupting remote keys).
        return posixpath.join(self.base_path, path)

    def open(self, path: str, mode: str = "rb") -> IO:
        """Open ``path`` (relative to ``base_path``) and return a file-like object."""
        return self.fs.open(self.with_base_path(path), mode=mode)

    def put(self, local_path: str, destination_path: str):
        """Copy the local file ``local_path`` to ``destination_path`` (relative to ``base_path``)."""
        # str() lets callers pass pathlib.Path objects.
        return self.fs.put(str(local_path), self.with_base_path(destination_path))

    def info_metadata(self, path):
        """
        Return a selection of info metadata about ``path``.

        ``path`` is used as-is (no base path is prepended).
        """
        info = self.fs.info(path)
        return {"file size (in bytes)": info["size"]}

    def yield_info_metadata(self, path):
        """
        Yield metadata entries about ``path``.

        ``path`` is used as-is (no base path is prepended).
        """
        info = self.fs.info(path)
        yield EventMetadataEntry.int(info["size"], "File Size (in bytes)")

    def PutFile(self, file_name: str):
        """
        Use a temporary file with a context manager locally to write a file,
        then copy it to underlying storage system

        Usage:
            netcdf_file = context.resources.fs.PutFile(f"{date}.nc")
            with netcdf_file as temp_path:
                ds.to_netcdf(temp_path)
            yield Output(
                netcdf_file.dest_path,
                metadata={
                    **netcdf_file.metadata(),
                    "dataset_summary": EventMetadata.text(str(ds))
                }
            )
        """
        return PutFile(file_name, self)

    def GetFile(self, path: str):
        """
        Use a temporary file with a context manager to read a file locally,
        regardless of the source filesystem or whether the consuming tool
        can access remote filesystems.

        ``path`` is used as-is (no base path is prepended).
        """
        return GetFile(path, self)
class LocalFsSpecFilesystem(FsSpecFilesystem):
    """
    Filesystem backed by the local disk.
    """

    def __init__(self, base_path: str = None):
        """
        Set up local-disk access, optionally rooted at ``base_path``.

        The parent initializer is deliberately not called: the local
        filesystem is built directly so that ``auto_mkdir`` can be enabled
        (fsspec then creates missing parent directories when writing).
        """
        self.fs = local.LocalFileSystem(auto_mkdir=True)
        self.base_path = base_path
        self.protocol = "file"
class S3FsSpecFilesystem(FsSpecFilesystem):
    """
    S3 compatible filesystem
    """

    # Lifetime, in seconds, of the presigned download URLs added to metadata
    # (30 days).
    URL_EXPIRY_SECONDS = 60 * 60 * 24 * 30

    def __init__(
        self,
        bucket: str,
        prefix: str,
        credential_prefix: str = "AWS",
        endpoint_url: Optional[str] = "",
    ):
        """
        Initialize a new S3 compatible filesystem

        arguments:
            bucket: S3 bucket to store data in.
            prefix: Prefix to add to the beginning of paths created within the bucket.
            credential_prefix:
                Credentials are by default found at `AWS_ACCESS_KEY_ID` and
                `AWS_SECRET_ACCESS_KEY`. Specify a credential prefix, such as `DO`
                to find credentials with a different name (`DO_ACCESS_KEY_ID`).
            endpoint_url:
                Alternate endpoint if using a non-AWS S3 compatible service.
                URL should include protocol `https://`. An empty string or
                None means no endpoint override is passed to the client.
        """
        self.protocol = "s3"
        self.bucket = bucket
        self.prefix = prefix
        # Missing variables yield None; presumably s3fs then falls back to
        # its own credential lookup (TODO confirm).
        access_key = os.environ.get(f"{credential_prefix}_ACCESS_KEY_ID")
        secret_access_key = os.environ.get(f"{credential_prefix}_SECRET_ACCESS_KEY")
        client_kwargs = {}
        # Truthiness rather than != "": s3_fsspec_resource passes None when
        # no endpoint is configured, which must not become endpoint_url=None.
        if endpoint_url:
            client_kwargs["endpoint_url"] = endpoint_url
        # Wrap S3 in fsspec's "filecache" implementation, caching files
        # locally under /tmp/s3/<bucket>/.
        self.fs = fsspec.filesystem(
            "filecache",
            target_protocol="s3",
            target_options={
                "key": access_key,
                "secret": secret_access_key,
                "client_kwargs": client_kwargs,
            },
            cache_storage=f"/tmp/s3/{bucket}/",
        )

    def with_base_path(self, path: str) -> str:
        """Prepend the bucket and prefix to created files."""
        # S3 keys are "/"-separated regardless of the local OS.
        return posixpath.join(self.bucket, self.prefix, path)

    def info_metadata(self, path):
        """
        Return a selection of metadata about the specified path,
        including a URL to download it.
        """
        return {
            **super().info_metadata(path),
            "expiring url": EventMetadata.url(
                self.fs.url(path, expires=self.URL_EXPIRY_SECONDS)
            ),
        }

    def yield_info_metadata(self, path):
        """Yield the base metadata entries for ``path`` plus an expiring download URL."""
        yield from super().yield_info_metadata(path)
        yield EventMetadataEntry.url(
            self.fs.url(path, expires=self.URL_EXPIRY_SECONDS), "expiring URL"
        )
class PutFile:
    """
    Use a temporary file with a context manager locally to write a file,
    then copy it to underlying storage system

    Usage:
        netcdf_file = context.resources.fs.PutFile(f"{date}.nc")
        with netcdf_file as temp_path:
            ds.to_netcdf(temp_path)
        yield Output(
            netcdf_file.dest_path,
            metadata={
                **netcdf_file.metadata(),
                "dataset_summary": EventMetadata.text(str(ds))
            }
        )

    The copy only happens when the ``with`` body finishes without raising.
    The temporary directory is removed in every case, and any exception
    from the body propagates.
    """

    def __init__(self, file_name: str, fs: "FsSpecFilesystem"):
        self.file_name = file_name
        self.fs = fs
        # Destination on the target filesystem, base path included.
        self.dest_path = fs.with_base_path(file_name)

    def __enter__(self) -> str:
        """Create a temporary directory and return a local path to write to."""
        self.temp_dir = TemporaryDirectory()
        # Only the base name is kept locally; directory components of
        # file_name are part of dest_path only.
        self.temp_file = os.path.join(
            self.temp_dir.name, os.path.basename(self.file_name)
        )
        return self.temp_file

    def __exit__(self, exc_type, exc_value, traceback):
        """Upload the written file on success, then remove the temporary directory."""
        try:
            if exc_type is None:
                # Skip the upload when the body failed: the file may be
                # missing or partial, and an upload error would mask the
                # original exception.
                self.fs.fs.put(self.temp_file, self.dest_path)
                self.stat = os.stat(self.temp_file)
        finally:
            self.temp_dir.cleanup()
        # Implicit None return: exceptions from the with-body are not suppressed.

    def metadata(self):
        """Generate default metadata after creating a file."""
        return {
            **self.fs.info_metadata(self.dest_path),
            "path": EventMetadata.path(self.dest_path),
        }

    def yield_metadata(self):
        """Yield the filesystem's metadata entries for dest_path, then its path."""
        yield from self.fs.yield_info_metadata(self.dest_path)
        yield EventMetadataEntry.path(self.dest_path, "Path")
class GetFile:
    """
    Use a temporary file with a context manager to read a file locally,
    regardless of the source filesystem or whether the consuming tool
    can access remote filesystems.

    ``path`` is passed to the underlying filesystem unchanged: no base path
    is prepended. A ``PutFile.dest_path`` can be passed directly.
    """

    def __init__(self, path: str, fs: "FsSpecFilesystem"):
        self.path = path
        self.fs = fs

    def __enter__(self) -> str:
        """Download ``path`` into a temporary directory and return the local copy's path."""
        self.temp_dir = TemporaryDirectory()
        self.temp_file = os.path.join(self.temp_dir.name, os.path.basename(self.path))
        try:
            self.fs.fs.get(self.path, self.temp_file)
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so remove the
            # temporary directory here before re-raising.
            self.temp_dir.cleanup()
            raise
        return self.temp_file

    def __exit__(self, exc_type, exc_value, traceback):
        """Remove the temporary directory; exceptions propagate."""
        self.temp_dir.cleanup()
# Dagster resource: a generic FsSpecFilesystem whose protocol, base path and
# fsspec storage options all come from the resource config.
@resource(
    config_schema={
        "protocol": Field(str, description="fsspec protocol"),
        "base_path": Field(
            str,
            default_value="",
            is_required=False,
            description="Path to perform all filesystem operations under",
        ),
        "storage_options": Field(
            dict,
            default_value={},
            is_required=False,
            description="Option arguments for spawning an fsspec filesystem",
        ),
    }
)
def fsspec_resource(context):
    cfg = context.resource_config
    return FsSpecFilesystem(
        protocol=cfg["protocol"],
        base_path=cfg["base_path"],
        storage_options=cfg["storage_options"],
    )
# Dagster resource: a LocalFsSpecFilesystem rooted at the configured base_path
# (empty string by default).
@resource(
    config_schema={
        "base_path": Field(
            str,
            default_value="",
            is_required=False,
            description="Path to perform all filesystem operations under",
        )
    }
)
def local_fsspec_resource(context):
    base_path = context.resource_config["base_path"]
    return LocalFsSpecFilesystem(base_path=base_path)
@resource(
    config_schema={
        "bucket": Field(str, description="S3 bucket to store data in"),
        "prefix": Field(
            str,
            default_value="",
            is_required=False,
            description="Bucket prefix before files",
        ),
        "credential_prefix": Field(
            str,
            default_value="AWS",
            is_required=False,
            description=(
                "Prefix for AWS credential environment variables. "
                "E.g.: AWS for AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY"
            ),
        ),
        "endpoint_url": Field(
            Noneable(str),
            default_value=None,
            is_required=False,
            description=(
                "Endpoint URL, including protocol (https://), "
                "for non-AWS S3-compatible services"
            ),
        ),
    }
)
def s3_fsspec_resource(context):
    """S3 (or S3-compatible) filesystem resource backed by fsspec."""
    config = context.resource_config
    return S3FsSpecFilesystem(
        bucket=config["bucket"],
        prefix=config["prefix"],
        credential_prefix=config["credential_prefix"],
        # The schema allows None, but S3FsSpecFilesystem uses "" to mean
        # "no custom endpoint"; normalize so None is never forwarded.
        endpoint_url=config["endpoint_url"] or "",
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment