Skip to content

Instantly share code, notes, and snippets.

@nuriel77
Last active March 5, 2018 22:50
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nuriel77/19cacf4c3ec323f7e50fb01ed3cb8126 to your computer and use it in GitHub Desktop.
Save nuriel77/19cacf4c3ec323f7e50fb01ed3cb8126 to your computer and use it in GitHub Desktop.
Watch for events from kubernetes
from kubernetes import client, config, watch
from pprint import pprint, pformat
import argparse
import logging
import urllib3
import threading
import signal
import time
import json
import sys
import re
if sys.version_info[0] < 3:
raise Exception("Must be using Python>=3")
urllib3.disable_warnings()
logger = logging.getLogger(__name__)
class Job(threading.Thread):
def __init__(self, params):
threading.Thread.__init__(self)
"""
The shutdown_flag is a threading.Event object that
indicates whether the thread should be terminated.
"""
self.shutdown_flag = threading.Event()
self.watch_obj = params['watch_obj']
self.label_selector = params['label_selector']
self.watch = watch.Watch()
self.ignore_regex = r'' + params['ignore_regex']
def run(self):
logger.debug('%s started' % self.name)
while True:
try:
for event in self.watch.stream(self.watch_obj,
label_selector=self.label_selector,
timeout_seconds=60,
_request_timeout=60):
if self.shutdown_flag.is_set():
return
if self.ignore_regex != '' and \
re.match(self.ignore_regex, event['object'].metadata.name):
logger.debug("%s Skip: matched ignore regex: %s" %
(self.name, event['object'].metadata.name))
continue
logger.info("%s Event: %s %s %s" %
(self.name,
event['type'],
event['object'].kind,
event['object'].metadata.name))
logger.debug("%s %s" %
(self.name, pformat(event['raw_object']['status'])))
except urllib3.exceptions.ReadTimeoutError:
"""
To handle socket timeout we let the
outer loop continue = restarting watch stream
"""
logger.warning("%s Socket read timeout error"
", restarting watcher..." % self.name)
break
logger.info('%s stopped' % self.name)
class ServiceExit(Exception):
"""
Custom exception which is used to trigger the clean exit
of all running threads and the main program.
"""
pass
def service_shutdown(signum, frame):
logger.info('Caught signal %d' % signum)
raise ServiceExit
"""
Can be used:
with Timeout(seconds=5)
some_func()
"""
class Timeout:
def __init__(self, seconds=1, error_message='Timeout'):
self.seconds = seconds
self.error_message = error_message
def handle_timeout(self, signum, frame):
raise TimeoutError(self.error_message)
def __enter__(self):
signal.signal(signal.SIGALRM, self.handle_timeout)
signal.alarm(self.seconds)
def __exit__(self, type, value, traceback):
signal.alarm(0)
def main():
""" Parse arguments """
try:
args = parse_args()
except Exception as e:
sys.stderr.write("Error parsing arguments: %s\n" % e)
sys.exit(1)
""" Setup logger """
set_logger(debug=args.debug)
""" Register the signal handlers """
signal.signal(signal.SIGTERM, service_shutdown)
signal.signal(signal.SIGINT, service_shutdown)
""" Load configuration, set pool maxsize """
config.load_kube_config()
config.kube_config.configuration.connection_pool_maxsize = 10
""" Instantiate extensions """
v1 = client.CoreV1Api()
v1ext = client.ExtensionsV1beta1Api()
""" Kubernetes events to watch """
watch_dict = dict(
pods = v1.list_pod_for_all_namespaces,
deployments = v1ext.list_deployment_for_all_namespaces
)
""" Set params, parse event types """
params = {'ignore_regex': args.ignore_regex,
'label_selector': args.label_selector}
event_types = args.event_type.split(',')
""" Check all event types are valid """
for event in event_types:
if event not in watch_dict.keys():
logger.error("Invalid event type: %s" % event)
sys.exit(1)
logger.info("Starting up %d event watchers..." % len(event_types))
threads = []
""" Start the job threads """
try:
for obj in event_types:
params['watch_obj'] = watch_dict[obj]
th = Job(params)
th.daemon = True
th.start()
threads.append(th)
"""
Keep the main thread running,
otherwise signals are ignored.
"""
while True:
time.sleep(0.5)
except ServiceExit:
"""
Terminate the running threads.
Set the shutdown flag on each thread to
trigger a clean shutdown of each thread.
"""
for th in threads:
logger.debug("Try graceful shutdown of %s" % th.name)
th.shutdown_flag.set()
th.watch.stop()
th._stop = True
""" Wait for the threads to close... """
try:
with Timeout(seconds=1):
th.join()
except TimeoutError:
pass
logger.info('Exiting main program')
def parse_args():
parser = argparse.ArgumentParser(
description='Listen to Kubernetes events')
parser.add_argument('--event-type', '-t', metavar='type',
type=str,
default='pods',
help='Type of event(s) to listen to,'
' comma delimited')
parser.add_argument('--ignore-regex', '-i', metavar='regex',
type=str,
default='',
help='Regex to ignore')
parser.add_argument('--label-selector', '-l', metavar='label',
type=str,
default='',
help='Label selector (example=name).')
parser.add_argument('--debug', '-d', action='store_true',
help='Debug output')
return parser.parse_args()
def set_logger(debug=False):
if debug is True:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
ch = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(levelname)s: %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.debug('Set debug level')
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment