Last active
March 5, 2018 22:50
-
-
Save nuriel77/19cacf4c3ec323f7e50fb01ed3cb8126 to your computer and use it in GitHub Desktop.
Watch for events from kubernetes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from kubernetes import client, config, watch | |
from pprint import pprint, pformat | |
import argparse | |
import logging | |
import urllib3 | |
import threading | |
import signal | |
import time | |
import json | |
import sys | |
import re | |
if sys.version_info[0] < 3: | |
raise Exception("Must be using Python>=3") | |
urllib3.disable_warnings() | |
logger = logging.getLogger(__name__) | |
class Job(threading.Thread): | |
def __init__(self, params): | |
threading.Thread.__init__(self) | |
""" | |
The shutdown_flag is a threading.Event object that | |
indicates whether the thread should be terminated. | |
""" | |
self.shutdown_flag = threading.Event() | |
self.watch_obj = params['watch_obj'] | |
self.label_selector = params['label_selector'] | |
self.watch = watch.Watch() | |
self.ignore_regex = r'' + params['ignore_regex'] | |
def run(self): | |
logger.debug('%s started' % self.name) | |
while True: | |
try: | |
for event in self.watch.stream(self.watch_obj, | |
label_selector=self.label_selector, | |
timeout_seconds=60, | |
_request_timeout=60): | |
if self.shutdown_flag.is_set(): | |
return | |
if self.ignore_regex != '' and \ | |
re.match(self.ignore_regex, event['object'].metadata.name): | |
logger.debug("%s Skip: matched ignore regex: %s" % | |
(self.name, event['object'].metadata.name)) | |
continue | |
logger.info("%s Event: %s %s %s" % | |
(self.name, | |
event['type'], | |
event['object'].kind, | |
event['object'].metadata.name)) | |
logger.debug("%s %s" % | |
(self.name, pformat(event['raw_object']['status']))) | |
except urllib3.exceptions.ReadTimeoutError: | |
""" | |
To handle socket timeout we let the | |
outer loop continue = restarting watch stream | |
""" | |
logger.warning("%s Socket read timeout error" | |
", restarting watcher..." % self.name) | |
break | |
logger.info('%s stopped' % self.name) | |
class ServiceExit(Exception): | |
""" | |
Custom exception which is used to trigger the clean exit | |
of all running threads and the main program. | |
""" | |
pass | |
def service_shutdown(signum, frame): | |
logger.info('Caught signal %d' % signum) | |
raise ServiceExit | |
""" | |
Can be used: | |
with Timeout(seconds=5) | |
some_func() | |
""" | |
class Timeout: | |
def __init__(self, seconds=1, error_message='Timeout'): | |
self.seconds = seconds | |
self.error_message = error_message | |
def handle_timeout(self, signum, frame): | |
raise TimeoutError(self.error_message) | |
def __enter__(self): | |
signal.signal(signal.SIGALRM, self.handle_timeout) | |
signal.alarm(self.seconds) | |
def __exit__(self, type, value, traceback): | |
signal.alarm(0) | |
def main(): | |
""" Parse arguments """ | |
try: | |
args = parse_args() | |
except Exception as e: | |
sys.stderr.write("Error parsing arguments: %s\n" % e) | |
sys.exit(1) | |
""" Setup logger """ | |
set_logger(debug=args.debug) | |
""" Register the signal handlers """ | |
signal.signal(signal.SIGTERM, service_shutdown) | |
signal.signal(signal.SIGINT, service_shutdown) | |
""" Load configuration, set pool maxsize """ | |
config.load_kube_config() | |
config.kube_config.configuration.connection_pool_maxsize = 10 | |
""" Instantiate extensions """ | |
v1 = client.CoreV1Api() | |
v1ext = client.ExtensionsV1beta1Api() | |
""" Kubernetes events to watch """ | |
watch_dict = dict( | |
pods = v1.list_pod_for_all_namespaces, | |
deployments = v1ext.list_deployment_for_all_namespaces | |
) | |
""" Set params, parse event types """ | |
params = {'ignore_regex': args.ignore_regex, | |
'label_selector': args.label_selector} | |
event_types = args.event_type.split(',') | |
""" Check all event types are valid """ | |
for event in event_types: | |
if event not in watch_dict.keys(): | |
logger.error("Invalid event type: %s" % event) | |
sys.exit(1) | |
logger.info("Starting up %d event watchers..." % len(event_types)) | |
threads = [] | |
""" Start the job threads """ | |
try: | |
for obj in event_types: | |
params['watch_obj'] = watch_dict[obj] | |
th = Job(params) | |
th.daemon = True | |
th.start() | |
threads.append(th) | |
""" | |
Keep the main thread running, | |
otherwise signals are ignored. | |
""" | |
while True: | |
time.sleep(0.5) | |
except ServiceExit: | |
""" | |
Terminate the running threads. | |
Set the shutdown flag on each thread to | |
trigger a clean shutdown of each thread. | |
""" | |
for th in threads: | |
logger.debug("Try graceful shutdown of %s" % th.name) | |
th.shutdown_flag.set() | |
th.watch.stop() | |
th._stop = True | |
""" Wait for the threads to close... """ | |
try: | |
with Timeout(seconds=1): | |
th.join() | |
except TimeoutError: | |
pass | |
logger.info('Exiting main program') | |
def parse_args(): | |
parser = argparse.ArgumentParser( | |
description='Listen to Kubernetes events') | |
parser.add_argument('--event-type', '-t', metavar='type', | |
type=str, | |
default='pods', | |
help='Type of event(s) to listen to,' | |
' comma delimited') | |
parser.add_argument('--ignore-regex', '-i', metavar='regex', | |
type=str, | |
default='', | |
help='Regex to ignore') | |
parser.add_argument('--label-selector', '-l', metavar='label', | |
type=str, | |
default='', | |
help='Label selector (example=name).') | |
parser.add_argument('--debug', '-d', action='store_true', | |
help='Debug output') | |
return parser.parse_args() | |
def set_logger(debug=False): | |
if debug is True: | |
logger.setLevel(logging.DEBUG) | |
else: | |
logger.setLevel(logging.INFO) | |
ch = logging.StreamHandler(sys.stdout) | |
formatter = logging.Formatter('%(levelname)s: %(message)s') | |
ch.setFormatter(formatter) | |
logger.addHandler(ch) | |
logger.debug('Set debug level') | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment