Last active
March 6, 2024 12:15
-
-
Save cartershanklin/a0dc4f5c7a640074edeaf502b0b7e526 to your computer and use it in GitHub Desktop.
Deploy OCI Data Flow Applications Easily
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# Deploy OCI Data Flow Applications more easily.
#
# Features:
#   Interactive compartment and bucket picker.
#   Checksums your script and archive and adds this to your Application name.
#   Skips uploads when possible.
#   Auto-detects your parameters and prompts you for defaults (SQL only).
#   Fail-fast option for completely headless operation.
import argparse | |
import glob | |
import hashlib | |
import logging | |
import oci | |
import os | |
import re | |
import sys | |
def upload_all_files(appfile, archive_file, bucket_name, objectstore):
    """Upload the application file (and optional archive) to Object Storage.

    Objects are named with a short MD5 fragment so unchanged files can be
    detected and skipped on subsequent deploys.

    Args:
        appfile: Path to the Spark application file.
        archive_file: Path to the dependency archive, or None.
        bucket_name: Destination Object Storage bucket.
        objectstore: An oci.object_storage.ObjectStorageClient.

    Returns:
        (app_target_uri, archive_target_uri, suffix) where archive_target_uri
        is None when no archive is supplied and suffix is the MD5 fragment(s)
        appended to the Application display name.
    """
    # Compute short MD5 suffixes used to fingerprint the uploads.
    app_md5 = file_md5(appfile)[-6:]
    if archive_file is not None:
        archive_md5 = file_md5(archive_file)[-6:]
        suffix = f"{app_md5}/{archive_md5}"
    else:
        suffix = f"{app_md5}"
    # Names on the destination.
    namespace = objectstore.get_namespace().data
    app_target_name = checksig(appfile, app_md5, archive=False)
    app_target_uri = "oci://{}@{}/{}".format(bucket_name, namespace, app_target_name)
    if archive_file is not None:
        # Bug fix: derive the archive object name from the archive file,
        # not from the application file.
        archive_target_name = checksig(archive_file, archive_md5, archive=True)
        archive_target_uri = "oci://{}@{}/{}".format(
            bucket_name, namespace, archive_target_name
        )
    else:
        archive_target_uri = None
    # Upload only the files that are not already in the bucket.
    listing = oci.pagination.list_call_get_all_results(
        objectstore.list_objects, namespace, bucket_name
    ).data.objects
    remote_files = [x.name for x in listing]
    if app_target_name not in remote_files:
        with open(appfile, "rb") as f:
            logging.info(
                "Uploading {} to {} in {}".format(appfile, app_target_name, bucket_name)
            )
            objectstore.put_object(namespace, bucket_name, app_target_name, f)
    else:
        logging.info("{} is already uploaded".format(app_target_name))
    if archive_file is not None:
        if archive_target_name not in remote_files:
            with open(archive_file, "rb") as f:
                logging.info(
                    "Uploading {} to {} in {}".format(
                        archive_file, archive_target_name, bucket_name
                    )
                )
                objectstore.put_object(namespace, bucket_name, archive_target_name, f)
        else:
            logging.info("{} is already uploaded".format(archive_target_name))
    return app_target_uri, archive_target_uri, suffix
def discover_parameter_names(appfile, application_type, commandline):
    """Discover ${name} placeholders used by the application.

    For SQL applications the script file itself is scanned; for all other
    types the command line (if any) is scanned.

    Args:
        appfile: Path to the application file (read only for SQL).
        application_type: Data Flow language, e.g. "SQL" or "PYTHON".
        commandline: The Spark command line string, or None.

    Returns:
        A de-duplicated (unordered) list of parameter names.
    """
    # Raw string avoids invalid escape sequences like "\$" that warn on
    # modern Python; "[^}]" needs no escaping inside a character class.
    pattern = re.compile(r"\$\{([^}]+)\}")
    if application_type == "SQL":
        # Identify all parameters in the SQL script.
        with open(appfile) as fd:
            text = fd.read()
    else:
        # Look for parameters in the command line, if one was given.
        text = commandline if commandline is not None else ""
    return list(set(pattern.findall(text)))
def get_file_choice(path, type, failfast):
    """Resolve *path* (a glob pattern) to a single file.

    Returns None when nothing matches, the sole match when it is unique,
    and otherwise prompts the user to choose one. Raises when multiple
    files match and *failfast* is set.
    """
    matches = glob.glob(path)
    if not matches:
        return None
    if len(matches) == 1:
        return matches[0]
    # Multiple candidates: either bail out or ask the user.
    if failfast:
        raise Exception(
            "Exiting because multiple choices exist and --failfast is set"
        )
    from pick import pick

    title = f"Multiple files found, pick the {type} file: "
    labels = [
        "{} ({} bytes)".format(os.path.basename(name), os.stat(name).st_size)
        for name in matches
    ]
    _, index = pick(labels, title)
    return matches[index]
def checksig(file, md5, archive=False):
    """Build the destination object name for *file* with *md5* embedded.

    Regular files become "<stem>.<md5>.<ext>"; archives become
    "<stem>.dependencies.<md5>.zip".
    """
    ext = file.split(".")[-1]
    # Strip the extension (and its dot) from the basename.
    stem = os.path.basename(file)[: -(len(ext) + 1)]
    if archive:
        parts = [stem, "dependencies", md5, "zip"]
    else:
        parts = [stem, md5, ext]
    return ".".join(parts)
def file_md5(filename):
    """Return the hex MD5 digest of *filename*, read in 8 KiB chunks."""
    digest = hashlib.md5()
    with open(filename, "rb") as fh:
        # iter() with a sentinel stops cleanly at EOF.
        for chunk in iter(lambda: fh.read(8192), b""):
            digest.update(chunk)
    # hexdigest() is identical to digest().hex().
    return digest.hexdigest()
def pick_compartment(config):
    """Interactively choose a compartment in the tenancy; returns its OCID."""
    from pick import pick

    identity = oci.identity.IdentityClient(config)
    tenancy = identity.get_tenancy(config["tenancy"]).data
    # List every compartment in the tenancy, including nested ones.
    compartments = oci.pagination.list_call_get_all_results(
        identity.list_compartments, tenancy.id, compartment_id_in_subtree=True
    ).data
    names = [c.name for c in compartments]
    _, chosen = pick(names, "Choose a compartment: ")
    return compartments[chosen].id
def pick_metastore(config, compartment_id):
    """Interactively choose a Data Catalog metastore; returns its OCID or None."""
    from pick import pick

    catalog = oci.data_catalog.DataCatalogClient(config)
    metastores = oci.pagination.list_call_get_all_results(
        catalog.list_metastores, compartment_id
    ).data
    # First entry lets the user opt out of configuring a metastore.
    choices = [("No Metastore", None)]
    choices.extend((m.display_name, m.id) for m in metastores)
    _, chosen = pick([label for label, _ in choices], "Choose a metastore: ")
    return choices[chosen][1]
def pick_bucket(config, compartment_id):
    """Interactively choose an Object Storage bucket; returns its name.

    Raises when the compartment has no buckets in the current region.
    """
    from pick import pick

    client = oci.object_storage.ObjectStorageClient(config)
    namespace = client.get_namespace().data
    buckets = oci.pagination.list_call_get_all_results(
        client.list_buckets, namespace, compartment_id
    ).data
    bucket_names = [b.name for b in buckets]
    if not bucket_names:
        raise Exception(
            "No storage buckets in this region! Create a bucket and try again."
        )
    _, chosen = pick(bucket_names, "Choose a bucket: ")
    return bucket_names[chosen]
def main():
    """Command-line entry point.

    Uploads a Spark application file (and optional dependency archive) to
    Object Storage, discovers ${...} parameters, and creates an OCI Data
    Flow Application referencing the uploaded objects. Prints the new
    Application OCID on success; exits non-zero on failure.
    """
    # Set up logging. ("log_format" avoids shadowing the builtin "format".)
    log_format = "%(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO)
    # Parse arguments.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--appdir",
        help="Directory containing your application and optional dependency archive.",
    )
    parser.add_argument(
        "--appfile",
        help="Your Spark application",
    )
    parser.add_argument(
        "--apptype",
        help="Type of Spark application (e.g. SCALA)",
    )
    parser.add_argument(
        "--archive-file",
        help="Your Spark archive file",
    )
    parser.add_argument("--bucket-name", help="Target bucket name.")
    parser.add_argument("--classname", help="Java/Scala class name.")
    parser.add_argument("--commandline", help="Spark application command line.")
    parser.add_argument("--compartment-id", help="Target Compartment ID.")
    parser.add_argument(
        "--configuration",
        help="Spark application parameters. Can be provided multiple times.",
        action="append",
    )
    parser.add_argument(
        "--displayname", required=True, help="Data Flow Application name."
    )
    parser.add_argument(
        "--driver-shape", default="VM.Standard2.1", help="Driver shape."
    )
    parser.add_argument(
        "--executor-shape", default="VM.Standard2.1", help="Executor shape."
    )
    parser.add_argument(
        "--failfast",
        action="store_true",
        help="Fail fast if any required info is missing.",
    )
    parser.add_argument("--metastore-id", help="Metastore OCID.")
    parser.add_argument(
        "--no-metastore",
        default=False,
        help="Do not configure a metastore.",
        action="store_true",
    )
    parser.add_argument(
        "--num-executors", type=int, default=1, help="Number of Spark executors."
    )
    parser.add_argument(
        "--parameter-value",
        action="append",
        # Typo fix: "Cam" -> "Can".
        help="Parameter default in the form of key=value. Can be provided multiple times.",
    )
    parser.add_argument(
        "--profile-location",
        default=oci.config.DEFAULT_LOCATION,
        help="OCI Profile Location.",
    )
    parser.add_argument(
        "--profile-name", default=oci.config.DEFAULT_PROFILE, help="OCI Profile Name."
    )
    parser.add_argument("--region")
    parser.add_argument("--spark-version", default="3.0.2", help="Spark version.")
    args = parser.parse_args()
    config = oci.config.from_file(
        file_location=args.profile_location, profile_name=args.profile_name
    )
    if args.region:
        config["region"] = args.region
    # Pull out any parameter values if given. Split on the first "=" only,
    # so values may themselves contain "=".
    parameter_values = dict()
    if args.parameter_value is not None:
        for item in args.parameter_value:
            key, value = item.split("=", 1)
            parameter_values[key] = value
    # Spark properties.
    configuration = dict()
    if args.configuration is not None:
        for item in args.configuration:
            key, value = item.split("=", 1)
            configuration[key] = value
    # (extension, Data Flow language, searchable directly in --appdir)
    extension_type_map = (
        ("sql", "SQL", True),
        ("py", "PYTHON", True),
        ("jar", "JAVA", False),
    )
    application_type = None
    if args.appfile is not None:
        # Set the application type from the file extension.
        for extension, app_type, flat in extension_type_map:
            if args.appfile.endswith(extension):
                application_type = app_type
                break
        # Allow overrides.
        if args.apptype is not None:
            application_type = args.apptype
    # Locate a Python or SQL application file.
    if args.appfile is None:
        if args.appdir is None:
            raise Exception("You must set either --appfile or --appdir")
        for extension, app_type, flat in extension_type_map:
            if not flat:
                continue
            application_path = os.path.join(args.appdir, f"*.{extension}")
            args.appfile = get_file_choice(
                application_path, "application", args.failfast
            )
            if args.appfile is not None:
                application_type = app_type
                break
        # Handle Java compiled by Maven.
        if args.appfile is None:
            target_path = os.path.join(args.appdir, "target", "*.jar")
            args.appfile = get_file_choice(target_path, "application", args.failfast)
            if args.appfile is not None:
                application_type = "JAVA"
        # Error out if we haven't found anything.
        if args.appfile is None:
            raise Exception("Could not find an application file in " + args.appdir)
    # Fail clearly (instead of a NameError) when the type could not be
    # derived from the file extension and --apptype was not given.
    if application_type is None:
        raise Exception(
            "Could not determine the application type of "
            + args.appfile
            + "; set --apptype"
        )
    # Ensure Java apps have a class set.
    if application_type in ("JAVA", "SCALA") and args.classname is None:
        raise Exception("--classname must be set for Java applications")
    # Look for an archive file.
    if args.archive_file is None and args.appdir is not None:
        archive_path = os.path.join(args.appdir, "*.zip")
        args.archive_file = get_file_choice(archive_path, "archive", args.failfast)
    # Initialize our clients.
    client = oci.data_flow.DataFlowClient(config)
    objectstore = oci.object_storage.ObjectStorageClient(config)
    # Interactively pick names as needed.
    if args.compartment_id is None:
        if args.failfast:
            raise Exception(
                "Exiting because Compartment ID is not set and --failfast is set"
            )
        args.compartment_id = pick_compartment(config)
    logging.info("Using compartment ID " + args.compartment_id)
    if args.bucket_name is None:
        if args.failfast:
            raise Exception(
                "Exiting because bucket name is not set and --failfast is set"
            )
        args.bucket_name = pick_bucket(config, args.compartment_id)
    logging.info("Using bucket name " + args.bucket_name)
    if args.no_metastore:
        args.metastore_id = None
    else:
        if args.metastore_id is None:
            args.metastore_id = pick_metastore(config, args.compartment_id)
    if args.metastore_id is not None:
        logging.info("Using metastore ID " + args.metastore_id)
    # Identify parameter names as automatically as possible.
    parameter_names = sorted(
        discover_parameter_names(args.appfile, application_type, args.commandline)
    )
    # Make sure we have all parameters set to something.
    for parameter in parameter_names:
        if parameter not in parameter_values:
            if args.failfast:
                raise Exception(
                    "Exiting because parameter {} is not set and --failfast is set".format(
                        parameter
                    )
                )
            result = input(
                "Enter the default value for parameter [{}]: ".format(parameter)
            )
            parameter_values[parameter] = result
    # Upload all files.
    app_target_uri, archive_target_uri, suffix = upload_all_files(
        args.appfile, args.archive_file, args.bucket_name, objectstore
    )
    # Create the Data Flow Application.
    parameters = [
        oci.data_flow.models.ApplicationParameter(name=x, value=parameter_values[x])
        for x in parameter_names
    ]
    if application_type == "SQL":
        # SQL arguments are exactly the placeholders, in sorted order.
        arguments = ["${" + x + "}" for x in parameter_names]
    else:
        if args.commandline is None:
            arguments = []
        else:
            arguments = args.commandline.split()
    display_name = "{} ({})".format(args.displayname, suffix)
    details = dict(
        arguments=arguments,
        compartment_id=args.compartment_id,
        configuration=configuration,
        display_name=display_name,
        driver_shape=args.driver_shape,
        executor_shape=args.executor_shape,
        file_uri=app_target_uri,
        language=application_type,
        num_executors=args.num_executors,
        parameters=parameters,
        spark_version=args.spark_version,
    )
    if application_type in ("JAVA", "SCALA"):
        details["class_name"] = args.classname
    if args.archive_file is not None:
        details["archive_uri"] = archive_target_uri
    if args.metastore_id is not None:
        details["metastore_id"] = args.metastore_id
    create_application_details = oci.data_flow.models.CreateApplicationDetails(
        **details
    )
    logging.info("Creating the Data Flow Application " + display_name)
    application = client.create_application(
        create_application_details=create_application_details
    )
    if application.status != 200:
        logging.error("Failed to create Data Flow Application")
        logging.error(application.data)
        sys.exit(1)
    else:
        # Show the user how to write IAM policies scoped to this Application.
        matching_template = "tag.oci_dataflow.applicationId.value='{}'"
        policy_templates = [
            # {request.principal.id = 'XXX'}
            "allow any-user to manage object-family in compartment compartment1 where ALL {{ {} }}",
            "allow any-user to manage object-family in compartment compartment1 where ALL {{ {}, target.bucket.name = '<bucket>', target.object.name = /sales.db*/ }}",
            "allow any-user to manage streams in compartment compartment1 where ALL {{ {}, target.stream.id='<stream_ocid>' }}",
        ]
        condition = matching_template.format(application.data.id)
        logging.info(
            "Match all Runs of this Application in an IAM Policy using this condition: [{}]".format(
                condition
            )
        )
        logging.info("Example Policies")
        for template in policy_templates:
            policy = template.format(condition)
            logging.info(" " + policy)
        print(application.data.id)
# Guard the entry point so importing this module does not run the deploy.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment