Last active
August 29, 2015 14:03
-
-
Save mmalczewski/18bf4db7ed157518eb08 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Created on Mar 14, 2014 | |
@author: marcin | |
dependencies: | |
sudo pip install futures | |
''' | |
import time | |
import sys | |
import subprocess | |
import os | |
from IPython.utils.io import stdout | |
import itertools as it | |
import shlex | |
import signal | |
import threading | |
import json | |
import argparse | |
from argparse import RawTextHelpFormatter | |
class ShUtils: | |
MAX_FIND_LOCAL_PORT_ATTEMPTS = 10 | |
@staticmethod | |
def get_pid(port): | |
ps = subprocess.Popen('netstat -nlp | grep -i LISTEN | grep ' + port, shell=True, stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE) | |
output, errors = ps.communicate() | |
if output.strip == "": | |
return -1 | |
parts = output.split() | |
if len(parts) > 0: | |
process = parts[-1] | |
return process.split('/')[0] | |
else: | |
return -1 | |
@staticmethod | |
def run_shell_command(command_str): | |
out = subprocess.Popen(shlex.split(command_str), stdout=subprocess.PIPE).stdout.read() | |
return filter(None, out.split('\n')) | |
@staticmethod | |
def find_free_local_port(port): | |
num_of_attempts = 0 | |
port_ok = False | |
found_port = port | |
i_port = int(port) | |
while not port_ok: | |
if num_of_attempts > ShUtils.MAX_FIND_LOCAL_PORT_ATTEMPTS: | |
# raise Exception('Maximum attempts (%s) to find free local ports exceeded' % str( | |
# ShUtils.MAX_FIND_LOCAL_PORT_ATTEMPTS)) | |
print 'Maximum attempts (%s) to find free local ports exceeded' % \ | |
str(ShUtils.MAX_FIND_LOCAL_PORT_ATTEMPTS) | |
return found_port, port_ok | |
is_port_free = ShUtils.is_port_free(found_port) | |
found_port = str(i_port) | |
if not is_port_free: | |
i_port += 1 | |
print "There is a process already running on port (%s). Trying another (%s) port" % (found_port, i_port) | |
num_of_attempts += 1 | |
else: | |
port_ok = True | |
return found_port, port_ok | |
@staticmethod | |
def is_port_free(port): | |
command = "netstat -nlp | grep -e 'LISTEN\s' | grep :" + str(port) | |
ps = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE) | |
output, errors = ps.communicate() | |
if output.strip(): | |
return False | |
else: | |
return True | |
class PortCheckerThread(threading.Thread): | |
def __init__(self, remote_port, pid): | |
threading.Thread.__init__(self) | |
self.remote_port = remote_port | |
self.pid = pid | |
def run(self): | |
try: | |
while True: | |
print "Check Thread for port %s and pid %s" % (self.remote_port, self.pid) | |
time.sleep(1) | |
except Exception: | |
print "Finished port checker cleanly" | |
class JMXProxy: | |
# TODO handle mutliple processes returned by netstat | |
# FIND_LISTENING_PORTS_COMMAND = r"ssh %s netstat -nlp | grep `jps | grep -iE \'%s\' | awk \'{ print $1 }\'` | awk \'{ print $4}\' | awk -F: \'{print $NF}\'" | |
FIND_LISTENING_PORTS_COMMAND = r"ssh %s netstat -nlp | grep -iE `jps | grep -iE \'%s\' | awk \'{ print $1 }\' | paste -sd \'|\'` | awk \'{ print $4}\' | awk -F: \'{print $NF}\'" | |
PORTS_CHECK_INTERVAL = 1 | |
def __init__(self, apps_config): | |
self.apps_config = apps_config | |
self.pids = [] | |
self.tunnels = {} | |
self.ports_to_ignore = [str(port) for ports in self.apps_config.values() for port in ports] | |
self.processes = "|".join(self.apps_config.keys()) | |
signal.signal(signal.SIGINT, self.__exit_handler) | |
signal.signal(signal.SIGTERM, self.__exit_handler) | |
def cleanup(self): | |
print "closing opened ssh tunnels" | |
args = ['kill'] + [str(pid) for pid in self.pids] | |
subprocess.call(args) | |
def run(self, hostnames): | |
print 'start script for hostnames:', hostnames | |
valid_hostnames = [] | |
for hostname in hostnames: | |
listening_ports = self.__get_listening_ports(hostname) | |
print "Listening ports:", listening_ports, "for host", hostname | |
self.__create_and_save_tunnels(hostname, listening_ports) | |
if listening_ports: # check if host is valid | |
valid_hostnames.append(hostname) | |
if not valid_hostnames: | |
print "There is no valid hostnames. Exit script" | |
exit(1) | |
print "Start listening...", valid_hostnames | |
while True: | |
time.sleep(JMXProxy.PORTS_CHECK_INTERVAL) | |
for hostname in valid_hostnames: | |
self.__check_tunnels(hostname) | |
def __create_and_save_tunnels(self, hostname, listening_ports): | |
for remote_port in listening_ports: | |
pid = self.__create_ssh_tunnel(hostname, remote_port) | |
self.pids.append(pid) | |
self.tunnels[remote_port] = pid | |
def __check_tunnels(self, hostname): | |
listening_ports = set(self.__get_listening_ports(hostname)) | |
tunneling_ports = set(self.tunnels.keys()) | |
outdated_ports = tunneling_ports.difference(listening_ports) | |
new_ports = listening_ports.difference(tunneling_ports) | |
ok_ports = tunneling_ports.intersection(listening_ports) | |
print "ports - hostname: '%s', ok:[%s], outdated:[%s], new:[%s] " % (hostname, | |
",".join(ok_ports), ",".join(outdated_ports), ",".join(new_ports)) | |
def __get_listening_ports(self, hostname): | |
list_ports_command = JMXProxy.FIND_LISTENING_PORTS_COMMAND % (hostname, self.processes) | |
listening_ports = ShUtils.run_shell_command(list_ports_command) | |
return filter(lambda p: p not in self.ports_to_ignore, listening_ports) | |
def __create_ssh_tunnel(self, hostname, remote_port): | |
# local_port = find_free_local_port(remote_port) | |
# local_port = ShUtils.find_free_local_port(remote_port) | |
local_port = remote_port | |
ssh_tunnel_command = r"ssh -fnN -L%s:localhost:%s %s" % (remote_port, local_port, hostname) | |
p = subprocess.Popen(shlex.split(ssh_tunnel_command), stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
p.wait() # warning: according to documentation this may cause deadlock when PIPE is set | |
if p.returncode != 0: | |
self.cleanup() | |
exit(1) | |
pid = ShUtils.get_pid(local_port) | |
print "Create tunnel for remote port: '%s' Process ID: '%s' Host: '%s'" % (remote_port, pid, hostname) | |
return pid | |
def __exit_handler(self, signal, frame): | |
print "Script interrupted. Shutting down..." | |
self.cleanup() | |
print 'stop script' | |
sys.exit() | |
DEFAULT_APPS = { | |
'hostname': 'cow_worker1', | |
'processes': { | |
'retina-lte-ui-backend': [8080, 8098], | |
'worker': [6700, 6701, 6702, 6703], | |
'kafka': [9092], | |
'QuorumPeerMain': [], | |
'retina-simulator-0.0.1-SNAPSHOT.jar': [9990], | |
'retina-lte-collector-assembly-0.0.1-SNAPSHOT.jar': [8989] | |
} | |
} | |
def get_config(): | |
# example_json_string = json.dumps(DEFAULT_APPS, indent=2) | |
parser = argparse.ArgumentParser(description='Create and monitor ssh tunnels on remote jmx ports', | |
formatter_class=RawTextHelpFormatter) | |
# parser.add_argument('-c', '--config', | |
# help='path to config file (json format)\n\nExample config: %s' % example_json_string) | |
parser.add_argument('-n', '--hosts', help='hosts names separated by coma', nargs='+') | |
args = parser.parse_args() | |
hosts = [DEFAULT_APPS['hostname']] | |
processes = DEFAULT_APPS['processes'] | |
if args.hosts: | |
print "hosts selected:", args.hosts | |
hosts = args.hosts | |
return hosts, processes | |
def main(): | |
hosts, processes = get_config() | |
# hostname = apps['hostname'] | |
# processes = apps['processes'] | |
print "\nHosts:", hosts | |
print "Processes:", processes, "\n" | |
proxy = JMXProxy(processes) | |
proxy.run(hosts) | |
if __name__ == '__main__': | |
main() | |
# print ShUtils.is_port_free(16701) | |
# print ShUtils.is_port_free(16702) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment