{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dask on Fargate from scratch\n",
"This notebook covers creating a dask distributed cluster on AWS Fargate."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## AWS Config\n",
"In order to create the resources we need on AWS you will need to specify some keys. They will need permission to create ECS resources, IAM roles and EC2 security groups. You can either set the keys up using the aws command line tool or set them as environment variables. You will also need to specify the region you are in. \n",
"\n",
"We then need to create a few service clients with boto."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import boto3\n",
"import os\n",
"\n",
"# os.environ['AWS_ACCESS_KEY_ID'] = ''\n",
"# os.environ['AWS_SECRET_ACCESS_KEY'] = ''\n",
"os.environ['AWS_REGION'] = 'eu-west-2'\n",
"\n",
"ec2 = boto3.client('ec2')\n",
"ecs = boto3.client('ecs')\n",
"iam = boto3.client('iam')\n",
"logs = boto3.client('logs')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Container image\n",
"To run dask on Fargate we will need a container image which can run the scheduler and worker tasks. It is recommended that you ensure the package versions in the image are the same as the environment you wish to use the cluster from. But for this example we are going to use the official dask docker image."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"CONTAINER_IMAGE = 'daskdev/dask:0.19.4'"
]
},
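{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check (an optional step, assuming dask and distributed are installed locally) we can print the local versions to compare against the image tag above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"import distributed\n",
"\n",
"# Compare these against the tag in CONTAINER_IMAGE to avoid\n",
"# client/scheduler version mismatches\n",
"print('local dask:', dask.__version__)\n",
"print('local distributed:', distributed.__version__)"
]
},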
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## AWS Resources\n",
"We need to create a number of resources in AWS before we can launch on Fargate."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### ECS Cluster\n",
"We will need a fargate capable ECS cluster."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"CLUSTER_NAME = 'dask_fargate_test'\n",
"CLUSTER_ARN = ecs.create_cluster(clusterName=CLUSTER_NAME)['cluster']['clusterArn']\n",
"CLUSTER_ARN"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### IAM roles\n",
"\n",
"#### Execution role\n",
"We need a role for Fargate to use when executing our tasks. It will need Elastic Container Registry (ECR) read access if you want to use a docker image stored there and CloudWatch Logs write access in order to record logs from the tasks. This role will also need a trust relationship with ECS tasks."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"EXECUTION_ROLE_NAME = 'dask-fargate-execution'\n",
"try:\n",
" \n",
" response = iam.create_role(\n",
" RoleName=EXECUTION_ROLE_NAME,\n",
" AssumeRolePolicyDocument=\"\"\"{\n",
" \"Version\": \"2012-10-17\",\n",
" \"Statement\": [\n",
" {\n",
" \"Effect\": \"Allow\",\n",
" \"Principal\": {\n",
" \"Service\": \"ecs-tasks.amazonaws.com\"\n",
" },\n",
" \"Action\": \"sts:AssumeRole\"\n",
" }\n",
" ]\n",
" }\"\"\",\n",
" Description='A role for Fargate to use when executing'\n",
" )\n",
" iam.attach_role_policy(RoleName=EXECUTION_ROLE_NAME, PolicyArn='arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly')\n",
" iam.attach_role_policy(RoleName=EXECUTION_ROLE_NAME, PolicyArn='arn:aws:iam::aws:policy/CloudWatchLogsFullAccess')\n",
" iam.attach_role_policy(RoleName=EXECUTION_ROLE_NAME, PolicyArn='arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceRole')\n",
"except:\n",
" response = iam.get_role(RoleName=EXECUTION_ROLE_NAME)\n",
"\n",
"EXECUTION_ROLE_ARN = response['Role']['Arn']\n",
"EXECUTION_ROLE_ARN"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Task role\n",
"\n",
"We will also need an IAM role for our tasks. This decides which AWS services we want our dask workers to be able to access. It is likely we will want to give them some S3 access at the very minimum. This role will also need a trust relationship with ECS tasks."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"TASK_ROLE_NAME = 'dask-fargate-task'\n",
"try:\n",
" \n",
" response = iam.create_role(\n",
" RoleName=TASK_ROLE_NAME,\n",
" AssumeRolePolicyDocument=\"\"\"{\n",
" \"Version\": \"2012-10-17\",\n",
" \"Statement\": [\n",
" {\n",
" \"Effect\": \"Allow\",\n",
" \"Principal\": {\n",
" \"Service\": \"ecs-tasks.amazonaws.com\"\n",
" },\n",
" \"Action\": \"sts:AssumeRole\"\n",
" }\n",
" ]\n",
" }\"\"\",\n",
" Description='A role for dask containers to use when executing'\n",
" )\n",
" iam.attach_role_policy(RoleName=TASK_ROLE_NAME, PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess')\n",
"except:\n",
" response = iam.get_role(RoleName=TASK_ROLE_NAME)\n",
"\n",
"TASK_ROLE_ARN = response['Role']['Arn']\n",
"TASK_ROLE_ARN"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### CloudWatch Logs\n",
"We will be storing logs in CloudWatch so that we can explore them later if we need to. This is particularly useful if you encounter issues with your worker tasks and wish to debug them. We don't actually need to create anything here as they will be automatically created when we write to them, but to ensure we don't end up paying to store logs we don't need any more let's create a group now and set a 30 day retention policy."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"CLOUDWATCH_LOGS_GROUP= '/pangeo/dask-fargate'\n",
"CLOUDWATCH_LOGS_STREAM_PREFIX = 'dask-fargate'\n",
"CLOUDWATCH_LOGS_RETENTION = 30"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"try:\n",
" logs.create_log_group(\n",
" logGroupName=CLOUDWATCH_LOGS_GROUP\n",
" )\n",
" logs.put_retention_policy(\n",
" logGroupName=CLOUDWATCH_LOGS_GROUP,\n",
" retentionInDays=CLOUDWATCH_LOGS_RETENTION\n",
" )\n",
"except: \n",
" pass"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### VPC Networking\n",
"\n",
"We need to tell fargate which subnets it can assign network interfaces in and which security groups to attach to the containers. We will get the subnets from the default VPC."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"[default_vpc] = [vpc for vpc in ec2.describe_vpcs()['Vpcs'] if vpc['IsDefault']]\n",
"VPC_SUBNETS = [subnet['SubnetId'] for subnet in ec2.describe_subnets()['Subnets'] if subnet['VpcId'] == default_vpc['VpcId']]\n",
"VPC_SUBNETS"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We also need a security group for our tasks. It needs to allow external access on the dask distributed ports for us to connect to our cluster and view the dashboard. It also needs to allow all communication between the workers and the scheduler as workers communicate on random high ports."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"VPC_SECURITY_GROUP_NAME = 'dask'\n",
"try:\n",
" response = ec2.create_security_group(\n",
" Description='A security group for dask',\n",
" GroupName=VPC_SECURITY_GROUP_NAME,\n",
" VpcId=default_vpc['VpcId'],\n",
" DryRun=False\n",
" )\n",
" ec2.authorize_security_group_ingress(\n",
" GroupId=response['GroupId'],\n",
" IpPermissions=[\n",
" {\n",
" 'IpProtocol': 'TCP',\n",
" 'FromPort': 8786,\n",
" 'ToPort': 8787,\n",
" 'IpRanges': [\n",
" {\n",
" 'CidrIp': '0.0.0.0/0',\n",
" 'Description': 'Anywhere'\n",
" },\n",
" ],\n",
" 'Ipv6Ranges': [\n",
" {\n",
" 'CidrIpv6': '::/0',\n",
" 'Description': 'Anywhere'\n",
" },\n",
" ]\n",
" },\n",
" {\n",
" 'IpProtocol': 'TCP',\n",
" 'FromPort': 0,\n",
" 'ToPort': 65535,\n",
" 'UserIdGroupPairs': [\n",
" {\n",
" 'GroupName': VPC_SECURITY_GROUP_NAME\n",
" },\n",
" ]\n",
" },\n",
" ],\n",
" DryRun=False\n",
" )\n",
"except:\n",
" [response] = ec2.describe_security_groups(\n",
" GroupNames=[VPC_SECURITY_GROUP_NAME]\n",
" )['SecurityGroups']\n",
" \n",
"VPC_SECURITY_GROUPS = [response['GroupId']]\n",
"VPC_SECURITY_GROUPS"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Task size and count\n",
"Specify the cpu and memory for your containers and number of workers you need.\n",
"\n",
"CPU is specified in units of vCPU / 1024. Therefore to allocate 1 vCPU you must specify 1024.\n",
"Memory is specified in megabytes. Therefore to allocate 4GB of RAM you must specify 4096."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"SCHEDULER_CPU = 1024\n",
"SCHEDULER_MEM = 4096\n",
"WORKER_CPU = 4096\n",
"WORKER_MEM = 16384\n",
"NUM_WORKERS = 25"
]
},
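{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check on the arithmetic, with the values above the workers alone will request 25 * 4 = 100 vCPUs and 25 * 16 = 400GB of RAM:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Convert from CPU units and megabytes back into vCPUs and gigabytes\n",
"print('Total worker vCPUs:', int(NUM_WORKERS * WORKER_CPU / 1024))\n",
"print('Total worker RAM (GB):', int(NUM_WORKERS * WORKER_MEM / 1024))"
]
},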
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create task definitions\n",
"Next let's create a task definition for the sheculer and worker. This is basically a template for the Docker containers we want to run on Fargate."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"scheduler_task_definition = ecs.register_task_definition(\n",
" family='dask-scheduler',\n",
" taskRoleArn=TASK_ROLE_ARN,\n",
" executionRoleArn=EXECUTION_ROLE_ARN,\n",
" networkMode='awsvpc',\n",
" containerDefinitions=[\n",
" {\n",
" 'name': 'dask-scheduler',\n",
" 'image': CONTAINER_IMAGE,\n",
" 'cpu': SCHEDULER_CPU,\n",
" 'memory': SCHEDULER_MEM,\n",
" 'memoryReservation': SCHEDULER_MEM,\n",
" 'essential': True,\n",
" 'command': ['dask-scheduler'],\n",
" 'logConfiguration': {\n",
" 'logDriver': 'awslogs',\n",
" 'options': {\n",
" 'awslogs-region': os.environ['AWS_REGION'],\n",
" 'awslogs-group': CLOUDWATCH_LOGS_GROUP,\n",
" 'awslogs-stream-prefix': CLOUDWATCH_LOGS_STREAM_PREFIX,\n",
" 'awslogs-create-group': 'true'\n",
" }\n",
" }\n",
" },\n",
" ],\n",
" volumes=[],\n",
" requiresCompatibilities=[\n",
" 'FARGATE'\n",
" ],\n",
" cpu=str(SCHEDULER_CPU),\n",
" memory=str(SCHEDULER_MEM)\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"worker_task_definition = ecs.register_task_definition(\n",
" family='dask-worker',\n",
" taskRoleArn=TASK_ROLE_ARN,\n",
" executionRoleArn=EXECUTION_ROLE_ARN,\n",
" networkMode='awsvpc',\n",
" containerDefinitions=[\n",
" {\n",
" 'name': 'dask-worker',\n",
" 'image': CONTAINER_IMAGE,\n",
" 'cpu': WORKER_CPU,\n",
" 'memory': WORKER_MEM,\n",
" 'memoryReservation': WORKER_MEM,\n",
" 'essential': True,\n",
" 'command': ['dask-worker', '--nthreads', '{}'.format(int(WORKER_CPU / 1024)), '--no-bokeh', '--memory-limit', '{}GB'.format(int(WORKER_MEM / 1024)), '--death-timeout', '60'],\n",
" 'logConfiguration': {\n",
" 'logDriver': 'awslogs',\n",
" 'options': {\n",
" 'awslogs-region': os.environ['AWS_REGION'],\n",
" 'awslogs-group': CLOUDWATCH_LOGS_GROUP,\n",
" 'awslogs-stream-prefix': CLOUDWATCH_LOGS_STREAM_PREFIX,\n",
" 'awslogs-create-group': 'true'\n",
" }\n",
" }\n",
" },\n",
" ],\n",
" volumes=[],\n",
" requiresCompatibilities=[\n",
" 'FARGATE'\n",
" ],\n",
" cpu=str(WORKER_CPU),\n",
" memory=str(WORKER_MEM)\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"[scheduler_task_definition['taskDefinition']['taskDefinitionArn'],\n",
" worker_task_definition['taskDefinition']['taskDefinitionArn']]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Start tasks\n",
"\n",
"Now that we have created all the AWS resources we need we can start launching tasks on Fargate.\n",
"\n",
"### Create a distributed scheduler\n",
"Let's start by creating a scheduler task."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"[scheduler_task] = ecs.run_task(\n",
" cluster=CLUSTER_ARN,\n",
" taskDefinition=scheduler_task_definition['taskDefinition']['taskDefinitionArn'],\n",
" overrides={},\n",
" count=1,\n",
" launchType='FARGATE',\n",
" networkConfiguration={\n",
" 'awsvpcConfiguration': {\n",
" 'subnets': VPC_SUBNETS,\n",
" 'securityGroups': VPC_SECURITY_GROUPS,\n",
" 'assignPublicIp': 'ENABLED'\n",
" }\n",
" }\n",
")['tasks']\n",
"scheduler_task_arn = scheduler_task['taskArn']"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Tasks take a little while to start, so before moving on we should wait for the scheduler to start. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"print('Waiting for scheduler', end=\"\")\n",
"while scheduler_task['lastStatus'] in ['PENDING', 'PROVISIONING']:\n",
" time.sleep(1)\n",
" [scheduler_task] = ecs.describe_tasks(cluster=CLUSTER_ARN, tasks=[scheduler_task_arn])['tasks']\n",
" print('.', end='')\n",
" \n",
"if scheduler_task['lastStatus'] == 'RUNNING':\n",
" print(\"\"\"\n",
"Ready!\n",
"\n",
"Container placed on Fargate after {connectivity}s\n",
"Image started pulling after {pullstart}s\n",
"Image finished pulling after {pullstop}s\n",
"Container started after {containerstart}s\n",
" \"\"\".format(\n",
" start=(scheduler_task['createdAt'] - scheduler_task['createdAt']).total_seconds(),\n",
" connectivity=(scheduler_task['connectivityAt'] - scheduler_task['createdAt']).total_seconds(),\n",
" pullstart=(scheduler_task['pullStartedAt'] - scheduler_task['createdAt']).total_seconds(),\n",
" pullstop=(scheduler_task['pullStoppedAt'] - scheduler_task['createdAt']).total_seconds(),\n",
" containerstart=(scheduler_task['startedAt'] - scheduler_task['createdAt']).total_seconds(),\n",
" ))\n",
"else:\n",
" print(\"\"\"\n",
"Failed to start\"\"\")\n",
" for container in scheduler_task['containers']:\n",
" print(container['reason'])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We also need to know the public and private IP addresses of the scheduler in order to connect workers to it and view the dashboard."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"[scheduler_eni] = [attachment for attachment in scheduler_task['attachments'] if attachment['type'] == 'ElasticNetworkInterface']\n",
"[scheduler_network_interface_id] = [detail['value'] for detail in scheduler_eni['details'] if detail['name'] == 'networkInterfaceId']\n",
"scheduler_eni = ec2.describe_network_interfaces(NetworkInterfaceIds=[scheduler_network_interface_id])\n",
"interface = scheduler_eni['NetworkInterfaces'][0]\n",
"scheduler_public_ip = interface['Association']['PublicIp']\n",
"scheduler_private_ip = interface['PrivateIpAddresses'][0]['PrivateIpAddress']"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"scheduler_public_ip"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"scheduler_private_ip"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can now navigate to the dashboard URL."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from IPython.core.display import display, HTML\n",
"display(HTML('<a href=\"{url}\" target=\"_blank\">{url}</a>'.format(url='http://{}:8787/status'.format(scheduler_public_ip))))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Run worker tasks\n",
"Now we can create our workers and point them towards our scheduler on its private IP."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"task_batches = ([10] * int(NUM_WORKERS / 10)) + [NUM_WORKERS % 10]\n",
"for batch in task_batches:\n",
" worker_task = ecs.run_task(\n",
" cluster=CLUSTER_ARN,\n",
" taskDefinition=worker_task_definition['taskDefinition']['taskDefinitionArn'],\n",
" overrides={\n",
" 'containerOverrides': [{\n",
" 'name': 'dask-worker',\n",
" 'environment': [\n",
" {\n",
" 'name': 'DASK_SCHEDULER_ADDRESS',\n",
" 'value': 'tcp://{}:8786'.format(scheduler_private_ip)\n",
" },\n",
" ]\n",
" }]\n",
" },\n",
" count=batch,\n",
" launchType='FARGATE',\n",
" networkConfiguration={\n",
" 'awsvpcConfiguration': {\n",
" 'subnets': VPC_SUBNETS,\n",
" 'securityGroups': VPC_SECURITY_GROUPS,\n",
" 'assignPublicIp': 'ENABLED'\n",
" }\n",
" }\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Connect to cluster\n",
"We can now connect distributed to our cluster. We will likely not see any workers yet as they will still be starting up, but once they arrive they will start doing work."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"import distributed"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"c = distributed.Client('tcp://{}:8786'.format(scheduler_public_ip))\n",
"c"
]
},
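{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you would rather block until the workers have arrived, a minimal sketch (polling the scheduler once a second) looks like this:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"\n",
"# Poll the scheduler until at least one worker has registered\n",
"while not c.scheduler_info()['workers']:\n",
"    time.sleep(1)\n",
"print('{} workers connected'.format(len(c.scheduler_info()['workers'])))"
]
},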
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Test dask\n",
"Let's give dask some work to do and display the progress so we can see when workers join and start calculating."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask.array as da\n",
"x = da.random.random((10000, 10000), chunks=(1000, 1000))\n",
"x"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"y = x + x.T\n",
"z = y[::2, 5000:].mean(axis=1)\n",
"z"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"z = z.persist()\n",
"distributed.progress(z)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Cluster cost estimate\n",
"Thanks to the per second pricing of Fargate we can calculate our costs at any time.\n",
"\n",
"Fargate calculates cost by the number of CPUs and amount of memory your tasks have requested per second. Let's start by setting some constants for the per second price. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"COST_VCPU_PRICE = 0.0582 / 3600\n",
"COST_MEM_PRICE = 0.0146 / 3600"
]
},
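{
"cell_type": "markdown",
"metadata": {},
"source": [
"For example, using the illustrative prices above, a single worker with the sizes we chose requests 4 vCPUs and 16GB of RAM, which works out at roughly $0.47 per hour:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hourly price of one worker: vCPUs * vCPU price + GB of RAM * memory price\n",
"worker_hourly = ((WORKER_CPU / 1024) * COST_VCPU_PRICE + (WORKER_MEM / 1024) * COST_MEM_PRICE) * 3600\n",
"print('One worker costs ~${:.4f} per hour'.format(worker_hourly))"
]
},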
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we can get the list of tasks in our cluster with the amount of CPU and memory requested along with the total number of seconds the task has been active."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_tasks_cost_metrics():\n",
" import pytz\n",
" from datetime import datetime\n",
"\n",
" tasks = ecs.describe_tasks(cluster=CLUSTER_ARN, tasks=ecs.list_tasks(cluster=CLUSTER_ARN)['taskArns'])['tasks']\n",
" cpu = [int(task['cpu']) / 1024 for task in tasks]\n",
" mem = [int(task['memory']) / 1024 for task in tasks]\n",
"\n",
" now = datetime.utcnow().replace(tzinfo=pytz.utc)\n",
" return [{'cpu': int(task['cpu']) / 1024, 'mem': int(task['memory']) / 1024, 'time': (now - task['pullStartedAt']).total_seconds()} for task in tasks]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"From this list we can calculate the total cost of ownership per minute and the total cost of the cluster so far."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tasks_cost_metrics = get_tasks_cost_metrics()\n",
"\n",
"per_second_task_costs = [(task['cpu'] * COST_VCPU_PRICE ) + (task['mem'] * COST_MEM_PRICE) for task in tasks_cost_metrics]\n",
"print(\"Your cluster costs ${cost:.5f} per minute.\".format(cost=sum(per_second_task_costs) * 60))\n",
"\n",
"cumulative_task_costs = [(task['cpu'] * COST_VCPU_PRICE * task['time']) + (task['mem'] * COST_MEM_PRICE * task['time']) for task in tasks_cost_metrics]\n",
"print(\"It has cost you ${cost:.5f} so far.\".format(cost=sum(cumulative_task_costs)))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Shutting down the cluster\n",
"Currently there is no way for the cluster to automatically shut down, so we need to do this manually. This is as simple as stopping the scheduler task as the workers will exit when they lose connection."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"scheduler_task = ecs.stop_task(cluster=CLUSTER_ARN, task=scheduler_task_arn)['task']\n",
"scheduler_task['desiredStatus']"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Delete resources (optional)\n",
"\n",
"At this point everything which was costing money on our account has been removed (with the exception of the log messages which will be removed automatically after 30 days, we paid to ingest and store them for a month so we may as well leave them there until they expire). \n",
"\n",
"However for neatness we can also go through the AWS resources we created earlier and remove them all again."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# ECS cluster\n",
"ecs.delete_cluster(cluster=CLUSTER_NAME)\n",
"\n",
"# Execution role\n",
"for policy in iam.list_attached_role_policies(RoleName=EXECUTION_ROLE_NAME)['AttachedPolicies']:\n",
" iam.detach_role_policy(RoleName=EXECUTION_ROLE_NAME, PolicyArn=policy['PolicyArn'])\n",
"iam.delete_role(RoleName=EXECUTION_ROLE_NAME)\n",
" \n",
"# Task role\n",
"for policy in iam.list_attached_role_policies(RoleName=TASK_ROLE_NAME)['AttachedPolicies']:\n",
" iam.detach_role_policy(RoleName=TASK_ROLE_NAME, PolicyArn=policy['PolicyArn'])\n",
"iam.delete_role(RoleName=TASK_ROLE_NAME)\n",
" \n",
"# CloudWatch log group\n",
"logs.delete_log_group(logGroupName=CLOUDWATCH_LOGS_GROUP)\n",
" \n",
"# VPC Security group\n",
"ec2.delete_security_group(GroupName=VPC_SECURITY_GROUP_NAME, DryRun=False)\n",
" \n",
"# Scheduler task definition\n",
"ecs.deregister_task_definition(taskDefinition=scheduler_task_definition['taskDefinition']['taskDefinitionArn'])\n",
" \n",
"# Worker task definition\n",
"ecs.deregister_task_definition(taskDefinition=worker_task_definition['taskDefinition']['taskDefinitionArn'])\n",
"\n",
"print(\"Done!\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python [default]",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
@KseniiaPalin

Thank you!

Probably the first guide of sort which works from the first attempt :)

p.s. Not counting setting the region. I had to do the following to overwrite my default region:

region = 'eu-west-2'
ec2 = boto3.client('ec2', region_name=region)
ecs = boto3.client('ecs', region_name=region)
iam = boto3.client('iam', region_name=region)
logs = boto3.client('logs', region_name=region)

@jacobtomlinson
Author

@KseniiaPalin great to hear it worked for you! I usually set my region once with aws configure using the CLI tool, always forget about that :).

This work is actually now wrapped up in dask-cloudprovider. Would love some folks to test it out and give feedback. It should now be as easy as:

from dask_cloudprovider import FargateCluster
cluster = FargateCluster()  # This runs basically the whole notebook above

from distributed import Client
client = Client(cluster)

@KseniiaPalin

Thank you @jacobtomlinson!

With dask-cloudprovider I am currently having problems specifying configuration.
My default AWS environment is eu-north-1 and I don't want to change it.
Here is the code I use to create a cluster:

from dask_cloudprovider import FargateCluster
cluster = FargateCluster(
    environment={ 'AWS_REGION': 'eu-west-2'}, # The resources are being created in `eu-north-1` anyway
    name='dask_fargate_cluster' # The name is being generated anyway
)  

What is a correct way to configure the AWS environment for a cluster?

Also, in this notebook you are re-using resources when they've already been created, but dask-cloudprovider produces an exception when e.g. the CloudWatch log group name is already taken.

@jacobtomlinson
Author

Thanks for testing this out. If you have issues like these could you please raise them on dask-cloudprovider.

Quick answer: you are setting the environment for your workers there, not the current session. I recommend using os.environ to set your local variables. This does suggest that there should be a kwarg to set your region in FargateCluster, and perhaps to set your keys too.

Would you be able to raise all of this in issues on dask-cloudprovider? It would be super helpful!

@ks233ever

ks233ever commented Feb 18, 2020

@jacobtomlinson if we already have a fargate cluster set up, how are we able to connect to it in this step

c = distributed.Client('tcp://{}:8786'.format(scheduler_public_ip))

following the above steps returns a timeout error

@jacobtomlinson
Author

Your cluster needs to expose the dask scheduler on the scheduler_public_ip on port 8786. If it doesn't, you will need to modify this to use the correct URL.

@RichardScottOZ
Copy link

Thanks Jacob...scheduler typo under Create Task Definitions btw

@adair-kovac

I'm getting the same timeout as ks233ever was, yet I can connect to the IP and port from telnet on the same machine where the notebook is running.

@jacobtomlinson
Author

@adair-kovac this gist is pretty old now. All current work is being done in dask-cloudprovider and things here are definitely not supported.

@adamjacobgilbert

Is there an example of the above, but using the Dask Cloudprovider FargateCluster() class?

@adamjacobgilbert

Thanks. I've read this one a few times. I am struggling with the networking setup.

cluster = FargateCluster(
    cluster_name_template='Dask-Test-30',
    image="daskdev/dask:latest",
    #environment={'EXTRA_PIP_PACKAGES':'joblib'},
    scheduler_cpu=1024,
    scheduler_mem=4096,
    worker_cpu=4096,
    worker_mem=16384,
    execution_role_arn="arn:aws:iam::260849320:role/dask-fargate-execution", #UPDATED
    task_role_arn='arn:aws:iam::260849720:role/dask-fargate-task', #UPDATED
    #task_role_policies=[]
    #vpc='vpc-0280b92031b9f010c',
    subnets=[
        'subnet-06cc237e',
        'subnet-2a505861',
        'subnet-cf04f2',
        'subnet-3a2756',
        'subnet-08ba9c01b59b6'
    ], # updated
    security_groups=['sg-02fe57ad943901'], #updated
    n_workers=25,
    fargate_use_private_ip=False,
    scheduler_timeout="15 minutes"
)

cluster

@adamjacobgilbert

When I get to generating the link to the scheduler, I get network/connectivity issues, which makes me think my company's firewall is messing it up and that my security group rules need to be updated, but I don't know how to troubleshoot this.

from IPython.core.display import display, HTML
display(HTML('<a href="{url}" target="_blank">{url}</a>'.format(url='http://{}:8787/status'.format(scheduler_public_ip))))

@RichardScottOZ

Yeah, an AWS issue there, had the same problem.

@RichardScottOZ

One of the AWS data scientists suggested this post to me about using a load balancer to monitor: https://aws.amazon.com/blogs/machine-learning/machine-learning-on-distributed-dask-using-amazon-sagemaker-and-aws-fargate/

@adamjacobgilbert

I've tried this as well. The CloudFormation template doesn't work either. I essentially need to get my Python instance into the same VPC that the dask Fargate cluster is in, or alternatively, to use a proxy server.

@c-leber

c-leber commented Oct 7, 2021

@jacobtomlinson if we already have a fargate cluster set up, how are we able to connect to it in this step

c = distributed.Client('tcp://{}:8786'.format(scheduler_public_ip))

following the above steps returns a timeout error

Hi @ks233ever and @adair-kovac
Any resolution of this Client timeout issue?

@jacobtomlinson
Author

To reiterate

All current work is being done in dask-cloudprovider and things here are definitely not supported.
