Skip to content

Instantly share code, notes, and snippets.

@kozikow
Created January 31, 2017 08:17
Show Gist options
  • Save kozikow/86ee26998c5de1cf3780b6bc0cd35ec0 to your computer and use it in GitHub Desktop.
Save kozikow/86ee26998c5de1cf3780b6bc0cd35ec0 to your computer and use it in GitHub Desktop.
Submitting a job to Kubernetes
import os
import re
import kubernetes
import logging
import math
from kubernetes.client import V1Container
from kubernetes.client import V1EnvVar
from kubernetes.client import V1Job
from kubernetes.client import V1JobSpec
from kubernetes.client import V1ObjectMeta
from kubernetes.client import V1PodSpec
from kubernetes.client import V1PodTemplateSpec
from kubernetes.client import V1ResourceRequirements
# Namespace passed to every job/pod create and delete call in this module.
_KUBERNETES_NAMESPACE = "echoserver"
# Active deadline applied to both the job and its pod: 12 hours, in seconds.
_DEADLINE_SECONDS = 12 * 60 * 60
# Memory budgeted for each tile: 500 KB (decimal kilobytes).
_BYTES_PER_TILE = 500 * 1000
# Fixed memory baseline added on top of the per-tile budget: 3000 MiB.
_BASE_MEM_BYTES = 3000 * 1024 ** 2
def _plan_to_job_name(plan_id):
    """Build a Kubernetes-safe job name from a plan id.

    The plan id is lower-cased, underscores become hyphens, and every run of
    characters outside ``[-a-z0-9]`` collapses into a single hyphen. The
    result is prefixed with ``worker-``.

    The same name is reused as the container name, which must be a DNS-1123
    label: at most 63 characters, ending in an alphanumeric character. The
    name is therefore truncated to 63 characters and trailing hyphens are
    removed. Names that were already valid are returned unchanged.

    NOTE: very long plan ids that share their first 56 sanitized characters
    map to the same job name after truncation.

    Args:
        plan_id: Plan identifier string.

    Returns:
        The sanitized job name as a string.
    """
    max_label_len = 63  # DNS-1123 label length limit
    clean_plan = plan_id.replace("_", "-").lower()
    clean_plan = re.sub(r"[^-a-z0-9]+", "-", clean_plan)
    job_name = "worker-" + clean_plan
    # Truncate first, then strip: truncation itself can expose a trailing "-".
    return job_name[:max_label_len].rstrip("-")
def _tiles_to_memory_limit(tiles):
    """Compute the memory quantity string for a worker handling ``tiles``.

    The requirement is ``_BASE_MEM_BYTES`` plus ``_BYTES_PER_TILE`` for every
    tile, rounded UP to a whole mebibyte and formatted with the ``Mi``
    suffix (for example ``"3001Mi"``).

    Args:
        tiles: Sized collection of tiles; only its length is used.

    Returns:
        The memory quantity as a string ending in ``Mi``.
    """
    bytes_per_mib = 1024 * 1024
    tile_count = len(tiles)
    bytes_req = _BASE_MEM_BYTES + _BYTES_PER_TILE * tile_count
    # Integer ceiling division. `math.ceil(a / b)` silently floors under
    # Python 2 integer division and goes through a float on Python 3; this
    # form rounds up exactly on both.
    mib_req = -(-bytes_req // bytes_per_mib)
    memory_req = "{}Mi".format(mib_req)
    logging.info(
        "Memory requirement",
        extra={
            "tiles_len": tile_count,
            "memory_req_bytes": bytes_req,
            "memory_req": memory_req
        })
    return memory_req
class KubernetesTfApi(object):
    """Submits per-plan worker jobs to Kubernetes from inside the cluster."""

    def __init__(self):
        """Load the in-cluster configuration and create the API clients."""
        logging.info("Initializing kubernetes api")
        kubernetes.config.load_incluster_config()
        # Use the public top-level client classes for both APIs; the
        # `kubernetes.client.apis.*` module path is an internal location that
        # was renamed in later client releases.
        self._batch_api = kubernetes.client.BatchV1Api()
        self._core_api = kubernetes.client.CoreV1Api()

    def clean_old_job(self, plan_id):
        """Delete every job and pod labelled ``plan == plan_id``.

        Both deletions are scoped to ``_KUBERNETES_NAMESPACE``. Pods are
        deleted explicitly in addition to the jobs.

        Args:
            plan_id: Plan identifier used as the ``plan`` label value.
        """
        label_selector = "plan=={}".format(plan_id)
        self._batch_api.delete_collection_namespaced_job(
            namespace=_KUBERNETES_NAMESPACE,
            label_selector=label_selector,
        )
        self._core_api.delete_collection_namespaced_pod(
            namespace=_KUBERNETES_NAMESPACE,
            label_selector=label_selector
        )

    def request_plan_processing(self, plan_id, tiles):
        """Create a single-completion job that processes ``plan_id``.

        Any job or pod previously labelled with this plan is removed first.
        The new job runs one container that executes
        ``python2 /workspace/worker/main.py <plan_id>``, with identical
        memory request and limit derived from the number of tiles.

        Args:
            plan_id: Plan identifier; used for the job name, the ``plan``
                label, and as the worker's command-line argument.
            tiles: Sized collection of tiles, used to size the memory limit.

        Raises:
            KeyError: If the ``SERVER_STAGE`` environment variable is unset.
        """
        job_name = _plan_to_job_name(plan_id)
        # NOTE(review): presumably the collection delete may still be in
        # progress when the same-named job is created below -- confirm
        # whether a name conflict is possible here.
        self.clean_old_job(plan_id)
        # The same metadata (name + plan label) is attached to both the job
        # and its pod template so that clean_old_job can select both.
        object_meta = V1ObjectMeta(name=job_name, labels={"plan": plan_id})
        # Request == limit, so the pod gets exactly the memory computed.
        memory_usage = {"memory": _tiles_to_memory_limit(tiles)}
        pod_spec = V1PodSpec(
            containers=[
                V1Container(
                    args=["python2", "/workspace/worker/main.py", plan_id],
                    env=[V1EnvVar(name="SERVER_STAGE",
                                  value=os.environ["SERVER_STAGE"]),
                         V1EnvVar(name="OPENBLAS_NUM_THREADS",
                                  value="2")],
                    image_pull_policy="IfNotPresent",
                    image="gcr.io/tensorflight/worker:27.0.0",
                    name=job_name,
                    resources=V1ResourceRequirements(
                        limits=memory_usage,
                        requests=memory_usage
                    )
                )
            ],
            restart_policy="OnFailure",
            active_deadline_seconds=_DEADLINE_SECONDS
        )
        job_spec = V1JobSpec(
            active_deadline_seconds=_DEADLINE_SECONDS,
            completions=1,
            parallelism=1,
            template=V1PodTemplateSpec(
                spec=pod_spec,
                metadata=object_meta
            ),
        )
        self._batch_api.create_namespaced_job(
            namespace=_KUBERNETES_NAMESPACE,
            body=V1Job(spec=job_spec, metadata=object_meta))
@sassan72
Copy link

Hi. I have a Python script, bh1.py, that I want to run on the cluster, but I don't know how to run it with your code. My script has been copied into the /home/sassan/ directory on the cluster.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment