Skip to content

Instantly share code, notes, and snippets.

@maxking
Created March 3, 2021 18:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maxking/b62397b3f0e52cf5e4b5375a663a1d30 to your computer and use it in GitHub Desktop.
Save maxking/b62397b3f0e52cf5e4b5375a663a1d30 to your computer and use it in GitHub Desktop.
k8scontroller.py
import requests
import json
import queue
import threading
EVENT_LIST = queue.Queue()
RESOURCE='pods'
NS='default'
def reader(resource=RESOURCE, ns=NS):
"""Reader reads from Kubernetes API and records events."""
url = f'http://localhost:8001/api/v1/namespaces/{ns}/{resource}'
print(f'Calling Kubernetes API {url} to get all "{resource}" in namespace "{ns}".')
response = requests.get(url)
print(response)
resourceVersion = response.json().get('metadata').get('resourceVersion')
print(f'Got resource version {resourceVersion}, starting the watch...')
response = requests.get(f'http://localhost:8001/api/v1/namespaces/{ns}/{resource}?watch=1&resourceVersion=3689900', stream=True)
for line in response.iter_lines():
# filter out keep-alive new lines
if line:
decoded_line = line.decode('utf-8')
event = json.loads(decoded_line)
obj = event.get('object')
if obj is not None:
print('Got event type {} for obj {}'.format(event.get('type'), obj.get('metadata').get('name')))
EVENT_LIST.put(obj.get('metadata').get('name'))
def controller():
"""Main controller loop."""
while True:
item = EVENT_LIST.get()
reconcile(item)
def reconcile(item):
"""Try to GET an item and decide if it needs to be reconciled."""
# Step 1: Get the object itself:
name = item
print(f'Trying to reconcile Pod {name}')
resource = requests.get(f'http://localhost:8001/api/v1/namespaces/default/pods/{name}').json()
kind = resource.get('kind')
if kind == 'Status':
print('Got status from K8s {}: {}'.format(resource.get('status'), resource.get('message')))
else:
print('Got resource from Kubernetes: {}, labels: {}'.format(kind, resource.get('metadata').get('labels')))
reconcile_single_pod(resource)
def reconcile_single_pod(pod):
"""Reconcile a single Pod if it isn't already in the target state."""
if not pod.get('metadata').get('labels').get('k8scontroller') == 'awesome':
data = """[{"op": "add", "path": "/metadata/labels/k8scontroller", "value": "awesome" }]"""
print('POST updated resource to Kubernetes.')
response = requests.patch('http://localhost:8001/api/v1/namespaces/default/pods/{}'.format(pod.get('metadata').get('name')),
data=data,
headers={'Content-Type': 'application/json-patch+json'})
print(f'Got Response: {response.status_code}')
else:
print('Nothing to do.')
if __name__ == '__main__':
threading.Thread(target=controller, daemon=True).start()
reader()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment