Last active
March 6, 2024 12:15
-
-
Save cartershanklin/ce5df1881c30d29d808ef15a4d4aa947 to your computer and use it in GitHub Desktop.
OCI Data Flow Tutorial Example 1 using the Python SDK
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# This script executes Example 1 of the OCI Data Flow Tutorial
# https://docs.cloud.oracle.com/en-us/iaas/data-flow/data-flow-tutorial/tutorial/dfs_tut_etl_java.htm#etl_with_java
import argparse | |
import oci | |
import sys | |
import time | |
def _parse_args():
    """Parse the command line; both options are mandatory.

    Returns:
        The argparse namespace with ``compartment_id`` and ``output_path``.
    """
    parser = argparse.ArgumentParser()
    required = parser.add_argument_group("required arguments")
    required.add_argument("--compartment-id", help="Target compartment", required=True)
    required.add_argument("--output-path", help="Output ETL destination", required=True)
    return parser.parse_args()


def _create_application(client, compartment_id, output_path):
    """Create the tutorial Data Flow Application and return its ID.

    Args:
        client: The Data Flow client used to issue the request.
        compartment_id: Compartment in which the Application is created.
        output_path: Value bound to the Application's ``output`` parameter.

    Exits the process with status 1 if the response status is not 200.
    """
    input_parameter = oci.data_flow.models.ApplicationParameter(
        name="input",
        value="oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/kaggle_berlin_airbnb_listings_summary.csv",
    )
    output_parameter = oci.data_flow.models.ApplicationParameter(
        name="output", value=output_path
    )
    create_application_details = oci.data_flow.models.CreateApplicationDetails(
        compartment_id=compartment_id,
        display_name="Data Flow Tutorial App 1 CLI",
        driver_shape="VM.Standard2.1",
        executor_shape="VM.Standard2.1",
        num_executors=1,
        spark_version="2.4.4",
        file_uri="oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/oow-lab-2019-java-etl-1.0-SNAPSHOT.jar",
        language="JAVA",
        class_name="convert.Convert",
        # The ${...} placeholders refer to the named parameters below.
        arguments=["${input}", "${output}"],
        parameters=[input_parameter, output_parameter],
    )
    print("Creating the Data Flow Application")
    application = client.create_application(
        create_application_details=create_application_details
    )
    if application.status != 200:
        print("Failed to create Data Flow Application")
        print(application.data)
        sys.exit(1)
    print("Data Flow Application ID is " + application.data.id)
    return application.data.id


def _create_run(client, compartment_id, application_id):
    """Start a Run of the given Application and return the Run ID.

    Exits the process with status 1 if the response status is not 200.
    """
    print("Creating the Data Flow Run")
    create_run_details = oci.data_flow.models.CreateRunDetails(
        compartment_id=compartment_id,
        application_id=application_id,
        display_name="Data Flow Tutorial App 1 CLI",
    )
    run = client.create_run(create_run_details=create_run_details)
    if run.status != 200:
        print("Failed to create Data Flow Run")
        print(run.data)
        sys.exit(1)
    print("Data Flow Run ID is " + run.data.id)
    return run.data.id


def _wait_for_run(client, run_id, max_probes=100, poll_interval=15):
    """Poll the Run until it reports SUCCEEDED, then return.

    Args:
        client: The Data Flow client used to query the Run.
        run_id: ID of the Run to watch.
        max_probes: Maximum number of status queries before giving up.
        poll_interval: Seconds to sleep before each status query.

    Exits the process with status 1 when the Run fails, reports a state
    other than ACCEPTED / IN_PROGRESS / SUCCEEDED, the status query does
    not return 200, or the probe budget is exhausted.
    """
    for _ in range(max_probes):
        print("Waiting for the Run to finish")
        time.sleep(poll_interval)
        probe = client.get_run(run_id=run_id)
        if probe.status != 200:
            print("Failed to load Run information")
            print(probe.data)
            sys.exit(1)
        state = probe.data.lifecycle_state
        print("Run is in state " + state)
        if state in ("ACCEPTED", "IN_PROGRESS"):
            print("Run is in progress, waiting")
            continue
        if state == "SUCCEEDED":
            # Returning here (rather than breaking and re-checking a probe
            # counter afterwards) ensures a Run that succeeds on the very
            # last allowed probe is not misreported as a timeout.
            print("Run is finished.")
            return
        if state == "FAILED":
            print("Run failed, more information:")
            print(probe.data.lifecycle_details)
            sys.exit(1)
        print("Unexpected state {}, stopping".format(state))
        sys.exit(1)

    # Only reached when every probe saw ACCEPTED / IN_PROGRESS.
    print("Run is taking too long, giving up.")
    sys.exit(1)


def main():
    """Run Example 1 of the OCI Data Flow tutorial end to end.

    Parses ``--compartment-id`` and ``--output-path``, creates the tutorial
    Application, starts a Run of it, waits for the Run to succeed, and
    prints the Run's ``spark_application_stdout.log.gz`` log. Any failure
    along the way prints a message and exits with status 1.
    """
    args = _parse_args()

    # Initialize our client.
    config = oci.config.from_file()
    client = oci.data_flow.DataFlowClient(config)

    application_id = _create_application(
        client, args.compartment_id, args.output_path
    )
    run_id = _create_run(client, args.compartment_id, application_id)

    # Wait for it to complete.
    _wait_for_run(client, run_id)

    # Print the output.
    print("Output of the Data Flow Run follows:")
    log = client.get_run_log(run_id=run_id, name="spark_application_stdout.log.gz")
    print(log.data.text)
# Run the tutorial only when this file is executed directly as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment