Skip to content

Instantly share code, notes, and snippets.

@zackhillman
Created August 26, 2020 22:28
Show Gist options
  • Save zackhillman/a141df02a290e7d19b0423c347853be9 to your computer and use it in GitHub Desktop.
Save zackhillman/a141df02a290e7d19b0423c347853be9 to your computer and use it in GitHub Desktop.
import subprocess
import time
import BaseHTTPServer
import json
import os
from urlparse import urlparse, parse_qs
# Address and port handed to BaseHTTPServer.HTTPServer in the __main__ block.
HOST_NAME = '0.0.0.0'
PORT_NUMBER = 9000
# jmxterm jar that do_GET runs via `java -jar` to read the IngestionRate
# attribute from the task manager JVM.
JMXTERM_PATH = '/opt/jmxterm/jmxterm-1.0.0-uber.jar'
class MyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """HTTP handler for reading and setting a Flink job's ingestion rate.

    GET  /?mbean=<name>  reads the ``IngestionRate`` attribute of the given
                         MBean from the task manager JVM through jmxterm.
    POST <json body>     runs ``/home/ubuntu/set_rate.py`` with the
                         ``job_name``, ``rate``, ``jobmanager_dns`` and
                         ``topic`` fields of the JSON body.

    Request data is untrusted, so external programs are always started with
    an argument list (never through a shell), and the HTTP status line is
    only sent once the outcome of the request is known.

    The file targets Python 2 (``BaseHTTPServer``, ``urlparse``).
    """

    SET_RATE_SCRIPT = '/home/ubuntu/set_rate.py'
    OPERATOR_TEMPLATE = "{topic}-KafkaSource-0"

    @staticmethod
    def _parse_main_pid(output):
        """Extract the PID from ``systemctl show --property MainPID`` output.

        Returns the PID as a string, or ``None`` when systemd reports
        ``MainPID=0`` (the unit currently has no main process).

        Raises:
            Exception: if the output is not a ``MainPID=<digits>`` line.
        """
        prefix = 'MainPID='
        line = output.strip()
        if not line.startswith(prefix):
            raise Exception('could not get pid of task manager')
        pid = line[len(prefix):]
        if not pid.isdigit():
            raise Exception('could not get pid of task manager')
        if int(pid) == 0:
            return None
        return pid

    @staticmethod
    def _get_pid():
        """Return the main PID of ``taskmanager.service`` as a string.

        Raises:
            Exception: if systemctl fails, the unit has no running main
                process, or the output cannot be parsed.
        """
        try:
            output = subprocess.check_output(
                ['systemctl', 'show', '--property', 'MainPID',
                 'taskmanager.service'])
        except subprocess.CalledProcessError:
            raise Exception('flink task manager is not running')
        pid = MyHandler._parse_main_pid(output)
        if pid is None:
            # systemctl exits 0 and prints MainPID=0 for a stopped unit, so
            # the CalledProcessError branch alone does not catch this case.
            raise Exception('flink task manager is not running')
        return pid

    @staticmethod
    def _query_ingestion_rate(mbean, pid):
        """Run jmxterm against *pid* and return its raw stdout.

        The ``get`` command is fed to jmxterm on stdin, which is what the
        former ``echo ... | java -jar ...`` shell pipeline did, but without
        letting a shell interpret the caller-supplied MBean name.

        Raises:
            Exception: if jmxterm exits with a non-zero status.
            OSError: if the process cannot be started.
        """
        proc = subprocess.Popen(
            ['sudo', '-H', '-u', 'flink', 'java', '-jar', JMXTERM_PATH,
             '-n', '-l', pid],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE)
        output, _ = proc.communicate(
            'get -s -b {} IngestionRate\n'.format(mbean))
        if proc.returncode != 0:
            raise Exception(
                'jmxterm exited with status {}'.format(proc.returncode))
        return output

    @staticmethod
    def _build_rate_command(request_body):
        """Build the argv list for ``set_rate.py`` from the parsed JSON body.

        Each value becomes exactly one argument, so shell metacharacters or
        whitespace in the request cannot add commands or options.

        Raises:
            KeyError: if a required field is missing.
            TypeError: if *request_body* is not a mapping.
        """
        operator = MyHandler.OPERATOR_TEMPLATE.format(
            topic=request_body['topic'])
        return [
            MyHandler.SET_RATE_SCRIPT,
            '--job-name', '{}'.format(request_body['job_name']),
            '--rate', '{}'.format(request_body['rate']),
            '--jobmanager-dns', '{}'.format(request_body['jobmanager_dns']),
            '--operator-name', operator,
        ]

    def _respond(self, status, body):
        """Send a complete response: status line, end of headers, body."""
        self.send_response(status)
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Respond to a GET request with the current ingestion rate.

        Replies 400 when the ``mbean`` query parameter is missing or
        contains a line break, 500 when the PID lookup or jmxterm fails,
        and 200 with ``Ingest Rate is <jmxterm output>`` otherwise.
        """
        query_components = parse_qs(urlparse(self.path).query)
        mbean = query_components.get('mbean', [''])[0]
        # A line break would let the caller append extra jmxterm commands to
        # the one written to jmxterm's stdin.
        if not mbean or '\n' in mbean or '\r' in mbean:
            self._respond(400, "missing or invalid 'mbean' query parameter\n")
            return
        try:
            pid = self._get_pid()
            output = self._query_ingestion_rate(mbean, pid)
        except Exception as e:
            self._respond(500, "{}\n".format(e))
            return
        self._respond(200, "Ingest Rate is {}\n".format(output))

    def do_POST(self):
        """Respond to a POST request by running ``set_rate.py``.

        The JSON body must contain ``job_name``, ``rate``,
        ``jobmanager_dns`` and ``topic``. Replies 400 for an unreadable or
        incomplete body, 500 when the script cannot be started or exits
        non-zero, and 200 otherwise.
        """
        try:
            length = int(self.headers.getheader('content-length', 0))
            body = self.rfile.read(length)
            print(body)
            request_body = json.loads(body)
            command = self._build_rate_command(request_body)
        except (KeyError, TypeError, ValueError) as e:
            self._respond(400, "invalid request: {}: {}\n".format(
                type(e).__name__, e))
            return

        attempt = "Attempting to set rate to {}\n".format(request_body['rate'])
        try:
            status = subprocess.call(command)
        except OSError as e:
            self._respond(500, attempt + "{}\n".format(e))
            return
        if status != 0:
            # The exit status used to be discarded, so failures were
            # reported to the client as success.
            self._respond(
                500,
                attempt + "set_rate.py exited with status {}\n".format(status))
            return
        self._respond(
            200,
            attempt + "Successfully to set rate to {}\n".format(
                request_body['rate']))
if __name__ == '__main__':
    # Script entry point: serve MyHandler on HOST_NAME:PORT_NUMBER until the
    # process is interrupted with Ctrl-C, then release the listening socket.
    address = (HOST_NAME, PORT_NUMBER)
    server = BaseHTTPServer.HTTPServer(address, MyHandler)
    # A single pre-joined string prints the same text as the two-argument
    # Python 2 print statement and also parses under Python 3.
    print(" ".join([time.asctime(), "Server Starts - %s:%s" % address]))
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    server.server_close()
    print(" ".join([time.asctime(), "Server Stops - %s:%s" % address]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment