Skip to content

Instantly share code, notes, and snippets.

@brews
Last active December 12, 2023 18:48
Show Gist options
  • Save brews/0a42fc5d0c276930c7e4db581cebe99e to your computer and use it in GitHub Desktop.
Save brews/0a42fc5d0c276930c7e4db581cebe99e to your computer and use it in GitHub Desktop.
Fills in implied directory blobs for GCS bucket mounted with GCSfuse without using the --implied-dir option.
"""
Fills in implied directory blobs for GCS bucket mounted with GCSfuse without using the --implied-dir option.
If you tell it to look in `mygcsbucket` for prefix `path/to/files/to/read/in/gcsfuse/`, it will
ensure implied dirs nested in gs://mygcsbucket/path/to/files/to/read/in/gcsfuse/ get covered.
It can handle 10 - 100k directories in under 30 minutes if you run it from Cloud Shell.
"""
import logging
from pathlib import Path
from collections.abc import Iterator
from google.cloud import storage
# Target bucket and blob-name prefix to scan — edit these before running.
BUCKET_NAME = "mygcsbucket"
PREFIX = "path/to/files/to/read/in/gcsfuse/"
# Module-level logger named after this module (PEP 282 convention).
_log = logging.getLogger(__name__)
# Caching could make this better, reduce API calls, etc.
# Async with thread pool? Prob should do this in golang.
def find_implied_dirs(blob_name: str) -> Iterator[str]:
    """
    Yield every implied parent-directory name for a bucket blob name.

    For ``"a/b/c.txt"`` this yields ``"a/"`` then ``"a/b/"``. Each name
    carries a trailing ``"/"`` so it matches the directory-placeholder
    naming convention GCSfuse expects.

    :param blob_name: Full blob name (object key) within the bucket.
    :return: Iterator of implied directory names, shallowest first.
    """
    parent = Path(blob_name).parent
    if parent == Path("."):
        # Root-level blob: no implied directories. The original code used a
        # bare `yield` here, which emitted a spurious None that made the
        # caller's sorted(set(...)) raise TypeError; `return` ends cleanly.
        return
    # Emit each ancestor prefix: parts[:1], parts[:2], ... up to the parent.
    for depth in range(1, len(parent.parts) + 1):
        # Append "/" so the name is a valid GCS "directory" placeholder key.
        yield str(Path(*parent.parts[:depth])) + "/"
def main(bucket_name: str, prefix: str, dryrun: bool = False):
    """
    Create placeholder blobs for every implied directory under a GCS prefix.

    Lists all blobs under ``gs://{bucket_name}/{prefix}``, computes the set
    of implied parent directories, and uploads an empty placeholder blob for
    each directory that does not already have one.

    :param bucket_name: Name of the GCS bucket to process.
    :param prefix: Blob-name prefix restricting which blobs are scanned.
    :param dryrun: If True, scan and log but do not upload any blobs.
    """
    _log.info(f"Beginning to process gs://{bucket_name}/{prefix}")
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    # Sorted, de-duplicated list of all implied directories to consider.
    candidates = sorted(
        {c for b in bucket.list_blobs(prefix=prefix) for c in find_implied_dirs(b.name)}
    )
    _log.info(f"Found n={len(candidates)}")
    for candidate in candidates:
        _log.debug(f"Processing {candidate}")
        dir_blob = bucket.blob(candidate)
        # Skip directories that already have a placeholder blob.
        if dir_blob.exists():
            _log.debug(f"Placeholder blob already exists for {dir_blob.name}, skipping")
            continue
        if not dryrun:
            # Create empty blob as dir placeholder; this content type is what
            # GCSfuse-created directory objects carry.
            dir_blob.upload_from_string("", content_type="application/x-www-form-urlencoded;charset=UTF-8")
            # NOTE(review): logged inside the dryrun guard so "Created" is
            # never reported for blobs that were not actually uploaded.
            _log.info(f"Created blob for {dir_blob.name}")
    _log.info(f"Done processing gs://{bucket_name}/{prefix}")
if __name__ == "__main__":
    # Configure root logging before kicking off the run.
    log_format = '%(asctime)s - %(name)s - %(levelname)s:%(message)s'
    logging.basicConfig(format=log_format, level="INFO")
    main(bucket_name=BUCKET_NAME, prefix=PREFIX)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment