Skip to content

Instantly share code, notes, and snippets.

@lauralorenz
Created March 26, 2021 18:50
Show Gist options
  • Save lauralorenz/5bc3f8f005710f33dc60f7cfcda384d9 to your computer and use it in GitHub Desktop.
Save lauralorenz/5bc3f8f005710f33dc60f7cfcda384d9 to your computer and use it in GitHub Desktop.
Introducing 0.14.0 Livestream code
from prefect.environments.storage import S3
from prefect.environments import FargateTaskEnvironment
from prefect.engine.executors import DaskExecutor, LocalDaskExecutor
from prefect import task, Flow, Parameter
import prefect
STORAGE = S3(bucket="demo-prefect-flows-14")
ENVIRONMENT = FargateTaskEnvironment(
networkMode="awsvpc",
family="13-fargate-dask",
taskDefinition="13-fargate-dask",
cpu="256",
memory="512",
cluster="prefect-demo-cluster",
networkConfiguration={'awsvpcConfiguration': {'assignPublicIp': 'ENABLED', 'subnets': ['subnet-7410175c'], 'securityGroups': []}},
taskRoleArn="arn:aws:iam::{{REDACTED}}:role/prefect-demo-fargate-task-role",
executionRoleArn="arn:aws:iam::{{REDACTED}}:role/ecsTaskExecutionRole",
metadata={"image":'prefecthq/prefect:all_extras-0.13.19'},
executor=DaskExecutor(cluster_class="dask_cloudprovider.aws.FargateCluster",
cluster_kwargs={"n_workers": 4, "image": "prefecthq/prefect:all_extras-0.13.19"}))
@task
def hi():
logger = prefect.context.get('logger')
logger.info("Hello!")
with Flow("prefect-13-dask", storage=STORAGE, environment=ENVIRONMENT) as flow:
hi()
# if __name__ == '__main__':
# import logging
# import sys
# logger = logging.getLogger('distributed.scheduler')
# logger.setLevel('DEBUG')
# log_stream = logging.StreamHandler(sys.stdout)
# logger.addHandler(log_stream)
# flow.run()
#flow.run(executor=DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}))
flow.register("14-demo")
# #########
# # agent #
# #########
if __name__ == "__main__":
from prefect.agent.fargate import FargateAgent
AGENT = FargateAgent(labels=["s3-flow-storage"],
cpu="256",
memory="512",
cluster="prefect-demo-cluster",
networkConfiguration={'awsvpcConfiguration': {'assignPublicIp': 'ENABLED', 'subnets': ['subnet-7410175c'], 'securityGroups': []}},
taskRoleArn="arn:aws:iam::{{REDACTED}}:role/prefect-demo-fargate-task-role",
executionRoleArn="arn:aws:iam::{{REDACTED}}:role/ecsTaskExecutionRole",
enable_task_revisions=True)
AGENT.start()
from prefect.environments.storage import S3
from prefect.environments import FargateTaskEnvironment
from prefect import task, Flow, Parameter
import prefect
STORAGE = S3(bucket="demo-prefect-flows-14")
ENVIRONMENT = FargateTaskEnvironment(
networkMode="awsvpc",
family="13-fargate",
taskDefinition="13-fargate",
cpu="256",
memory="512",
cluster="prefect-demo-cluster",
networkConfiguration={'awsvpcConfiguration': {'assignPublicIp': 'ENABLED', 'subnets': ['subnet-7410175c'], 'securityGroups': []}},
taskRoleArn="arn:aws:iam::{{REDACTED}}:role/prefect-demo-fargate-task-role",
executionRoleArn="arn:aws:iam::{{REDACTED}}:role/ecsTaskExecutionRole",
metadata={"image":'prefecthq/prefect:all_extras-0.13.19'})
@task
def hi():
logger = prefect.context.get('logger')
logger.info("Hello!")
with Flow("prefect-13", storage=STORAGE, environment=ENVIRONMENT) as flow:
hi()
flow.register("14-demo")
#########
# agent #
#########
if __name__ == "__main__":
from prefect.agent.fargate import FargateAgent
AGENT = FargateAgent(labels=["s3-flow-storage"],
cpu="256",
memory="512",
cluster="prefect-demo-cluster",
networkConfiguration={'awsvpcConfiguration': {'assignPublicIp': 'ENABLED', 'subnets': ['subnet-7410175c'], 'securityGroups': []}},
taskRoleArn="arn:aws:iam::{{REDACTED}}:role/prefect-demo-fargate-task-role",
executionRoleArn="arn:aws:iam::{{REDACTED}}:role/ecsTaskExecutionRole",
enable_task_revisions=True)
AGENT.start()
from prefect.storage import GitHub
from prefect.run_configs import ECSRun
from prefect.executors import DaskExecutor
from prefect import task, Flow, Parameter
import prefect
STORAGE = GitHub(repo="lauralorenz/fourteen-oh", ref="main", path="/14-fargate-executor.py")
RUN_CONFIG = ECSRun(
run_task_kwargs={
#"networkMode":"awsvpc",
#"taskDefinition":"14-fargate",
"cluster":"prefect-demo-cluster",
"networkConfiguration": {'awsvpcConfiguration': {'assignPublicIp': 'ENABLED', 'subnets': ['subnet-7410175c'], 'securityGroups': []}},
#"executionRoleArn": "arn:aws:iam::{{REDACTED}}:role/ecsTaskExecutionRole"
},
task_role_arn="arn:aws:iam::{{REDACTED}}:role/prefect-demo-fargate-task-role",
image='prefecthq/prefect:0.14.0',
memory="512",
cpu="256")
EXECUTOR = DaskExecutor()
@task
def hi():
logger = prefect.context.get('logger')
logger.info("Hello!")
with Flow("prefect-14-dask", storage=STORAGE, run_config=RUN_CONFIG, executor=EXECUTOR) as flow:
hi()
flow.register("14-demo")
#########
# agent #
#########
if __name__ == '__main__':
from prefect.agent.ecs.agent import ECSAgent
AGENT = ECSAgent(#labels=["s3-flow-storage"],
#cpu="256",
#memory="512",
cluster="prefect-demo-cluster",
#networkConfiguration={'awsvpcConfiguration': {'assignPublicIp': 'ENABLED', 'subnets': ['subnet-7410175c'], 'securityGroups': []}},
task_role_arn="arn:aws:iam::{{REDACTED}}:role/prefect-demo-fargate-task-role",
#executionRoleArn="arn:aws:iam::{{REDACTED}}:role/ecsTaskExecutionRole"
)
AGENT.start()
from prefect.storage import GitHub
from prefect.run_configs import ECSRun
from prefect import task, Flow, Parameter
import prefect
STORAGE = GitHub(repo="lauralorenz/fourteen-oh", ref="main", path="/14-fargate.py")
RUN_CONFIG = ECSRun(
run_task_kwargs={
#"networkMode":"awsvpc",
#"taskDefinition":"14-fargate",
"cluster":"prefect-demo-cluster",
"networkConfiguration": {'awsvpcConfiguration': {'assignPublicIp': 'ENABLED', 'subnets': ['subnet-7410175c'], 'securityGroups': []}},
#"executionRoleArn": "arn:aws:iam::{{REDACTED}}:role/ecsTaskExecutionRole"
},
task_role_arn="arn:aws:iam::{{REDACTED}}:role/prefect-demo-fargate-task-role",
#image='prefecthq/prefect:all_extras-0.13.19',
memory="512",
cpu="256")
@task
def hi():
logger = prefect.context.get('logger')
logger.info("Hello!")
with Flow("prefect-14", storage=STORAGE, run_config=RUN_CONFIG) as flow:
hi()
flow.register("14-demo")
#########
# agent #
#########
if __name__ == '__main__':
from prefect.agent.ecs.agent import ECSAgent
AGENT = ECSAgent(
#cpu="256",
#memory="512",
cluster="prefect-demo-cluster",
#networkConfiguration={'awsvpcConfiguration': {'assignPublicIp': 'ENABLED', 'subnets': ['subnet-7410175c'], 'securityGroups': []}},
task_role_arn="arn:aws:iam::{{REDACTED}}:role/prefect-demo-fargate-task-role",
#executionRoleArn="arn:aws:iam::{{REDACTED}}:role/ecsTaskExecutionRole"
)
AGENT.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment