package main
/*
This program monitors all of the containers running on a node and, for each
one, checks whether it is a Kubernetes-managed one (judged by its name). If so,
it skips it. For the containers that are not managed by Kubernetes, it
calculates the total amount of resources (CPU shares and memory) configured for
all of them and creates a pod which reserves that much CPU/memory. The pod
itself just sleeps, so it technically isn't using many resources at all, but by
creating the pod we tell the Kubernetes scheduler that there are fewer
resources available on this node, and it takes this into account when
scheduling jobs.

The fake pod is created by defining it in:

    /etc/kubernetes/manifests/fakepod.yaml

which should automatically get picked up by the kubelet. This means that this
tool must be running on the same host as the kubelet.
*/
import (
	"encoding/json"
	"flag"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"os"
	"strings"
	"sync"
	"time"
)
// Package-level settings (most are overridable via flags in main) and the
// set of tracked containers.
var (
	// minCPU and minMemory are floors (-cpu / -mem flags) applied to each
	// container's CPU shares and memory before they are counted.
	minCPU    = int64(0)
	minMemory = int64(0)

	// containers maps a Docker container ID to its tracked entry.
	containers = map[string]*Container{}

	delay      = 5       // seconds to wait before retrying a lost Docker connection (-delay)
	refresh    = 10 * 60 // seconds between full container refreshes, i.e. 10 minutes (-refresh)
	dockerSock = "/var/run/docker.sock" // Docker engine socket path (-sock)
	image      = ""                     // image used for the fake pod (-image)
)
// excludePrefixes lists container-name prefixes to ignore: isExcludeName
// reports true for any container whose name starts with one of them.
// NOTE(review): this literal's entries and closing brace appear to be missing
// here; presumably they match the names Kubernetes gives its containers.
var excludePrefixes = []string{
// verbose is the logging threshold set by the -v flag; log() compares each
// message's level against it.
var verbose = 0
// ContainerPS is one element of the JSON array returned by Docker's
// GET /containers/json (the 'docker ps' equivalent). Only these fields are
// decoded; any others in the response are ignored.
type ContainerPS struct {
	ID     string   `json:"Id"`
	Names  []string `json:"Names"`
	State  string   `json:"State"`
	Status string   `json:"Status"`
}
// ContainerInspect is the subset of Docker's GET /containers/{id}/json
// (the 'docker inspect' equivalent) response that this program decodes.
type ContainerInspect struct {
	ID    string `json:"Id"`
	Name  string `json:"Name"`
	State struct {
		Status string `json:"Status"`
	} `json:"State"`
	// HostConfig holds the resource settings the container was created with.
	HostConfig struct {
		CpuShares         int64 `json:"CpuShares"`
		CpuQuota          int64 `json:"CpuQuota"`
		Memory            int64 `json:"Memory"` // bytes
		MemoryReservation int64 `json:"MemoryReservation"`
	} `json:"HostConfig"`
}
// Container records what we count for one running container that is not
// managed by Kubernetes.
type Container struct {
	ID     string // Docker container ID
	CPU    int64  // CPU shares, after the minCPU floor
	Memory int64  // memory in bytes, after the minMemory floor
}
func log(v int, format string, args ...interface{}) {
if verbose < v {
if v == 0 {
fmt.Fprintf(os.Stdout, format, args...)
} else {
fmt.Fprintf(os.Stderr, format, args...)
var updateLock = sync.Mutex{}
func updateContainers(newContainers []*Container, delContainers []string, purge bool) {
defer updateLock.Unlock()
copyContainers := map[string]*Container{}
oldCPU := int64(0)
oldMemory := int64(0)
for id, c := range containers {
copyContainers[id] = c
oldCPU += c.CPU
oldMemory += c.Memory
newCPU := oldCPU
newMemory := oldMemory
for _, newC := range newContainers {
oldC, ok := containers[newC.ID]
if ok {
if newC.CPU != oldC.CPU || newC.Memory != oldC.Memory {
oldC.CPU = newC.CPU
oldC.Memory = newC.Memory
newCPU += newC.CPU - oldC.CPU
newMemory += newC.Memory - oldC.Memory
delete(copyContainers, newC.ID)
} else {
containers[newC.ID] = newC
newCPU += newC.CPU
newMemory += newC.Memory
for _, id := range delContainers {
if c, ok := containers[id]; ok {
newCPU -= c.CPU
newMemory -= c.Memory
delete(containers, id)
delete(copyContainers, id)
if purge && len(copyContainers) != 0 {
for _, oldC := range copyContainers {
log(2, "Purging container: %.12s\n", oldC.ID)
delete(containers, oldC.ID)
newCPU -= oldC.CPU
newCPU -= oldC.Memory
if oldCPU != newCPU || oldMemory != newMemory {
func openURL(url string) (*http.Response, error) {
client := &http.Client{Transport: &http.Transport{
Dial: func(proto, addr string) (conn net.Conn, err error) {
return net.Dial("unix", dockerSock)
resp, err := client.Get("http://d" + url)
if err != nil || resp == nil || resp.StatusCode != 200 {
code := 0
if resp != nil {
code = resp.StatusCode
return nil, fmt.Errorf("Err(%d): %s", code, err)
return resp, nil
// Given a URL, GET it and store the resulting JSON int he passed in 'obj'
func getData(obj interface{}, url string) error {
resp, err := openURL(url)
if err != nil {
return err
defer resp.Body.Close()
dec := json.NewDecoder(resp.Body)
if err := dec.Decode(obj); err != nil {
return fmt.Errorf("Err: %#v", err)
return nil
// Does the list of container names match any of our 'excludePrefixes'
func isExcludeName(cNames []string) bool {
for _, pre := range excludePrefixes {
for _, name := range cNames {
if strings.HasPrefix(name, pre) {
return true
return false
// Do a 'docker inspect' on the given container ID
func getContainerStats(id string) (string, int64, int64, error) {
cont := ContainerInspect{}
if err := getData(&cont, "/containers/"+id+"/json"); err != nil {
log(2, "Error in docker inspect %.12s: %v\n", id, err)
return "", 0, 0, err
// Force some minimum values for now
if cont.HostConfig.CpuShares < minCPU {
cont.HostConfig.CpuShares = minCPU
if cont.HostConfig.Memory < minMemory {
cont.HostConfig.Memory = minMemory
log(2, "Stats Container ID:%.12s CPU:%d Mem:%d\n", id,
cont.HostConfig.CpuShares, cont.HostConfig.Memory)
return cont.Name, cont.HostConfig.CpuShares, cont.HostConfig.Memory, nil
// Populate our list of container by doing a 'docker ps' and 'docker inspect'
// on each one
func fullListWatcher() error {
log(1, "Container refresher started\n")
for {
// If Docker is down then just wait until it comes up
// Note: this is reset by the event watcher thread
for dockerDown == true {
// Get the list of containers from our Docker engine
log(1, "Getting full list of containers\n")
cList := []ContainerPS{}
if err := getData(&cList, "/containers/json"); err != nil {
if !dockerDown {
dockerDown = true
log(0, "Error in 'docker ps', is Docker down? Err: %s\n", err)
// For each container do: docker inspect <id>
newContainers := []*Container{}
for _, c := range cList {
// Skip non-running or "ExcludeName" containers
if c.State != "running" || isExcludeName(c.Names) {
_, cpu, mem, err := getContainerStats(c.ID)
if err != nil {
continue // assume its a 404 and we can just continue
newContainers = append(newContainers, &Container{
ID: c.ID,
CPU: cpu,
Memory: mem,
// Update our list of containers, and purge any extras
updateContainers(newContainers, nil, true)
// Pause either until our refresh time-limit or we got an indication
// that Docker was restarted. Upon restart we don't want to wait
// until the refresh time times out
timer := time.NewTimer(time.Second * time.Duration(refresh))
select {
case <-timer.C:
case <-dockerRestarted:
timer.Stop() // Just to clean up
// State shared by the two watcher goroutines.
var (
	// dockerDown is set when a Docker API call fails, and cleared by
	// containerWatcher once it reconnects to the event stream.
	dockerDown bool

	// dockerRestarted is signalled by containerWatcher when Docker comes back,
	// so fullListWatcher refreshes immediately instead of waiting for its timer.
	dockerRestarted = make(chan bool, 1)
)
func containerWatcher() {
log(1, "Container event stream watcher started\n")
for {
resp, err := openURL(`/events?filters={"type":["container"]}`)
if err != nil {
if !dockerDown {
dockerDown = true
log(0, "Error connecting to Docker, is it down? Err: %s\n", err)
time.Sleep(time.Duration(delay) * time.Second)
if dockerDown {
log(0, "Docker is back up\n")
dockerDown = false
dockerRestarted <- true
log(1, "Connected to Docker's event stream\n")
dec := json.NewDecoder(resp.Body)
// Let the main loop know we're connected and watching
for {
event := struct {
Status string
ID string
Type string
Action string
Time int
if err := dec.Decode(&event); err != nil {
log(1, "Hit end of stream\n")
// Let the main loop know we stopped so it can restart
if event.Action == "start" {
log(2, "Got a 'start' event: %.12s\n", event.ID)
name, cpu, mem, err := getContainerStats(event.ID)
if err != nil {
// Assume its a 404 and try to remove it from our list
if _, ok := containers[event.ID]; !ok {
// Not in our list so skip it. We don't want to
// unnecessarily update the fake pod spec.
log(2, "Removing container due to 404 on docker inspect\n")
updateContainers(nil, []string{event.ID}, false)
} else {
if !isExcludeName([]string{name}) {
log(2, "Adding container\n")
ID: event.ID,
CPU: cpu,
Memory: mem,
}}, nil, false)
} else {
log(2, "Skipping container due to name: %s\n", name)
} else if event.Action == "die" {
log(2, "Got a 'die' event: %.12s\n", event.ID)
updateContainers(nil, []string{event.ID}, false)
// Create, or delete, the fake pod definition file based on what we have
func createFakePod() {
podFile := "/etc/kubernetes/manifests/fakepod.yaml"
podFileContents := `apiVersion: v1
kind: Pod
name: fake
- name: fake
image: %s
cpu: %dm
memory: %dMi
imagePullPolicy: IfNotPresent
totalMem := int64(0)
totalCPU := int64(0)
for _, cont := range containers {
totalMem += cont.Memory
totalCPU += cont.CPU
totalMem /= 1024 * 1024
if totalMem == 0 && totalCPU == 0 {
// No extra containers so just remove any fake pod we may have
log(1, "Removed fake pod definition\n")
} else {
// We have at least one extra container, so create a fake pod
log(1, "FakePod: %03d Mem: %d CPU: %d\n",
len(containers), totalMem, totalCPU)
data := fmt.Sprintf(podFileContents, image, totalCPU, totalMem)
log(2, data)
err := ioutil.WriteFile(podFile, []byte(data), 0444)
if err != nil {
log(0, "Can't create pod file: %s\n", err)
func main() {
flag.Int64Var(&minCPU, "cpu", minCPU, "Minimum CPU for a container")
flag.Int64Var(&minMemory, "mem", minMemory, "Minimum Memory for a container")
flag.IntVar(&delay, "delay", delay, "Retry delay, on lost connection")
flag.IntVar(&refresh, "refresh", refresh, "Full container refesh rate, in seconds")
flag.StringVar(&dockerSock, "sock", dockerSock, "Path to docker's socket")
flag.StringVar(&image, "image", image, "Image name to use for fake pod")
flag.IntVar(&verbose, "v", verbose, "Verbose/debugging level")
log(0, "Fakepod watcher is running...\n")
// Clear old data
// Start our two watcher threads
go containerWatcher() // Watching events
fullListWatcher() // Periodically get the full list of containers
