Skip to content

Instantly share code, notes, and snippets.

@cn007b
Last active August 28, 2020 11:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cn007b/64d21893ebebafa977e02baec98dce9d to your computer and use it in GitHub Desktop.
Save cn007b/64d21893ebebafa977e02baec98dce9d to your computer and use it in GitHub Desktop.
Kubeflow simple example.

Kubeflow simple example

Simple linear-regression example (TensorFlow 1.x graph/session API):

import numpy as np
import tensorflow as tf


# Linear regression on noisy synthetic data, trained with per-sample gradient
# descent through the TensorFlow graph/session API.
# NOTE(review): tf.set_random_seed / tf.placeholder / tf.Session are TF 1.x
# names; on TensorFlow 2.x they are only available under tf.compat.v1 --
# confirm which version the runtime image installs.

# Seed both libraries so the noise and the initial parameters are reproducible.
np.random.seed(101)
tf.set_random_seed(101)

# 50 evenly spaced points in [0, 50] for each axis, each perturbed
# independently with uniform noise drawn from [-4, 4).
x = np.linspace(0, 50, 50)
y = np.linspace(0, 50, 50)
x += np.random.uniform(-4, 4, 50)
y += np.random.uniform(-4, 4, 50)
n = len(x)

# Graph inputs and the two trainable parameters of the model y = W * x + b.
X = tf.placeholder('float')
Y = tf.placeholder('float')
W = tf.Variable(np.random.randn(), name = 'W')
b = tf.Variable(np.random.randn(), name = 'b')
learning_rate = 0.01
training_epochs = 1000

y_pred = tf.add(tf.multiply(X, W), b)
# Sum of squared errors divided by 2 * n. The divisor is always the full
# data-set size n, including when a single sample is fed during training.
cost = tf.reduce_sum(tf.pow(y_pred-Y, 2)) / (2 * n)
optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost)
init = tf.global_variables_initializer()

with tf.Session() as sess:
  file_writer = tf.summary.FileWriter('/tmp/tf_logs', sess.graph)
  # The writer is only used to dump the graph for TensorBoard. Close it right
  # away so the event file is flushed to disk instead of being left to a
  # background flush that may not happen before the process exits.
  file_writer.close()
  sess.run(init)
  for epoch in range(training_epochs):
    # One optimizer step per (x, y) sample.
    for (_x, _y) in zip(x, y):
      sess.run(optimizer, feed_dict = {X : _x, Y : _y})
    # Report progress on the full data set every 50 epochs.
    if (epoch + 1) % 50 == 0:
      c = sess.run(cost, feed_dict = {X : x, Y : y})
      print('Epoch', (epoch + 1), ': cost =', c, 'W =', sess.run(W), 'b =', sess.run(b))
  # Read the final values out while the session is still open.
  training_cost = sess.run(cost, feed_dict ={X: x, Y: y})
  weight = sess.run(W)
  bias = sess.run(b)

# Fitted line evaluated at the training inputs (not used further in this snippet).
predictions = weight * x + bias
print('Training cost =', training_cost, 'Weight =', weight, 'bias =', bias, '\n')

Dockerfile:

# Python 3.8 base image with the packages needed by the regression example.
FROM cn007b/python:3.8

# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip3 install --no-cache-dir numpy matplotlib tensorflow tensorboard

RUN mkdir -p /app
# Download the training script at build time to /app/regression.py.
ADD https://raw.githubusercontent.com/cn007b/my/master/ed/ai/ml/tensorflow/examples/regression.py \
  /app/regression.py

Kubeflow pipeline

import kfp
import kfp.dsl as dsl


# Single-step pipeline: one container op that runs the regression script
# from the cn007b/pi:ai.tf image through `sh -c`.
@dsl.pipeline(name='regression pipeline')
def r_pipeline():
  run_spec = {
    'name': 'Run',
    'image': 'cn007b/pi:ai.tf',
    'command': ['sh', '-c'],
    'arguments': ['python3 /app/regression.py'],
  }
  run_step = dsl.ContainerOp(**run_spec)
  # Pull policy 'Always' for this step's container.
  run_step.container.set_image_pull_policy('Always')


# Submit r_pipeline as a run using a default-configured client, with no
# pipeline arguments.
pipeline = kfp.Client().create_run_from_pipeline_func(
  r_pipeline,
  arguments={},
)

Extended Kubeflow pipeline (environment variable, file output passed between steps, and UI metadata)

import kfp
import kfp.dsl as dsl
from kubernetes.client.models import V1EnvVar


# Three-step pipeline, chained with explicit .after() ordering:
#   1. 'Run'       - runs the regression script, then writes $STATUS to
#                    /tmp/status.txt, exposed as the 'status_file' output.
#   2. 'Read file' - echoes the output of step 1.
#   3. 'Render UI' - writes an inline-markdown metadata JSON to /m.json and
#                    registers it as the 'mlpipeline-ui-metadata' artifact.
@dsl.pipeline(name='regression pipeline')
def r_pipeline():
  image = 'cn007b/pi:ai.tf'

  run_step = dsl.ContainerOp(
    name='Run',
    image=image,
    command=['sh', '-c'],
    arguments=['python3 /app/regression.py && echo $STATUS > /tmp/status.txt'],
    file_outputs={'status_file': '/tmp/status.txt'}
  )
  run_step.container.set_image_pull_policy('Always')
  # STATUS is the value the shell command above writes into the status file.
  status_env = V1EnvVar(name='STATUS', value='completed')
  run_step.add_env_variable(status_env)

  # Interpolating run_step.output puts the step's output reference into the
  # echoed command line.
  echo_command = f'echo {run_step.output} '
  read_step = dsl.ContainerOp(
    name='Read file',
    image=image,
    command=['sh', '-c'],
    arguments=[echo_command],
  )
  read_step.after(run_step)

  ui_command = 'echo \'{"outputs":[{"type": "markdown", "storage": "inline", "source": "*Status*: `completed`"}]}\' > /m.json'
  ui_step = dsl.ContainerOp(
    name='Render UI',
    image=image,
    command=['sh', '-c'],
    arguments=[ui_command],
    output_artifact_paths={'mlpipeline-ui-metadata': '/m.json'},
  )
  ui_step.after(read_step)

# Submit r_pipeline as a run (no pipeline arguments), then compile the same
# pipeline function into the r_pipeline.zip package.
pipeline = kfp.Client().create_run_from_pipeline_func(
  r_pipeline,
  arguments={},
)
kfp.compiler.Compiler().compile(
  r_pipeline,
  'r_pipeline.zip',
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment