Skip to content

Instantly share code, notes, and snippets.

@EngHabu
Created July 9, 2021 16:28
Show Gist options
  • Save EngHabu/951a4d8401eebeba2e2cdf33d69a98ee to your computer and use it in GitHub Desktop.
Save EngHabu/951a4d8401eebeba2e2cdf33d69a98ee to your computer and use it in GitHub Desktop.
Example of mounting a shared volume
import os
import time
from typing import List
from flytekit import task, workflow
from flytekitplugins.pod import Pod
from kubernetes.client.models import (
V1Container,
V1EmptyDirVolumeSource,
V1PodSpec,
V1ResourceRequirements,
V1Volume,
V1VolumeMount,
)
# Absolute path of the file shared between the init container and the primary
# container. It must be absolute: Kubernetes uses a volume mount path literally
# and Python's file APIs do not expand a leading "~" -- only a shell does, so a
# "~/..." value would make the writer and the reader disagree on the location.
# NOTE(review): assumes the primary container may traverse /root -- confirm for
# the task image in use.
_AWS_CREDS_PATH = "/root/.aws/credentials"


def generate_pod_spec_for_task(creds_path: str = _AWS_CREDS_PATH) -> V1PodSpec:
    """Build the pod spec used by the pod task.

    The spec declares a "primary" container (no image set; see the comment
    below), a "secondary" alpine container listed under ``init_containers``
    that writes a message to ``creds_path``, and an ``emptyDir`` volume with
    ``medium="Memory"`` mounted into both containers at the directory that
    holds ``creds_path``.

    Args:
        creds_path: Absolute, POSIX-style path of the file the init container
            writes. The shared volume is mounted on its parent directory.

    Returns:
        The assembled ``V1PodSpec``.

    Raises:
        ValueError: If ``creds_path`` is not an absolute path naming a file
            inside a non-root directory (for example a ``~``-prefixed path).
    """
    # Function-scope import keeps this block self-contained.
    import shlex

    # Split by hand instead of os.path so the result does not depend on the
    # platform that builds (registers) the spec.
    mount_dir, _, file_name = creds_path.rpartition("/")
    if not creds_path.startswith("/") or not mount_dir or not file_name:
        raise ValueError(
            "creds_path must be an absolute path to a file inside a "
            "directory, got {!r}".format(creds_path)
        )

    # Primary containers do not require us to specify an image, the default
    # image built for flyte tasks will get used.
    primary_container = V1Container(name="primary")

    # Note: for non-primary containers we must specify an image.
    sidecar_container = V1Container(
        name="secondary",
        image="alpine",
    )
    sidecar_container.command = ["/bin/sh"]
    sidecar_container.args = [
        "-c",
        # Quote the path so spaces or shell metacharacters cannot alter the
        # command that the shell runs.
        "echo hi pod world > {}".format(shlex.quote(creds_path)),
    ]

    resources = V1ResourceRequirements(
        requests={"cpu": "1", "memory": "100Mi"},
        limits={"cpu": "1", "memory": "100Mi"},
    )
    primary_container.resources = resources
    sidecar_container.resources = resources

    # Both containers mount the same volume at the same absolute directory, so
    # the file written by the init container is visible to the primary one.
    shared_volume_mount = V1VolumeMount(
        name="aws-creds",
        mount_path=mount_dir,
    )
    sidecar_container.volume_mounts = [shared_volume_mount]
    primary_container.volume_mounts = [shared_volume_mount]

    pod_spec = V1PodSpec(
        init_containers=[sidecar_container],
        containers=[primary_container],
        volumes=[
            V1Volume(
                name="aws-creds",
                empty_dir=V1EmptyDirVolumeSource(medium="Memory"),
            )
        ],
    )
    return pod_spec
@task(
    task_config=Pod(
        pod_spec=generate_pod_spec_for_task(), primary_container_name="primary"
    ),
)
def my_pod_task() -> str:
    """Wait for the shared file to exist, then return its contents.

    Returns:
        The full text of the file at ``_AWS_CREDS_PATH``.
    """
    # The code defined in this task will get injected into the primary container.
    # Python's file APIs treat "~" as a literal directory name, so resolve it
    # here, at run time inside the container. This is a no-op for a path that
    # does not start with "~".
    creds_path = os.path.expanduser(_AWS_CREDS_PATH)

    # Poll until the file shows up. NOTE: there is no timeout, so this blocks
    # for as long as the file is missing.
    while not os.path.isfile(creds_path):
        time.sleep(5)

    with open(creds_path, "r") as shared_message_file:
        return shared_message_file.read()
@workflow
def PodWorkflow() -> str:
    """Run ``my_pod_task`` and return its string output as the workflow result."""
    return my_pod_task()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment