Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Spot Fleet, Docs and code samples

Spot Fleet / ECS Training Overview

Components

Spot & Spot Fleet

What is Spot

AWS Spot is a AWS service which provides EC2 capacity at significant discounts. Savings of upto 80-90% can be achieved (vs on Demand pricing) utilizing Spot.

Spot

Spot is a active market you specify the amount your willing to pay and the actual price you pay is whatever the current market price actually is. If the market price goes above your bid price your instance will be terminated. AWS provides a 2 minute warning before terminating your instance. It is therefore important to design your overall architecture around the ability for instances to terminate at any given time.

It should be noted that spot capacity (and therefore its price) varies per instance type and AZ in operates in its therefore important to understand the historic pricing of the instances your interested in, in the specific AZ your wanting to run it in!. A number of useful tools to find this data includes: Spot Bid Advisor, Spot Pricing History and spot history data on the command line.

Finally its important to understand that spot prices can (and do) go above onDemand pricing, the market is not in any way limited by standard onDemand prices for EC2 instances - hence if supply is short prices can spike significantly.

What is Spot Fleet

AWS Spot provides the ability to bid for 1 or more instances. Spot Fleet provides the ability to spin up and maintain a fleet of instances across a number of AZ's and instance types.

Spot Fleet provides a diversified option, allowing for a fleet of EC2 instances to be balanced across AZ's and instance family's. This greatly reduces the risk of a large portion of your EC2 capacity being terminated at the same time.

Spot Fleet can be autoscaled similiar to standard onDemand autoscale groups.

Why is Spot and Spot Fleet suited to container based workloads

Spot provides super cheap EC2 resources with the caveat that your workload must be able to easily be terminated and moved to a different EC2 instance (in a different AZ, or perhaps a different instance family). Therefore the workload has to be fairly ephemeral or at least not maintain state. It also ideally should not store anything local to the EC2 instance itself.

Containerization (this one, not the freight one) is a pretty good match for the above constraints. A workload in a container is isolated from the underlying EC2 instance, and hence is able easily to be moved around between instances. Containerized apps typically try to adhere to the twelve-factor principals. Of specific interest is the disposibility principal which mandates a apps processes should be capable of being started and stopped at a moments notice. This fits beautifully with spot and its 2 minute termination window!

Key Features of Spot Fleet

Distribution of Instances

Spot Fleet provides the mechanism to distribute capacity across a number of capacity pools

Autoscaling

Spot Fleet provides the mechanism to scale the fleet, by increasing or decreasing the TargetCapacity of the fleet based on any given cloudwatch metric/alarm.

Spot termination management

Spot Fleet reacts to terminated capacity by re-bidding for new capacity once a instance has been terminated by Spot.

However Spot Fleet does not get an advance warning of the EC2 termination. Therefore it is better to add in a termination hook on the instance itself, so that as soon as the instance is notified of ther termination event, a trigger (SNS topic) can call a Lambda function to automatically increase the desiredCapacity of the fleet.

ECS

What is ECS

The AWS EC2 Container Service is a FREE container management service. It is managed service where AWS takes care of task and service management and a customer managed fleet of EC2 instances operate as worker nodes - ready to run tasks and services as instructed by the ECS service.

ECS

Features

ECS is feature rich highlights include:

  1. Service load balancing with ALB with dynamic port allocation
  2. Service level autoscaling of containers
  3. IAM Roles for tasks and containers
  4. Container Instance Draining

Tasks and Services

ECS tasks encapsulate the docker image, parameters, ports, IAM roles, data volumes and resource allocations (CPU and memory).

A ECS Service defines the number of tasks required to be simulatiously run within the cluster to form the "service". It also defines the ALB to assosiate the service against (and its Target Group)

ALB integration

ECS automatically manages the registration of each task (within a service) to its assosiated ALB target group.

ECS has a ephemeral port range on each physical EC2 instance. A random port in this range is allocated to a running task and is automatically mapped to the static port within the container.

The port on the EC2 instance is then registered against the ALB target group. The ALB will distribute traffic to all registered targets.

Instance draining

ECS supports the ability for the underlying EC2 instances to be drained and removed from service.

Once a container instance has been set to DRAINING no new tasks will be started on the underlying instance and existing running tasks will be moved to new container instances running on alterative EC2 instances.

Instance draining is a superb way of dealing with Spot EC2 termination notifications. It is realively trivial to wire-up a Lambda function to automatically set the EC2 instance into a draining state as soon as the 2-minute warning is given. This allows ECS to proactively move running tasks to a new service before the EC2 instance is terminated.

Task Scheduling

ECS provides a standard task/service scheduler and a open sourced task scheduler which provides more control on how tasks will be scheduled against the underlying EC2 worker instances.

Different task strategies are provided that allow for varying distributions. In addition task placement constraints allow for specific criteria to be considered before a task is placed on a given EC2 container instance.

It is easily possible for the overall ECS constainer fleet to have a mix of instances of different types, cost strategies (onDemand and Spot) and capabilities. It is then possible to define task scheduling criteria to ensure things like:

  • High value tasks are only placed on onDemand Instances
  • Tasks requiring specific hardware (GPU's etc) are placed only on instances that have them
  • Tasks with minimal CPU requirements are placed on cheaper burstable EC2 instance types.

ECS provides a large amount of flexability in its task scheduling, normal operation does not require in-depth knowledge of this however it is important to understand what is possible if and when a specific need arises.

package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"time"
"github.com/heitorlessa/ec2spotnotify"
"flag"
"os"
"bytes"
)
const (
instanceDetailsURL = "http://169.254.169.254/latest/dynamic/instance-identity/document" // dumps ec2 metadata JSON
containerDetailsURL = "http://localhost:51678/v1/metadata"
)
var (
requestTimeout = 300 * time.Millisecond // how long before we give up on reaching ec2 metadata
)
func main() {
log.Println("[*] Loading configuration...")
var config ec2spotnotify.Config
config.LoadConfig()
if config.Err != nil {
log.Fatalf("[*] Cowardly quitting due to: %s", config.Err)
}
// flag used to print out contents of fetched metadata
boolPrintMetadata := flag.Bool("printmeta", false, "a bool")
// flag used to send termination for instance immediately.
boolTerminate := flag.Bool("terminate", false, "Use Flag to send termination SNS message immediately")
// parse flags
flag.Parse()
if *boolPrintMetadata {
message, err := getInstanceData()
if err != nil {
log.Fatalln(err)
}
fmt.Println("Fetched Metadata:")
fmt.Println(jsonPrettyPrint(string(message)))
os.Exit(0)
}
if *boolTerminate {
message, err := getInstanceData()
if err != nil {
log.Fatalln(err)
}
config.SNS.Message = string(message)
if err := ec2spotnotify.PublishSNS(config); err != nil {
log.Fatalln(err)
}
os.Exit(0)
}
notification, err := ec2spotnotify.GetNotificationTime()
if err != nil {
log.Fatalf("[!] Error found while trying to query Instance Metadata: %s ", err)
}
// as notification may take a while to be injected on EC2 Metadata - Read from channel provided with range
for timestamp := range notification {
log.Println("[*] Received notification -> ", timestamp)
message, err := getInstanceData()
if err != nil {
log.Fatalln(err)
}
log.Println("[*] Publishing termination notification to SNS")
config.SNS.Message = string(message)
if err := ec2spotnotify.PublishSNS(config); err != nil {
log.Fatalln(err)
}
}
}
func getInstanceData() (messageJson []byte, err error) {
instance, err := collectInstanceDetails(instanceDetailsURL)
if err != nil {
return nil, err
}
ecsagent, err := collectInstanceDetails(containerDetailsURL)
if err != nil {
return nil, err
}
message := make(map[string]string)
message["terminationNotificiation"] = "true"
message["instanceId"] = instance["instanceId"].(string)
message["instanceType"] = instance["instanceType"].(string)
message["ecsCluster"] = ecsagent["Cluster"].(string)
message["ecsContainer"] = ecsagent["ContainerInstanceArn"].(string)
messageJson, err = json.Marshal(message)
if err != nil {
return nil, err
}
return messageJson, nil
}
// Look up for instance metadata to gather more details about this EC2 instance
func collectInstanceDetails(url string) (data map[string]interface{}, err error) {
req := http.Client{Timeout: requestTimeout}
resp, errs := req.Get(url)
if resp != nil {
defer resp.Body.Close()
}
if errs != nil {
err = fmt.Errorf("[!] An error occurred while retrieving instance details: %v", errs)
return nil, err
}
instanceDump, errs := ioutil.ReadAll(resp.Body)
if errs != nil {
err = fmt.Errorf("[!] An error occurred while reading response: %v", errs)
return nil, err
}
var parsed map[string]interface{}
err = json.Unmarshal(instanceDump, &parsed)
if err != nil {
return nil, err
}
return parsed, err
}
func jsonPrettyPrint(in string) string {
var out bytes.Buffer
err := json.Indent(&out, []byte(in), "", "\t")
if err != nil {
return in
}
return out.String()
}
from __future__ import division
import boto3
import json
import logging
from datetime import timedelta
from datetime import datetime
logger = logging.getLogger()
logger.setLevel(logging.INFO)
asgPerServerVCPU = 4
asgGroupName = 'ecsSpotAutoScale'
##
## Note: It seems retrieving current targetCapacity from cloudwatch actually quite often fails.
## so this means we are consistantly spinning up 8 servers and consuming cost. For not setting to zero.
##
defaultTargetCapacity = 0 ## Note this is only used if we cant actually retrieve it
def checkFleetScalled(fleetID):
### Function checks if the fleet has been manually scalled in the last 15 minutes
### We do this to stop spinning up demand instances which spot can cover if we have just asked it to scale
cw = boto3.client('cloudwatch')
try:
data = cw.get_metric_statistics(
Namespace='AWS/EC2Spot',
MetricName='TargetCapacity',
Dimensions=[
{
'Name': 'FleetRequestId',
'Value': fleetID
},
],
StartTime=datetime.utcnow() - timedelta(seconds=600),
EndTime=datetime.utcnow(),
Period=60,
Statistics=['Maximum'],
Unit='Count')
except Exception, e:
## on error assume it has
logger.info(str(e))
return True
firstMax = None
for max in data['Datapoints']:
if firstMax is not None and firstMax != max['Maximum']:
return True
if firstMax is None:
firstMax = max['Maximum']
return False
def getECSMetrics(fleetID):
cw = boto3.client('cloudwatch')
metrics = {'PendingCapacity': None,'TargetCapacity': None }
try:
for metric in metrics:
data = cw.get_metric_statistics(
Namespace='AWS/EC2Spot',
MetricName=metric,
Dimensions=[
{
'Name': 'FleetRequestId',
'Value': fleetID
},
],
StartTime=datetime.utcnow() - timedelta(seconds=60),
EndTime=datetime.utcnow(),
Period=60,
Statistics=['Maximum'],
Unit='Count')
metrics[metric] = data['Datapoints'][0]['Maximum']
except Exception, e:
## If we failed to get metrics assume something is seriously wrong with the Fleet
logger.info(str(e))
metrics = {'PendingCapacity': defaultTargetCapacity,'TargetCapacity': defaultTargetCapacity }
return metrics
def setContainerInstanceState(cluster, instance, state):
ecs = boto3.client('ecs')
try:
response = ecs.update_container_instances_state(
cluster=cluster,
containerInstances=[instance,],
status=state
)
except Exception, e:
logger.info("Failed to update instance: " + instance + ", in cluster: " + cluster + ", error: " + str(e))
return
def setASGDesired(numInstances, cooldown):
asg = boto3.client('autoscaling')
try:
asg.set_desired_capacity(
AutoScalingGroupName=asgGroupName,
DesiredCapacity=numInstances,
HonorCooldown=cooldown
)
except Exception, e:
logger.info("Failed to update desired capacity: " + str(numInstances) + ", error: " + str(e))
return
def describeASG():
asg = boto3.client('autoscaling')
return asg.describe_auto_scaling_groups(
AutoScalingGroupNames=[
asgGroupName,
],
MaxRecords=1
)
def lambda_handler(event, context):
fleetID = None
capacityModifier = 0
logger.info(event)
### we have multiple events triggering function so determine where to get fleetID from
if 'fleetID' in event:
fleetID = event['fleetID']
else:
try:
alert = json.loads(event['Records'][0]['Sns']['Message'])
## check if this is a Termination notification
if 'terminationNotificiation' in alert:
logger.info("Found Termination")
setContainerInstanceState(alert["ecsCluster"], alert["ecsContainer"], "DRAINING")
## hack - need to add into SNS notification
fleetID = "sfr-44def1f0-9e0c-4582-9ee3-95095db3805d"
## set capacity modification based on instance size being terminated.
if '2xlarge' in alert["instanceType"]:
capacityModifier = 8
elif 'xlarge' in alert["instanceType"]:
capacityModifier = 4
else:
capacityModifier = 2
else:
try:
fleetID = alert['Trigger']['Dimensions'][0]['value']
except Exception, e:
logger.info("Failed to extract process event")
logger.info(event)
return
except Exception, e:
logger.info("Failed to extract process event")
logger.info(event)
return
### check if fleet has been recently scaled and if so ignore trigger
if checkFleetScalled(fleetID):
logger.info("yup ignoring")
return
### get current metrics and calculate number of ASG instsnces required (if any)
metrics = getECSMetrics(fleetID)
metrics['PendingCapacity'] += capacityModifier
instancesRequired = int((metrics['PendingCapacity'] / asgPerServerVCPU) + 0.999999) ## note we always round up!
logger.info(metrics['PendingCapacity'])
logger.info(instancesRequired)
### get current size of ASG to determine if we going up or down
asg = describeASG()
currentCapacity = asg['AutoScalingGroups'][0]['DesiredCapacity']
if instancesRequired > currentCapacity:
logger.info("Scaling UP to " + str(instancesRequired) + " instances")
setASGDesired(instancesRequired, False) ## override cooldown period - we need to scale up now!
elif instancesRequired < currentCapacity:
logger.info("Scaling DOWN to " + str(instancesRequired) + " instances")
setASGDesired(instancesRequired, True) ## honor cooldown, so we get max value from instances already paid for!
## Do nothing wanted and current capacity are same!
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.