Skip to content

Instantly share code, notes, and snippets.

@aprice
Created August 11, 2017 15:07
Show Gist options
  • Save aprice/0f8472524bc4df9dff18dbf0925adbe5 to your computer and use it in GitHub Desktop.
Save aprice/0f8472524bc4df9dff18dbf0925adbe5 to your computer and use it in GitHub Desktop.
busboy cleans up after Chef nodes terminated in EC2 by listening for CloudWatch termination events on an SQS queue.
package main
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/go-chef/chef"
)
// Config for the service
type Config struct {
// ChefURL is the base URL to the Chef server, as in knife.rb, *MUST* end in a slash!
ChefURL string
// ChefKeyPath is the path to the Chef key on local disk
ChefKeyPath string
// ChefName is the user name associated with key
ChefName string
// QueueRegion is the AWS region where the SQS queue lives
QueueRegion string
// QueueURL is the URL to the SQS queue to read termination events from
QueueURL string
}
/* CloudWatch event pattern:
{
"source": [
"aws.ec2"
],
"detail-type": [
"EC2 Instance State-change Notification"
],
"detail": {
"state": [
"terminated"
]
}
}
Target = dedicated SQS queue
*/
func main() {
configFile, err := os.Open("config.json")
if err != nil {
log.Fatal("Failed to open config.json: ", err)
}
var config Config
err = json.NewDecoder(configFile).Decode(&config)
if err != nil {
log.Fatal("Failed to parse config.json: ", err)
}
// Init SQS client
sess, err := session.NewSessionWithOptions(session.Options{
Config: aws.Config{
Region: aws.String(config.QueueRegion),
},
})
if err != nil {
log.Fatal("Failed to get AWS session: ", err)
}
svc := sqs.New(sess)
// Init Chef client
key, err := ioutil.ReadFile(config.ChefKeyPath)
if err != nil {
log.Fatal("Couldn't read Chef key: ", err)
}
client, err := chef.NewClient(&chef.Config{
Name: config.ChefName,
Key: string(key),
BaseURL: config.ChefURL,
})
if err != nil {
log.Fatal("Issue setting up client: ", err)
}
// Poll for messages
for {
result, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
},
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
QueueUrl: aws.String(config.QueueURL),
MaxNumberOfMessages: aws.Int64(1),
VisibilityTimeout: aws.Int64(0),
WaitTimeSeconds: aws.Int64(20),
})
if err != nil {
log.Fatal("Error retrieving message: ", err)
}
if len(result.Messages) == 0 {
time.Sleep(1 * time.Second)
continue
}
var notif ec2InstanceStateChangeNotification
err = json.Unmarshal([]byte(*result.Messages[0].Body), &notif)
if err != nil {
log.Fatal("Failed to decode message: ", err, *result.Messages[0].Body)
}
qstr := "ec2_instance_id:" + notif.Detail.InstanceID
log.Print("Query: ", qstr)
nodeQuery, err := client.Search.NewQuery("node", qstr)
if err != nil {
log.Fatal("Failed to build query: ", err)
}
nodeQuery.SortBy = "name asc"
// Can't just use search.Exec/Do/whatever because it doesn't actually
// decode the repsonse into a type, you just get a map[string]interface{}
// back which is really stupid.
fullPath := fmt.Sprintf("search/%s", nodeQuery)
searchPayload := struct {
Total int
Start int
Rows []chef.Node
}{}
req, err := client.NewRequest("GET", fullPath, nil)
if err != nil {
log.Fatal("Failed to build request: ", err)
}
// Don't know why this returns the response when it's already decoded
// the payload, nor why it doesn't empty & close the body itself
res, err := client.Do(req, &searchPayload)
if res != nil {
defer func() {
io.Copy(ioutil.Discard, res.Body)
res.Body.Close()
}()
}
if err != nil {
log.Fatal("Failed to query nodes: ", err)
} else if searchPayload.Total == 0 || len(searchPayload.Rows) == 0 {
log.Print("Instance not found: ", notif.Detail.InstanceID)
} else {
node := searchPayload.Rows[0]
err = client.Nodes.Delete(node.Name)
if err != nil {
log.Fatal("Failed to delete node: ", err)
}
log.Print("Chef node deleted: ", node.Name)
err = client.Clients.Delete(node.Name)
if err != nil {
log.Fatal("Failed to delete client: ", err)
}
log.Print("Chef client deleted: ", node.Name)
}
_, err = svc.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: aws.String(config.QueueURL),
ReceiptHandle: result.Messages[0].ReceiptHandle,
})
if err != nil {
log.Fatal("Delete Error:", err)
}
}
}
type ec2InstanceStateChangeNotification struct {
Region string
Resources []string
Detail struct {
InstanceID string `json:"instance-id"`
State string
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment