@cartershanklin
Last active March 6, 2024 12:15
Deploy OCI Data Flow Applications Easily
#!/usr/bin/env python3
# Deploy OCI Data Flow Applications more easily.
#
# Features:
# - Interactive compartment and bucket picker.
# - Checksums your script and archive and adds the checksums to your Application name.
# - Skips uploads when possible.
# - Auto-detects your parameters and prompts you for defaults (SQL only).
# - Fail-fast option for completely headless operation.
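#
# A hypothetical invocation (the script name, OCIDs, and paths below are
# illustrative, not part of this gist):
#
#   ./deploy_dataflow.py \
#       --appdir ./my-app \
#       --displayname "Sales Report" \
#       --compartment-id ocid1.compartment.oc1..example \
#       --bucket-name my-bucket \
#       --parameter-value region=US \
#       --configuration spark.sql.shuffle.partitions=16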
import argparse
import glob
import hashlib
import logging
import oci
import os
import re
import sys


def upload_all_files(appfile, archive_file, bucket_name, objectstore):
    # Compute short MD5 suffixes used to version the uploaded objects.
    app_md5 = file_md5(appfile)[-6:]
    if archive_file is not None:
        archive_md5 = file_md5(archive_file)[-6:]
        suffix = f"{app_md5}/{archive_md5}"
    else:
        suffix = f"{app_md5}"

    # Names on the destination.
    namespace = objectstore.get_namespace().data
    app_target_name = checksig(appfile, app_md5, archive=False)
    app_target_uri = "oci://{}@{}/{}".format(bucket_name, namespace, app_target_name)
    if archive_file is not None:
        # The dependency archive is named after the application file.
        archive_target_name = checksig(appfile, archive_md5, archive=True)
        archive_target_uri = "oci://{}@{}/{}".format(
            bucket_name, namespace, archive_target_name
        )
    else:
        archive_target_uri = None

    # Upload the files, skipping any that are already present in the bucket.
    listing = oci.pagination.list_call_get_all_results(
        objectstore.list_objects, namespace, bucket_name
    ).data.objects
    remote_files = [x.name for x in listing]
    if app_target_name not in remote_files:
        with open(appfile, "rb") as f:
            logging.info(
                "Uploading {} to {} in {}".format(appfile, app_target_name, bucket_name)
            )
            objectstore.put_object(namespace, bucket_name, app_target_name, f)
    else:
        logging.info("{} is already uploaded".format(app_target_name))
    if archive_file is not None:
        if archive_target_name not in remote_files:
            with open(archive_file, "rb") as f:
                logging.info(
                    "Uploading {} to {} in {}".format(
                        archive_file, archive_target_name, bucket_name
                    )
                )
                objectstore.put_object(namespace, bucket_name, archive_target_name, f)
        else:
            logging.info("{} is already uploaded".format(archive_target_name))
    return app_target_uri, archive_target_uri, suffix
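
# For example (illustrative names): an application file "etl.sql" whose MD5 ends
# in "a1b2c3" is uploaded as "etl.a1b2c3.sql", and its dependency archive is
# uploaded as "etl.dependencies.d4e5f6.zip", where "d4e5f6" is the tail of the
# archive's own MD5.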


def discover_parameter_names(appfile, application_type, commandline):
    if application_type == "SQL":
        # Identify all ${name} parameters in the SQL script.
        with open(appfile) as fd:
            sql_text = fd.read()
        parameter_names = list(set(re.findall(r"\$\{([^\}]+)\}", sql_text)))
    else:
        # Look for ${name} parameters in the command line.
        if commandline is not None:
            parameter_names = list(set(re.findall(r"\$\{([^\}]+)\}", commandline)))
        else:
            parameter_names = []
    return parameter_names
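
# For example (illustrative): a SQL script containing
#   SELECT * FROM sales WHERE region = '${region}' AND day > '${start_date}'
# yields the parameter names ["region", "start_date"] (order not guaranteed,
# which is why main() sorts them).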


def get_file_choice(path, type, failfast):
    files = glob.glob(path)
    if len(files) > 1:
        if failfast:
            raise Exception(
                "Exiting because multiple choices exist and --failfast is set"
            )
        else:
            from pick import pick

            sizes = [os.stat(x).st_size for x in files]
            title = f"Multiple files found, pick the {type} file: "
            options = [
                "{} ({} bytes)".format(os.path.basename(name), size)
                for name, size in zip(files, sizes)
            ]
            option, index = pick(options, title)
            return files[index]
    if len(files) == 1:
        return files[0]
    return None


def checksig(file, md5, archive=False):
    # Build the destination object name: the original name with the checksum
    # inserted before the extension. Archives become <name>.dependencies.<md5>.zip.
    suffix = file.split(".")[-1]
    if archive:
        return ".".join(
            [os.path.basename(file)[: -len(suffix) - 1], "dependencies", md5, "zip"]
        )
    else:
        return ".".join([os.path.basename(file)[: -len(suffix) - 1], md5, suffix])


def file_md5(filename):
    # Compute the MD5 of a file incrementally to avoid reading it all into memory.
    with open(filename, "rb") as f:
        md5 = hashlib.md5()
        block_size = 8192
        while True:
            data = f.read(block_size)
            if not data:
                break
            md5.update(data)
    return md5.hexdigest()


def pick_compartment(config):
    from pick import pick

    identity = oci.identity.IdentityClient(config)
    tenancy = identity.get_tenancy(config["tenancy"]).data
    compartments = oci.pagination.list_call_get_all_results(
        identity.list_compartments, tenancy.id, compartment_id_in_subtree=True
    ).data
    all_compartments = [(x.name, x.id) for x in compartments]
    title = "Choose a compartment: "
    options = [x[0] for x in all_compartments]
    option, index = pick(options, title)
    compartment_id = all_compartments[index][1]
    return compartment_id


def pick_metastore(config, compartment_id):
    from pick import pick

    catalog = oci.data_catalog.DataCatalogClient(config)
    metastores = oci.pagination.list_call_get_all_results(
        catalog.list_metastores, compartment_id
    ).data
    all_metastores = [("No Metastore", None)] + [
        (x.display_name, x.id) for x in metastores
    ]
    title = "Choose a metastore: "
    options = [x[0] for x in all_metastores]
    option, index = pick(options, title)
    metastore_id = all_metastores[index][1]
    return metastore_id


def pick_bucket(config, compartment_id):
    from pick import pick

    client = oci.object_storage.ObjectStorageClient(config)
    namespace = client.get_namespace().data
    buckets = oci.pagination.list_call_get_all_results(
        client.list_buckets, namespace, compartment_id
    ).data
    all_buckets = [x.name for x in buckets]
    if len(all_buckets) == 0:
        raise Exception(
            "No storage buckets in this region! Create a bucket and try again."
        )
    title = "Choose a bucket: "
    option, index = pick(all_buckets, title)
    return all_buckets[index]


def main():
    # Set up logging.
    format = "%(message)s"
    logging.basicConfig(format=format, level=logging.INFO)

    # Parse arguments.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--appdir",
        help="Directory containing your application and optional dependency archive.",
    )
    parser.add_argument(
        "--appfile",
        help="Your Spark application.",
    )
    parser.add_argument(
        "--apptype",
        help="Type of Spark application (e.g. SCALA).",
    )
    parser.add_argument(
        "--archive-file",
        help="Your Spark archive file.",
    )
    parser.add_argument("--bucket-name", help="Target bucket name.")
    parser.add_argument("--classname", help="Java/Scala class name.")
    parser.add_argument("--commandline", help="Spark application command line.")
    parser.add_argument("--compartment-id", help="Target Compartment ID.")
    parser.add_argument(
        "--configuration",
        help="Spark configuration property in the form of key=value. Can be provided multiple times.",
        action="append",
    )
    parser.add_argument(
        "--displayname", required=True, help="Data Flow Application name."
    )
    parser.add_argument(
        "--driver-shape", default="VM.Standard2.1", help="Driver shape."
    )
    parser.add_argument(
        "--executor-shape", default="VM.Standard2.1", help="Executor shape."
    )
    parser.add_argument(
        "--failfast",
        action="store_true",
        help="Fail fast if any required info is missing.",
    )
    parser.add_argument("--metastore-id", help="Metastore OCID.")
    parser.add_argument(
        "--no-metastore",
        default=False,
        help="Do not configure a metastore.",
        action="store_true",
    )
    parser.add_argument(
        "--num-executors", type=int, default=1, help="Number of Spark executors."
    )
    parser.add_argument(
        "--parameter-value",
        action="append",
        help="Parameter default in the form of key=value. Can be provided multiple times.",
    )
    parser.add_argument(
        "--profile-location",
        default=oci.config.DEFAULT_LOCATION,
        help="OCI Profile Location.",
    )
    parser.add_argument(
        "--profile-name", default=oci.config.DEFAULT_PROFILE, help="OCI Profile Name."
    )
    parser.add_argument("--region")
    parser.add_argument("--spark-version", default="3.0.2", help="Spark version.")
    args = parser.parse_args()
    config = oci.config.from_file(
        file_location=args.profile_location, profile_name=args.profile_name
    )
    if args.region:
        config["region"] = args.region

    # Pull out any parameter values if given. Split on the first "=" only so
    # values may themselves contain "=".
    parameter_values = dict()
    if args.parameter_value is not None:
        for item in args.parameter_value:
            key, value = item.split("=", 1)
            parameter_values[key] = value

    # Spark configuration properties.
    configuration = dict()
    if args.configuration is not None:
        for item in args.configuration:
            key, value = item.split("=", 1)
            configuration[key] = value

    # Map file extensions to application types; "flat" types live directly in --appdir.
    extension_type_map = (
        ("sql", "SQL", True),
        ("py", "PYTHON", True),
        ("jar", "JAVA", False),
    )
    application_type = None
    if args.appfile is not None:
        # Set the application type from the file extension.
        for extension, type, flat in extension_type_map:
            if args.appfile.endswith("." + extension):
                application_type = type
                break

        # Allow overrides.
        if args.apptype is not None:
            application_type = args.apptype

    # Locate a Python or SQL application file.
    if args.appfile is None:
        if args.appdir is None:
            raise Exception("You must set either --appfile or --appdir")
        for extension, type, flat in extension_type_map:
            if not flat:
                continue
            application_path = os.path.join(args.appdir, f"*.{extension}")
            args.appfile = get_file_choice(
                application_path, "application", args.failfast
            )
            if args.appfile is not None:
                application_type = type
                break

        # Handle Java compiled by Maven.
        if args.appfile is None:
            target_path = os.path.join(args.appdir, "target", "*.jar")
            args.appfile = get_file_choice(target_path, "application", args.failfast)
            if args.appfile is not None:
                application_type = "JAVA"

        # Error out if we haven't found anything.
        if args.appfile is None:
            raise Exception("Could not find an application file in " + args.appdir)

    # Error out if the application type could not be determined.
    if application_type is None:
        raise Exception("Could not determine the application type; set --apptype")

    # Ensure Java apps have a class set.
    if application_type in ("JAVA", "SCALA") and args.classname is None:
        raise Exception("--classname must be set for Java applications")

    # Look for an archive file.
    if args.archive_file is None and args.appdir is not None:
        archive_path = os.path.join(args.appdir, "*.zip")
        args.archive_file = get_file_choice(archive_path, "archive", args.failfast)
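
    # Illustrative --appdir layout (hypothetical file names):
    #   my-app/etl.sql         -> picked up as a SQL application
    #   my-app/target/app.jar  -> picked up as a Maven-built Java application
    #   my-app/archive.zip     -> picked up as the dependency archive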
    # Initialize our clients.
    client = oci.data_flow.DataFlowClient(config)
    objectstore = oci.object_storage.ObjectStorageClient(config)

    # Interactively pick names as needed.
    if args.compartment_id is None:
        if args.failfast:
            raise Exception(
                "Exiting because Compartment ID is not set and --failfast is set"
            )
        args.compartment_id = pick_compartment(config)
    logging.info("Using compartment ID " + args.compartment_id)
    if args.bucket_name is None:
        if args.failfast:
            raise Exception(
                "Exiting because bucket name is not set and --failfast is set"
            )
        args.bucket_name = pick_bucket(config, args.compartment_id)
    logging.info("Using bucket name " + args.bucket_name)
    if args.no_metastore:
        args.metastore_id = None
    else:
        if args.metastore_id is None:
            args.metastore_id = pick_metastore(config, args.compartment_id)
    if args.metastore_id is not None:
        logging.info("Using metastore ID " + args.metastore_id)
    # Identify parameter names as automatically as possible.
    parameter_names = sorted(
        discover_parameter_names(args.appfile, application_type, args.commandline)
    )

    # Make sure we have all parameters set to something.
    for parameter in parameter_names:
        if parameter not in parameter_values:
            if args.failfast:
                raise Exception(
                    "Exiting because parameter {} is not set and --failfast is set".format(
                        parameter
                    )
                )
            result = input(
                "Enter the default value for parameter [{}]: ".format(parameter)
            )
            parameter_values[parameter] = result

    # Upload all files.
    app_target_uri, archive_target_uri, suffix = upload_all_files(
        args.appfile, args.archive_file, args.bucket_name, objectstore
    )
    # Create the Data Flow Application.
    parameters = [
        oci.data_flow.models.ApplicationParameter(name=x, value=parameter_values[x])
        for x in parameter_names
    ]
    if application_type == "SQL":
        arguments = ["${" + x + "}" for x in parameter_names]
    else:
        if args.commandline is None:
            arguments = []
        else:
            arguments = args.commandline.split()
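
    # For example (illustrative): a SQL script with parameters "region" and
    # "start_date" produces arguments ["${region}", "${start_date}"], which Data
    # Flow substitutes with the parameter defaults at run time.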
    display_name = "{} ({})".format(args.displayname, suffix)
    details = dict(
        arguments=arguments,
        compartment_id=args.compartment_id,
        configuration=configuration,
        display_name=display_name,
        driver_shape=args.driver_shape,
        executor_shape=args.executor_shape,
        file_uri=app_target_uri,
        language=application_type,
        num_executors=args.num_executors,
        parameters=parameters,
        spark_version=args.spark_version,
    )
    if application_type in ("JAVA", "SCALA"):
        details["class_name"] = args.classname
    if args.archive_file is not None:
        details["archive_uri"] = archive_target_uri
    if args.metastore_id is not None:
        details["metastore_id"] = args.metastore_id
    create_application_details = oci.data_flow.models.CreateApplicationDetails(
        **details
    )
    logging.info("Creating the Data Flow Application " + display_name)
    application = client.create_application(
        create_application_details=create_application_details
    )
    if application.status != 200:
        logging.error("Failed to create Data Flow Application")
        logging.error(application.data)
        sys.exit(1)
    else:
        matching_template = "tag.oci_dataflow.applicationId.value='{}'"
        policy_templates = [
            # {request.principal.id = 'XXX'}
            "allow any-user to manage object-family in compartment compartment1 where ALL {{ {} }}",
            "allow any-user to manage object-family in compartment compartment1 where ALL {{ {}, target.bucket.name = '<bucket>', target.object.name = /sales.db*/ }}",
            "allow any-user to manage streams in compartment compartment1 where ALL {{ {}, target.stream.id='<stream_ocid>' }}",
        ]
        condition = matching_template.format(application.data.id)
        logging.info(
            "Match all Runs of this Application in an IAM Policy using this condition: [{}]".format(
                condition
            )
        )
        logging.info("Example Policies")
        for template in policy_templates:
            policy = template.format(condition)
            logging.info("  " + policy)
        print(application.data.id)


if __name__ == "__main__":
    main()