Skip to content

Instantly share code, notes, and snippets.

@DanielViglione
Last active May 26, 2021 14:38
Show Gist options
  • Save DanielViglione/0e96b3ccb919234883ec2cc96123e17f to your computer and use it in GitHub Desktop.
Save DanielViglione/0e96b3ccb919234883ec2cc96123e17f to your computer and use it in GitHub Desktop.
Python script to update or create new scheduled tasks.
import boto3
import json
import glob
import sys
import os
import re
# Get latest version of task definition.
def list_image(taskdefname):
client = boto3.client('ecs')
response = client.list_task_definitions(
familyPrefix=taskdefname,
sort='DESC',
status='ACTIVE',
maxResults=1
)
return response['taskDefinitionArns'][0]
# Function to create new scheduled tasks.
def create_new_scheduled_task(client, rulename, container_name, st_state, scheduledExpression):
response = client.put_rule(
Name=rulename,
ScheduleExpression=scheduledExpression,
State=st_state,
Description='Scheduled tasks created/updated by python script.',
RoleArn="arn:aws:iam::465464725359:role/service-role/Amazon_EventBridge_Invoke_ECS_677098387"
)
print('Http code: {}'.format(response['ResponseMetadata']['HTTPStatusCode']))
response = client.put_targets(
Rule=rule_name,
Targets=[
{
"Input":"""
{
"containerOverrides": [
{
"name": "%s",
"command": ["npm", "run", "%s"]
}
]
}
""" % (container_name,rule_name),
'Id': rule_name,
'Arn': ECSClusterArn,
'RoleArn': "arn:aws:iam::465464725359:role/service-role/Amazon_EventBridge_Invoke_ECS_677098387",
'EcsParameters': {
'TaskDefinitionArn': task_def_arn,
'TaskCount': 1,
'LaunchType': 'FARGATE',
'PlatformVersion': 'LATEST',
'NetworkConfiguration': {
'awsvpcConfiguration': {
'Subnets': [
subnet1,
subnet2
],
'SecurityGroups': [
SecurityGroup,
],
'AssignPublicIp': 'DISABLED'
}
},
},
},
]
)
print('Http code: {}'.format(response['ResponseMetadata']['HTTPStatusCode']))
# Function to get cron schedule defined on first line of file.
def get_cron_schedule(file):
# Function to parse file and read first line.
infile = open(file, 'r')
firstLine = infile.readline()
cronschedule = ''.join(firstLine.split('//', 1)).strip()
return cronschedule
# If Scheduled tasks already exists then this fuction will get the current state of Scheduled tasks. whether its in ENABLED or DISABLED state.
def get_sts_state(rule_name):
client = boto3.client('events')
try:
response = client.list_rules(
NamePrefix=rule_name,
Limit=2
)
st_state = response['Rules'][0]['State']
return st_state
except IndexError:
st_state = "ENABLED"
return st_state
# Specify the name of Task definition.
taskdefname = sys.argv[1]
subnet1 = sys.argv[2]
subnet2 = sys.argv[3]
SecurityGroup = sys.argv[4]
ECSClusterArn = sys.argv[5]
TaskRoleArn = sys.argv[6]
container_name = sys.argv[7]
task_def_arn = list_image(taskdefname)
# Get list of all files with .ts extension and create scheduled tasks for those.
tsfile = glob.glob("*.ts")
for f in tsfile:
# Check if first line contains any cron expression.
infile = open(f, 'r')
firstLine = infile.readline()
if "cron" in firstLine:
rule_name = f.rsplit( ".", 1 )[ 0 ]
cron_schedule = get_cron_schedule(f)
st_state = get_sts_state(rule_name)
if st_state == "ENABLED":
# Create new client to work with events.
client = boto3.client('events')
create_new_scheduled_task(client, rule_name, container_name, st_state, cron_schedule)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment