Created
April 6, 2018 20:11
-
-
Save jdahlin/4e31abf5db8ef180030ca9a0517d59f0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
from typing import Optional, Generator | |
import enum | |
class TaskDefinitionStatus(enum.Enum):
    """Status filter values for listing ECS task definitions.

    Each member's value is the literal string sent as the ``status``
    parameter of ``list_task_definitions``.
    """

    ACTIVE = 'ACTIVE'
    # The remaining values the ECS API accepts for the same filter.
    INACTIVE = 'INACTIVE'
    DELETE_IN_PROGRESS = 'DELETE_IN_PROGRESS'
class TaskDefinitionSort(enum.Enum):
    """Sort order values for listing ECS task definitions.

    Each member's value is the literal string used for the ``sort``
    parameter of ``list_task_definitions``.
    """

    # Descending order.
    DESC = "DESC"
    # Ascending order.
    ASC = "ASC"
class Arn(object):
    """Amazon Resource Name split into its components.

    Accepted layouts::

        arn:partition:service:region:account-id:resource
        arn:partition:service:region:account-id:resourcetype/resource
        arn:partition:service:region:account-id:resourcetype:resource

    Attributes:
        arn: the original, unparsed string.
        partition, service, region, account: fields 2-5 of the ARN.
        resourcetype: the part before the first ``/`` of field 6, or the
            whole of field 6 when a seventh field is present or when field 6
            has no ``/``.
        resource: the seventh field if present, otherwise the part after the
            first ``/`` of field 6, otherwise field 6 itself.

    Raises:
        ValueError: if the string has fewer than six colon-separated fields.
    """

    def __init__(self, arn):
        # type: (str) -> None
        self.arn = arn
        # Cap the split at seven fields so that colons inside the resource
        # (e.g. "log-group:/aws/ecs:*") stay part of the resource.
        parts = arn.split(':', 6)
        if len(parts) < 6:
            raise ValueError('malformed ARN: {!r}'.format(arn))
        self.partition = parts[1]
        self.service = parts[2]
        self.region = parts[3]
        self.account = parts[4]
        self.resourcetype = parts[5]
        self.resource = None
        if len(parts) == 7:
            # resourcetype:resource
            self.resource = parts[6]
        elif '/' in parts[5]:
            # resourcetype/resource -- split once only, the resource itself
            # may contain further slashes (e.g. "service/cluster/name").
            self.resourcetype, self.resource = parts[5].split('/', 1)
        else:
            # Bare resource: both attributes carry the same value, as before.
            self.resource = parts[5]
class TaskDefinitionArn(Arn):
    """Arn specialised for ECS task definitions.

    In addition to the attributes set by ``Arn``, exposes ``family``: the
    part of ``resourcetype`` after the ``task-definition/`` prefix.
    """

    def __init__(self, arn):
        # type: (str) -> None
        super(TaskDefinitionArn, self).__init__(arn)
        assert self.service == 'ecs'
        # Read the parsed attribute from self; the `arn` argument is the raw
        # string and has no `resourcetype`.
        ecstype, self.family = self.resourcetype.split('/', 1)
        assert ecstype == 'task-definition'
class ECS(object):
    """Object-oriented wrapper around a boto3 ``ecs`` client.

    The client is stored as ``self.client``; the methods translate its
    dictionary responses into ECSCluster / ECSService / ECSTaskDefinition
    objects.
    """

    def __init__(self, region_name=None):
        # type: (Optional[str]) -> None
        self.client = boto3.client('ecs', region_name=region_name)

    def clusters(self):
        # type: () -> Generator[ECSCluster, None, None]
        """Yield an ECSCluster for every cluster ARN the account lists."""
        kwargs = {}
        while True:
            response = self.client.list_clusters(**kwargs)
            for cluster_arn in response['clusterArns']:
                arn = Arn(cluster_arn)
                assert arn.service == 'ecs', arn.service
                assert arn.resourcetype == 'cluster', arn.resourcetype
                yield ECSCluster(self, arn.resource)
            # Keep following the pagination token so results are not
            # silently truncated to the first page.
            next_token = response.get('nextToken')
            if not next_token:
                break
            kwargs['nextToken'] = next_token

    def get_cluster(self, cluster_name):
        # type: (str) -> ECSCluster
        """Return an ECSCluster handle for ``cluster_name`` (no API call)."""
        return ECSCluster(self, cluster_name)

    def task_definitions(
            self,
            family_prefix=None,  # type: Optional[str]
            status=None,  # type: Optional[TaskDefinitionStatus]
            sort=None,  # type: Optional[TaskDefinitionSort]
            max_results=None,  # type: Optional[int]
    ):
        # type: (...) -> Generator[ECSTaskDefinition, None, None]
        """Yield a described ECSTaskDefinition for each listed ARN.

        Only the filters that are not None are sent to the API. ``status``
        and ``sort`` may be enum members or plain strings. When
        ``max_results`` is given a single page of at most that many entries
        is fetched; otherwise pagination tokens are followed until the
        listing is exhausted.
        """
        kwargs = {}
        if family_prefix is not None:
            kwargs['familyPrefix'] = family_prefix
        if status is not None:
            # boto3 wants the plain string, not the Enum member.
            kwargs['status'] = getattr(status, 'value', status)
        if sort is not None:
            kwargs['sort'] = getattr(sort, 'value', sort)
        if max_results is not None:
            kwargs['maxResults'] = max_results
        while True:
            listing = self.client.list_task_definitions(**kwargs)
            for td_arn in listing['taskDefinitionArns']:
                described = self.client.describe_task_definition(
                    taskDefinition=td_arn)
                yield ECSTaskDefinition.from_response(
                    self, described['taskDefinition'])
            next_token = listing.get('nextToken')
            # An explicit max_results is treated as a cap on the total, so
            # only the first page is returned in that case.
            if max_results is not None or not next_token:
                break
            kwargs['nextToken'] = next_token

    def get_task_definition(self, query):
        # type: (str) -> Optional[ECSTaskDefinition]
        """Describe a single task definition.

        ``query`` is one of:
          * the family, for the latest ACTIVE revision,
          * ``family:revision`` for a specific revision in the family, or
          * the full Amazon Resource Name (ARN) of the task definition.

        Returns None when the response is empty or carries no
        ``taskDefinition`` entry.
        """
        # The request parameter is named `taskDefinition`.
        response = self.client.describe_task_definition(taskDefinition=query)
        task_definition = response.get('taskDefinition') if response else None
        if task_definition:
            return ECSTaskDefinition.from_response(
                ecs=self,
                response=task_definition)
        return None

    def services(self, cluster_name='default'):
        # type: (str) -> Generator[ECSService, None, None]
        """Yield an ECSService for every service in ``cluster_name``."""
        # Services expect a cluster object exposing `.ecs`, not the ECS
        # wrapper itself.
        cluster = ECSCluster(self, cluster_name)
        kwargs = {'cluster': cluster_name}
        while True:
            listing = self.client.list_services(**kwargs)
            service_arns = listing['serviceArns']
            # Skip the describe call for an empty page rather than sending
            # an empty `services` list.
            if service_arns:
                described = self.client.describe_services(
                    cluster=cluster_name, services=service_arns)
                for service_response in described['services']:
                    yield ECSService.from_response(cluster, service_response)
            next_token = listing.get('nextToken')
            if not next_token:
                break
            kwargs['nextToken'] = next_token
class ECSCluster(object):
    """A named ECS cluster bound to the ECS wrapper used to query it.

    Attributes:
        ecs: the ECS wrapper this cluster was obtained from.
        name: the cluster name.
    """

    def __init__(self, ecs, name):
        # type: (ECS, str) -> None
        self.ecs = ecs
        self.name = name

    def __repr__(self):
        return '<{}: {}>'.format(type(self).__name__, self.name)

    def services(self):
        """Return ``ecs.services()`` for this cluster's name."""
        return self.ecs.services(cluster_name=self.name)

    @classmethod
    def from_response(cls, cluster, response):
        """Build a cluster from one ``describe_clusters`` entry.

        ``cluster`` keeps its historical parameter name; it is forwarded to
        the constructor as the ``ecs`` handle. ``response`` must carry a
        ``clusterName`` key.
        """
        # The constructor requires the name; previously it was omitted and
        # this method could only raise TypeError.
        return cls(cluster, response['clusterName'])
class ECSService(object):
    """Snapshot of one ECS service built from a ``describe_services`` entry.

    Attributes mirror the response keys they are copied from; the task
    definition ARN is kept privately and exposed through the
    ``taskDefinition`` property.
    """

    def __init__(self, cluster):
        # type: (ECSCluster) -> None
        self.cluster = cluster
        self.arn = ''  # type: str
        self.name = ''  # type: str
        self.status = ''  # type: str
        self.desiredCount = 0  # type: int
        self.pendingCount = 0  # type: int
        self.runningCount = 0  # type: int
        # Initialised here so every instance has the full attribute set,
        # whether or not it went through from_response().
        self.launchType = None  # type: Optional[str]
        self.events = []  # type: list
        self.placementConstraints = []  # type: list
        self.placementStrategy = []  # type: list
        self._taskDefinition = ''  # type: str

    def __repr__(self):
        return '<{}: {}>'.format(type(self).__name__, self.name)

    @classmethod
    def from_response(cls, cluster, response):
        # type: (ECSCluster, dict) -> ECSService
        """Create a service owned by ``cluster`` from a response dict.

        ``serviceArn``, ``serviceName``, ``status``, ``taskDefinition`` and
        the three counts are required keys. ``events``, ``launchType``,
        ``placementConstraints`` and ``placementStrategy`` fall back to an
        empty list / None when absent.
        """
        self = cls(cluster)
        self.arn = response['serviceArn']
        self.desiredCount = response['desiredCount']
        self.events = response.get('events', [])
        # NOTE(review): launchType is reportedly omitted for services created
        # with a capacity provider strategy, hence .get() -- confirm against
        # the API reference.
        self.launchType = response.get('launchType')
        self.name = response['serviceName']
        self.pendingCount = response['pendingCount']
        self.placementConstraints = response.get('placementConstraints', [])
        self.placementStrategy = response.get('placementStrategy', [])
        self.runningCount = response['runningCount']
        self.status = response['status']
        self._taskDefinition = response['taskDefinition']
        return self

    @property
    def taskDefinition(self):
        """ECSTaskDefinition built from the stored task definition ARN."""
        return ECSTaskDefinition.from_arn(
            ecs=self.cluster.ecs,
            arn_string=self._taskDefinition)
class ECSTaskDefinition(object):
    """One revision of an ECS task definition family.

    Attributes:
        ecs: the ECS wrapper used for lookups.
        family: task definition family name.
        revision: revision number within the family.
        containerDefinitions: container definitions from the describe
            response (empty until populated by from_response()).
        status, placementConstraints, volumes: copied from the describe
            response when available.
    """

    def __init__(self, ecs, family, revision):
        # type: (ECS, str, int) -> None
        self.ecs = ecs
        self.family = family
        self.revision = revision
        self.containerDefinitions = []
        # Initialised here so instances built by from_arn()/from_family()
        # expose the same attributes as those built by from_response().
        self.status = None
        self.placementConstraints = []
        self.volumes = []

    def __repr__(self):
        return '<{}: {}:{}>'.format(type(self).__name__, self.family, self.revision)

    @classmethod
    def from_arn(cls, ecs, arn_string):
        # type: (ECS, str) -> ECSTaskDefinition
        """Build an instance from a task definition ARN.

        No API call is made; only family and revision are filled in.
        """
        arn = Arn(arn_string)
        assert arn.service == 'ecs'
        ecstype, family = arn.resourcetype.split('/', 1)
        assert ecstype == 'task-definition'
        return cls(ecs, family, int(arn.resource))

    @classmethod
    def from_family(cls, ecs, family, revision=None):
        # type: (ECS, str, Optional[int]) -> Optional[ECSTaskDefinition]
        """Build an instance for ``family``.

        With an explicit ``revision`` no API call is made. Otherwise the
        first result of ``ecs.task_definitions`` (ACTIVE, descending,
        max_results=1) is returned, or None when nothing is listed.
        """
        if revision is not None:
            return cls(ecs, family, revision)
        found = ecs.task_definitions(family_prefix=family,
                                     status=TaskDefinitionStatus.ACTIVE,
                                     sort=TaskDefinitionSort.DESC,
                                     max_results=1)
        # next() with a default works on Python 2 and 3 and yields None
        # instead of leaking StopIteration.
        return next(found, None)

    @classmethod
    def from_response(cls, ecs, response):
        # type: (ECS, dict) -> ECSTaskDefinition
        """Build an instance from the ``taskDefinition`` dict of a describe call.

        ``family``, ``revision`` and ``containerDefinitions`` are required
        keys; ``status``, ``placementConstraints`` and ``volumes`` are
        optional.
        """
        self = cls(ecs, family=response['family'], revision=response['revision'])
        self.containerDefinitions = response['containerDefinitions']
        # Historical misspelling kept as an alias for existing readers.
        self.containerDefinition = self.containerDefinitions
        self.status = response.get('status')
        self.placementConstraints = response.get('placementConstraints', [])
        self.volumes = response.get('volumes', [])
        return self
if __name__ == '__main__':
    # Smoke test: print the first task definition visible in us-east-1
    # (None when the account has no task definitions).
    ecs = ECS(region_name='us-east-1')
    # print(...) with a single argument and next(...) behave the same on
    # Python 2 and Python 3.
    print(next(ecs.task_definitions(), None))

    # Further usage examples, left disabled:
    #
    # print('Clusters and their services')
    # for cluster in ecs.clusters():
    #     print(cluster)
    #     for service in cluster.services():
    #         print('* {}'.format(service))
    #
    # print('Task definitions')
    # for td in ecs.task_definitions():
    #     print('* {}'.format(td))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment