
@lauralorenz
Created April 21, 2022 03:22
cloud2!!!

Notes from cloud2 demo

  1. Install Prefect :-)
  2. Sign up for the Cloud 2.0 beta
    • prefect.io
    • Prefect 2.0 banner (or Product > Cloud 2.0)
    • Get Started
    • Sign up / Sign in
    • Make API key
  3. Initial CLI setup
    • prefect cloud login
  4. You can run a flow right now and the metadata will be stored in Cloud because of the CLI setup
  5. Configure storage
    • Create S3 bucket in your AWS account
    • prefect storage create with your AWS bucket name and API keys
    • prefect storage set-default {id} if it wasn't already your default (prefect storage ls to see all storage configs)
  6. Create a work queue
    • Make in the UI or with the CLI command: prefect work-queue create -fr KubernetesFlowRunner k8s-queue
  7. Create a deployment with prefect deployment create {yourfile}.py
  8. Deploy the agent using kubectl apply -f agent-manifest.yaml or a similar file with the name of your work queue
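Pulled together, the setup steps above amount to roughly this command sequence; everything in {braces} is a placeholder you fill in yourself:

```shell
# One-time Cloud 2.0 setup, summarized from the steps above.
prefect cloud login                       # authenticate the CLI with your API key
prefect storage create                    # point at your S3 bucket + AWS keys
prefect storage ls                        # list storage configs to find the new id
prefect storage set-default {id}          # make it the default if it isn't already
prefect work-queue create -fr KubernetesFlowRunner k8s-queue
prefect deployment create {yourfile}.py   # register the deployment
kubectl apply -f agent-manifest.yaml      # start the agent in your cluster
```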

Demo part 0

  1. Create deployment with prefect deployment create basicflow-subprocess.py
  2. Start a local agent that will pick up local jobs
    1. prefect agent start local-queue
  3. Run the deployment: prefect deployment run 'Demo/local-example'
  4. Edit the concurrency for the db tag:
    1. prefect concurrency-limit create db 1
    2. Run it again: prefect deployment run 'Demo/local-example'
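Prefect enforces the `db` concurrency limit server-side, but conceptually it behaves like a semaphore of size 1 keyed by the tag: a task holding the slot blocks every other `db`-tagged task until it finishes. A minimal pure-Python sketch of that idea (no Prefect required; all names here are illustrative):

```python
import asyncio

async def db_task(limit, active, peaks):
    # Acquiring the semaphore stands in for acquiring a 'db' concurrency slot.
    async with limit:
        active[0] += 1
        peaks.append(active[0])          # record how many tasks run at once
        await asyncio.sleep(0.01)        # stand-in for the task's real work
        active[0] -= 1

async def main():
    limit = asyncio.Semaphore(1)         # mirrors: prefect concurrency-limit create db 1
    active, peaks = [0], []
    await asyncio.gather(*(db_task(limit, active, peaks) for _ in range(5)))
    return max(peaks)                    # highest observed concurrency

print(asyncio.run(main()))  # → 1: tasks were serialized by the limit
```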

Demo part 1

  1. Create deployment with prefect deployment create basicflow-k8s.py configured with 'dev' tag.
  2. Consider two clusters’ node pools
    1. Dev - uses t1.micro node pools. Teeny tiny; really only good for one job at a time.
    2. Staging - uses something bigger (for the demo I used t3.medium). Can run multiple jobs at once.
  3. Create new work queues for ‘dev’ and ‘staging’ tags, still for the k8s flow runner
    1. Limit the ‘dev’ work queue to only one at a time
    2. Limit the ‘staging’ work queue to 10 jobs at a time
  4. Create new agents, one for each work queue in the appropriate cluster
  5. Start up multiple runs for the deployment; they will go to the dev agent.
  6. Edit the flow to change the deployment's tag to 'staging', then re-deploy and run it; it will now go to your staging agent.
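Step 6 works because each agent only pulls from its own work queue, and a queue only accepts runs matching its tag filter. A rough pure-Python sketch of that routing idea (the queue names and matching rule here are assumptions for illustration, not Prefect internals):

```python
# Hypothetical tag-filtered work queues mirroring the demo setup.
QUEUES = {
    "dev-queue": {"tags": {"dev"}, "concurrency_limit": 1},
    "staging-queue": {"tags": {"staging"}, "concurrency_limit": 10},
}

def route(run_tags):
    """Return the queues whose tag filter is satisfied by the run's tags."""
    return [name for name, q in QUEUES.items() if q["tags"] <= set(run_tags)]

print(route(["dev"]))      # a run tagged 'dev' lands on the dev agent's queue
print(route(["staging"]))  # after re-tagging, it lands on the staging queue
```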

Demo part 2

  1. Given a deployment tagged with staging from prefect deployment create basicflow-k8s.py.
  2. Set a concurrency limit on the db tag.
  3. Rerun as above.
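Condensed, Demo part 2 is three commands (the deployment name comes from the DeploymentSpec in basicflow-k8s.py):

```shell
prefect deployment create basicflow-k8s.py   # deployment is tagged 'staging'
prefect concurrency-limit create db 1        # at most one 'db'-tagged task at a time
prefect deployment run 'Demo/k8s-example'
```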
```yaml
# agent-job-access.yaml
# The SA used by your agent workload needs these k8s RBAC permissions
# in order to track the jobs it starts for KubernetesFlowRunner configured deployments
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: orion-job-access-clusterrole
rules:
- apiGroups: [""]
  resources: ["pods", "pods/log"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["batch", "extensions"]
  resources: ["jobs"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: SA-use-orion
subjects:
- kind: ServiceAccount
  name: default # choose the k8s SAs your workloads will use
  namespace: default # choose the namespace your k8s SA is in
roleRef:
  kind: ClusterRole
  name: orion-job-access-clusterrole
  apiGroup: rbac.authorization.k8s.io
```
```yaml
# agent-manifest.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: k8s-dev-queue
spec:
  selector:
    matchLabels:
      app: k8s-dev-queue
  replicas: 1
  template:
    metadata:
      labels:
        app: k8s-dev-queue
    spec:
      containers:
      - name: agent
        image: prefecthq/prefect:2.0b2-python3.8
        command: ["prefect", "agent", "start"]
        args: ["$(WORK_QUEUE)"]
        imagePullPolicy: "IfNotPresent"
        env:
        - name: WORK_QUEUE # you can configure this as a value directly instead if you like
          valueFrom:
            fieldRef:
              fieldPath: metadata.labels['app']
        - name: PREFECT_API_URL
          value: https://api-beta.prefect.io/api/accounts/{your account id}/workspaces/{your workspace id}
        - name: PREFECT_API_KEY
          value: {your api key}
```

AWS Deployment stuff

A few tips I collected along the way, from various docs, while setting up an EKS cluster for the cloud2 demo.

Create a cluster

There is a lot of setup for an EKS cluster, so it's easiest to use the provided eksctl utility, which automates some of it for you.

eksctl create cluster -f cluster.yaml

You can describe a cluster.yaml with modifications to the default behavior, like the following:

```yaml
# cluster.yaml
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig

metadata:
  name: dev
  region: us-west-2

nodeGroups:
  - name: dev-ng-1
    instanceType: t2.micro
    desiredCapacity: 2
```

Give yourself console access

If you want to be able to view workloads from the console, you need to give your user access. This eks-console-full-access.yaml is the most permissive of a few options provided in the AWS docs.

kubectl apply -f eks-console-full-access.yaml

kubectl edit -n kube-system configmap/aws-auth

```yaml
mapUsers: |
    - userarn: arn:aws:iam::{your account}:user/{your user}
      username: {k8s username}
      groups:
        - system:cluster-admin
        - eks-console-dashboard-restricted-access-group
```

Give your workloads permission

Your workloads need permissions to your storage location, to read flows and to write results; if you are using S3 as your storage location, you will configure this through AWS IAM.

See the AWS docs for more details about this.

Setup IAM OIDC

Check the OIDC issuer for your cluster:

aws eks describe-cluster --name {cluster} --query "cluster.identity.oidc.issuer" --output text

See if a provider has already been set:

aws iam list-open-id-connect-providers | grep {oidc issuer}

Add it if not:

eksctl utils associate-iam-oidc-provider --cluster {cluster} --approve

Bind IAM policy to k8s SA

Define a policy that grants the AWS IAM permissions your agent workload needs; as mentioned above, the policy needs at least read and write access to the storage bucket you configured against Cloud, so flows can be read and results written.

Then, you can use eksctl to create a k8s SA and bind that IAM policy to the k8s SA in one command.

eksctl create iamserviceaccount \
    --name default \
    --cluster dev \
    --role-name "orion-access" \
    --attach-policy-arn arn:aws:iam::{your account}:policy/{your policy} \
    --approve \
    --override-existing-serviceaccounts

Or annotate k8s SA

Or, you can annotate a k8s SA directly so it knows what IAM role it should use when contacting AWS services. My workloads are using the default/default SA, but if your agent workload is configured to use a different k8s SA, annotate that one.

kubectl annotate serviceaccount -n default default \
    eks.amazonaws.com/role-arn=arn:aws:iam::{your account}:role/orion-access

Give the Kubernetes flow runner other in-cluster RBAC for job runs

kubectl apply -f agent-job-access.yaml

```python
# basicflow-k8s.py
import os
from random import randint

from anyio import sleep
from prefect import flow, task, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner, KubernetesImagePullPolicy


@flow(name="Demo", task_runner=ConcurrentTaskRunner())
async def basic_flow():
    for i in range(10):
        await solo_task()
    for i in range(10):
        await concurrent_task()


@task()
async def concurrent_task():
    t = randint(1, 10)
    logger = get_run_logger()
    logger.warning(f"Sleeping for {t}")
    await sleep(t)


@task(tags=['db'])  # this tag is used to set a task concurrency limit
async def solo_task():
    t = randint(5, 15)
    logger = get_run_logger()
    logger.warning(f"Sleeping for {t}")
    await sleep(t)


DeploymentSpec(
    name="k8s-example",
    flow=basic_flow,
    tags=['staging'],  # this tag is used to filter by my work queues
    flow_runner=KubernetesFlowRunner(
        image="prefecthq/prefect:2.0b2-python3.8",
        image_pull_policy=KubernetesImagePullPolicy.ALWAYS,
        env={
            'PREFECT_API_URL': os.getenv('PREFECT_API_URL'),
            'PREFECT_API_KEY': os.getenv('PREFECT_API_KEY'),
        }
    )
)
```
```python
# basicflow-subprocess.py
from prefect import flow, task, get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import SubprocessFlowRunner


@flow(name="Demo")
def basic_flow():
    for i in range(100):
        message()


@task(tags=['db'])  # this tag is used to set a task concurrency limit
def message():
    logger = get_run_logger()
    logger.warning("The fun is about to begin")


DeploymentSpec(
    name="local-example",
    flow=basic_flow,
    flow_runner=SubprocessFlowRunner()
)
```
```python
from prefect import flow, get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import SubprocessFlowRunner


@flow(name="Demo")
def basic_flow():
    logger = get_run_logger()
    logger.warning("The fun is about to begin")


DeploymentSpec(
    name="basic-example",
    flow=basic_flow,
    flow_runner=SubprocessFlowRunner(),
)
```