Skip to content

Instantly share code, notes, and snippets.

@cartershanklin
Last active December 16, 2022 18:54
Show Gist options
  • Save cartershanklin/4b778452a92a01681a65af33ceb528e6 to your computer and use it in GitHub Desktop.
Save cartershanklin/4b778452a92a01681a65af33ceb528e6 to your computer and use it in GitHub Desktop.
Call OCI Data Flow from Oracle Functions
import io
import json
import logging
import os
import re
import sys

import oci
import requests
from fdk import response
from oci.signer import Signer
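# Follow the Oracle Functions setup instructions to deploy this as a
# function called "scheduler" inside an application that is also called
# "scheduler" (hence the "fn invoke scheduler scheduler" below).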
# Sample invocation from fn:
# echo '{"region": "us-phoenix-1",
# "applicationId": "70614718-4b2d-4dc6-a048-32a28d7141be",
# "compartmentId": "ocid1.compartment.oc1..aaaaaaaa4q7ere32j5gygc7fevc6n6qcprwa5y57eo42myhg4z5wuual3yqa",
# "driverShape": "VM.Standard2.1",
# "executorShape": "VM.Standard2.1",
# "numExecutors": "1",
# "displayName": "Invoked from FN",
# "parameters": {
# "reporting_airline": "DL",
# "location": "oci://oow_2019_session_demo@bigdatadatasciencelarge/bts_ontime_output"
# }}' | fn invoke scheduler scheduler
def handler(ctx, data: io.BytesIO=None):
    """Oracle Functions entry point: start an OCI Data Flow run.

    The request body must be a JSON object with the mandatory fields
    ``region``, ``applicationId``, ``compartmentId``, ``driverShape``,
    ``executorShape``, ``numExecutors`` and ``displayName``, plus an
    optional ``parameters`` object whose key/value pairs are forwarded to
    the run as ``{"name": ..., "value": ...}`` entries.

    :param ctx: fdk invocation context, passed through to the response.
    :param data: raw request body.
    :returns: an fdk ``Response`` whose JSON body always contains ``style``
        (how the request signer was obtained) and, depending on how far the
        call got, ``run_payload`` (what was sent to Data Flow), ``runid``
        (the id Data Flow returned) or ``error`` (what went wrong).
    """
    # Configured here because there is no other obvious hook for enabling
    # logging in the deployed function; after the first invocation
    # basicConfig is a no-op.
    logging.basicConfig(level=logging.INFO)
    retval = dict()
    signer = _get_signer(retval)

    # Ensure we got valid JSON input with all mandatory fields.
    try:
        region, run_payload = _build_run_request(data)
    except Exception as ex:
        retval['error'] = str(ex)
        return _json_response(ctx, retval)
    retval['run_payload'] = run_payload

    # Call Data Flow.
    dataflow_runs_endpoint = (
        'https://dataflow.{}.oci.oraclecloud.com/20181116/runs'.format(region))
    try:
        result = requests.post(
            dataflow_runs_endpoint,
            json=run_payload,
            auth=signer)
        result_obj = result.json()
    except Exception as ex:
        logging.exception('Data Flow request failed')
        retval['error'] = str(ex)
        return _json_response(ctx, retval)
    if isinstance(result_obj, dict) and 'id' in result_obj:
        retval['runid'] = result_obj['id']
    else:
        # No run id came back; return the raw reply so the caller can see
        # what the service said.
        retval['error'] = result.text
    return _json_response(ctx, retval)


# Fields every request body must contain.
_REQUIRED_FIELDS = ('applicationId', 'compartmentId', 'driverShape',
                    'executorShape', 'numExecutors', 'displayName', 'region')

# Plain OCI region identifiers such as "us-phoenix-1".
_REGION_RE = re.compile(r'[a-z0-9]+(?:-[a-z0-9]+)*')


def _json_response(ctx, retval):
    """Wrap *retval* in an ``application/json`` fdk response."""
    return response.Response(
        ctx,
        response_data=json.dumps(retval),
        headers={"Content-Type": "application/json"})


def _get_signer(retval):
    """Return a request signer for the current environment.

    Tries, in order: the API key in ``~/.oci/config`` (local runs), the
    resource principal (when ``OCI_RESOURCE_PRINCIPAL_PRIVATE_PEM`` is set),
    and finally the instance principal. Records the choice in
    ``retval['style']``.
    """
    config_file = os.path.join(os.path.expanduser('~'), '.oci', 'config')
    if os.path.exists(config_file):
        logging.info('Loading private key from config file')
        config = oci.config.from_file(config_file, 'DEFAULT')
        signer = Signer(
            tenancy=config['tenancy'],
            user=config['user'],
            fingerprint=config['fingerprint'],
            private_key_file_location=config['key_file'],
            # pass_phrase is optional in an OCI config file.
            pass_phrase=config.get('pass_phrase'),
        )
        retval['style'] = 'local'
        return signer
    if 'OCI_RESOURCE_PRINCIPAL_PRIVATE_PEM' in os.environ:
        logging.info('Using resource principal for private key')
        signer = oci.auth.signers.get_resource_principals_signer()
        # The PEM file is a private key: never copy it into the response.
        retval['style'] = 'resource'
        return signer
    logging.info('Using instance principal for private key')
    signer = oci.auth.signers.InstancePrincipalsSecurityTokenSigner()
    retval['style'] = 'instance'
    return signer


def _build_run_request(data):
    """Validate the request body and build the Data Flow "create run" payload.

    :param data: raw request body; must hold a JSON object.
    :returns: ``(region, run_payload)``.
    :raises ValueError: if the body is missing, is not valid JSON, is not a
        JSON object, lacks a mandatory field, or has a malformed ``region``,
        ``numExecutors`` or ``parameters`` value.
    """
    if data is None:
        raise ValueError('Missing request body')
    body = json.loads(data.getvalue())
    if not isinstance(body, dict):
        raise ValueError('Request body must be a JSON object')
    for item in _REQUIRED_FIELDS:
        if item not in body:
            raise ValueError('Missing mandatory field ' + item)

    region = body['region']
    # The region becomes part of the endpoint hostname, so reject anything
    # that could point the signed request at a different host.
    if not isinstance(region, str) or not _REGION_RE.fullmatch(region):
        raise ValueError('Invalid region {!r}'.format(region))

    # Callers may send the executor count as a string (e.g. "1").
    try:
        num_executors = int(body['numExecutors'])
    except (TypeError, ValueError):
        raise ValueError('Field numExecutors must be an integer, got {!r}'
                         .format(body['numExecutors'])) from None

    parameters = body.get('parameters') or {}
    if not isinstance(parameters, dict):
        raise ValueError('Field parameters must be a JSON object')

    run_payload = dict(
        compartmentId=body['compartmentId'],
        applicationId=body['applicationId'],
        displayName=body['displayName'],
        applicationSettings=dict(
            driverShape=body['driverShape'],
            executorShape=body['executorShape'],
            numExecutors=num_executors,
            parameters=[
                dict(name=key, value=value)
                for key, value in parameters.items()
            ],
        ),
    )
    return region, run_payload
if __name__ == '__main__':
    # Local smoke test: pipe a JSON request body on stdin and print the
    # handler's response body.
    from fdk import context
    ctx = context.InvokeContext(None, None, None)
    # Read raw bytes: decoding with the locale codec and re-encoding as UTF-8
    # can corrupt non-ASCII request bodies. Also avoids shadowing input().
    request_body = io.BytesIO(sys.stdin.buffer.read())
    retval = handler(ctx, request_body)
    print(retval.body())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment