@cartershanklin
Last active March 6, 2024 12:15
OCI Data Flow Tutorial Example 1 using the Python SDK
#!/usr/bin/env python
# This script executes Example 1 of the OCI Data Flow Tutorial
# https://docs.cloud.oracle.com/en-us/iaas/data-flow/data-flow-tutorial/tutorial/dfs_tut_etl_java.htm#etl_with_java
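#
# Example invocation (the filename and OCIDs below are illustrative, not from the tutorial):
#   python dataflow_tutorial_example1.py \
#     --compartment-id ocid1.compartment.oc1..example \
#     --output-path oci://<bucket>@<namespace>/output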
import argparse
import oci
import sys
import time
def main():
    parser = argparse.ArgumentParser()
    required = parser.add_argument_group("required arguments")
    required.add_argument("--compartment-id", help="Target compartment", required=True)
    required.add_argument("--output-path", help="Output ETL destination", required=True)
    args = parser.parse_args()
    # Initialize our client.
    config = oci.config.from_file()
    client = oci.data_flow.DataFlowClient(config)
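    # oci.config.from_file() reads ~/.oci/config (the DEFAULT profile) unless told
    # otherwise; the calling principal is assumed to have IAM permissions to create
    # and run Data Flow Applications in the target compartment.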
    # Create a new Data Flow Application
    input_parameter = oci.data_flow.models.ApplicationParameter(
        name="input",
        value="oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/kaggle_berlin_airbnb_listings_summary.csv",
    )
    output_parameter = oci.data_flow.models.ApplicationParameter(
        name="output", value=args.output_path
    )
    create_application_details = oci.data_flow.models.CreateApplicationDetails(
        compartment_id=args.compartment_id,
        display_name="Data Flow Tutorial App 1 CLI",
        driver_shape="VM.Standard2.1",
        executor_shape="VM.Standard2.1",
        num_executors=1,
        spark_version="2.4.4",
        file_uri="oci://oow_2019_dataflow_lab@bigdatadatasciencelarge/usercontent/oow-lab-2019-java-etl-1.0-SNAPSHOT.jar",
        language="JAVA",
        class_name="convert.Convert",
        arguments=["${input}", "${output}"],
        parameters=[input_parameter, output_parameter],
    )
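    # The ${input} and ${output} tokens in `arguments` above are placeholders that
    # Data Flow replaces with the values of the matching ApplicationParameters when
    # the Run executes.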
print("Creating the Data Flow Application")
application = client.create_application(
create_application_details=create_application_details
)
if application.status != 200:
print("Failed to create Data Flow Application")
print(application.data)
sys.exit(1)
else:
print("Data Flow Application ID is " + application.data.id)
    # Create a Run from this Application
    print("Creating the Data Flow Run")
    create_run_details = oci.data_flow.models.CreateRunDetails(
        compartment_id=args.compartment_id,
        application_id=application.data.id,
        display_name="Data Flow Tutorial App 1 CLI",
    )
    run = client.create_run(create_run_details=create_run_details)
    if run.status != 200:
        print("Failed to create Data Flow Run")
        print(run.data)
        sys.exit(1)
    else:
        print("Data Flow Run ID is " + run.data.id)
    # Wait for it to complete: poll at most 100 times, 15 seconds apart.
    probe_count = 0
    while probe_count < 100:
        print("Waiting for the Run to finish")
        time.sleep(15)
        probe_count += 1
        probe = client.get_run(run_id=run.data.id)
        if probe.status != 200:
            print("Failed to load Run information")
            print(probe.data)
            sys.exit(1)
        print("Run is in state " + probe.data.lifecycle_state)
        if (
            probe.data.lifecycle_state == "ACCEPTED"
            or probe.data.lifecycle_state == "IN_PROGRESS"
        ):
            print("Run is in progress, waiting")
        elif probe.data.lifecycle_state == "FAILED":
            print("Run failed, more information:")
            print(probe.data.lifecycle_details)
            sys.exit(1)
        elif probe.data.lifecycle_state == "SUCCEEDED":
            print("Run is finished.")
            break
        else:
            print("Unexpected state {}, stopping".format(probe.data.lifecycle_state))
            sys.exit(1)
    else:
        # The loop exhausted all probes without the Run reaching a terminal state.
        print("Run is taking too long, giving up.")
        sys.exit(1)
    # Print the output.
    print("Output of the Data Flow Run follows:")
    log = client.get_run_log(run_id=run.data.id, name="spark_application_stdout.log.gz")
    print(log.data.text)
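    # spark_application_stdout.log.gz is the standard name for the Spark driver's
    # stdout; other run logs can be discovered with client.list_run_logs(run_id=...)
    # and fetched the same way.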
if __name__ == "__main__":
    main()