Skip to content

Instantly share code, notes, and snippets.

@lgarrison
Last active September 24, 2020 17:17
Show Gist options
  • Save lgarrison/ca2b33b037415aa15a8aa8902f27e879 to your computer and use it in GitHub Desktop.
Save lgarrison/ca2b33b037415aa15a8aa8902f27e879 to your computer and use it in GitHub Desktop.
Abacus Globus script
#!/usr/bin/env python3
'''
This script facilitates Globus transfers of Abacus simulation data.
In particular, for AbacusSummit we want to transfer particle data and
halo catalogs from OLCF to NERSC after each simulation is done.
Both sites already have Globus endpoints, so the job of this script
is to initiate a transfer between the two, saving the transfer ID
number so we can query the transfer status later.
The intention of this script is primarily to do disk-to-disk
transfers rather than invoking HPSS, which is probably best
done separately via htar.
Important: this script assumes that a simulation's name is a unique
identifier.
Usage
=====
$ ./globus.py --help
Based on Globus automation-examples/globus_folder_sync.py.
'''
import sys
import os
import urllib
import argparse
import glob
import toml
from braceexpand import braceexpand
from globus_sdk import (NativeAppAuthClient, TransferClient,
RefreshTokenAuthorizer, TransferData)
from globus_sdk.exc import GlobusAPIError, TransferAPIError
from fair_research_login import NativeClient
from os.path import join as pjoin
# source and destination endpoints
OLCF_DTN_ENDPOINT = 'ef1a9560-7ca1-11e5-992c-22000b96db58'
NERSC_DTN_ENDPOINT = '9d6d994a-6d04-11e5-ba46-22000b92c6ec'
MARVIN_ENDPOINT = '45d63946-5906-11ea-9682-0e56c063f437'
# Destination Path -- The directory will be created if it doesn't exist
DEFAULT_NERSC_DEST = '/global/cfs/cdirs/desi/cosmosim/Abacus'
DEFAULT_MARVIN_DEST = '/mnt/store/AbacusSummit'
# You will need to register a *Native App* at https://developers.globus.org/
# Your app should include the following:
# - The scopes should match the SCOPES variable below
# - Your app's clientid should match the CLIENT_ID var below
# - "Native App" should be checked
# For more information:
# https://docs.globus.org/api/auth/developer-guide/#register-app
CLIENT_ID = ''
REDIRECT_URI = 'https://auth.globus.org/v2/web/auth-code'
SCOPES = ('openid email profile '
'urn:globus:auth:scope:transfer.api.globus.org:all')
APP_NAME = 'AbacusSummit Data Transfer'
# The status of jobs will not be re-queried if they were in one of these states last time we checked
GLOBUS_COMPLETION_STATUSES = ['SUCCEEDED', 'FAILED'] # other possibilities are ACTIVE, INACTIVE
# Create the destination folder if it does not already exist
CREATE_DESTINATION_FOLDER = True
DEFAULT_STATUS_FILE = 'globus_status.toml'
DEFAULT_STATUS_FILE = pjoin(os.path.dirname(os.path.abspath(__file__)), DEFAULT_STATUS_FILE) # make absolute
def load_status_log(status_log_fn):
try:
status_log = toml.load(status_log_fn)
except FileNotFoundError:
status_log = {}
return status_log
def write_status_log(status_log, status_log_fn):
with open(status_log_fn, 'w') as fp:
toml.dump(status_log, fp)
def update_all_status_from_globus(transfer_client=None, status_log=None,
status_log_fn=DEFAULT_STATUS_FILE,
source_endpoint=OLCF_DTN_ENDPOINT, dest_endpoint=NERSC_DTN_ENDPOINT):
'''
For each unfinished box in the status_log dict,
query globus to get its status_log and update
the dict.
If `transfer_client` and `status_log` can be given
if those resources are already open. Otherwise
they will be opened.
'''
own_status_log = False
if status_log is None:
status_log = load_status_log(status_log_fn)
own_status_log = True
if transfer_client is None:
transfer_client = setup_transfer_client(source_endpoint, dest_endpoint)
for boxname in status_log:
boxstatus = status_log[boxname]
oldstatus = boxstatus.get('status',None)
if oldstatus in GLOBUS_COMPLETION_STATUSES:
# We already know the fate of this task, no need to check again
continue
submission = boxstatus
task = transfer_client.get_task(submission['task_id'])
task = task.data # get dict
if 'event_link' in task:
del task['event_link']
if task['status'] != oldstatus:
print(f'Globus task status for {boxname} changed from {oldstatus} to {task["status"]}')
status_log[boxname] = task
if own_status_log:
write_status_log(status_log, status_log_fn)
def status(boxname, status_log_fn=DEFAULT_STATUS_FILE):
'''Get the log status of a single box'''
status_log = load_status_log(status_log_fn)
if boxname in status_log:
return status_log[boxname]['status']
return None
def get_client_tokens():
tokens = None
client = NativeClient(client_id=CLIENT_ID, app_name=APP_NAME)
try:
# if we already have tokens, load and use them
tokens = client.load_tokens(requested_scopes=SCOPES)
except:
pass
if not tokens:
# if we need to get tokens, start the Native App authentication process
# need to specify that we want refresh tokens
# N.B. the no_local_server is the key option not in the Globus automation-examples
# that lets us accomplish the login on a remote node.
tokens = client.login(requested_scopes=SCOPES,
refresh_tokens=True, no_local_server=True)
try:
client.save_tokens(tokens)
except:
pass
return tokens
def setup_transfer_client(source_endpoint, dest_endpoint):
tokens = get_client_tokens()
transfer_tokens = tokens['transfer.api.globus.org']
authorizer = RefreshTokenAuthorizer(
transfer_tokens['refresh_token'],
NativeAppAuthClient(client_id=CLIENT_ID),
access_token=transfer_tokens['access_token'],
expires_at=transfer_tokens['expires_at_seconds'])
transfer_client = TransferClient(authorizer=authorizer)
try:
transfer_client.endpoint_autoactivate(source_endpoint)
transfer_client.endpoint_autoactivate(dest_endpoint)
except GlobusAPIError as ex:
if ex.http_status == 401:
# TODO: when does this happen? Does the user need to delete a local file or just login again?
print('Globus refresh token has expired.', file=sys.stderr)
raise ex
else:
raise ex
return transfer_client
def check_endpoint_path(transfer_client, endpoint, path):
"""Check the endpoint path exists"""
try:
transfer_client.operation_ls(endpoint, path=path)
except TransferAPIError as tapie:
print(f'Globus: Failed to query endpoint "{endpoint}": {tapie.message}', file=sys.stderr)
raise tapie
def create_destination_directory(transfer_client, dest_ep, dest_path):
"""Create the destination path if it does not exist"""
try:
transfer_client.operation_ls(dest_ep, path=dest_path)
except TransferAPIError:
try:
transfer_client.operation_mkdir(dest_ep, dest_path)
print('Globus: Created directory on destination endpoint: {}'.format(dest_path))
except TransferAPIError as tapie:
print(f'Globus: Failed to create directory on destination endpoint: {tapie.message}', file=sys.stderr)
raise tapie
def _submit_transfer(tdata, transfer_client, status_log, status_log_fn, boxname):
'''
Submit the tdata TransferData object to the transfer_client
TODO: would probably make sense to wrap this whole interface in an object
so we don't have to pass so many params
'''
assert boxname not in status_log # should never fail, but double check
task = transfer_client.submit_transfer(tdata)
task = task.data # turn into a dict
del task['task_link'] # don't need this in the log file
task['status'] = task.pop('code') # rename for consistency with the task status
status_log[boxname] = task
# Immediately write the status log, this is our only (local) record
# Of course, one can always check the Globus web interface too
write_status_log(status_log, status_log_fn)
assert task['status'] == 'Accepted'
def start_globus_transfer(source_path, exclude=None, include=None, dest_path=DEFAULT_NERSC_DEST,
source_endpoint=OLCF_DTN_ENDPOINT, dest_endpoint=NERSC_DTN_ENDPOINT,
globus_verify_checksum=True, status_log_fn=DEFAULT_STATUS_FILE,
always=['info/','abacus.par','status.log', 'checksums.crc32']):
'''
Start a Globus transfer from the `source_path` on the `source_endpoint` to `dest_path`
on the `dest_endpoint`.
Note that `source_path` and `dest_path` are the paths as seen from the Globus endpoint,
which might(??) not be the same as the regular filesystem path. One can double-check
this with the Globus web interface.
'''
# Determine the box name from the file path
boxname = os.path.basename(os.path.abspath(source_path))
assert(os.path.isdir(source_path))
# Globus transfers a directory's contents, not the directory itself
# so need to make the containing directory on the destination
if dest_endpoint == MARVIN_ENDPOINT:
dest_path = DEFAULT_MARVIN_DEST
dest_path = pjoin(dest_path, boxname)
# Check that this box is not already in the status file
status_log = load_status_log(status_log_fn)
if boxname in status_log:
# Could instead check if status is FAILED (or whatever) and allow reruns in that case
raise RuntimeError(f'Box {boxname} was already found in {status_log_fn}. Is this a duplicate transfer?')
# Authenticate with Globus
transfer_client = setup_transfer_client(source_endpoint, dest_endpoint)
# Set up the destination directory
check_endpoint_path(transfer_client, source_endpoint, source_path)
if CREATE_DESTINATION_FOLDER:
create_destination_directory(transfer_client, dest_endpoint,
dest_path)
else:
check_endpoint_path(transfer_client, dest_endpoint, dest_path)
tdata = TransferData(
transfer_client,
source_endpoint,
dest_endpoint,
label=f'{boxname} transfer',
sync_level=None, # always overwrite
verify_checksum=globus_verify_checksum,
encrypt_data=False,
preserve_timestamp=True,
recursive_symlinks='ignore' # not supported
)
#if we are copying the entire directory (neither --exclude nor --include specified)
if not exclude and not include:
tdata.add_item(source_path, dest_path, recursive=True)
elif include:
for a in always:
sourcea = pjoin(source_path,a)
if os.path.exists(sourcea):
tdata.add_item(sourcea, pjoin(dest_path,a), recursive=os.path.isdir(sourcea))
for pattern in include:
for p in braceexpand(pattern):
prevdir = os.getcwd()
os.chdir(source_path)
files = glob.glob(p)
os.chdir(prevdir)
for f in files:
sourcef = pjoin(source_path, f)
tdata.add_item(sourcef, pjoin(dest_path, f), recursive=os.path.isdir(sourcef))
elif exclude:
print('Exclude not yet implemented!')
exit(1)
# files = set(glob("*"))
# for pattern in exclude:
# for p in braceexpand(pattern):
# files -= set(glob.glob(source_path + p))
else:
raise RuntimeError
# also writes status log
_submit_transfer(tdata, transfer_client, status_log, status_log_fn, boxname)
print('Globus transfer has been started from\n {}:{}\nto\n {}:{}'.format(
source_endpoint,
source_path,
dest_endpoint,
dest_path
))
url_string = 'https://app.globus.org/app/transfer?' + \
urllib.parse.urlencode({
'origin_id': source_endpoint,
'origin_path': source_path,
'destination_id': dest_endpoint,
'destination_path': dest_path
})
print('Visit the link below to see the changes:\n{}'.format(url_string))
# While we're here, go ahead and update the status of any pending transfers for other boxes
update_all_status_from_globus(transfer_client, status_log)
write_status_log(status_log, status_log_fn)
def subcommand_transfer(**args):
'''Process the command-line args for the "transfer" subcommand'''
args['globus_verify_checksum'] = not args.pop('no_globus_checksum')
for sim_dir in args.pop('simulation-dir'):
start_globus_transfer(sim_dir, **args)
def subcommand_status(simulation, status_log_fn=DEFAULT_STATUS_FILE, **kwargs):
# In case we were passed a path, get absolute basename
boxname = os.path.basename(os.path.abspath(simulation))
gstat = status(boxname, status_log_fn)
print('Not found in log' if gstat is None else gstat)
class ArgParseFormatter(argparse.RawDescriptionHelpFormatter, argparse.ArgumentDefaultsHelpFormatter):
pass
if __name__ == '__main__':
# TODO: want to show main help alongside subcommand help, or show all help in monolithic block
parser = argparse.ArgumentParser(description=__doc__, formatter_class=ArgParseFormatter)
subparsers = parser.add_subparsers(help='Sub-commands, git-style', required=True, dest='subcommand')
parser.add_argument('--status-log', help='Globus status log filename', default=DEFAULT_STATUS_FILE)
parser.add_argument('--source-endpoint', help='The Globus source endpoint', default='OLCF_DTN_ENDPOINT')
parser.add_argument('--dest-endpoint', help='The Globus destination endpoint', default='NERSC_DTN_ENDPOINT')
tparser = subparsers.add_parser('transfer', help='Transfer one or more boxes via Globus',
description='Transfer one or more boxes via Globus')
tparser.add_argument('simulation-dir', help='The simulation OutputDirectory containing the time slices and halo catalogs to transfer.', nargs='+')
tparser.add_argument('--no-globus-checksum', help='Turn off Globus checksum verification', action='store_true')
group = tparser.add_mutually_exclusive_group()
group.add_argument('--exclude', help='Exclude files matching glob pattern from transfer', nargs='?')
group.add_argument('--include', help='Only transfer files matching glob pattern. Understands literal globs and brace expansion.', nargs='?', action='append')
tparser.set_defaults(func=subcommand_transfer)
uparser = subparsers.add_parser('update', help='Update the status log from Globus',
description='Update the status log by querying Globus for the status of all pending boxes in the log')
uparser.set_defaults(func=update_all_status_from_globus) # just a straight pass-through
sparser = subparsers.add_parser('status', help='Get the log status of one box',
description='Read the local status log to get the transfer status of a box (based on last time we checked Globus')
sparser.add_argument('simulation', help='The simulation dir or box name. Only the basename will be used if a path is given.')
sparser.set_defaults(func=subcommand_status)
args = parser.parse_args()
func = args.func
args = vars(args)
args['status_log_fn'] = args.pop('status_log')
# Allow user to pass *_ENDPOINT variable names
for k in ('source_endpoint','dest_endpoint'):
if args[k].endswith('_ENDPOINT'):
args[k] = eval(args[k])
del args['subcommand'], args['func']
func(**args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment