Skip to content

Instantly share code, notes, and snippets.

@lwille
Last active June 20, 2023 10:13
Show Gist options
  • Save lwille/4be3bcdaf7caa587b1b99316f4d221c5 to your computer and use it in GitHub Desktop.
Save lwille/4be3bcdaf7caa587b1b99316f4d221c5 to your computer and use it in GitHub Desktop.
A utility to get progress information from mongosync running inside kubernetes pods, and send API commands to them
#!/usr/bin/env python3
"""
Copyright 2023 Leonhardt Wille / The Jodel Venture GmbH.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import argparse
import requests
import sys
import threading
from kubernetes import config, client
# Kubernetes client setup. Both statements run at import time, before main().
# NOTE(review): load_kube_config() presumably reads the local kubeconfig file,
# so this would not pick up in-cluster service-account credentials — confirm.
config.load_kube_config()
# Shared CoreV1Api client; get_pods() uses it to list the pods of a namespace.
v1 = client.CoreV1Api()
def log(pod, message: str):
    """Write ``message`` to stderr, prefixed with the pod's name in brackets.

    ``pod`` is subscripted with the ``"name"`` key. Nothing is returned.
    """
    line = f"[{pod['name']}] {message}"
    print(line, file=sys.stderr)
def call_api(pod: dict, command: str, payload: str, timeout: float = 30.0) -> dict:
    """POST ``payload`` to a pod's mongosync API and return the decoded reply.

    The request is sent to ``http://<pod["ip"]>:8080/api/v1/<command>``, so
    ``command`` selects the endpoint (for example ``pause`` or ``resume``).

    Args:
        pod: Mapping with at least an ``"ip"`` entry.
        command: Endpoint name appended to ``/api/v1/``.
        payload: Request body, passed to ``requests.post`` unchanged.
        timeout: Seconds to wait for the pod before giving up, so that one
            unresponsive pod cannot block its worker thread forever.

    Returns:
        The decoded JSON object when the pod answers with one. If the request
        fails or the reply is not a JSON object, a dict of the form
        ``{"success": False, "error": ..., "errorDescription": ...}`` is
        returned so the caller can report what went wrong.
    """
    url = "http://{}:8080/api/v1/{}".format(pod["ip"], command)
    try:
        response = requests.post(url, data=payload, timeout=timeout)
        body = response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSON decoding errors on requests versions whose
        # JSONDecodeError is not a RequestException subclass.
        return {
            "success": False,
            "error": type(exc).__name__,
            "errorDescription": str(exc),
        }
    if not isinstance(body, dict):
        # Callers use dict methods on the result; never hand them a list/str.
        return {
            "success": False,
            "error": "UnexpectedResponse",
            "errorDescription": "expected a JSON object, got {}".format(
                type(body).__name__
            ),
        }
    return body
def filter_pod(pod: dict, match_state: str, mongosync_ids=(), timeout: float = 10.0) -> bool:
    """Report whether a pod's mongosync instance matches the given filters.

    Fetches ``http://<pod["ip"]>:8080/api/v1/progress`` and reads
    ``progress.mongosyncID``, ``progress.state`` and ``progress.info`` from the
    JSON reply. When the pod matches, one tab-separated line with those three
    values is printed to stderr.

    Args:
        pod: Mapping with ``"ip"`` and ``"name"`` entries.
        match_state: Required value of ``progress.state``; ``None`` accepts
            any state.
        mongosync_ids: Accepted mongosync IDs; an empty collection accepts
            every ID.
        timeout: Seconds to wait for the pod before treating it as
            unreachable.

    Returns:
        ``True`` if the pod matches both filters, ``False`` otherwise,
        including when the progress endpoint cannot be queried.
    """
    url = "http://{}:8080/api/v1/progress".format(pod["ip"])
    try:
        json_response = requests.get(url, timeout=timeout).json()
    except (requests.RequestException, ValueError) as exc:
        # An unreachable pod or a non-JSON reply is reported and skipped
        # instead of killing the worker thread with a traceback.
        log(pod, "could not fetch progress: {}".format(exc))
        return False

    # "progress" may be absent or null; fall back to an empty mapping.
    progress = json_response.get("progress") if isinstance(json_response, dict) else None
    if not isinstance(progress, dict):
        progress = {}
    mongosync_id = progress.get("mongosyncID")
    state = progress.get("state")
    info = progress.get("info")

    if match_state is not None and state != match_state:
        return False
    if mongosync_ids and mongosync_id not in mongosync_ids:
        return False

    # str() keeps the width/alignment specs valid when a field is missing
    # (None does not support a non-empty format spec).
    print(
        "{:25}\t{:^12}\t{}".format(str(mongosync_id), str(state), info),
        file=sys.stderr,
    )
    return True
def process_pod(pod: dict, state: str, command: str, mongosync_ids=(), payload="{}") -> None:
    """Check one pod against the filters and, if it matches, send it a command.

    ``filter_pod`` is always called first (it prints the pod's progress line).
    The API call is made only when the pod matches and ``command`` is not the
    ``"_noop"`` placeholder. A reply whose ``"success"`` field is not ``True``
    is logged to stderr with its ``"error"`` and ``"errorDescription"`` fields.

    Args:
        pod: Mapping with ``"ip"`` and ``"name"`` entries.
        state: State filter forwarded to ``filter_pod`` (``None`` for any).
        command: API command forwarded to ``call_api``, or ``"_noop"``.
        mongosync_ids: ID filter forwarded to ``filter_pod``.
        payload: Request body forwarded to ``call_api``.
    """
    if not filter_pod(pod, state, mongosync_ids):
        return
    if command == "_noop":
        return

    response = call_api(pod, command, payload)
    if response.get("success") is not True:
        # The reply may lack the error fields (e.g. an empty dict after a
        # failed request), so read them with defaults instead of indexing.
        log(
            pod,
            "{}: {}".format(
                response.get("error", "unknown error"),
                response.get(
                    "errorDescription", "no usable response from the mongosync API"
                ),
            ),
        )
def parallel_process(pods, state: str, command: str, mongosync_ids=(), payload="{}") -> None:
    """Run ``process_pod`` for every pod concurrently, one thread per pod.

    Returns only after all threads have finished.

    Args:
        pods: Iterable of pod mappings, each passed to ``process_pod``.
        state: State filter forwarded to ``process_pod``.
        command: API command forwarded to ``process_pod``.
        mongosync_ids: ID filter forwarded to ``process_pod``; the default is
            an immutable empty tuple rather than a shared mutable list.
        payload: Request body forwarded to ``process_pod``.
    """
    workers = [
        threading.Thread(
            target=process_pod, args=(pod, state, command, mongosync_ids, payload)
        )
        for pod in pods
    ]
    for worker in workers:
        worker.start()
    # Wait for all threads to complete
    for worker in workers:
        worker.join()
def get_pods(namespace="default", labels=()) -> list:
    """List the pods of a namespace that match the given label selectors.

    Args:
        namespace: Namespace passed to ``list_namespaced_pod``.
        labels: Iterable of selector strings (e.g. ``"app=mongosync"``); they
            are joined with commas into a single ``label_selector``.

    Returns:
        A list of ``{"name": ..., "ip": ...}`` dicts, one per pod that has an
        IP address. Pods without an IP are reported on stderr and left out,
        because every later step contacts the pod over HTTP by its IP.
    """
    selector = ",".join(labels)
    pod_list = v1.list_namespaced_pod(namespace, label_selector=selector)

    reachable = []
    for pod in pod_list.items:
        entry = {"name": pod.metadata.name, "ip": pod.status.pod_ip}
        if not entry["ip"]:
            log(entry, "skipped: pod has no IP address assigned")
            continue
        reachable.append(entry)
    return reachable
def main() -> None:
    """Parse the command line, look up the matching pods and process them.

    Without a positional command the ``"_noop"`` placeholder is used, so only
    the progress information of the matching pods is printed.
    """
    parser = argparse.ArgumentParser(
        description="A utility to get progress information from mongosync running inside kubernetes pods, and send API commands to them.",
        epilog="Example: python mongosync-controller.py -l app=mongosync -s PAUSED resume",
    )
    parser.add_argument(
        "-d",
        "--data",
        default="{}",
        help="JSON payload to send to the API endpoint, defaults to an empty JSON object {}.",
    )
    parser.add_argument(
        "-s",
        "--state",
        default=None,
        help="State to check before issuing the command",
        # A tuple (not a set) keeps the order shown in --help stable.
        choices=("PAUSED", "RUNNING"),
    )
    parser.add_argument(
        "-n", "--namespace", default="default", help="Kubernetes namespace"
    )
    parser.add_argument(
        "-l",
        "--labels",
        default=[],
        help="Labels to filter the pods by, in the form of key=value",
        action="append",
    )
    parser.add_argument(
        "-m",
        "--mongosync-ids",
        default=[],
        help="Mongosync IDs to filter the pods by",
        action="append",
    )
    parser.add_argument(
        "command",
        nargs="?",
        default="_noop",
        help="API command, e.g. `pause` or `resume`. If not specified, the script will only print the progress information of the matching pods",
        # "_noop" must stay listed: argparse validates the default of an
        # optional positional against `choices` as well.
        choices=("pause", "resume", "_noop"),
    )
    args = parser.parse_args()

    pods = get_pods(args.namespace, args.labels)
    if not pods:
        # Say so explicitly instead of exiting without any output.
        print(
            "No pods found in namespace {!r} for labels {}".format(
                args.namespace, args.labels
            ),
            file=sys.stderr,
        )
        return
    parallel_process(pods, args.state, args.command, args.mongosync_ids, args.data)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment