Skip to content

Instantly share code, notes, and snippets.

@even-wei
Last active January 6, 2022 00:27
Show Gist options
  • Save even-wei/32a85d998bc13343c06f9000b66807a9 to your computer and use it in GitHub Desktop.
Save even-wei/32a85d998bc13343c06f9000b66807a9 to your computer and use it in GitHub Desktop.
A demo for integrating PrimeHub and Airflow
import logging as log
import datetime
from datetime import timedelta
import requests
import numpy as np
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
from airflow.models import Variable
# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
# PrimeHub SDK
from primehub import PrimeHub, PrimeHubConfig
# Defaults handed to every operator created in this DAG. Any of them can be
# overridden on an individual task when that operator is instantiated.
default_args = dict(
    owner="airflow",
    depends_on_past=False,
    email=["airflow@example.com"],
    email_on_failure=False,
    email_on_retry=False,
    retries=0,
    retry_delay=timedelta(minutes=5),
)
with DAG(
"primehub_tutorial",
default_args=default_args,
description="A demo of Airflow and PrimeHub Python SDK",
schedule_interval=None,
start_date=datetime.datetime.today(),
tags=["primehub"],
) as dag:
def init_primehub_sdk():
    """Create a PrimeHub SDK client configured from Airflow Variables.

    The endpoint, token and group are read from the Airflow Variables
    ``PRIMEHUB_API_ENDPOINT``, ``PRIMEHUB_API_TOKEN`` and ``PRIMEHUB_GROUP``.
    The result of ``is_ready()`` is only logged; the client is returned in
    both cases.

    Returns:
        The configured ``PrimeHub`` client.
    """
    client = PrimeHub(PrimeHubConfig())
    client.config.set_endpoint(Variable.get("PRIMEHUB_API_ENDPOINT"))
    client.config.set_token(Variable.get("PRIMEHUB_API_TOKEN"))
    client.config.set_group(Variable.get("PRIMEHUB_GROUP"))

    if not client.is_ready():
        log.info("PrimeHub Python SDK couldn't get the group information, follow the 00-getting-started.ipynb to complete it")
        return client

    log.info("PrimeHub Python SDK setup successfully")
    return client
def submit_primehub_job(config):
    """Submit a PrimeHub job and block until the SDK's wait call returns.

    Args:
        config: Job specification, passed unchanged to ``jobs.submit``.
    """
    client = init_primehub_sdk()

    log.info("[ Submit ]")
    submitted = client.jobs.submit(config)
    log.info(submitted)

    log.info("[ Waiting ]")
    client.jobs.wait(submitted["id"])
    log.info("[ Job Done ]")
def update_script():
    """Fetch the training notebook into the shared airflow-demo folder.

    Submits a PrimeHub job that creates
    ``/home/jovyan/phusers/airflow-demo`` if needed and downloads
    ``0_tensorflow2_training.ipynb`` into it.
    """
    config = {
        "instanceType": "cpu-1",
        "image": "base-notebook",
        "displayName": "update_script",
        # -O forces the download to overwrite the existing notebook. Without
        # it wget keeps the old file and stores the new copy as "<name>.1",
        # so re-runs would keep training on a stale notebook.
        "command": "mkdir -p /home/jovyan/phusers/airflow-demo && cd /home/jovyan/phusers/airflow-demo && wget -O 0_tensorflow2_training.ipynb https://raw.githubusercontent.com/InfuseAI/primehub/master/examples/notebook-examples/data/tensorflow2/0_quickstart/0_tensorflow2_training.ipynb",
    }
    submit_primehub_job(config)
def train_model():
    """Execute the training notebook with papermill as a PrimeHub job."""
    submit_primehub_job(
        {
            "instanceType": "cpu-1",
            "image": "tf-2",
            "displayName": "training",
            "command": "cd /home/jovyan/phusers/airflow-demo && papermill 0_tensorflow2_training.ipynb 0_tensorflow2_training_output.ipynb",
        }
    )
# Identifier and display name of the PrimeHub deployment managed by this DAG.
deploy_id = "deploy-from-airflow-xyz"
deploy_name = "deploy-from-airflow"


def deploy_model():
    """Deploy the newest model under ``/example-models`` and wait for it.

    The entry whose name sorts last is treated as the latest model. If no
    deployment named ``deploy_name`` exists, one is created with id
    ``deploy_id``; otherwise the existing deployment is stopped and its
    ``modelURI`` is updated. In both cases the function then waits on
    ``deploy_id``.

    Raises:
        RuntimeError: If ``/example-models`` contains no entries.
    """
    ph = init_primehub_sdk()

    model_names = [entry["name"] for entry in ph.files.list("/example-models")]
    if not model_names:
        # Fail with an explicit message instead of an IndexError on [-1].
        raise RuntimeError("No models found under /example-models; nothing to deploy")
    latest_model = max(model_names)
    model_uri = f"phfs:///example-models/{latest_model}"
    log.info(f"[ Deploy ] {latest_model}")

    # NOTE(review): existence is matched by name while stop/update target
    # deploy_id — confirm both always refer to the same deployment.
    already_deployed = any(d["name"] == deploy_name for d in ph.deployments.list())

    if not already_deployed:
        ph.deployments.create(
            {
                "id": deploy_id,
                "name": deploy_name,
                "modelImage": "infuseai/tensorflow2-prepackaged:v0.2.0",
                "modelURI": model_uri,
                "instanceType": "cpu-1",
                "replicas": 1,
            }
        )
    else:
        ph.deployments.stop(deploy_id)
        ph.deployments.update(deploy_id, {"modelURI": model_uri})

    log.info("[ Wait Deployment ]")
    ph.deployments.wait(deploy_id)
    log.info("[ Deploy Done ]")
def verify_deployment():
    """Send a sample request to the deployed model and check the prediction.

    Looks up the endpoint of deployment ``deploy_id``, posts a JSON payload
    to it and takes the argmax of the first row of the returned ``ndarray``.
    A result other than 7 is logged as a warning; it does not raise.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
        requests.Timeout: If the endpoint does not answer within the timeout.
    """
    ph = init_primehub_sdk()
    endpoint = ph.deployments.get(deploy_id)["endpoint"]
    headers = {'Content-Type': 'application/json'}
    # NOTE(review): this payload is not valid JSON as written (the ndarray
    # value appears to be missing) — replace it with the real sample input.
    data = '{ "data": {"ndarray} }'
    # A timeout keeps the task from hanging forever on an unresponsive endpoint.
    response = requests.post(endpoint, headers=headers, data=data, timeout=60)
    # Surface HTTP errors directly instead of failing later while parsing
    # an error body.
    response.raise_for_status()
    result = np.argmax(response.json()["data"]["ndarray"][0])
    if result != 7:
        log.warning("Wrong prediction")
# [START main_flow]
# One PythonOperator per pipeline stage. No dag= argument is passed, so these
# must stay inside the `with DAG(...) as dag:` block to be attached to it.
t_update = PythonOperator(task_id="update", python_callable=update_script)
t_train = PythonOperator(task_id="train", python_callable=train_model)
t_deploy = PythonOperator(task_id="deploy", python_callable=deploy_model)
t_verify = PythonOperator(task_id="verify", python_callable=verify_deployment)

# Strictly sequential: update -> train -> deploy -> verify.
t_update >> t_train >> t_deploy >> t_verify
# [END main_flow]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment