Skip to content

Instantly share code, notes, and snippets.

@jbylund
Created May 7, 2016 00:24
Show Gist options
  • Save jbylund/446a7efdf13b3ea2843ee6e218127d8a to your computer and use it in GitHub Desktop.
Save jbylund/446a7efdf13b3ea2843ee6e218127d8a to your computer and use it in GitHub Desktop.
#!/usr/bin/python
"""Match running EC2 instances to active reserved-instance purchases.

Fetches the running non-spot instances and the active reservations in
us-east-1, pairs instances with reservations that share the same
(instance type, availability zone), and records the pairing on each
reservation as a compactly packed 'instances' tag.
"""
import boto3
import datetime
import json
import redis
import sys
import base64
# Included in the redis key built by cache(), so changing this value stops
# results cached under the previous value from being reused.
VERSION = 0.2
import base64
import struct
def encode(n):
    """Pack an instance id into a short, URL-safe base64 token.

    Args:
        n: Instance id of the form 'i-<hex digits>'. The first two
            characters are dropped and the rest is parsed as hexadecimal.

    Returns:
        A native string, right-justified with spaces to a width of at least
        six characters. Values of up to 32 bits (eight hex digits) always
        produce exactly six characters, which is the slot width unpack()
        slices on.

    Raises:
        ValueError: if the text after the prefix is not hexadecimal.
        struct.error: if the value does not fit in an unsigned 64-bit
            integer (for example a 17-hex-digit id).
    """
    value = int(n[2:], 16)
    # Little-endian layout puts the high-order zero bytes last, so they can
    # simply be trimmed off the end.
    packed = struct.pack('<Q', value).rstrip(b'\x00')
    if not packed:
        # Zero trims down to nothing; keep one byte so there is a token.
        packed = b'\x00'
    token = base64.urlsafe_b64encode(packed).rstrip(b'=')
    if not isinstance(token, str):
        # Python 3 returns bytes here; callers join and tag with text.
        token = token.decode('ascii')
    return "{:>6}".format(token)
def decode(s):
    """Turn one token produced by encode() back into an instance id.

    Args:
        s: A single packed token, as text or bytes. Space filler in front
            of the token is ignored by the base64 decoder.

    Returns:
        'i-' followed by the value in lowercase hex, zero-padded to at
        least eight digits.

    Raises:
        struct.error: if the token decodes to more than eight bytes.
        ValueError or TypeError: if the token is not decodable base64 (the
            exact type depends on the Python version).
    """
    if not isinstance(s, bytes):
        # Text input (e.g. unicode tag values) has to become bytes before
        # the url-safe translation step; ascii covers the whole alphabet.
        s = s.encode('ascii')
    # Surplus '=' padding is tolerated, so always append the maximum needed.
    raw = base64.urlsafe_b64decode(s + b'==')
    # Restore the high-order zero bytes that encode() trimmed off.
    (value,) = struct.unpack('<Q', raw + b'\x00' * (8 - len(raw)))
    # Format the integer directly: slicing hex() output would keep the
    # trailing 'L' that Python 2 adds for long values.
    return "i-{:08x}".format(value)
def cache(innerfunc):
    """Decorate a zero-argument function so its result is cached in redis.

    The result is stored as JSON for 30 minutes under a key built from the
    function's name and the module-level VERSION. A falsy cached payload
    (missing key or empty value) is treated as a miss.

    Args:
        innerfunc: Callable taking no arguments and returning a
            JSON-serialisable value.

    Returns:
        A zero-argument wrapper with the same name and docstring as
        innerfunc.
    """
    import functools  # local: functools is not in the file's import block

    redis_conn = redis.StrictRedis()
    # Use a plain string key. redis-py 3.0+ only accepts bytes, strings and
    # numbers and raises DataError for a tuple.
    key = "{}:{}".format(innerfunc.__name__, VERSION)

    @functools.wraps(innerfunc)
    def wrapped():
        cached_val = redis_conn.get(key)
        if cached_val:
            return json.loads(cached_val)
        val = innerfunc()
        redis_conn.setex(key, 60 * 30, json.dumps(val))
        return val

    return wrapped
def datetime_to_timestamp(idatetime):
    """Convert a datetime to seconds since the Unix epoch.

    Args:
        idatetime: Any value. Only datetime.datetime instances are
            converted.

    Returns:
        A float number of seconds since 1970-01-01 00:00:00 UTC when given
        a datetime; timezone-aware values are shifted to UTC first and
        naive values are taken to already be in UTC. Any other input is
        returned unchanged.
    """
    if not isinstance(idatetime, datetime.datetime):
        return idatetime
    offset = idatetime.utcoffset()
    naive = idatetime.replace(tzinfo=None)
    if offset is not None:
        # Dropping tzinfo alone keeps the local wall-clock time; subtract
        # the offset so the value really is UTC before comparing.
        naive -= offset
    epoch = datetime.datetime(1970, 1, 1)
    return (naive - epoch).total_seconds()
def recursive_date_to_epoch(obj):
    """Replace datetimes nested inside dicts and lists with epoch seconds.

    The structure is modified in place: every dict value and list element
    is passed through datetime_to_timestamp(), and nested dicts and lists
    are walked recursively. Other container types are not descended into.

    Args:
        obj: The dict or list to rewrite. Any other type is left alone.

    Returns:
        None.
    """
    if isinstance(obj, dict):
        # items() exists on both Python 2 and 3, unlike iteritems().
        entries = list(obj.items())
    elif isinstance(obj, list):
        entries = list(enumerate(obj))
    else:
        return
    # Iterate over a snapshot so assigning back into obj is always safe.
    for slot, val in entries:
        obj[slot] = datetime_to_timestamp(val)
        # Dicts and lists come back from the conversion unchanged, so
        # recursing on the original value rewrites the stored object too.
        recursive_date_to_epoch(val)
@cache
def get_running_instances():
    """Return the running, non-spot EC2 instances in us-east-1.

    Datetime fields in the API response are converted to epoch seconds in
    place before the instances are collected, and the result goes through
    the @cache decorator.

    Returns:
        A list of instance description dicts, flattened across all
        reservations in the response.
    """
    ec2 = boto3.client('ec2', 'us-east-1')
    running_only = [{'Name': 'instance-state-name', 'Values': ['running']}]
    reservations = ec2.describe_instances(Filters=running_only)['Reservations']
    recursive_date_to_epoch(reservations)
    return [
        instance
        for reservation in reservations
        for instance in reservation['Instances']
        if instance.get("InstanceLifecycle") != "spot"
    ]
def tag_reservation(obj_id, tag_key, tag_val):
    """Create (or overwrite) a single tag on an EC2 resource in us-east-1.

    Args:
        obj_id: Id of the resource to tag.
        tag_key: Tag name.
        tag_val: Tag value.

    Returns:
        None; the API response is discarded.
    """
    ec2 = boto3.client('ec2', 'us-east-1')
    tag = {'Key': tag_key, 'Value': tag_val}
    ec2.create_tags(Resources=[obj_id], Tags=[tag])
# NOTE: the @cache decorator is commented out here, so every call queries
# the EC2 API.
#@cache
def get_reservations():
    """Return the active reserved-instance purchases in us-east-1.

    Datetime fields in the API response are converted to epoch seconds in
    place before the list is returned.

    Returns:
        A list of reserved-instance description dicts.
    """
    ec2 = boto3.client('ec2', 'us-east-1')
    active_only = [{'Name': 'state', 'Values': ['active']}]
    reserved = ec2.describe_reserved_instances(Filters=active_only)['ReservedInstances']
    recursive_date_to_epoch(reserved)
    return reserved
def partition(iterable, func):
    """Group the items of an iterable by the key that func computes.

    Args:
        iterable: Items to group.
        func: Callable mapping an item to a hashable group key.

    Returns:
        A dict from each key to the list of items that produced it, with
        items kept in their original order.
    """
    groups = {}
    for item in iterable:
        groups.setdefault(func(item), []).append(item)
    return groups
def unpack(packed_string):
    """Split a packed tag value into a sorted list of instance ids.

    The value is read as consecutive six-character tokens, each handed to
    decode().

    Args:
        packed_string: Concatenated tokens as written by encode().

    Returns:
        The decoded instance ids in sorted order, or an empty list if the
        value cannot be sliced or any token fails to decode.
    """
    try:
        # range() rather than xrange(): on Python 3 the NameError from
        # xrange would be swallowed below and every value would come back
        # empty.
        return sorted(
            decode(packed_string[start:start + 6])
            for start in range(0, len(packed_string), 6)
        )
    except Exception:
        # Best effort: a missing or unreadable tag counts as "no instances".
        return list()
class ReservationManager(object):
    """Assign running instances to active reservations and tag the result.

    On construction the manager loads the running instances and the active
    reservations, groups both by (instance type, availability zone), and
    reads back any assignments already stored in tags.
    update_reserved_instances() then fills reservations with instances and
    writes changed assignments back as an 'instances' tag.
    """

    # Group-key functions for partition(). They are wrapped in staticmethod
    # because a bare function stored on the class is bound when read through
    # `self`, which would pass the manager itself as the record argument.
    instance_group_function = staticmethod(
        lambda x: (x['InstanceType'], x['Placement']['AvailabilityZone']))
    res_group_function = staticmethod(
        lambda x: (x['InstanceType'], x['AvailabilityZone']))

    def __init__(self):
        """Load instances and reservations and sync with saved tags."""
        self.running_instances = get_running_instances()
        self.partitioned_instances = partition(
            self.running_instances, self.instance_group_function)
        self.clean_instances()
        self.reservations = get_reservations()
        self.partitioned_reservations = partition(
            self.reservations, self.res_group_function)
        self.clean_reservations()
        # instance id -> reservation id, as read from reservation tags
        self.instance_to_reservation = dict()
        # filter out reserved instances? and remove used reservations
        # reservation id -> ids of the instances assigned to it
        self.reservation_to_instances = {
            res['ReservedInstancesId']: set() for res in self.reservations
        }
        self.iid_to_instance = {
            instance['InstanceId']: instance
            for instance in self.running_instances
        }
        self.sync_with_saved()

    @staticmethod
    def _tags_to_dict(record):
        """Replace a list-of-{'Key','Value'} 'Tags' entry with a plain dict."""
        tag_list = record.get('Tags')
        if isinstance(tag_list, list):
            record['Tags'] = {tag['Key']: tag['Value'] for tag in tag_list}

    def sync_with_saved(self):
        """Rebuild the assignment maps from tags already on the resources.

        Instances carrying a 'reservation_id' tag that names a known
        reservation are added to that reservation's set. Each reservation's
        'instances' tag is unpacked and replaced in memory by a set of
        instance ids; ids that belong to running instances are recorded in
        instance_to_reservation.
        """
        self.clean_instances()
        for instance in self.running_instances:
            res_id = instance.get('Tags', {}).get('reservation_id')
            instance_id = instance['InstanceId']
            # Skip instances that are untagged or that point at a
            # reservation which is no longer in the active list.
            if res_id in self.reservation_to_instances:
                self.reservation_to_instances[res_id].add(instance_id)
        for res in self.reservations:
            res_id = res['ReservedInstancesId']
            tags = res.setdefault('Tags', dict())
            unpacked = unpack(tags.get('instances', ''))
            tags['instances'] = set(unpacked)
            for iid in unpacked:
                if iid in self.iid_to_instance:  # i.e. it's a running instance
                    self.instance_to_reservation[iid] = res_id

    def clean_instances(self):
        """Drop bulky fields from each instance and normalise its tags."""
        bulky_keys = ("NetworkInterfaces", "BlockDeviceMappings",
                      "SecurityGroups", "IamInstanceProfile", "Monitoring",
                      "State")
        for instance in self.running_instances:
            for key in bulky_keys:
                instance.pop(key, None)
            self._tags_to_dict(instance)

    def clean_reservations(self):
        """Normalise each reservation's tags into a plain dict."""
        for res in self.reservations:
            self._tags_to_dict(res)

    def update_for_partition(self, partition):
        """Fill the reservations of one group with that group's instances.

        Instances are taken oldest first and reservations from the latest
        end date to the earliest; each reservation receives up to
        'InstanceCount' instances. Both lists are sorted in place.

        Args:
            partition: A group key present in both partitioned_instances
                and partitioned_reservations.
        """
        instances = self.partitioned_instances[partition]
        instances.sort(key=lambda x: x['LaunchTime'])  # reserve oldest instance first
        reservations = self.partitioned_reservations[partition]
        # sort from ending furthest in the future to soonest
        reservations.sort(key=lambda x: x['End'], reverse=True)
        first_unreserved_index = 0
        for reservation in reservations:
            res_id = reservation['ReservedInstancesId']
            for _ in range(reservation['InstanceCount']):
                if first_unreserved_index >= len(instances):
                    return  # every instance in this group is placed
                instance_id = instances[first_unreserved_index]['InstanceId']
                first_unreserved_index += 1
                self.reservation_to_instances[res_id].add(instance_id)

    def tag_reservations(self):
        """Write the 'instances' tag for reservations whose set changed."""
        for res in self.reservations:
            res_id = res['ReservedInstancesId']
            old_instances = sorted(res['Tags']['instances'])
            new_instances = sorted(self.reservation_to_instances[res_id])
            if old_instances != new_instances:
                new_tag = "".join(encode(instance) for instance in new_instances)
                tag_reservation(res_id, 'instances', new_tag)

    def adjust_single_reservation(self, res_id):
        """Report (on stderr) a reservation that would need adjusting."""
        sys.stderr.write("Would update {}\n".format(res_id))

    def adjust_reservations(self):
        """Report reservations whose used slots differ from their capacity."""
        for res in self.reservations:
            res_id = res['ReservedInstancesId']
            slots = res['InstanceCount']
            instance_type = res['InstanceType']
            used = len(self.reservation_to_instances[res_id])
            if slots != used:
                # Reads as "<used> of <slots> <type> reserved for <id>".
                print("{} of {} {} reserved for {}".format(
                    used, slots, instance_type, res_id))
                self.adjust_single_reservation(res_id)
                # figure out how to use the excess here...

    def update_reserved_instances(self):
        """Assign instances in every shared group, then report and tag."""
        shared = set(self.partitioned_instances) & set(self.partitioned_reservations)
        for group_key in sorted(shared):
            self.update_for_partition(group_key)
        # Freeze each assignment set into a sorted list. Iterate over a
        # snapshot of the keys because the values are replaced in the loop.
        for res_id in list(self.reservation_to_instances):
            self.reservation_to_instances[res_id] = sorted(
                self.reservation_to_instances[res_id])
        # see if there are reservations which are not used up...
        self.adjust_reservations()
        # update reservation configurations... (ONLY FOR THOSE WHICH CHANGED)
        # tag both ways...
        self.tag_reservations()
def main():
    """Build a ReservationManager and run one full reservation update."""
    ReservationManager().update_reserved_instances()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment