Skip to content

Instantly share code, notes, and snippets.

@philpep
Created August 20, 2018 07:48
Show Gist options
  • Save philpep/2d9cd843af7aaed647657ee7bfcf886b to your computer and use it in GitHub Desktop.
Save philpep/2d9cd843af7aaed647657ee7bfcf886b to your computer and use it in GitHub Desktop.
Update k8s deployment images when new versions are pushed (by comparing image sha256 digest)
#!/usr/bin/env python3
import collections
import json
import logging
import re
import subprocess
import sys
LOG = logging.getLogger(__name__)
def query_yes_no(question, default="yes"):
"""Ask a yes/no question via raw_input() and return their answer.
"question" is a string that is presented to the user.
"default" is the presumed answer if the user just hits <Enter>.
It must be "yes" (the default), "no" or None (meaning
an answer is required of the user).
The "answer" return value is True for "yes" or False for "no".
"""
valid = {"yes": True, "y": True, "ye": True,
"no": False, "n": False}
if default is None:
prompt = " [y/n] "
elif default == "yes":
prompt = " [Y/n] "
elif default == "no":
prompt = " [y/N] "
else:
raise ValueError("invalid default answer: '%s'" % default)
while True:
sys.stdout.write(question + prompt)
choice = input().lower()
if default is not None and choice == '':
return valid[default]
elif choice in valid:
return valid[choice]
else:
sys.stdout.write("Please respond with 'yes' or 'no' "
"(or 'y' or 'n').\n")
def check_call(*args, log=True, **kwargs):
if log:
LOG.info(' '.join(args))
subprocess.check_call(args, **kwargs)
def check_output(*args, log=True, **kwargs):
if log:
LOG.info(' '.join(args))
try:
return subprocess.check_output(args, **kwargs)
except subprocess.CalledProcessError as exc:
print('STDOUT: ',
exc.stdout.decode('utf8') if exc.stdout is not None else '')
print('STDERR: ',
exc.stderr.decode('utf8') if exc.stderr is not None else '')
raise
class Image(collections.namedtuple('Image', ['name', 'current', 'expected'])):
@staticmethod
def get_sha256(name, _cache={}):
if name not in _cache:
# TODO: sha256 digest could be requested without pulling the image
check_output('docker', 'pull', name, log=False)
_cache[name] = check_output(
'docker', 'inspect', '--format={{index .RepoDigests 0}}',
name, log=False).decode().strip()
return _cache[name]
@property
def sha256(self):
return self.get_sha256(self.expected)
def need_update(self):
return self.sha256 not in self.current
class Deployment(collections.namedtuple('Deployment', ['name', 'images'])):
@classmethod
def collect(cls):
data = json.loads(check_output(
'kubectl', 'get', 'pod,rs,deployment', '-o', 'json').decode())
items = {'Pod': {}, 'ReplicaSet': {}, 'Deployment': {}}
for item in data['items']:
kind = item['kind']
name = item['metadata']['name']
obj = {'name': name}
if kind in ('Pod', 'ReplicaSet'):
owner, = item['metadata']['ownerReferences']
if owner['kind'] not in ('ReplicaSet', 'Deployment'):
# Job
continue
assert owner['kind'] == (
'ReplicaSet' if kind == 'Pod' else 'Deployment')
obj['owner'] = owner['name']
if kind == 'Deployment':
config = json.loads(item['metadata']['annotations'][
'kubectl.kubernetes.io/last-applied-configuration'])
obj['images'] = []
for c in config['spec']['template']['spec']['containers']:
obj['images'].append({
'name': c['name'],
'image': c['image'],
})
if kind == 'Pod':
obj['images'] = []
for c in item['status']['containerStatuses']:
if not c['imageID']:
# TODO: this occur when pod is terminating (might be a
# better way to check this)
continue
image = re.match(
'^docker-pullable://(.*@sha256:.*)$', c['imageID']
).groups()[0]
obj['images'].append({
'name': c['name'],
'image': image,
})
assert name not in items[kind]
items[kind][name] = obj
def get_pods(name):
for pod in items['Pod'].values():
if items['ReplicaSet'][pod['owner']]['owner'] == name:
yield pod
for _, deployment in sorted(items['Deployment'].items()):
images = {c['name']: Image(c['name'], set(), c['image'])
for c in deployment['images']}
for pod in get_pods(deployment['name']):
for c in pod['images']:
assert c['name'] in images, c
images[c['name']].current.add(c['image'])
yield Deployment(deployment['name'], images)
def to_update(self):
print('checking deployment/{}'.format(self.name))
for _, image in sorted(self.images.items()):
if image.need_update():
print(
'\033[0;31m {} current: {}, expected: {}\033[0m'.format(
image.name, ', '.join(image.current), image.sha256))
yield self.name, image.name, image.sha256
else:
print('\033[0;32m {} is up to date\033[0m'.format(
image.name))
def main():
to_update = []
for deployment in Deployment.collect():
to_update.extend(list(deployment.to_update()))
for name, img_name, sha256 in to_update:
cmd = ('kubectl', 'set', 'image', 'deployment/{}'.format(name),
'{}={}'.format(img_name, sha256))
if query_yes_no('run {} ? (y/n)'.format(' '.join(cmd))):
check_call(*cmd)
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)-15s %(message)s')
LOG.setLevel(logging.INFO)
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment