Skip to content

Instantly share code, notes, and snippets.

@vijayanandrp
Forked from rueedlinger/kconnect.py
Last active January 7, 2023 01:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vijayanandrp/4886d2089f671a5075a7f00fdb027acd to your computer and use it in GitHub Desktop.
Save vijayanandrp/4886d2089f671a5075a7f00fdb027acd to your computer and use it in GitHub Desktop.
Kafka Connect Python Script - https | user auth | Status | restart | pause | resume
# Credits: adapted from https://gist.github.com/rueedlinger/76af36d04a0798a8e1f43ed16595bd97
import sys
import os
import json
import argparse
from base64 import b64encode
# Major version of the running interpreter; selects the HTTP client module below.
PYTHON_MAJOR_VERSION = sys.version_info.major

# Fallback endpoint, used only when KAFKA_CONNECT_REST is not set in the environment.
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = '8084'

# Root path of the connectors resource on the REST API.
BASE_PATH = '/connectors'

# Basic-auth credentials for the REST API. They can be overridden through
# environment variables of the same name so real credentials do not have to be
# written into this file; the defaults are unchanged ('admin' / 'admin').
KAFKA_CONNECT_API_CREDENTIALS_USER = os.environ.get('KAFKA_CONNECT_API_CREDENTIALS_USER', 'admin')
KAFKA_CONNECT_API_CREDENTIALS_PWD = os.environ.get('KAFKA_CONNECT_API_CREDENTIALS_PWD', 'admin')

# The HTTP client module was renamed between Python 2 and Python 3; expose it
# under the single name ``httplib`` either way.
if PYTHON_MAJOR_VERSION == 2:
    import httplib
else:
    import http.client as httplib

# "host:port" of the Kafka Connect REST endpoint; the KAFKA_CONNECT_REST
# environment variable wins over the built-in default.
KAFKA_CONNECT_REST = os.environ.get('KAFKA_CONNECT_REST', DEFAULT_HOST + ':' + str(DEFAULT_PORT))
def basic_auth(username, password):
    """Build the HTTP Basic ``Authorization`` header for the given credentials.

    The ``username:password`` pair is UTF-8 encoded and then base64 encoded.
    ``b64encode`` returns a byte string, so the result is decoded to ASCII to
    obtain a text header value.

    ``str.format`` is used instead of an f-string because an f-string is a
    syntax error on Python 2, which this file otherwise still caters for.

    :param username: user name to authenticate with
    :param password: password belonging to ``username``
    :return: dict with a single ``"Authorization"`` entry, suitable for
        passing as (or merging into) request headers
    """
    credentials = "{}:{}".format(username, password).encode("utf-8")
    token = b64encode(credentials).decode("ascii")
    return {"Authorization": "Basic " + token}
class ConnectError(Exception):
    """Raised when a REST request returns an unexpected HTTP status.

    Attributes:
        method: HTTP method of the failed request (e.g. ``'GET'``).
        path: request path that was called.
        http_status: status code returned by the server.
        reason: reason phrase returned by the server.
    """

    def __init__(self, method, path, http_status, reason):
        # Pass a message to the base class so str(ex) and uncaught tracebacks
        # show what failed instead of an empty string.
        super(ConnectError, self).__init__(
            '%s %s failed with HTTP status %s (%s)' % (method, path, http_status, reason)
        )
        self.method = method
        self.path = path
        self.http_status = http_status
        self.reason = reason
class HttpUtil:
    """Small helper that issues body-less REST requests over one connection.

    Every request is sent with the same ``headers`` and checked against the
    single status code that counts as success for its HTTP method.
    """

    def __init__(self, http_connection, headers):
        """Store the connection and the headers sent with every request.

        :param http_connection: object offering ``request(...)`` and
            ``getresponse()`` like ``http.client.HTTPConnection``
        :param headers: dict of headers added to each request
        """
        self.http_connection = http_connection
        self.headers = headers

    def _request(self, method, path, expected_status):
        """Send one request and return ``(response, body)``.

        The body is always read before the status is checked: an unread
        response would leave the connection unable to serve the next request.

        :raises ConnectError: if the status differs from ``expected_status``
        """
        self.http_connection.request(method, url=path, body=None, headers=self.headers)
        response = self.http_connection.getresponse()
        payload = response.read()
        if response.status != expected_status:
            raise ConnectError(method=method, path=path, http_status=response.status, reason=response.reason)
        return response, payload

    def get(self, path):
        """GET ``path`` and return the JSON-decoded response body.

        :raises ConnectError: if the status is not 200
        """
        _, payload = self._request('GET', path, 200)
        return json.loads(payload)

    def post(self, path):
        """POST to ``path`` without a body and return a summary dict.

        :return: dict with keys ``http_status``, ``reason``, ``path``, ``method``
        :raises ConnectError: if the status is not 204
        """
        response, _ = self._request('POST', path, 204)
        return {'http_status': response.status, 'reason': response.reason, 'path': path, 'method': 'POST'}

    def put(self, path):
        """PUT to ``path`` without a body and return a summary dict.

        :return: dict with keys ``http_status``, ``reason``, ``path``, ``method``
        :raises ConnectError: if the status is not 202
        """
        response, _ = self._request('PUT', path, 202)
        return {'http_status': response.status, 'reason': response.reason, 'path': path, 'method': 'PUT'}
if __name__ == '__main__':
    # Command-line entry point: status | restart | pause | resume.
    #
    # quote() lives in a different module on Python 2 and Python 3; it is
    # imported here because only the script entry point needs it.
    try:
        from urllib.parse import quote
    except ImportError:
        from urllib import quote

    def connector_path(connector_id, suffix=''):
        """Return the REST path of a connector, percent-encoding its name."""
        return BASE_PATH + '/' + quote(connector_id, safe='') + suffix

    def print_status(util, connector_id):
        """Print the state of one connector followed by the state of each task."""
        status = util.get(connector_path(connector_id, '/status'))
        print(status['name'] + ': ' + status['connector']['state'])
        for task in status['tasks']:
            print(' task ' + str(task['id']) + ': ' + task['state'])

    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(help='Functions', dest='cmd')

    # 'status' takes an optional connector id; without one, all connectors are listed.
    cmd_status = subparsers.add_parser('status', help='show the status')
    cmd_status.add_argument('connector_id', metavar='<connector_id>', nargs='?', help='the id of the connector')

    # The remaining commands all require a connector id.
    for cmd_name in ('restart', 'pause', 'resume'):
        cmd_parser = subparsers.add_parser(cmd_name, help=cmd_name + ' connector')
        cmd_parser.add_argument('connector_id', metavar='<connector_id>', help='the id of the connector')

    args = parser.parse_args()

    # The endpoint is reached over HTTPS; use httplib.HTTPConnection instead
    # for a plain-HTTP endpoint.
    conn = httplib.HTTPSConnection(KAFKA_CONNECT_REST)
    http_util = HttpUtil(conn, basic_auth(username=KAFKA_CONNECT_API_CREDENTIALS_USER,
                                          password=KAFKA_CONNECT_API_CREDENTIALS_PWD))
    try:
        if args.cmd == 'status':
            if args.connector_id:
                print_status(http_util, args.connector_id)
            else:
                for connector in http_util.get(BASE_PATH):
                    print_status(http_util, connector)
        elif args.cmd == 'pause':
            http_util.put(connector_path(args.connector_id, '/pause'))
        elif args.cmd == 'resume':
            http_util.put(connector_path(args.connector_id, '/resume'))
        elif args.cmd == 'restart':
            # Fetch the task list first, then restart the connector and each task.
            resp = http_util.get(connector_path(args.connector_id))
            http_util.post(connector_path(args.connector_id, '/restart'))
            for task in resp['tasks']:
                http_util.post(connector_path(args.connector_id, '/tasks/' + str(task['task']) + '/restart'))
        else:
            parser.print_help()
    except ConnectError as ex:
        print('Got error %s (%s) for request %s %s%s ' % (ex.http_status, ex.reason,
                                                          ex.method, KAFKA_CONNECT_REST, ex.path))
        print(' command: ' + args.cmd)
        if args.connector_id:
            print(' connector_id: ' + str(args.connector_id))
        # Report the failure to the calling shell instead of exiting with 0.
        sys.exit(1)
    finally:
        # Release the socket whether the command succeeded or failed.
        conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment