Created
March 3, 2021 18:20
-
-
Save maxking/b62397b3f0e52cf5e4b5375a663a1d30 to your computer and use it in GitHub Desktop.
k8scontroller.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import json | |
import queue | |
import threading | |
EVENT_LIST = queue.Queue() | |
RESOURCE='pods' | |
NS='default' | |
def reader(resource=RESOURCE, ns=NS): | |
"""Reader reads from Kubernetes API and records events.""" | |
url = f'http://localhost:8001/api/v1/namespaces/{ns}/{resource}' | |
print(f'Calling Kubernetes API {url} to get all "{resource}" in namespace "{ns}".') | |
response = requests.get(url) | |
print(response) | |
resourceVersion = response.json().get('metadata').get('resourceVersion') | |
print(f'Got resource version {resourceVersion}, starting the watch...') | |
response = requests.get(f'http://localhost:8001/api/v1/namespaces/{ns}/{resource}?watch=1&resourceVersion=3689900', stream=True) | |
for line in response.iter_lines(): | |
# filter out keep-alive new lines | |
if line: | |
decoded_line = line.decode('utf-8') | |
event = json.loads(decoded_line) | |
obj = event.get('object') | |
if obj is not None: | |
print('Got event type {} for obj {}'.format(event.get('type'), obj.get('metadata').get('name'))) | |
EVENT_LIST.put(obj.get('metadata').get('name')) | |
def controller(): | |
"""Main controller loop.""" | |
while True: | |
item = EVENT_LIST.get() | |
reconcile(item) | |
def reconcile(item): | |
"""Try to GET an item and decide if it needs to be reconciled.""" | |
# Step 1: Get the object itself: | |
name = item | |
print(f'Trying to reconcile Pod {name}') | |
resource = requests.get(f'http://localhost:8001/api/v1/namespaces/default/pods/{name}').json() | |
kind = resource.get('kind') | |
if kind == 'Status': | |
print('Got status from K8s {}: {}'.format(resource.get('status'), resource.get('message'))) | |
else: | |
print('Got resource from Kubernetes: {}, labels: {}'.format(kind, resource.get('metadata').get('labels'))) | |
reconcile_single_pod(resource) | |
def reconcile_single_pod(pod): | |
"""Reconcile a single Pod if it isn't already in the target state.""" | |
if not pod.get('metadata').get('labels').get('k8scontroller') == 'awesome': | |
data = """[{"op": "add", "path": "/metadata/labels/k8scontroller", "value": "awesome" }]""" | |
print('POST updated resource to Kubernetes.') | |
response = requests.patch('http://localhost:8001/api/v1/namespaces/default/pods/{}'.format(pod.get('metadata').get('name')), | |
data=data, | |
headers={'Content-Type': 'application/json-patch+json'}) | |
print(f'Got Response: {response.status_code}') | |
else: | |
print('Nothing to do.') | |
if __name__ == '__main__': | |
threading.Thread(target=controller, daemon=True).start() | |
reader() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment