@manning-ncsa
Last active July 11, 2024 14:40
S3-compatible object storage upload utilities
# .gitignore
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

S3-compatible object storage upload utilities

This script performs three functions related to the Y6 BAO data release: listing files, comparing file lists, and verifying file integrity.

List uploaded files

List all uploaded files:

$ python main.py list
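
Each uploaded object is printed as a path relative to the data release root (see print_all_files in main.py below). For example, the mask file referenced in the verification section would appear in the listing as:

/Data/Mask/DESY6BAO_mask.fits.gz
...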

Compare list of files

Compare a list of desired files and folders to what is currently uploaded:

$ # The second argument is the path to a text file containing the target list of files and folders
$ python main.py compare ./y6_bao_target_files.txt

File missing: "/despublic/y3a2_files/y6_bao/Mocks/Clustering/ACF/xi_4096_x15_v1.0_seed00642_map1_bin1_DESPhoto.dat"
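
The target list is a plain-text, CSV-style file read by compare_target_file_list in main.py: the first field of each row is a path relative to the data release root, and a trailing slash marks a folder, which the compare command will create if it is missing. The entries below are hypothetical and only illustrate the format:

Data/Mask/
Data/Mask/DESY6BAO_mask.fits.gz
Mocks/Clustering/ACF/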

Verify file integrity

To verify uploaded files, run the command shown below, where the argument after verify is the local directory containing the files to verify. The directory tree is walked recursively and must mirror the structure of the uploaded files. For example, if your local files include DESY6BAO_mask.fits.gz and its remote path is /Data/Mask/DESY6BAO_mask.fits.gz, then the local file path could be /home/user/y6_bao/Data/Mask/DESY6BAO_mask.fits.gz, and the command would be

$ python main.py verify /home/user/y6_bao

ERROR    Comparing "DESY6BAO_nz.txt"... False
INFO     Comparing "DESY6BAO_datavectors.zip"... True
INFO     Comparing "COLA_BAOsample_Y6_v1.0_seed00152_map2_DESPhoto.parquet"... True
INFO     Comparing "/Data/Mask/DESY6BAO_mask.fits.gz"... True
...
INFO     Comparison complete.

The results should resemble the output above; the first log line is an example of an upload that failed the integrity check and should be re-uploaded.
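
Connection settings are read from environment variables in ObjectStore.__init__ (see object_store.py below): S3_ENDPOINT_URL, S3_REGION_NAME, S3_BUCKET, AWS_ACCESS_KEY_ID, and AWS_SECRET_ACCESS_KEY, plus LOG_LEVEL for logging verbosity. If no access key pair is set, the client falls back to anonymous (unsigned) requests; operations that write to the bucket, such as creating missing folders during compare, will generally require credentials. A hypothetical setup (the endpoint and bucket shown are the script defaults; the key values are placeholders):

$ export S3_ENDPOINT_URL=https://ncsa.osn.xsede.org
$ export S3_BUCKET=phy240006-bucket01
$ export AWS_ACCESS_KEY_ID=...         # only needed for write operations
$ export AWS_SECRET_ACCESS_KEY=...
$ export LOG_LEVEL=DEBUG               # optional, enables debug output
$ python main.py verify /home/user/y6_bao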

# main.py
from object_store import ObjectStore, get_logger
import os
import sys
from pathlib import PurePath
import csv

logger = get_logger(__name__)
s3 = ObjectStore()


def uploaded_file_info(data_path='/despublic/y3a2_files/y6_bao'):
    '''Report the total size, file count, and latest modification time of the uploaded objects.'''
    size_in_bytes = 0
    num_files = 0
    latest_time = None
    object_keys = s3.list_directory(root_path=data_path)
    for file_path in object_keys:
        try:
            object_info = s3.object_info(path=file_path)
            logger.debug(file_path)
            file_size = int(object_info['ContentLength'])
            modified_time = object_info['LastModified']
            if not latest_time or modified_time > latest_time:
                latest_time = modified_time
            size_in_bytes += file_size
            num_files += 1
            logger.debug(f'''[{int(file_size/(1024**2))} MiB] "{file_path}"...''')
        except Exception as err:
            logger.error(f'''"{file_path}": {err}''')
            continue
    logger.info(f'''Total size ({num_files} files): {int(size_in_bytes/(1024**3))} GiB''')
    logger.info(f'''Latest modified time: {latest_time}''')


def verify_uploads(local_dir, remote_dir):
    '''Walk local_dir and compare each file's checksum against the ETag of the corresponding uploaded object.'''
    for dirpath, dirnames, filenames in os.walk(local_dir):
        for filename in filenames:
            file_path = os.path.join(dirpath, filename)
            sub_path = os.path.join(dirpath, filename).replace(local_dir, '').strip('/')
            logger.debug(f'''local sub_path: {sub_path}''')
            remote_path = os.path.join(remote_dir, sub_path)
            logger.debug(f'''remote filepath: {remote_path}''')
            obj = s3.object_info(path=remote_path)
            etag = obj['ETag']
            logger.debug(f'source etag: {etag}')
            checksum_match = s3.etag_compare(file_path, etag)
            log_msg = f'''Comparing "{file_path.replace(local_dir, '').strip('/')}"... {checksum_match}'''
            if checksum_match:
                logger.info(log_msg)
            else:
                logger.error(log_msg)
    logger.info('''Comparison complete.''')


def print_tree(tree, indent=0):
    for key, value in tree.items():
        print(' ' * indent + str(key))
        if isinstance(value, dict):
            print_tree(value, indent + 2)


def tree_to_dict(in_dict={}):
    out_dict = {}
    in_dict = dict(in_dict)
    out_dict = dict(out_dict)
    # print(f'''in_dict: {in_dict}''')
    # print(f'''out_dict: {out_dict}''')
    for key, value in in_dict.items():
        if not key:
            continue
        # if key == 'y6_bao':
        #     print(f'''key: {key}''')
        #     print(f'''value: {value}''')
        if isinstance(value, dict):
            # sub_dict = deepcopy(out_dict)[key]
            # print(sub_dict)
            out_dict[str(key)] = tree_to_dict(value)
        if not value:
            out_dict[str(key)] = value
    return out_dict


def compare_target_file_list(file_path, path_prefix='', create_missing_dirs=False):
    '''Check each path in the target list against the object store, optionally creating missing folders.'''
    with open(file_path) as fp:
        files = csv.reader(fp, skipinitialspace=True, strict=True)
        for row in files:
            if not row or len(row) < 1:
                continue
            rel_path = row[0]
            is_dir = False
            if rel_path.endswith('/'):
                is_dir = True
            rel_path = rel_path.strip('/')
            if is_dir:
                # The trailing slash is necessary for boto3 to create a folder object
                rel_path = f'''{rel_path}/'''
            obj_path = os.path.join(path_prefix, rel_path)
            # print(f'''Searching path: "{obj_path}"''')
            exists = s3.object_exists(obj_path)
            if not exists:
                if is_dir and create_missing_dirs:
                    print(f'''Creating missing folder: "{obj_path}"...''')
                    s3.create_folder(obj_path)
                else:
                    print(f'''File missing: "{obj_path}"''')
            # else:
            #     print(f'''Path exists: "{obj_path}"''')


def print_all_files(root_path):
    objects = s3.list_directory(root_path=root_path, full_object=True)
    for obj in objects:
        key = obj['Key']
        rel_path = PurePath(key.replace(root_path.strip('/'), ''))
        print(rel_path)


if __name__ == '__main__':
    if len(sys.argv) > 1:
        cmd = sys.argv[1]
        root_path = '/despublic/y3a2_files/y6_bao'
        if cmd == 'verify':
            # Verify uploads against local files
            local_dir = sys.argv[2]
            verify_uploads(local_dir=local_dir, remote_dir=root_path)
        elif cmd == 'compare':
            # Compare the target file list against the uploads
            file_path = sys.argv[2]
            compare_target_file_list(file_path=file_path, path_prefix=root_path, create_missing_dirs=True)
        elif cmd == 'list':
            # List all uploaded files
            print_all_files(root_path)
    sys.exit()


# object_store.py
import boto3
import os, sys
import json
from botocore.exceptions import ClientError
from collections import defaultdict
import logging
from botocore import UNSIGNED
from botocore.client import Config
import hashlib


def get_logger(name):
    # Configure logging
    logging.basicConfig(
        # format='%(asctime)s [%(name)-12s] %(levelname)-8s %(message)s',
        format='%(levelname)-8s %(message)s',
    )
    logger = logging.getLogger(name)
    logger.setLevel(os.getenv('LOG_LEVEL', logging.INFO))
    # handler = logging.StreamHandler(sys.stdout)
    # handler.setLevel(os.getenv('LOG_LEVEL', logging.INFO))
    # logger.addHandler(handler)
    return logger


logger = get_logger(__name__)


class ObjectStore:
    def __init__(self) -> None:
        '''Initialize S3 client'''
        self.config = {
            'endpoint-url': os.getenv("S3_ENDPOINT_URL", "https://ncsa.osn.xsede.org"),
            'region-name': os.getenv("S3_REGION_NAME", ""),
            'bucket': os.getenv("S3_BUCKET", "phy240006-bucket01"),
            'aws_access_key_id': os.getenv("AWS_ACCESS_KEY_ID", ''),
            'aws_secret_access_key': os.getenv("AWS_SECRET_ACCESS_KEY", ''),
        }
        if not (self.config['aws_access_key_id'] and self.config['aws_secret_access_key']):
            # Fall back to anonymous (unsigned) requests when no credentials are provided
            self.client = boto3.client(
                "s3",
                endpoint_url=self.config['endpoint-url'],
                region_name=self.config['region-name'],
                config=Config(signature_version=UNSIGNED),
            )
        else:
            self.client = boto3.client(
                "s3",
                endpoint_url=self.config['endpoint-url'],
                region_name=self.config['region-name'],
                aws_access_key_id=self.config['aws_access_key_id'],
                aws_secret_access_key=self.config['aws_secret_access_key'],
            )

    def store_folder(self, src_dir="", bucket_root_path=""):
        for dirpath, dirnames, filenames in os.walk(src_dir):
            for filename in filenames:
                self.put_object(
                    path=os.path.join(bucket_root_path, dirpath.replace(src_dir, '').strip('/'), filename),
                    file_path=os.path.join(dirpath, filename),
                )

    def put_object(self, path="", data="", file_path="", json_output=True):
        if data:
            logger.debug(f'''Uploading data object to object store: "{path}"''')
            if json_output:
                body = json.dumps(data, indent=2)
            else:
                body = data
            self.client.put_object(Body=body, Bucket=self.config['bucket'], Key=path)
        elif file_path:
            logger.debug(f'''Uploading file to object store: "{path}"''')
            self.client.upload_file(file_path, self.config['bucket'], path)

    def get_object(self, path=""):
        try:
            obj = self.client.get_object(
                Bucket=self.config['bucket'],
                Key=path)
        except ClientError as ex:
            if ex.response['Error']['Code'] == 'NoSuchKey':
                obj = None
            else:
                raise
        return obj

    def download_object(self, path="", file_path=""):
        self.client.download_file(
            self.config['bucket'],
            path,
            file_path,
        )

    def create_folder(self, folder_path):
        response = self.client.put_object(
            Bucket=self.config['bucket'],
            Body='',
            Key=f'''{folder_path.strip('/')}/'''
        )
        print(response)

    def delete_directory(self, root_path):
        objects = self.client.list_objects(
            Bucket=self.config['bucket'],
            Prefix=root_path,
        )
        if 'Contents' not in objects:
            return
        for obj in objects['Contents']:
            self.client.delete_object(Bucket=self.config['bucket'], Key=obj['Key'])

    def list_directory(self, root_path, full_object=False):
        response = self.client.list_objects_v2(
            Bucket=self.config['bucket'],
            Prefix=root_path.strip('/'),
        )
        if 'Contents' not in response:
            return []
        objects = response['Contents']
        if response['IsTruncated']:
            # Page through the remaining results
            while 'NextContinuationToken' in response:
                response = self.client.list_objects_v2(
                    Bucket=self.config['bucket'],
                    Prefix=root_path.strip('/'),
                    ContinuationToken=response['NextContinuationToken']
                )
                if 'Contents' in response:
                    objects.extend(response['Contents'])
        if full_object:
            return objects
        else:
            return [obj['Key'] for obj in objects]

    def list_directory_tree(self, root_path):
        objects = self.client.list_objects_v2(
            Bucket=self.config['bucket'],
            Prefix=root_path.strip('/'),
        )
        tree = lambda: defaultdict(tree)
        files_tree = tree()
        if 'Contents' in objects:
            for obj in objects['Contents']:
                parts = obj['Key'].split('/')
                current_level = files_tree
                for part in parts[:-1]:
                    current_level = current_level[part]
                # Touch the leaf so the file name is registered in the defaultdict tree
                current_level[parts[-1]]
        return files_tree

    def object_info(self, path):
        # print(f'''Getting object info for path: {path}''')
        if path.endswith('/'):
            # Folder paths: return the listing response instead of a head_object response
            response = self.client.list_objects_v2(
                Bucket=self.config['bucket'],
                Prefix=path.strip('/'),
            )
            if 'Contents' not in response:
                return {}
            else:
                return response
        try:
            response = self.client.head_object(
                Bucket=self.config['bucket'],
                Key=path.strip('/'),
            )
        except ClientError as ex:
            if ex.response['Error']['Code'] == 'NoSuchKey':
                response = {}
            elif ex.response['Error']['Code'] == '404':
                response = {}
            else:
                print(ex.response['Error']['Code'])
                raise
        return response

    def object_exists(self, path):
        # print(f'''Checking if path exists: {path}''')
        if self.object_info(path):
            return True
        else:
            return False

    def copy_directory(self, src_path, dst_root_path):
        objects = self.client.list_objects(
            Bucket=self.config['bucket'],
            Prefix=src_path,
        )
        for obj in objects['Contents']:
            logger.debug(obj)
            dst_rel_path = obj['Key'].replace(src_path, '').strip('/')
            dst_path = os.path.join(dst_root_path, dst_rel_path)
            logger.debug(f'dst_path: {dst_path}')
            self.client.copy_object(CopySource={
                'Bucket': self.config['bucket'],
                'Key': obj['Key']
            }, Bucket=self.config['bucket'], Key=dst_path)

    def md5_checksum(self, file_path):
        '''https://stackoverflow.com/a/58239738'''
        m = hashlib.md5()
        with open(file_path, 'rb') as f:
            for data in iter(lambda: f.read(1024 * 1024), b''):
                m.update(data)
        hexdigest = m.hexdigest()
        logger.debug(f'calculated md5 checksum: {hexdigest}')
        return hexdigest

    def etag_checksum(self, file_path, chunk_size=512 * 1024 * 1024):
        '''https://stackoverflow.com/a/58239738'''
        md5s = []
        with open(file_path, 'rb') as f:
            for data in iter(lambda: f.read(chunk_size), b''):
                md5s.append(hashlib.md5(data).digest())
        md5sum = hashlib.md5(b"".join(md5s))
        etag_checksum = f'{md5sum.hexdigest()}-{len(md5s)}'
        logger.debug(f'calculated etag checksum: {etag_checksum}')
        return etag_checksum

    def etag_compare(self, file_path, etag_source, chunk_size=512 * 1024 * 1024):
        '''https://stackoverflow.com/a/58239738'''
        etag_source = etag_source.strip('"')
        logger.debug(f'source etag checksum: {etag_source}')
        if '-' in etag_source and etag_source == self.etag_checksum(file_path, chunk_size=chunk_size):
            return True
        if '-' not in etag_source and etag_source == self.md5_checksum(file_path):
            return True
        return False