Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save manojkarthick/fdd4091359525fc6d96b2ba863705b70 to your computer and use it in GitHub Desktop.
Save manojkarthick/fdd4091359525fc6d96b2ba863705b70 to your computer and use it in GitHub Desktop.
Create compute environment and job queue for AWS Batch with Boto3 (Polling and Waiter implementations)
####################################################################
# Create compute environment and job queue for AWS Batch with Boto3
# Daisuke Miyamoto
# 20200330
#
import boto3
import botocore.waiter
from botocore.exceptions import WaiterError
import time
def polling_compute_environment(client, compute_environment_name,
                                interval=1, timeout=None):
    """Poll until the named compute environment reports status ``VALID``.

    Sleeps ``interval`` seconds, calls
    ``client.describe_compute_environments`` for the single name given, and
    inspects the ``status`` of the first entry returned. This repeats until
    the status is ``VALID``.

    Args:
        client: Object exposing ``describe_compute_environments`` (in this
            file, a boto3 ``batch`` client).
        compute_environment_name: Name sent in the ``computeEnvironments``
            filter of each describe call.
        interval: Seconds to sleep before every describe call. Defaults to 1,
            the value that used to be hard-coded.
        timeout: Optional upper bound, in seconds, on the total wait.
            ``None`` (the default) waits indefinitely, as before.

    Raises:
        RuntimeError: The environment reported status ``INVALID``. The waiter
            variant in this file treats the same status as a failure; without
            this check the loop would spin forever.
        TimeoutError: ``timeout`` elapsed before ``VALID`` was seen.
        IndexError: The response listed no compute environments.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        time.sleep(interval)
        response = client.describe_compute_environments(
            computeEnvironments=[compute_environment_name],
        )
        compute_environment = response['computeEnvironments'][0]
        status = compute_environment['status']
        if status == 'VALID':
            return
        if status == 'INVALID':
            # Include the service-provided reason when the response has one.
            reason = compute_environment.get('statusReason', 'no reason reported')
            raise RuntimeError(
                'compute environment %r is INVALID: %s'
                % (compute_environment_name, reason)
            )
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                'compute environment %r still %s after %s seconds'
                % (compute_environment_name, status, timeout)
            )
def polling_job_queue(client, job_queue_name, interval=1, timeout=None):
    """Poll until the named job queue reports status ``VALID``.

    Sleeps ``interval`` seconds, calls ``client.describe_job_queues`` for the
    single name given, and inspects the ``status`` of the first entry
    returned. This repeats until the status is ``VALID``.

    Args:
        client: Object exposing ``describe_job_queues`` (in this file, a
            boto3 ``batch`` client).
        job_queue_name: Name sent in the ``jobQueues`` filter of each
            describe call.
        interval: Seconds to sleep before every describe call. Defaults to 1,
            the value that used to be hard-coded.
        timeout: Optional upper bound, in seconds, on the total wait.
            ``None`` (the default) waits indefinitely, as before.

    Raises:
        RuntimeError: The queue reported status ``INVALID``. The waiter
            variant in this file treats the same status as a failure; without
            this check the loop would spin forever.
        TimeoutError: ``timeout`` elapsed before ``VALID`` was seen.
        IndexError: The response listed no job queues.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        time.sleep(interval)
        response = client.describe_job_queues(
            jobQueues=[job_queue_name],
        )
        job_queue = response['jobQueues'][0]
        status = job_queue['status']
        if status == 'VALID':
            return
        if status == 'INVALID':
            # Include the service-provided reason when the response has one.
            reason = job_queue.get('statusReason', 'no reason reported')
            raise RuntimeError(
                'job queue %r is INVALID: %s' % (job_queue_name, reason)
            )
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                'job queue %r still %s after %s seconds'
                % (job_queue_name, status, timeout)
            )
def get_compute_environment_waiter(client, delay=1, max_attempts=10):
    """Build a custom botocore waiter for compute-environment status.

    The waiter repeatedly invokes ``DescribeComputeEnvironments`` and
    evaluates ``computeEnvironments[].status`` on each response:

    * success when every returned status is ``VALID`` (``pathAll``);
    * failure as soon as any returned status is ``INVALID`` (``pathAny``).

    Args:
        client: Client the waiter is bound to (in this file, a boto3
            ``batch`` client).
        delay: Seconds between attempts. Defaults to 1, the value that used
            to be hard-coded.
        max_attempts: Number of attempts before the waiter gives up.
            Defaults to 10, the value that used to be hard-coded.

    Returns:
        The waiter object produced by
        ``botocore.waiter.create_waiter_with_client``; call its ``wait``
        method with ``computeEnvironments=[...]``.
    """
    waiter_id = 'ComputeEnvironmentWaiter'
    status_path = 'computeEnvironments[].status'
    model = botocore.waiter.WaiterModel({
        'version': 2,
        'waiters': {
            waiter_id: {
                'delay': delay,
                'operation': 'DescribeComputeEnvironments',
                'maxAttempts': max_attempts,
                'acceptors': [
                    {
                        'expected': 'VALID',
                        'matcher': 'pathAll',
                        'state': 'success',
                        'argument': status_path,
                    },
                    {
                        'expected': 'INVALID',
                        'matcher': 'pathAny',
                        'state': 'failure',
                        'argument': status_path,
                    },
                ],
            },
        },
    })
    return botocore.waiter.create_waiter_with_client(waiter_id, model, client)
def get_job_queue_waiter(client, delay=1, max_attempts=10):
    """Build a custom botocore waiter for job-queue status.

    The waiter repeatedly invokes ``DescribeJobQueues`` and evaluates
    ``jobQueues[].status`` on each response:

    * success when every returned status is ``VALID`` (``pathAll``);
    * failure as soon as any returned status is ``INVALID`` (``pathAny``).

    Args:
        client: Client the waiter is bound to (in this file, a boto3
            ``batch`` client).
        delay: Seconds between attempts. Defaults to 1, the value that used
            to be hard-coded.
        max_attempts: Number of attempts before the waiter gives up.
            Defaults to 10, the value that used to be hard-coded.

    Returns:
        The waiter object produced by
        ``botocore.waiter.create_waiter_with_client``; call its ``wait``
        method with ``jobQueues=[...]``.
    """
    waiter_id = 'JobQueueWaiter'
    status_path = 'jobQueues[].status'
    model = botocore.waiter.WaiterModel({
        'version': 2,
        'waiters': {
            waiter_id: {
                'delay': delay,
                'operation': 'DescribeJobQueues',
                'maxAttempts': max_attempts,
                'acceptors': [
                    {
                        'expected': 'VALID',
                        'matcher': 'pathAll',
                        'state': 'success',
                        'argument': status_path,
                    },
                    {
                        'expected': 'INVALID',
                        'matcher': 'pathAny',
                        'state': 'failure',
                        'argument': status_path,
                    },
                ],
            },
        },
    })
    return botocore.waiter.create_waiter_with_client(waiter_id, model, client)
def my_create_compute_environment(client, compute_environment_name,
                                  service_role=None, security_group_ids=None,
                                  subnets=None):
    """Create a managed, enabled SPOT compute environment.

    Sends one ``client.create_compute_environment`` request using the
    ``SPOT_CAPACITY_OPTIMIZED`` allocation strategy, the ``optimal`` instance
    type, 0 min/desired vCPUs, 128 max vCPUs, and ``Name``/``Project`` tags
    set to ``Batch``.

    Args:
        client: Object exposing ``create_compute_environment`` (in this file,
            a boto3 ``batch`` client).
        compute_environment_name: Name for the new compute environment.
        service_role: Service role ARN. ``None`` selects the sample ARN that
            was previously hard-coded.
        security_group_ids: Security group IDs for the compute resources.
            ``None`` selects the sample ID that was previously hard-coded.
        subnets: Subnet IDs for the compute resources. ``None`` selects the
            sample IDs that were previously hard-coded.

    Returns:
        Whatever ``client.create_compute_environment`` returns. Earlier
        versions discarded it and returned ``None``.
    """
    # Sample settings kept as defaults so existing two-argument calls behave
    # as before. None sentinels avoid sharing mutable default lists.
    if service_role is None:
        service_role = 'arn:aws:iam::123477424058:role/service-role/AWSBatchServiceRole'
    if security_group_ids is None:
        security_group_ids = ['sg-1234d41f']
    if subnets is None:
        subnets = ['subnet-1234914c', 'subnet-1234b05d', 'subnet-12348faa']

    return client.create_compute_environment(
        computeEnvironmentName=compute_environment_name,
        type='MANAGED',
        state='ENABLED',
        serviceRole=service_role,
        computeResources={
            # NOTE: the original kept 'EC2' as a commented-out alternative to
            # 'SPOT' for this field.
            'type': 'SPOT',
            'allocationStrategy': 'SPOT_CAPACITY_OPTIMIZED',
            'instanceRole': 'ecsInstanceRole',
            'instanceTypes': [
                'optimal',
            ],
            'minvCpus': 0,
            'desiredvCpus': 0,
            'maxvCpus': 128,
            'securityGroupIds': security_group_ids,
            'subnets': subnets,
            'tags': {
                'Name': 'Batch',
                'Project': 'Batch',
            },
        },
    )
def my_create_job_queue(client, job_queue_name, compute_environment_name,
                        priority=1):
    """Create an enabled job queue backed by a single compute environment.

    Sends one ``client.create_job_queue`` request that attaches
    ``compute_environment_name`` at order 1.

    Args:
        client: Object exposing ``create_job_queue`` (in this file, a boto3
            ``batch`` client).
        job_queue_name: Name for the new job queue.
        compute_environment_name: Compute environment to attach to the queue.
        priority: Queue priority sent with the request. Defaults to 1, the
            value that used to be hard-coded.

    Returns:
        Whatever ``client.create_job_queue`` returns. Earlier versions
        discarded it and returned ``None``.
    """
    return client.create_job_queue(
        computeEnvironmentOrder=[
            {
                'computeEnvironment': compute_environment_name,
                'order': 1,
            },
        ],
        jobQueueName=job_queue_name,
        priority=priority,
        state='ENABLED',
    )
if __name__ == '__main__':
    # Demo driver: creates a compute environment and a job queue twice, once
    # waiting with the hand-written polling loops and once with the custom
    # botocore waiters.
    client = boto3.client('batch')

    # Example: Polling
    name = 'boto3-polling'
    compute_environment_name = name + '-compute'
    job_queue_name = name + '-queue'

    my_create_compute_environment(client, compute_environment_name)
    polling_compute_environment(client, compute_environment_name)

    my_create_job_queue(client, job_queue_name, compute_environment_name)
    polling_job_queue(client, job_queue_name)

    # Example: Waiter
    name = 'boto3-waiter'
    compute_environment_name = name + '-compute'
    job_queue_name = name + '-queue'

    my_create_compute_environment(client, compute_environment_name)
    waiter = get_compute_environment_waiter(client)
    try:
        waiter.wait(computeEnvironments=[compute_environment_name])
    except WaiterError as e:
        # Exceptions have no `.message` attribute on Python 3, so the old
        # `print(e.message)` raised AttributeError here. Printing the
        # exception itself shows the waiter's failure reason.
        print(e)
    else:
        # Only build the queue once the compute environment is confirmed
        # VALID; creating it after a failed wait would just fail again.
        my_create_job_queue(client, job_queue_name, compute_environment_name)
        waiter = get_job_queue_waiter(client)
        try:
            waiter.wait(jobQueues=[job_queue_name])
        except WaiterError as e:
            print(e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment