Skip to content

Instantly share code, notes, and snippets.

@kryptek
Created September 25, 2018 23:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kryptek/c9fdd1ea5a68cd94103a66a242af3440 to your computer and use it in GitHub Desktop.
Save kryptek/c9fdd1ea5a68cd94103a66a242af3440 to your computer and use it in GitHub Desktop.
Paramiko SSH
import logging
import os
import paramiko
import subprocess
import socket
import time
logging.basicConfig(level=logging.INFO, datefmt='%F %T', format='%(asctime)s %(name)s.%(module)s %(levelname)s: %(message)s')
logger = logging.getLogger('ssh_client')
CONFIG = os.path.expanduser('~/.ssh/config')
CONNECT_TIMEOUT = 2
EXEC_TIMEOUT = 300
KNOWN_HOSTS = os.path.expanduser('~/.ssh/known_hosts')
POLL_TIMEOUT = 0.01
USER = os.environ.get('USER', os.environ.get('LOGNAME'))
class SSHClient(paramiko.SSHClient):
def exec_command(self, command, bufsize=-1, timeout=None, get_pty=False, environment=None):
self.channel = self._transport.open_session(timeout=timeout)
if get_pty:
self.channel.get_pty()
self.channel.settimeout(timeout)
if environment:
self.channel.update_environment(environment)
self.channel.exec_command(command)
stdin = self.channel.makefile('wb', bufsize)
stdout = self.channel.makefile('r', bufsize)
stderr = self.channel.makefile_stderr('r', bufsize)
self.io = (stdin, stdout, stderr)
def exit_status_ready(self):
return self.channel.exit_status_ready()
def recv_exit_status(self):
return self.channel.recv_exit_status()
def wait_for_completion(self, poll_timeout=POLL_TIMEOUT, exec_timeout=EXEC_TIMEOUT):
start = time.time()
while not self.exit_status_ready():
if (time.time() - start) > EXEC_TIMEOUT:
raise paramiko.SSHException('Execution exceeded time of {}'.format(self.EXEC_TIMEOUT))
time.sleep(poll_timeout)
return self.channel.recv_exit_status()
def fetch_output(self):
stdout = self.io[1].read().decode()
stderr = self.io[2].read().decode()
return stdout, stderr
class SSH(object):
def __init__(self, host):
self.host = host
self.ssh = SSHClient()
self.config = paramiko.SSHConfig()
if os.path.exists(CONFIG):
with open(CONFIG) as fp:
self.config.parse(fp)
def __enter__(self):
logger.debug('Entering host=%s', self.host)
self.ssh.load_system_host_keys()
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
host_config = self.config.lookup(self.host)
proxy = None
if 'proxycommand' in host_config:
proxy = paramiko.ProxyCommand(
subprocess.check_output([os.environ['SHELL'], '-c', 'echo %s -o StrictHostKeyChecking=no' % host_config['proxycommand']]).strip())
logger.debug('Proxy: %s', proxy)
options = {
'port': 22,
'username': USER,
'timeout': CONNECT_TIMEOUT,
'auth_timeout': CONNECT_TIMEOUT,
'banner_timeout': CONNECT_TIMEOUT,
'sock': proxy,
}
try:
self.ssh.connect(self.host, **options)
return self
except paramiko.BadHostKeyException as exc:
logger.error('host key could not be verified: %s', exc)
except paramiko.AuthenticationException as exc:
logger.error('authentication failed: %s', exc)
except paramiko.SSHException as exc:
logger.error('SSH ooops: %s', exc)
except socket.error as exc:
logger.error('SSH connect: %s', exc)
def __exit__(self, et, ev, etb):
self.ssh.close()
logger.debug('Leaving host %s', self.host)
def start(self, command):
command = '{ %s; } <"%s"' % (command, os.devnull)
options = {'get_pty': False, 'timeout': EXEC_TIMEOUT}
logger.debug('Running %s %s', command, options)
try:
self.ssh.exec_command(command, **options)
self.ssh.io[0].close()
except paramiko.SSHException as exc:
logger.error('SSH Exception on %s: %s', command, exc)
raise
def download_file(self, file, localfile):
sftp = self.ssh.open_sftp()
sftp.get(file, localfile)
sftp.close()
def run(self, command):
try:
self.start(command)
except paramiko.SSHException as exc:
return None, None, None
except Exception as exc:
logger.error('Exception on %s: %s', self.host, exc)
return None, None, None
try:
rc = self.ssh.wait_for_completion()
logger.debug('SSH(%s) %s: done!', self.host, command)
return (rc,) + self.ssh.fetch_output()
except paramiko.SSHException as exc:
logger.error('RuntimeException on %s: %s', self.host, exc)
return None, None, None
@kryptek
Copy link
Author

kryptek commented Sep 25, 2018

Usage

with SSH('hostname') as ssh:
  rc, out, err = ssh.run('uptime')

  print("Output: {}".format(out))

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment