Last active
December 16, 2022 18:54
-
-
Save cartershanklin/4b778452a92a01681a65af33ceb528e6 to your computer and use it in GitHub Desktop.
Call OCI Data Flow from Oracle Functions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
import json | |
import logging | |
import oci | |
import os | |
import requests | |
import sys | |
from fdk import response | |
from oci.signer import Signer | |
# Follow the Oracle Functions setup instructions to deploy this as a
# function called "scheduler" inside an application also called "scheduler"
# (the two trailing arguments to "fn invoke" below are <app> <function>).
# Sample invocation from fn:
# echo '{"region": "us-phoenix-1",
#        "applicationId": "70614718-4b2d-4dc6-a048-32a28d7141be",
#        "compartmentId": "ocid1.compartment.oc1..aaaaaaaa4q7ere32j5gygc7fevc6n6qcprwa5y57eo42myhg4z5wuual3yqa",
#        "driverShape": "VM.Standard2.1",
#        "executorShape": "VM.Standard2.1",
#        "numExecutors": "1",
#        "displayName": "Invoked from FN",
#        "parameters": {
#            "reporting_airline": "DL",
#            "location": "oci://oow_2019_session_demo@bigdatadatasciencelarge/bts_ontime_output"
#        }}' | fn invoke scheduler scheduler
# Request-body keys that must be present before a run is submitted.
_REQUIRED_FIELDS = ('applicationId', 'compartmentId', 'driverShape',
                    'executorShape', 'numExecutors', 'displayName', 'region')

# Seconds to wait on the Data Flow REST call. Without a timeout, requests
# waits indefinitely; tune this relative to the function's configured timeout.
_DATAFLOW_TIMEOUT = 30


def _json_response(ctx, retval):
    """Serialize ``retval`` as the JSON body of an fdk response."""
    return response.Response(
        ctx,
        response_data=json.dumps(retval),
        headers={"Content-Type": "application/json"})


def _get_signer(retval):
    """Return a request signer for the current environment.

    Tries, in order: the DEFAULT profile of ~/.oci/config, a resource
    principal (when OCI_RESOURCE_PRINCIPAL_PRIVATE_PEM is set in the
    environment), and finally an instance principal. Records which one was
    chosen in ``retval['style']`` ('local', 'resource' or 'instance').
    """
    config_file = os.path.join(os.path.expanduser('~'), '.oci', 'config')
    if os.path.exists(config_file):
        logging.info('Loading private key from config file')
        config = oci.config.from_file(config_file, 'DEFAULT')
        signer = Signer(
            tenancy=config['tenancy'],
            user=config['user'],
            fingerprint=config['fingerprint'],
            private_key_file_location=config['key_file'],
            # pass_phrase is optional in an OCI config file; tolerate absence.
            pass_phrase=config.get('pass_phrase'),
        )
        retval['style'] = 'local'
    elif 'OCI_RESOURCE_PRINCIPAL_PRIVATE_PEM' in os.environ:
        logging.info('Using resource principal for private key')
        signer = oci.auth.signers.get_resource_principals_signer()
        # The PEM file is deliberately NOT read into retval: retval is sent
        # back to the caller, so doing so would leak the private key.
        retval['style'] = 'resource'
    else:
        logging.info('Using instance principal for private key')
        signer = oci.auth.signers.InstancePrincipalsSecurityTokenSigner()
        retval['style'] = 'instance'
    return signer


def _parse_request(data):
    """Decode and validate the JSON request body.

    Returns a ``(body, parameters)`` tuple, where ``parameters`` is the
    optional "parameters" object (``{}`` when absent or null).

    Raises ValueError (json.JSONDecodeError is a subclass) with a message
    suitable for the caller when the body is missing, is not a JSON object,
    lacks a mandatory field, or has a "parameters" value that is not an
    object.
    """
    if data is None:
        raise ValueError('Missing request body')
    body = json.loads(data.getvalue())
    if not isinstance(body, dict):
        raise ValueError('Request body must be a JSON object')
    for item in _REQUIRED_FIELDS:
        if item not in body:
            raise ValueError('Missing mandatory field ' + item)
    parameters = body.get('parameters') or dict()
    if not isinstance(parameters, dict):
        # Checked here so a bad value is reported, not raised from .items().
        raise ValueError('Field parameters must be a JSON object')
    return body, parameters


def handler(ctx, data: io.BytesIO = None):
    """Submit an OCI Data Flow run described by the JSON request body.

    Required body keys: applicationId, compartmentId, driverShape,
    executorShape, numExecutors, displayName, region. An optional
    "parameters" object has each key/value pair forwarded as a run parameter.

    Always returns an application/json fdk response whose object contains
    'style' (how the request was signed), 'run_payload' once the input has
    validated, 'runid' when Data Flow returned a run id, and 'error'
    describing any failure.
    """
    # Really doesn't belong here, but it is the simplest way found to get
    # INFO logs out of the deployed function.
    logging.basicConfig(level=logging.INFO)
    retval = dict()
    signer = _get_signer(retval)

    try:
        body, parameters = _parse_request(data)
    except Exception as ex:
        # Top-level boundary: report any input problem to the caller.
        retval['error'] = str(ex)
        return _json_response(ctx, retval)

    # Call Data Flow.
    dataflow_root = 'https://dataflow.{}.oci.oraclecloud.com/20181116'.format(
        body['region'])
    dataflow_runs_endpoint = dataflow_root + '/runs'
    run_payload = dict(
        compartmentId=body['compartmentId'],
        applicationId=body['applicationId'],
        displayName=body['displayName'],
        applicationSettings=dict(
            driverShape=body['driverShape'],
            executorShape=body['executorShape'],
            # NOTE(review): the sample invocation sends numExecutors as a
            # string; confirm the API accepts that or coerce with int().
            numExecutors=body['numExecutors'],
            parameters=[
                dict(name=key, value=value)
                for key, value in parameters.items()
            ],
        ),
    )
    retval['run_payload'] = run_payload
    try:
        result = requests.post(
            dataflow_runs_endpoint,
            json=run_payload,
            auth=signer,
            timeout=_DATAFLOW_TIMEOUT)
        result_obj = json.loads(result.text)
        if 'id' in result_obj:
            retval['runid'] = result_obj['id']
        else:
            # No run id: surface the service's raw reply (presumably an
            # error document) so the caller can see why.
            retval['error'] = result.text
    except Exception as ex:
        retval['error'] = str(ex)
    return _json_response(ctx, retval)
if __name__ == '__main__':
    # Local smoke test: pipe a JSON request body into this script on stdin.
    from fdk import context
    # NOTE(review): newer fdk releases may require more positional arguments
    # for InvokeContext; confirm against the installed fdk version.
    ctx = context.InvokeContext(None, None, None)
    # Read raw stdin bytes so the body is not round-tripped through the
    # locale's text codec (and avoid shadowing the builtin ``input``).
    payload = io.BytesIO(sys.stdin.buffer.read())
    result = handler(ctx, payload)
    print(result.body())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment