Last active
May 1, 2021 03:18
-
-
Save duglin/83ca9d3e16a99b35d93f3aaab78da08d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
/* | |
fakepod: | |
This program will monitor all of the containers running on a node and for each | |
one will check to see if it's a Kubernetes managed one. If so, then it skips it.
If it's not managed by Kubernetes then we want to calculate the total amount | |
of resources used by all of these containers and create a pod which reserves | |
that much CPU/Memory. The pod itself just sleeps, so it technically isn't | |
using much resources at all, but by creating the pod we're now telling the | |
Kubernetes schedulers that there's less resources available on this node and | |
it should take this into account when scheduling jobs. | |
The fake pod is created by defining it in: | |
/etc/kubernetes/manifests/fakepod.yaml | |
which should automatically get picked up by the kubelet. So this means
that this tool must be running on the same host as the kubelet.
*/ | |
import ( | |
"encoding/json" | |
"flag" | |
"fmt" | |
"io/ioutil" | |
"net" | |
"net/http" | |
"os" | |
"strings" | |
"sync" | |
"time" | |
) | |
var minCPU = int64(0) | |
var minMemory = int64(0) | |
var containers = map[string]*Container{} // ID -> *Container | |
var delay = 5 | |
var refresh = 10 * 60 // 10 minutes | |
var dockerSock = "/var/run/docker.sock" | |
var image = "gcr.io/google_containers/pause-amd64:3.0" | |
var excludePrefixes = []string{ | |
"/k8s_", | |
"/kubelet", | |
} | |
var verbose = 0 | |
// Structs for the JSON returns from Docker

// ContainerPS models one entry of Docker's list-containers response
// (GET /containers/json), i.e. one line of `docker ps`. Only the fields
// this program reads are declared; Go's JSON decoder matches field
// names case-insensitively (so "Id" populates ID).
type ContainerPS struct {
	ID     string   // container ID
	Names  []string // container names (compared against excludePrefixes, which start with "/")
	State  string   // lifecycle state; we only care about "running"
	Status string   // human-readable status (unused here)
}
// ContainerInspect models the subset of the `docker inspect` response
// (GET /containers/<id>/json) that this program uses.
type ContainerInspect struct {
	ID   string // container ID
	Name string // container name (checked against excludePrefixes)
	State struct {
		Status string // lifecycle state
	}
	HostConfig struct {
		CpuShares         int64 // relative CPU weight configured for the container
		CpuQuota          int64 // CPU quota (unused here)
		Memory            int64 // memory limit, presumably bytes — TODO confirm against Docker API
		MemoryReservation int64 // soft memory limit (unused here)
	}
}
// Container is the info we store about each non-Kube container
type Container struct {
	ID     string // Docker container ID
	CPU    int64  // CPU value (HostConfig.CpuShares floored at minCPU; see getContainerStats)
	Memory int64  // memory value (HostConfig.Memory floored at minMemory)
}
func log(v int, format string, args ...interface{}) { | |
if verbose < v { | |
return | |
} | |
if v == 0 { | |
fmt.Fprintf(os.Stdout, format, args...) | |
} else { | |
fmt.Fprintf(os.Stderr, format, args...) | |
} | |
} | |
var updateLock = sync.Mutex{} | |
func updateContainers(newContainers []*Container, delContainers []string, purge bool) { | |
updateLock.Lock() | |
defer updateLock.Unlock() | |
copyContainers := map[string]*Container{} | |
oldCPU := int64(0) | |
oldMemory := int64(0) | |
for id, c := range containers { | |
copyContainers[id] = c | |
oldCPU += c.CPU | |
oldMemory += c.Memory | |
} | |
newCPU := oldCPU | |
newMemory := oldMemory | |
for _, newC := range newContainers { | |
oldC, ok := containers[newC.ID] | |
if ok { | |
if newC.CPU != oldC.CPU || newC.Memory != oldC.Memory { | |
oldC.CPU = newC.CPU | |
oldC.Memory = newC.Memory | |
newCPU += newC.CPU - oldC.CPU | |
newMemory += newC.Memory - oldC.Memory | |
} | |
delete(copyContainers, newC.ID) | |
} else { | |
containers[newC.ID] = newC | |
newCPU += newC.CPU | |
newMemory += newC.Memory | |
} | |
} | |
for _, id := range delContainers { | |
if c, ok := containers[id]; ok { | |
newCPU -= c.CPU | |
newMemory -= c.Memory | |
} | |
delete(containers, id) | |
delete(copyContainers, id) | |
} | |
if purge && len(copyContainers) != 0 { | |
for _, oldC := range copyContainers { | |
log(2, "Purging container: %.12s\n", oldC.ID) | |
delete(containers, oldC.ID) | |
newCPU -= oldC.CPU | |
newCPU -= oldC.Memory | |
} | |
} | |
if oldCPU != newCPU || oldMemory != newMemory { | |
createFakePod() | |
} | |
} | |
func openURL(url string) (*http.Response, error) { | |
client := &http.Client{Transport: &http.Transport{ | |
Dial: func(proto, addr string) (conn net.Conn, err error) { | |
return net.Dial("unix", dockerSock) | |
}, | |
}} | |
resp, err := client.Get("http://d" + url) | |
if err != nil || resp == nil || resp.StatusCode != 200 { | |
code := 0 | |
if resp != nil { | |
code = resp.StatusCode | |
} | |
return nil, fmt.Errorf("Err(%d): %s", code, err) | |
} | |
return resp, nil | |
} | |
// Given a URL, GET it and store the resulting JSON int he passed in 'obj' | |
func getData(obj interface{}, url string) error { | |
resp, err := openURL(url) | |
if err != nil { | |
return err | |
} | |
defer resp.Body.Close() | |
dec := json.NewDecoder(resp.Body) | |
if err := dec.Decode(obj); err != nil { | |
return fmt.Errorf("Err: %#v", err) | |
} | |
return nil | |
} | |
// Does the list of container names match any of our 'excludePrefixes' | |
func isExcludeName(cNames []string) bool { | |
for _, pre := range excludePrefixes { | |
for _, name := range cNames { | |
if strings.HasPrefix(name, pre) { | |
return true | |
} | |
} | |
} | |
return false | |
} | |
// Do a 'docker inspect' on the given container ID | |
func getContainerStats(id string) (string, int64, int64, error) { | |
cont := ContainerInspect{} | |
if err := getData(&cont, "/containers/"+id+"/json"); err != nil { | |
log(2, "Error in docker inspect %.12s: %v\n", id, err) | |
return "", 0, 0, err | |
} | |
// Force some minimum values for now | |
if cont.HostConfig.CpuShares < minCPU { | |
cont.HostConfig.CpuShares = minCPU | |
} | |
if cont.HostConfig.Memory < minMemory { | |
cont.HostConfig.Memory = minMemory | |
} | |
log(2, "Stats Container ID:%.12s CPU:%d Mem:%d\n", id, | |
cont.HostConfig.CpuShares, cont.HostConfig.Memory) | |
return cont.Name, cont.HostConfig.CpuShares, cont.HostConfig.Memory, nil | |
} | |
// Populate our list of container by doing a 'docker ps' and 'docker inspect'
// on each one
//
// fullListWatcher runs forever: roughly every 'refresh' seconds (or
// immediately after Docker comes back from a restart) it re-reads the
// full container list and reconciles it against our cached state,
// purging anything no longer present. Never returns in practice.
//
// NOTE(review): dockerDown is read/written here and in containerWatcher
// without synchronization — confirm this data race is acceptable.
func fullListWatcher() error {
	log(1, "Container refresher started\n")
	for {
		// If Docker is down then just wait until it comes up
		// Note: this is reset by the event watcher thread
		for dockerDown == true {
			<-dockerRestarted
		}
		// Get the list of containers from our Docker engine
		log(1, "Getting full list of containers\n")
		cList := []ContainerPS{}
		if err := getData(&cList, "/containers/json"); err != nil {
			if !dockerDown {
				dockerDown = true
				log(0, "Error in 'docker ps', is Docker down? Err: %s\n", err)
			}
			// No sleep here: the loop above blocks on dockerRestarted.
			continue
		}
		// For each container do: docker inspect <id>
		newContainers := []*Container{}
		for _, c := range cList {
			// Skip non-running or "ExcludeName" containers
			if c.State != "running" || isExcludeName(c.Names) {
				continue
			}
			_, cpu, mem, err := getContainerStats(c.ID)
			if err != nil {
				continue // assume its a 404 and we can just continue
			}
			newContainers = append(newContainers, &Container{
				ID:     c.ID,
				CPU:    cpu,
				Memory: mem,
			})
		}
		// Update our list of containers, and purge any extras
		// (purge=true drops anything not seen in this sweep).
		updateContainers(newContainers, nil, true)
		// Pause either until our refresh time-limit or we got an indication
		// that Docker was restarted. Upon restart we don't want to wait
		// until the refresh time times out
		timer := time.NewTimer(time.Second * time.Duration(refresh))
		select {
		case <-timer.C:
		case <-dockerRestarted:
		}
		timer.Stop() // Just to clean up
	}
}
// dockerDown marks whether we currently believe the Docker daemon is
// unreachable.
// NOTE(review): shared by both watcher goroutines without any
// synchronization — confirm this data race is acceptable.
var dockerDown = false

// dockerRestarted is signaled (buffered, size 1) when Docker comes back
// up so fullListWatcher can refresh immediately instead of waiting out
// its timer.
var dockerRestarted = make(chan bool, 1)

// containerWatcher tails Docker's container event stream forever. On a
// "start" event it inspects the container and adds it to our tracked
// set (unless Kubernetes-managed); on a "die" event it removes it. If
// the connection drops it retries every 'delay' seconds.
func containerWatcher() {
	log(1, "Container event stream watcher started\n")
	for {
		// Subscribe to container events only (filters out image,
		// network, volume, etc. events).
		resp, err := openURL(`/events?filters={"type":["container"]}`)
		if err != nil {
			if !dockerDown {
				dockerDown = true
				log(0, "Error connecting to Docker, is it down? Err: %s\n", err)
			}
			time.Sleep(time.Duration(delay) * time.Second)
			continue
		}
		if dockerDown {
			log(0, "Docker is back up\n")
			dockerDown = false
			// Buffered send: wakes fullListWatcher for an immediate
			// full refresh.
			dockerRestarted <- true
		}
		log(1, "Connected to Docker's event stream\n")
		dec := json.NewDecoder(resp.Body)
		// Let the main loop know we're connected and watching
		for {
			// Minimal view of a Docker event; only ID and Action are
			// actually used below.
			event := struct {
				Status string
				ID     string
				Type   string
				Action string
				Time   int
			}{}
			if err := dec.Decode(&event); err != nil {
				resp.Body.Close()
				log(1, "Hit end of stream\n")
				// Let the main loop know we stopped so it can restart
				break
			}
			if event.Action == "start" {
				log(2, "Got a 'start' event: %.12s\n", event.ID)
				name, cpu, mem, err := getContainerStats(event.ID)
				if err != nil {
					// Assume its a 404 and try to remove it from our list
					if _, ok := containers[event.ID]; !ok {
						// Not in our list so skip it. We don't want to
						// unnecessarily update the fake pod spec.
						continue
					}
					log(2, "Removing container due to 404 on docker inspect\n")
					updateContainers(nil, []string{event.ID}, false)
				} else {
					if !isExcludeName([]string{name}) {
						log(2, "Adding container\n")
						updateContainers([]*Container{&Container{
							ID:     event.ID,
							CPU:    cpu,
							Memory: mem,
						}}, nil, false)
					} else {
						log(2, "Skipping container due to name: %s\n", name)
					}
				}
			} else if event.Action == "die" {
				log(2, "Got a 'die' event: %.12s\n", event.ID)
				updateContainers(nil, []string{event.ID}, false)
			}
		}
	}
}
// Create, or delete, the fake pod definition file based on what we have
//
// createFakePod sums the CPU/Memory of all tracked containers and
// writes a static-pod manifest reserving that much, so the kubelet
// (watching /etc/kubernetes/manifests) tells the scheduler this node
// has less capacity. With nothing to reserve, any existing manifest is
// removed instead. Callers (updateContainers) hold updateLock while we
// read the global 'containers' map; main also calls this once before
// the watcher goroutines start.
func createFakePod() {
	podFile := "/etc/kubernetes/manifests/fakepod.yaml"
	// Template filled in with (image, totalCPU, totalMem).
	// NOTE(review): the raw CPU value is substituted into "cpu: %dm"
	// (millicores) and memory into "%dMi" — confirm these unit
	// interpretations are intended.
	podFileContents := `apiVersion: v1
kind: Pod
metadata:
  name: fake
spec:
  containers:
  - name: fake
    image: %s
    resources:
      requests:
        cpu: %dm
        memory: %dMi
    imagePullPolicy: IfNotPresent
`
	totalMem := int64(0)
	totalCPU := int64(0)
	for _, cont := range containers {
		totalMem += cont.Memory
		totalCPU += cont.CPU
	}
	// Convert bytes -> MiB to match the "%dMi" placeholder.
	totalMem /= 1024 * 1024
	if totalMem == 0 && totalCPU == 0 {
		// No extra containers so just remove any fake pod we may have
		os.Remove(podFile)
		log(1, "Removed fake pod definition\n")
	} else {
		// We have at least one extra container, so create a fake pod
		log(1, "FakePod: %03d Mem: %d CPU: %d\n",
			len(containers), totalMem, totalCPU)
		data := fmt.Sprintf(podFileContents, image, totalCPU, totalMem)
		// NOTE(review): data is passed as the format string; a '%' in
		// the image name would be misinterpreted — confirm acceptable.
		log(2, data)
		// Read-only mode; presumably this process runs as root so the
		// next WriteFile can still truncate it — TODO confirm.
		err := ioutil.WriteFile(podFile, []byte(data), 0444)
		if err != nil {
			log(0, "Can't create pod file: %s\n", err)
		}
	}
}
func main() { | |
flag.Int64Var(&minCPU, "cpu", minCPU, "Minimum CPU for a container") | |
flag.Int64Var(&minMemory, "mem", minMemory, "Minimum Memory for a container") | |
flag.IntVar(&delay, "delay", delay, "Retry delay, on lost connection") | |
flag.IntVar(&refresh, "refresh", refresh, "Full container refesh rate, in seconds") | |
flag.StringVar(&dockerSock, "sock", dockerSock, "Path to docker's socket") | |
flag.StringVar(&image, "image", image, "Image name to use for fake pod") | |
flag.IntVar(&verbose, "v", verbose, "Verbose/debugging level") | |
flag.Parse() | |
log(0, "Fakepod watcher is running...\n") | |
// Clear old data | |
createFakePod() | |
// Start our two watcher threads | |
go containerWatcher() // Watching events | |
fullListWatcher() // Periodically get the full list of containers | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment