Skip to content

Instantly share code, notes, and snippets.

@imneonizer
Last active March 13, 2024 21:48
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save imneonizer/5a4709b0b3f3a4374e716574eab3e5f0 to your computer and use it in GitHub Desktop.
Save imneonizer/5a4709b0b3f3a4374e716574eab3e5f0 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
"""
Sample script showing how to do local port forwarding over paramiko.
This script connects to the requested SSH server and sets up local port
forwarding (the openssh -L option) from a local port through a tunneled
connection to a destination reachable from the SSH server machine.
"""
import getpass
import os
import socket
import select
try:
import SocketServer
except ImportError:
import socketserver as SocketServer
import sys
from optparse import OptionParser
import paramiko
SSH_PORT = 22
DEFAULT_PORT = 4000
g_verbose = True
class ForwardServer(SocketServer.ThreadingTCPServer):
daemon_threads = True
allow_reuse_address = True
class Handler(SocketServer.BaseRequestHandler):
def handle(self):
try:
chan = self.ssh_transport.open_channel(
"direct-tcpip",
(self.chain_host, self.chain_port),
self.request.getpeername(),
)
except Exception as e:
verbose(
"Incoming request to %s:%d failed: %s"
% (self.chain_host, self.chain_port, repr(e))
)
return
if chan is None:
verbose(
"Incoming request to %s:%d was rejected by the SSH server."
% (self.chain_host, self.chain_port)
)
return
verbose(
"Connected! Tunnel open %r -> %r -> %r"
% (
self.request.getpeername(),
chan.getpeername(),
(self.chain_host, self.chain_port),
)
)
while True:
r, w, x = select.select([self.request, chan], [], [])
if self.request in r:
data = self.request.recv(1024)
if len(data) == 0:
break
chan.send(data)
if chan in r:
data = chan.recv(1024)
if len(data) == 0:
break
self.request.send(data)
peername = self.request.getpeername()
chan.close()
self.request.close()
verbose("Tunnel closed from %r" % (peername,))
def forward_tunnel(local_port, remote_host, remote_port, transport):
# this is a little convoluted, but lets me configure things for the Handler
# object. (SocketServer doesn't give Handlers any way to access the outer
# server normally.)
class SubHander(Handler):
chain_host = remote_host
chain_port = remote_port
ssh_transport = transport
ForwardServer(("", local_port), SubHander).serve_forever()
def verbose(s):
if g_verbose:
print(s)
HELP = """\
Set up a forward tunnel across an SSH server, using paramiko. A local port
(given with -p) is forwarded across an SSH session to an address:port from
the SSH server. This is similar to the openssh -L option.
"""
def get_host_port(spec, default_port):
"parse 'hostname:22' into a host and port, with the port optional"
args = (spec.split(":", 1) + [default_port])[:2]
args[1] = int(args[1])
return args[0], args[1]
def parse_options():
global g_verbose
parser = OptionParser(
usage="usage: %prog [options] <ssh-server>[:<server-port>]",
version="%prog 1.0",
description=HELP,
)
parser.add_option(
"-q",
"--quiet",
action="store_false",
dest="verbose",
default=True,
help="squelch all informational output",
)
parser.add_option(
"-p",
"--local-port",
action="store",
type="int",
dest="port",
default=DEFAULT_PORT,
help="local port to forward (default: %d)" % DEFAULT_PORT,
)
parser.add_option(
"-u",
"--user",
action="store",
type="string",
dest="user",
default=getpass.getuser(),
help="username for SSH authentication (default: %s)"
% getpass.getuser(),
)
parser.add_option(
"-K",
"--key",
action="store",
type="string",
dest="keyfile",
default=None,
help="private key file to use for SSH authentication",
)
parser.add_option(
"",
"--no-key",
action="store_false",
dest="look_for_keys",
default=True,
help="don't look for or use a private key file",
)
parser.add_option(
"-P",
"--password",
action="store_true",
dest="readpass",
default=False,
help="read password (for key or password auth) from stdin",
)
parser.add_option(
"-r",
"--remote",
action="store",
type="string",
dest="remote",
default=None,
metavar="host:port",
help="remote host and port to forward to",
)
options, args = parser.parse_args()
if len(args) != 1:
parser.error("Incorrect number of arguments.")
if options.remote is None:
parser.error("Remote address required (-r).")
g_verbose = options.verbose
server_host, server_port = get_host_port(args[0], SSH_PORT)
remote_host, remote_port = get_host_port(options.remote, SSH_PORT)
return options, (server_host, server_port), (remote_host, remote_port)
def main():
options, server, remote = parse_options()
password = None
if options.readpass:
password = getpass.getpass("Enter SSH password: ")
client = paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.WarningPolicy())
verbose("Connecting to ssh host %s:%d ..." % (server[0], server[1]))
try:
client.connect(
server[0],
server[1],
username=options.user,
key_filename=options.keyfile,
look_for_keys=options.look_for_keys,
password=password,
)
except Exception as e:
print("*** Failed to connect to %s:%d: %r" % (server[0], server[1], e))
sys.exit(1)
verbose(
"Now forwarding port %d to %s:%d ..."
% (options.port, remote[0], remote[1])
)
try:
forward_tunnel(
options.port, remote[0], remote[1], client.get_transport()
)
except KeyboardInterrupt:
print("C-c: Port forwarding stopped.")
sys.exit(0)
if __name__ == "__main__":
main()
import paramiko
import re
import os
import time
import collections
import threading
import hashlib
import concurrent.futures
pprint_lock = threading.Lock()
class ShellHistory:
def __init__(self, stdin_maxlen, stdout_maxlen):
self._stdin = collections.deque(maxlen=stdin_maxlen)
self._stdout = collections.deque(maxlen=stdout_maxlen)
@property
def stdin(self):
return self._stdin
@property
def stdout(self):
return self._stdout
class RemoteSSH:
def __init__(self, *args, **kwargs):
if args or kwargs:
self.connect(*args, **kwargs)
def connect(self, username, host, password, \
host_keys="/dev/null", port=22, timeout=5, \
stdin_history_size=10, stdout_history_size=20):
self.username = username.strip()
self.host = host.strip()
self.hostname = host
self.password = password
self.host_keys = host_keys
self.port = port
self.timeout = timeout
self.stdin_history_size = stdin_history_size
self.stdout_history_size = stdout_history_size
self.init_session()
self.pattern = [
self.host,
self.host,
self.hostname,
"{}@{}".format(self.username, self.host),
"{}@{}".format(self.username, self.hostname),
"{}@{} => {}".format(self.username, self.hostname, self.host),
]
return self
def init_session(self):
self.session = paramiko.SSHClient()
self.session.load_host_keys(self.host_keys)
self.session.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.session.connect(
username=self.username,
hostname=self.host,
password=self.password,
port=self.port,
timeout=self.timeout
)
self.channel = self.session.invoke_shell()
self.stdin = self.channel.makefile('wb')
self.stdout = self.channel.makefile('r')
self.sftp = self.session.open_sftp()
self.history = ShellHistory(
self.stdin_history_size,
self.stdout_history_size
)
self.hostname = self.execute("hostname").strip()
def __del__(self):
try:
self.sftp.close()
self.session.close()
except: pass
def put(self, localpath, remotepath):
sftp = self.session.open_sftp()
if os.path.isdir(localpath):
try:
# test if remote_path exists
sftp.chdir(remotepath)
except IOError:
# create remote dir
sftp.mkdir(remotepath)
# copy file to remote one by one
for file in os.listdir(localpath):
self.put(os.path.join(localpath, file), os.path.join(remotepath, file))
else:
sftp.put(localpath, remotepath)
sftp.close()
def get(self, remotepath, localpath, isdir=False):
sftp = self.session.open_sftp()
if isdir:
# create local dir
os.makedirs(localpath, exist_ok=True)
# copy files from remote one by one
for file in sftp.listdir(path=remotepath):
self.get(os.path.join(remotepath, file), os.path.join(localpath, file))
else:
try:
sftp.get(remotepath, localpath)
except OSError:
# exception means it's a directory
if os.path.exists(localpath) and os.path.isfile(localpath) and os.path.getsize(localpath) == 0:
# remove temporary file which is created by sftp
os.remove(localpath)
# recursively download directory
self.get(remotepath, localpath, isdir=True)
sftp.close()
def reset(self):
self.session.close()
self.init_session()
def terminate(self):
# sends program terminate command, i.e, ctrl+c
self.stdin.write("\x03")
def change_password(self, current_password=None, new_password=None, username=None):
current_password = current_password or self.password
new_password = new_password or self.password
username = username or self.username
if current_password == new_password:
return True
if (username == self.username) and (current_password != self.password):
raise ValueError("current_password doesn't matches with self.password")
cmd = "sudo echo -n; echo '{username}:{new_password}' | sudo chpasswd".format(
username=username,
new_password=new_password
)
self.execute(cmd)
if "incorrect password attempt" in self.execute("sudo ls"):
# this means password changed successfully!
# update current password in the object
self.password = new_password
return True
# return false if password is unchanged
return False
def write_password(self, password=None):
# write password to the shell
self.stdin.write(password or self.password + "\n")
def write(self, command, postfix="\n", sudo=False):
if command.startswith("sudo") or sudo:
# if commands starts with sudo, then automatically enter password
command = "echo {} | sudo -S {}".format(self.password, command.strip())
# execute command normally
self.stdin.write(command + postfix)
self.history._stdin.append(command)
def execute(self, command, pprint=0, end='', sudo=False):
# execute a single command and return output without writing to stdin
# faster than stdin.write
if command.startswith("sudo") or sudo:
command = "echo {} | sudo -S {}".format(self.password, command.strip())
stdin, stdout, stderr = self.session.exec_command(command.strip() + "\n")
stdout.channel.recv_exit_status()
error = ""
if stderr:
error = "\n".join(stderr.readlines())
output = '\n'.join([self.clean(line.rstrip()) for line in stdout.readlines()])
output += error
# replace traces of 'sudo enter password' from output
output = output.replace("[sudo] password for {}: ".format(self.username), "")
if pprint:
self.pprint(output, pattern=pprint, end=end)
return output
def clean(self, line):
# clean string to remove any kind of shell color formatting
return re.compile(r'(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]').sub('', line).replace('\b', '').replace('\r', '')
def get_color_escape(self, r, g, b, background=False):
return '\033[{};2;{};{};{}m'.format(48 if background else 38, r, g, b)
def colorize(self, string):
RESET = '\033[0m'
return self.get_color_escape(*self.get_rgb(self.host))+string+RESET
def get_rgb(self, string):
if "." in string:
string = string.split(".")[-1]
h = str(int(hashlib.md5(string.encode('utf-8')).hexdigest(), 16) % 10**9)
r, g, b = int(h[:3]), int(h[3:6]), int(h[6:])
s, scale = r + g +b, 255
r, g, b = round((r/s)*scale), round((g/s)*scale), round((b/s)*scale)
h = int(hashlib.md5(string.encode('utf-8')).hexdigest(), 32) % 10**1
h = round(h / 3)
if h == 0:
r = r**10
elif h == 1:
g = g**10
else:
b = b**10
return r, g, b
def pprint(self, string, pattern=0, end=None):
try:
pattern = self.pattern[pattern]
except:
pattern = self.pattern[0]
pprint_lock.acquire()
try:
for line in string.split("\n"):
print(self.colorize(pattern), "|", line)
if end:
print(end, end='')
except Exception:
raise
finally:
pprint_lock.release()
def read(self, nbytes=2048, data="", bypass_sudo=True, repeat_last=False, clean=True, include_stdin=False):
# receive data in buffers
while True:
if self.channel.recv_ready():
data += self.channel.recv(nbytes).decode()
else:
break
# logic to bypass sudo
if "[sudo] password for" in data and bypass_sudo == True:
# automatically enter password
self.write_password()
time.sleep(1)
# read more from stdout after
# password is written to stdin
return self.read(data=data, bypass_sudo=False)
# save stdout history
output = ""
for line in data.split("\n"):
# if clean = True, use regex to get rid of colorful content
line = self.clean(line.rstrip()) if clean else line.rstrip()
# don't include executed command in stdout
if include_stdin == False:
try:
if line == self.history._stdin[-1]:
continue
except: pass
self.history._stdout.append(line)
output += line+"\n"
# logic to return data from history if output is empty
if (not output.rstrip()) and (repeat_last == True):
output = '\n'.join(self.history.stdout)
return output.rstrip()
class Thread(threading.Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
threading.Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)
self._return = None
def run(self):
if self._target is not None:
self._return = self._target(*self._args, **self._kwargs)
def join(self):
threading.Thread.join(self)
return self._return
class ConcurrentThreadPool:
def map(self, target, args):
result = []
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = [executor.submit(target, arg) for arg in args]
for future in concurrent.futures.as_completed(futures):
result.append(future.result())
return result
class ThreadPool:
def map(self, target, args):
threads = [Thread(target=target, args=(arg,)) for arg in args]
[t.start() for t in threads]
return [t.join() for t in threads if t.join()]
def map(target, args, concurrent=False):
if concurrent:
return ConcurrentThreadPool().map(target=target, args=args)
return ThreadPool().map(target=target, args=args)
sync_lock = threading.Lock()
def sync(func):
def inner(*args, **kwargs):
sync_lock.acquire()
try:
func(*args, **kwargs)
except Exception:
raise
finally:
sync_lock.release()
return inner
@imneonizer
Copy link
Author

from forward import forward_tunnel
transport = s.session.get_transport()

local_port = 8000
remote_host = "192.168.0.197"
remote_port = 22

try:
    forward_tunnel(local_port, remote_host, remote_port, transport)
except KeyboardInterrupt:
    print('Port forwarding stopped.')

@imneonizer
Copy link
Author

imneonizer commented Apr 8, 2021

# __init__.py
from .shell_handler import RemoteSSH, Thread, ThreadPool, ConcurrentThreadPool, map, sync
connect = RemoteSSH

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment