Skip to content

Instantly share code, notes, and snippets.

@nbren12
Created February 5, 2020 23:59
Show Gist options
  • Save nbren12/7196d56f782947d454cbdd676f1ea8de to your computer and use it in GitHub Desktop.
Save nbren12/7196d56f782947d454cbdd676f1ea8de to your computer and use it in GitHub Desktop.
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# updated by Noah Brenowitz, Vulcan inc
# demonstrates how to wait for a job to complete.
# using the k8s api
"""
Creates a Job object, polls it until it finishes, and then deletes it.
"""
from os import path
from time import sleep
import yaml
from kubernetes import client, config
# Name given to the Job object; the same name is used to read and delete it.
JOB_NAME = "sleeper"
def create_job_object():
    """Build the in-memory ``V1Job`` manifest for the demo job.

    The job's pod template holds a single ``alpine`` container running
    ``sleep 5``, with ``restart_policy="Never"``; the job spec sets
    ``backoff_limit=4``. Nothing is sent to the cluster here.

    Returns:
        A ``client.V1Job`` whose metadata name is ``JOB_NAME``.
    """
    # Pod spec: the one container the job runs.
    pod_spec = client.V1PodSpec(
        restart_policy="Never",
        containers=[
            client.V1Container(
                name="pi",
                image="alpine",
                command=["sleep", "5"],
            ),
        ],
    )
    # Pod template: the pod spec plus the label attached to the pods.
    pod_template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels={"app": "pi"}),
        spec=pod_spec,
    )
    # Job spec wrapping the pod template.
    job_spec = client.V1JobSpec(template=pod_template, backoff_limit=4)
    # The job object itself.
    return client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=client.V1ObjectMeta(name=JOB_NAME),
        spec=job_spec,
    )
def create_job(api_instance, job):
    """Create *job* in the ``default`` namespace and print the response status.

    Args:
        api_instance: Object exposing ``create_namespaced_job`` (``main``
            passes a ``client.BatchV1Api``).
        job: The job manifest to submit as the request body.
    """
    created = api_instance.create_namespaced_job(namespace="default", body=job)
    status_text = str(created.status)
    print("Job created. status='%s'" % status_text)
def delete_job(api_instance):
    """Delete the job named ``JOB_NAME`` from the ``default`` namespace.

    The request carries delete options with ``propagation_policy='Foreground'``
    and ``grace_period_seconds=5``; the status of the response is printed.

    Args:
        api_instance: Object exposing ``delete_namespaced_job`` (``main``
            passes a ``client.BatchV1Api``).
    """
    delete_options = client.V1DeleteOptions(
        propagation_policy='Foreground',
        grace_period_seconds=5,
    )
    deleted = api_instance.delete_namespaced_job(
        name=JOB_NAME,
        namespace="default",
        body=delete_options,
    )
    status_text = str(deleted.status)
    print("Job deleted. status='%s'" % status_text)
def job_is_succesful(api, name):
    """Return whether the job *name* has at least one succeeded pod.

    The (misspelled) function name is kept so existing callers keep working.

    Args:
        api: Object exposing ``read_namespaced_job(name, namespace=...)``,
            e.g. a ``client.BatchV1Api``.
        name: Name of the job to look up in the ``default`` namespace.

    Returns:
        ``True`` if the job's ``status.succeeded`` is a positive count,
        ``False`` if it is unset, zero, or the job has no status yet.
    """
    # Use the arguments that were passed in rather than module/global state.
    job_info = api.read_namespaced_job(name, namespace='default')
    # The response is a model object: fields are attributes, not dict keys.
    status = job_info.status
    # ``succeeded`` is left unset until a pod has succeeded, so coerce to a
    # real boolean instead of leaking ``None``.
    return bool(status and status.succeeded)
def main():
    """Create the demo job, wait for it to finish, then delete it.

    Loads the local kube config, submits the job built by
    ``create_job_object``, and polls the job once per second, printing its
    status each time. Polling stops when the job reports a terminal
    condition (``Complete`` or ``Failed``); the job is deleted afterwards,
    including when polling is interrupted by an exception.
    """
    # Configs can be set in Configuration class directly or using helper
    # utility. If no argument provided, the config will be loaded from
    # default location.
    config.load_kube_config()
    batch_v1 = client.BatchV1Api()

    # Build the job manifest with the client-python API and submit it.
    job = create_job_object()
    create_job(batch_v1, job)

    try:
        while True:
            job_info = batch_v1.read_namespaced_job(
                JOB_NAME, namespace='default')
            status = job_info.status
            print(status)

            # Decide on the job's terminal conditions rather than on the
            # active/failed pod counters: ``active`` is still unset right
            # after creation (which would look like "completed"), and a
            # single failed pod is not final while retries remain under
            # ``backoff_limit``.
            conditions = (status.conditions if status else None) or []
            reached = {
                cond.type for cond in conditions if cond.status == "True"
            }
            if "Failed" in reached:
                print("Job failed")
                break
            if "Complete" in reached:
                print("Job completed")
                break

            # Not finished yet: wait before polling again.
            sleep(1)
    finally:
        # Always clean up the job once it has been created.
        delete_job(batch_v1)
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment