Skip to content

Instantly share code, notes, and snippets.

@sirosen
Last active July 27, 2020 13:53
Show Gist options
  • Save sirosen/0d6a8afa87130281320cb1df84d0da59 to your computer and use it in GitHub Desktop.
Save sirosen/0d6a8afa87130281320cb1df84d0da59 to your computer and use it in GitHub Desktop.
Copy Globus Endpoint ACL Script
#!/usr/bin/env python3
"""
Copy all of the Access Control Lists from one Globus Endpoint to another, by
means of the globus-cli
This will not remove any ACLs from either endpoints and it will not stop to
check for duplicates
"""
import argparse
import json
import subprocess
import sys
import textwrap
def fail(message, code=1):
print(message, file=sys.stderr)
sys.exit(code)
def _run(*args):
return subprocess.run(
["globus"] + list(args), stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
def check_logged_in():
rc = _run("whoami").returncode
if rc != 0:
print(
"you must be logged in with the globus-cli to run this script",
file=sys.stderr,
)
sys.exit(2)
def check_endpoint_exists(epid):
rc = _run("endpoint", "show", epid).returncode
if rc != 0:
print("no endpoint found with ID {}".format(epid), file=sys.stderr)
sys.exit(1)
def copy_acls(src, dst, checkpoint_file):
# fetch and parse source ACLs
src_acls_raw = _run(
"endpoint", "permission", "list", src, "--jmespath", "DATA[?id!=null]"
).stdout
src_acls = json.loads(src_acls_raw)
# TODO: filter on an existing checkpoint_file here
# get an open handle on the checkpoint file in append mode, if one was
# passed (otherwise, none)
fp = open(checkpoint_file, "a") if checkpoint_file else None
# recreate on destination
try:
for doc in src_acls:
path = doc["path"]
permissions = doc["permissions"]
principal = doc["principal"]
principal_type = doc["principal_type"]
principal_args = (
"--group" if principal_type == "group" else "--identity",
principal,
)
create_result = _run(
"endpoint",
"permission",
"create",
"{}:{}".format(dst, path),
"--permissions",
permissions,
*principal_args
)
if create_result.returncode == 0 and fp:
fp.write(json.dumps(doc) + "\n")
else:
print("error creating permission:", file=sys.stderr)
print(textwrap.indent(create_result.stderr.decode("utf-8"), " "), file=sys.stderr)
sys.exit(1)
finally:
if fp:
fp.close()
def main():
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument("SOURCE_ENDPOINT_ID")
parser.add_argument("TARGET_ENDPOINT_ID")
# this script could be extended to skip rules found in any existing
# checkpoint file, so you can resume a failed copy
parser.add_argument(
"--checkpoint-file",
help=(
"An optional file where checkpoints will be written so you can "
"track which rules have already been created."
),
)
args = parser.parse_args()
check_logged_in()
check_endpoint_exists(args.SOURCE_ENDPOINT_ID)
check_endpoint_exists(args.TARGET_ENDPOINT_ID)
copy_acls(
args.SOURCE_ENDPOINT_ID,
args.TARGET_ENDPOINT_ID,
checkpoint_file=args.checkpoint_file,
)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment