Skip to content

Instantly share code, notes, and snippets.

@mmalczewski
Last active August 29, 2015 14:03
Show Gist options
  • Save mmalczewski/18bf4db7ed157518eb08 to your computer and use it in GitHub Desktop.
Save mmalczewski/18bf4db7ed157518eb08 to your computer and use it in GitHub Desktop.
'''
Created on Mar 14, 2014
@author: marcin
dependencies:
sudo pip install futures
'''
import time
import sys
import subprocess
import os
from IPython.utils.io import stdout
import itertools as it
import shlex
import signal
import threading
import json
import argparse
from argparse import RawTextHelpFormatter
class ShUtils:
    """Static helpers for running commands and probing local TCP ports.

    Port probing shells out to ``netstat -nlp`` on the local machine and
    parses its output in Python.  If ``netstat`` cannot be started the probes
    behave as if nothing is listening (best effort, like the original
    shell pipeline, which produced empty output in that case).
    """

    # Maximum number of consecutive ports find_free_local_port() will probe.
    MAX_FIND_LOCAL_PORT_ATTEMPTS = 10

    @staticmethod
    def _parse_listening_sockets(netstat_output):
        """Extract ``(local_port, pid_field)`` pairs from ``netstat -nlp`` text.

        Only rows whose sixth column is exactly ``LISTEN`` are kept, which
        selects TCP listeners and skips headers, UDP rows and UNIX sockets.
        ``pid_field`` is the raw seventh column (``PID/Program``) or ``''``
        when the row has no such column.
        """
        sockets = []
        for line in netstat_output.splitlines():
            fields = line.split()
            # Proto Recv-Q Send-Q Local-Address Foreign-Address State PID/Program
            if len(fields) < 6 or fields[5] != 'LISTEN':
                continue
            # rsplit keeps working for IPv6 local addresses such as ":::8080".
            local_port = fields[3].rsplit(':', 1)[-1]
            pid_field = fields[6] if len(fields) > 6 else ''
            sockets.append((local_port, pid_field))
        return sockets

    @staticmethod
    def _listening_sockets():
        """Run ``netstat -nlp`` locally and return its parsed LISTEN rows."""
        try:
            ps = subprocess.Popen(['netstat', '-nlp'],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE,
                                  universal_newlines=True)
        except OSError:
            # netstat is not installed / not executable: report no listeners.
            return []
        output, _ = ps.communicate()
        return ShUtils._parse_listening_sockets(output or '')

    @staticmethod
    def get_pid(port):
        """Return the PID (as a string) of the process listening on ``port``.

        :param port: local TCP port, as ``str`` or ``int``.
        :return: the text before ``/`` in netstat's ``PID/Program`` column for
            the first listener on exactly that port, or ``-1`` when no
            listener is found.  The text can be ``'-'`` if netstat printed no
            owner for the socket.
        """
        wanted = str(port).strip()
        for local_port, pid_field in ShUtils._listening_sockets():
            if local_port == wanted and pid_field:
                return pid_field.split('/')[0]
        return -1

    @staticmethod
    def run_shell_command(command_str):
        """Run ``command_str`` (split with ``shlex``, no local shell).

        :return: list of the non-empty lines the command wrote to stdout.
        """
        ps = subprocess.Popen(shlex.split(command_str),
                              stdout=subprocess.PIPE,
                              universal_newlines=True)
        # communicate() drains the pipe, closes it and reaps the child.
        out, _ = ps.communicate()
        return [line for line in (out or '').split('\n') if line]

    @staticmethod
    def find_free_local_port(port, is_free=None):
        """Search upwards from ``port`` for a local port nobody listens on.

        :param port: first port to try (``str`` or ``int``).
        :param is_free: optional predicate ``is_free(port) -> bool``; defaults
            to :meth:`ShUtils.is_port_free`.
        :return: ``(port_as_str, True)`` for the first free port, or
            ``(last_probed_port_as_str, False)`` after
            ``MAX_FIND_LOCAL_PORT_ATTEMPTS`` busy ports.
        """
        if is_free is None:
            is_free = ShUtils.is_port_free
        max_attempts = ShUtils.MAX_FIND_LOCAL_PORT_ATTEMPTS
        candidate = int(port)
        for attempt in range(1, max_attempts + 1):
            # The port that is probed is the port that gets returned.
            if is_free(candidate):
                return str(candidate), True
            if attempt < max_attempts:
                print("There is a process already running on port (%s). Trying another (%s) port"
                      % (candidate, candidate + 1))
                candidate += 1
        print('Maximum attempts (%s) to find free local ports exceeded' % str(max_attempts))
        return str(candidate), False

    @staticmethod
    def is_port_free(port):
        """Return ``True`` when no local TCP socket is listening on ``port``.

        The port is compared exactly, so a listener on 8080 does not make
        port 80 look busy.
        """
        wanted = str(port).strip()
        for local_port, _ in ShUtils._listening_sockets():
            if local_port == wanted:
                return False
        return True
class PortCheckerThread(threading.Thread):
    """Thread that prints the remote port / PID pair it was given once per second."""

    def __init__(self, remote_port, pid):
        super(PortCheckerThread, self).__init__()
        self.pid = pid
        self.remote_port = remote_port

    def run(self):
        """Print a status line every second until an exception ends the loop."""
        status_template = "Check Thread for port %s and pid %s"
        try:
            while True:
                print(status_template % (self.remote_port, self.pid))
                time.sleep(1)
        except Exception:
            print("Finished port checker cleanly")
class JMXProxy:
    """Open ssh tunnels to the ports selected remote processes listen on.

    ``apps_config`` maps a process-name pattern to a list of ports that must
    NOT be tunnelled.  The keys are OR-ed into one ``grep -iE`` pattern that
    is matched against remote ``jps`` output.
    """

    # TODO handle multiple processes returned by netstat
    # The command is split with shlex and run without a local shell, so the
    # whole pipeline (pipes, backticks, awk programs) is handed to ssh as
    # arguments.  It prints the port part of netstat's 4th column for lines
    # matching the PIDs that jps reports for the wanted processes.
    FIND_LISTENING_PORTS_COMMAND = r"ssh %s netstat -nlp | grep -iE `jps | grep -iE \'%s\' | awk \'{ print $1 }\' | paste -sd \'|\'` | awk \'{ print $4}\' | awk -F: \'{print $NF}\'"
    # Seconds between two tunnel status reports in run().
    PORTS_CHECK_INTERVAL = 1

    def __init__(self, apps_config):
        """Store the configuration and install SIGINT/SIGTERM handlers."""
        self.apps_config = apps_config
        self.pids = []      # PIDs of the ssh tunnel processes we started
        self.tunnels = {}   # remote port (str) -> PID of its tunnel
        # netstat output is text, so ignored ports are compared as strings.
        self.ports_to_ignore = [str(port) for ports in self.apps_config.values() for port in ports]
        self.processes = "|".join(self.apps_config.keys())
        signal.signal(signal.SIGINT, self.__exit_handler)
        signal.signal(signal.SIGTERM, self.__exit_handler)

    def cleanup(self):
        """Kill every ssh tunnel process recorded in ``self.pids``.

        Only strictly positive numeric PIDs are passed to ``kill``.
        Placeholders such as ``-1`` or ``'-'`` (a PID that could not be
        determined) are skipped because ``kill`` would read them as a signal
        number or as "every process".  Nothing is executed when no valid PID
        is left.
        """
        print("closing opened ssh tunnels")
        pids = [str(pid) for pid in self.pids
                if str(pid).isdigit() and int(pid) > 0]
        if not pids:
            return
        subprocess.call(['kill'] + pids)

    def run(self, hostnames):
        """Create tunnels for every host, then report tunnel status forever.

        Exits the interpreter with status 1 when no host reported any
        listening port.
        """
        print('start script for hostnames: %s' % (hostnames,))
        valid_hostnames = []
        for hostname in hostnames:
            listening_ports = self.__get_listening_ports(hostname)
            print("Listening ports: %s for host %s" % (listening_ports, hostname))
            self.__create_and_save_tunnels(hostname, listening_ports)
            if listening_ports:  # a host without listening ports is not monitored
                valid_hostnames.append(hostname)
        if not valid_hostnames:
            print("There is no valid hostnames. Exit script")
            sys.exit(1)
        print("Start listening... %s" % (valid_hostnames,))
        while True:
            time.sleep(JMXProxy.PORTS_CHECK_INTERVAL)
            for hostname in valid_hostnames:
                self.__check_tunnels(hostname)

    def __create_and_save_tunnels(self, hostname, listening_ports):
        """Open one tunnel per port and remember the PID of each."""
        for remote_port in listening_ports:
            pid = self.__create_ssh_tunnel(hostname, remote_port)
            self.pids.append(pid)
            self.tunnels[remote_port] = pid

    def __check_tunnels(self, hostname):
        """Print which tunnelled ports are still listened on at ``hostname``.

        NOTE(review): ``self.tunnels`` is keyed by port only, so with several
        hosts the "outdated"/"new" sets mix ports of different hosts.
        """
        listening_ports = set(self.__get_listening_ports(hostname))
        tunneling_ports = set(self.tunnels.keys())
        outdated_ports = tunneling_ports - listening_ports
        new_ports = listening_ports - tunneling_ports
        ok_ports = tunneling_ports & listening_ports
        print("ports - hostname: '%s', ok:[%s], outdated:[%s], new:[%s] " % (
            hostname, ",".join(ok_ports), ",".join(outdated_ports), ",".join(new_ports)))

    def __get_listening_ports(self, hostname):
        """Return the remote listening ports (list of str) minus ignored ones."""
        list_ports_command = JMXProxy.FIND_LISTENING_PORTS_COMMAND % (hostname, self.processes)
        listening_ports = ShUtils.run_shell_command(list_ports_command)
        # A real list (not a lazy filter object) so callers can truth-test it.
        return [port for port in listening_ports if port not in self.ports_to_ignore]

    def __create_ssh_tunnel(self, hostname, remote_port):
        """Start ``ssh -fnN -L`` for ``remote_port`` and return the tunnel's PID.

        On ssh failure all tunnels opened so far are closed and the
        interpreter exits with status 1.
        """
        # The local port mirrors the remote one; no free-port search is done.
        local_port = remote_port
        # -L takes local_port:host:remote_port.
        ssh_tunnel_command = r"ssh -fnN -L%s:localhost:%s %s" % (local_port, remote_port, hostname)
        # ssh -f leaves a background process holding stdout/stderr.  The
        # output was never read, so it goes to the null device: an unread
        # PIPE could fill up and block ssh.
        with open(os.devnull, 'w') as devnull:
            returncode = subprocess.call(shlex.split(ssh_tunnel_command),
                                         stdout=devnull, stderr=devnull)
        if returncode != 0:
            print("Could not create tunnel for remote port: '%s' Host: '%s'" % (remote_port, hostname))
            self.cleanup()
            sys.exit(1)
        pid = ShUtils.get_pid(local_port)
        print("Create tunnel for remote port: '%s' Process ID: '%s' Host: '%s'" % (remote_port, pid, hostname))
        return pid

    def __exit_handler(self, signum, frame):
        """Signal handler: close the tunnels and terminate the script."""
        print("Script interrupted. Shutting down...")
        self.cleanup()
        print('stop script')
        sys.exit()
# Built-in configuration used by get_config().
DEFAULT_APPS = {
    # Host used when no host is given on the command line.
    'hostname': 'cow_worker1',
    # Process-name pattern -> ports of that process that JMXProxy ignores
    # (i.e. does not tunnel).  An empty list ignores nothing.
    'processes': {
        'retina-lte-ui-backend': [8080, 8098],
        'worker': [6700, 6701, 6702, 6703],
        'kafka': [9092],
        'QuorumPeerMain': [],
        'retina-simulator-0.0.1-SNAPSHOT.jar': [9990],
        'retina-lte-collector-assembly-0.0.1-SNAPSHOT.jar': [8989],
    },
}
def get_config(argv=None):
    """Parse the command line and return ``(hosts, processes)``.

    :param argv: argument list to parse; ``None`` (the default) makes
        argparse read ``sys.argv[1:]``.
    :return: ``hosts`` is the list given with ``-n/--hosts`` or, when the
        option is absent, the single default host from ``DEFAULT_APPS``;
        ``processes`` is always ``DEFAULT_APPS['processes']``.

    Host names may be separated by spaces (``-n a b``), by commas
    (``-n a,b``) or both.
    """
    parser = argparse.ArgumentParser(description='Create and monitor ssh tunnels on remote jmx ports',
                                     formatter_class=RawTextHelpFormatter)
    parser.add_argument('-n', '--hosts', help='host names separated by spaces or commas', nargs='+')
    args = parser.parse_args(argv)
    hosts = [DEFAULT_APPS['hostname']]
    processes = DEFAULT_APPS['processes']
    if args.hosts:
        # nargs='+' splits on spaces only, so commas are split here.
        selected = [name for item in args.hosts for name in item.split(',') if name]
        if selected:
            print("hosts selected: %s" % (selected,))
            hosts = selected
    return hosts, processes
def main():
    """Load the configuration, print it, and run a JMXProxy over the hosts."""
    host_list, process_map = get_config()
    print("\nHosts: %s" % (host_list,))
    print("Processes: %s \n" % (process_map,))
    JMXProxy(process_map).run(host_list)
# Script entry point: only run when executed directly, not when imported.
if __name__ == "__main__":
    main()
    # Manual port-probe checks kept from the original for debugging:
    # print ShUtils.is_port_free(16701)
    # print ShUtils.is_port_free(16702)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment