package main
/*
fakepod:

This program monitors all of the containers running on a node and checks each
one to see if it's a Kubernetes-managed container. If so, it skips it.
If it's not managed by Kubernetes then we want to calculate the total amount
of resources used by all of these containers and create a pod which reserves
that much CPU/Memory. The pod itself just sleeps, so it isn't actually
using many resources at all, but by creating the pod we're now telling the
Kubernetes scheduler that there are fewer resources available on this node and
it should take this into account when scheduling jobs.

The fake pod is created by defining it in:

	/etc/kubernetes/manifests/fakepod.yaml

which should automatically get picked up by the kubelet. This means
that this tool must be running on the same host as the kubelet.
*/
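// For illustration, if the only unmanaged container held 512 CPU shares and
// 256Mi of memory, the manifest written by createFakePod (below) would look
// roughly like this (the numbers are hypothetical):
//
//	apiVersion: v1
//	kind: Pod
//	metadata:
//	  name: fake
//	spec:
//	  containers:
//	  - name: fake
//	    image: gcr.io/google_containers/pause-amd64:3.0
//	    resources:
//	      requests:
//	        cpu: 512m
//	        memory: 256Mi
//	    imagePullPolicy: IfNotPresent
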
import (
	"encoding/json"
	"flag"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"os"
	"strings"
	"sync"
	"time"
)
var minCPU = int64(0)
var minMemory = int64(0)
var containers = map[string]*Container{} // ID -> *Container
var delay = 5         // retry delay, in seconds
var refresh = 10 * 60 // 10 minutes
var dockerSock = "/var/run/docker.sock"
var image = "gcr.io/google_containers/pause-amd64:3.0"
var excludePrefixes = []string{
	"/k8s_",
	"/kubelet",
}
var verbose = 0
// Structs for the JSON returns from Docker
type ContainerPS struct {
	ID     string
	Names  []string
	State  string
	Status string
}
type ContainerInspect struct {
	ID    string
	Name  string
	State struct {
		Status string
	}
	HostConfig struct {
		CpuShares         int64
		CpuQuota          int64
		Memory            int64
		MemoryReservation int64
	}
}
// Container is the info we store about each non-Kube container
type Container struct {
	ID     string
	CPU    int64
	Memory int64
}
func log(v int, format string, args ...interface{}) {
	if verbose < v {
		return
	}
	if v == 0 {
		fmt.Fprintf(os.Stdout, format, args...)
	} else {
		fmt.Fprintf(os.Stderr, format, args...)
	}
}
var updateLock = sync.Mutex{}
func updateContainers(newContainers []*Container, delContainers []string, purge bool) {
	updateLock.Lock()
	defer updateLock.Unlock()

	copyContainers := map[string]*Container{}
	oldCPU := int64(0)
	oldMemory := int64(0)
	for id, c := range containers {
		copyContainers[id] = c
		oldCPU += c.CPU
		oldMemory += c.Memory
	}

	newCPU := oldCPU
	newMemory := oldMemory
	for _, newC := range newContainers {
		oldC, ok := containers[newC.ID]
		if ok {
			if newC.CPU != oldC.CPU || newC.Memory != oldC.Memory {
				// Apply the deltas before overwriting the old values,
				// otherwise they'd always compute to zero
				newCPU += newC.CPU - oldC.CPU
				newMemory += newC.Memory - oldC.Memory
				oldC.CPU = newC.CPU
				oldC.Memory = newC.Memory
			}
			delete(copyContainers, newC.ID)
		} else {
			containers[newC.ID] = newC
			newCPU += newC.CPU
			newMemory += newC.Memory
		}
	}
	for _, id := range delContainers {
		if c, ok := containers[id]; ok {
			newCPU -= c.CPU
			newMemory -= c.Memory
		}
		delete(containers, id)
		delete(copyContainers, id)
	}
	// Anything left in copyContainers wasn't in the new list, so purge it
	if purge && len(copyContainers) != 0 {
		for _, oldC := range copyContainers {
			log(2, "Purging container: %.12s\n", oldC.ID)
			delete(containers, oldC.ID)
			newCPU -= oldC.CPU
			newMemory -= oldC.Memory
		}
	}
	// Only rewrite the fake pod if the totals actually changed
	if oldCPU != newCPU || oldMemory != newMemory {
		createFakePod()
	}
}
func openURL(url string) (*http.Response, error) {
	// Talk to the Docker engine over its Unix socket
	client := &http.Client{Transport: &http.Transport{
		Dial: func(proto, addr string) (conn net.Conn, err error) {
			return net.Dial("unix", dockerSock)
		},
	}}

	resp, err := client.Get("http://d" + url)
	if err != nil || resp == nil || resp.StatusCode != 200 {
		code := 0
		if resp != nil {
			code = resp.StatusCode
			resp.Body.Close() // don't leak the connection on error
		}
		return nil, fmt.Errorf("Err(%d): %s", code, err)
	}
	return resp, nil
}
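// For reference, the same Docker API endpoints can be exercised by hand with
// curl's --unix-socket option (illustrative only; adjust the socket path to
// match the -sock flag):
//
//	curl --unix-socket /var/run/docker.sock http://d/containers/json
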
// Given a URL, GET it and store the resulting JSON in the passed-in 'obj'
func getData(obj interface{}, url string) error {
	resp, err := openURL(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	dec := json.NewDecoder(resp.Body)
	if err := dec.Decode(obj); err != nil {
		return fmt.Errorf("Err: %#v", err)
	}
	return nil
}
// Does the list of container names match any of our 'excludePrefixes'?
func isExcludeName(cNames []string) bool {
	for _, pre := range excludePrefixes {
		for _, name := range cNames {
			if strings.HasPrefix(name, pre) {
				return true
			}
		}
	}
	return false
}
// Do a 'docker inspect' on the given container ID
func getContainerStats(id string) (string, int64, int64, error) {
	cont := ContainerInspect{}
	if err := getData(&cont, "/containers/"+id+"/json"); err != nil {
		log(2, "Error in docker inspect %.12s: %v\n", id, err)
		return "", 0, 0, err
	}

	// Force some minimum values for now
	if cont.HostConfig.CpuShares < minCPU {
		cont.HostConfig.CpuShares = minCPU
	}
	if cont.HostConfig.Memory < minMemory {
		cont.HostConfig.Memory = minMemory
	}

	log(2, "Stats Container ID:%.12s CPU:%d Mem:%d\n", id,
		cont.HostConfig.CpuShares, cont.HostConfig.Memory)
	return cont.Name, cont.HostConfig.CpuShares, cont.HostConfig.Memory, nil
}
// Populate our list of containers by doing a 'docker ps' and a
// 'docker inspect' on each one
func fullListWatcher() error {
	log(1, "Container refresher started\n")
	for {
		// If Docker is down then just wait until it comes back up.
		// Note: this is reset by the event watcher thread
		for dockerDown {
			<-dockerRestarted
		}

		// Get the list of containers from our Docker engine
		log(1, "Getting full list of containers\n")
		cList := []ContainerPS{}
		if err := getData(&cList, "/containers/json"); err != nil {
			if !dockerDown {
				dockerDown = true
				log(0, "Error in 'docker ps', is Docker down? Err: %s\n", err)
			}
			continue
		}

		// For each container do: docker inspect <id>
		newContainers := []*Container{}
		for _, c := range cList {
			// Skip non-running or "ExcludeName" containers
			if c.State != "running" || isExcludeName(c.Names) {
				continue
			}
			_, cpu, mem, err := getContainerStats(c.ID)
			if err != nil {
				continue // assume it's a 404 and we can just continue
			}
			newContainers = append(newContainers, &Container{
				ID:     c.ID,
				CPU:    cpu,
				Memory: mem,
			})
		}

		// Update our list of containers, and purge any extras
		updateContainers(newContainers, nil, true)

		// Pause either until our refresh time limit is hit or we get an
		// indication that Docker was restarted. Upon restart we don't
		// want to wait for the refresh timer to expire
		timer := time.NewTimer(time.Second * time.Duration(refresh))
		select {
		case <-timer.C:
		case <-dockerRestarted:
		}
		timer.Stop() // Just to clean up
	}
}
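// dockerDown and dockerRestarted coordinate the two watcher goroutines:
// containerWatcher sets dockerDown when it can't reach the Docker engine and
// sends on dockerRestarted once it reconnects, which wakes fullListWatcher
// so it can refresh immediately instead of waiting out its timer.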
var dockerDown = false
var dockerRestarted = make(chan bool, 1)
func containerWatcher() {
	log(1, "Container event stream watcher started\n")
	for {
		resp, err := openURL(`/events?filters={"type":["container"]}`)
		if err != nil {
			if !dockerDown {
				dockerDown = true
				log(0, "Error connecting to Docker, is it down? Err: %s\n", err)
			}
			time.Sleep(time.Duration(delay) * time.Second)
			continue
		}
		if dockerDown {
			log(0, "Docker is back up\n")
			dockerDown = false
			// Let the refresher loop know we're connected and watching
			dockerRestarted <- true
		}

		log(1, "Connected to Docker's event stream\n")
		dec := json.NewDecoder(resp.Body)

		// Process events until the stream ends
		for {
			event := struct {
				Status string
				ID     string
				Type   string
				Action string
				Time   int
			}{}
			if err := dec.Decode(&event); err != nil {
				resp.Body.Close()
				log(1, "Hit end of stream\n")
				// Loop back around so we can reconnect
				break
			}

			if event.Action == "start" {
				log(2, "Got a 'start' event: %.12s\n", event.ID)
				name, cpu, mem, err := getContainerStats(event.ID)
				if err != nil {
					// Assume it's a 404 and try to remove it from our list
					if _, ok := containers[event.ID]; !ok {
						// Not in our list so skip it. We don't want to
						// unnecessarily update the fake pod spec.
						continue
					}
					log(2, "Removing container due to 404 on docker inspect\n")
					updateContainers(nil, []string{event.ID}, false)
				} else {
					if !isExcludeName([]string{name}) {
						log(2, "Adding container\n")
						updateContainers([]*Container{{
							ID:     event.ID,
							CPU:    cpu,
							Memory: mem,
						}}, nil, false)
					} else {
						log(2, "Skipping container due to name: %s\n", name)
					}
				}
			} else if event.Action == "die" {
				log(2, "Got a 'die' event: %.12s\n", event.ID)
				updateContainers(nil, []string{event.ID}, false)
			}
		}
	}
}
// Create, or delete, the fake pod definition file based on what we have
func createFakePod() {
	podFile := "/etc/kubernetes/manifests/fakepod.yaml"
	podFileContents := `apiVersion: v1
kind: Pod
metadata:
  name: fake
spec:
  containers:
  - name: fake
    image: %s
    resources:
      requests:
        cpu: %dm
        memory: %dMi
    imagePullPolicy: IfNotPresent
`

	totalMem := int64(0)
	totalCPU := int64(0)
	for _, cont := range containers {
		totalMem += cont.Memory
		totalCPU += cont.CPU
	}
	totalMem /= 1024 * 1024 // bytes -> Mi

	if totalMem == 0 && totalCPU == 0 {
		// No extra containers so just remove any fake pod we may have
		os.Remove(podFile)
		log(1, "Removed fake pod definition\n")
	} else {
		// We have at least one extra container, so create a fake pod
		log(1, "FakePod: %03d Mem: %d CPU: %d\n",
			len(containers), totalMem, totalCPU)
		data := fmt.Sprintf(podFileContents, image, totalCPU, totalMem)
		log(2, data)
		err := ioutil.WriteFile(podFile, []byte(data), 0444)
		if err != nil {
			log(0, "Can't create pod file: %s\n", err)
		}
	}
}
func main() {
	flag.Int64Var(&minCPU, "cpu", minCPU, "Minimum CPU for a container")
	flag.Int64Var(&minMemory, "mem", minMemory, "Minimum Memory for a container")
	flag.IntVar(&delay, "delay", delay, "Retry delay, on lost connection")
	flag.IntVar(&refresh, "refresh", refresh, "Full container refresh rate, in seconds")
	flag.StringVar(&dockerSock, "sock", dockerSock, "Path to docker's socket")
	flag.StringVar(&image, "image", image, "Image name to use for fake pod")
	flag.IntVar(&verbose, "v", verbose, "Verbose/debugging level")
	flag.Parse()

	log(0, "Fakepod watcher is running...\n")

	// Clear old data
	createFakePod()

	// Start our two watcher threads
	go containerWatcher() // Watching events
	fullListWatcher()     // Periodically get the full list of containers
}
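
// Example invocation (the flag values here are illustrative only):
//
//	go build -o fakepod .
//	./fakepod -sock /var/run/docker.sock -refresh 600 -v 1
//
// Remember that it must run on the same host as the kubelet, with write
// access to /etc/kubernetes/manifests.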